kevdoran commented on code in PR #11330:
URL: https://github.com/apache/nifi/pull/11330#discussion_r3405239072
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java:
##########
@@ -149,6 +157,109 @@ public String getName() {
@Override
public void setName(final String name) {
this.name = name;
+ rebuildLoggingAttributes();
+ }
+
+ /**
+ * Returns the {@link ProcessGroup} that holds the connector-managed flow
(the connector's active
+ * managed root group), or {@code null} before the active flow context has
been established.
+ *
+ * <p>This serves two purposes:
+ * <ul>
+ * <li>It satisfies the {@link GroupedComponent} contract. The
connector's own {@code ComponentLog}
+ * uses a {@code StandardLoggingContext} bound to this node (see
{@code ExtensionBuilder}), so the
+ * connector's own log lines source their MDC from this group's
+ * {@link ProcessGroup#getLoggingAttributes() logging
attributes}.</li>
+ * <li>It is the target onto which {@link #rebuildLoggingAttributes()}
pushes the merged connector
+ * logging attributes (via {@code
StandardProcessGroup.setConnectorLoggingAttributes}); those then
+ * cascade down the managed flow so components running inside it
inherit the connector MDC through
+ * their own logging contexts.</li>
+ * </ul>
+ *
+ * @return the connector's managed root process group, or {@code null}
before the active flow context
+ * has been established
+ */
+ @Override
+ public ProcessGroup getProcessGroup() {
+ final FrameworkFlowContext context = activeFlowContext;
+ return context == null ? null : context.getManagedProcessGroup();
+ }
+
+ /**
+ * Replaces the connector-supplied custom logging attributes. Reserved
keys (those used by the
+ * framework, see {@link ConnectorLoggingAttribute}) are filtered out and
a WARN is logged for
+ * each dropped entry.
+ *
+ * @param attributes the proposed custom attributes; {@code null} or empty
clears the current set
+ */
+ @Override
+ public void setCustomLoggingAttributes(final Map<String, String>
attributes) {
+ final Map<String, String> filtered = filterReservedKeys(attributes);
+
+ synchronized (loggingAttributesLock) {
+ this.customLoggingAttributes = filtered;
+ }
+ rebuildLoggingAttributes();
+ }
+
+ /**
+ * Returns an immutable snapshot of the merged framework + custom logging
attributes currently
+ * advertised by this connector. The framework keys are populated by the
framework from the
+ * connector's identifier, name, component type, and bundle coordinate.
+ *
+ * @return an immutable map of the connector's merged framework and custom
logging attributes; never {@code null}
+ */
+ @Override
+ public Map<String, String> getLoggingAttributes() {
+ return mergedLoggingAttributes;
+ }
+
+ private Map<String, String> filterReservedKeys(final Map<String, String>
attributes) {
+ if (attributes == null || attributes.isEmpty()) {
+ return Map.of();
+ }
+ final Map<String, String> filtered = new HashMap<>(attributes.size());
+ for (final Map.Entry<String, String> entry : attributes.entrySet()) {
+ final String key = entry.getKey();
+ if (key == null || key.isEmpty()) {
+ continue;
+ }
+ if (ConnectorLoggingAttribute.isReserved(key)) {
+ logger.warn("{} attempted to set reserved logging attribute
[{}]; dropping the entry. Reserved keys are managed by the framework.", this,
key);
+ continue;
+ }
+ filtered.put(key, entry.getValue());
+ }
+ return Collections.unmodifiableMap(filtered);
+ }
+
+ private void rebuildLoggingAttributes() {
+ final Map<String, String> merged;
+ final Map<String, String> custom;
+ synchronized (loggingAttributesLock) {
+ custom = customLoggingAttributes;
+ merged = new HashMap<>(custom.size() +
ConnectorLoggingAttribute.values().length);
+ merged.putAll(custom);
+ // Framework keys are applied last so they always win against any
not-yet-filtered overlap.
+ merged.put(ConnectorLoggingAttribute.CONNECTOR_ID.getAttribute(),
identifier);
+
merged.put(ConnectorLoggingAttribute.CONNECTOR_NAME.getAttribute(), name ==
null ? "" : name);
+
merged.put(ConnectorLoggingAttribute.CONNECTOR_COMPONENT.getAttribute(),
componentCanonicalClass == null ? "" : componentCanonicalClass);
+ if (bundleCoordinate != null) {
+
merged.put(ConnectorLoggingAttribute.CONNECTOR_BUNDLE_GROUP.getAttribute(),
bundleCoordinate.getGroup());
+
merged.put(ConnectorLoggingAttribute.CONNECTOR_BUNDLE_ARTIFACT.getAttribute(),
bundleCoordinate.getId());
+
merged.put(ConnectorLoggingAttribute.CONNECTOR_BUNDLE_VERSION.getAttribute(),
bundleCoordinate.getVersion());
+ }
+ this.mergedLoggingAttributes = Collections.unmodifiableMap(merged);
+ }
+
+ pushLoggingAttributesToManagedFlow(merged);
+ }
+
+ private void pushLoggingAttributesToManagedFlow(final Map<String, String>
attributes) {
+ final ProcessGroup managedProcessGroup = getProcessGroup();
+ if (managedProcessGroup instanceof StandardProcessGroup
standardProcessGroup) {
+ standardProcessGroup.setConnectorLoggingAttributes(attributes);
+ }
Review Comment:
I'll add the method to `ProcessGroup`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]