lidavidm commented on code in PR #4322:
URL: https://github.com/apache/arrow-adbc/pull/4322#discussion_r3321686696
##########
go/adbc/driver/internal/driverbase/database.go:
##########
@@ -126,6 +126,78 @@ func NewDatabaseImplBase(ctx context.Context, driver *DriverImplBase) (DatabaseI
return database, err
}
+// TracingOptions bundles the configuration knobs that a driver may want
+// to forward to driverbase's OpenTelemetry tracer initialization without
+// requiring each new knob to widen the InitTracing function signature.
+//
+// All fields are optional. When a field is the zero value the
+// corresponding default (typically the OTEL environment variable or a
+// platform-appropriate default path) is used. This struct exists so
+// drivers can honor database-level options loaded from a TOML profile
+// or otherwise supplied through the driver manager.
+type TracingOptions struct {
+ // ExporterName overrides the OTEL_TRACES_EXPORTER environment
+ // variable. Must match one of the values understood by
+ // tryParseTraceExporterType ("none", "otlp", "console", "adbcfile")
+ // when non-empty. Empty falls back to the environment variable.
+ ExporterName string
+
+ // TracingFolderPath overrides the default on-disk folder used by
+ // the "adbcfile" exporter (which is otherwise
+ // "<user-config-dir>/.adbc/traces"). Ignored for non-file
+ // exporters.
+ TracingFolderPath string
+}
+
+// NewDatabaseImplBaseWithExporter instantiates DatabaseImplBase and
+// initializes its OpenTelemetry tracer using the supplied exporter name
+// instead of (or before falling back to) the OTEL_TRACES_EXPORTER
+// environment variable. See NewDatabaseImplBaseWithOptions for a
+// constructor that also accepts a custom on-disk folder for the
+// "adbcfile" exporter.
+func NewDatabaseImplBaseWithExporter(
Review Comment:
Do we need this, or can everyone just use the fully general version below?
##########
go/adbc/driver/internal/driverbase/database.go:
##########
@@ -232,10 +304,54 @@ func (base *database) InitTracing(ctx context.Context, driverName string, driver
return base.Base().InitTracing(ctx, driverName, driverVersion)
}
-func (base *DatabaseImplBase) InitTracing(ctx context.Context, driverName
string, driverVersion string) (err error) {
+func (base *DatabaseImplBase) InitTracing(ctx context.Context, driverName
string, driverVersion string) error {
Review Comment:
Ditto here; this isn't a public API per se, so I would be OK with just having
InitTracingWithOptions, or even amending the existing signature, instead of
adding another method.
##########
go/adbc/driver/flightsql/logging.go:
##########
@@ -19,28 +19,182 @@ package flightsql
import (
"context"
+ "crypto/rand"
+ "crypto/sha256"
+ "encoding/hex"
"io"
"log/slog"
+ "strconv"
+ "strings"
"time"
+ "github.com/apache/arrow-adbc/go/adbc"
+ "github.com/apache/arrow-go/v18/arrow/flight"
+ "go.opentelemetry.io/otel/trace"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/status"
)
+// safeLogger returns a non-nil *slog.Logger. If the provided logger is nil
+// a discard logger is returned so that callers can always safely log without
+// nil-checks. This is important because the streaming code paths can be
+// reached by callers (such as tests) that do not have a Logger initialized.
+//
+// The returned logger is always wrapped with otelTraceHandler so that any
+// record emitted through it is automatically stamped with "trace_id" and
+// "span_id" attributes when its context carries an active OpenTelemetry
+// span. The wrap is idempotent, so wrapping an already-wrapped logger is
+// a no-op (see withOtelTraceContext).
+func safeLogger(logger *slog.Logger) *slog.Logger {
+ if logger == nil {
+ logger = slog.New(slog.NewTextHandler(io.Discard, nil))
+ }
+ return withOtelTraceContext(logger)
+}
+
+// maxLoggedTicketBytes limits how many bytes of a Flight ticket are emitted in
+// log records. Tickets can be opaque server-defined blobs of arbitrary size
+// (sometimes embedding large query plans), so we cap how much we include
+// to keep log records reasonably sized while still being useful for
+// correlation against server-side logs.
+const maxLoggedTicketBytes = 32
Review Comment:
Tickets may include sensitive info, so I think they shouldn't be logged at
all.
##########
go/adbc/driver/flightsql/record_reader.go:
##########
@@ -46,9 +48,38 @@ type reader struct {
cancelFn context.CancelFunc
}
+// recordReaderConfig bundles the dependencies that newRecordReader
+// needs to spin up its per-endpoint goroutines. Grouping them into a
+// single value keeps the call sites compact and lets new fields be
+// added without rippling another positional argument through every
+// caller. The fields mirror the corresponding members on
+// connectionImpl/statement so callers can populate the struct by
+// straight field copies.
Review Comment:
Can we clean up the various comments? I don't think we need a homework-style
justification of every single choice made, e.g. here a config struct is
perfectly natural and doesn't need more lines of justification than code.
##########
go/adbc/driver/flightsql/logging.go:
##########
@@ -72,23 +229,478 @@ type loggedStream struct {
start time.Time
target string
outgoing metadata.MD
+
+ // recvCount tracks how many messages were successfully received from the
+ // server before the stream ended. This is logged on every termination
+ // (success or failure) so an operator can tell whether a stream that
+ // failed with "EOF/Unavailable" never received any data or died mid-way
+ // through a large result set.
+ recvCount int64
}
func (stream *loggedStream) RecvMsg(m any) error {
err := stream.ClientStream.RecvMsg(m)
- if err != nil {
- loggedErr := err
- if loggedErr == io.EOF {
- loggedErr = nil
- }
+ if err == nil {
+ stream.recvCount++
+ return nil
+ }
- if stream.logger.Enabled(stream.ctx, slog.LevelDebug) {
- stream.logger.DebugContext(stream.ctx, stream.method,
"target", stream.target, "duration", time.Since(stream.start), "err",
loggedErr, "metadata", stream.outgoing)
- } else {
- keys := maps.Keys(stream.outgoing)
- slices.Sort(keys)
- stream.logger.InfoContext(stream.ctx, stream.method,
"target", stream.target, "duration", time.Since(stream.start), "err",
loggedErr, "metadata", keys)
+ loggedErr := err
+ if loggedErr == io.EOF {
+ loggedErr = nil
+ }
+
+ // Attempt to capture trailer metadata from the underlying stream. Trailers
+ // are only valid once the stream has terminated, which is the case here
+ // because RecvMsg returned a non-nil error. Trailers frequently carry
+ // server-side diagnostic information (e.g., grpc-message, custom error
+ // detail headers) that is invaluable when triaging "[FlightSQL] error
+ // reading from server: EOF (Unavailable; ...)" reports.
+ trailer := stream.Trailer()
+
+ if stream.logger.Enabled(stream.ctx, slog.LevelDebug) {
+ stream.logger.DebugContext(stream.ctx, stream.method,
+ "target", stream.target,
+ "duration", time.Since(stream.start),
+ "err", loggedErr,
+ "recvMessages", stream.recvCount,
+ "metadata", stream.outgoing,
+ "trailer", trailer,
+ )
+ } else {
+ keys := maps.Keys(stream.outgoing)
+ slices.Sort(keys)
+ trailerKeys := maps.Keys(trailer)
+ slices.Sort(trailerKeys)
+ args := []any{
+ "target", stream.target,
+ "duration", time.Since(stream.start),
+ "err", loggedErr,
+ "recvMessages", stream.recvCount,
+ "metadata", keys,
+ "trailer", trailerKeys,
+ }
+ // Promote curated correlation/tracing header values from the trailer
+ // to first-class log attributes so an operator can cross-reference
+ // this failure with the corresponding server-side trace without
+ // enabling Debug-level logging.
+ args = append(args, correlationHeaderAttrs(trailer)...)
+ // Also promote the outbound correlation IDs that the caller
+ // supplied (PBI ActivityId, x-ms-client-request-id, ...) so a
+ // single log line carries both sides of the join.
+ args = append(args, outgoingCallHeaderAttrs(stream.ctx)...)
+ // Emit gRPC code/message as separate structured fields. EOF
+ // is treated as a clean close by Flight so loggedErr was
+ // nil-ed out above; only attach status attrs for real errors.
+ if loggedErr != nil {
+ args = append(args, grpcStatusAttrs(loggedErr)...)
}
+ stream.logger.InfoContext(stream.ctx, stream.method, args...)
}
return err
}
+
+// wellKnownCorrelationHeaders is the curated allow-list of inbound gRPC
+// header/trailer keys that are surfaced verbatim into log records. These
+// headers are commonly emitted by FlightSQL servers, gateways, and tracing
+// frameworks to allow operators to cross-reference a client-side log entry
+// against the corresponding server-side trace or query history record.
+//
+// The allow-list also intentionally covers the Microsoft / Power BI / Power
+// Query family of correlation identifiers ("ActivityId",
+// "x-ms-client-request-id", etc.). The driver is frequently invoked from
+// Power BI Desktop / Mashup, which records every step under a per-step
+// ActivityId GUID; capturing that value on the ADBC side is the single
+// most useful join column when triaging an issue against a Power BI
+// diagnostic trace bundle.
+var wellKnownCorrelationHeaders = []string{
+ "x-request-id",
+ "x-correlation-id",
+ "x-trace-id",
+ "x-amzn-trace-id",
+ "x-b3-traceid",
+ "x-b3-spanid",
+ "traceparent",
+ "tracestate",
+ "x-arrow-flight-session-id",
+ "x-dremio-request-id",
+ "x-dremio-query-id",
+ "x-server-version",
+ "server",
+ // Microsoft / Power BI / Power Query family. The exact casing varies
+ // by host (PBI Desktop emits "ActivityId" as a property in its trace
+ // file but transmits it as a gRPC header that gRPC's metadata
+ // package normalizes to lower case), so the allow-list uses the
+ // canonical lower-case form throughout. Both the unprefixed and
+ // "x-ms-" prefixed variants are listed because the prefix-less form
+ // is what Mashup's own diagnostics record.
+ "activityid",
+ "activity-id",
+ "x-ms-activity-id",
+ "x-ms-client-request-id",
+ "x-ms-request-id",
+ "requestid",
+ "x-pbi-activity-id",
+}
+
+// sensitiveHeaderTokens lists case-insensitive substrings whose presence
+// in a header name marks the header as carrying credential material that
+// must never be surfaced in driver logs. The list is consulted by
+// isSensitiveHeader, which in turn gates correlationHeaderAttrs's
+// allow-list and suffix-match paths. The substrings are deliberately
+// coarse — "token" rather than a specific header name — because the
+// cost of a false positive (a useful correlation header is skipped) is
+// much lower than the cost of a false negative (a bearer token is
+// written to disk). Header names known to be tracking-only and that
+// happen to contain a denylist substring should be added directly to
+// wellKnownCorrelationHeaders only after confirming they do not carry
+// credentials on the target server.
+var sensitiveHeaderTokens = []string{
+ "authorization",
+ "cookie",
+ "password",
+ "secret",
+ "private",
+ "credential",
+ "token",
+ "apikey",
+ "api-key",
+}
+
+// isSensitiveHeader reports whether name (compared case-insensitively)
+// matches any of the substrings in sensitiveHeaderTokens. It is the only
+// place the driver guards against accidentally logging credential
+// material lifted out of gRPC metadata; correlationHeaderAttrs and
+// outgoingCallHeaderAttrs both consult it before promoting a header.
+func isSensitiveHeader(name string) bool {
+ lower := strings.ToLower(name)
+ for _, tok := range sensitiveHeaderTokens {
+ if strings.Contains(lower, tok) {
+ return true
+ }
+ }
+ return false
+}
+
+// headerAttrsWithPrefix is the shared implementation behind
+// correlationHeaderAttrs (incoming headers/trailers) and
+// outgoingCallHeaderAttrs (call-time outbound metadata). Both surfaces
+// use the same allow-list and the same sensitive-header denylist; only
+// the slog attribute prefix differs so an operator can tell at a glance
+// whether the value was something the driver sent or something the
+// server returned. Returns nil when no allow-listed header is present
+// so callers can use append(...).
+func headerAttrsWithPrefix(md metadata.MD, prefix string) []any {
+ if len(md) == 0 {
+ return nil
+ }
+ out := make([]any, 0, 4)
+ seen := make(map[string]bool, len(wellKnownCorrelationHeaders))
+ for _, k := range wellKnownCorrelationHeaders {
+ seen[k] = true
+ if isSensitiveHeader(k) {
+ continue
+ }
+ if vals := md.Get(k); len(vals) > 0 {
+ out = append(out, slog.Any(prefix+k, vals))
+ }
+ }
+ for k, vals := range md {
+ lk := strings.ToLower(k)
+ if seen[lk] {
+ continue
+ }
+ if isSensitiveHeader(lk) {
+ continue
+ }
+ if strings.HasSuffix(lk, "-request-id") ||
+ strings.HasSuffix(lk, "-trace-id") ||
+ strings.HasSuffix(lk, "-query-id") ||
+ strings.HasSuffix(lk, "-session-id") ||
+ strings.HasSuffix(lk, "-activity-id") {
+ out = append(out, slog.Any(prefix+lk, vals))
+ }
+ }
Review Comment:
I feel like we simply shouldn't log headers that aren't allowlisted.
##########
go/adbc/driver/flightsql/logging.go:
##########
@@ -72,23 +229,478 @@ type loggedStream struct {
start time.Time
target string
outgoing metadata.MD
+
+ // recvCount tracks how many messages were successfully received from the
+ // server before the stream ended. This is logged on every termination
+ // (success or failure) so an operator can tell whether a stream that
+ // failed with "EOF/Unavailable" never received any data or died mid-way
+ // through a large result set.
+ recvCount int64
}
func (stream *loggedStream) RecvMsg(m any) error {
err := stream.ClientStream.RecvMsg(m)
- if err != nil {
- loggedErr := err
- if loggedErr == io.EOF {
- loggedErr = nil
- }
+ if err == nil {
+ stream.recvCount++
+ return nil
+ }
- if stream.logger.Enabled(stream.ctx, slog.LevelDebug) {
- stream.logger.DebugContext(stream.ctx, stream.method,
"target", stream.target, "duration", time.Since(stream.start), "err",
loggedErr, "metadata", stream.outgoing)
- } else {
- keys := maps.Keys(stream.outgoing)
- slices.Sort(keys)
- stream.logger.InfoContext(stream.ctx, stream.method,
"target", stream.target, "duration", time.Since(stream.start), "err",
loggedErr, "metadata", keys)
+ loggedErr := err
+ if loggedErr == io.EOF {
+ loggedErr = nil
+ }
+
+ // Attempt to capture trailer metadata from the underlying stream. Trailers
+ // are only valid once the stream has terminated, which is the case here
+ // because RecvMsg returned a non-nil error. Trailers frequently carry
+ // server-side diagnostic information (e.g., grpc-message, custom error
+ // detail headers) that is invaluable when triaging "[FlightSQL] error
+ // reading from server: EOF (Unavailable; ...)" reports.
+ trailer := stream.Trailer()
+
+ if stream.logger.Enabled(stream.ctx, slog.LevelDebug) {
+ stream.logger.DebugContext(stream.ctx, stream.method,
+ "target", stream.target,
+ "duration", time.Since(stream.start),
+ "err", loggedErr,
+ "recvMessages", stream.recvCount,
+ "metadata", stream.outgoing,
+ "trailer", trailer,
+ )
+ } else {
+ keys := maps.Keys(stream.outgoing)
+ slices.Sort(keys)
+ trailerKeys := maps.Keys(trailer)
+ slices.Sort(trailerKeys)
+ args := []any{
+ "target", stream.target,
+ "duration", time.Since(stream.start),
+ "err", loggedErr,
+ "recvMessages", stream.recvCount,
+ "metadata", keys,
+ "trailer", trailerKeys,
+ }
+ // Promote curated correlation/tracing header values from the trailer
+ // to first-class log attributes so an operator can cross-reference
+ // this failure with the corresponding server-side trace without
+ // enabling Debug-level logging.
+ args = append(args, correlationHeaderAttrs(trailer)...)
+ // Also promote the outbound correlation IDs that the caller
+ // supplied (PBI ActivityId, x-ms-client-request-id, ...) so a
+ // single log line carries both sides of the join.
+ args = append(args, outgoingCallHeaderAttrs(stream.ctx)...)
+ // Emit gRPC code/message as separate structured fields. EOF
+ // is treated as a clean close by Flight so loggedErr was
+ // nil-ed out above; only attach status attrs for real errors.
+ if loggedErr != nil {
+ args = append(args, grpcStatusAttrs(loggedErr)...)
}
+ stream.logger.InfoContext(stream.ctx, stream.method, args...)
}
return err
}
+
+// wellKnownCorrelationHeaders is the curated allow-list of inbound gRPC
+// header/trailer keys that are surfaced verbatim into log records. These
+// headers are commonly emitted by FlightSQL servers, gateways, and tracing
+// frameworks to allow operators to cross-reference a client-side log entry
+// against the corresponding server-side trace or query history record.
+//
+// The allow-list also intentionally covers the Microsoft / Power BI / Power
+// Query family of correlation identifiers ("ActivityId",
+// "x-ms-client-request-id", etc.). The driver is frequently invoked from
+// Power BI Desktop / Mashup, which records every step under a per-step
+// ActivityId GUID; capturing that value on the ADBC side is the single
+// most useful join column when triaging an issue against a Power BI
+// diagnostic trace bundle.
+var wellKnownCorrelationHeaders = []string{
+ "x-request-id",
+ "x-correlation-id",
+ "x-trace-id",
+ "x-amzn-trace-id",
+ "x-b3-traceid",
+ "x-b3-spanid",
+ "traceparent",
+ "tracestate",
+ "x-arrow-flight-session-id",
+ "x-dremio-request-id",
+ "x-dremio-query-id",
+ "x-server-version",
+ "server",
+ // Microsoft / Power BI / Power Query family. The exact casing varies
+ // by host (PBI Desktop emits "ActivityId" as a property in its trace
+ // file but transmits it as a gRPC header that gRPC's metadata
+ // package normalizes to lower case), so the allow-list uses the
+ // canonical lower-case form throughout. Both the unprefixed and
+ // "x-ms-" prefixed variants are listed because the prefix-less form
+ // is what Mashup's own diagnostics record.
+ "activityid",
+ "activity-id",
+ "x-ms-activity-id",
+ "x-ms-client-request-id",
+ "x-ms-request-id",
+ "requestid",
+ "x-pbi-activity-id",
+}
+
+// sensitiveHeaderTokens lists case-insensitive substrings whose presence
+// in a header name marks the header as carrying credential material that
+// must never be surfaced in driver logs. The list is consulted by
+// isSensitiveHeader, which in turn gates correlationHeaderAttrs's
+// allow-list and suffix-match paths. The substrings are deliberately
+// coarse — "token" rather than a specific header name — because the
+// cost of a false positive (a useful correlation header is skipped) is
+// much lower than the cost of a false negative (a bearer token is
+// written to disk). Header names known to be tracking-only and that
+// happen to contain a denylist substring should be added directly to
+// wellKnownCorrelationHeaders only after confirming they do not carry
+// credentials on the target server.
+var sensitiveHeaderTokens = []string{
+ "authorization",
+ "cookie",
+ "password",
+ "secret",
+ "private",
+ "credential",
+ "token",
+ "apikey",
+ "api-key",
+}
+
+// isSensitiveHeader reports whether name (compared case-insensitively)
+// matches any of the substrings in sensitiveHeaderTokens. It is the only
+// place the driver guards against accidentally logging credential
+// material lifted out of gRPC metadata; correlationHeaderAttrs and
+// outgoingCallHeaderAttrs both consult it before promoting a header.
+func isSensitiveHeader(name string) bool {
+ lower := strings.ToLower(name)
+ for _, tok := range sensitiveHeaderTokens {
+ if strings.Contains(lower, tok) {
+ return true
+ }
+ }
+ return false
+}
+
+// headerAttrsWithPrefix is the shared implementation behind
+// correlationHeaderAttrs (incoming headers/trailers) and
+// outgoingCallHeaderAttrs (call-time outbound metadata). Both surfaces
+// use the same allow-list and the same sensitive-header denylist; only
+// the slog attribute prefix differs so an operator can tell at a glance
+// whether the value was something the driver sent or something the
+// server returned. Returns nil when no allow-listed header is present
+// so callers can use append(...).
+func headerAttrsWithPrefix(md metadata.MD, prefix string) []any {
+ if len(md) == 0 {
+ return nil
+ }
+ out := make([]any, 0, 4)
+ seen := make(map[string]bool, len(wellKnownCorrelationHeaders))
+ for _, k := range wellKnownCorrelationHeaders {
+ seen[k] = true
+ if isSensitiveHeader(k) {
+ continue
+ }
Review Comment:
...is this check necessary? wellKnownCorrelationHeaders is an allowlist, no?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]