This is an automated email from the ASF dual-hosted git repository. turcsanyi pushed a commit to branch support/nifi-1.x in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push: new 8511191ad1 NIFI-11261 Added Primary Node State handling to GetAzureEventHub 8511191ad1 is described below commit 8511191ad149907eaf658845ab0e7669d4b712d7 Author: exceptionfactory <exceptionfact...@apache.org> AuthorDate: Wed Mar 8 14:53:52 2023 -0600 NIFI-11261 Added Primary Node State handling to GetAzureEventHub - Updated Qpid Proton J from 0.34.0 to 0.34.1 This closes #7023. Signed-off-by: Peter Turcsanyi <turcsa...@apache.org> --- .../azure/eventhub/GetAzureEventHub.java | 98 ++++++++++++++++++++-- .../azure/eventhub/GetAzureEventHubTest.java | 49 ++++++++++- nifi-nar-bundles/nifi-azure-bundle/pom.xml | 2 +- 3 files changed, 140 insertions(+), 9 deletions(-) diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java index d9f00839d3..855b531d92 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java @@ -23,13 +23,16 @@ import java.util.Collection; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; +import java.util.Optional; import java.util.Map; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import com.azure.core.amqp.AmqpClientOptions; import com.azure.core.credential.AzureNamedKeyCredential; import com.azure.identity.ManagedIdentityCredential; import com.azure.identity.ManagedIdentityCredentialBuilder; @@ -49,10 +52,13 @@ import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange; +import org.apache.nifi.annotation.notification.PrimaryNodeState; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.NodeTypeProvider; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; @@ -61,6 +67,7 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.scheduling.ExecutionNode; import org.apache.nifi.util.StopWatch; import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils; @@ -85,6 +92,8 @@ public class GetAzureEventHub extends AbstractProcessor { private static final Duration DEFAULT_FETCH_TIMEOUT = Duration.ofSeconds(60); private static final int DEFAULT_FETCH_SIZE = 100; + private static final String NODE_CLIENT_IDENTIFIER_FORMAT = "%s-%s"; + static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder() .name("Event Hub Name") .description("Name of Azure Event Hubs source") @@ -180,10 +189,16 @@ public class GetAzureEventHub extends AbstractProcessor { private final Map<String, EventPosition> partitionEventPositions = new ConcurrentHashMap<>(); - private volatile BlockingQueue<String> partitionIds = new LinkedBlockingQueue<>(); + private final BlockingQueue<String> partitionIds = new LinkedBlockingQueue<>(); + + private final AtomicReference<ExecutionNode> configuredExecutionNode = new AtomicReference<>(ExecutionNode.ALL); + private volatile int receiverFetchSize; + private volatile Duration receiverFetchTimeout; + private EventHubClientBuilder configuredClientBuilder; + private EventHubConsumerClient eventHubConsumerClient; @Override @@ -201,20 +216,40 @@ public class GetAzureEventHub extends AbstractProcessor { return AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY, context); } + @OnPrimaryNodeStateChange + public void onPrimaryNodeStateChange(final PrimaryNodeState primaryNodeState) { + final ExecutionNode executionNode = configuredExecutionNode.get(); + if (executionNode == ExecutionNode.PRIMARY) { + if (PrimaryNodeState.PRIMARY_NODE_REVOKED == primaryNodeState) { + closeClient(); + getLogger().info("Consumer Client closed based on Execution Node [{}] and Primary Node State [{}]", executionNode, primaryNodeState); + } else { + createClient(); + getLogger().info("Consumer Client created based on Execution Node [{}] and Primary Node State [{}]", executionNode, primaryNodeState); + } + } else { + getLogger().debug("Consumer Client not changed based on Execution Node [{}]", executionNode); + } + } + @OnStopped public void closeClient() { + partitionIds.clear(); partitionEventPositions.clear(); if (eventHubConsumerClient == null) { - getLogger().info("Azure Event Hub Consumer Client not configured"); + getLogger().debug("Consumer Client not configured"); } else { eventHubConsumerClient.close(); + getLogger().info("Consumer Client for Event Hub [{}] closed", eventHubConsumerClient.getEventHubName()); } } @OnScheduled public void onScheduled(final ProcessContext context) { - eventHubConsumerClient = createEventHubConsumerClient(context); + configuredExecutionNode.set(context.getExecutionNode()); + configuredClientBuilder = createEventHubClientBuilder(context); + createClient(); if (context.getProperty(RECEIVER_FETCH_SIZE).isSet()) { receiverFetchSize = context.getProperty(RECEIVER_FETCH_SIZE).asInteger(); @@ -227,8 +262,6 @@ public class GetAzureEventHub extends AbstractProcessor { receiverFetchTimeout = DEFAULT_FETCH_TIMEOUT; } - this.partitionIds = getPartitionIds(); - final PropertyValue enqueuedTimeProperty = context.getProperty(ENQUEUE_TIME); final Instant initialEnqueuedTime; if (enqueuedTimeProperty.isSet()) { @@ -310,7 +343,30 @@ public class GetAzureEventHub extends AbstractProcessor { return eventHubConsumerClient.receiveFromPartition(partitionId, receiverFetchSize, eventPosition, receiverFetchTimeout); } - private EventHubConsumerClient createEventHubConsumerClient(final ProcessContext context) { + private void createClient() { + if (isCreateClientEnabled()) { + closeClient(); + eventHubConsumerClient = configuredClientBuilder.buildConsumerClient(); + partitionIds.addAll(getPartitionIds()); + getLogger().info("Consumer Client created for Event Hub [{}] Partitions {}", eventHubConsumerClient.getEventHubName(), partitionIds); + } + } + + private boolean isCreateClientEnabled() { + final boolean enabled; + + final ExecutionNode executionNode = configuredExecutionNode.get(); + if (ExecutionNode.PRIMARY == executionNode) { + final NodeTypeProvider nodeTypeProvider = getNodeTypeProvider(); + enabled = nodeTypeProvider.isPrimary(); + } else { + enabled = true; + } + + return enabled; + } + + private EventHubClientBuilder createEventHubClientBuilder(final ProcessContext context) { final String namespace = context.getProperty(NAMESPACE).getValue(); final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue(); final String serviceBusEndpoint = context.getProperty(SERVICE_BUS_ENDPOINT).getValue(); @@ -332,7 +388,14 @@ public class GetAzureEventHub extends AbstractProcessor { final AzureNamedKeyCredential azureNamedKeyCredential = new AzureNamedKeyCredential(policyName, policyKey); eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, azureNamedKeyCredential); } - return eventHubClientBuilder.buildConsumerClient(); + + // Set Azure Event Hub Client Identifier using Processor Identifier instead of default random UUID + final AmqpClientOptions clientOptions = new AmqpClientOptions(); + final String clientIdentifier = getClientIdentifier(); + clientOptions.setIdentifier(clientIdentifier); + eventHubClientBuilder.clientOptions(clientOptions); + + return eventHubClientBuilder; } private String getTransitUri(final String partitionId) { @@ -344,6 +407,27 @@ public class GetAzureEventHub extends AbstractProcessor { ); } + private String getClientIdentifier() { + final String clientIdentifier; + + final String componentIdentifier = getIdentifier(); + + final NodeTypeProvider nodeTypeProvider = getNodeTypeProvider(); + if (nodeTypeProvider.isClustered()) { + final Optional<String> currentNode = nodeTypeProvider.getCurrentNode(); + if (currentNode.isPresent()) { + final String currentNodeId = currentNode.get(); + clientIdentifier = String.format(NODE_CLIENT_IDENTIFIER_FORMAT, currentNodeId, componentIdentifier); + } else { + clientIdentifier = componentIdentifier; + } + } else { + clientIdentifier = componentIdentifier; + } + + return clientIdentifier; + } + private Map<String, String> getAttributes(final PartitionEvent partitionEvent) { final Map<String, String> attributes = new LinkedHashMap<>(); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java index a46aa54558..e40d714111 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java @@ -20,6 +20,9 @@ import com.azure.messaging.eventhubs.EventData; import com.azure.messaging.eventhubs.models.LastEnqueuedEventProperties; import com.azure.messaging.eventhubs.models.PartitionContext; import com.azure.messaging.eventhubs.models.PartitionEvent; +import org.apache.nifi.annotation.notification.PrimaryNodeState; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.scheduling.ExecutionNode; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -33,8 +36,11 @@ import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + public class GetAzureEventHubTest { - private static final String DOMAIN_NAME = "servicebus"; + private static final String DOMAIN_NAME = "DOMAIN"; private static final String EVENT_HUB_NAMESPACE = "NAMESPACE"; private static final String EVENT_HUB_NAME = "NAME"; private static final String POLICY_NAME = "POLICY"; @@ -111,6 +117,47 @@ public class GetAzureEventHubTest { flowFile.assertAttributeEquals("eventhub.name", EVENT_HUB_NAME); } + @Test + public void testPrimaryNodeRevoked() { + setProperties(); + + final ProcessContext processContext = spy(testRunner.getProcessContext()); + when(processContext.getExecutionNode()).thenReturn(ExecutionNode.PRIMARY); + + testRunner.setIsConfiguredForClustering(true); + testRunner.setPrimaryNode(true); + final GetAzureEventHub processor = (GetAzureEventHub) testRunner.getProcessor(); + processor.onScheduled(processContext); + processor.onPrimaryNodeStateChange(PrimaryNodeState.PRIMARY_NODE_REVOKED); + + final PartitionEvent partitionEvent = createPartitionEvent(); + partitionEvents.add(partitionEvent); + + testRunner.run(1, true, false); + testRunner.assertAllFlowFilesTransferred(GetAzureEventHub.REL_SUCCESS, 0); + } + + @Test + public void testPrimaryNodeRevokedThenElected() { + setProperties(); + + final ProcessContext processContext = spy(testRunner.getProcessContext()); + when(processContext.getExecutionNode()).thenReturn(ExecutionNode.PRIMARY); + + testRunner.setIsConfiguredForClustering(true); + testRunner.setPrimaryNode(true); + final GetAzureEventHub processor = (GetAzureEventHub) testRunner.getProcessor(); + processor.onScheduled(processContext); + processor.onPrimaryNodeStateChange(PrimaryNodeState.PRIMARY_NODE_REVOKED); + processor.onPrimaryNodeStateChange(PrimaryNodeState.ELECTED_PRIMARY_NODE); + + final PartitionEvent partitionEvent = createPartitionEvent(); + partitionEvents.add(partitionEvent); + + testRunner.run(1, true, false); + testRunner.assertAllFlowFilesTransferred(GetAzureEventHub.REL_SUCCESS, 1); + } + private class MockGetAzureEventHub extends GetAzureEventHub { @Override diff --git a/nifi-nar-bundles/nifi-azure-bundle/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/pom.xml index db952a3f47..baaf79eb42 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-azure-bundle/pom.xml @@ -29,7 +29,7 @@ <azure.sdk.bom.version>1.2.9</azure.sdk.bom.version> <microsoft.azure-storage.version>8.6.6</microsoft.azure-storage.version> <msal4j.version>1.13.3</msal4j.version> - <qpid.proton.version>0.34.0</qpid.proton.version> + <qpid.proton.version>0.34.1</qpid.proton.version> </properties> <modules>