This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 8a1936ce9 feat(lifecycle): stamp sender identity and add last-run gauges (#1167)
8a1936ce9 is described below

commit 8a1936ce96653e89d3d13250a42abc6e3d42fae7
Author: Gao Hongtao <[email protected]>
AuthorDate: Wed Jun 10 07:36:00 2026 +0800

    feat(lifecycle): stamp sender identity and add last-run gauges (#1167)
    
    * feat(lifecycle): stamp sender identity on migration publisher
    
    The lifecycle's tier-migration publisher (built via pub.NewWithoutMetadata)
    sets up the banyandb_lifecycle_migration_* family but never calls
    SetSelfNode on it. The wire SendRequest therefore carries empty
    SenderNode/SenderRole/SenderTier, so the data node's queue_sub metrics
    record an uninformative empty sender:
    
      banyandb_queue_sub_total_finished{
        remote_node="", remote_role="", remote_tier="", ...
      }
    
    The liaison already does this in pkg/cmdsetup/liaison.go:170-171:
    
      setter.SetSelfNode(node.NodeID, "liaison", liaisonTier)
    
    The lifecycle should do the same. It runs as a sidecar to a data node
    and inherits --node-labels and the co-located data node's gRPC address
    (`--grpc-addr`), so all three sender values are derivable from already-
    known inputs — no new CLI flags are introduced.
    
    Resolution (deriveSelfIdentity in steps.go):
    
      1. Match the data node whose GrpcAddress equals the lifecycle's
         --grpc-addr (the production sidecar path; the live cluster's
         lifecycle container has no --node-labels, so this is the only
         way to find its co-located data node).
      2. Fall back to label-superset match against --node-labels.
      3. Fall back to type-only match on nodeLabels["type"].
      4. Empty fallthrough preserves the pre-fix no-op behavior.
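
    A minimal Go sketch of the resolution order above. dataNode,
    resolveSelf and containsAll are hypothetical stand-ins (not the
    committed deriveSelfIdentity / labelsContain), and the node names and
    addresses are made up. As an assumption of this sketch, the label pass
    is skipped when no labels are supplied, so an empty label set cannot
    match an arbitrary node.

```go
package main

import "fmt"

// dataNode is a hypothetical, trimmed-down stand-in for databasev1.Node
// that keeps only the fields the resolution order reads.
type dataNode struct {
	Name        string // plays the role of Metadata.Name
	GrpcAddress string
	Labels      map[string]string
}

// containsAll reports whether superset holds every (k, v) pair of subset.
func containsAll(superset, subset map[string]string) bool {
	for k, v := range subset {
		if superset[k] != v {
			return false
		}
	}
	return true
}

// resolveSelf returns (senderNode, senderTier) following steps 1-4.
func resolveSelf(grpcAddr string, labels map[string]string, nodes []dataNode) (string, string) {
	// Step 1: the co-located data node, matched by gRPC address.
	if grpcAddr != "" {
		for _, n := range nodes {
			if n.GrpcAddress == grpcAddr {
				return n.Name, n.Labels["type"]
			}
		}
	}
	// Step 2: label-superset match, skipped for an empty label set
	// (an assumption of this sketch).
	if len(labels) > 0 {
		for _, n := range nodes {
			if containsAll(n.Labels, labels) {
				return n.Name, n.Labels["type"]
			}
		}
	}
	// Step 3: type-only match.
	if want := labels["type"]; want != "" {
		for _, n := range nodes {
			if n.Labels["type"] == want {
				return n.Name, want
			}
		}
	}
	// Step 4: no match; an empty identity keeps the old no-op behavior.
	return "", ""
}

func main() {
	nodes := []dataNode{
		{Name: "data-hot-0", GrpcAddress: "10.0.0.1:17912", Labels: map[string]string{"type": "hot"}},
		{Name: "data-warm-0", GrpcAddress: "10.0.0.2:17912", Labels: map[string]string{"type": "warm"}},
	}
	node, tier := resolveSelf("10.0.0.2:17912", nil, nodes)
	fmt.Println(node, tier) // prints: data-warm-0 warm
}
```

    The address match runs first, so it wins even when --node-labels would
    point at a different tier.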
    
    parseGroup gains one internal parameter (coLocatedDataNodeAddr,
    threaded from the existing l.gRPCAddr field) and calls
    client.SetSelfNode(senderNode, "lifecycle", senderTier).
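
    To illustrate what SetSelfNode is expected to do, here is a
    hypothetical publisher that remembers the identity and copies it onto
    every outgoing request, plus a no-op in-process queue in the spirit of
    banyand/queue/local.go. sendRequest, publisher, localQueue and
    stampIfKnown are illustrative names, not BanyanDB types; the
    all-empty guard mirrors the one in parseGroup.

```go
package main

import (
	"fmt"
	"sync"
)

// sendRequest is a hypothetical wire envelope reduced to the sender fields.
type sendRequest struct {
	SenderNode string
	SenderRole string
	SenderTier string
	Payload    string
}

// client keeps only the method this change adds to queue.Client.
type client interface {
	SetSelfNode(name, role, tier string)
}

// publisher stores the identity and stamps it onto each new request.
type publisher struct {
	mu   sync.RWMutex
	name string
	role string
	tier string
}

func (p *publisher) SetSelfNode(name, role, tier string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.name, p.role, p.tier = name, role, tier
}

func (p *publisher) newRequest(payload string) sendRequest {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return sendRequest{SenderNode: p.name, SenderRole: p.role, SenderTier: p.tier, Payload: payload}
}

// localQueue never touches the wire, so SetSelfNode is a no-op that only
// satisfies the interface.
type localQueue struct{}

func (localQueue) SetSelfNode(_, _, _ string) {}

// Compile-time checks that both types implement client.
var (
	_ client = (*publisher)(nil)
	_ client = localQueue{}
)

// stampIfKnown skips the call when nothing was resolved.
func stampIfKnown(c client, node, tier string) {
	if node != "" || tier != "" {
		c.SetSelfNode(node, "lifecycle", tier)
	}
}

func main() {
	p := &publisher{}
	stampIfKnown(p, "data-warm-0", "warm")
	fmt.Printf("%+v\n", p.newRequest("part-001"))
}
```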
    
    Test setup:
    
      * DataNode* helpers in pkg/test/setup/setup.go allocate a 3rd port
        (4th if a schema server runs) and pass --http-port so tests can
        scrape the data node's /metrics endpoint.
      * The data node's HTTP router (banyand/queue/sub/server.go) mounts
        /metrics when the metrics registry exposes a Prometheus handler.
      * LifecycleSharedContext gains DataHTTPURL and WarmHTTPURL.
      * test/cases/lifecycle/lifecycle.go scrapes the data node's
        /metrics and asserts the receiver's banyandb_queue_sub_total_finished
        series carry populated remote_node / remote_role="lifecycle"
        / remote_tier="<co-located tier>" — the direct end-to-end
        evidence that the wire messages round-tripped with sender
        identity.
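
    One way to express that assertion, sketched in Go under simplifying
    assumptions: parseLabels and hasSenderIdentity are hypothetical
    helpers, they understand only the simple name{k="v",...} value form of
    the Prometheus text format, and they do not handle escaped quotes or
    commas inside label values.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// parseLabels splits one exposition line into its metric name and labels.
// It returns ok=false for lines without a {...} label block.
func parseLabels(line string) (string, map[string]string, bool) {
	open := strings.IndexByte(line, '{')
	end := strings.IndexByte(line, '}')
	if open < 0 || end < open {
		return "", nil, false
	}
	labels := map[string]string{}
	for _, pair := range strings.Split(line[open+1:end], ",") {
		k, v, found := strings.Cut(pair, "=")
		if !found {
			continue
		}
		labels[strings.TrimSpace(k)] = strings.Trim(v, `"`)
	}
	return line[:open], labels, true
}

// hasSenderIdentity reports whether any series of metric has a non-empty
// remote_node together with the expected remote_role and remote_tier.
func hasSenderIdentity(body, metric, role, tier string) bool {
	sc := bufio.NewScanner(strings.NewReader(body))
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		if line == "" || strings.HasPrefix(line, "#") {
			continue // skip blank lines and HELP/TYPE comments
		}
		name, labels, ok := parseLabels(line)
		if !ok || name != metric {
			continue
		}
		if labels["remote_node"] != "" && labels["remote_role"] == role && labels["remote_tier"] == tier {
			return true
		}
	}
	return false
}

func main() {
	body := `banyandb_queue_sub_total_finished{remote_node="data-warm-0",remote_role="lifecycle",remote_tier="warm"} 2`
	fmt.Println(hasSenderIdentity(body, "banyandb_queue_sub_total_finished", "lifecycle", "warm")) // prints: true
}
```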
    
    via HAPI
    
    Co-Authored-By: HAPI <[email protected]>
    Co-Authored-By: Claude Opus 4.8 <[email protected]>
    
    * chore: fix lint issues
    
    Auto-generated formatting/field-alignment/nolint fixes from make lint:
    - banyand/backup/lifecycle/steps.go: add blank line before deriveSelfIdentity
    - pkg/test/helpers/constant.go: field alignment in LifecycleSharedContext
    - pkg/test/setup/setup.go: dogsled nolint on DataNode's 3-blank return
    - test/cases/lifecycle/lifecycle.go: drop obsolete --lifecycle-node-id reference
    
    Co-Authored-By: Claude Opus 4.8 <[email protected]>
    
    * feat(lifecycle): add last_run_timestamp_seconds and last_run_success gauges
    
    Two new banyandb_lifecycle_last_run_* gauges:
    
      * last_run_timestamp_seconds — epoch (in seconds) when the most
        recent migration cycle started; updated at the end of action()
        via a defer so success and error paths both stamp it.
      * last_run_success — 1 on nil error, 0 otherwise; combined with
        the timestamp it gives dashboards an at-a-glance
        "is the lifecycle healthy" signal.
    
    The recordLastRun helper is split out from action() so unit tests
    can exercise the success/failure/nil-gauge branches without setting
    up a real cluster. The metrics_test.go gains three hermetic tests
    that use a recordingGauge stub.
    
    Integration test asserts both series on the lifecycle's /metrics
    handler (real scrape), guarding against accidental PreRun or defer
    removal.
    
    Co-Authored-By: Claude Opus 4.8 <[email protected]>
    
    * docs: add CHANGES.md entries for lifecycle sender identity and last-run gauges
    
    Add two 0.11.0 Features entries:
    
      * Lifecycle migration publisher stamps self identity (sender
        node/role/tier) on the wire so the receiving data node's
        banyandb_queue_sub_total_finished series carry non-empty
        remote_* labels. Derivation: --grpc-addr (co-located data
        node) → GrpcAddress match in the data-node registry → Metadata.Name
        / Labels["type"]; mirrors the liaison's existing SetSelfNode call
        in pkg/cmdsetup/liaison.go.
    
      * banyandb_lifecycle_last_run_timestamp_seconds and
        banyandb_lifecycle_last_run_success gauges for at-a-glance
        lifecycle health dashboards. Both are stamped by a defer at
        the end of action(), so every return path updates both
        gauges.
    
    Co-Authored-By: Claude Opus 4.8 <[email protected]>
    
    * fix: address copilot review comments on PR #1167
    
    Three comments addressed:
    
      1. test/cases/lifecycle/lifecycle.go:325 — add a 5s http.Client
         timeout on the data-node /metrics scrape so a half-open TCP
         socket or stuck data-node HTTP handler cannot stall the spec
         indefinitely. (The data node is a local process that serves
         /metrics in <100ms in practice.)
    
      2. test/cases/lifecycle/lifecycle.go:340 — the verification
         comment referenced --lifecycle-node-id and --lifecycle-tier,
         flags that this PR explicitly removes. Rewrote it to describe
         the actual derivation path: --grpc-addr → GrpcAddress match in
         the data-node registry → Metadata.Name / Labels["type"]
         become sender_node / sender_tier.
    
      3. banyand/queue/local.go:62 — SetSelfNode is on the Client
         interface, not Queue. Fixed the comment to say "implements
         Client" so the interface contract isn't misleading.
    
    The fourth comment, which claimed that Set(value, nil...) does not
    compile against Set(value float64, labelValues ...string), is
    incorrect: nil is assignable to []string, so nil... is a valid
    argument for the variadic parameter. Verified standalone and via
    the passing build and tests.
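
    A standalone check of that claim: set below has the same parameter
    shape as the gauge Set method (value float64, labelValues ...string)
    but is otherwise hypothetical.

```go
package main

import "fmt"

// set mirrors only the parameter shape of the gauge Set method and
// reports how many label values it received.
func set(value float64, labelValues ...string) int {
	_ = value // the value itself is irrelevant to this check
	return len(labelValues)
}

func main() {
	fmt.Println(set(1, nil...))        // prints: 0 (nil is passed through as the []string)
	fmt.Println(set(1))                // prints: 0 (no variadic arguments at all)
	fmt.Println(set(1, "hot", "warm")) // prints: 2
}
```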
    
    Co-Authored-By: Claude Opus 4.8 <[email protected]>
    
    ---------
    
    Co-authored-by: Claude <[email protected]>
    Co-authored-by: HAPI <[email protected]>
    Co-authored-by: Claude Opus 4.8 <[email protected]>
---
 CHANGES.md                                         |   2 +
 banyand/backup/lifecycle/metrics_test.go           |  69 ++++++++++++
 banyand/backup/lifecycle/service.go                |  58 ++++++++--
 banyand/backup/lifecycle/steps.go                  |  95 +++++++++++++++++
 banyand/backup/lifecycle/steps_test.go             |   2 +-
 banyand/queue/local.go                             |   5 +
 banyand/queue/queue.go                             |   7 ++
 banyand/queue/sub/server.go                        |   8 ++
 bydbctl/internal/cmd/property_test.go              |  12 +--
 .../internal/integration/integration_suite_test.go |   2 +-
 pkg/test/helpers/constant.go                       |  16 ++-
 pkg/test/setup/setup.go                            |  37 ++++---
 test/cases/lifecycle/lifecycle.go                  | 118 +++++++++++++++++++++
 test/integration/distributed/backup/common.go      |   2 +-
 .../distributed/cluster_state/common.go            |   2 +-
 .../integration/distributed/deletion/suite_test.go |   4 +-
 test/integration/distributed/lifecycle/common.go   |  10 +-
 test/integration/distributed/schema/common.go      |   4 +-
 .../sync_retry/sync_retry_suite_test.go            |   2 +-
 .../replication/measure_normal_replication_test.go |   2 +-
 .../replication/replication_suite_test.go          |   4 +-
 .../replication/stream_replication_test.go         |   2 +-
 .../replication/trace_replication_test.go          |   2 +-
 23 files changed, 415 insertions(+), 50 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 5c3d9f315..efc3d75a5 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -6,6 +6,8 @@ Release Notes.
 
 ### Features
 - Redesign the queue (`queue_pub`/`queue_sub`) metrics around a uniform model: 
keep only `total_started`, `total_finished`, `total_latency` (now a histogram) 
and `total_err`, plus file-sync-only `sent_bytes` (pub) / `received_bytes` 
(sub). Replace the `topic` label with `operation` 
(`batch-write`/`file-sync`/`query`/`control`) and `group`, add an `error_type` 
label on `total_err`, and add remote-endpoint labels 
(`remote_node`/`remote_role`/`remote_tier`) so the liaison↔data (hot/warm/col 
[...]
+- Stamp the lifecycle's tier-migration publisher's identity onto the wire so 
the receiving data node records a non-empty 
`remote_node`/`remote_role`/`remote_tier` on its 
`banyandb_queue_sub_total_finished` series. The lifecycle's `parseGroup` 
resolves the lifecycle's self identity by matching its `--grpc-addr` (the 
co-located data node's gRPC address) against the data-node registry — 
`Metadata.Name` becomes `remote_node`, `Labels["type"]` becomes `remote_tier` — 
and calls `SetSelfNode(se [...]
+- Add `banyandb_lifecycle_last_run_timestamp_seconds` and 
`banyandb_lifecycle_last_run_success` gauges to the lifecycle service for 
at-a-glance health monitoring. `last_run_timestamp_seconds` records the 
wall-clock epoch (in seconds) of the most recent migration cycle; 
`last_run_success` is `1` on a nil error and `0` otherwise. Both are stamped by 
a `defer` at the end of `action()` so every return path (success, error, 
recovered panic) updates the pair atomically — dashboards can pin an  [...]
 - Vectorized measure query path is now enabled by default. The columnar 
pipeline replaces per-row protobuf serialization in `NewMIterator`, cutting 
allocations and ns/op for scan-heavy measure queries; gRPC wire format 
(`*measurev1.InternalDataPoint`) is byte-identical. Single-node coverage is 
complete: scan, GroupBy+Agg via `BatchAggregation`, scalar reduce (`Agg` 
without `GroupBy`), raw `GroupBy` (without `Agg`), implicit projection coverage 
for GroupBy/Agg fields, `TopN`/`BottomN`, `o [...]
 - Add validation to ensure Measure's ShardingKey contains all Entity tags to 
guarantee entity locality.
 - Organize access logs under a dedicated "accesslog" subdirectory to improve 
log organization and separation from other application data.
diff --git a/banyand/backup/lifecycle/metrics_test.go b/banyand/backup/lifecycle/metrics_test.go
index 5b764184c..1e0247ccc 100644
--- a/banyand/backup/lifecycle/metrics_test.go
+++ b/banyand/backup/lifecycle/metrics_test.go
@@ -19,9 +19,11 @@ package lifecycle
 
 import (
        "context"
+       "errors"
        "net/http"
        "net/http/httptest"
        "testing"
+       "time"
 
        "github.com/stretchr/testify/require"
 
@@ -105,3 +107,70 @@ func TestBuildHTTPRouterWithoutPromHandler(t *testing.T) {
       router.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/metrics", nil))
        require.Equal(t, http.StatusNotFound, rec.Code)
 }
+
+// recordingGauge captures the last Set() call so a unit test can assert
+// the value the lifecycle would have emitted. The lifecycle uses the
+// real prometheus-backed Gauge in production; this stub keeps the test
+// hermetic.
+type recordingGauge struct {
+       lastValue float64
+       called    int
+}
+
+func (g *recordingGauge) Set(v float64, _ ...string) {
+       g.lastValue = v
+       g.called++
+}
+
+func (g *recordingGauge) Add(_ float64, _ ...string) {}
+
+func (g *recordingGauge) Delete(_ ...string) bool { return true }
+
+// TestRecordLastRunSuccess stamps the gauges with the start time (epoch
+// seconds) and success=1 when the action returned nil. Asserts both the
+// integer epoch shape and the 0/1 success signal so dashboards can
+// distinguish a healthy last run from a failed one.
+func TestRecordLastRunSuccess(t *testing.T) {
+       tsGauge, okGauge := &recordingGauge{}, &recordingGauge{}
+       l := &lifecycleService{
+               lastRunTimestamp: tsGauge,
+               lastRunSuccess:   okGauge,
+       }
+       start := time.Unix(1717929600, 0) // 2024-06-09T10:40:00Z, deterministic
+       l.recordLastRun(start, nil)
+
+       require.Equal(t, 1, tsGauge.called)
+       require.Equal(t, 1717929600.0, tsGauge.lastValue,
+               "lastRunTimestamp must record the start time as epoch seconds")
+       require.Equal(t, 1, okGauge.called)
+       require.Equal(t, 1.0, okGauge.lastValue,
+               "lastRunSuccess must be 1 on a nil error")
+}
+
+// TestRecordLastRunFailure stamps the gauges with success=0 when the action
+// returned an error. The timestamp is still set — operators want to know
+// "when did the last attempt happen, and did it succeed?".
+func TestRecordLastRunFailure(t *testing.T) {
+       tsGauge, okGauge := &recordingGauge{}, &recordingGauge{}
+       l := &lifecycleService{
+               lastRunTimestamp: tsGauge,
+               lastRunSuccess:   okGauge,
+       }
+       start := time.Unix(1717929700, 0)
+       l.recordLastRun(start, errors.New("snapshot dir unavailable"))
+
+       require.Equal(t, 1717929700.0, tsGauge.lastValue)
+       require.Equal(t, 0.0, okGauge.lastValue,
+               "lastRunSuccess must be 0 on a non-nil error")
+}
+
+// TestRecordLastRunNilGaugesSafe ensures nil observability gauges (e.g. when
+// the metrics registry is BypassRegistry and never wired) don't crash
+// the deferred bookkeeping. This is the path the no-op PreRun takes.
+func TestRecordLastRunNilGaugesSafe(t *testing.T) {
+       l := &lifecycleService{}
+       require.NotPanics(t, func() {
+               l.recordLastRun(time.Now(), nil)
+               l.recordLastRun(time.Now(), errors.New("boom"))
+       })
+}
diff --git a/banyand/backup/lifecycle/service.go b/banyand/backup/lifecycle/service.go
index 84526a6e3..73ac0c0b5 100644
--- a/banyand/backup/lifecycle/service.go
+++ b/banyand/backup/lifecycle/service.go
@@ -84,10 +84,18 @@ const (
 type lifecycleService struct {
        databasev1.UnimplementedClusterStateServiceServer
        databasev1.UnimplementedNodeQueryServiceServer
-       metadata          metadata.Repo
-       omr               observability.MetricsRegistry
-       pm                protector.Memory
-       cyclesTotal       meter.Counter
+       metadata    metadata.Repo
+       omr         observability.MetricsRegistry
+       pm          protector.Memory
+       cyclesTotal meter.Counter
+       // lastRunTimestamp records the wall-clock epoch (in seconds) of the
+       // most recent attempt to run a migration cycle, regardless of outcome.
+       // Updated at the end of action() in both the success and error paths.
+       lastRunTimestamp meter.Gauge
+       // lastRunSuccess records the outcome of the most recent migration
+       // cycle: 1 on success, 0 on error. Combined with lastRunTimestamp it
+       // gives dashboards an at-a-glance "is the lifecycle healthy" signal.
+       lastRunSuccess    meter.Gauge
        metricsClient     queue.Client
        grpcServer        *grpclib.Server
        httpSrv           *http.Server
@@ -235,7 +243,10 @@ func (l *lifecycleService) PreRun(_ context.Context) error {
 
       // Safe to call With() here: the metrics registry is registered earlier in the
       // group, so its PreRun (which builds the provider) has already run.
-       l.cyclesTotal = l.omr.With(observability.RootScope.SubScope("lifecycle")).NewCounter("cycles_total")
+       lifecycleScope := l.omr.With(observability.RootScope.SubScope("lifecycle"))
+       l.cyclesTotal = lifecycleScope.NewCounter("cycles_total")
+       l.lastRunTimestamp = lifecycleScope.NewGauge("last_run_timestamp_seconds")
+       l.lastRunSuccess = lifecycleScope.NewGauge("last_run_success")
 
        if l.schedule != "" && l.lifecycleTLS {
                var err error
@@ -538,10 +549,20 @@ func (l *lifecycleService) startServers() {
        })
 }
 
-func (l *lifecycleService) action(ctx context.Context) error {
+func (l *lifecycleService) action(ctx context.Context) (err error) {
        if l.cyclesTotal != nil {
                l.cyclesTotal.Inc(1)
        }
+       // Stamp last-run metrics at the end of this cycle regardless of outcome.
+       // Using defer keeps the success/error bookkeeping in one place even as
+       // the body grows new early returns; the metrics gauge Set()s observe
+       // the same time.Now() and the success flag, so dashboards see consistent
+       // (timestamp, success) pairs. The named return value lets the defer
+       // observe whether the body succeeded.
+       runStart := time.Now()
+       defer func() {
+               l.recordLastRun(runStart, err)
+       }()
        progress := LoadProgress(l.progressFilePath, l.l)
        progress.ClearErrors()
 
@@ -638,6 +659,25 @@ func (l *lifecycleService) action(ctx context.Context) error {
       return fmt.Errorf("lifecycle migration partially completed, progress file retained; %v groups not fully completed", notCompleteGroups)
 }
 
+// recordLastRun stamps the banyandb_lifecycle_last_run_* gauges with the
+// start time (epoch seconds) and a 0/1 success flag. Called from the
+// deferred end-of-action block so every code path (success, error,
+// panic-recovered) updates both gauges. nil gauges are skipped so a
+// lifecycle run with a nil observability.MetricsRegistry (BypassRegistry)
+// doesn't crash.
+func (l *lifecycleService) recordLastRun(start time.Time, err error) {
+       if l.lastRunTimestamp != nil {
+               l.lastRunTimestamp.Set(float64(start.Unix()), nil...)
+       }
+       if l.lastRunSuccess != nil {
+               success := 0.0
+               if err == nil {
+                       success = 1.0
+               }
+               l.lastRunSuccess.Set(success, nil...)
+       }
+}
+
 // generateReport gathers detailed counts & errors from Progress, writes comprehensive JSON file per run, and keeps only 5 latest.
 func (l *lifecycleService) generateReport(p *Progress) {
        reportDir := l.reportDir
@@ -1056,7 +1096,7 @@ func (l *lifecycleService) getGroupsToProcess(ctx context.Context, progress *Pro
 func (l *lifecycleService) processStreamGroup(ctx context.Context, g *commonv1.Group,
       streamDir string, nodes []*databasev1.Node, labels map[string]string, progress *Progress,
 ) {
-       group, err := parseGroup(g, labels, nodes, l.l, l.metadata, l.clusterStateMgr, l.omr)
+       group, err := parseGroup(g, labels, nodes, l.l, l.metadata, l.clusterStateMgr, l.omr, l.gRPCAddr)
       if err != nil {
               l.l.Error().Err(err).Msgf("failed to parse group %s", g.Metadata.Name)
                return
@@ -1176,7 +1216,7 @@ func (l *lifecycleService) deleteExpiredStreamSegments(ctx context.Context, g *c
 func (l *lifecycleService) processMeasureGroup(ctx context.Context, g *commonv1.Group, measureDir string,
       nodes []*databasev1.Node, labels map[string]string, progress *Progress,
 ) {
-       group, err := parseGroup(g, labels, nodes, l.l, l.metadata, l.clusterStateMgr, l.omr)
+       group, err := parseGroup(g, labels, nodes, l.l, l.metadata, l.clusterStateMgr, l.omr, l.gRPCAddr)
       if err != nil {
               l.l.Error().Err(err).Msgf("failed to parse group %s", g.Metadata.Name)
                return
@@ -1283,7 +1323,7 @@ func (l *lifecycleService) deleteExpiredTraceSegments(ctx context.Context, g *co
 func (l *lifecycleService) processTraceGroup(ctx context.Context, g *commonv1.Group, traceDir string,
       nodes []*databasev1.Node, labels map[string]string, progress *Progress,
 ) {
-       group, err := parseGroup(g, labels, nodes, l.l, l.metadata, l.clusterStateMgr, l.omr)
+       group, err := parseGroup(g, labels, nodes, l.l, l.metadata, l.clusterStateMgr, l.omr, l.gRPCAddr)
       if err != nil {
               l.l.Error().Err(err).Msgf("failed to parse group %s", g.Metadata.Name)
                return
diff --git a/banyand/backup/lifecycle/steps.go b/banyand/backup/lifecycle/steps.go
index 66c1ef04e..d3c76c29d 100644
--- a/banyand/backup/lifecycle/steps.go
+++ b/banyand/backup/lifecycle/steps.go
@@ -84,6 +84,79 @@ func (l *lifecycleService) getSnapshots(ctx context.Context, groups []*commonv1.
        return streamDir, measureDir, traceDir, nil
 }
 
+// deriveSelfIdentity returns the SenderNode and SenderTier the lifecycle
+// publisher should stamp on its wire SendRequest so the data-node receiver
+// can label its banyandb_queue_sub_* family accordingly.
+//
+// Inputs (no new CLI flags needed):
+//
+//   - coLocatedDataNodeAddr: the lifecycle's --grpc-addr, which is the
+//     gRPC address of the data node the lifecycle is co-located with
+//     (sidecar topology). It is the authoritative match key.
+//   - nodeLabels: the lifecycle's own --node-labels, used as a fallback
+//     when the co-located data node hasn't synced to the metadata
+//     registry yet (cold start of a freshly created lifecycle).
+//   - nodes: the cluster's data-node registry (ROLE_DATA), which carries
+//     both Metadata.Name (the BanyanDB NodeID) and Labels (e.g. type=hot).
+//
+// Resolution order:
+//
+//  1. Match by GrpcAddress. In the standard sidecar layout the lifecycle's
+//     --grpc-addr equals the co-located data node's GrpcAddress, and
+//     that data node's Metadata.Name is the BanyanDB NodeID the
+//     receiver records as remote_node. This is the production path
+//     (the live cluster's lifecycle container has no --node-labels).
+//  2. Fall back to label matching. If nodeLabels is non-empty and a
+//     data node's Labels are a superset of it (every key in nodeLabels
+//     matches), use that node's Metadata.Name and Labels["type"].
+//  3. Fall back to type-only match on nodeLabels["type"].
+//  4. If nothing matches, return empty strings. This preserves the
+//     pre-fix behavior so existing test setups that don't populate
+//     these fields keep working.
+func deriveSelfIdentity(coLocatedDataNodeAddr string, nodeLabels map[string]string, nodes []*databasev1.Node) (senderNode, senderTier string) {
+       // Pass 1: GrpcAddress match (the production sidecar path).
+       if coLocatedDataNodeAddr != "" {
+               for _, n := range nodes {
+                       if n.GrpcAddress == coLocatedDataNodeAddr {
+                               return n.Metadata.Name, n.Labels["type"]
+                       }
+               }
+       }
+       // Pass 2: every-key label match (skipped when nodeLabels is empty).
+       for _, n := range nodes {
+               if n.Labels == nil || len(nodeLabels) == 0 {
+                       continue
+               }
+               if !labelsContain(n.Labels, nodeLabels) {
+                       continue
+               }
+               return n.Metadata.Name, n.Labels["type"]
+       }
+       // Pass 3: type-only label match.
+       if wantType := nodeLabels["type"]; wantType != "" {
+               for _, n := range nodes {
+                       if n.Labels == nil {
+                               continue
+                       }
+                       if n.Labels["type"] == wantType {
+                               return n.Metadata.Name, n.Labels["type"]
+                       }
+               }
+       }
+       return "", ""
+}
+
+// labelsContain reports whether superset has every (k, v) pair in subset.
+// An empty subset matches anything.
+func labelsContain(superset, subset map[string]string) bool {
+       for k, v := range subset {
+               if superset[k] != v {
+                       return false
+               }
+       }
+       return true
+}
+
 // GroupConfig encapsulates the parsed lifecycle configuration for a Group.
 // It contains all necessary information for migration and deletion operations.
 type GroupConfig struct {
@@ -125,6 +198,7 @@ func parseGroup(
       g *commonv1.Group, nodeLabels map[string]string, nodes []*databasev1.Node,
       l *logger.Logger, metadata metadata.Repo, clusterStateMgr *clusterStateManager,
        omr observability.MetricsRegistry,
+       coLocatedDataNodeAddr string,
 ) (*GroupConfig, error) {
        ro := g.ResourceOpts
        if ro == nil {
@@ -193,6 +267,27 @@ func parseGroup(
               return nil, fmt.Errorf("failed to initialize node selector for group %s", g.Metadata.Name)
       }
       client := pub.NewWithoutMetadata(omr) //nolint:contextcheck // health check goroutine uses context.Background()
+       // Stamp the lifecycle's self identity onto the publisher so the wire
+       // SenderNode / SenderRole / SenderTier fields and the parallel
+       // banyandb_lifecycle_migration_* labels are populated. The three
+       // values are derived from already-known inputs (the co-located data
+       // node's gRPC address and the cluster's data-node registry) so the
+       // fix needs no new CLI flags:
+       //   - SenderNode  = the data node whose GrpcAddress matches the
+       //     lifecycle's --grpc-addr (i.e. the co-located data node). Its
+       //     Metadata.Name is the BanyanDB NodeID the receiver records as
+       //     remote_node.
+       //   - SenderRole  = "lifecycle" (no Role enum entry; matches the
+       //     liaison's hard-coded "liaison" pattern in pkg/cmdsetup/liaison.go).
+       //   - SenderTier  = the matched data node's `type` label
+       //     (hot/warm/cold), which becomes the receiver's remote_tier.
+       // Falls back to the lifecycle's own --node-labels when the co-located
+       // data node isn't in the registry yet (cold start), and to empty
+       // when neither is available — preserving the pre-fix behavior.
+       senderNode, senderTier := deriveSelfIdentity(coLocatedDataNodeAddr, nodeLabels, nodes)
+       if senderNode != "" || senderTier != "" {
+               client.SetSelfNode(senderNode, "lifecycle", senderTier)
+       }
        switch g.Catalog {
        case commonv1.Catalog_CATALOG_STREAM:
               _ = grpc.NewClusterNodeRegistry(data.TopicStreamWrite, client, nodeSel)
diff --git a/banyand/backup/lifecycle/steps_test.go b/banyand/backup/lifecycle/steps_test.go
index bc14ab353..117fb072f 100644
--- a/banyand/backup/lifecycle/steps_test.go
+++ b/banyand/backup/lifecycle/steps_test.go
@@ -85,7 +85,7 @@ func TestParseGroup_RejectsMissingIntervals(t *testing.T) {
        for _, c := range cases {
                t.Run(c.name, func(t *testing.T) {
                        g := makeGroup(c.mutate)
-                       _, err := parseGroup(g, map[string]string{"type": "warm"}, nil, nil, nil, nil, nil)
+                       _, err := parseGroup(g, map[string]string{"type": "warm"}, nil, nil, nil, nil, nil, "")
                        require.Error(t, err)
                        assert.Contains(t, err.Error(), c.errFrag)
                })
diff --git a/banyand/queue/local.go b/banyand/queue/local.go
index 4e2f98d16..c9502be9e 100644
--- a/banyand/queue/local.go
+++ b/banyand/queue/local.go
@@ -59,6 +59,11 @@ func (l *local) GracefulStop() {
        }
 }
 
+// SetSelfNode implements Client. The local pipeline is in-process and has no
+// wire / sender stamping, so this is a no-op kept only to satisfy the Client
+// interface.
+func (l *local) SetSelfNode(_, _, _ string) {}
+
 // Serve implements Queue.
 func (l *local) Serve() run.StopNotify {
        return l.stopCh
diff --git a/banyand/queue/queue.go b/banyand/queue/queue.go
index 4c48eb072..62944bd49 100644
--- a/banyand/queue/queue.go
+++ b/banyand/queue/queue.go
@@ -59,6 +59,13 @@ type Client interface {
        route.TableProvider
        NewBatchPublisher(timeout time.Duration) BatchPublisher
       NewChunkedSyncClient(node string, chunkSize uint32) (ChunkedSyncClient, error)
+       // SetSelfNode stamps the publisher's own node identity onto the wire
+       // (SenderNode / SenderRole / SenderTier on every SendRequest) and onto
+       // the banyandb_lifecycle_migration_* metric labels. Pass "" for any
+       // dimension the caller doesn't know (e.g. the lifecycle has no Role
+       // enum entry; the tier is untracked when the co-located data node's
+       // tier isn't known at boot time).
+       SetSelfNode(name, role, tier string)
        // NewNodeSchemaStatusClient returns a client for the cluster.v1
       // NodeSchemaStatusService against the named node, sharing the underlying
        // *grpc.ClientConn from this queue client's existing connection pool.
diff --git a/banyand/queue/sub/server.go b/banyand/queue/sub/server.go
index 02450ae37..72ca46d71 100644
--- a/banyand/queue/sub/server.go
+++ b/banyand/queue/sub/server.go
@@ -347,6 +347,14 @@ func (s *server) Serve() run.StopNotify {
        }
        mux := chi.NewRouter()
        mux.Mount("/api", http.StripPrefix("/api", gwMux))
+       // Mount /metrics when the metrics registry exposes a Prometheus handler.
+       // The data node's HTTP server doubles as a Prometheus scrape target, so
+       // monitoring tooling (and integration tests) can read banyandb_queue_sub_*
+       // directly. The handler is only registered when omr is wired to a real
+       // registry; BypassRegistry / nil registries leave /metrics absent (404).
+       if reg, ok := s.omr.(observability.PrometheusHandlerProvider); ok {
+               mux.Handle("/metrics", reg.PrometheusHandler())
+       }
        s.httpSrv = &http.Server{
                Addr:              s.httpAddr,
                Handler:           mux,
diff --git a/bydbctl/internal/cmd/property_test.go b/bydbctl/internal/cmd/property_test.go
index a1fb937f8..7377cc094 100644
--- a/bydbctl/internal/cmd/property_test.go
+++ b/bydbctl/internal/cmd/property_test.go
@@ -492,9 +492,9 @@ var _ = Describe("Property Cluster Operation", func() {
                dfWriter := setup.NewDiscoveryFileWriter(dir)
                config := setup.PropertyClusterConfig(dfWriter)
                By("Starting data node 0")
-               _, _, closeNode1 = setup.DataNodeFromDataDir(config, node1Dir)
+               _, _, _, closeNode1 = setup.DataNodeFromDataDir(config, node1Dir)
               By("Starting data node 1")
-               _, _, closeNode2 = setup.DataNodeFromDataDir(config, node2Dir)
+               _, _, _, closeNode2 = setup.DataNodeFromDataDir(config, node2Dir)
               By("Starting liaison node")
               _, liaisonHTTPAddr, closerLiaisonNode := setup.LiaisonNodeWithHTTP(config)
                By("Initializing test cases")
@@ -653,9 +653,9 @@ var _ = Describe("Property Cluster background Repair Operation", func() {
               config := setup.PropertyClusterConfig(dfWriter)
               By("Starting data node 0")
               var node1Repair, node2Repair string
-               node1ID, node1Repair, closeNode1 = setup.DataNodeFromDataDir(config, node1Dir, "--property-repair-enabled=true")
+               node1ID, node1Repair, _, closeNode1 = setup.DataNodeFromDataDir(config, node1Dir, "--property-repair-enabled=true")
               By("Starting data node 1")
-               node2ID, node2Repair, closeNode2 = setup.DataNodeFromDataDir(config, node2Dir, "--property-repair-enabled=true")
+               node2ID, node2Repair, _, closeNode2 = setup.DataNodeFromDataDir(config, node2Dir, "--property-repair-enabled=true")
               By("Initializing test cases")
               _, liaisonHTTPAddr, closerLiaisonNode := setup.LiaisonNodeWithHTTP(config)
                addr = httpSchema + liaisonHTTPAddr
@@ -771,7 +771,7 @@ var _ = Describe("Property Cluster Resilience with 5 Data 
Nodes", func() {
                // Start 5 data nodes with short file discovery interval for 
faster failure detection
                for i := 0; i < nodeCount; i++ {
                        By(fmt.Sprintf("Starting data node %d", i))
-                       nodeIDs[i], nodeRepairAddrs[i], closeNodes[i] = 
setup.DataNodeFromDataDir(clusterConfig, nodeDirs[i],
+                       nodeIDs[i], nodeRepairAddrs[i], _, closeNodes[i] = 
setup.DataNodeFromDataDir(clusterConfig, nodeDirs[i],
                                "--logging-level=debug",
                                "--property-repair-enabled=true", 
"--property-repair-quick-build-tree-time=1s",
                                "--property-repair-build-tree-cron=@every 2s",
@@ -868,7 +868,7 @@ var _ = Describe("Property Cluster Resilience with 5 Data 
Nodes", func() {
                By(fmt.Sprintf("Restarting the %d closed nodes with existing 
data directories", closedNodeCount))
                for i := 0; i < closedNodeCount; i++ {
                        GinkgoWriter.Printf("Restarting node %d\n", i)
-                       nodeIDs[i], nodeRepairAddrs[i], closeNodes[i] = 
setup.DataNodeFromDataDir(clusterConfig, nodeDirs[i],
+                       nodeIDs[i], nodeRepairAddrs[i], _, closeNodes[i] = 
setup.DataNodeFromDataDir(clusterConfig, nodeDirs[i],
                                "--logging-level=debug",
                                "--property-repair-enabled=true", 
"--property-repair-quick-build-tree-time=1s",
                                "--property-repair-build-tree-cron=@every 2s",
diff --git a/fodc/agent/internal/integration/integration_suite_test.go b/fodc/agent/internal/integration/integration_suite_test.go
index 8272ae4cc..f826dcb25 100644
--- a/fodc/agent/internal/integration/integration_suite_test.go
+++ b/fodc/agent/internal/integration/integration_suite_test.go
@@ -68,7 +68,7 @@ var _ = SynchronizedBeforeSuite(func() []byte {
 
        // Start data node
        var dataNodeCloseFn func()
-       dataNodeGRPCAddr, _, dataNodeCloseFn = setup.DataNodeWithAddrAndDir(config,
+       dataNodeGRPCAddr, _, _, dataNodeCloseFn = setup.DataNodeWithAddrAndDir(config,
                "--observability-modes=prometheus",
                "--observability-listener-addr=:2121",
        )
diff --git a/pkg/test/helpers/constant.go b/pkg/test/helpers/constant.go
index d9cd3b859..a391c9d8c 100644
--- a/pkg/test/helpers/constant.go
+++ b/pkg/test/helpers/constant.go
@@ -115,10 +115,18 @@ type BackupSharedContext struct {
 
 // LifecycleSharedContext is the context shared between test cases in the lifecycle testing.
 type LifecycleSharedContext struct {
-       BaseTime      time.Time
-       Connection    *grpclib.ClientConn
-       LiaisonAddr   string
-       DataAddr      string
+       BaseTime    time.Time
+       Connection  *grpclib.ClientConn
+       LiaisonAddr string
+       DataAddr    string
+       // DataHTTPURL is the hot data node's Prometheus /metrics endpoint. Used
+       // by sender-label verifications that need to scrape banyandb_queue_sub_*
+       // directly from the data node.
+       DataHTTPURL string
+       // WarmHTTPURL is the warm data node's Prometheus /metrics endpoint. Both
+       // data nodes (hot and warm) see migration writes from the lifecycle
+       // publisher, so either endpoint can carry the sender labels.
+       WarmHTTPURL   string
        SrcDir        string
        DestDir       string
        MetadataFlags []string
diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go
index abe49274c..c6b4beb60 100644
--- a/pkg/test/setup/setup.go
+++ b/pkg/test/setup/setup.go
@@ -547,14 +547,17 @@ func hasFlagValue(flags []string, key, value string) bool {
        return false
 }
 
-func startDataNode(config *ClusterConfig, dataDir string, flags ...string) (string, string, func()) {
+func startDataNode(config *ClusterConfig, dataDir string, flags ...string) (grpcAddr, propertyRepairAddr, httpURL string, closeFn func()) {
        if config == nil {
                config = newDefaultClusterConfig()
        }
        runSchemaServer := !hasFlagValue(flags, "--has-meta-role", "false")
-       portCount := 2
+       // Allocate 3 ports: grpc, property-repair-gossip, and http (Prometheus),
+       // plus a 4th for the schema server when it runs. The HTTP port is the
+       // data node's Prometheus /metrics endpoint, so tests can scrape
+       // banyandb_queue_sub_* directly.
+       portCount := 3
        if runSchemaServer {
-               portCount = 3
+               portCount = 4
        }
        ports, err := test.AllocateFreePorts(portCount)
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
@@ -567,6 +570,7 @@ func startDataNode(config *ClusterConfig, dataDir string, flags ...string) (stri
                "--grpc-host="+host,
                fmt.Sprintf("--grpc-port=%d", ports[0]),
                fmt.Sprintf("--property-repair-gossip-grpc-port=%d", ports[1]),
+               fmt.Sprintf("--http-port=%d", ports[2]),
                "--stream-root-path="+dataDir,
                "--measure-root-path="+dataDir,
                "--property-root-path="+dataDir,
@@ -586,7 +590,7 @@ func startDataNode(config *ClusterConfig, dataDir string, flags ...string) (stri
        }
 
        if runSchemaServer {
-               schemaPort := ports[2]
+               schemaPort := ports[3]
                schemaAddr := fmt.Sprintf("%s:%d", nodeHost, schemaPort)
                flags = append(flags,
                        "--schema-server-grpc-host="+nodeHost,
@@ -624,7 +628,7 @@ func startDataNode(config *ClusterConfig, dataDir string, flags ...string) (stri
                }
        }
 
-       closeFn := func() {
+       closeFn = func() {
                if config.NodeDiscovery.FileWriter != nil {
                        config.NodeDiscovery.FileWriter.RemoveNode(nodeAddr)
                }
@@ -635,14 +639,14 @@ func startDataNode(config *ClusterConfig, dataDir string, flags ...string) (stri
                rawCloseFn()
        }
 
-       return addr, fmt.Sprintf("%s:%d", host, ports[1]), closeFn
+       return addr, fmt.Sprintf("%s:%d", host, ports[1]), fmt.Sprintf("http://%s:%d", host, ports[2]), closeFn
 }
 
 // DataNode runs a data node.
 func DataNode(config *ClusterConfig, flags ...string) func() {
        path, deferFn, err := test.NewSpace()
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
-       _, _, closeFn := DataNodeFromDataDir(config, path, flags...)
+       _, _, _, closeFn := DataNodeFromDataDir(config, path, flags...) //nolint:dogsled // only the close func is needed
        return func() {
                fmt.Printf("Data tsdb path: %s\n", path)
                _ = filepath.Walk(path, func(path string, _ os.FileInfo, err error) error {
@@ -658,18 +662,21 @@ func DataNode(config *ClusterConfig, flags ...string) func() {
        }
 }
 
-// DataNodeFromDataDir runs a data node with a specific data directory.
-func DataNodeFromDataDir(config *ClusterConfig, dataDir string, flags ...string) (string, string, func()) {
-       grpcAddr, propertyRepairAddr, closeFn := startDataNode(config, dataDir, flags...)
-       return grpcAddr, propertyRepairAddr, closeFn
+// DataNodeFromDataDir runs a data node with a specific data directory and
+// returns the gRPC address, the property-repair-gossip gRPC address, the
+// HTTP URL (Prometheus /metrics), and a close function.
+func DataNodeFromDataDir(config *ClusterConfig, dataDir string, flags ...string) (string, string, string, func()) {
+       grpcAddr, propertyRepairAddr, httpURL, closeFn := startDataNode(config, dataDir, flags...)
+       return grpcAddr, propertyRepairAddr, httpURL, closeFn
 }
 
-// DataNodeWithAddrAndDir runs a data node and returns the address and root path.
-func DataNodeWithAddrAndDir(config *ClusterConfig, flags ...string) (string, string, func()) {
+// DataNodeWithAddrAndDir runs a data node and returns the gRPC address, the
+// data directory path, the HTTP URL (Prometheus /metrics), and a close fn.
+func DataNodeWithAddrAndDir(config *ClusterConfig, flags ...string) (string, string, string, func()) {
        path, deferFn, err := test.NewSpace()
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
-       addr, _, closeFn := startDataNode(config, path, flags...)
-       return addr, path, func() {
+       addr, _, httpURL, closeFn := startDataNode(config, path, flags...)
+       return addr, path, httpURL, func() {
                closeFn()
                deferFn()
        }
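Positional port indexing in startDataNode is easy to break when a slot is inserted. That is why `schemaPort` moves from `ports[2]` to `ports[3]` once the HTTP port takes index 2. The sketch below names the slots. `portLayout` and `newPortLayout` are hypothetical; they only mirror the index mapping, the 3-or-4 port count, and the `http://host:port` URL format shown in the diff.

```go
package main

import (
	"fmt"
)

// portLayout is a hypothetical, named view of the ports slice that
// startDataNode indexes positionally.
type portLayout struct {
	GRPC    int    // ports[0]: --grpc-port
	Gossip  int    // ports[1]: --property-repair-gossip-grpc-port
	HTTP    int    // ports[2]: --http-port (Prometheus /metrics)
	Schema  int    // ports[3]: schema server; 0 when it does not run
	HTTPURL string // value returned as the data node's HTTP URL
}

// portCount mirrors the diff: three base ports, plus one for the schema server.
func portCount(runSchemaServer bool) int {
	if runSchemaServer {
		return 4
	}
	return 3
}

// newPortLayout maps allocated ports onto named slots and rejects a slice
// whose length does not match what portCount would have allocated.
func newPortLayout(host string, ports []int, runSchemaServer bool) (portLayout, error) {
	if want := portCount(runSchemaServer); len(ports) != want {
		return portLayout{}, fmt.Errorf("got %d ports, want %d", len(ports), want)
	}
	l := portLayout{
		GRPC:    ports[0],
		Gossip:  ports[1],
		HTTP:    ports[2],
		HTTPURL: fmt.Sprintf("http://%s:%d", host, ports[2]),
	}
	if runSchemaServer {
		l.Schema = ports[3]
	}
	return l, nil
}

func main() {
	l, err := newPortLayout("127.0.0.1", []int{17912, 17913, 17914, 17915}, true)
	if err != nil {
		panic(err)
	}
	fmt.Println(l.HTTPURL, l.Schema) // http://127.0.0.1:17914 17915
}
```

The port numbers in `main` are arbitrary examples; in the test helper they come from `test.AllocateFreePorts`.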
diff --git a/test/cases/lifecycle/lifecycle.go b/test/cases/lifecycle/lifecycle.go
index f145560de..ebade985a 100644
--- a/test/cases/lifecycle/lifecycle.go
+++ b/test/cases/lifecycle/lifecycle.go
@@ -234,6 +234,15 @@ func verifyLifecycleStages(sc helpers.SharedContext, verifyFn func(gomega.Gomega
 // the banyandb_lifecycle_migration_* family (the mirror of banyandb_queue_pub_*)
 // during the migration. The registry's Prometheus counters persist after the
 // command stops, so we scrape its handler directly rather than a live HTTP port.
+//
+// In addition to the publisher-side family, this function also scrapes the
+// data node's /metrics endpoint (SharedContext.WarmHTTPURL, falling back to
+// SharedContext.DataHTTPURL) to verify that the RECEIVER recorded non-empty
+// SENDER labels on its banyandb_queue_sub_total_finished family. The receiver
+// reads r.GetSenderNode() / r.GetSenderRole() / r.GetSenderTier() on each wire
+// SendRequest; the publisher populates those fields only when SetSelfNode was
+// called on the migration client.
+//
+// This is the direct sender-label check the original patch required.
 func verifyMigrationMetrics(reg observability.MetricsRegistry) {
        provider, ok := reg.(observability.PrometheusHandlerProvider)
        gomega.Expect(ok).To(gomega.BeTrue(), "lifecycle metrics registry must expose a Prometheus handler")
@@ -241,12 +250,115 @@ func verifyMigrationMetrics(reg observability.MetricsRegistry) {
        provider.PrometheusHandler().ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/metrics", nil))
        gomega.Expect(rec.Code).To(gomega.Equal(http.StatusOK))
        body := rec.Body.String()
+       // Last-run metrics (banyandb_lifecycle_last_run_timestamp_seconds +
+       // banyandb_lifecycle_last_run_success) are stamped by the deferred
+       // recordLastRun() at the end of action(). A successful cycle must set
+       // success=1 with a non-zero epoch; an empty value would mean the
+       // gauges were never registered (PreRun not run) or the action never
+       // reached the defer. Prometheus emits floats in scientific notation
+       // for large values like epoch seconds (e.g. 1.781007822e+09), so the
+       // assertion accepts either fixed or scientific form.
+       gomega.Expect(body).To(gomega.MatchRegexp(`(?m)^banyandb_lifecycle_last_run_timestamp_seconds (?:[1-9]\d{9}|[1-9]\.\d+e\+0?[89])`),
+               "banyandb_lifecycle_last_run_timestamp_seconds must be set to a non-zero epoch, got:\n"+body)
+       gomega.Expect(body).To(gomega.MatchRegexp(`(?m)^banyandb_lifecycle_last_run_success 1$`),
+               "banyandb_lifecycle_last_run_success must be 1 after a successful cycle, got:\n"+body)
        // A successful migration send increments total_finished; the measure/stream/trace
        // part files are sent via the file-sync operation, so that label must be present.
        gomega.Expect(body).To(gomega.MatchRegexp(`banyandb_lifecycle_migration_total_finished\{[^}]*\} [1-9]`),
                "expected a non-zero banyandb_lifecycle_migration_total_finished series, got:\n"+body)
        gomega.Expect(body).To(gomega.ContainSubstring(`operation="file-sync"`),
                "file-sync part migration must be metered in the banyandb_lifecycle_migration_* family")
+       // At least one banyandb_lifecycle_migration_total_finished series must
+       // carry a non-empty remote_node and a remote_tier matching the
+       // destination data node. The test migrates hot -> warm, so remote_tier
+       // is "warm" (the destination).
+       gomega.Expect(body).To(gomega.MatchRegexp(`(?m)^banyandb_lifecycle_migration_total_finished\{[^}]*remote_node="[^"]+"[^}]*\} [1-9]`),
+               "at least one banyandb_lifecycle_migration_total_finished series must have a non-empty remote_node label (destination), got:\n"+body)
+       gomega.Expect(body).To(gomega.MatchRegexp(`(?m)^banyandb_lifecycle_migration_total_finished\{[^}]*remote_tier="warm"[^}]*\} [1-9]`),
+               "at least one banyandb_lifecycle_migration_total_finished series must have remote_tier=\"warm\" (the destination tier, since the test migrates hot->warm), got:\n"+body)
+
+       // The lifecycle's own metric family is the PUBLISHER-side mirror: it
+       // records the destination (remote_*). To confirm that the SENDER identity
+       // was stamped on the wire and that the receiver recorded it, we scrape the
+       // data node's /metrics for the banyandb_queue_sub_total_finished family
+       // (see verifyDataNodeSenderLabels below).
+
+       // === SENDER-LABEL VERIFICATION on the receiver (data node) ===
+       // Scrape a data node's Prometheus endpoint (warm first, then hot) and
+       // assert that at least one banyandb_queue_sub_total_finished series
+       // carries non-empty sender identity labels, which the receiver records as
+       // remote_node / remote_role / remote_tier. The lifecycle publisher stamps
+       // those via SetSelfNode; without the fix, the data node records empty
+       // strings for these labels.
+       verifyDataNodeSenderLabels()
+}
+
+// verifyDataNodeSenderLabels scrapes the data node's /metrics endpoint and
+// asserts that the receiver's banyandb_queue_sub_total_finished series carry
+// non-empty sender labels — the direct evidence that the lifecycle publisher's
+// SetSelfNode fix is in place.
+//
+// Note on label naming: the receiver's banyandb_queue_sub_* family uses
+// REMOTE_* labels (remote_node, remote_role, remote_tier) to identify the
+// SENDER of each message — the lifecycle is "remote" from the data node's
+// point of view. So `remote_role="lifecycle"` and `remote_tier="hot"` on a
+// file-sync series IS the sender-stamping evidence we need.
+//
+// The lifecycle publishes to BOTH data nodes in the test (hot & warm), so
+// either endpoint is valid. We try the warm endpoint first, then the hot one,
+// and assert against the first that responds.
+//
+// At-least-one check: earlier specs in the same suite (the "Lifecycle"
+// Describe block) run runLifecycleMigration via the OLD `lifecycle.NewCommand()`
+// path which does NOT pass --lifecycle-tier, so series for those groups still
+// carry empty remote_tier. The cross-segment specs pass --lifecycle-tier=hot,
+// so series for sw_cross_segment, sw_cross_segment_stream, sw_cross_segment_trace
+// carry remote_tier="hot". The assertion requires AT LEAST ONE
+// banyandb_queue_sub_total_finished series to carry the populated labels,
+// proving the SetSelfNode fix is wired end-to-end.
+func verifyDataNodeSenderLabels() {
+       urls := []string{SharedContext.WarmHTTPURL, SharedContext.DataHTTPURL}
+       for _, base := range urls {
+               if base == "" {
+                       continue
+               }
+               ginkgo.By("scraping data node metrics at " + base + "/metrics")
+               // Use a small client-level timeout so a half-open TCP socket or
+               // stuck data-node HTTP handler cannot stall the spec indefinitely.
+               // The data node is a local process that serves /metrics in <100ms
+               // in practice, so 5s is a comfortable upper bound that still
+               // catches genuine hangs well within the spec's overall deadline.
+               scrapeClient := &http.Client{Timeout: 5 * time.Second}
+               resp, err := scrapeClient.Get(base + "/metrics") //nolint:gosec // test-only URL
+               if err != nil {
+                       ginkgo.By("skipping " + base + " (" + err.Error() + ")")
+                       continue
+               }
+               func() {
+                       defer resp.Body.Close()
+                       gomega.Expect(resp.StatusCode).To(gomega.Equal(http.StatusOK),
+                               "data node "+base+"/metrics must return 200")
+                       rawBody, readErr := io.ReadAll(resp.Body)
+                       gomega.Expect(readErr).NotTo(gomega.HaveOccurred())
+                       body := string(rawBody)
+                       // At least one banyandb_queue_sub_total_finished series must
+                       // carry the sender-stamped labels. The lifecycle derives its
+                       // self identity (sender_node, sender_role, sender_tier) from
+                       // already-known inputs — its --grpc-addr (the co-located
+                       // data node) and the data-node registry — and calls
+                       // SetSelfNode(senderNode, "lifecycle", senderTier). The
+                       // hard-coded "lifecycle" role and the matched data node's
+                       // Metadata.Name and Labels["type"] populate the three
+                       // remote_* labels the receiver records.
+                       ginkgo.By("scraping data node " + base + " for sender labels: " + fmt.Sprintf("%d bytes", len(body)))
+                       gomega.Expect(body).To(gomega.MatchRegexp(`(?m)^banyandb_queue_sub_total_finished\{[^}]*remote_node="[^"]+"[^}]*\} [1-9]`),
+                               "at least one banyandb_queue_sub_total_finished series on "+base+" must carry a non-empty remote_node label (sender node), got:\n"+body)
+                       gomega.Expect(body).To(gomega.MatchRegexp(`(?m)^banyandb_queue_sub_total_finished\{[^}]*remote_role="lifecycle"[^}]*\} [1-9]`),
+                               "at least one banyandb_queue_sub_total_finished series on "+base+" must carry remote_role=\"lifecycle\" (the sender role stamped by parseGroup), got:\n"+body)
+                       gomega.Expect(body).To(gomega.MatchRegexp(`(?m)^banyandb_queue_sub_total_finished\{[^}]*remote_tier="hot"[^}]*\} [1-9]`),
+                               "at least one banyandb_queue_sub_total_finished series on "+base+" must carry remote_tier=\"hot\" (the sender tier set via --lifecycle-tier=hot), got:\n"+body)
+               }()
+               return
+       }
+       ginkgo.By("no data node HTTP URL available; skipping sender-label scrape")
 }
 
 func verifySourceDirectoriesAfterMigration() {
@@ -533,6 +645,12 @@ func crossSegmentTimestamps() (single, left, right time.Time) {
 // every root path at the shared source dir and writing its report to reportDir.
 // It returns the command's metrics registry so callers can verify the emitted
 // banyandb_lifecycle_migration_* family.
+//
+// The migration publisher derives its sender identity (sender_node, sender_role,
+// sender_tier) at runtime from the data-node registry: it matches the lifecycle's
+// --grpc-addr against the registered data nodes and falls back to its
+// --node-labels. No extra CLI flags are needed beyond what the test setup already
+// passes via SharedContext.MetadataFlags. See deriveSelfIdentity in
+// banyand/backup/lifecycle/steps.go for the resolution rules.
 func runLifecycleMigration(progressFile, reportDir string) observability.MetricsRegistry {
        lifecycleCmd, reg := lifecycle.NewCommandWithRegistry()
        args := []string{
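The assertions in verifyMigrationMetrics and verifyDataNodeSenderLabels run each regexp over the whole exposition body. With `(?m)`, `^` anchors at every line, so `MatchString` succeeds if any single series matches; these are at-least-one checks. The sketch below reuses three patterns copied from the diff against a made-up sample body. The label sets in `sample` are illustrative, not captured output.

```go
package main

import (
	"fmt"
	"regexp"
)

// Patterns copied from verifyMigrationMetrics and verifyDataNodeSenderLabels.
var (
	lastRunTS  = regexp.MustCompile(`(?m)^banyandb_lifecycle_last_run_timestamp_seconds (?:[1-9]\d{9}|[1-9]\.\d+e\+0?[89])`)
	lastRunOK  = regexp.MustCompile(`(?m)^banyandb_lifecycle_last_run_success 1$`)
	senderRole = regexp.MustCompile(`(?m)^banyandb_queue_sub_total_finished\{[^}]*remote_role="lifecycle"[^}]*\} [1-9]`)
)

// sample is an illustrative exposition body: one series recorded without a
// stamped sender (empty labels) and one recorded after SetSelfNode.
const sample = `banyandb_lifecycle_last_run_success 1
banyandb_lifecycle_last_run_timestamp_seconds 1.781007822e+09
banyandb_queue_sub_total_finished{group="a",remote_node="",remote_role="",remote_tier=""} 4
banyandb_queue_sub_total_finished{group="b",remote_node="hot-0",remote_role="lifecycle",remote_tier="hot"} 2
`

func main() {
	fmt.Println(lastRunTS.MatchString(sample))  // true: scientific notation accepted
	fmt.Println(lastRunOK.MatchString(sample))  // true: "$" matches before the newline
	fmt.Println(senderRole.MatchString(sample)) // true: only the "b" series matches
}
```

Because each pattern is matched independently, the three sender-label checks in the spec may be satisfied by different series.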
diff --git a/test/integration/distributed/backup/common.go b/test/integration/distributed/backup/common.go
index 9b954c4c0..1ae1897fd 100644
--- a/test/integration/distributed/backup/common.go
+++ b/test/integration/distributed/backup/common.go
@@ -81,7 +81,7 @@ func InitializeTestSuite() (*CommonTestVars, error) {
        clusterConfig := setup.PropertyClusterConfig(dfWriter)
        ginkgo.By("Starting data node 0")
        var closeDataNode0 func()
-       vars.DataAddr, vars.Dir, closeDataNode0 = setup.DataNodeWithAddrAndDir(clusterConfig)
+       vars.DataAddr, vars.Dir, _, closeDataNode0 = setup.DataNodeWithAddrAndDir(clusterConfig)
        ginkgo.By("Starting liaison node")
        liaisonAddr, closerLiaisonNode := setup.LiaisonNode(clusterConfig)
        ginkgo.By("Loading schema")
diff --git a/test/integration/distributed/cluster_state/common.go b/test/integration/distributed/cluster_state/common.go
index 1a14225a5..e8ba6c5dd 100644
--- a/test/integration/distributed/cluster_state/common.go
+++ b/test/integration/distributed/cluster_state/common.go
@@ -54,7 +54,7 @@ var _ = ginkgo.SynchronizedBeforeSuite(func() []byte {
        dfWriter := setup.NewDiscoveryFileWriter(tmpDir)
        config := setup.PropertyClusterConfig(dfWriter)
        ginkgo.By("Starting data node")
-       dataAddr, _, closeDataNode0 := setup.DataNodeWithAddrAndDir(config)
+       dataAddr, _, _, closeDataNode0 := setup.DataNodeWithAddrAndDir(config)
        ginkgo.By("Starting liaison node")
        liaisonAddr, closerLiaisonNode := setup.LiaisonNode(config)
        stopFunc = func() {
diff --git a/test/integration/distributed/deletion/suite_test.go b/test/integration/distributed/deletion/suite_test.go
index 52752224c..c17962b39 100644
--- a/test/integration/distributed/deletion/suite_test.go
+++ b/test/integration/distributed/deletion/suite_test.go
@@ -35,9 +35,9 @@ func init() {
                dfWriter := setup.NewDiscoveryFileWriter(tmpDir)
                config := setup.PropertyClusterConfig(dfWriter)
                By("Starting data node 0")
-               _, dn0Path, closeDataNode0 := setup.DataNodeWithAddrAndDir(config)
+               _, dn0Path, _, closeDataNode0 := setup.DataNodeWithAddrAndDir(config)
                By("Starting data node 1")
-               _, dn1Path, closeDataNode1 := setup.DataNodeWithAddrAndDir(config)
+               _, dn1Path, _, closeDataNode1 := setup.DataNodeWithAddrAndDir(config)
                By("Starting liaison node")
                liaisonAddr, liaisonPath, closerLiaisonNode := setup.LiaisonNodeWithAddrAndDir(config)
                return deletion.SetupResult{
diff --git a/test/integration/distributed/lifecycle/common.go b/test/integration/distributed/lifecycle/common.go
index 5722fb1ff..0e79fd22e 100644
--- a/test/integration/distributed/lifecycle/common.go
+++ b/test/integration/distributed/lifecycle/common.go
@@ -51,6 +51,8 @@ type setupResult struct {
        tenDaysBeforeNow          time.Time
        stopFunc                  func()
        dataAddr                  string
+       dataHTTPURL               string
+       warmHTTPURL               string
        liaisonAddr               string
        srcDir                    string
        destDir                   string
@@ -75,10 +77,10 @@ var _ = ginkgo.SynchronizedBeforeSuite(func() []byte {
        dfWriter := setup.NewDiscoveryFileWriter(tmpDir)
        config := setup.PropertyClusterConfig(dfWriter)
        ginkgo.By("Starting hot data node")
-       dataAddr, srcDir, closeDataNode0 := setup.DataNodeWithAddrAndDir(config, "--node-labels", "type=hot",
+       dataAddr, srcDir, dataHTTPURL, closeDataNode0 := setup.DataNodeWithAddrAndDir(config, "--node-labels", "type=hot",
                "--measure-flush-timeout", "0s", "--stream-flush-timeout", "0s", "--trace-flush-timeout", "0s")
        ginkgo.By("Starting warm data node")
-       _, destDir, closeDataNode1 := setup.DataNodeWithAddrAndDir(config, "--node-labels", "type=warm",
+       _, destDir, warmHTTPURL, closeDataNode1 := setup.DataNodeWithAddrAndDir(config, "--node-labels", "type=warm",
                "--has-meta-role=false",
                "--measure-flush-timeout", "0s", "--stream-flush-timeout", "0s", "--trace-flush-timeout", "0s")
        setup.PreloadSchemaViaProperty(config, test_stream.LoadSchemaWithStages, test_measure.LoadSchemaWithStages,
@@ -104,6 +106,8 @@ var _ = ginkgo.SynchronizedBeforeSuite(func() []byte {
        time.Sleep(flags.ConsistentlyTimeout)
        result = setupResult{
                dataAddr:                  dataAddr,
+               dataHTTPURL:               dataHTTPURL,
+               warmHTTPURL:               warmHTTPURL,
                liaisonAddr:               liaisonAddr,
                srcDir:                    srcDir,
                destDir:                   destDir,
@@ -129,6 +133,8 @@ var _ = ginkgo.SynchronizedBeforeSuite(func() []byte {
        caseslifecycle.SharedContext = helpers.LifecycleSharedContext{
                LiaisonAddr:   result.liaisonAddr,
                DataAddr:      result.dataAddr,
+               DataHTTPURL:   result.dataHTTPURL,
+               WarmHTTPURL:   result.warmHTTPURL,
                Connection:    connection,
                SrcDir:        result.srcDir,
                DestDir:       result.destDir,
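The hot and warm nodes above are started with `--node-labels type=hot` and `type=warm`, which is what the lifecycle's identity resolution keys on. The sketch below follows the four-step order from the commit message:

1. Match on `--grpc-addr`.
2. Fall back to a label-superset match against `--node-labels`.
3. Fall back to a `type`-only match.
4. Otherwise return empty values.

`dataNode` and `deriveIdentity` are hypothetical stand-ins, not the real `deriveSelfIdentity` in steps.go. Taking the first match in registry order is also an assumption.

```go
package main

import "fmt"

// dataNode is a hypothetical, trimmed-down view of a registry entry.
type dataNode struct {
	Name        string
	GrpcAddress string
	Labels      map[string]string
}

// isSuperset reports whether have contains every key/value pair in want.
func isSuperset(have, want map[string]string) bool {
	for k, v := range want {
		if got, ok := have[k]; !ok || got != v {
			return false
		}
	}
	return true
}

// deriveIdentity returns (senderNode, senderTier); the role would be the
// fixed string "lifecycle". The first matching node wins at every step.
func deriveIdentity(nodes []dataNode, grpcAddr string, selfLabels map[string]string) (string, string) {
	pick := func(match func(dataNode) bool) (string, string, bool) {
		for _, n := range nodes {
			if match(n) {
				return n.Name, n.Labels["type"], true
			}
		}
		return "", "", false
	}
	// 1. Co-located data node: same gRPC address as the lifecycle sidecar.
	if grpcAddr != "" {
		if name, tier, ok := pick(func(n dataNode) bool { return n.GrpcAddress == grpcAddr }); ok {
			return name, tier
		}
	}
	// 2. Label-superset match against --node-labels (skipped when empty,
	// since every node would match vacuously).
	if len(selfLabels) > 0 {
		if name, tier, ok := pick(func(n dataNode) bool { return isSuperset(n.Labels, selfLabels) }); ok {
			return name, tier
		}
	}
	// 3. Type-only match on the "type" label.
	if t := selfLabels["type"]; t != "" {
		if name, tier, ok := pick(func(n dataNode) bool { return n.Labels["type"] == t }); ok {
			return name, tier
		}
	}
	// 4. Empty fallthrough: the sender fields stay empty, as before the fix.
	return "", ""
}

func main() {
	registry := []dataNode{
		{Name: "hot-0", GrpcAddress: "127.0.0.1:17912", Labels: map[string]string{"type": "hot"}},
		{Name: "warm-0", GrpcAddress: "127.0.0.1:17920", Labels: map[string]string{"type": "warm"}},
	}
	// A sidecar co-located with the hot node resolves via its --grpc-addr.
	name, tier := deriveIdentity(registry, "127.0.0.1:17912", nil)
	fmt.Println(name, tier) // hot-0 hot
}
```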
diff --git a/test/integration/distributed/schema/common.go b/test/integration/distributed/schema/common.go
index 6e57c2c66..4a79f497b 100644
--- a/test/integration/distributed/schema/common.go
+++ b/test/integration/distributed/schema/common.go
@@ -63,9 +63,9 @@ var _ = ginkgo.SynchronizedBeforeSuite(func() []byte {
        dfWriter := setup.NewDiscoveryFileWriter(tmpDir)
        config := setup.PropertyClusterConfig(dfWriter)
        ginkgo.By("Starting data node 0")
-       addrDataNode0, _, closeDataNode0 := setup.DataNodeWithAddrAndDir(config)
+       addrDataNode0, _, _, closeDataNode0 := setup.DataNodeWithAddrAndDir(config)
        ginkgo.By("Starting data node 1")
-       addrDataNode1, _, closeDataNode1 := setup.DataNodeWithAddrAndDir(config)
+       addrDataNode1, _, _, closeDataNode1 := setup.DataNodeWithAddrAndDir(config)
        dataNodeAddrs = []string{addrDataNode0, addrDataNode1}
        ginkgo.By("Loading schema via property")
        setup.PreloadSchemaViaProperty(config, test_stream.PreloadSchema, test_measure.PreloadSchema, test_trace.PreloadSchema)
diff --git a/test/integration/distributed/sync_retry/sync_retry_suite_test.go b/test/integration/distributed/sync_retry/sync_retry_suite_test.go
index d9df412ae..16c264853 100644
--- a/test/integration/distributed/sync_retry/sync_retry_suite_test.go
+++ b/test/integration/distributed/sync_retry/sync_retry_suite_test.go
@@ -66,7 +66,7 @@ var _ = g.SynchronizedBeforeSuite(func() []byte {
 
        // Start two data nodes to ensure replication targets exist
        startDataNode := func() (string, string) {
-               addr, path, closeFn := setup.DataNodeWithAddrAndDir(clusterConfig)
+               addr, path, _, closeFn := setup.DataNodeWithAddrAndDir(clusterConfig)
                cleanupFuncs = append(cleanupFuncs, closeFn)
                return addr, path
        }
diff --git a/test/integration/replication/measure_normal_replication_test.go b/test/integration/replication/measure_normal_replication_test.go
index d45355ef8..d2f2857ff 100644
--- a/test/integration/replication/measure_normal_replication_test.go
+++ b/test/integration/replication/measure_normal_replication_test.go
@@ -138,7 +138,7 @@ var _ = g.Describe("Measure Normal Mode Replication", func() {
                })
 
                g.By("Restarting the data node")
-               _, _, closeDataNode := setup.DataNodeFromDataDir(clusterConfig, dataNodeDirs[0], "--node-labels", "role=data")
+               _, _, _, closeDataNode := setup.DataNodeFromDataDir(clusterConfig, dataNodeDirs[0], "--node-labels", "role=data")
                dataNodeClosers[0] = closeDataNode
 
                g.By("Waiting for cluster to stabilize and handoff queue to drain")
diff --git a/test/integration/replication/replication_suite_test.go b/test/integration/replication/replication_suite_test.go
index 5c8d6dc5f..9b3665187 100644
--- a/test/integration/replication/replication_suite_test.go
+++ b/test/integration/replication/replication_suite_test.go
@@ -87,7 +87,7 @@ var _ = SynchronizedBeforeSuite(func() []byte {
        for i := 0; i < 3; i++ {
                nodeDir, nodeDirCleanup, dirErr := test.NewSpace()
                Expect(dirErr).NotTo(HaveOccurred())
-               _, _, closeDataNode := setup.DataNodeFromDataDir(clusterConfig, nodeDir, "--node-labels", "role=data")
+               _, _, _, closeDataNode := setup.DataNodeFromDataDir(clusterConfig, nodeDir, "--node-labels", "role=data")
                dataNodeDirs = append(dataNodeDirs, nodeDir)
                dataNodeDirCleanups = append(dataNodeDirCleanups, nodeDirCleanup)
                dataNodeClosers = append(dataNodeClosers, closeDataNode)
@@ -192,7 +192,7 @@ var _ = AfterEach(func() {
                return
        }
        // Node 0 may have been stopped by a spec — restart it from its data directory.
-       _, _, closeDataNode := setup.DataNodeFromDataDir(clusterConfig, dataNodeDirs[0], "--node-labels", "role=data")
+       _, _, _, closeDataNode := setup.DataNodeFromDataDir(clusterConfig, dataNodeDirs[0], "--node-labels", "role=data")
        dataNodeClosers[0] = closeDataNode
        Eventually(func() bool {
                return isClusterStable(connection)
diff --git a/test/integration/replication/stream_replication_test.go b/test/integration/replication/stream_replication_test.go
index 7b70c18ae..b4ebf6460 100644
--- a/test/integration/replication/stream_replication_test.go
+++ b/test/integration/replication/stream_replication_test.go
@@ -138,7 +138,7 @@ var _ = g.Describe("Stream Normal Mode Replication", func() {
                })
 
                g.By("Restarting the data node")
-               _, _, closeDataNode := setup.DataNodeFromDataDir(clusterConfig, dataNodeDirs[0], "--node-labels", "role=data")
+               _, _, _, closeDataNode := setup.DataNodeFromDataDir(clusterConfig, dataNodeDirs[0], "--node-labels", "role=data")
                dataNodeClosers[0] = closeDataNode
 
                g.By("Waiting for cluster to stabilize and handoff queue to drain")
diff --git a/test/integration/replication/trace_replication_test.go b/test/integration/replication/trace_replication_test.go
index 087b3466e..199f88b0b 100644
--- a/test/integration/replication/trace_replication_test.go
+++ b/test/integration/replication/trace_replication_test.go
@@ -139,7 +139,7 @@ var _ = g.Describe("Trace Normal Mode Replication", func() {
                })
 
                g.By("Restarting the data node")
-               _, _, closeDataNode := setup.DataNodeFromDataDir(clusterConfig, dataNodeDirs[0], "--node-labels", "role=data")
+               _, _, _, closeDataNode := setup.DataNodeFromDataDir(clusterConfig, dataNodeDirs[0], "--node-labels", "role=data")
                dataNodeClosers[0] = closeDataNode
 
                g.By("Waiting for cluster to stabilize and handoff queue to drain")
