Kevin Doran created NIP-30:
------------------------------

             Summary: Connector Observability: MDC Logging and Reporting-Task 
Visibility
                 Key: NIP-30
                 URL: https://issues.apache.org/jira/browse/NIP-30
             Project: NiFi Improvement Proposal
          Issue Type: Improvement
            Reporter: Kevin Doran
            Assignee: Kevin Doran


h2. Motivation

The Connector extension point introduced by NIP-11 and implemented under 
NIFI-15258 is a new top-level component whose flow lives inside a managed 
Process Group that is a sibling of the controller's root Process Group, not a 
descendant. Two observability gaps follow from that placement:
 # Logs emitted by components inside a connector-managed flow have no way to 
carry connector identity in MDC. NiFi already provides MDC context for 
components in the controller's root flow via the {{processGroup*}} keys, but 
those keys describe the containing PG hierarchy, not the owning Connector. A 
Connector implementation also has no API to publish its own identity-shaped MDC 
attributes (for example, a connector definition identifier) for the components 
running inside its flow.
 # Reporting tasks that walk {{EventAccess.getControllerStatus()}} cannot see 
connector-managed flows at all. Because connector-managed PGs are siblings of 
the root, the existing controller traversal misses them entirely; built-in and 
third-party reporting tasks therefore export zero metrics for connector flows 
today.

This NIP proposes a small, additive set of API changes in {{nifi-api}} to close 
both gaps without affecting existing components, callers, or test stubs. The 
framework wiring that supplies behavior behind these defaults is out of scope 
and will be tracked in a separate NIFI Jira.
h2. Scope

Three additions and one field, all in {{{}nifi-api{}}}, all backward compatible 
by virtue of {{default}} methods, default field values, and no removed/renamed 
members:
 # New default method on {{{}ConnectorInitializationContext{}}}:
default void setLoggingAttributes(Map<String, String> attributes) {
thrownewUnsupportedOperationException();
}
 # New {{loggingAttributes}} field on {{ProcessGroupStatus}} ({{{}Map<String, 
String>{}}}, defaults to {{{}Map.of(){}}}) with accessors that take a defensive 
immutable copy on set; included in {{clone()}} and in the static {{merge()}} 
helper (additive/union merge across cluster nodes).

 # New status type {{org.apache.nifi.controller.status.ConnectorStatus}} 
carrying connector identity ({{{}id{}}}, {{{}name{}}}) and the 
{{ProcessGroupStatus}} for the Connector's managed root Process Group 
({{{}rootGroupStatus{}}}). Implements {{Cloneable}} and follows the established 
Status-class conventions ({{{}PortStatus{}}}, {{{}ConnectionStatus{}}}, etc.). 
Positioned to grow with connector-level metrics that have no Process-Group 
analog (for example, connector-wide backlog or idle time) in subsequent work.

 # New default method on {{{}EventAccess{}}}:
default Collection<ConnectorStatus> getConnectorStatuses() {
return Collections.emptyList();
}

No existing types, methods, or fields are altered. No new compile-time 
dependencies are introduced; the only new import on the consumer side is 
{{{}org.apache.nifi.controller.status.ConnectorStatus{}}}.
h2. Description
h3. {{ConnectorInitializationContext.setLoggingAttributes(Map)}}

A Connector implementation invokes this from within its {{initialize(...)}} 
lifecycle (or at any later point, e.g. once its configuration steps are 
applied) to publish key/value pairs that the framework will include in the 
SLF4J {{MDC}} for every log line emitted by the Connector itself and by any 
component (Processor, Controller Service, etc.) running inside the Connector's 
managed flow.

Calls replace the previous set of custom attributes. The framework reserves a 
small set of well-known keys that describe the Connector's own identity 
(identifier, name, type FQN, bundle coordinate); attempts to override these via 
this method are dropped and the framework logs a {{WARN}} for each rejected 
key. The exact set of reserved keys is a framework concern and is intentionally 
not specified in {{{}nifi-api{}}}.

The default throws {{UnsupportedOperationException}} so that frameworks that 
have not yet implemented Connector MDC continue to compile and run; the 
framework-supplied {{StandardConnectorInitializationContext}} overrides it.
h3. {{ProcessGroupStatus.loggingAttributes}}

When a reporting task observes a {{{}ProcessGroupStatus{}}}, it now also sees a 
snapshot of the logging attributes that apply to that Process Group at capture 
time. For PGs inside a connector-managed flow this carries the 
connector-identity keys; for ordinary PGs it carries the existing 
{{processGroup*}} keys. Reporting tasks that export to systems like 
OpenTelemetry can then attach those keys as metric attributes without any 
additional traversal.

Setter takes a defensive immutable copy ({{{}Map.copyOf{}}}); {{null}} is 
normalized to {{{}Map.of(){}}}. {{clone()}} routes through the setter so the 
cloned instance always holds an independent immutable map. {{merge()}} is 
additive (union with last-write-wins per key), which keeps the behavior 
forgiving when cluster nodes are mid-deploy and one node has not yet populated 
its map.
h3. {{ConnectorStatus}}

A connector-level status object analogous to {{{}ProcessGroupStatus{}}}. The 
minimum viable fields are the connector's identity and the status of its 
managed root PG. It is introduced now (rather than reusing 
{{ProcessGroupStatus}} directly from 
{{{}EventAccess.getConnectorStatuses(){}}}) because:
 * Reporting tasks need to identify the connector independently of the root 
PG's id, which may not match the connector id.
 * Connectors have observable state that does not exist at the PG level (e.g. 
connector-wide backlog, idle time, lifecycle state). Future NIPs can add fields 
here without re-shaping the {{EventAccess}} return type.

h3. {{EventAccess.getConnectorStatuses()}}

A new entry point for reporting tasks to enumerate connector-managed flows 
alongside the controller's root flow. Implementations walk the runtime's 
Connector registry, build a {{ConnectorStatus}} for each (populated with the 
existing {{getGroupStatus(...)}} result for the managed root), and return them. 
The default returns an empty list so that:
 * Existing {{EventAccess}} implementations (tests, mocks, downstream 
frameworks) compile unchanged.
 * Existing reporting tasks that do not call this method see no behavior change.

The framework implementation of this method, and the changes required in 
built-in reporting tasks to consume it, are out of scope for this NIP and will 
be tracked under a separate NIFI Jira.
h2. Compatibility

Purely additive:
 * All new interface methods have {{default}} implementations (UOE for 
{{{}setLoggingAttributes{}}}, empty collection for 
{{{}getConnectorStatuses{}}}).
 * The new {{loggingAttributes}} field on {{ProcessGroupStatus}} defaults to 
{{{}Map.of(){}}}; existing serialization round-trips of {{ProcessGroupStatus}} 
are unaffected because the field is not currently present in any persisted 
form. Existing reflective consumers see a new no-arg accessor and an empty 
default.
 * {{ConnectorStatus}} is a brand-new type; nothing references it today.
 * No method signatures, return types, or behaviors are altered.

The Connector API is already marked experimental per NIP-11, but these 
particular additions are designed to be safe to lift into a stable form without 
further changes.
h2. Verification
 * Existing unit tests across {{nifi-api}} continue to compile and pass without 
modification.
 * New unit tests cover:
 ** {{ProcessGroupStatus.setLoggingAttributes}} defensive copy (mutating the 
caller's map after {{set}} does not mutate the stored map), {{clone()}} 
independence, and {{merge()}} union semantics across overlapping/disjoint key 
sets.
 ** {{ConnectorStatus.clone()}} independence (mutating the source 
{{rootGroupStatus}} does not affect the clone).
 ** {{EventAccess.getConnectorStatuses()}} default returns an empty, immutable 
collection.
 * Framework-level integration is verified when the corresponding NiFi 
framework changes consume the new methods (tracked separately).

h2. Alternatives
 # Use {{ProcessGroupStatus}} directly from 
{{EventAccess.getConnectorStatuses()}} (no new type). Rejected because (a) the 
connector's identity is not necessarily the root PG's identity, and (b) 
Connectors are expected to grow connector-level observable state (backlog, idle 
time, lifecycle) that does not fit on {{ProcessGroupStatus}} — introducing the 
wrapper now avoids a return-type breaking change later.
 # Surface custom MDC attributes via a connector-side getter 
({{{}Connector.getLoggingAttributes(){}}}) instead of a push call. Rejected 
because many custom attributes are only known after configuration is applied 
(e.g. database name, source schema), at which point a pull-time getter cannot 
describe them. A push API ({{{}setLoggingAttributes{}}}) handles both 
static-at-init and dynamic-from-config cases uniformly. A getter could be added 
later as a non-breaking convenience for static-only connectors if evidence 
accumulates.
 # Extend {{EventAccess.getControllerStatus()}} to recursively include 
connector-managed flows. Rejected because connector-managed flows are 
intentionally siblings of the root group (per NIP-11); merging them under the 
root would conflate two distinct hierarchies and break existing reporting-task 
assumptions.
 # Promote logging attributes via the existing bulletin/event mechanism rather 
than via a snapshot field on {{{}ProcessGroupStatus{}}}. Rejected because the 
snapshot field is the lowest-friction integration point for the OpenTelemetry 
reporting task and other status-based exporters that already iterate 
{{{}ProcessGroupStatus{}}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to