Kevin Doran created NIP-30:
------------------------------
Summary: Connector Observability: MDC Logging and Reporting-Task
Visibility
Key: NIP-30
URL: https://issues.apache.org/jira/browse/NIP-30
Project: NiFi Improvement Proposal
Issue Type: Improvement
Reporter: Kevin Doran
Assignee: Kevin Doran
h2. Motivation
The Connector extension point introduced by NIP-11 and implemented under
NIFI-15258 is a new top-level component whose flow lives inside a managed
Process Group that is a sibling of the controller's root Process Group, not a
descendant. Two observability gaps follow from that placement:
# Logs emitted by components inside a connector-managed flow have no way to
carry connector identity in MDC. NiFi already provides MDC context for
components in the controller's root flow via the {{processGroup*}} keys, but
those keys describe the containing PG hierarchy, not the owning Connector. A
Connector implementation also has no API to publish its own identity-shaped MDC
attributes (for example, a connector definition identifier) for the components
running inside its flow.
# Reporting tasks that walk {{EventAccess.getControllerStatus()}} cannot see
connector-managed flows at all. Because connector-managed PGs are siblings of
the root, the existing controller traversal misses them entirely; built-in and
third-party reporting tasks therefore export zero metrics for connector flows
today.
This NIP proposes a small, additive set of API changes in {{nifi-api}} to close
both gaps without affecting existing components, callers, or test stubs. The
framework wiring that supplies behavior behind these defaults is out of scope
and will be tracked in a separate NIFI Jira.
h2. Scope
Three additions and one field, all in {{nifi-api}}, all backward compatible
by virtue of {{default}} methods, default field values, and no removed/renamed
members:
# New default method on {{ConnectorInitializationContext}}:
    default void setLoggingAttributes(Map<String, String> attributes) {
        throw new UnsupportedOperationException();
    }
# New {{loggingAttributes}} field on {{ProcessGroupStatus}}
({{Map<String, String>}}, defaults to {{Map.of()}}) with accessors that take a
defensive immutable copy on set; included in {{clone()}} and in the static
{{merge()}} helper (additive/union merge across cluster nodes).
# New status type {{org.apache.nifi.controller.status.ConnectorStatus}}
carrying connector identity ({{id}}, {{name}}) and the
{{ProcessGroupStatus}} for the Connector's managed root Process Group
({{rootGroupStatus}}). Implements {{Cloneable}} and follows the established
Status-class conventions ({{PortStatus}}, {{ConnectionStatus}}, etc.).
Positioned to grow with connector-level metrics that have no Process-Group
analog (for example, connector-wide backlog or idle time) in subsequent work.
# New default method on {{EventAccess}}:
    default Collection<ConnectorStatus> getConnectorStatuses() {
        return Collections.emptyList();
    }
No existing types, methods, or fields are altered. No new compile-time
dependencies are introduced; the only new import on the consumer side is
{{org.apache.nifi.controller.status.ConnectorStatus}}.
h2. Description
h3. {{ConnectorInitializationContext.setLoggingAttributes(Map)}}
A Connector implementation invokes this from within its {{initialize(...)}}
lifecycle (or at any later point, e.g. once its configuration steps are
applied) to publish key/value pairs that the framework will include in the
SLF4J {{MDC}} for every log line emitted by the Connector itself and by any
component (Processor, Controller Service, etc.) running inside the Connector's
managed flow.
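As an illustration only, a Connector might publish attributes once its configuration is known, while tolerating a framework that still relies on the throwing default. The sketch below uses a hypothetical stand-in interface ({{SketchInitContext}}) in place of the real {{ConnectorInitializationContext}}; the helper class and the attribute keys ({{database}}, {{sourceSchema}}) are invented for the example and are not part of this proposal.

```java
import java.util.Map;

// Hypothetical stand-in that mirrors only the proposed default method.
interface SketchInitContext {
    default void setLoggingAttributes(Map<String, String> attributes) {
        throw new UnsupportedOperationException();
    }
}

// Hypothetical connector-side helper; not part of nifi-api.
class SketchAttributePublisher {
    // Returns true if the context accepted the attributes, false if it
    // still relies on the throwing default.
    static boolean publish(SketchInitContext context, String database, String schema) {
        try {
            // Assumed attribute keys, chosen only for illustration.
            context.setLoggingAttributes(Map.of("database", database, "sourceSchema", schema));
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }
}
```

Whether a real Connector should catch the exception or let it propagate is a design decision this sketch does not settle.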
Calls replace the previous set of custom attributes. The framework reserves a
small set of well-known keys that describe the Connector's own identity
(identifier, name, type FQN, bundle coordinate); attempts to override these via
this method are dropped and the framework logs a {{WARN}} for each rejected
key. The exact set of reserved keys is a framework concern and is intentionally
not specified in {{nifi-api}}.
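The method is declared as a {{default}} so that existing
{{ConnectorInitializationContext}} implementations, including frameworks that
have not yet implemented Connector MDC, continue to compile and run as long as
they never invoke it. The default body throws {{UnsupportedOperationException}}
so that a call against such an implementation fails visibly; the
framework-supplied {{StandardConnectorInitializationContext}} overrides it.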
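The replace-on-call and reserved-key behavior described above could look roughly like the following on the framework side. This is a sketch under stated assumptions, not the {{StandardConnectorInitializationContext}} implementation: the class name, the reserved key names ({{connectorId}}, {{connectorName}}), and the use of {{java.util.logging}} are all hypothetical, since the NIP leaves the reserved set to the framework.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;

// Hypothetical framework-side holder for a Connector's custom MDC attributes.
class SketchConnectorLoggingContext {
    private static final Logger LOG =
            Logger.getLogger(SketchConnectorLoggingContext.class.getName());

    // Assumed reserved keys; the real set is a framework concern.
    private static final Set<String> RESERVED = Set.of("connectorId", "connectorName");

    private volatile Map<String, String> customAttributes = Map.of();

    // Each call replaces the previously published custom attributes.
    // Reserved keys are dropped with one warning per rejected key.
    void setLoggingAttributes(Map<String, String> attributes) {
        Map<String, String> accepted = new LinkedHashMap<>();
        if (attributes != null) {
            for (Map.Entry<String, String> entry : attributes.entrySet()) {
                if (RESERVED.contains(entry.getKey())) {
                    LOG.warning("Ignoring reserved logging attribute key: " + entry.getKey());
                } else {
                    accepted.put(entry.getKey(), entry.getValue());
                }
            }
        }
        // Map.copyOf rejects null keys and values, so callers must not pass them.
        customAttributes = Map.copyOf(accepted);
    }

    Map<String, String> getCustomAttributes() {
        return customAttributes;
    }
}
```

Storing an immutable snapshot in a {{volatile}} field is one simple way to let logging threads read the current attributes without locking; a real implementation may choose differently.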
h3. {{ProcessGroupStatus.loggingAttributes}}
When a reporting task observes a {{ProcessGroupStatus}}, it now also sees a
snapshot of the logging attributes that apply to that Process Group at capture
time. For PGs inside a connector-managed flow this carries the
connector-identity keys; for ordinary PGs it carries the existing
{{processGroup*}} keys. Reporting tasks that export to systems like
OpenTelemetry can then attach those keys as metric attributes without any
additional traversal.
The setter takes a defensive immutable copy ({{Map.copyOf}}); {{null}} is
normalized to {{Map.of()}}. {{clone()}} routes through the setter so the
cloned instance always holds an independent immutable map. {{merge()}} is
additive (union with last-write-wins per key), which keeps the behavior
forgiving when cluster nodes are mid-deploy and one node has not yet populated
its map.
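The setter, {{clone()}}, and {{merge()}} semantics above can be sketched as follows. {{SketchGroupStatus}} is a hypothetical, trimmed-down stand-in that models only the new field, and the direction of last-write-wins (the merged-in status overwrites the target) is an assumption made for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ProcessGroupStatus; only the new field is modeled.
class SketchGroupStatus implements Cloneable {
    private Map<String, String> loggingAttributes = Map.of();

    Map<String, String> getLoggingAttributes() {
        return loggingAttributes;
    }

    // Defensive immutable copy; null is normalized to an empty map.
    void setLoggingAttributes(Map<String, String> attributes) {
        this.loggingAttributes = attributes == null ? Map.of() : Map.copyOf(attributes);
    }

    // Routes through the setter so the clone holds its own immutable map.
    @Override
    public SketchGroupStatus clone() {
        SketchGroupStatus copy = new SketchGroupStatus();
        copy.setLoggingAttributes(loggingAttributes);
        return copy;
    }

    // Union merge: on a key collision the value from toMerge wins (assumed order).
    static void merge(SketchGroupStatus target, SketchGroupStatus toMerge) {
        Map<String, String> union = new HashMap<>(target.getLoggingAttributes());
        union.putAll(toMerge.getLoggingAttributes());
        target.setLoggingAttributes(union);
    }
}
```

With these semantics, merging a node whose map is still empty leaves the target's attributes unchanged, which is the forgiving mid-deploy behavior described above.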
h3. {{ConnectorStatus}}
A connector-level status object analogous to {{ProcessGroupStatus}}. The
minimum viable fields are the connector's identity and the status of its
managed root PG. It is introduced now (rather than reusing
{{ProcessGroupStatus}} directly from
{{EventAccess.getConnectorStatuses()}}) because:
* Reporting tasks need to identify the connector independently of the root
PG's id, which may not match the connector id.
* Connectors have observable state that does not exist at the PG level (e.g.
connector-wide backlog, idle time, lifecycle state). Future NIPs can add fields
here without re-shaping the {{EventAccess}} return type.
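A minimal sketch of the shape described above, assuming plain getters and setters and a deep-copying {{clone()}}. Both classes are hypothetical stand-ins: {{SketchManagedGroupStatus}} reduces {{ProcessGroupStatus}} to a single field, and {{SketchConnectorStatus}} is not the proposed {{ConnectorStatus}} source.

```java
// Hypothetical stand-in for ProcessGroupStatus, reduced to one mutable field.
class SketchManagedGroupStatus implements Cloneable {
    private String name;

    String getName() { return name; }
    void setName(String name) { this.name = name; }

    @Override
    public SketchManagedGroupStatus clone() {
        SketchManagedGroupStatus copy = new SketchManagedGroupStatus();
        copy.setName(name);
        return copy;
    }
}

// Hypothetical sketch of a connector-level status wrapper.
class SketchConnectorStatus implements Cloneable {
    private String id;
    private String name;
    private SketchManagedGroupStatus rootGroupStatus;

    String getId() { return id; }
    void setId(String id) { this.id = id; }

    String getName() { return name; }
    void setName(String name) { this.name = name; }

    SketchManagedGroupStatus getRootGroupStatus() { return rootGroupStatus; }
    void setRootGroupStatus(SketchManagedGroupStatus status) { this.rootGroupStatus = status; }

    // Deep copy: the clone gets its own copy of the managed root status.
    @Override
    public SketchConnectorStatus clone() {
        SketchConnectorStatus copy = new SketchConnectorStatus();
        copy.setId(id);
        copy.setName(name);
        copy.setRootGroupStatus(rootGroupStatus == null ? null : rootGroupStatus.clone());
        return copy;
    }
}
```

Keeping the connector id separate from the root group's status is what lets a reporting task label metrics by connector even when the managed root PG has a different id.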
h3. {{EventAccess.getConnectorStatuses()}}
A new entry point for reporting tasks to enumerate connector-managed flows
alongside the controller's root flow. Implementations walk the runtime's
Connector registry, build a {{ConnectorStatus}} for each (populated with the
existing {{getGroupStatus(...)}} result for the managed root), and return them.
The default returns an empty list so that:
* Existing {{EventAccess}} implementations (tests, mocks, downstream
frameworks) compile unchanged.
* Existing reporting tasks that do not call this method see no behavior change.
The framework implementation of this method, and the changes required in
built-in reporting tasks to consume it, are out of scope for this NIP and will
be tracked under a separate NIFI Jira.
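To illustrate why the empty-list default is non-breaking, the sketch below models a reporting task that enumerates the root flow plus any connector flows. All types are hypothetical stand-ins: {{SketchEventAccess}} mirrors only one pre-existing method and the proposed default, and {{SketchConnectorInfo}} stands in for {{ConnectorStatus}}.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for ConnectorStatus, reduced to its identity fields.
class SketchConnectorInfo {
    final String id;
    final String name;

    SketchConnectorInfo(String id, String name) {
        this.id = id;
        this.name = name;
    }
}

// Hypothetical stand-in for EventAccess: one pre-existing abstract method
// plus the proposed default.
interface SketchEventAccess {
    String getControllerStatusName();

    default Collection<SketchConnectorInfo> getConnectorStatuses() {
        return Collections.emptyList();
    }
}

// Hypothetical reporting-task logic that lists every flow it would export.
class SketchFlowLister {
    static List<String> flowsToExport(SketchEventAccess access) {
        List<String> flows = new ArrayList<>();
        flows.add("root:" + access.getControllerStatusName());
        // Against an implementation that keeps the default, this loop is a no-op.
        for (SketchConnectorInfo connector : access.getConnectorStatuses()) {
            flows.add("connector:" + connector.name);
        }
        return flows;
    }
}
```

An implementation written before the new method existed (here, a lambda supplying only {{getControllerStatusName()}}) still compiles and yields just the root flow; one that overrides {{getConnectorStatuses()}} adds an entry per connector.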
h2. Compatibility
Purely additive:
* All new interface methods have {{default}} implementations
({{UnsupportedOperationException}} for {{setLoggingAttributes}}, empty
collection for {{getConnectorStatuses}}).
* The new {{loggingAttributes}} field on {{ProcessGroupStatus}} defaults to
{{Map.of()}}; existing serialization round-trips of {{ProcessGroupStatus}}
are unaffected because the field is not currently present in any persisted
form. Existing reflective consumers see a new no-arg accessor and an empty
default.
* {{ConnectorStatus}} is a brand-new type; nothing references it today.
* No method signatures, return types, or behaviors are altered.
The Connector API is already marked experimental per NIP-11, but these
particular additions are designed to be safe to lift into a stable form without
further changes.
h2. Verification
* Existing unit tests across {{nifi-api}} continue to compile and pass without
modification.
* New unit tests cover:
** {{ProcessGroupStatus.setLoggingAttributes}} defensive copy (mutating the
caller's map after {{set}} does not mutate the stored map), {{clone()}}
independence, and {{merge()}} union semantics across overlapping/disjoint key
sets.
** {{ConnectorStatus.clone()}} independence (mutating the source
{{rootGroupStatus}} does not affect the clone).
** {{EventAccess.getConnectorStatuses()}} default returns an empty, immutable
collection.
* Framework-level integration is verified when the corresponding NiFi
framework changes consume the new methods (tracked separately).
h2. Alternatives
# Use {{ProcessGroupStatus}} directly from
{{EventAccess.getConnectorStatuses()}} (no new type). Rejected because (a) the
connector's identity is not necessarily the root PG's identity, and (b)
Connectors are expected to grow connector-level observable state (backlog, idle
time, lifecycle) that does not fit on {{ProcessGroupStatus}} — introducing the
wrapper now avoids a return-type breaking change later.
# Surface custom MDC attributes via a connector-side getter
({{Connector.getLoggingAttributes()}}) instead of a push call. Rejected
because many custom attributes are only known after configuration is applied
(e.g. database name, source schema), at which point a pull-time getter cannot
describe them. A push API ({{setLoggingAttributes}}) handles both
static-at-init and dynamic-from-config cases uniformly. A getter could be added
later as a non-breaking convenience for static-only connectors if evidence
accumulates.
# Extend {{EventAccess.getControllerStatus()}} to recursively include
connector-managed flows. Rejected because connector-managed flows are
intentionally siblings of the root group (per NIP-11); merging them under the
root would conflate two distinct hierarchies and break existing reporting-task
assumptions.
# Promote logging attributes via the existing bulletin/event mechanism rather
than via a snapshot field on {{ProcessGroupStatus}}. Rejected because the
snapshot field is the lowest-friction integration point for the OpenTelemetry
reporting task and other status-based exporters that already iterate
{{ProcessGroupStatus}}.