This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 82a9a0218 refactor(lifecycle): carry remote_*, group labels on cycle-level metrics; drop self_identity_resolution_total (#1172)
82a9a0218 is described below
commit 82a9a021882a1978c04d26abad196eddc30a3097
Author: Gao Hongtao <[email protected]>
AuthorDate: Sat Jun 13 10:13:41 2026 +0800
refactor(lifecycle): carry remote_*, group labels on cycle-level metrics; drop self_identity_resolution_total (#1172)
---
CHANGES.md | 4 +-
banyand/backup/lifecycle/metrics_test.go | 238 ++++++++-
.../lifecycle/segment_boundary_utils_test.go | 2 +-
banyand/backup/lifecycle/service.go | 222 +++++---
banyand/backup/lifecycle/steps.go | 271 ++++++----
banyand/backup/lifecycle/steps_test.go | 99 +++-
docs/operation/grafana-fodc-nodes.json | 574 ++++++++++++++++++++-
docs/operation/grafana-fodc-workload.json | 4 +-
test/cases/lifecycle/lifecycle.go | 50 +-
test/e2e-v2/cases/fodc/metrics/documented_gap.txt | 4 +
10 files changed, 1227 insertions(+), 241 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index d0917c5a1..09fa45c53 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -7,8 +7,10 @@ Release Notes.
### Features
- Add two catalogs to the queue batch-write metrics so traffic is comparable
on both ends: a per-batch-stream **batch catalog**
(`total_batch_started`/`total_batch_finished`/`total_batch_latency`, buckets to
~300s) on `queue_pub`/`queue_sub` and the `lifecycle_migration` mirror, and a
per-message **message catalog**
(`total_message_started`/`total_message_finished`) on `queue_sub` (the
publisher's existing `total_*` already counts per message). All existing
`total_*` series are unchanged [...]
- Redesign the queue (`queue_pub`/`queue_sub`) metrics around a uniform model:
keep only `total_started`, `total_finished`, `total_latency` (now a histogram)
and `total_err`, plus file-sync-only `sent_bytes` (pub) / `received_bytes`
(sub). Replace the `topic` label with `operation`
(`batch-write`/`file-sync`/`query`/`control`) and `group`, add an `error_type`
label on `total_err`, and add remote-endpoint labels
(`remote_node`/`remote_role`/`remote_tier`) so the liaison↔data (hot/warm/col
[...]
-- Stamp the lifecycle's tier-migration publisher's identity onto the wire so
the receiving data node records a non-empty
`remote_node`/`remote_role`/`remote_tier` on its
`banyandb_queue_sub_total_finished` series. The lifecycle's `parseGroup`
resolves the lifecycle's self identity by matching its `--grpc-addr` (the
co-located data node's gRPC address) against the data-node registry —
`Metadata.Name` becomes `remote_node`, `Labels["type"]` becomes `remote_tier` —
and calls `SetSelfNode(se [...]
+- Stamp the lifecycle's tier-migration publisher's identity onto the wire so
the receiving data node records a non-empty
`remote_node`/`remote_role`/`remote_tier` on its
`banyandb_queue_sub_total_finished` series. The lifecycle's `parseGroup`
resolves the lifecycle's self identity by matching the lifecycle pod's hostname
(POD_NAME via the K8s downward API, falling back to `os.Hostname()` — same
precedence as `nativeNodeContext` at `banyand/backup/lifecycle/service.go`)
against the data-n [...]
- Add `banyandb_lifecycle_last_run_timestamp_seconds` and
`banyandb_lifecycle_last_run_success` gauges to the lifecycle service for
at-a-glance health monitoring. `last_run_timestamp_seconds` records the
wall-clock epoch (in seconds) of the most recent migration cycle;
`last_run_success` is `1` on a nil error and `0` otherwise. Both are stamped by
a `defer` at the end of `action()` so every return path (success, error,
recovered panic) updates the pair atomically — dashboards can pin an [...]
+- Refactor the lifecycle cycle-level metrics
(`banyandb_lifecycle_cycles_total`,
`banyandb_lifecycle_last_run_timestamp_seconds`,
`banyandb_lifecycle_last_run_success`) to carry labels `remote_node`,
`remote_role`, `remote_tier`, `group`. The label form mirrors the per-message
`banyandb_lifecycle_migration_*` family emitted by the queue/pub lifecycle
publisher, but the two families describe DIFFERENT things: the cycle-level
series describe the SENDER (the lifecycle pod's co-located data [...]
+- Remove `banyandb_lifecycle_self_identity_resolution_total`. The
regression-detection role moves to the now-labeled
`banyandb_lifecycle_cycles_total{remote_node!=""}` (an empty `remote_node`
series means the registry match failed for every group, the bug the old counter
caught), plus the existing receiver-side count of empty `remote_node` on
lifecycle `banyandb_queue_sub_total_finished` series. The wire-level
`cluster.v1.SendRequest` sender-identity fields are unchanged.
- Vectorized measure query path is now enabled by default. The columnar
pipeline replaces per-row protobuf serialization in `NewMIterator`, cutting
allocations and ns/op for scan-heavy measure queries; gRPC wire format
(`*measurev1.InternalDataPoint`) is byte-identical. Single-node coverage is
complete: scan, GroupBy+Agg via `BatchAggregation`, scalar reduce (`Agg`
without `GroupBy`), raw `GroupBy` (without `Agg`), implicit projection coverage
for GroupBy/Agg fields, `TopN`/`BottomN`, `o [...]
- Add validation to ensure Measure's ShardingKey contains all Entity tags to
guarantee entity locality.
- Organize access logs under a dedicated "accesslog" subdirectory to improve
log organization and separation from other application data.
diff --git a/banyand/backup/lifecycle/metrics_test.go b/banyand/backup/lifecycle/metrics_test.go
index 1e0247ccc..05c29440d 100644
--- a/banyand/backup/lifecycle/metrics_test.go
+++ b/banyand/backup/lifecycle/metrics_test.go
@@ -108,33 +108,130 @@ func TestBuildHTTPRouterWithoutPromHandler(t *testing.T) {
require.Equal(t, http.StatusNotFound, rec.Code)
}
-// recordingGauge captures the last Set() call so a unit test can assert
-// the value the lifecycle would have emitted. The lifecycle uses the
-// real prometheus-backed Gauge in production; this stub keeps the test
-// hermetic.
+// recordingGauge captures the Set() and Delete() calls so a unit test
+// can assert the value the lifecycle would have emitted AND the
+// staleness-prevention Delete that recordLastRun issues before each
+// Set. The lifecycle uses the real prometheus-backed Gauge in
+// production; this stub keeps the test hermetic. It also records the
+// labels argument so tests can assert the (remote_node, remote_role,
+// remote_tier, group) tuple stamped by the per-group recordCycleGroup
+// and the cycle-end recordLastRun paths.
type recordingGauge struct {
- lastValue float64
- called int
+ lastLabels []string
+ deletedLabel [][]string
+ lastValue float64
+ called int
+ deleted int
}
-func (g *recordingGauge) Set(v float64, _ ...string) {
+func (g *recordingGauge) Set(v float64, labels ...string) {
g.lastValue = v
+ g.lastLabels = labels
g.called++
}
func (g *recordingGauge) Add(_ float64, _ ...string) {}
-func (g *recordingGauge) Delete(_ ...string) bool { return true }
+func (g *recordingGauge) Delete(labels ...string) bool {
+ g.deleted++
+ g.deletedLabel = append(g.deletedLabel, append([]string{}, labels...))
+ return true
+}
+
+// recordingCounter captures the last Inc() call's label set.
+type recordingCounter struct {
+ lastLabels []string
+ called int
+}
+
+func (c *recordingCounter) Inc(_ float64, labels ...string) {
+ c.lastLabels = labels
+ c.called++
+}
+
+func (c *recordingCounter) Add(_ float64, _ ...string) {}
+
+func (c *recordingCounter) Delete(_ ...string) bool { return true }
+
+// TestRecordCycleGroupStampsLabeledMetrics asserts the per-group helper
+// issues an Inc on cyclesTotal with the
+// (remote_node, remote_role, remote_tier, group) tuple and captures
+// the cycle's last-seen (group, remote_*) tuple on the service for
+// the deferred recordLastRun. lastRunTimestamp and lastRunSuccess are
+// intentionally NOT touched by recordCycleGroup — they are stamped
+// atomically at cycle end so dashboards see consistent (timestamp,
+// success) pairs for the same tuple and the success flag reflects the
+// whole-cycle outcome.
+func TestRecordCycleGroupStampsLabeledMetrics(t *testing.T) {
+ cyc, ts, ok := &recordingCounter{}, &recordingGauge{}, &recordingGauge{}
+ l := &lifecycleService{
+ cyclesTotal: cyc,
+ lastRunTimestamp: ts,
+ lastRunSuccess: ok,
+ }
+ l.recordCycleGroup("metrics-day", "data-hot-0:17912", "lifecycle", "hot")
+
+ require.Equal(t, 1, cyc.called)
+ require.Equal(t,
+ []string{"data-hot-0:17912", "lifecycle", "hot", "metrics-day"},
+ cyc.lastLabels,
+ "cyclesTotal must be Inc'd with (remote_node, remote_role, remote_tier, group)")
+ require.Equal(t, 0, ts.called,
+ "lastRunTimestamp must NOT be Set by recordCycleGroup (deferred recordLastRun's job)")
+ require.Equal(t, 0, ok.called,
+ "lastRunSuccess must NOT be Set by recordCycleGroup (deferred recordLastRun's job)")
+ require.Equal(t, "data-hot-0:17912", l.lastRunNode)
+ require.Equal(t, "lifecycle", l.lastRunRole)
+ require.Equal(t, "hot", l.lastRunTier)
+ require.Equal(t, "metrics-day", l.lastRunGroup,
+ "lastRunGroup/Node/Role/Tier are the inputs to the deferred recordLastRun")
+}
+
+// TestRecordLastRunResetsTupleAtActionStart asserts action() resets the
+// cycle's last-seen (group, remote_*) tuple to empty strings so an
+// empty cycle (no parseGroup succeeded) doesn't inherit the previous
+// cycle's labels. Scheduler-driven consecutive cycles would otherwise
+// see a stale group label on last_run_*.
+func TestRecordLastRunResetsTupleAtActionStart(t *testing.T) {
+ ts, ok := &recordingGauge{}, &recordingGauge{}
+ l := &lifecycleService{
+ lastRunTimestamp: ts,
+ lastRunSuccess: ok,
+ // Stale labels from a previous cycle; action() must clear them.
+ lastRunGroup: "stale-group",
+ lastRunNode: "stale-node:17912",
+ lastRunRole: "lifecycle",
+ lastRunTier: "stale",
+ }
+ // Simulate the action() prelude: reset, then call recordLastRun
+ // without any recordCycleGroup in between (empty cycle).
+ l.lastRunGroup = ""
+ l.lastRunNode = ""
+ l.lastRunRole = ""
+ l.lastRunTier = ""
+ l.recordLastRun(time.Unix(1717929900, 0), nil)
+
+ require.Equal(t, 1, ts.called)
+ require.Equal(t, []string{"", "", "", ""}, ts.lastLabels,
+ "empty-cycle path must stamp gauges with empty (group, remote_*) labels")
+ require.Equal(t, []string{"", "", "", ""}, ok.lastLabels)
+}
// TestRecordLastRunSuccess stamps the gauges with the start time (epoch
// seconds) and success=1 when the action returned nil. Asserts both the
// integer epoch shape and the 0/1 success signal so dashboards can
-// distinguish a healthy last run from a failed one.
+// distinguish a healthy last run from a failed one. The label set is
+// sourced from the cycle's last-seen tuple (set by recordCycleGroup),
+// so we wire one in before the call.
func TestRecordLastRunSuccess(t *testing.T) {
tsGauge, okGauge := &recordingGauge{}, &recordingGauge{}
l := &lifecycleService{
lastRunTimestamp: tsGauge,
lastRunSuccess: okGauge,
+ lastRunGroup: "metrics-day",
+ lastRunNode: "data-hot-0:17912",
+ lastRunRole: "lifecycle",
+ lastRunTier: "hot",
}
start := time.Unix(1717929600, 0) // 2024-06-09T00:00:00Z, deterministic
l.recordLastRun(start, nil)
@@ -142,9 +239,128 @@ func TestRecordLastRunSuccess(t *testing.T) {
require.Equal(t, 1, tsGauge.called)
require.Equal(t, 1717929600.0, tsGauge.lastValue,
"lastRunTimestamp must record the start time as epoch seconds")
+ require.Equal(t,
+ []string{"data-hot-0:17912", "lifecycle", "hot", "metrics-day"},
+ tsGauge.lastLabels,
+ "lastRunTimestamp must be Set with the cycle's (remote_node, remote_role, remote_tier, group) tuple")
+ require.Equal(t, 0, tsGauge.deleted,
+ "first-ever recordLastRun must NOT issue a Delete (no previous tuple to clean up)")
require.Equal(t, 1, okGauge.called)
require.Equal(t, 1.0, okGauge.lastValue,
"lastRunSuccess must be 1 on a nil error")
+ require.Equal(t,
+ []string{"data-hot-0:17912", "lifecycle", "hot", "metrics-day"},
+ okGauge.lastLabels,
+ "lastRunSuccess must be Set with the cycle's (remote_node, remote_role, remote_tier, group) tuple")
+ require.Equal(t, 0, okGauge.deleted)
+}
+
+// TestRecordLastRunTwoCycleReplaceStaleSeries is the regression test
+// for the staleness issue: two consecutive recordLastRun calls with
+// DIFFERENT (group, remote_*) tuples. The first call stamps
+// ("metrics-day", "data-hot-0", "lifecycle", "hot") and updates
+// emittedLastRun*. The second call must Delete that tuple before
+// stamping the new ("metrics-hour", "data-warm-0", "lifecycle",
+// "warm") tuple, so Prometheus doesn't accumulate a stale series
+// shadowing the new one. Without the emittedLastRun* tracking and
+// the Delete-before-Set, cycle B's series would coexist with cycle
+// A's and dashboards could read either as "current".
+func TestRecordLastRunTwoCycleReplaceStaleSeries(t *testing.T) {
+ tsGauge, okGauge := &recordingGauge{}, &recordingGauge{}
+ l := &lifecycleService{
+ lastRunTimestamp: tsGauge,
+ lastRunSuccess: okGauge,
+ lastRunGroup: "metrics-day",
+ lastRunNode: "data-hot-0:17912",
+ lastRunRole: "lifecycle",
+ lastRunTier: "hot",
+ }
+
+ // Cycle A: stamp the hot path.
+ l.recordLastRun(time.Unix(1717929600, 0), nil)
+ require.Equal(t, 1, tsGauge.called)
+ require.Equal(t, 1, okGauge.called)
+ require.Equal(t, 0, tsGauge.deleted)
+ require.Equal(t, 0, okGauge.deleted)
+ require.Equal(t,
+ []string{"data-hot-0:17912", "lifecycle", "hot", "metrics-day"},
+ tsGauge.lastLabels,
+ "cycle A must stamp the hot-path tuple")
+
+ // Cycle B: action() resets lastRun* and the new cycle's
+ // recordCycleGroup overwrites them with the warm-path tuple.
+ l.lastRunGroup = ""
+ l.lastRunNode = ""
+ l.lastRunRole = ""
+ l.lastRunTier = ""
+ l.lastRunGroup = "metrics-hour"
+ l.lastRunNode = "data-warm-0:17912"
+ l.lastRunRole = "lifecycle"
+ l.lastRunTier = "warm"
+ l.recordLastRun(time.Unix(1717929700, 0), nil)
+
+ // Cycle B must Delete cycle A's tuple before stamping the new one.
+ require.Equal(t, 1, tsGauge.deleted,
+ "second recordLastRun must Delete the previous tuple to prevent stale-series shadowing")
+ require.Equal(t,
+ []string{"data-hot-0:17912", "lifecycle", "hot", "metrics-day"},
+ tsGauge.deletedLabel[0],
+ "Delete must target the cycle A tuple (the previously-emitted one)")
+ require.Equal(t, 2, tsGauge.called)
+ require.Equal(t, 1717929700.0, tsGauge.lastValue)
+ require.Equal(t,
+ []string{"data-warm-0:17912", "lifecycle", "warm", "metrics-hour"},
+ tsGauge.lastLabels,
+ "cycle B must stamp the warm-path tuple after deleting the hot-path one")
+ require.Equal(t, 1, okGauge.deleted)
+}
+
+// TestRecordLastRunEmptyThenNonEmptyCycle is the regression test for
+// the empty-tuple state collision: an empty cycle (no recordCycleGroup
+// ran; the action-start reset left lastRun* all empty) stamps the
+// gauges with all-empty labels, and a subsequent non-empty cycle must
+// still Delete the all-empty-labels series before stamping its own
+// tuple. Without the emittedLastRunSet bool flag, the hasPrev check
+// "any emittedLastRun* field is non-empty" would return false for an
+// all-empty predecessor and skip the Delete, leaving the empty series
+// to shadow the new stamp.
+func TestRecordLastRunEmptyThenNonEmptyCycle(t *testing.T) {
+ tsGauge, okGauge := &recordingGauge{}, &recordingGauge{}
+ l := &lifecycleService{
+ lastRunTimestamp: tsGauge,
+ lastRunSuccess: okGauge,
+ }
+
+ // Cycle A: empty cycle (no recordCycleGroup; all lastRun* are
+ // empty strings from action()'s prelude reset). recordLastRun
+ // stamps the all-empty tuple and sets emittedLastRunSet = true.
+ l.recordLastRun(time.Unix(1717929600, 0), nil)
+ require.Equal(t, 1, tsGauge.called)
+ require.Equal(t, 0, tsGauge.deleted,
+ "first-ever recordLastRun must NOT Delete (no previous tuple)")
+ require.Equal(t, []string{"", "", "", ""}, tsGauge.lastLabels,
+ "empty cycle must stamp all-empty labels")
+ require.True(t, l.emittedLastRunSet,
+ "emittedLastRunSet must be true after a successful Set, even with all-empty labels")
+ require.Equal(t, "", l.emittedLastRunGroup)
+
+ // Cycle B: non-empty cycle. recordLastRun must Delete the
+ // all-empty tuple from cycle A before stamping the new tuple.
+ l.lastRunGroup = "metrics-day"
+ l.lastRunNode = "data-hot-0:17912"
+ l.lastRunRole = "lifecycle"
+ l.lastRunTier = "hot"
+ l.recordLastRun(time.Unix(1717929700, 0), nil)
+
+ require.Equal(t, 1, tsGauge.deleted,
+ "second recordLastRun must Delete the previous all-empty tuple (the empty-cycle predecessor)")
+ require.Equal(t, []string{"", "", "", ""}, tsGauge.deletedLabel[0],
+ "Delete must target the all-empty tuple from cycle A")
+ require.Equal(t, 2, tsGauge.called)
+ require.Equal(t,
+ []string{"data-hot-0:17912", "lifecycle", "hot", "metrics-day"},
+ tsGauge.lastLabels,
+ "cycle B must stamp the new tuple after deleting the empty one")
}
// TestRecordLastRunFailure stamps the gauges with success=0 when the action
@@ -155,6 +371,10 @@ func TestRecordLastRunFailure(t *testing.T) {
l := &lifecycleService{
lastRunTimestamp: tsGauge,
lastRunSuccess: okGauge,
+ lastRunGroup: "metrics-day",
+ lastRunNode: "data-warm-1:17912",
+ lastRunRole: "lifecycle",
+ lastRunTier: "warm",
}
start := time.Unix(1717929700, 0)
l.recordLastRun(start, errors.New("snapshot dir unavailable"))
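The Delete-before-Set behaviour these tests pin down can be illustrated in isolation. The following is a minimal sketch, not the lifecycle's implementation: `fakeGauge`, `lastRunStamper`, `key`, and `stamp` are hypothetical names, and the gauge is an in-memory map that keeps one value per label tuple until it is explicitly deleted. It shows why a separate boolean is needed to remember that something was emitted, since an all-empty tuple is itself a valid emitted series.

```go
package main

import (
	"fmt"
	"strings"
)

// fakeGauge is a hypothetical in-memory stand-in for a labeled gauge:
// one value per distinct label tuple, kept until explicitly deleted.
type fakeGauge struct {
	series map[string]float64
}

func newFakeGauge() *fakeGauge { return &fakeGauge{series: map[string]float64{}} }

// key joins a label tuple into a single map key.
func key(labels []string) string { return strings.Join(labels, "\x00") }

func (g *fakeGauge) Set(v float64, labels ...string) { g.series[key(labels)] = v }

func (g *fakeGauge) Delete(labels ...string) bool {
	k := key(labels)
	_, ok := g.series[k]
	delete(g.series, k)
	return ok
}

// lastRunStamper remembers the tuple it last emitted. The hasPrev flag
// is tracked separately because an all-empty tuple is a valid emitted
// state and cannot be told apart from "nothing emitted yet" by
// inspecting the label values alone.
type lastRunStamper struct {
	gauge   *fakeGauge
	emitted []string
	hasPrev bool
}

// stamp deletes the previously emitted series (if any) and then sets
// the current one, so exactly one series exists afterwards.
func (s *lastRunStamper) stamp(v float64, labels ...string) {
	if s.hasPrev {
		s.gauge.Delete(s.emitted...)
	}
	s.gauge.Set(v, labels...)
	s.emitted = append([]string{}, labels...) // defensive copy
	s.hasPrev = true
}

func main() {
	s := &lastRunStamper{gauge: newFakeGauge()}
	// Empty cycle first, then a cycle that processed a group.
	s.stamp(1717929600, "", "", "", "")
	s.stamp(1717929700, "data-hot-0:17912", "lifecycle", "hot", "metrics-day")
	fmt.Println(len(s.gauge.series))
}
```

If `hasPrev` were derived from "any emitted label is non-empty", the second `stamp` would skip the Delete and the map would end up holding two series.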
diff --git a/banyand/backup/lifecycle/segment_boundary_utils_test.go b/banyand/backup/lifecycle/segment_boundary_utils_test.go
index 9384918c4..f4a715c89 100644
--- a/banyand/backup/lifecycle/segment_boundary_utils_test.go
+++ b/banyand/backup/lifecycle/segment_boundary_utils_test.go
@@ -181,8 +181,8 @@ func stage(name, selector string, segNum, ttlNum uint32) *commonv1.LifecycleStag
func TestGetTargetStageInterval(t *testing.T) {
//nolint:govet // fieldalignment: test struct optimization not critical
type tc struct {
- name string
group *GroupConfig
+ name string
expected storage.IntervalRule
}
diff --git a/banyand/backup/lifecycle/service.go b/banyand/backup/lifecycle/service.go
index 9f0fafbd1..19338f1b1 100644
--- a/banyand/backup/lifecycle/service.go
+++ b/banyand/backup/lifecycle/service.go
@@ -79,55 +79,63 @@ const (
// metricsNodeKeeperInterval is how often the keeper re-checks that the local
// data node connection is active and re-registers it if not.
metricsNodeKeeperInterval = 10 * time.Second
+ // lifecycleRoleName is the hard-coded role label the lifecycle stamps
+ // on its own _monitoring series and on the wire-level SenderRole
+ // for tier-migration traffic. Mirrors the liaison's "liaison"
+ // pattern at pkg/cmdsetup/liaison.go:170-171.
+ lifecycleRoleName = "lifecycle"
)
type lifecycleService struct {
databasev1.UnimplementedClusterStateServiceServer
databasev1.UnimplementedNodeQueryServiceServer
- metadata metadata.Repo
- omr observability.MetricsRegistry
- pm protector.Memory
- cyclesTotal meter.Counter
- // lastRunTimestamp records the wall-clock epoch (in seconds) of the
- // most recent attempt to run a migration cycle, regardless of outcome.
- // Updated at the end of action() in both the success and error paths.
- lastRunTimestamp meter.Gauge
- // lastRunSuccess records the outcome of the most recent migration
- // cycle: 1 on success, 0 on error. Combined with lastRunTimestamp it
- // gives dashboards an at-a-glance "is the lifecycle healthy" signal.
- lastRunSuccess meter.Gauge
- metricsClient queue.Client
- grpcServer *grpclib.Server
- httpSrv *http.Server
- tlsReloader *pkgtls.Reloader
- currentNode *databasev1.Node
- clientCloser context.CancelFunc
- stopCh chan struct{}
- sch *timestamp.Scheduler
- l *logger.Logger
- clusterStateMgr *clusterStateManager
- metricsKeeperStop chan struct{}
- lifecycleHost string
- lifecycleHTTPAddr string
- streamRoot string
- traceRoot string
- progressFilePath string
- reportDir string
- schedule string
- cert string
- gRPCAddr string
- lifecycleKeyFile string
- lifecycleGRPCAddr string
- measureRoot string
- lifecycleCertFile string
- localNodeMD schema.Metadata
- maxExecutionTimes int
- chunkSize run.Bytes
- lifecycleGRPCPort uint32
- lifecycleHTTPPort uint32
- enableTLS bool
- insecure bool
- lifecycleTLS bool
+ pm protector.Memory
+ omr observability.MetricsRegistry
+ metricsClient queue.Client
+ cyclesTotal meter.Counter
+ lastRunTimestamp meter.Gauge
+ lastRunSuccess meter.Gauge
+ metadata metadata.Repo
+ l *logger.Logger
+ clusterStateMgr *clusterStateManager
+ metricsKeeperStop chan struct{}
+ sch *timestamp.Scheduler
+ stopCh chan struct{}
+ clientCloser context.CancelFunc
+ currentNode *databasev1.Node
+ tlsReloader *pkgtls.Reloader
+ httpSrv *http.Server
+ grpcServer *grpclib.Server
+ gRPCAddr string
+ schedule string
+ emittedLastRunRole string
+ emittedLastRunNode string
+ emittedLastRunGroup string
+ lifecycleCertFile string
+ lastRunTier string
+ lastRunRole string
+ lastRunNode string
+ lifecycleHost string
+ lifecycleHTTPAddr string
+ streamRoot string
+ traceRoot string
+ progressFilePath string
+ reportDir string
+ emittedLastRunTier string
+ cert string
+ lastRunGroup string
+ lifecycleKeyFile string
+ lifecycleGRPCAddr string
+ measureRoot string
+ localNodeMD schema.Metadata
+ maxExecutionTimes int
+ chunkSize run.Bytes
+ lifecycleGRPCPort uint32
+ lifecycleHTTPPort uint32
+ emittedLastRunSet bool
+ enableTLS bool
+ insecure bool
+ lifecycleTLS bool
}
// NewService creates a new lifecycle service. metricsRegistry replaces the
@@ -164,7 +172,7 @@ func nativeNodeContext(ctx context.Context) context.Context {
}
}
if nodeID == "" {
- nodeID = "lifecycle"
+ nodeID = lifecycleRoleName
}
return context.WithValue(ctx, common.ContextNodeKey, common.Node{
NodeID: nodeID,
@@ -248,9 +256,10 @@ func (l *lifecycleService) PreRun(_ context.Context) error {
// Safe to call With() here: the metrics registry is registered earlier in the
// group, so its PreRun (which builds the provider) has already run.
lifecycleScope := l.omr.With(observability.RootScope.SubScope("lifecycle"))
- l.cyclesTotal = lifecycleScope.NewCounter("cycles_total")
- l.lastRunTimestamp = lifecycleScope.NewGauge("last_run_timestamp_seconds")
- l.lastRunSuccess = lifecycleScope.NewGauge("last_run_success")
+ cycleLabels := []string{"remote_node", "remote_role", "remote_tier", "group"}
+ l.cyclesTotal = lifecycleScope.NewCounter("cycles_total", cycleLabels...)
+ l.lastRunTimestamp = lifecycleScope.NewGauge("last_run_timestamp_seconds", cycleLabels...)
+ l.lastRunSuccess = lifecycleScope.NewGauge("last_run_success", cycleLabels...)
if l.schedule != "" && l.lifecycleTLS {
var err error
@@ -320,7 +329,7 @@ func (l *lifecycleService) GracefulStop() {
}
func (l *lifecycleService) Name() string {
- return "lifecycle"
+ return lifecycleRoleName
}
// buildLocalNodeMD builds the schema metadata registered on the native metrics
@@ -554,13 +563,26 @@ func (l *lifecycleService) startServers() {
}
func (l *lifecycleService) action(ctx context.Context) (err error) {
- if l.cyclesTotal != nil {
- l.cyclesTotal.Inc(1)
- }
+ // Reset the cycle's last-seen (group, remote_*) tuple so an empty
+ // cycle (no parseGroup succeeded) doesn't inherit the previous
+ // cycle's labels. recordLastRun reads these at the end of the
+ // cycle; without the reset, scheduler-driven consecutive cycles
+ // could see a stale group label.
+ l.lastRunGroup = ""
+ l.lastRunNode = ""
+ l.lastRunRole = ""
+ l.lastRunTier = ""
+ // Do NOT reset the emittedLastRun* fields here — they carry the
+ // (group, remote_*) tuple of the last series actually Set on
+ // Prometheus, which recordLastRun needs to Delete in the next
+ // cycle so the previous cycle's series doesn't accumulate as a
+ // stale labeled gauge. An empty cycle still has a previous emitted
+ // tuple to clean up; the new cycle's Set will then re-stamp with
+ // the current (possibly empty) labels.
// Stamp last-run metrics at the end of this cycle regardless of outcome.
// Using defer keeps the success/error bookkeeping in one place even as
// the body grows new early returns; the metrics gauge Set()s observe
- // the same time.Now() and the success flag, so dashboards see consistent
+ // the same runStart and the success flag, so dashboards see consistent
// (timestamp, success) pairs. The named return value lets the defer
// observe whether the body succeeded.
runStart := time.Now()
@@ -670,23 +692,84 @@ func (l *lifecycleService) action(ctx context.Context) (err error) {
return fmt.Errorf("lifecycle migration partially completed, progress file retained; %v groups not fully completed", notCompleteGroups)
}
-// recordLastRun stamps the banyandb_lifecycle_last_run_* gauges with the
-// start time (epoch seconds) and a 0/1 success flag. Called from the
-// deferred end-of-action block so every code path (success, error,
-// panic-recovered) updates both gauges. nil gauges are skipped so a
-// lifecycle run with a nil observability.MetricsRegistry (BypassRegistry)
+// recordCycleGroup stamps the per-group banyandb_lifecycle_cycles_total
+// Inc with the (group, remote_node, remote_role, remote_tier) tuple
+// returned by parseGroup. It also captures the (group, remote_*) tuple
+// as the cycle's last-seen identity, which the deferred recordLastRun
+// reads at the end of the cycle to stamp the cycle-level last_run_*
+// gauges. lastRunTimestamp and lastRunSuccess are intentionally NOT
+// touched here — they are stamped atomically at cycle end in
+// recordLastRun so dashboards see consistent (timestamp, success) pairs
+// for the same (group, remote_*) tuple, and so the success flag
+// reflects the whole-cycle outcome (not the last group's parseGroup
+// result).
+//
+// nil counters are skipped so a lifecycle run with a nil
+// observability.MetricsRegistry (BypassRegistry) doesn't crash.
+func (l *lifecycleService) recordCycleGroup(group, senderNode, senderRole, senderTier string) {
+ if l.cyclesTotal != nil {
+ l.cyclesTotal.Inc(1, senderNode, senderRole, senderTier, group)
+ }
+ l.lastRunGroup = group
+ l.lastRunNode = senderNode
+ l.lastRunRole = senderRole
+ l.lastRunTier = senderTier
+}
+
+// recordLastRun stamps the banyandb_lifecycle_last_run_timestamp_seconds
+// and banyandb_lifecycle_last_run_success gauges with the start time
+// (epoch seconds) and a 0/1 success flag, both using the
+// (group, remote_node, remote_role, remote_tier) tuple from the cycle's
+// last processed group. Called from the deferred end-of-action block so
+// every code path (success, error, panic-recovered) updates both
+// gauges atomically.
+//
+// Prometheus' labeled gauges don't garbage-collect the previous tuple
+// when Set is called with new labels — the old series lingers as
+// "stale" until a scrape expires it. To prevent dashboards from
+// reading a previous cycle's (group, remote_*) tuple as current,
+// recordLastRun Deletes the previously-emitted tuple (tracked in
+// emittedLastRun{Group,Node,Role,Tier}) before stamping the new
+// (current) tuple, and then updates the emitted-tuple fields to
+// reflect the new stamp. The empty-cycle path (no group was processed
+// in the current cycle) still calls Delete on the previous tuple and
+// then Set the all-empty-labels tuple, so the dashboard always sees
+// exactly one current series. nil gauges are skipped so a lifecycle
+// run with a nil observability.MetricsRegistry (BypassRegistry)
// doesn't crash.
func (l *lifecycleService) recordLastRun(start time.Time, err error) {
- if l.lastRunTimestamp != nil {
- l.lastRunTimestamp.Set(float64(start.Unix()), nil...)
+ success := 0.0
+ if err == nil {
+ success = 1.0
}
- if l.lastRunSuccess != nil {
- success := 0.0
- if err == nil {
- success = 1.0
+ prevLabels := []string{
+ l.emittedLastRunNode, l.emittedLastRunRole, l.emittedLastRunTier, l.emittedLastRunGroup,
+ }
+ if l.emittedLastRunSet {
+ if l.lastRunTimestamp != nil {
+ l.lastRunTimestamp.Delete(prevLabels...)
+ }
+ if l.lastRunSuccess != nil {
+ l.lastRunSuccess.Delete(prevLabels...)
}
- l.lastRunSuccess.Set(success, nil...)
}
+ if l.lastRunTimestamp != nil {
+ l.lastRunTimestamp.Set(float64(start.Unix()), l.lastRunNode, l.lastRunRole, l.lastRunTier, l.lastRunGroup)
+ }
+ if l.lastRunSuccess != nil {
+ l.lastRunSuccess.Set(success, l.lastRunNode, l.lastRunRole, l.lastRunTier, l.lastRunGroup)
+ }
+ // Update the emitted-tuple tracking so the next cycle's recordLastRun
+ // deletes THIS cycle's tuple. emittedLastRunSet is unconditionally
+ // true after a successful Set, even for the all-empty-labels case
+ // (so the next non-empty cycle knows to Delete the all-empty
+ // series). This runs after the Set so a panic in Set doesn't
+ // leave the tracking inconsistent with Prometheus.
+ l.emittedLastRunSet = true
+ l.emittedLastRunGroup = l.lastRunGroup
+ l.emittedLastRunNode = l.lastRunNode
+ l.emittedLastRunRole = l.lastRunRole
+ l.emittedLastRunTier = l.lastRunTier
}
// waitForCoLocatedNode waits briefly for the data node behind --grpc-addr to
@@ -1082,11 +1165,12 @@ func (l *lifecycleService) getGroupsToProcess(ctx context.Context, progress *Pro
func (l *lifecycleService) processStreamGroup(ctx context.Context, g *commonv1.Group,
streamDir string, nodes []*databasev1.Node, labels map[string]string, progress *Progress,
) {
- group, err := parseGroup(g, labels, nodes, l.l, l.metadata, l.clusterStateMgr, l.omr, l.gRPCAddr)
+ group, senderNode, senderRole, senderTier, err := parseGroup(g, labels, nodes, l.l, l.metadata, l.clusterStateMgr, l.omr)
if err != nil {
l.l.Error().Err(err).Msgf("failed to parse group %s", g.Metadata.Name)
return
}
+ l.recordCycleGroup(g.Metadata.Name, senderNode, senderRole, senderTier)
defer group.Close()
tr := l.getRemovalSegmentsTimeRange(group)
if tr.Start.IsZero() && tr.End.IsZero() {
@@ -1202,11 +1286,12 @@ func (l *lifecycleService) deleteExpiredStreamSegments(ctx context.Context, g *c
func (l *lifecycleService) processMeasureGroup(ctx context.Context, g *commonv1.Group, measureDir string,
nodes []*databasev1.Node, labels map[string]string, progress *Progress,
) {
- group, err := parseGroup(g, labels, nodes, l.l, l.metadata, l.clusterStateMgr, l.omr, l.gRPCAddr)
+ group, senderNode, senderRole, senderTier, err := parseGroup(g, labels, nodes, l.l, l.metadata, l.clusterStateMgr, l.omr)
if err != nil {
l.l.Error().Err(err).Msgf("failed to parse group %s", g.Metadata.Name)
return
}
+ l.recordCycleGroup(g.Metadata.Name, senderNode, senderRole, senderTier)
defer group.Close()
tr := l.getRemovalSegmentsTimeRange(group)
@@ -1309,11 +1394,12 @@ func (l *lifecycleService) deleteExpiredTraceSegments(ctx context.Context, g *co
func (l *lifecycleService) processTraceGroup(ctx context.Context, g *commonv1.Group, traceDir string,
nodes []*databasev1.Node, labels map[string]string, progress *Progress,
) {
- group, err := parseGroup(g, labels, nodes, l.l, l.metadata, l.clusterStateMgr, l.omr, l.gRPCAddr)
+ group, senderNode, senderRole, senderTier, err := parseGroup(g, labels, nodes, l.l, l.metadata, l.clusterStateMgr, l.omr)
if err != nil {
l.l.Error().Err(err).Msgf("failed to parse group %s", g.Metadata.Name)
return
}
+ l.recordCycleGroup(g.Metadata.Name, senderNode, senderRole, senderTier)
defer group.Close()
tr := l.getRemovalSegmentsTimeRange(group)
diff --git a/banyand/backup/lifecycle/steps.go b/banyand/backup/lifecycle/steps.go
index fd5f90477..0bcf801e3 100644
--- a/banyand/backup/lifecycle/steps.go
+++ b/banyand/backup/lifecycle/steps.go
@@ -22,6 +22,7 @@ import (
"fmt"
"net"
"os"
+ "strings"
"github.com/pkg/errors"
"google.golang.org/protobuf/proto"
@@ -85,79 +86,104 @@ func (l *lifecycleService) getSnapshots(ctx context.Context, groups []*commonv1.
return streamDir, measureDir, traceDir, nil
}
-// deriveSelfIdentity returns the SenderNode and SenderTier the lifecycle
-// publisher should stamp on its wire SendRequest so the data-node receiver
-// can label its banyandb_queue_sub_* family accordingly.
+// resolveSelfIdentity returns the BanyanDB NodeID (Metadata.Name) and
+// tier label (Labels["type"]) the lifecycle publisher should stamp on
+// its wire SendRequest / SyncMetadata, so the data-node receiver labels
+// its banyandb_queue_sub_total_* family with non-empty remote_node and
+// remote_tier.
//
-// Inputs (no new CLI flags needed):
+// Resolution: the lifecycle sidecar's own pod hostname is the stable
+// identifier the receiver will see as the sender. It comes from
+// POD_NAME (K8s downward API) and falls back to os.Hostname() — the
+// same precedence as nativeNodeContext at service.go:160-165. The
+// function then looks the host up directly in the data-node registry,
+// matching against the host portion of GrpcAddress (the registry may
+// carry an IP, a headless-service FQDN, or a loopback alias, depending
+// on which bind address the data pod registered with). The first
+// registry entry whose host matches (with loopback-alias and
+// IP-literal normalization via hostMatches) is the co-located data
+// pod; its Metadata.Name is the SenderNode and its Labels["type"] is
+// the SenderTier.
//
-// - coLocatedDataNodeAddr: the lifecycle's --grpc-addr, which is the
-// gRPC address of the data node the lifecycle is co-located with
-// (sidecar topology). It is the authoritative match key.
-// - nodeLabels: the lifecycle's own --node-labels, used as a fallback
-// when the co-located data node hasn't synced to the metadata
-// registry yet (cold start of a freshly created lifecycle).
-// - nodes: the cluster's data-node registry (ROLE_DATA), which carries
-// both Metadata.Name (the BanyanDB NodeID) and Labels (e.g. type=hot).
-//
-// Resolution order:
-//
-// 1. Match by GrpcAddress. In the standard sidecar layout the lifecycle's
-// --grpc-addr equals the co-located data node's GrpcAddress, and
-// that data node's Metadata.Name is the BanyanDB NodeID the
-// receiver records as remote_node. This is the production path
-// (the live cluster's lifecycle container has no --node-labels).
-// 2. Fall back to label matching. If a data node's Labels are a
-// superset of nodeLabels (every key in nodeLabels matches), use
-// that node's Metadata.Name and Labels["type"].
-// 3. Fall back to type-only match on nodeLabels["type"].
-// 4. If nothing matches, return empty strings — preserves the
-// pre-fix behavior so existing test setups that don't populate
-// these fields keep working.
-func deriveSelfIdentity(coLocatedDataNodeAddr string, nodeLabels
map[string]string, nodes []*databasev1.Node) (senderNode, senderTier string) {
- // Pass 1: GrpcAddress match (the production sidecar path).
- if coLocatedDataNodeAddr != "" {
- for _, n := range nodes {
- if grpcAddrEqual(n.GrpcAddress, coLocatedDataNodeAddr) {
- return n.Metadata.Name, n.Labels["type"]
- }
- }
+// Re-runs on every parseGroup call (no caching) so a data-pod
+// restart, re-registration, or new host is picked up by the next
+// cycle. Returns ok=false when no registry entry matches.
+func resolveSelfIdentity(selfPodHost string, nodes []*databasev1.Node)
(senderNode, senderTier string, ok bool) {
+ if selfPodHost == "" {
+ return "", "", false
}
- // Pass 2: every-key label match. Guarded against an empty label set:
- // labelsContain treats an empty subset as matching anything, which
would
- // attribute the identity to an arbitrary registry node (whichever
happens
- // to be listed first — e.g. a migration target instead of the
co-located
- // data node).
- if len(nodeLabels) > 0 {
- for _, n := range nodes {
- if n.Labels == nil {
- continue
- }
- if !labelsContain(n.Labels, nodeLabels) {
- continue
- }
- return n.Metadata.Name, n.Labels["type"]
+ for _, n := range nodes {
+ if n == nil || n.Metadata == nil {
+ continue
+ }
+ if hostMatches(n.GrpcAddress, selfPodHost) {
+ return n.Metadata.Name, n.Labels["type"], true
}
}
- // Pass 3: type-only label match.
- if wantType := nodeLabels["type"]; wantType != "" {
- for _, n := range nodes {
- if n.Labels == nil {
- continue
- }
- if n.Labels["type"] == wantType {
- return n.Metadata.Name, n.Labels["type"]
- }
+ return "", "", false
+}
+
+// selfPodHostname returns the lifecycle sidecar's own pod host.
+// Precedence matches nativeNodeContext at service.go:160-165:
+// POD_NAME first (K8s downward API), then os.Hostname() as a
+// fallback. Returns "" only if both lookups fail (very rare; e.g.
+// hostname uname syscall returns ENAMETOOLONG).
+func selfPodHostname() string {
+ if v := os.Getenv("POD_NAME"); v != "" {
+ return v
+ }
+ if h, err := os.Hostname(); err == nil {
+ return h
+ }
+ return ""
+}
+
+// hostMatches reports whether aRegistryHost (which may carry a :port
+// and may be a loopback alias, an IP, or a headless-service FQDN)
+// identifies the same pod as selfPodHost. The host portion is
+// extracted via net.SplitHostPort, then reduced to its leftmost
+// label, but only for FQDNs (multi-label hostnames): an FQDN like
+// "data-x.data-x-headless.ns" maps to the pod name "data-x", while
+// IP literals are kept as-is so that 127.0.0.1 is not truncated to
+// "127". Loopback aliases (localhost, 127.0.0.1, ::1) are treated as
+// equivalent, so a registry entry advertised as 127.0.0.1:17912
+// matches a selfPodHost of "localhost" and vice versa.
+func hostMatches(aRegistryHost, selfPodHost string) bool {
+ if aRegistryHost == "" {
+ return false
+ }
+ if h, _, err := net.SplitHostPort(aRegistryHost); err == nil {
+ aRegistryHost = h
+ }
+ if net.ParseIP(aRegistryHost) == nil {
+ if i := strings.Index(aRegistryHost, "."); i >= 0 {
+ aRegistryHost = aRegistryHost[:i]
}
}
- return "", ""
+ if aRegistryHost == selfPodHost {
+ return true
+ }
+ if isLoopbackHost(selfPodHost) && isLoopbackHost(aRegistryHost) {
+ return true
+ }
+ return false
}
// grpcAddrEqual reports whether two advertised gRPC addresses identify the
-// same endpoint. Besides the exact match, loopback host aliases (localhost,
-// 127.0.0.1, ::1) with the same port are treated as equivalent, so a
-// --grpc-addr given as localhost:PORT still matches a data node registered
-// as 127.0.0.1:PORT.
+// same endpoint. Three equivalences are honored:
+// - exact string match,
+// - host-portion match after reducing each to its leftmost label
+// (FQDN-only; IP literals are kept as-is) with the same port,
+// - both hosts are loopback aliases (localhost / 127.0.0.1 / ::1)
+// with the same port.
+//
+// The middle case is what was missing pre-fix: "data-x.headless:17912"
+// and "data-x:17912" used to be rejected because the hosts are neither
+// identical strings nor loopback aliases. The new rule is that the
+// same port plus the same leftmost label is a match. Note that a
+// loopback address such as 127.0.0.1:17912 still does not match a
+// registry GrpcAddress of "<headless-svc>.<ns>:17912", because IP
+// literals are never reduced to a label.
func grpcAddrEqual(a, b string) bool {
if a == b {
return true
@@ -167,6 +193,19 @@ func grpcAddrEqual(a, b string) bool {
if errA != nil || errB != nil || portA != portB {
return false
}
+ if net.ParseIP(hostA) == nil {
+ if i := strings.Index(hostA, "."); i >= 0 {
+ hostA = hostA[:i]
+ }
+ }
+ if net.ParseIP(hostB) == nil {
+ if i := strings.Index(hostB, "."); i >= 0 {
+ hostB = hostB[:i]
+ }
+ }
+ if hostA == hostB {
+ return true
+ }
return isLoopbackHost(hostA) && isLoopbackHost(hostB)
}
@@ -179,17 +218,6 @@ func isLoopbackHost(host string) bool {
return ip != nil && ip.IsLoopback()
}
-// labelsContain reports whether superset has every (k, v) pair in subset.
-// An empty subset matches anything.
-func labelsContain(superset, subset map[string]string) bool {
- for k, v := range subset {
- if superset[k] != v {
- return false
- }
- }
- return true
-}
-
// GroupConfig encapsulates the parsed lifecycle configuration for a Group.
// It contains all necessary information for migration and deletion operations.
type GroupConfig struct {
@@ -235,28 +263,27 @@ func parseGroup(
g *commonv1.Group, nodeLabels map[string]string, nodes
[]*databasev1.Node,
l *logger.Logger, metadata metadata.Repo, clusterStateMgr
*clusterStateManager,
omr observability.MetricsRegistry,
- coLocatedDataNodeAddr string,
-) (*GroupConfig, error) {
+) (group *GroupConfig, senderNode, senderRole, senderTier string, err error) {
ro := g.ResourceOpts
if ro == nil {
- return nil, fmt.Errorf("no resource opts in group %s",
g.Metadata.Name)
+ return nil, "", "", "", fmt.Errorf("no resource opts in group
%s", g.Metadata.Name)
}
if len(ro.Stages) == 0 {
- return nil, fmt.Errorf("no stages in group %s", g.Metadata.Name)
+ return nil, "", "", "", fmt.Errorf("no stages in group %s",
g.Metadata.Name)
}
// Validate IntervalRules up-front so later derefs (incl. Stages[i+1])
are safe.
if ro.Ttl == nil {
- return nil, fmt.Errorf("group %s: missing ttl", g.Metadata.Name)
+ return nil, "", "", "", fmt.Errorf("group %s: missing ttl",
g.Metadata.Name)
}
if ro.SegmentInterval == nil {
- return nil, fmt.Errorf("group %s: missing segment_interval",
g.Metadata.Name)
+ return nil, "", "", "", fmt.Errorf("group %s: missing
segment_interval", g.Metadata.Name)
}
for _, st := range ro.Stages {
if st.SegmentInterval == nil {
- return nil, fmt.Errorf("group %s stage %s: missing
segment_interval", g.Metadata.Name, st.Name)
+ return nil, "", "", "", fmt.Errorf("group %s stage %s:
missing segment_interval", g.Metadata.Name, st.Name)
}
if st.Ttl == nil {
- return nil, fmt.Errorf("group %s stage %s: missing
ttl", g.Metadata.Name, st.Name)
+ return nil, "", "", "", fmt.Errorf("group %s stage %s:
missing ttl", g.Metadata.Name, st.Name)
}
}
ttlTime := proto.Clone(ro.Ttl).(*commonv1.IntervalRule)
@@ -265,9 +292,9 @@ func parseGroup(
var targetSegmentInterval *commonv1.IntervalRule
var sourceStage string
for i, st := range ro.Stages {
- selector, err := pub.ParseLabelSelector(st.NodeSelector)
- if err != nil {
- return nil, errors.WithMessagef(err, "failed to parse
node selector %s", st.NodeSelector)
+ selector, parseErr := pub.ParseLabelSelector(st.NodeSelector)
+ if parseErr != nil {
+ return nil, "", "", "", errors.WithMessagef(parseErr,
"failed to parse node selector %s", st.NodeSelector)
}
ttlTime.Num += st.Ttl.Num
if !selector.Matches(nodeLabels) {
@@ -275,7 +302,7 @@ func parseGroup(
}
if i+1 >= len(ro.Stages) {
l.Info().Msgf("no next stage for group %s at stage %s",
g.Metadata.Name, st.Name)
- return nil, nil
+ return nil, "", "", "", nil
}
nst = ro.Stages[i+1]
sourceStage = st.Name
@@ -302,33 +329,59 @@ func parseGroup(
}
nsl, err := pub.ParseLabelSelector(nst.NodeSelector)
if err != nil {
- return nil, errors.WithMessagef(err, "failed to parse node
selector %s", nst.NodeSelector)
+ return nil, "", "", "", errors.WithMessagef(err, "failed to
parse node selector %s", nst.NodeSelector)
}
nodeSel := node.NewRoundRobinSelector("", metadata)
if ok, _ := nodeSel.OnInit([]schema.Kind{schema.KindGroup}); !ok {
- return nil, fmt.Errorf("failed to initialize node selector for
group %s", g.Metadata.Name)
+ return nil, "", "", "", fmt.Errorf("failed to initialize node
selector for group %s", g.Metadata.Name)
}
client := pub.NewWithoutMetadata(omr) //nolint:contextcheck // health
check goroutine uses context.Background()
// Stamp the lifecycle's self identity onto the publisher so the wire
// SenderNode / SenderRole / SenderTier fields and the parallel
- // banyandb_lifecycle_migration_* labels are populated. The three
- // values are derived from already-known inputs (the co-located data
- // node's gRPC address and the cluster's data-node registry) so the
- // fix needs no new CLI flags:
- // - SenderNode = the data node whose GrpcAddress matches the
- // lifecycle's --grpc-addr (i.e. the co-located data node). Its
- // Metadata.Name is the BanyanDB NodeID the receiver records as
- // remote_node.
- // - SenderRole = "lifecycle" (no Role enum entry; matches the
- // liaison's hard-coded "liaison" pattern in
pkg/cmdsetup/liaison.go).
- // - SenderTier = the matched data node's `type` label
- // (hot/warm/cold), which becomes the receiver's remote_tier.
- // Falls back to the lifecycle's own --node-labels when the co-located
- // data node isn't in the registry yet (cold start), and to empty
- // when neither is available — preserving the pre-fix behavior.
- senderNode, senderTier := deriveSelfIdentity(coLocatedDataNodeAddr,
nodeLabels, nodes)
- if senderNode != "" || senderTier != "" {
- client.SetSelfNode(senderNode, "lifecycle", senderTier)
+ // banyandb_lifecycle_migration_* labels are populated. The
+ // resolveSelfIdentity algorithm matches the lifecycle's own pod
+ // hostname (POD_NAME -> os.Hostname(), same precedence as
+ // nativeNodeContext at service.go:160-165) against the
+ // data-node registry's GrpcAddress with loopback-alias and
+ // port-strip normalization. The first matching registry entry is
+ // the co-located data pod; its Metadata.Name is the BanyanDB
+ // NodeID the receiver records as remote_node, and its
+ // Labels["type"] is the receiver's remote_tier. SenderRole is
+ // hard-coded to "lifecycle" to mirror the liaison's
+ // "liaison" pattern at pkg/cmdsetup/liaison.go:170-171.
+ //
+ // The (senderNode, "lifecycle", senderTier) tuple returned here is
+ // consumed by three downstream emissions, all sharing the same
+ // (remote_node, remote_role, remote_tier) label form: (a) the wire
+ // SenderNode/Role/Tier fields on every SendRequest
+ // (banyand/queue/queue.go:62-68), (b) the per-message
+ // banyandb_lifecycle_migration_* family emitted by the
+ // lifecycle-tier pub (file-sync:
banyand/queue/pub/chunked_sync.go:67-82;
+ // batch-write: banyand/queue/pub/batch.go:215, 271, 291-292, 421,
+ // 471-472, 488, 511, 520, 532), and (c) the cycle-level
+ // banyandb_lifecycle_cycles_total + last_run_* metrics stamped by
+ // the caller (process*Group). The two families describe different
+ // sides (sender vs destination) and are not cross-joinable — see
+ // the struct comment in service.go and CHANGES.md for the
+ // asymmetry.
+ selfHost := selfPodHostname()
+ senderNode, senderTier, resolvedOK := resolveSelfIdentity(selfHost,
nodes)
+ if resolvedOK {
+ senderRole = "lifecycle"
+ client.SetSelfNode(senderNode, senderRole, senderTier)
+ // Info log so operators can see which identity the agent
+ // stamped on the wire, and which co-located data pod the
+ // registry picked. This is the log line that surfaces the
+ // "remote node" the user wants visible at startup.
+ l.Info().
+ Str("data_pod", selfHost).
+ Str("sender_node", senderNode).
+ Str("sender_tier", senderTier).
+ Msg("lifecycle: stamped sender identity on wire
(SenderNode, SenderTier)")
+ } else {
+ l.Warn().
+ Str("data_pod", selfHost).
+ Msg("lifecycle: sender identity resolution returned
empty; SenderNode on wire will be empty (pre-fix regression)")
}
switch g.Catalog {
case commonv1.Catalog_CATALOG_STREAM:
@@ -338,7 +391,7 @@ func parseGroup(
case commonv1.Catalog_CATALOG_MEASURE:
_ = grpc.NewClusterNodeRegistry(data.TopicMeasureWrite, client,
nodeSel)
default:
- return nil, fmt.Errorf("unsupported catalog %s for lifecycle
migration of group %s", g.Catalog, g.Metadata.Name)
+ return nil, "", "", "", fmt.Errorf("unsupported catalog %s for
lifecycle migration of group %s", g.Catalog, g.Metadata.Name)
}
var existed bool
@@ -357,7 +410,7 @@ func parseGroup(
}
}
if !existed {
- return nil, errors.New("no nodes matched")
+ return nil, "", "", "", errors.New("no nodes matched")
}
if t := client.GetRouteTable(); t != nil {
@@ -374,7 +427,7 @@ func parseGroup(
TargetStage: nst.Name,
NodeSelector: nodeSel,
QueueClient: client,
- }, nil
+ }, senderNode, senderRole, senderTier, nil
}
type fileInfo struct {
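
Editorial note on the steps.go change above: the host normalization that hostMatches applies can be tried in isolation. The following is a minimal standalone sketch, not the committed code. The names matchHost and isLoopback are hypothetical, and isLoopback is an assumption about what isLoopbackHost does, since the diff shows only its final line.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// isLoopback is an assumed stand-in for isLoopbackHost: it accepts
// "localhost" and any loopback IP literal (127.0.0.1, ::1).
func isLoopback(host string) bool {
	if host == "localhost" {
		return true
	}
	ip := net.ParseIP(host)
	return ip != nil && ip.IsLoopback()
}

// matchHost follows the steps described for hostMatches: strip the
// port, reduce a multi-label hostname to its leftmost label, keep IP
// literals intact, then compare (with loopback aliases equivalent).
func matchHost(registryAddr, selfPodHost string) bool {
	if registryAddr == "" {
		return false
	}
	host := registryAddr
	if h, _, err := net.SplitHostPort(registryAddr); err == nil {
		host = h
	}
	if net.ParseIP(host) == nil {
		if i := strings.Index(host, "."); i >= 0 {
			host = host[:i]
		}
	}
	if host == selfPodHost {
		return true
	}
	return isLoopback(host) && isLoopback(selfPodHost)
}

func main() {
	// Headless-service FQDN reduces to the pod name.
	fmt.Println(matchHost("data-hot-0.data-hot-headless.ns:17912", "data-hot-0")) // true
	// Loopback aliases are interchangeable.
	fmt.Println(matchHost("127.0.0.1:17912", "localhost")) // true
	// An IP-form registry entry matches only the same IP literal.
	fmt.Println(matchHost("10.116.3.84:17912", "10.116.3.84")) // true
	fmt.Println(matchHost("10.116.3.84:17912", "data-cold-0")) // false
}
```

The last call illustrates the limitation the test file documents: when a data pod registers by IP, a pod-name host does not resolve to it.
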
diff --git a/banyand/backup/lifecycle/steps_test.go
b/banyand/backup/lifecycle/steps_test.go
index 203ff3953..04033ba0a 100644
--- a/banyand/backup/lifecycle/steps_test.go
+++ b/banyand/backup/lifecycle/steps_test.go
@@ -86,37 +86,96 @@ func TestParseGroup_RejectsMissingIntervals(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
g := makeGroup(c.mutate)
- _, err := parseGroup(g, map[string]string{"type":
"warm"}, nil, nil, nil, nil, nil, "")
+ _, _, _, _, err := parseGroup(g,
map[string]string{"type": "warm"}, nil, nil, nil, nil, nil)
require.Error(t, err)
assert.Contains(t, err.Error(), c.errFrag)
})
}
}
-// TestDeriveSelfIdentity verifies the sender-identity resolution order: the
-// co-located data node's address match (including loopback host aliases, since
-// test clusters dial localhost:PORT while nodes register 127.0.0.1:PORT), the
-// label fallbacks, and the guard that keeps an empty label set from
-// wildcard-matching an arbitrary registry node.
-func TestDeriveSelfIdentity(t *testing.T) {
+// Note: parseGroup's (senderNode, senderRole, senderTier) return is
+// driven by resolveSelfIdentity, which is exhaustively covered by
+// TestResolveSelfIdentity below. The full parseGroup body requires a
+// real metadata.Repo for nodeSel.OnInit (steps.go:335) and a real
+// clusterStateMgr for the route table, so calling parseGroup from a
+// unit test requires an integration harness — that coverage lives in
+// test/cases/lifecycle/lifecycle.go.
+
+// TestResolveSelfIdentity exercises the post-fix pod-name primary lookup
+// against representative registry shapes. The first three cases prove
+// the bug fix: a host-portion match on the registry's GrpcAddress
+// (whether loopback, IP, or FQDN) resolves to the co-located data
+// node's Metadata.Name. The last two cases prove the no-match and
+// empty-input guards.
+func TestResolveSelfIdentity(t *testing.T) {
nodes := []*databasev1.Node{
- {Metadata: &commonv1.Metadata{Name: "warm-node"}, GrpcAddress:
"127.0.0.1:2", Labels: map[string]string{"type": "warm"}},
- {Metadata: &commonv1.Metadata{Name: "hot-node"}, GrpcAddress:
"127.0.0.1:1", Labels: map[string]string{"type": "hot"}},
+ // Production-bug repro: lifecycle's --grpc-addr is
+ // 127.0.0.1:17912, but the data pod registered its
+ // GrpcAddress as the headless-service DNS form. The new
+ // algorithm must match on the host portion (data-hot-0).
+ {Metadata: &commonv1.Metadata{Name: "data-hot-0:17912"},
GrpcAddress: "data-hot-0.data-hot-headless.ns:17912", Labels:
map[string]string{"type": "hot"}},
+
+ // Loopback-registered case: a loopback selfPodHost (e.g.
+ // "127.0.0.1") matches the loopback form via isLoopbackHost
+ // equivalence.
+ {Metadata: &commonv1.Metadata{Name: "data-warm-0:17912"},
GrpcAddress: "127.0.0.1:17912", Labels: map[string]string{"type": "warm"}},
+
+ // IP-only registered case (NodeHostProvider=ip): matches only
+ // when selfPodHost is itself the same IP literal; a pod-name
+ // selfPodHost does not resolve to this entry. The first case
+ // in this table covers the production path; this one documents
+ // the known limitation of the post-fix algorithm.
+ {Metadata: &commonv1.Metadata{Name: "data-cold-0:17912"},
GrpcAddress: "10.116.3.84:17912", Labels: map[string]string{"type": "cold"}},
}
- node, tier := deriveSelfIdentity("127.0.0.1:1", nil, nodes)
- assert.Equal(t, "hot-node", node, "exact address match")
+ node, tier, ok := resolveSelfIdentity("data-hot-0", nodes)
+ assert.True(t, ok, "DNS-form GrpcAddress must match by host portion
(the production-bug case)")
+ assert.Equal(t, "data-hot-0:17912", node)
assert.Equal(t, "hot", tier)
- node, tier = deriveSelfIdentity("localhost:1", nil, nodes)
- assert.Equal(t, "hot-node", node, "loopback alias must match the
registered 127.0.0.1 form")
- assert.Equal(t, "hot", tier)
+ node, tier, ok = resolveSelfIdentity("127.0.0.1", nodes)
+ assert.True(t, ok, "loopback GrpcAddress matches a loopback
selfPodHost")
+ assert.Equal(t, "data-warm-0:17912", node)
+ assert.Equal(t, "warm", tier)
- node, tier = deriveSelfIdentity("localhost:9", nil, nodes)
- assert.Empty(t, node, "unmatched address with no labels must not
wildcard-match an arbitrary node")
- assert.Empty(t, tier)
+ // selfPodHost="10.116.3.84" matches the IP-form entry.
+ node, tier, ok = resolveSelfIdentity("10.116.3.84", nodes)
+ assert.True(t, ok, "IP-form GrpcAddress matches an IP selfPodHost")
+ assert.Equal(t, "data-cold-0:17912", node)
+ assert.Equal(t, "cold", tier)
- node, tier = deriveSelfIdentity("", map[string]string{"type": "hot"},
nodes)
- assert.Equal(t, "hot-node", node, "label fallback")
- assert.Equal(t, "hot", tier)
+ // selfPodHost="data-warm-1" is not in the registry: no match.
+ _, _, ok = resolveSelfIdentity("data-warm-1", nodes)
+ assert.False(t, ok, "selfPodHost not in registry must return ok=false
(no wildcard)")
+
+ // Empty selfPodHost: no match.
+ _, _, ok = resolveSelfIdentity("", nodes)
+ assert.False(t, ok, "empty selfPodHost must return ok=false (no panic)")
+
+ // grpcAddrEqual: post-fix also accepts a leftmost-label host match.
+ assert.True(t, grpcAddrEqual("data-x.headless:17912", "data-x:17912"),
+ "same port and same host (after SplitHostPort) must match")
+ assert.True(t, grpcAddrEqual("127.0.0.1:17912", "127.0.0.1:17912"),
+ "exact match still works")
+ assert.True(t, grpcAddrEqual("localhost:17912", "127.0.0.1:17912"),
+ "loopback-vs-loopback still works")
+ assert.False(t, grpcAddrEqual("data-x:17912", "data-y:17912"),
+ "different hosts with same port must not match")
+}
+
+// TestSelfPodHostnamePrecedence checks that selfPodHostname() prefers
+// POD_NAME (the K8s downward API value) and falls back to os.Hostname()
+// when POD_NAME is unset.
+func TestSelfPodHostnamePrecedence(t *testing.T) {
+ t.Setenv("POD_NAME", "my-pod")
+ assert.Equal(t, "my-pod", selfPodHostname(),
+ "POD_NAME takes precedence (K8s downward API)")
+
+ t.Setenv("POD_NAME", "")
+ // Without POD_NAME, the function falls back to os.Hostname().
+ // The test host name is non-empty on Linux, so the result is
+ // either the test runner's hostname or "" if uname fails.
+ // Either way it must not panic and must not return "my-pod".
+ got := selfPodHostname()
+ assert.NotEqual(t, "my-pod", got,
+ "empty POD_NAME must fall back to os.Hostname(), not return the
previous value")
}
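
Editorial note on the grpcAddrEqual assertions in TestResolveSelfIdentity above: the three equivalences (exact string, same leftmost label with same port, both loopback with same port) can be reproduced outside the repository. This is a hedged sketch under stated assumptions, not the committed function. The names addrEqual, leftmostLabel, and loopback are hypothetical, and loopback assumes isLoopbackHost accepts "localhost" plus loopback IP literals.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// leftmostLabel reduces a multi-label hostname to its first label and
// leaves IP literals untouched.
func leftmostLabel(host string) string {
	if net.ParseIP(host) != nil {
		return host
	}
	if i := strings.Index(host, "."); i >= 0 {
		return host[:i]
	}
	return host
}

// loopback is an assumed stand-in for isLoopbackHost.
func loopback(host string) bool {
	if host == "localhost" {
		return true
	}
	ip := net.ParseIP(host)
	return ip != nil && ip.IsLoopback()
}

// addrEqual applies the three equivalences in order.
func addrEqual(a, b string) bool {
	if a == b {
		return true
	}
	hostA, portA, errA := net.SplitHostPort(a)
	hostB, portB, errB := net.SplitHostPort(b)
	if errA != nil || errB != nil || portA != portB {
		return false
	}
	hostA, hostB = leftmostLabel(hostA), leftmostLabel(hostB)
	if hostA == hostB {
		return true
	}
	return loopback(hostA) && loopback(hostB)
}

func main() {
	fmt.Println(addrEqual("data-x.headless:17912", "data-x:17912"))  // true
	fmt.Println(addrEqual("localhost:17912", "127.0.0.1:17912"))     // true
	fmt.Println(addrEqual("data-x:17912", "data-y:17912"))           // false
	fmt.Println(addrEqual("127.0.0.1:17912", "data-x.svc.ns:17912")) // false
}
```

The final case shows that a loopback address never equals an FQDN-form address under these rules, since an IP literal is not reduced to a label and the FQDN side is not loopback.
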
diff --git a/docs/operation/grafana-fodc-nodes.json
b/docs/operation/grafana-fodc-nodes.json
index deedcac49..404b0fa91 100644
--- a/docs/operation/grafana-fodc-nodes.json
+++ b/docs/operation/grafana-fodc-nodes.json
@@ -659,7 +659,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
- "description": "Each row is one directed flow source→target (bare pod
names) per operation, with the PUBLISHER’s view and the SUBSCRIBER’s view of
the SAME traffic side by side — never sum the two. Request edges pair
banyandb_queue_pub_* (sender) with banyandb_queue_sub_* (receiver);
tier-migration edges pair banyandb_lifecycle_migration_* (lifecycle sidecar,
sharing its data pod’s pod_name) with the receiver’s queue_sub
remote_role=\"lifecycle\" series. The two sides are joined on [...]
+ "description": "Streaming pod-to-pod flows (excludes tier-migration
edges; see the Migration Flows panel below for those). Each row is one directed
source→target per (group, operation). Pub side is the publisher container; Sub
side is the receiver. Units are per-second (rate over $__range). Pub msg/s and
Sub msg/s should match side-to-side; a populated Pub cell with an empty Sub
cell is signal — an uninstrumented side or missing scrape target. p99 latencies
are histogram-quantile o [...]
"fieldConfig": {
"defaults": {
"color": {
@@ -895,7 +895,7 @@
},
"editorMode": "code",
"exemplar": false,
- "expr": "sum by (source, target, operation)
(label_replace(label_replace(rate(banyandb_queue_pub_total_finished{job=~\"$job\",
container_name=~\"$role\", pod_name=~\"$pod\"}[$__rate_interval]) or
rate(banyandb_queue_pub_total_finished{job=~\"$job\", remote_role=~\"$role\",
remote_node=~\"($pod)\\\\..*\"}[$__rate_interval]) or
rate(banyandb_lifecycle_migration_total_finished{job=~\"$job\",
container_name=~\"$role\", pod_name=~\"$pod\"}[$__rate_interval]) or
rate(banyandb_lifecyc [...]
+ "expr": "sum by (source, target, operation)
(label_replace(label_replace(rate(banyandb_queue_pub_total_finished{job=~\"$job\",
container_name=~\"$role\", pod_name=~\"$pod\"}[$__range]) or
rate(banyandb_queue_pub_total_finished{job=~\"$job\", remote_role=~\"$role\",
remote_node=~\"($pod)\\\\..*\"}[$__range]), \"source\", \"$1\", \"pod_name\",
\"(.*)\"), \"target\", \"$1\", \"remote_node\", \"([^.:]+).*\"))",
"format": "table",
"instant": true,
"legendFormat": "__auto",
@@ -909,7 +909,7 @@
},
"editorMode": "code",
"exemplar": false,
- "expr": "sum by (source, target, operation)
(label_replace(label_replace(rate(banyandb_queue_sub_total_started{job=~\"$job\",
container_name=~\"$role\", pod_name=~\"$pod\"}[$__rate_interval]) or
rate(banyandb_queue_sub_total_started{job=~\"$job\", remote_role=~\"$role\",
remote_node=~\"($pod)\\\\..*\"}[$__rate_interval]), \"source\", \"$1\",
\"remote_node\", \"([^.:]+).*\"), \"target\", \"$1\", \"pod_name\", \"(.*)\"))",
+ "expr": "sum by (source, target, operation)
(label_replace(label_replace(rate(banyandb_queue_sub_total_started{job=~\"$job\",
container_name=~\"$role\", pod_name=~\"$pod\",
remote_role!~\"lifecycle\"}[$__range]) or
rate(banyandb_queue_sub_total_started{job=~\"$job\", remote_role=~\"$role\",
remote_node=~\"($pod)\\\\..*\", remote_role!~\"lifecycle\"}[$__range]),
\"source\", \"$1\", \"remote_node\", \"([^.:]+).*\"), \"target\", \"$1\",
\"pod_name\", \"(.*)\"))",
"format": "table",
"instant": true,
"legendFormat": "__auto",
@@ -923,7 +923,7 @@
},
"editorMode": "code",
"exemplar": false,
- "expr": "histogram_quantile(0.99, sum by (le, source, target,
operation)
(label_replace(label_replace(rate(banyandb_queue_pub_total_latency_bucket{job=~\"$job\",
container_name=~\"$role\", pod_name=~\"$pod\"}[$__rate_interval]) or
rate(banyandb_queue_pub_total_latency_bucket{job=~\"$job\",
remote_role=~\"$role\", remote_node=~\"($pod)\\\\..*\"}[$__rate_interval]) or
rate(banyandb_lifecycle_migration_total_latency_bucket{job=~\"$job\",
container_name=~\"$role\", pod_name=~\"$pod [...]
+ "expr": "histogram_quantile(0.99, sum by (le, source, target,
operation)
(label_replace(label_replace(rate(banyandb_queue_pub_total_latency_bucket{job=~\"$job\",
container_name=~\"$role\", pod_name=~\"$pod\"}[$__range]) or
rate(banyandb_queue_pub_total_latency_bucket{job=~\"$job\",
remote_role=~\"$role\", remote_node=~\"($pod)\\\\..*\"}[$__range]), \"source\",
\"$1\", \"pod_name\", \"(.*)\"), \"target\", \"$1\", \"remote_node\",
\"([^.:]+).*\")))",
"format": "table",
"instant": true,
"legendFormat": "__auto",
@@ -937,7 +937,7 @@
},
"editorMode": "code",
"exemplar": false,
- "expr": "histogram_quantile(0.99, sum by (le, source, target,
operation)
(label_replace(label_replace(rate(banyandb_queue_sub_total_latency_bucket{job=~\"$job\",
container_name=~\"$role\", pod_name=~\"$pod\"}[$__rate_interval]) or
rate(banyandb_queue_sub_total_latency_bucket{job=~\"$job\",
remote_role=~\"$role\", remote_node=~\"($pod)\\\\..*\"}[$__rate_interval]),
\"source\", \"$1\", \"remote_node\", \"([^.:]+).*\"), \"target\", \"$1\",
\"pod_name\", \"(.*)\")))",
+ "expr": "histogram_quantile(0.99, sum by (le, source, target,
operation)
(label_replace(label_replace(rate(banyandb_queue_sub_total_latency_bucket{job=~\"$job\",
container_name=~\"$role\", pod_name=~\"$pod\",
remote_role!~\"lifecycle\"}[$__range]) or
rate(banyandb_queue_sub_total_latency_bucket{job=~\"$job\",
remote_role=~\"$role\", remote_node=~\"($pod)\\\\..*\",
remote_role!~\"lifecycle\"}[$__range]), \"source\", \"$1\", \"remote_node\",
\"([^.:]+).*\"), \"target\", \"$1\", \ [...]
"format": "table",
"instant": true,
"legendFormat": "__auto",
@@ -951,7 +951,7 @@
},
"editorMode": "code",
"exemplar": false,
- "expr": "sum by (source, target, operation)
(label_replace(label_replace(rate(banyandb_queue_pub_total_err{job=~\"$job\",
container_name=~\"$role\", pod_name=~\"$pod\"}[$__rate_interval]) or
rate(banyandb_queue_pub_total_err{job=~\"$job\", remote_role=~\"$role\",
remote_node=~\"($pod)\\\\..*\"}[$__rate_interval]) or
rate(banyandb_lifecycle_migration_total_err{job=~\"$job\",
container_name=~\"$role\", pod_name=~\"$pod\"}[$__rate_interval]) or
rate(banyandb_lifecycle_migration_to [...]
+ "expr": "sum by (source, target, operation)
(label_replace(label_replace(rate(banyandb_queue_pub_total_err{job=~\"$job\",
container_name=~\"$role\", pod_name=~\"$pod\"}[$__range]) or
rate(banyandb_queue_pub_total_err{job=~\"$job\", remote_role=~\"$role\",
remote_node=~\"($pod)\\\\..*\"}[$__range]), \"source\", \"$1\", \"pod_name\",
\"(.*)\"), \"target\", \"$1\", \"remote_node\", \"([^.:]+).*\"))",
"format": "table",
"instant": true,
"legendFormat": "__auto",
@@ -965,7 +965,7 @@
},
"editorMode": "code",
"exemplar": false,
- "expr": "sum by (source, target, operation)
(label_replace(label_replace(rate(banyandb_queue_sub_total_err{job=~\"$job\",
container_name=~\"$role\", pod_name=~\"$pod\"}[$__rate_interval]) or
rate(banyandb_queue_sub_total_err{job=~\"$job\", remote_role=~\"$role\",
remote_node=~\"($pod)\\\\..*\"}[$__rate_interval]), \"source\", \"$1\",
\"remote_node\", \"([^.:]+).*\"), \"target\", \"$1\", \"pod_name\", \"(.*)\"))",
+ "expr": "sum by (source, target, operation)
(label_replace(label_replace(rate(banyandb_queue_sub_total_err{job=~\"$job\",
container_name=~\"$role\", pod_name=~\"$pod\",
remote_role!~\"lifecycle\"}[$__range]) or
rate(banyandb_queue_sub_total_err{job=~\"$job\", remote_role=~\"$role\",
remote_node=~\"($pod)\\\\..*\", remote_role!~\"lifecycle\"}[$__range]),
\"source\", \"$1\", \"remote_node\", \"([^.:]+).*\"), \"target\", \"$1\",
\"pod_name\", \"(.*)\"))",
"format": "table",
"instant": true,
"legendFormat": "__auto",
@@ -979,7 +979,7 @@
},
"editorMode": "code",
"exemplar": false,
- "expr": "sum by (source, target, operation)
(label_replace(label_replace(rate(banyandb_queue_pub_sent_bytes{job=~\"$job\",
container_name=~\"$role\", pod_name=~\"$pod\"}[$__rate_interval]) or
rate(banyandb_queue_pub_sent_bytes{job=~\"$job\", remote_role=~\"$role\",
remote_node=~\"($pod)\\\\..*\"}[$__rate_interval]) or
rate(banyandb_lifecycle_migration_sent_bytes{job=~\"$job\",
container_name=~\"$role\", pod_name=~\"$pod\"}[$__rate_interval]) or
rate(banyandb_lifecycle_migration [...]
+ "expr": "sum by (source, target, operation)
(label_replace(label_replace(rate(banyandb_queue_pub_sent_bytes{job=~\"$job\",
container_name=~\"$role\", pod_name=~\"$pod\"}[$__range]) or
rate(banyandb_queue_pub_sent_bytes{job=~\"$job\", remote_role=~\"$role\",
remote_node=~\"($pod)\\\\..*\"}[$__range]), \"source\", \"$1\", \"pod_name\",
\"(.*)\"), \"target\", \"$1\", \"remote_node\", \"([^.:]+).*\"))",
"format": "table",
"instant": true,
"legendFormat": "__auto",
@@ -993,7 +993,7 @@
},
"editorMode": "code",
"exemplar": false,
- "expr": "sum by (source, target, operation)
(label_replace(label_replace(rate(banyandb_queue_sub_received_bytes{job=~\"$job\",
container_name=~\"$role\", pod_name=~\"$pod\"}[$__rate_interval]) or
rate(banyandb_queue_sub_received_bytes{job=~\"$job\", remote_role=~\"$role\",
remote_node=~\"($pod)\\\\..*\"}[$__rate_interval]), \"source\", \"$1\",
\"remote_node\", \"([^.:]+).*\"), \"target\", \"$1\", \"pod_name\", \"(.*)\"))",
+ "expr": "sum by (source, target, operation)
(label_replace(label_replace(rate(banyandb_queue_sub_received_bytes{job=~\"$job\",
container_name=~\"$role\", pod_name=~\"$pod\",
remote_role!~\"lifecycle\"}[$__range]) or
rate(banyandb_queue_sub_received_bytes{job=~\"$job\", remote_role=~\"$role\",
remote_node=~\"($pod)\\\\..*\", remote_role!~\"lifecycle\"}[$__range]),
\"source\", \"$1\", \"remote_node\", \"([^.:]+).*\"), \"target\", \"$1\",
\"pod_name\", \"(.*)\"))",
"format": "table",
"instant": true,
"legendFormat": "__auto",
@@ -1044,13 +1044,537 @@
],
"type": "table"
},
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "description": "Cross-tier lifecycle migration flows (hot→warm,
warm→cold). Each row is one directed source→target per (group, operation). Pub
side is the lifecycle sidecar inside the SOURCE data pod (publishes via
banyandb_lifecycle_migration_*); Sub side is the data pod RECEIVING (records
via banyandb_queue_sub_total_started{remote_role=\"lifecycle\"}).
Counts/bytes/errors are per-day (rate × 86400) because tier migration is a
daily-batch workload and an instant per-second rate i [...]
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "thresholds"
+ },
+ "custom": {
+ "align": "auto",
+ "cellOptions": {
+ "type": "auto"
+ },
+ "filterable": false,
+ "inspect": false
+ },
+ "decimals": 2,
+ "mappings": [
+ {
+ "type": "special",
+ "options": {
+ "match": "nan",
+ "result": {
+ "text": "-",
+ "index": 0
+ }
+ }
+ }
+ ],
+ "noValue": "-",
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "text",
+ "value": null
+ }
+ ]
+ },
+ "unit": "short",
+ "overrides": [
+ {
+ "matcher": {
+ "id": "byName",
+ "options": "Pub msg/day"
+ },
+ "properties": [
+ {
+ "id": "unit",
+ "value": "short"
+ },
+ {
+ "id": "decimals",
+ "value": 2
+ }
+ ]
+ },
+ {
+ "matcher": {
+ "id": "byName",
+ "options": "Sub msg/day"
+ },
+ "properties": [
+ {
+ "id": "unit",
+ "value": "short"
+ },
+ {
+ "id": "decimals",
+ "value": 2
+ }
+ ]
+ },
+ {
+ "matcher": {
+ "id": "byName",
+ "options": "Pub err/day"
+ },
+ "properties": [
+ {
+ "id": "unit",
+ "value": "short"
+ },
+ {
+ "id": "decimals",
+ "value": 2
+ },
+ {
+ "id": "thresholds",
+ "value": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "text",
+ "value": null
+ },
+ {
+ "color": "red",
+ "value": 1e-06
+ }
+ ]
+ }
+ },
+ {
+ "id": "custom.cellOptions",
+ "value": {
+ "type": "color-text"
+ }
+ }
+ ]
+ },
+ {
+ "matcher": {
+ "id": "byName",
+ "options": "Sub err/day"
+ },
+ "properties": [
+ {
+ "id": "unit",
+ "value": "short"
+ },
+ {
+ "id": "decimals",
+ "value": 2
+ },
+ {
+ "id": "thresholds",
+ "value": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "text",
+ "value": null
+ },
+ {
+ "color": "red",
+ "value": 1e-06
+ }
+ ]
+ }
+ },
+ {
+ "id": "custom.cellOptions",
+ "value": {
+ "type": "color-text"
+ }
+ }
+ ]
+ },
+ {
+ "matcher": {
+ "id": "byName",
+ "options": "Pub B/day"
+ },
+ "properties": [
+ {
+ "id": "unit",
+ "value": "bytes"
+ }
+ ]
+ },
+ {
+ "matcher": {
+ "id": "byName",
+ "options": "Sub B/day"
+ },
+ "properties": [
+ {
+ "id": "unit",
+ "value": "bytes"
+ }
+ ]
+ }
+ ]
+ },
+ "overrides": [
+ {
+ "matcher": {
+ "id": "byName",
+ "options": "Pub p99"
+ },
+ "properties": [
+ {
+ "id": "unit",
+ "value": "s"
+ },
+ {
+ "id": "decimals",
+ "value": 3
+ }
+ ]
+ },
+ {
+ "matcher": {
+ "id": "byName",
+ "options": "Sub p99"
+ },
+ "properties": [
+ {
+ "id": "unit",
+ "value": "s"
+ },
+ {
+ "id": "decimals",
+ "value": 3
+ }
+ ]
+ },
+ {
+ "matcher": {
+ "id": "byName",
+ "options": "Pub B/s"
+ },
+ "properties": [
+ {
+ "id": "unit",
+ "value": "Bps"
+ }
+ ]
+ },
+ {
+ "matcher": {
+ "id": "byName",
+ "options": "Sub B/s"
+ },
+ "properties": [
+ {
+ "id": "unit",
+ "value": "Bps"
+ }
+ ]
+ },
+ {
+ "matcher": {
+ "id": "byName",
+ "options": "Pub err/s"
+ },
+ "properties": [
+ {
+ "id": "unit",
+ "value": "ops"
+ },
+ {
+ "id": "custom.cellOptions",
+ "value": {
+ "type": "color-text"
+ }
+ },
+ {
+ "id": "thresholds",
+ "value": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "text",
+ "value": null
+ },
+ {
+ "color": "red",
+ "value": 1e-06
+ }
+ ]
+ }
+ }
+ ]
+ },
+ {
+ "matcher": {
+ "id": "byName",
+ "options": "Sub err/s"
+ },
+ "properties": [
+ {
+ "id": "unit",
+ "value": "ops"
+ },
+ {
+ "id": "custom.cellOptions",
+ "value": {
+ "type": "color-text"
+ }
+ },
+ {
+ "id": "thresholds",
+ "value": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "text",
+ "value": null
+ },
+ {
+ "color": "red",
+ "value": 1e-06
+ }
+ ]
+ }
+ }
+ ]
+ },
+ {
+ "matcher": {
+ "id": "byName",
+ "options": "Source"
+ },
+ "properties": [
+ {
+ "id": "custom.filterable",
+ "value": true
+ }
+ ]
+ },
+ {
+ "matcher": {
+ "id": "byName",
+ "options": "Target"
+ },
+ "properties": [
+ {
+ "id": "custom.filterable",
+ "value": true
+ }
+ ]
+ },
+ {
+ "matcher": {
+ "id": "byName",
+ "options": "Operation"
+ },
+ "properties": [
+ {
+ "id": "custom.filterable",
+ "value": true
+ }
+ ]
+ }
+ ]
+ },
+ "gridPos": {
+ "h": 13,
+ "w": 24,
+ "x": 0,
+ "y": 30
+ },
+ "id": 62,
+ "options": {
+ "cellHeight": "sm",
+ "footer": {
+ "countRows": false,
+ "fields": "",
+ "reducer": [
+ "sum"
+ ],
+ "show": false
+ },
+ "showHeader": true,
+ "sortBy": [
+ {
+ "desc": true,
+ "displayName": "Pub msg/day"
+ }
+ ]
+ },
+ "pluginVersion": "11.2.0",
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "editorMode": "code",
+ "exemplar": false,
+ "format": "table",
+ "instant": true,
+ "legendFormat": "__auto",
+ "range": false,
+ "refId": "A",
+ "expr": "sum by (source, target, operation)
(label_replace(label_replace(rate(banyandb_lifecycle_migration_total_finished{job=~\"$job\",
container_name=~\"$role\", pod_name=~\"$pod\"}[$__range]) * 86400 or
rate(banyandb_lifecycle_migration_total_finished{job=~\"$job\",
remote_role=~\"$role\", remote_node=~\"($pod)\\\\..*\"}[$__range]) * 86400,
\"source\", \"$1\", \"pod_name\", \"(.*)\"), \"target\", \"$1\",
\"remote_node\", \"([^.:]+).*\"))"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "editorMode": "code",
+ "exemplar": false,
+ "format": "table",
+ "instant": true,
+ "legendFormat": "__auto",
+ "range": false,
+ "refId": "B",
+ "expr": "sum by (source, target, operation)
(label_replace(label_replace(rate(banyandb_queue_sub_total_started{job=~\"$job\",
container_name=~\"$role\", pod_name=~\"$pod\",
remote_role=\"lifecycle\"}[$__range]) * 86400 or
rate(banyandb_queue_sub_total_started{job=~\"$job\", remote_role=\"lifecycle\",
remote_node=~\"($pod)\\\\..*\"}[$__range]) * 86400, \"source\", \"$1\",
\"remote_node\", \"([^.:]+).*\"), \"target\", \"$1\", \"pod_name\", \"(.*)\"))"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "editorMode": "code",
+ "exemplar": false,
+ "format": "table",
+ "instant": true,
+ "legendFormat": "__auto",
+ "range": false,
+ "refId": "C",
+ "expr": "histogram_quantile(0.99, sum by (le, source, target,
operation)
(label_replace(label_replace(rate(banyandb_lifecycle_migration_total_latency_bucket{job=~\"$job\",
container_name=~\"$role\", pod_name=~\"$pod\"}[$__range]) or
rate(banyandb_lifecycle_migration_total_latency_bucket{job=~\"$job\",
remote_role=~\"$role\", remote_node=~\"($pod)\\\\..*\"}[$__range]), \"source\",
\"$1\", \"pod_name\", \"(.*)\"), \"target\", \"$1\", \"remote_node\",
\"([^.:]+).*\")))"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "editorMode": "code",
+ "exemplar": false,
+ "format": "table",
+ "instant": true,
+ "legendFormat": "__auto",
+ "range": false,
+ "refId": "D",
+ "expr": "histogram_quantile(0.99, sum by (le, source, target,
operation)
(label_replace(label_replace(rate(banyandb_queue_sub_total_latency_bucket{job=~\"$job\",
container_name=~\"$role\", pod_name=~\"$pod\",
remote_role=\"lifecycle\"}[$__range]) or
rate(banyandb_queue_sub_total_latency_bucket{job=~\"$job\",
remote_role=\"lifecycle\", remote_node=~\"($pod)\\\\..*\"}[$__range]),
\"source\", \"$1\", \"remote_node\", \"([^.:]+).*\"), \"target\", \"$1\",
\"pod_name\", \"(.*)\")))"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "editorMode": "code",
+ "exemplar": false,
+ "format": "table",
+ "instant": true,
+ "legendFormat": "__auto",
+ "range": false,
+ "refId": "E",
+ "expr": "sum by (source, target, operation)
(label_replace(label_replace(rate(banyandb_lifecycle_migration_total_err{job=~\"$job\",
container_name=~\"$role\", pod_name=~\"$pod\"}[$__range]) * 86400 or
rate(banyandb_lifecycle_migration_total_err{job=~\"$job\",
remote_role=~\"$role\", remote_node=~\"($pod)\\\\..*\"}[$__range]) * 86400,
\"source\", \"$1\", \"pod_name\", \"(.*)\"), \"target\", \"$1\",
\"remote_node\", \"([^.:]+).*\"))"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "editorMode": "code",
+ "exemplar": false,
+ "format": "table",
+ "instant": true,
+ "legendFormat": "__auto",
+ "range": false,
+ "refId": "F",
+ "expr": "sum by (source, target, operation)
(label_replace(label_replace(rate(banyandb_queue_sub_total_err{job=~\"$job\",
container_name=~\"$role\", pod_name=~\"$pod\",
remote_role=\"lifecycle\"}[$__range]) * 86400 or
rate(banyandb_queue_sub_total_err{job=~\"$job\", remote_role=\"lifecycle\",
remote_node=~\"($pod)\\\\..*\"}[$__range]) * 86400, \"source\", \"$1\",
\"remote_node\", \"([^.:]+).*\"), \"target\", \"$1\", \"pod_name\", \"(.*)\"))"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "editorMode": "code",
+ "exemplar": false,
+ "format": "table",
+ "instant": true,
+ "legendFormat": "__auto",
+ "range": false,
+ "refId": "G",
+ "expr": "sum by (source, target, operation)
(label_replace(label_replace(rate(banyandb_lifecycle_migration_sent_bytes{job=~\"$job\",
container_name=~\"$role\", pod_name=~\"$pod\"}[$__range]) * 86400 or
rate(banyandb_lifecycle_migration_sent_bytes{job=~\"$job\",
remote_role=~\"$role\", remote_node=~\"($pod)\\\\..*\"}[$__range]) * 86400,
\"source\", \"$1\", \"pod_name\", \"(.*)\"), \"target\", \"$1\",
\"remote_node\", \"([^.:]+).*\"))"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "editorMode": "code",
+ "exemplar": false,
+ "format": "table",
+ "instant": true,
+ "legendFormat": "__auto",
+ "range": false,
+ "refId": "H",
+ "expr": "sum by (source, target, operation)
(label_replace(label_replace(rate(banyandb_queue_sub_received_bytes{job=~\"$job\",
container_name=~\"$role\", pod_name=~\"$pod\",
remote_role=\"lifecycle\"}[$__range]) * 86400 or
rate(banyandb_queue_sub_received_bytes{job=~\"$job\",
remote_role=\"lifecycle\", remote_node=~\"($pod)\\\\..*\"}[$__range]) * 86400,
\"source\", \"$1\", \"remote_node\", \"([^.:]+).*\"), \"target\", \"$1\",
\"pod_name\", \"(.*)\"))"
+ }
+ ],
+ "title": "Migration Flows — Tier Migrations (lifecycle)",
+ "transformations": [
+ {
+ "id": "merge",
+ "options": {}
+ },
+ {
+ "id": "organize",
+ "options": {
+ "excludeByName": {
+ "Time": true
+ },
+ "indexByName": {
+ "source": 0,
+ "target": 1,
+ "operation": 2,
+ "Value #A": 3,
+ "Value #B": 4,
+ "Value #C": 5,
+ "Value #D": 6,
+ "Value #E": 7,
+ "Value #F": 8,
+ "Value #G": 9,
+ "Value #H": 10
+ },
+ "renameByName": {
+ "source": "Source",
+ "target": "Target",
+ "operation": "Operation",
+ "Value #A": "Pub msg/day",
+ "Value #B": "Sub msg/day",
+ "Value #C": "Pub p99",
+ "Value #D": "Sub p99",
+ "Value #E": "Pub err/day",
+ "Value #F": "Sub err/day",
+ "Value #G": "Pub B/day",
+ "Value #H": "Sub B/day"
+ }
+ }
+ }
+ ],
+ "type": "table"
+ },
{
"collapsed": false,
"gridPos": {
"h": 1,
"w": 24,
"x": 0,
- "y": 16
+ "y": 43
},
"id": 13,
"panels": [],
@@ -1119,7 +1643,7 @@
"h": 8,
"w": 12,
"x": 0,
- "y": 17
+ "y": 44
},
"id": 14,
"options": {
@@ -1220,7 +1744,7 @@
"h": 8,
"w": 12,
"x": 12,
- "y": 17
+ "y": 44
},
"id": 15,
"options": {
@@ -1325,7 +1849,7 @@
"h": 8,
"w": 12,
"x": 0,
- "y": 25
+ "y": 52
},
"id": 16,
"options": {
@@ -1430,7 +1954,7 @@
"h": 8,
"w": 12,
"x": 12,
- "y": 25
+ "y": 52
},
"id": 17,
"options": {
@@ -1531,7 +2055,7 @@
"h": 8,
"w": 24,
"x": 0,
- "y": 33
+ "y": 60
},
"id": 18,
"options": {
@@ -1588,7 +2112,7 @@
"h": 1,
"w": 24,
"x": 0,
- "y": 41
+ "y": 68
},
"id": 19,
"panels": [],
@@ -1657,7 +2181,7 @@
"h": 8,
"w": 12,
"x": 0,
- "y": 42
+ "y": 69
},
"id": 20,
"options": {
@@ -1758,7 +2282,7 @@
"h": 8,
"w": 12,
"x": 12,
- "y": 42
+ "y": 69
},
"id": 21,
"options": {
@@ -1863,7 +2387,7 @@
"h": 8,
"w": 24,
"x": 0,
- "y": 50
+ "y": 77
},
"id": 22,
"options": {
@@ -1908,7 +2432,7 @@
"h": 1,
"w": 24,
"x": 0,
- "y": 58
+ "y": 85
},
"id": 50,
"panels": [],
@@ -1977,7 +2501,7 @@
"h": 8,
"w": 12,
"x": 0,
- "y": 59
+ "y": 86
},
"id": 51,
"options": {
@@ -2078,7 +2602,7 @@
"h": 8,
"w": 12,
"x": 12,
- "y": 59
+ "y": 86
},
"id": 52,
"options": {
@@ -2179,7 +2703,7 @@
"h": 8,
"w": 12,
"x": 0,
- "y": 67
+ "y": 94
},
"id": 53,
"options": {
@@ -2293,7 +2817,7 @@
"h": 8,
"w": 12,
"x": 12,
- "y": 67
+ "y": 94
},
"id": 54,
"options": {
@@ -2440,7 +2964,7 @@
]
},
"time": {
- "from": "now-1h",
+ "from": "now-24h",
"to": "now"
},
"timepicker": {},
diff --git a/docs/operation/grafana-fodc-workload.json
b/docs/operation/grafana-fodc-workload.json
index f52c7cc83..bd095b12a 100644
--- a/docs/operation/grafana-fodc-workload.json
+++ b/docs/operation/grafana-fodc-workload.json
@@ -3491,7 +3491,7 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
- "expr": "banyandb_lifecycle_last_run_success{job=~\"$job\",
container_name=~\"$role\", pod_name=~\"$pod\"}",
+ "expr": "banyandb_lifecycle_last_run_success{job=~\"$job\",
container_name=~\"$role\", pod_name=~\"$pod\", group=~\"$group\"}",
"instant": false,
"legendFormat": "{{pod_name}}",
"range": true,
@@ -3555,7 +3555,7 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
- "expr":
"banyandb_lifecycle_last_run_timestamp_seconds{job=~\"$job\",
container_name=~\"$role\", pod_name=~\"$pod\"}",
+ "expr":
"banyandb_lifecycle_last_run_timestamp_seconds{job=~\"$job\",
container_name=~\"$role\", pod_name=~\"$pod\", group=~\"$group\"}",
"instant": false,
"legendFormat": "{{pod_name}}",
"range": true,
diff --git a/test/cases/lifecycle/lifecycle.go
b/test/cases/lifecycle/lifecycle.go
index dee5ce05d..7271f941e 100644
--- a/test/cases/lifecycle/lifecycle.go
+++ b/test/cases/lifecycle/lifecycle.go
@@ -252,16 +252,29 @@ func verifyMigrationMetrics(reg
observability.MetricsRegistry) {
body := rec.Body.String()
// Last-run metrics (banyandb_lifecycle_last_run_timestamp_seconds +
// banyandb_lifecycle_last_run_success) are stamped by the deferred
- // recordLastRun() at the end of action(). A successful cycle must set
+ // recordLastRun() at the end of action() with the (remote_node,
+ // remote_role, remote_tier, group) label set, sourced from the
+ // cycle's last processed group. A successful cycle must set
// success=1 with a non-zero epoch; an empty value would mean the
// gauges were never registered (PreRun not run) or the action never
// reached the defer. Prometheus emits floats in scientific notation
// for large values like epoch seconds (e.g. 1.781007822e+09), so the
- // assertion accepts either fixed or scientific form.
-
gomega.Expect(body).To(gomega.MatchRegexp(`(?m)^banyandb_lifecycle_last_run_timestamp_seconds
(?:[1-9]\d{9}|[1-9]\.\d+e\+0?[89])`),
- "banyandb_lifecycle_last_run_timestamp_seconds must be set to a
non-zero epoch, got:\n"+body)
-
gomega.Expect(body).To(gomega.MatchRegexp(`(?m)^banyandb_lifecycle_last_run_success
1$`),
- "banyandb_lifecycle_last_run_success must be 1 after a
successful cycle, got:\n"+body)
+ // assertion accepts either fixed or scientific form. The metric
+ // names now carry labels — Prometheus' exposition format sorts label
+ // names alphabetically (group, remote_node, remote_role, remote_tier),
+ // and the regex requires all four to be present so a regression to
+ // the unlabeled form fails the regex. The pattern lists the four
+ // labels in that alphabetical order, separated by `[^}]*` so any
+ // other labels in between are tolerated; an explicit per-label regex
+ // would be more readable but the alphabetical-ordering invariant
+ // lets us keep the assertion to a single MatchRegexp.
+ allLabels :=
`group="[^"]*"[^}]*remote_node="[^"]*"[^}]*remote_role="[^"]*"[^}]*remote_tier="[^"]*"`
+ gomega.Expect(body).To(gomega.MatchRegexp(
+
`(?m)^banyandb_lifecycle_last_run_timestamp_seconds\{`+allLabels+`\}
(?:[1-9]\d{9}|[1-9]\.\d+e\+0?[89])`),
+ "banyandb_lifecycle_last_run_timestamp_seconds must be set to a
non-zero epoch with all four labels, got:\n"+body)
+ gomega.Expect(body).To(gomega.MatchRegexp(
+ `(?m)^banyandb_lifecycle_last_run_success\{`+allLabels+`\} 1$`),
+ "banyandb_lifecycle_last_run_success must be 1 after a
successful cycle, with all four labels, got:\n"+body)
// A successful migration send increments total_finished; the
measure/stream/trace
// part files are sent via the file-sync operation, so that label must
be present.
gomega.Expect(body).To(gomega.MatchRegexp(`banyandb_lifecycle_migration_total_finished\{[^}]*\}
[1-9]`),
@@ -652,8 +665,33 @@ func crossSegmentTimestamps() (single, left, right
time.Time) {
// at runtime — no extra CLI flags needed beyond what the test setup already
// passes via SharedContext.MetadataFlags. See deriveSelfIdentity in
// banyand/backup/lifecycle/steps.go for the resolution rules.
+// runLifecycleMigration runs a single hot->warm lifecycle migration, pointing
+// the lifecycle service at the co-located data node. Returns the
MetricsRegistry
+// the lifecycle service registered its metrics with so the test can scrape
them.
+//
+// The integration test cluster has the data node bound to "localhost"
+// (pkg/test/setup/setup.go:host = "localhost") and its GrpcAddress
+// registered as `localhost:<port>`. The lifecycle CLI's resolveSelfIdentity
+// matches selfPodHost against the host portion of the registered
+// GrpcAddress, so we set POD_NAME=localhost for the duration of the
+// call (and restore the prior value on exit) so selfPodHostname()
+// returns "localhost" and matches the data node. In production this
+// is set by the K8s downward API to the lifecycle pod's actual pod
+// name (e.g. demo-banyandb-data-hot-0); the integration test uses
+// "localhost" because the data node's bind address is the loopback.
func runLifecycleMigration(progressFile, reportDir string)
observability.MetricsRegistry {
lifecycleCmd, reg := lifecycle.NewCommandWithRegistry()
+ priorPodName, priorHad := os.LookupEnv("POD_NAME")
+ if err := os.Setenv("POD_NAME", "localhost"); err != nil {
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ }
+ defer func() {
+ if priorHad {
+ _ = os.Setenv("POD_NAME", priorPodName)
+ } else {
+ _ = os.Unsetenv("POD_NAME")
+ }
+ }()
args := []string{
"--grpc-addr", SharedContext.DataAddr,
"--stream-root-path", SharedContext.SrcDir,
diff --git a/test/e2e-v2/cases/fodc/metrics/documented_gap.txt
b/test/e2e-v2/cases/fodc/metrics/documented_gap.txt
index ce8099392..3bf4b494c 100644
--- a/test/e2e-v2/cases/fodc/metrics/documented_gap.txt
+++ b/test/e2e-v2/cases/fodc/metrics/documented_gap.txt
@@ -42,3 +42,7 @@ banyandb_trace_tst_total_merged_parts
# e2e test cluster, so these metrics are never exported within the verify
window.
banyandb_lifecycle_last_run_timestamp_seconds
banyandb_lifecycle_last_run_success
+banyandb_lifecycle_migration_sent_bytes
+banyandb_lifecycle_migration_total_err
+banyandb_lifecycle_migration_total_finished
+banyandb_lifecycle_migration_total_latency_bucket
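
The label assertion added to verifyMigrationMetrics in test/cases/lifecycle/lifecycle.go can be tried in isolation with the standalone sketch below. It reuses the allLabels pattern from the diff as-is. The sample exposition lines, their label values, and the helper name hasLabeledLastRunSuccess are hypothetical and exist only for illustration; they are not taken from the repository.

```go
package main

import (
	"fmt"
	"regexp"
)

// allLabels is copied from the diff: the four labels must appear in
// alphabetical order, and `[^}]*` tolerates other labels in between.
const allLabels = `group="[^"]*"[^}]*remote_node="[^"]*"[^}]*remote_role="[^"]*"[^}]*remote_tier="[^"]*"`

// successRe matches a labeled last_run_success sample whose value is 1.
var successRe = regexp.MustCompile(
	`(?m)^banyandb_lifecycle_last_run_success\{` + allLabels + `\} 1$`)

// Hypothetical scrape bodies; the label values are made up.
const labeledBody = `# TYPE banyandb_lifecycle_last_run_success gauge
banyandb_lifecycle_last_run_success{group="sw_metric",remote_node="warm-0:17912",remote_role="data",remote_tier="warm"} 1
`

const unlabeledBody = `# TYPE banyandb_lifecycle_last_run_success gauge
banyandb_lifecycle_last_run_success 1
`

// hasLabeledLastRunSuccess (hypothetical helper) reports whether the body
// contains the gauge with all four labels and a value of 1.
func hasLabeledLastRunSuccess(body string) bool {
	return successRe.MatchString(body)
}

func main() {
	fmt.Println("labeled:", hasLabeledLastRunSuccess(labeledBody))
	fmt.Println("unlabeled:", hasLabeledLastRunSuccess(unlabeledBody))
}
```

Because the pattern requires the opening brace right after the metric name, a regression to the unlabeled form fails to match, which is the property the test comment describes.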