This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 8a1936ce9 feat(lifecycle): stamp sender identity and add last-run gauges (#1167)
8a1936ce9 is described below
commit 8a1936ce96653e89d3d13250a42abc6e3d42fae7
Author: Gao Hongtao <[email protected]>
AuthorDate: Wed Jun 10 07:36:00 2026 +0800
feat(lifecycle): stamp sender identity and add last-run gauges (#1167)
* feat(lifecycle): stamp sender identity on migration publisher
The lifecycle's tier-migration publisher (built via pub.NewWithoutMetadata)
sets up the banyandb_lifecycle_migration_* family but never calls
SetSelfNode on it. The wire SendRequest therefore carries empty
SenderNode/SenderRole/SenderTier, so the data node's queue_sub metrics
record an uninformative empty sender:
banyandb_queue_sub_total_finished{
remote_node="", remote_role="", remote_tier="", ...
}
The liaison already does this in pkg/cmdsetup/liaison.go:170-171:
setter.SetSelfNode(node.NodeID, "liaison", liaisonTier)
The lifecycle should do the same. It runs as a sidecar to a data node
and inherits --node-labels and the co-located data node's gRPC address
(`--grpc-addr`), so all three sender values are derivable from already-
known inputs — no new CLI flags are introduced.
Resolution (deriveSelfIdentity in steps.go):
1. Match the data node whose GrpcAddress equals the lifecycle's
--grpc-addr (the production sidecar path; the live cluster's
lifecycle container has no --node-labels, so this is the only
way to find its co-located data node).
2. Fall back to label-superset match against --node-labels.
3. Fall back to type-only match on nodeLabels["type"].
4. Empty fallthrough preserves the pre-fix no-op behavior.
parseGroup gains one internal parameter (coLocatedDataNodeAddr,
threaded from the existing l.gRPCAddr field) and calls
client.SetSelfNode(senderNode, "lifecycle", senderTier).
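
The resolution order above can be sketched as follows. This is a minimal
illustration, not the code in steps.go: the node struct, resolveSender,
sampleNodes, and the node names and addresses are hypothetical. The sketch
also skips the label passes when no labels are supplied, which is an
assumption of the sketch rather than something the commit message states.

```go
package main

import "fmt"

// node is a hypothetical stand-in for a data-node registry entry; it is
// not the real databasev1.Node type.
type node struct {
	Name        string
	GrpcAddress string
	Labels      map[string]string
}

// labelsContain reports whether superset has every (k, v) pair in subset.
func labelsContain(superset, subset map[string]string) bool {
	for k, v := range subset {
		if superset[k] != v {
			return false
		}
	}
	return true
}

// resolveSender walks the resolution steps in order and returns the sender
// node name and tier, or two empty strings when nothing matches.
func resolveSender(grpcAddr string, labels map[string]string, nodes []node) (name, tier string) {
	// Step 1: exact gRPC address match (the sidecar path).
	if grpcAddr != "" {
		for _, n := range nodes {
			if n.GrpcAddress == grpcAddr {
				return n.Name, n.Labels["type"]
			}
		}
	}
	// Sketch assumption: without labels there is nothing left to match on.
	if len(labels) == 0 {
		return "", ""
	}
	// Step 2: the node's labels are a superset of the supplied labels.
	for _, n := range nodes {
		if labelsContain(n.Labels, labels) {
			return n.Name, n.Labels["type"]
		}
	}
	// Step 3: match on the "type" label alone.
	if want := labels["type"]; want != "" {
		for _, n := range nodes {
			if n.Labels["type"] == want {
				return n.Name, n.Labels["type"]
			}
		}
	}
	// Step 4: nothing matched; the caller leaves the sender unset.
	return "", ""
}

// sampleNodes returns a made-up two-node registry for the demo.
func sampleNodes() []node {
	return []node{
		{Name: "data-hot-0", GrpcAddress: "10.0.0.1:17912", Labels: map[string]string{"type": "hot"}},
		{Name: "data-warm-0", GrpcAddress: "10.0.0.2:17912", Labels: map[string]string{"type": "warm"}},
	}
}

func main() {
	name, tier := resolveSender("10.0.0.2:17912", nil, sampleNodes())
	fmt.Println(name, tier)
}
```

Returning two empty strings in the last step lets the caller skip the
SetSelfNode call entirely, which matches the "empty fallthrough" described
above.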
Test setup:
* DataNode* helpers in pkg/test/setup/setup.go allocate a 3rd port
(4th if a schema server runs) and pass --http-port so tests can
scrape the data node's /metrics endpoint.
* The data node's HTTP router (banyand/queue/sub/server.go) mounts
/metrics when the metrics registry exposes a Prometheus handler.
* LifecycleSharedContext gains DataHTTPURL and WarmHTTPURL.
* test/cases/lifecycle/lifecycle.go scrapes the data node's
/metrics and asserts the receiver's banyandb_queue_sub_total_finished
series carry populated remote_node / remote_role="lifecycle"
/ remote_tier="<co-located tier>" — the direct end-to-end
evidence that the wire messages round-tripped with sender
identity.
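
The label assertion in the last bullet could look roughly like the sketch
below. It is an illustration only: parseLabels, hasLifecycleSender, the
sample scrape bodies, and the node name are hypothetical, and the label
parsing is deliberately simplified (it does not handle escaped quotes in
label values).

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// labelRe matches name="value" pairs inside the braces of a series line.
var labelRe = regexp.MustCompile(`(\w+)="([^"]*)"`)

// Made-up scrape bodies used by the demo.
const populatedScrape = `# TYPE banyandb_queue_sub_total_finished counter
banyandb_queue_sub_total_finished{operation="file-sync",remote_node="data-hot-0",remote_role="lifecycle",remote_tier="hot"} 3
`

const emptySenderScrape = `banyandb_queue_sub_total_finished{operation="file-sync",remote_node="",remote_role="",remote_tier=""} 3
`

// parseLabels extracts the label pairs from one exposition-format line.
func parseLabels(line string) map[string]string {
	labels := map[string]string{}
	open, end := strings.Index(line, "{"), strings.LastIndex(line, "}")
	if open < 0 || end < open {
		return labels
	}
	for _, m := range labelRe.FindAllStringSubmatch(line[open+1:end], -1) {
		labels[m[1]] = m[2]
	}
	return labels
}

// hasLifecycleSender reports whether any series of the given metric carries
// a non-empty remote_node and remote_tier with remote_role="lifecycle".
func hasLifecycleSender(body, metric string) bool {
	for _, line := range strings.Split(body, "\n") {
		if !strings.HasPrefix(line, metric+"{") {
			continue
		}
		l := parseLabels(line)
		if l["remote_node"] != "" && l["remote_role"] == "lifecycle" && l["remote_tier"] != "" {
			return true
		}
	}
	return false
}

func main() {
	const metric = "banyandb_queue_sub_total_finished"
	fmt.Println(hasLifecycleSender(populatedScrape, metric))
	fmt.Println(hasLifecycleSender(emptySenderScrape, metric))
}
```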
via HAPI
Co-Authored-By: HAPI <[email protected]>
Co-Authored-By: Claude Opus 4.8 <[email protected]>
* chore: fix lint issues
Auto-generated formatting/field-alignment/nolint fixes from make lint:
- banyand/backup/lifecycle/steps.go: add blank line before
deriveSelfIdentity
- pkg/test/helpers/constant.go: field alignment in LifecycleSharedContext
- pkg/test/setup/setup.go: dogsled nolint on DataNode's 3-blank return
- test/cases/lifecycle/lifecycle.go: drop obsolete --lifecycle-node-id
reference
Co-Authored-By: Claude Opus 4.8 <[email protected]>
* feat(lifecycle): add last_run_timestamp_seconds and last_run_success gauges
Two new banyandb_lifecycle_last_run_* gauges:
* last_run_timestamp_seconds — epoch (in seconds) when the most
recent migration cycle started; updated at the end of action()
via a defer so success and error paths both stamp it.
* last_run_success — 1 on nil error, 0 otherwise; combined with
the timestamp it gives dashboards an at-a-glance
"is the lifecycle healthy" signal.
The recordLastRun helper is split out from action() so unit tests
can exercise the success/failure/nil-gauge branches without setting
up a real cluster. metrics_test.go gains three hermetic tests
that use a recordingGauge stub.
The integration test asserts both series on the lifecycle's /metrics
handler (real scrape), guarding against accidental removal of the
gauge registration in PreRun or of the defer.
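
The defer-plus-named-return pattern described above can be sketched as
follows. This is a simplified illustration under assumptions: the gauge
interface, memGauge, service, run, okWork, and failingWork are hypothetical
names, and the real action() does far more than call a work function.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// gauge is a hypothetical, trimmed-down gauge interface for this sketch.
type gauge interface {
	Set(v float64, labelValues ...string)
}

// memGauge remembers the last value it was given.
type memGauge struct {
	value float64
	sets  int
}

func (g *memGauge) Set(v float64, _ ...string) {
	g.value = v
	g.sets++
}

type service struct {
	lastRunTimestamp gauge
	lastRunSuccess   gauge
}

// recordLastRun stamps both gauges and skips any gauge that is nil.
func (s *service) recordLastRun(start time.Time, err error) {
	if s.lastRunTimestamp != nil {
		s.lastRunTimestamp.Set(float64(start.Unix()))
	}
	if s.lastRunSuccess != nil {
		success := 0.0
		if err == nil {
			success = 1.0
		}
		s.lastRunSuccess.Set(success)
	}
}

// run uses a named return value so the deferred closure sees the final
// error regardless of which return statement produced it.
func (s *service) run(work func() error) (err error) {
	start := time.Now()
	defer func() {
		s.recordLastRun(start, err)
	}()
	return work()
}

func okWork() error      { return nil }
func failingWork() error { return errors.New("snapshot dir unavailable") }

func main() {
	ts, ok := &memGauge{}, &memGauge{}
	svc := &service{lastRunTimestamp: ts, lastRunSuccess: ok}
	_ = svc.run(okWork)
	fmt.Println("after success:", ok.value)
	_ = svc.run(failingWork)
	fmt.Println("after failure:", ok.value)
}
```

Without the named return, the deferred closure would have no way to read
the error chosen by the return statement.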
Co-Authored-By: Claude Opus 4.8 <[email protected]>
* docs: add CHANGES.md entries for lifecycle sender identity and last-run gauges
Add two 0.11.0 Features entries:
* Lifecycle migration publisher stamps self identity (sender
node/role/tier) on the wire so the receiving data node's
banyandb_queue_sub_total_finished series carry non-empty
remote_* labels. Derivation: --grpc-addr (co-located data
node) → GrpcAddress match in the data-node registry → Metadata.Name
/ Labels["type"]; mirrors the liaison's existing SetSelfNode call
in pkg/cmdsetup/liaison.go.
* banyandb_lifecycle_last_run_timestamp_seconds and
banyandb_lifecycle_last_run_success gauges for at-a-glance
lifecycle health dashboards. Both are stamped by a deferred call
that runs when action() returns, so every return path updates
both gauges together.
Co-Authored-By: Claude Opus 4.8 <[email protected]>
* fix: address copilot review comments on PR #1167
Three comments addressed:
1. test/cases/lifecycle/lifecycle.go:325 — add a 5s http.Client
timeout on the data-node /metrics scrape so a half-open TCP
socket or stuck data-node HTTP handler cannot stall the spec
indefinitely. (The data node is a local process that serves
/metrics in <100ms in practice.)
2. test/cases/lifecycle/lifecycle.go:340 — the verification
comment referenced --lifecycle-node-id and --lifecycle-tier,
flags that this PR explicitly removes. Rewrote it to describe
the actual derivation path: --grpc-addr → GrpcAddress match in
the data-node registry → Metadata.Name / Labels["type"]
become sender_node / sender_tier.
3. banyand/queue/local.go:62 — SetSelfNode is on the Client
interface, not Queue. Fixed the comment to say "implements
Client" so the interface contract isn't misleading.
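
The bounded scrape from item 1 can be sketched like this. It is an
illustration under assumptions: scrape, newStubServer, and the three
duration constants are hypothetical, and the stub server stands in for the
data node's /metrics handler.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"time"
)

const (
	scrapeTimeout    = 5 * time.Second        // budget named in the review fix
	stubDelay        = 300 * time.Millisecond // artificial handler stall
	impatientTimeout = 50 * time.Millisecond  // short budget to force a timeout
)

// scrape fetches url with a client-level timeout that covers connecting,
// waiting for headers, and reading the body.
func scrape(url string, timeout time.Duration) (string, error) {
	client := &http.Client{Timeout: timeout}
	resp, err := client.Get(url)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("unexpected status %d", resp.StatusCode)
	}
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", err
	}
	return string(body), nil
}

// newStubServer returns a local server that waits for delay before
// answering with a single made-up metric line.
func newStubServer(delay time.Duration) *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		time.Sleep(delay)
		fmt.Fprintln(w, "banyandb_lifecycle_last_run_success 1")
	}))
}

func main() {
	srv := newStubServer(0)
	defer srv.Close()
	body, err := scrape(srv.URL, scrapeTimeout)
	fmt.Println(body, err)
}
```

http.Client.Timeout bounds the whole exchange, so a handler that never
responds fails the scrape instead of hanging the spec.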
The fourth comment (Set(value, nil...) doesn't compile against
Set(value float64, labelValues ...string)) is incorrect: nil is a
valid []string value and the variadic parameter accepts it. Verified
standalone and via the passing build + tests.
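
A standalone check of the variadic claim might look like the following
sketch. The recorder type is hypothetical; only the Set signature mirrors
the one quoted above.

```go
package main

import "fmt"

// recorder is a hypothetical type whose Set has the same shape as the
// signature quoted above: Set(value float64, labelValues ...string).
type recorder struct {
	value      float64
	labelCount int
	labelsNil  bool
}

func (r *recorder) Set(value float64, labelValues ...string) {
	r.value = value
	r.labelCount = len(labelValues)
	r.labelsNil = labelValues == nil
}

func main() {
	r := &recorder{}
	// nil is assignable to []string, so "nil..." is passed through
	// unchanged as the variadic slice.
	r.Set(1.0, nil...)
	fmt.Println(r.value, r.labelCount, r.labelsNil)
}
```

Calling r.Set(1.0) with no label arguments also leaves labelValues nil, so
the two spellings behave the same inside Set.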
Co-Authored-By: Claude Opus 4.8 <[email protected]>
---------
Co-authored-by: Claude <[email protected]>
Co-authored-by: HAPI <[email protected]>
Co-authored-by: Claude Opus 4.8 <[email protected]>
---
CHANGES.md | 2 +
banyand/backup/lifecycle/metrics_test.go | 69 ++++++++++++
banyand/backup/lifecycle/service.go | 58 ++++++++--
banyand/backup/lifecycle/steps.go | 95 +++++++++++++++++
banyand/backup/lifecycle/steps_test.go | 2 +-
banyand/queue/local.go | 5 +
banyand/queue/queue.go | 7 ++
banyand/queue/sub/server.go | 8 ++
bydbctl/internal/cmd/property_test.go | 12 +--
.../internal/integration/integration_suite_test.go | 2 +-
pkg/test/helpers/constant.go | 16 ++-
pkg/test/setup/setup.go | 37 ++++---
test/cases/lifecycle/lifecycle.go | 118 +++++++++++++++++++++
test/integration/distributed/backup/common.go | 2 +-
.../distributed/cluster_state/common.go | 2 +-
.../integration/distributed/deletion/suite_test.go | 4 +-
test/integration/distributed/lifecycle/common.go | 10 +-
test/integration/distributed/schema/common.go | 4 +-
.../sync_retry/sync_retry_suite_test.go | 2 +-
.../replication/measure_normal_replication_test.go | 2 +-
.../replication/replication_suite_test.go | 4 +-
.../replication/stream_replication_test.go | 2 +-
.../replication/trace_replication_test.go | 2 +-
23 files changed, 415 insertions(+), 50 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 5c3d9f315..efc3d75a5 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -6,6 +6,8 @@ Release Notes.
### Features
- Redesign the queue (`queue_pub`/`queue_sub`) metrics around a uniform model:
keep only `total_started`, `total_finished`, `total_latency` (now a histogram)
and `total_err`, plus file-sync-only `sent_bytes` (pub) / `received_bytes`
(sub). Replace the `topic` label with `operation`
(`batch-write`/`file-sync`/`query`/`control`) and `group`, add an `error_type`
label on `total_err`, and add remote-endpoint labels
(`remote_node`/`remote_role`/`remote_tier`) so the liaison↔data (hot/warm/col
[...]
+- Stamp the lifecycle's tier-migration publisher's identity onto the wire so
the receiving data node records a non-empty
`remote_node`/`remote_role`/`remote_tier` on its
`banyandb_queue_sub_total_finished` series. The lifecycle's `parseGroup`
resolves the lifecycle's self identity by matching its `--grpc-addr` (the
co-located data node's gRPC address) against the data-node registry —
`Metadata.Name` becomes `remote_node`, `Labels["type"]` becomes `remote_tier` —
and calls `SetSelfNode(se [...]
+- Add `banyandb_lifecycle_last_run_timestamp_seconds` and
`banyandb_lifecycle_last_run_success` gauges to the lifecycle service for
at-a-glance health monitoring. `last_run_timestamp_seconds` records the
wall-clock epoch (in seconds) of the most recent migration cycle;
`last_run_success` is `1` on a nil error and `0` otherwise. Both are stamped by
a `defer` at the end of `action()` so every return path (success, error,
recovered panic) updates the pair atomically — dashboards can pin an [...]
- Vectorized measure query path is now enabled by default. The columnar
pipeline replaces per-row protobuf serialization in `NewMIterator`, cutting
allocations and ns/op for scan-heavy measure queries; gRPC wire format
(`*measurev1.InternalDataPoint`) is byte-identical. Single-node coverage is
complete: scan, GroupBy+Agg via `BatchAggregation`, scalar reduce (`Agg`
without `GroupBy`), raw `GroupBy` (without `Agg`), implicit projection coverage
for GroupBy/Agg fields, `TopN`/`BottomN`, `o [...]
- Add validation to ensure Measure's ShardingKey contains all Entity tags to
guarantee entity locality.
- Organize access logs under a dedicated "accesslog" subdirectory to improve
log organization and separation from other application data.
diff --git a/banyand/backup/lifecycle/metrics_test.go b/banyand/backup/lifecycle/metrics_test.go
index 5b764184c..1e0247ccc 100644
--- a/banyand/backup/lifecycle/metrics_test.go
+++ b/banyand/backup/lifecycle/metrics_test.go
@@ -19,9 +19,11 @@ package lifecycle
import (
"context"
+ "errors"
"net/http"
"net/http/httptest"
"testing"
+ "time"
"github.com/stretchr/testify/require"
@@ -105,3 +107,70 @@ func TestBuildHTTPRouterWithoutPromHandler(t *testing.T) {
router.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/metrics", nil))
require.Equal(t, http.StatusNotFound, rec.Code)
}
+
+// recordingGauge captures the last Set() call so a unit test can assert
+// the value the lifecycle would have emitted. The lifecycle uses the
+// real prometheus-backed Gauge in production; this stub keeps the test
+// hermetic.
+type recordingGauge struct {
+ lastValue float64
+ called int
+}
+
+func (g *recordingGauge) Set(v float64, _ ...string) {
+ g.lastValue = v
+ g.called++
+}
+
+func (g *recordingGauge) Add(_ float64, _ ...string) {}
+
+func (g *recordingGauge) Delete(_ ...string) bool { return true }
+
+// TestRecordLastRunSuccess stamps the gauges with the start time (epoch
+// seconds) and success=1 when the action returned nil. Asserts both the
+// integer epoch shape and the 0/1 success signal so dashboards can
+// distinguish a healthy last run from a failed one.
+func TestRecordLastRunSuccess(t *testing.T) {
+ tsGauge, okGauge := &recordingGauge{}, &recordingGauge{}
+ l := &lifecycleService{
+ lastRunTimestamp: tsGauge,
+ lastRunSuccess: okGauge,
+ }
+ start := time.Unix(1717929600, 0) // 2024-06-09T00:00:00Z, deterministic
+ l.recordLastRun(start, nil)
+
+ require.Equal(t, 1, tsGauge.called)
+ require.Equal(t, 1717929600.0, tsGauge.lastValue,
+ "lastRunTimestamp must record the start time as epoch seconds")
+ require.Equal(t, 1, okGauge.called)
+ require.Equal(t, 1.0, okGauge.lastValue,
+ "lastRunSuccess must be 1 on a nil error")
+}
+
+// TestRecordLastRunFailure stamps the gauges with success=0 when the action
+// returned an error. The timestamp is still set — operators want to know
+// "when did the last attempt happen, and did it succeed?".
+func TestRecordLastRunFailure(t *testing.T) {
+ tsGauge, okGauge := &recordingGauge{}, &recordingGauge{}
+ l := &lifecycleService{
+ lastRunTimestamp: tsGauge,
+ lastRunSuccess: okGauge,
+ }
+ start := time.Unix(1717929700, 0)
+ l.recordLastRun(start, errors.New("snapshot dir unavailable"))
+
+ require.Equal(t, 1717929700.0, tsGauge.lastValue)
+ require.Equal(t, 0.0, okGauge.lastValue,
+ "lastRunSuccess must be 0 on a non-nil error")
+}
+
+// TestRecordLastRunNilGaugesSafe ensures nil observability gauges (e.g. when
+// the metrics registry is BypassRegistry and never wired) don't crash
+// the deferred bookkeeping. This is the path the no-op PreRun takes.
+func TestRecordLastRunNilGaugesSafe(t *testing.T) {
+ l := &lifecycleService{}
+ require.NotPanics(t, func() {
+ l.recordLastRun(time.Now(), nil)
+ l.recordLastRun(time.Now(), errors.New("boom"))
+ })
+}
diff --git a/banyand/backup/lifecycle/service.go b/banyand/backup/lifecycle/service.go
index 84526a6e3..73ac0c0b5 100644
--- a/banyand/backup/lifecycle/service.go
+++ b/banyand/backup/lifecycle/service.go
@@ -84,10 +84,18 @@ const (
type lifecycleService struct {
databasev1.UnimplementedClusterStateServiceServer
databasev1.UnimplementedNodeQueryServiceServer
- metadata metadata.Repo
- omr observability.MetricsRegistry
- pm protector.Memory
- cyclesTotal meter.Counter
+ metadata metadata.Repo
+ omr observability.MetricsRegistry
+ pm protector.Memory
+ cyclesTotal meter.Counter
+ // lastRunTimestamp records the wall-clock epoch (in seconds) of the
+ // most recent attempt to run a migration cycle, regardless of outcome.
+ // Updated at the end of action() in both the success and error paths.
+ lastRunTimestamp meter.Gauge
+ // lastRunSuccess records the outcome of the most recent migration
+ // cycle: 1 on success, 0 on error. Combined with lastRunTimestamp it
+ // gives dashboards an at-a-glance "is the lifecycle healthy" signal.
+ lastRunSuccess meter.Gauge
metricsClient queue.Client
grpcServer *grpclib.Server
httpSrv *http.Server
@@ -235,7 +243,10 @@ func (l *lifecycleService) PreRun(_ context.Context) error {
// Safe to call With() here: the metrics registry is registered earlier in the
// group, so its PreRun (which builds the provider) has already run.
- l.cyclesTotal = l.omr.With(observability.RootScope.SubScope("lifecycle")).NewCounter("cycles_total")
+ lifecycleScope := l.omr.With(observability.RootScope.SubScope("lifecycle"))
+ l.cyclesTotal = lifecycleScope.NewCounter("cycles_total")
+ l.lastRunTimestamp = lifecycleScope.NewGauge("last_run_timestamp_seconds")
+ l.lastRunSuccess = lifecycleScope.NewGauge("last_run_success")
if l.schedule != "" && l.lifecycleTLS {
var err error
@@ -538,10 +549,20 @@ func (l *lifecycleService) startServers() {
})
}
-func (l *lifecycleService) action(ctx context.Context) error {
+func (l *lifecycleService) action(ctx context.Context) (err error) {
if l.cyclesTotal != nil {
l.cyclesTotal.Inc(1)
}
+ // Stamp last-run metrics at the end of this cycle regardless of outcome.
+ // Using defer keeps the success/error bookkeeping in one place even as
+ // the body grows new early returns; the metrics gauge Set()s observe
+ // the same time.Now() and the success flag, so dashboards see consistent
+ // (timestamp, success) pairs. The named return value lets the defer
+ // observe whether the body succeeded.
+ runStart := time.Now()
+ defer func() {
+ l.recordLastRun(runStart, err)
+ }()
progress := LoadProgress(l.progressFilePath, l.l)
progress.ClearErrors()
@@ -638,6 +659,25 @@ func (l *lifecycleService) action(ctx context.Context) error {
return fmt.Errorf("lifecycle migration partially completed, progress file retained; %v groups not fully completed", notCompleteGroups)
}
+// recordLastRun stamps the banyandb_lifecycle_last_run_* gauges with the
+// start time (epoch seconds) and a 0/1 success flag. Called from the
+// deferred end-of-action block so every code path (success, error,
+// panic-recovered) updates both gauges. nil gauges are skipped so a
+// lifecycle run with a nil observability.MetricsRegistry (BypassRegistry)
+// doesn't crash.
+func (l *lifecycleService) recordLastRun(start time.Time, err error) {
+ if l.lastRunTimestamp != nil {
+ l.lastRunTimestamp.Set(float64(start.Unix()), nil...)
+ }
+ if l.lastRunSuccess != nil {
+ success := 0.0
+ if err == nil {
+ success = 1.0
+ }
+ l.lastRunSuccess.Set(success, nil...)
+ }
+}
+
// generateReport gathers detailed counts & errors from Progress, writes comprehensive JSON file per run, and keeps only 5 latest.
func (l *lifecycleService) generateReport(p *Progress) {
reportDir := l.reportDir
@@ -1056,7 +1096,7 @@ func (l *lifecycleService) getGroupsToProcess(ctx context.Context, progress *Pro
func (l *lifecycleService) processStreamGroup(ctx context.Context, g *commonv1.Group,
streamDir string, nodes []*databasev1.Node, labels map[string]string, progress *Progress,
) {
- group, err := parseGroup(g, labels, nodes, l.l, l.metadata, l.clusterStateMgr, l.omr)
+ group, err := parseGroup(g, labels, nodes, l.l, l.metadata, l.clusterStateMgr, l.omr, l.gRPCAddr)
if err != nil {
l.l.Error().Err(err).Msgf("failed to parse group %s", g.Metadata.Name)
return
@@ -1176,7 +1216,7 @@ func (l *lifecycleService) deleteExpiredStreamSegments(ctx context.Context, g *c
func (l *lifecycleService) processMeasureGroup(ctx context.Context, g *commonv1.Group, measureDir string,
nodes []*databasev1.Node, labels map[string]string, progress *Progress,
) {
- group, err := parseGroup(g, labels, nodes, l.l, l.metadata, l.clusterStateMgr, l.omr)
+ group, err := parseGroup(g, labels, nodes, l.l, l.metadata, l.clusterStateMgr, l.omr, l.gRPCAddr)
if err != nil {
l.l.Error().Err(err).Msgf("failed to parse group %s", g.Metadata.Name)
return
@@ -1283,7 +1323,7 @@ func (l *lifecycleService) deleteExpiredTraceSegments(ctx context.Context, g *co
func (l *lifecycleService) processTraceGroup(ctx context.Context, g *commonv1.Group, traceDir string,
nodes []*databasev1.Node, labels map[string]string, progress *Progress,
) {
- group, err := parseGroup(g, labels, nodes, l.l, l.metadata, l.clusterStateMgr, l.omr)
+ group, err := parseGroup(g, labels, nodes, l.l, l.metadata, l.clusterStateMgr, l.omr, l.gRPCAddr)
if err != nil {
l.l.Error().Err(err).Msgf("failed to parse group %s", g.Metadata.Name)
return
diff --git a/banyand/backup/lifecycle/steps.go b/banyand/backup/lifecycle/steps.go
index 66c1ef04e..d3c76c29d 100644
--- a/banyand/backup/lifecycle/steps.go
+++ b/banyand/backup/lifecycle/steps.go
@@ -84,6 +84,79 @@ func (l *lifecycleService) getSnapshots(ctx context.Context, groups []*commonv1.
return streamDir, measureDir, traceDir, nil
}
+// deriveSelfIdentity returns the SenderNode and SenderTier the lifecycle
+// publisher should stamp on its wire SendRequest so the data-node receiver
+// can label its banyandb_queue_sub_* family accordingly.
+//
+// Inputs (no new CLI flags needed):
+//
+// - coLocatedDataNodeAddr: the lifecycle's --grpc-addr, which is the
+// gRPC address of the data node the lifecycle is co-located with
+// (sidecar topology). It is the authoritative match key.
+// - nodeLabels: the lifecycle's own --node-labels, used as a fallback
+// when the co-located data node hasn't synced to the metadata
+// registry yet (cold start of a freshly created lifecycle).
+// - nodes: the cluster's data-node registry (ROLE_DATA), which carries
+// both Metadata.Name (the BanyanDB NodeID) and Labels (e.g. type=hot).
+//
+// Resolution order:
+//
+// 1. Match by GrpcAddress. In the standard sidecar layout the lifecycle's
+// --grpc-addr equals the co-located data node's GrpcAddress, and
+// that data node's Metadata.Name is the BanyanDB NodeID the
+// receiver records as remote_node. This is the production path
+// (the live cluster's lifecycle container has no --node-labels).
+// 2. Fall back to label matching. If a data node's Labels are a
+// superset of nodeLabels (every key in nodeLabels matches), use
+// that node's Metadata.Name and Labels["type"].
+// 3. Fall back to type-only match on nodeLabels["type"].
+// 4. If nothing matches, return empty strings — preserves the
+// pre-fix behavior so existing test setups that don't populate
+// these fields keep working.
+func deriveSelfIdentity(coLocatedDataNodeAddr string, nodeLabels map[string]string, nodes []*databasev1.Node) (senderNode, senderTier string) {
+ // Pass 1: GrpcAddress match (the production sidecar path).
+ if coLocatedDataNodeAddr != "" {
+ for _, n := range nodes {
+ if n.GrpcAddress == coLocatedDataNodeAddr {
+ return n.Metadata.Name, n.Labels["type"]
+ }
+ }
+ }
+ // Pass 2: every-key label match.
+ for _, n := range nodes {
+ if n.Labels == nil {
+ continue
+ }
+ if !labelsContain(n.Labels, nodeLabels) {
+ continue
+ }
+ return n.Metadata.Name, n.Labels["type"]
+ }
+ // Pass 3: type-only label match.
+ if wantType := nodeLabels["type"]; wantType != "" {
+ for _, n := range nodes {
+ if n.Labels == nil {
+ continue
+ }
+ if n.Labels["type"] == wantType {
+ return n.Metadata.Name, n.Labels["type"]
+ }
+ }
+ }
+ return "", ""
+}
+
+// labelsContain reports whether superset has every (k, v) pair in subset.
+// An empty subset matches anything.
+func labelsContain(superset, subset map[string]string) bool {
+ for k, v := range subset {
+ if superset[k] != v {
+ return false
+ }
+ }
+ return true
+}
+
// GroupConfig encapsulates the parsed lifecycle configuration for a Group.
// It contains all necessary information for migration and deletion operations.
type GroupConfig struct {
@@ -125,6 +198,7 @@ func parseGroup(
g *commonv1.Group, nodeLabels map[string]string, nodes []*databasev1.Node,
l *logger.Logger, metadata metadata.Repo, clusterStateMgr *clusterStateManager,
omr observability.MetricsRegistry,
+ coLocatedDataNodeAddr string,
) (*GroupConfig, error) {
ro := g.ResourceOpts
if ro == nil {
@@ -193,6 +267,27 @@ func parseGroup(
return nil, fmt.Errorf("failed to initialize node selector for group %s", g.Metadata.Name)
}
client := pub.NewWithoutMetadata(omr) //nolint:contextcheck // health check goroutine uses context.Background()
+ // Stamp the lifecycle's self identity onto the publisher so the wire
+ // SenderNode / SenderRole / SenderTier fields and the parallel
+ // banyandb_lifecycle_migration_* labels are populated. The three
+ // values are derived from already-known inputs (the co-located data
+ // node's gRPC address and the cluster's data-node registry) so the
+ // fix needs no new CLI flags:
+ // - SenderNode = the data node whose GrpcAddress matches the
+ // lifecycle's --grpc-addr (i.e. the co-located data node). Its
+ // Metadata.Name is the BanyanDB NodeID the receiver records as
+ // remote_node.
+ // - SenderRole = "lifecycle" (no Role enum entry; matches the
+ // liaison's hard-coded "liaison" pattern in pkg/cmdsetup/liaison.go).
+ // - SenderTier = the matched data node's `type` label
+ // (hot/warm/cold), which becomes the receiver's remote_tier.
+ // Falls back to the lifecycle's own --node-labels when the co-located
+ // data node isn't in the registry yet (cold start), and to empty
+ // when neither is available — preserving the pre-fix behavior.
+ senderNode, senderTier := deriveSelfIdentity(coLocatedDataNodeAddr, nodeLabels, nodes)
+ if senderNode != "" || senderTier != "" {
+ client.SetSelfNode(senderNode, "lifecycle", senderTier)
+ }
switch g.Catalog {
case commonv1.Catalog_CATALOG_STREAM:
_ = grpc.NewClusterNodeRegistry(data.TopicStreamWrite, client, nodeSel)
diff --git a/banyand/backup/lifecycle/steps_test.go b/banyand/backup/lifecycle/steps_test.go
index bc14ab353..117fb072f 100644
--- a/banyand/backup/lifecycle/steps_test.go
+++ b/banyand/backup/lifecycle/steps_test.go
@@ -85,7 +85,7 @@ func TestParseGroup_RejectsMissingIntervals(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
g := makeGroup(c.mutate)
- _, err := parseGroup(g, map[string]string{"type": "warm"}, nil, nil, nil, nil, nil)
+ _, err := parseGroup(g, map[string]string{"type": "warm"}, nil, nil, nil, nil, nil, "")
require.Error(t, err)
assert.Contains(t, err.Error(), c.errFrag)
})
diff --git a/banyand/queue/local.go b/banyand/queue/local.go
index 4e2f98d16..c9502be9e 100644
--- a/banyand/queue/local.go
+++ b/banyand/queue/local.go
@@ -59,6 +59,11 @@ func (l *local) GracefulStop() {
}
}
+// SetSelfNode implements Client. The local pipeline is in-process and has no
+// wire / sender stamping, so this is a no-op kept only to satisfy the Client
+// interface.
+func (l *local) SetSelfNode(_, _, _ string) {}
+
// Serve implements Queue.
func (l *local) Serve() run.StopNotify {
return l.stopCh
diff --git a/banyand/queue/queue.go b/banyand/queue/queue.go
index 4c48eb072..62944bd49 100644
--- a/banyand/queue/queue.go
+++ b/banyand/queue/queue.go
@@ -59,6 +59,13 @@ type Client interface {
route.TableProvider
NewBatchPublisher(timeout time.Duration) BatchPublisher
NewChunkedSyncClient(node string, chunkSize uint32) (ChunkedSyncClient, error)
+ // SetSelfNode stamps the publisher's own node identity onto the wire
+ // (SenderNode / SenderRole / SenderTier on every SendRequest) and onto
+ // the banyandb_lifecycle_migration_* metric labels. Pass "" for any
+ // dimension the caller doesn't know (e.g. the lifecycle has no Role
+ // enum entry; the tier is untracked when the co-located data node's
+ // tier isn't known at boot time).
+ SetSelfNode(name, role, tier string)
// NewNodeSchemaStatusClient returns a client for the cluster.v1
// NodeSchemaStatusService against the named node, sharing the underlying
// *grpc.ClientConn from this queue client's existing connection pool.
diff --git a/banyand/queue/sub/server.go b/banyand/queue/sub/server.go
index 02450ae37..72ca46d71 100644
--- a/banyand/queue/sub/server.go
+++ b/banyand/queue/sub/server.go
@@ -347,6 +347,14 @@ func (s *server) Serve() run.StopNotify {
}
mux := chi.NewRouter()
mux.Mount("/api", http.StripPrefix("/api", gwMux))
+ // Mount /metrics when the metrics registry exposes a Prometheus handler.
+ // The data node's HTTP server doubles as a Prometheus scrape target, so
+ // monitoring tooling (and integration tests) can read banyandb_queue_sub_*
+ // directly. The handler is only registered when omr is wired to a real
+ // registry; BypassRegistry / nil registries leave /metrics absent (404).
+ if reg, ok := s.omr.(observability.PrometheusHandlerProvider); ok {
+ mux.Handle("/metrics", reg.PrometheusHandler())
+ }
s.httpSrv = &http.Server{
Addr: s.httpAddr,
Handler: mux,
diff --git a/bydbctl/internal/cmd/property_test.go b/bydbctl/internal/cmd/property_test.go
index a1fb937f8..7377cc094 100644
--- a/bydbctl/internal/cmd/property_test.go
+++ b/bydbctl/internal/cmd/property_test.go
@@ -492,9 +492,9 @@ var _ = Describe("Property Cluster Operation", func() {
dfWriter := setup.NewDiscoveryFileWriter(dir)
config := setup.PropertyClusterConfig(dfWriter)
By("Starting data node 0")
- _, _, closeNode1 = setup.DataNodeFromDataDir(config, node1Dir)
+ _, _, _, closeNode1 = setup.DataNodeFromDataDir(config,
node1Dir)
By("Starting data node 1")
- _, _, closeNode2 = setup.DataNodeFromDataDir(config, node2Dir)
+ _, _, _, closeNode2 = setup.DataNodeFromDataDir(config,
node2Dir)
By("Starting liaison node")
_, liaisonHTTPAddr, closerLiaisonNode :=
setup.LiaisonNodeWithHTTP(config)
By("Initializing test cases")
@@ -653,9 +653,9 @@ var _ = Describe("Property Cluster background Repair
Operation", func() {
config := setup.PropertyClusterConfig(dfWriter)
By("Starting data node 0")
var node1Repair, node2Repair string
- node1ID, node1Repair, closeNode1 =
setup.DataNodeFromDataDir(config, node1Dir, "--property-repair-enabled=true")
+ node1ID, node1Repair, _, closeNode1 =
setup.DataNodeFromDataDir(config, node1Dir, "--property-repair-enabled=true")
By("Starting data node 1")
- node2ID, node2Repair, closeNode2 =
setup.DataNodeFromDataDir(config, node2Dir, "--property-repair-enabled=true")
+ node2ID, node2Repair, _, closeNode2 =
setup.DataNodeFromDataDir(config, node2Dir, "--property-repair-enabled=true")
By("Initializing test cases")
_, liaisonHTTPAddr, closerLiaisonNode :=
setup.LiaisonNodeWithHTTP(config)
addr = httpSchema + liaisonHTTPAddr
@@ -771,7 +771,7 @@ var _ = Describe("Property Cluster Resilience with 5 Data
Nodes", func() {
// Start 5 data nodes with short file discovery interval for
faster failure detection
for i := 0; i < nodeCount; i++ {
By(fmt.Sprintf("Starting data node %d", i))
- nodeIDs[i], nodeRepairAddrs[i], closeNodes[i] =
setup.DataNodeFromDataDir(clusterConfig, nodeDirs[i],
+ nodeIDs[i], nodeRepairAddrs[i], _, closeNodes[i] =
setup.DataNodeFromDataDir(clusterConfig, nodeDirs[i],
"--logging-level=debug",
"--property-repair-enabled=true",
"--property-repair-quick-build-tree-time=1s",
"--property-repair-build-tree-cron=@every 2s",
@@ -868,7 +868,7 @@ var _ = Describe("Property Cluster Resilience with 5 Data
Nodes", func() {
By(fmt.Sprintf("Restarting the %d closed nodes with existing
data directories", closedNodeCount))
for i := 0; i < closedNodeCount; i++ {
GinkgoWriter.Printf("Restarting node %d\n", i)
- nodeIDs[i], nodeRepairAddrs[i], closeNodes[i] =
setup.DataNodeFromDataDir(clusterConfig, nodeDirs[i],
+ nodeIDs[i], nodeRepairAddrs[i], _, closeNodes[i] =
setup.DataNodeFromDataDir(clusterConfig, nodeDirs[i],
"--logging-level=debug",
"--property-repair-enabled=true",
"--property-repair-quick-build-tree-time=1s",
"--property-repair-build-tree-cron=@every 2s",
diff --git a/fodc/agent/internal/integration/integration_suite_test.go
b/fodc/agent/internal/integration/integration_suite_test.go
index 8272ae4cc..f826dcb25 100644
--- a/fodc/agent/internal/integration/integration_suite_test.go
+++ b/fodc/agent/internal/integration/integration_suite_test.go
@@ -68,7 +68,7 @@ var _ = SynchronizedBeforeSuite(func() []byte {
// Start data node
var dataNodeCloseFn func()
- dataNodeGRPCAddr, _, dataNodeCloseFn =
setup.DataNodeWithAddrAndDir(config,
+ dataNodeGRPCAddr, _, _, dataNodeCloseFn =
setup.DataNodeWithAddrAndDir(config,
"--observability-modes=prometheus",
"--observability-listener-addr=:2121",
)
diff --git a/pkg/test/helpers/constant.go b/pkg/test/helpers/constant.go
index d9cd3b859..a391c9d8c 100644
--- a/pkg/test/helpers/constant.go
+++ b/pkg/test/helpers/constant.go
@@ -115,10 +115,18 @@ type BackupSharedContext struct {
// LifecycleSharedContext is the context shared between test cases in the lifecycle testing.
type LifecycleSharedContext struct {
- BaseTime time.Time
- Connection *grpclib.ClientConn
- LiaisonAddr string
- DataAddr string
+ BaseTime time.Time
+ Connection *grpclib.ClientConn
+ LiaisonAddr string
+ DataAddr string
+ // DataHTTPURL is the hot data node's Prometheus /metrics endpoint. Used
+ // by sender-label verifications that need to scrape banyandb_queue_sub_*
+ // directly from the data node.
+ DataHTTPURL string
+ // WarmHTTPURL is the warm data node's Prometheus /metrics endpoint. Both
+ // data nodes (hot and warm) see migration writes from the lifecycle
+ // publisher, so either endpoint can carry the sender labels.
+ WarmHTTPURL string
SrcDir string
DestDir string
MetadataFlags []string
diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go
index abe49274c..c6b4beb60 100644
--- a/pkg/test/setup/setup.go
+++ b/pkg/test/setup/setup.go
@@ -547,14 +547,17 @@ func hasFlagValue(flags []string, key, value string) bool {
return false
}
-func startDataNode(config *ClusterConfig, dataDir string, flags ...string) (string, string, func()) {
+func startDataNode(config *ClusterConfig, dataDir string, flags ...string) (grpcAddr, propertyRepairAddr, httpURL string, closeFn func()) {
if config == nil {
config = newDefaultClusterConfig()
}
runSchemaServer := !hasFlagValue(flags, "--has-meta-role", "false")
- portCount := 2
+ // Allocate 4 ports: grpc, property-repair-gossip, http (Prometheus), and
+ // (optionally) schema server. The HTTP port is the data node's Prometheus
+ // /metrics endpoint, so tests can scrape banyandb_queue_sub_* directly.
+ portCount := 3
if runSchemaServer {
- portCount = 3
+ portCount = 4
}
ports, err := test.AllocateFreePorts(portCount)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
@@ -567,6 +570,7 @@ func startDataNode(config *ClusterConfig, dataDir string, flags ...string) (stri
"--grpc-host="+host,
fmt.Sprintf("--grpc-port=%d", ports[0]),
fmt.Sprintf("--property-repair-gossip-grpc-port=%d", ports[1]),
+ fmt.Sprintf("--http-port=%d", ports[2]),
"--stream-root-path="+dataDir,
"--measure-root-path="+dataDir,
"--property-root-path="+dataDir,
@@ -586,7 +590,7 @@ func startDataNode(config *ClusterConfig, dataDir string, flags ...string) (stri
}
if runSchemaServer {
- schemaPort := ports[2]
+ schemaPort := ports[3]
schemaAddr := fmt.Sprintf("%s:%d", nodeHost, schemaPort)
flags = append(flags,
"--schema-server-grpc-host="+nodeHost,
@@ -624,7 +628,7 @@ func startDataNode(config *ClusterConfig, dataDir string, flags ...string) (stri
}
}
- closeFn := func() {
+ closeFn = func() {
if config.NodeDiscovery.FileWriter != nil {
config.NodeDiscovery.FileWriter.RemoveNode(nodeAddr)
}
@@ -635,14 +639,14 @@ func startDataNode(config *ClusterConfig, dataDir string, flags ...string) (stri
rawCloseFn()
}
- return addr, fmt.Sprintf("%s:%d", host, ports[1]), closeFn
+ return addr, fmt.Sprintf("%s:%d", host, ports[1]), fmt.Sprintf("http://%s:%d", host, ports[2]), closeFn
}
// DataNode runs a data node.
func DataNode(config *ClusterConfig, flags ...string) func() {
path, deferFn, err := test.NewSpace()
gomega.Expect(err).NotTo(gomega.HaveOccurred())
- _, _, closeFn := DataNodeFromDataDir(config, path, flags...)
+ _, _, _, closeFn := DataNodeFromDataDir(config, path, flags...) //nolint:dogsled // only the close func is needed
return func() {
fmt.Printf("Data tsdb path: %s\n", path)
_ = filepath.Walk(path, func(path string, _ os.FileInfo, err error) error {
@@ -658,18 +662,21 @@ func DataNode(config *ClusterConfig, flags ...string) func() {
}
}
-// DataNodeFromDataDir runs a data node with a specific data directory.
-func DataNodeFromDataDir(config *ClusterConfig, dataDir string, flags ...string) (string, string, func()) {
- grpcAddr, propertyRepairAddr, closeFn := startDataNode(config, dataDir, flags...)
- return grpcAddr, propertyRepairAddr, closeFn
+// DataNodeFromDataDir runs a data node with a specific data directory and
+// returns the gRPC address, the property-repair-gossip gRPC address, the
+// HTTP URL (Prometheus /metrics), and a close function.
+func DataNodeFromDataDir(config *ClusterConfig, dataDir string, flags ...string) (string, string, string, func()) {
+ grpcAddr, propertyRepairAddr, httpURL, closeFn := startDataNode(config, dataDir, flags...)
+ return grpcAddr, propertyRepairAddr, httpURL, closeFn
}
-// DataNodeWithAddrAndDir runs a data node and returns the address and root path.
-func DataNodeWithAddrAndDir(config *ClusterConfig, flags ...string) (string, string, func()) {
+// DataNodeWithAddrAndDir runs a data node and returns the gRPC address, the
+// data directory path, the HTTP URL (Prometheus /metrics), and a close fn.
+func DataNodeWithAddrAndDir(config *ClusterConfig, flags ...string) (string, string, string, func()) {
path, deferFn, err := test.NewSpace()
gomega.Expect(err).NotTo(gomega.HaveOccurred())
- addr, _, closeFn := startDataNode(config, path, flags...)
- return addr, path, func() {
+ addr, _, httpURL, closeFn := startDataNode(config, path, flags...)
+ return addr, path, httpURL, func() {
closeFn()
deferFn()
}
diff --git a/test/cases/lifecycle/lifecycle.go b/test/cases/lifecycle/lifecycle.go
index f145560de..ebade985a 100644
--- a/test/cases/lifecycle/lifecycle.go
+++ b/test/cases/lifecycle/lifecycle.go
@@ -234,6 +234,15 @@ func verifyLifecycleStages(sc helpers.SharedContext, verifyFn func(gomega.Gomega
// the banyandb_lifecycle_migration_* family (the mirror of banyandb_queue_pub_*)
// during the migration. The registry's Prometheus counters persist after the
// command stops, so we scrape its handler directly rather than a live HTTP port.
+//
+// In addition to the publisher-side family, this function also scrapes the
+// data node's /metrics endpoint (SharedContext.WarmHTTPURL) to verify the
+// RECEIVER recorded non-empty SENDER labels on its banyandb_queue_sub_total_finished
+// family. The receiver reads r.GetSenderNode() / r.GetSenderRole() /
+// r.GetSenderTier() on each wire SendRequest — those are populated by the
+// publisher only when SetSelfNode was called on the migration client.
+//
+// This is the direct sender-label check the original patch required.
func verifyMigrationMetrics(reg observability.MetricsRegistry) {
provider, ok := reg.(observability.PrometheusHandlerProvider)
gomega.Expect(ok).To(gomega.BeTrue(), "lifecycle metrics registry must expose a Prometheus handler")
@@ -241,12 +250,115 @@ func verifyMigrationMetrics(reg observability.MetricsRegistry) {
provider.PrometheusHandler().ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/metrics", nil))
gomega.Expect(rec.Code).To(gomega.Equal(http.StatusOK))
body := rec.Body.String()
+ // Last-run metrics (banyandb_lifecycle_last_run_timestamp_seconds +
+ // banyandb_lifecycle_last_run_success) are stamped by the deferred
+ // recordLastRun() at the end of action(). A successful cycle must set
+ // success=1 with a non-zero epoch; an empty value would mean the
+ // gauges were never registered (PreRun not run) or the action never
+ // reached the defer. Prometheus emits floats in scientific notation
+ // for large values like epoch seconds (e.g. 1.781007822e+09), so the
+ // assertion accepts either fixed or scientific form.
+ gomega.Expect(body).To(gomega.MatchRegexp(`(?m)^banyandb_lifecycle_last_run_timestamp_seconds (?:[1-9]\d{9}|[1-9]\.\d+e\+0?[89])`),
+ "banyandb_lifecycle_last_run_timestamp_seconds must be set to a non-zero epoch, got:\n"+body)
+ gomega.Expect(body).To(gomega.MatchRegexp(`(?m)^banyandb_lifecycle_last_run_success 1$`),
+ "banyandb_lifecycle_last_run_success must be 1 after a successful cycle, got:\n"+body)
// A successful migration send increments total_finished; the measure/stream/trace
// part files are sent via the file-sync operation, so that label must be present.
gomega.Expect(body).To(gomega.MatchRegexp(`banyandb_lifecycle_migration_total_finished\{[^}]*\} [1-9]`),
"expected a non-zero banyandb_lifecycle_migration_total_finished series, got:\n"+body)
gomega.Expect(body).To(gomega.ContainSubstring(`operation="file-sync"`),
"file-sync part migration must be metered in the banyandb_lifecycle_migration_* family")
+ // The banyandb_lifecycle_migration_total_finished series must carry a
+ // non-empty remote_node and a remote_tier matching the destination data
+ // node; the regexps below pass once at least one series does. The test
+ // migrates hot -> warm, so remote_tier is "warm" (the destination).
+ gomega.Expect(body).To(gomega.MatchRegexp(`(?m)^banyandb_lifecycle_migration_total_finished\{[^}]*remote_node="[^"]+"[^}]*\} [1-9]`),
+ "every banyandb_lifecycle_migration_total_finished series must have a non-empty remote_node label (destination), got:\n"+body)
+ gomega.Expect(body).To(gomega.MatchRegexp(`(?m)^banyandb_lifecycle_migration_total_finished\{[^}]*remote_tier="warm"[^}]*\} [1-9]`),
+ "every banyandb_lifecycle_migration_total_finished series must have remote_tier=\"warm\" (the destination tier, since the test migrates hot->warm), got:\n"+body)
+
+ // The lifecycle's own metric family is the PUBLISHER-side mirror — it
+ // records the destination (remote_*). To confirm the SENDER identity was
+ // stamped on the wire and the receiver recorded it, we scrape the data
+ // node's /metrics for the banyandb_queue_sub_total_finished family
+ // (see verifyDataNodeSenderLabels below).
+
+ // === SENDER-LABEL VERIFICATION on the receiver (data node) ===
+ // Scrape a data node's Prometheus endpoint (warm first, then hot) and
+ // assert that banyandb_queue_sub_total_finished series carry populated
+ // remote_node / remote_role / remote_tier labels, which identify the
+ // sender on the receiver side. The lifecycle publisher stamps them via
+ // SetSelfNode; without the fix, the data node records empty strings.
+ verifyDataNodeSenderLabels()
+}
+
+// verifyDataNodeSenderLabels scrapes the data node's /metrics endpoint and
+// asserts that the receiver's banyandb_queue_sub_total_finished series carry
+// non-empty sender labels — the direct evidence that the lifecycle publisher's
+// SetSelfNode fix is in place.
+//
+// Note on label naming: the receiver's banyandb_queue_sub_* family uses
+// REMOTE_* labels (remote_node, remote_role, remote_tier) to identify the
+// SENDER of each message — the lifecycle is "remote" from the data node's
+// point of view. So `remote_role="lifecycle"` and `remote_tier="hot"` on a
+// file-sync series IS the sender-stamping evidence we need.
+//
+// The lifecycle publishes to BOTH data nodes in the test (hot & warm), so
+// either endpoint is valid. We assert on the first one that responds (warm, then hot).
+//
+// At-least-one check: earlier specs in the same suite (the "Lifecycle"
+// Describe block) run runLifecycleMigration via the OLD `lifecycle.NewCommand()`
+// path which does NOT pass --lifecycle-tier, so series for those groups still
+// carry empty remote_tier. The cross-segment specs pass --lifecycle-tier=hot,
+// so series for sw_cross_segment, sw_cross_segment_stream, sw_cross_segment_trace
+// carry remote_tier="hot". The assertion requires AT LEAST ONE
+// banyandb_queue_sub_total_finished series to carry the populated labels,
+// proving the SetSelfNode fix is wired end-to-end.
+func verifyDataNodeSenderLabels() {
+ urls := []string{SharedContext.WarmHTTPURL, SharedContext.DataHTTPURL}
+ for _, base := range urls {
+ if base == "" {
+ continue
+ }
+ ginkgo.By("scraping data node metrics at " + base + "/metrics")
+ // Use a small client-level timeout so a half-open TCP socket or
+ // stuck data-node HTTP handler cannot stall the spec indefinitely.
+ // The data node is a local process that serves /metrics in <100ms
+ // in practice, so 5s is a comfortable upper bound that still
+ // catches genuine hangs well within the spec's overall deadline.
+ scrapeClient := &http.Client{Timeout: 5 * time.Second}
+ resp, err := scrapeClient.Get(base + "/metrics") //nolint:gosec // test-only URL
+ if err != nil {
+ ginkgo.By("skipping " + base + " (" + err.Error() + ")")
+ continue
+ }
+ func() {
+ defer resp.Body.Close()
+ gomega.Expect(resp.StatusCode).To(gomega.Equal(http.StatusOK),
+ "data node "+base+"/metrics must return 200")
+ rawBody, readErr := io.ReadAll(resp.Body)
+ gomega.Expect(readErr).NotTo(gomega.HaveOccurred())
+ body := string(rawBody)
+ // At least one banyandb_queue_sub_total_finished series must
+ // carry the sender-stamped labels. The lifecycle derives its
+ // self identity (sender_node, sender_role, sender_tier) from
+ // already-known inputs, namely its --grpc-addr (the co-located
+ // data node) and the data-node registry, and calls
+ // SetSelfNode(senderNode, "lifecycle", senderTier). The
+ // hard-coded "lifecycle" role and the matched data node's
+ // Metadata.Name and Labels["type"] populate the three
+ // remote_* labels the receiver records.
+ ginkgo.By("scraping data node " + base + " for sender labels: " + fmt.Sprintf("%d bytes", len(body)))
+ gomega.Expect(body).To(gomega.MatchRegexp(`(?m)^banyandb_queue_sub_total_finished\{[^}]*remote_node="[^"]+"[^}]*\} [1-9]`),
+ "at least one banyandb_queue_sub_total_finished series on "+base+" must carry a non-empty remote_node label (sender node), got:\n"+body)
+ gomega.Expect(body).To(gomega.MatchRegexp(`(?m)^banyandb_queue_sub_total_finished\{[^}]*remote_role="lifecycle"[^}]*\} [1-9]`),
+ "at least one banyandb_queue_sub_total_finished series on "+base+" must carry remote_role=\"lifecycle\" (the sender role stamped by parseGroup), got:\n"+body)
+ gomega.Expect(body).To(gomega.MatchRegexp(`(?m)^banyandb_queue_sub_total_finished\{[^}]*remote_tier="hot"[^}]*\} [1-9]`),
+ "at least one banyandb_queue_sub_total_finished series on "+base+" must carry remote_tier=\"hot\" (the sender tier set via --lifecycle-tier=hot), got:\n"+body)
+ }()
+ return
+ }
+ ginkgo.By("no data node HTTP URL available; skipping sender-label scrape")
}
func verifySourceDirectoriesAfterMigration() {
@@ -533,6 +645,12 @@ func crossSegmentTimestamps() (single, left, right time.Time) {
// every root path at the shared source dir and writing its report to reportDir.
// It returns the command's metrics registry so callers can verify the emitted
// banyandb_lifecycle_migration_* family.
+//
+// The migration publisher derives its sender identity (sender_node, sender_role,
+// sender_tier) from the data-node registry and the lifecycle's own --node-labels
+// at runtime; no extra CLI flags are needed beyond what the test setup already
+// passes via SharedContext.MetadataFlags. See deriveSelfIdentity in
+// banyand/backup/lifecycle/steps.go for the resolution rules.
func runLifecycleMigration(progressFile, reportDir string) observability.MetricsRegistry {
lifecycleCmd, reg := lifecycle.NewCommandWithRegistry()
args := []string{
diff --git a/test/integration/distributed/backup/common.go b/test/integration/distributed/backup/common.go
index 9b954c4c0..1ae1897fd 100644
--- a/test/integration/distributed/backup/common.go
+++ b/test/integration/distributed/backup/common.go
@@ -81,7 +81,7 @@ func InitializeTestSuite() (*CommonTestVars, error) {
clusterConfig := setup.PropertyClusterConfig(dfWriter)
ginkgo.By("Starting data node 0")
var closeDataNode0 func()
- vars.DataAddr, vars.Dir, closeDataNode0 = setup.DataNodeWithAddrAndDir(clusterConfig)
+ vars.DataAddr, vars.Dir, _, closeDataNode0 = setup.DataNodeWithAddrAndDir(clusterConfig)
ginkgo.By("Starting liaison node")
liaisonAddr, closerLiaisonNode := setup.LiaisonNode(clusterConfig)
ginkgo.By("Loading schema")
diff --git a/test/integration/distributed/cluster_state/common.go b/test/integration/distributed/cluster_state/common.go
index 1a14225a5..e8ba6c5dd 100644
--- a/test/integration/distributed/cluster_state/common.go
+++ b/test/integration/distributed/cluster_state/common.go
@@ -54,7 +54,7 @@ var _ = ginkgo.SynchronizedBeforeSuite(func() []byte {
dfWriter := setup.NewDiscoveryFileWriter(tmpDir)
config := setup.PropertyClusterConfig(dfWriter)
ginkgo.By("Starting data node")
- dataAddr, _, closeDataNode0 := setup.DataNodeWithAddrAndDir(config)
+ dataAddr, _, _, closeDataNode0 := setup.DataNodeWithAddrAndDir(config)
ginkgo.By("Starting liaison node")
liaisonAddr, closerLiaisonNode := setup.LiaisonNode(config)
stopFunc = func() {
diff --git a/test/integration/distributed/deletion/suite_test.go b/test/integration/distributed/deletion/suite_test.go
index 52752224c..c17962b39 100644
--- a/test/integration/distributed/deletion/suite_test.go
+++ b/test/integration/distributed/deletion/suite_test.go
@@ -35,9 +35,9 @@ func init() {
dfWriter := setup.NewDiscoveryFileWriter(tmpDir)
config := setup.PropertyClusterConfig(dfWriter)
By("Starting data node 0")
- _, dn0Path, closeDataNode0 := setup.DataNodeWithAddrAndDir(config)
+ _, dn0Path, _, closeDataNode0 := setup.DataNodeWithAddrAndDir(config)
By("Starting data node 1")
- _, dn1Path, closeDataNode1 := setup.DataNodeWithAddrAndDir(config)
+ _, dn1Path, _, closeDataNode1 := setup.DataNodeWithAddrAndDir(config)
By("Starting liaison node")
liaisonAddr, liaisonPath, closerLiaisonNode := setup.LiaisonNodeWithAddrAndDir(config)
return deletion.SetupResult{
diff --git a/test/integration/distributed/lifecycle/common.go b/test/integration/distributed/lifecycle/common.go
index 5722fb1ff..0e79fd22e 100644
--- a/test/integration/distributed/lifecycle/common.go
+++ b/test/integration/distributed/lifecycle/common.go
@@ -51,6 +51,8 @@ type setupResult struct {
tenDaysBeforeNow time.Time
stopFunc func()
dataAddr string
+ dataHTTPURL string
+ warmHTTPURL string
liaisonAddr string
srcDir string
destDir string
@@ -75,10 +77,10 @@ var _ = ginkgo.SynchronizedBeforeSuite(func() []byte {
dfWriter := setup.NewDiscoveryFileWriter(tmpDir)
config := setup.PropertyClusterConfig(dfWriter)
ginkgo.By("Starting hot data node")
- dataAddr, srcDir, closeDataNode0 := setup.DataNodeWithAddrAndDir(config, "--node-labels", "type=hot",
+ dataAddr, srcDir, dataHTTPURL, closeDataNode0 := setup.DataNodeWithAddrAndDir(config, "--node-labels", "type=hot",
"--measure-flush-timeout", "0s", "--stream-flush-timeout", "0s", "--trace-flush-timeout", "0s")
ginkgo.By("Starting warm data node")
- _, destDir, closeDataNode1 := setup.DataNodeWithAddrAndDir(config, "--node-labels", "type=warm",
+ _, destDir, warmHTTPURL, closeDataNode1 := setup.DataNodeWithAddrAndDir(config, "--node-labels", "type=warm",
"--has-meta-role=false",
"--measure-flush-timeout", "0s", "--stream-flush-timeout", "0s", "--trace-flush-timeout", "0s")
setup.PreloadSchemaViaProperty(config, test_stream.LoadSchemaWithStages, test_measure.LoadSchemaWithStages,
@@ -104,6 +106,8 @@ var _ = ginkgo.SynchronizedBeforeSuite(func() []byte {
time.Sleep(flags.ConsistentlyTimeout)
result = setupResult{
dataAddr: dataAddr,
+ dataHTTPURL: dataHTTPURL,
+ warmHTTPURL: warmHTTPURL,
liaisonAddr: liaisonAddr,
srcDir: srcDir,
destDir: destDir,
@@ -129,6 +133,8 @@ var _ = ginkgo.SynchronizedBeforeSuite(func() []byte {
caseslifecycle.SharedContext = helpers.LifecycleSharedContext{
LiaisonAddr: result.liaisonAddr,
DataAddr: result.dataAddr,
+ DataHTTPURL: result.dataHTTPURL,
+ WarmHTTPURL: result.warmHTTPURL,
Connection: connection,
SrcDir: result.srcDir,
DestDir: result.destDir,
diff --git a/test/integration/distributed/schema/common.go b/test/integration/distributed/schema/common.go
index 6e57c2c66..4a79f497b 100644
--- a/test/integration/distributed/schema/common.go
+++ b/test/integration/distributed/schema/common.go
@@ -63,9 +63,9 @@ var _ = ginkgo.SynchronizedBeforeSuite(func() []byte {
dfWriter := setup.NewDiscoveryFileWriter(tmpDir)
config := setup.PropertyClusterConfig(dfWriter)
ginkgo.By("Starting data node 0")
- addrDataNode0, _, closeDataNode0 := setup.DataNodeWithAddrAndDir(config)
+ addrDataNode0, _, _, closeDataNode0 := setup.DataNodeWithAddrAndDir(config)
ginkgo.By("Starting data node 1")
- addrDataNode1, _, closeDataNode1 := setup.DataNodeWithAddrAndDir(config)
+ addrDataNode1, _, _, closeDataNode1 := setup.DataNodeWithAddrAndDir(config)
dataNodeAddrs = []string{addrDataNode0, addrDataNode1}
ginkgo.By("Loading schema via property")
setup.PreloadSchemaViaProperty(config, test_stream.PreloadSchema, test_measure.PreloadSchema, test_trace.PreloadSchema)
diff --git a/test/integration/distributed/sync_retry/sync_retry_suite_test.go b/test/integration/distributed/sync_retry/sync_retry_suite_test.go
index d9df412ae..16c264853 100644
--- a/test/integration/distributed/sync_retry/sync_retry_suite_test.go
+++ b/test/integration/distributed/sync_retry/sync_retry_suite_test.go
@@ -66,7 +66,7 @@ var _ = g.SynchronizedBeforeSuite(func() []byte {
// Start two data nodes to ensure replication targets exist
startDataNode := func() (string, string) {
- addr, path, closeFn := setup.DataNodeWithAddrAndDir(clusterConfig)
+ addr, path, _, closeFn := setup.DataNodeWithAddrAndDir(clusterConfig)
cleanupFuncs = append(cleanupFuncs, closeFn)
return addr, path
}
diff --git a/test/integration/replication/measure_normal_replication_test.go b/test/integration/replication/measure_normal_replication_test.go
index d45355ef8..d2f2857ff 100644
--- a/test/integration/replication/measure_normal_replication_test.go
+++ b/test/integration/replication/measure_normal_replication_test.go
@@ -138,7 +138,7 @@ var _ = g.Describe("Measure Normal Mode Replication", func() {
})
g.By("Restarting the data node")
- _, _, closeDataNode := setup.DataNodeFromDataDir(clusterConfig, dataNodeDirs[0], "--node-labels", "role=data")
+ _, _, _, closeDataNode := setup.DataNodeFromDataDir(clusterConfig, dataNodeDirs[0], "--node-labels", "role=data")
dataNodeClosers[0] = closeDataNode
g.By("Waiting for cluster to stabilize and handoff queue to drain")
diff --git a/test/integration/replication/replication_suite_test.go b/test/integration/replication/replication_suite_test.go
index 5c8d6dc5f..9b3665187 100644
--- a/test/integration/replication/replication_suite_test.go
+++ b/test/integration/replication/replication_suite_test.go
@@ -87,7 +87,7 @@ var _ = SynchronizedBeforeSuite(func() []byte {
for i := 0; i < 3; i++ {
nodeDir, nodeDirCleanup, dirErr := test.NewSpace()
Expect(dirErr).NotTo(HaveOccurred())
- _, _, closeDataNode := setup.DataNodeFromDataDir(clusterConfig, nodeDir, "--node-labels", "role=data")
+ _, _, _, closeDataNode := setup.DataNodeFromDataDir(clusterConfig, nodeDir, "--node-labels", "role=data")
dataNodeDirs = append(dataNodeDirs, nodeDir)
dataNodeDirCleanups = append(dataNodeDirCleanups, nodeDirCleanup)
dataNodeClosers = append(dataNodeClosers, closeDataNode)
@@ -192,7 +192,7 @@ var _ = AfterEach(func() {
return
}
// Node 0 may have been stopped by a spec — restart it from its data
directory.
- _, _, closeDataNode := setup.DataNodeFromDataDir(clusterConfig, dataNodeDirs[0], "--node-labels", "role=data")
+ _, _, _, closeDataNode := setup.DataNodeFromDataDir(clusterConfig, dataNodeDirs[0], "--node-labels", "role=data")
dataNodeClosers[0] = closeDataNode
Eventually(func() bool {
return isClusterStable(connection)
diff --git a/test/integration/replication/stream_replication_test.go b/test/integration/replication/stream_replication_test.go
index 7b70c18ae..b4ebf6460 100644
--- a/test/integration/replication/stream_replication_test.go
+++ b/test/integration/replication/stream_replication_test.go
@@ -138,7 +138,7 @@ var _ = g.Describe("Stream Normal Mode Replication", func() {
})
g.By("Restarting the data node")
- _, _, closeDataNode := setup.DataNodeFromDataDir(clusterConfig, dataNodeDirs[0], "--node-labels", "role=data")
+ _, _, _, closeDataNode := setup.DataNodeFromDataDir(clusterConfig, dataNodeDirs[0], "--node-labels", "role=data")
dataNodeClosers[0] = closeDataNode
g.By("Waiting for cluster to stabilize and handoff queue to drain")
diff --git a/test/integration/replication/trace_replication_test.go b/test/integration/replication/trace_replication_test.go
index 087b3466e..199f88b0b 100644
--- a/test/integration/replication/trace_replication_test.go
+++ b/test/integration/replication/trace_replication_test.go
@@ -139,7 +139,7 @@ var _ = g.Describe("Trace Normal Mode Replication", func() {
})
g.By("Restarting the data node")
- _, _, closeDataNode := setup.DataNodeFromDataDir(clusterConfig, dataNodeDirs[0], "--node-labels", "role=data")
+ _, _, _, closeDataNode := setup.DataNodeFromDataDir(clusterConfig, dataNodeDirs[0], "--node-labels", "role=data")
dataNodeClosers[0] = closeDataNode
g.By("Waiting for cluster to stabilize and handoff queue to drain")