This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 8511191ad1 NIFI-11261 Added Primary Node State handling to GetAzureEventHub
8511191ad1 is described below

commit 8511191ad149907eaf658845ab0e7669d4b712d7
Author: exceptionfactory <exceptionfact...@apache.org>
AuthorDate: Wed Mar 8 14:53:52 2023 -0600

    NIFI-11261 Added Primary Node State handling to GetAzureEventHub
    
    - Updated Qpid Proton J from 0.34.0 to 0.34.1
    
    This closes #7023.
    
    Signed-off-by: Peter Turcsanyi <turcsa...@apache.org>
---
 .../azure/eventhub/GetAzureEventHub.java           | 98 ++++++++++++++++++++--
 .../azure/eventhub/GetAzureEventHubTest.java       | 49 ++++++++++-
 nifi-nar-bundles/nifi-azure-bundle/pom.xml         |  2 +-
 3 files changed, 140 insertions(+), 9 deletions(-)

diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
index d9f00839d3..855b531d92 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
@@ -23,13 +23,16 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Optional;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
+import com.azure.core.amqp.AmqpClientOptions;
 import com.azure.core.credential.AzureNamedKeyCredential;
 import com.azure.identity.ManagedIdentityCredential;
 import com.azure.identity.ManagedIdentityCredentialBuilder;
@@ -49,10 +52,13 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.NodeTypeProvider;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.AbstractProcessor;
@@ -61,6 +67,7 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.scheduling.ExecutionNode;
 import org.apache.nifi.util.StopWatch;
 import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
 
@@ -85,6 +92,8 @@ public class GetAzureEventHub extends AbstractProcessor {
     private static final Duration DEFAULT_FETCH_TIMEOUT = Duration.ofSeconds(60);
     private static final int DEFAULT_FETCH_SIZE = 100;
 
+    private static final String NODE_CLIENT_IDENTIFIER_FORMAT = "%s-%s";
+
     static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder()
             .name("Event Hub Name")
             .description("Name of Azure Event Hubs source")
@@ -180,10 +189,16 @@ public class GetAzureEventHub extends AbstractProcessor {
 
     private final Map<String, EventPosition> partitionEventPositions = new ConcurrentHashMap<>();
 
-    private volatile BlockingQueue<String> partitionIds = new LinkedBlockingQueue<>();
+    private final BlockingQueue<String> partitionIds = new LinkedBlockingQueue<>();
+
+    private final AtomicReference<ExecutionNode> configuredExecutionNode = new AtomicReference<>(ExecutionNode.ALL);
+
     private volatile int receiverFetchSize;
+
     private volatile Duration receiverFetchTimeout;
 
+    private EventHubClientBuilder configuredClientBuilder;
+
     private EventHubConsumerClient eventHubConsumerClient;
 
     @Override
@@ -201,20 +216,40 @@ public class GetAzureEventHub extends AbstractProcessor {
         return AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY, context);
     }
 
+    @OnPrimaryNodeStateChange
+    public void onPrimaryNodeStateChange(final PrimaryNodeState primaryNodeState) {
+        final ExecutionNode executionNode = configuredExecutionNode.get();
+        if (executionNode == ExecutionNode.PRIMARY) {
+            if (PrimaryNodeState.PRIMARY_NODE_REVOKED == primaryNodeState) {
+                closeClient();
+                getLogger().info("Consumer Client closed based on Execution Node [{}] and Primary Node State [{}]", executionNode, primaryNodeState);
+            } else {
+                createClient();
+                getLogger().info("Consumer Client created based on Execution Node [{}] and Primary Node State [{}]", executionNode, primaryNodeState);
+            }
+        } else {
+            getLogger().debug("Consumer Client not changed based on Execution Node [{}]", executionNode);
+        }
+    }
+
     @OnStopped
     public void closeClient() {
+        partitionIds.clear();
         partitionEventPositions.clear();
 
         if (eventHubConsumerClient == null) {
-            getLogger().info("Azure Event Hub Consumer Client not configured");
+            getLogger().debug("Consumer Client not configured");
         } else {
             eventHubConsumerClient.close();
+            getLogger().info("Consumer Client for Event Hub [{}] closed", eventHubConsumerClient.getEventHubName());
         }
     }
 
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
-        eventHubConsumerClient = createEventHubConsumerClient(context);
+        configuredExecutionNode.set(context.getExecutionNode());
+        configuredClientBuilder = createEventHubClientBuilder(context);
+        createClient();
 
         if (context.getProperty(RECEIVER_FETCH_SIZE).isSet()) {
             receiverFetchSize = context.getProperty(RECEIVER_FETCH_SIZE).asInteger();
@@ -227,8 +262,6 @@ public class GetAzureEventHub extends AbstractProcessor {
             receiverFetchTimeout = DEFAULT_FETCH_TIMEOUT;
         }
 
-        this.partitionIds = getPartitionIds();
-
         final PropertyValue enqueuedTimeProperty = context.getProperty(ENQUEUE_TIME);
         final Instant initialEnqueuedTime;
         if (enqueuedTimeProperty.isSet()) {
@@ -310,7 +343,30 @@ public class GetAzureEventHub extends AbstractProcessor {
         return eventHubConsumerClient.receiveFromPartition(partitionId, receiverFetchSize, eventPosition, receiverFetchTimeout);
     }
 
-    private EventHubConsumerClient createEventHubConsumerClient(final ProcessContext context) {
+    private void createClient() {
+        if (isCreateClientEnabled()) {
+            closeClient();
+            eventHubConsumerClient = configuredClientBuilder.buildConsumerClient();
+            partitionIds.addAll(getPartitionIds());
+            getLogger().info("Consumer Client created for Event Hub [{}] Partitions {}", eventHubConsumerClient.getEventHubName(), partitionIds);
+        }
+    }
+
+    private boolean isCreateClientEnabled() {
+        final boolean enabled;
+
+        final ExecutionNode executionNode = configuredExecutionNode.get();
+        if (ExecutionNode.PRIMARY == executionNode) {
+            final NodeTypeProvider nodeTypeProvider = getNodeTypeProvider();
+            enabled = nodeTypeProvider.isPrimary();
+        } else {
+            enabled = true;
+        }
+
+        return enabled;
+    }
+
+    private EventHubClientBuilder createEventHubClientBuilder(final ProcessContext context) {
         final String namespace = context.getProperty(NAMESPACE).getValue();
         final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
         final String serviceBusEndpoint = context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
@@ -332,7 +388,14 @@ public class GetAzureEventHub extends AbstractProcessor {
             final AzureNamedKeyCredential azureNamedKeyCredential = new AzureNamedKeyCredential(policyName, policyKey);
             eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, azureNamedKeyCredential);
         }
-        return eventHubClientBuilder.buildConsumerClient();
+
+        // Set Azure Event Hub Client Identifier using Processor Identifier instead of default random UUID
+        final AmqpClientOptions clientOptions = new AmqpClientOptions();
+        final String clientIdentifier = getClientIdentifier();
+        clientOptions.setIdentifier(clientIdentifier);
+        eventHubClientBuilder.clientOptions(clientOptions);
+
+        return eventHubClientBuilder;
     }
 
     private String getTransitUri(final String partitionId) {
@@ -344,6 +407,27 @@ public class GetAzureEventHub extends AbstractProcessor {
         );
     }
 
+    private String getClientIdentifier() {
+        final String clientIdentifier;
+
+        final String componentIdentifier = getIdentifier();
+
+        final NodeTypeProvider nodeTypeProvider = getNodeTypeProvider();
+        if (nodeTypeProvider.isClustered()) {
+            final Optional<String> currentNode = nodeTypeProvider.getCurrentNode();
+            if (currentNode.isPresent()) {
+                final String currentNodeId = currentNode.get();
+                clientIdentifier = String.format(NODE_CLIENT_IDENTIFIER_FORMAT, currentNodeId, componentIdentifier);
+            } else {
+                clientIdentifier = componentIdentifier;
+            }
+        } else {
+            clientIdentifier = componentIdentifier;
+        }
+
+        return clientIdentifier;
+    }
+
     private Map<String, String> getAttributes(final PartitionEvent partitionEvent) {
         final Map<String, String> attributes = new LinkedHashMap<>();
 
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
index a46aa54558..e40d714111 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
@@ -20,6 +20,9 @@ import com.azure.messaging.eventhubs.EventData;
 import com.azure.messaging.eventhubs.models.LastEnqueuedEventProperties;
 import com.azure.messaging.eventhubs.models.PartitionContext;
 import com.azure.messaging.eventhubs.models.PartitionEvent;
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.scheduling.ExecutionNode;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -33,8 +36,11 @@ import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
 public class GetAzureEventHubTest {
-    private static final String DOMAIN_NAME = "servicebus";
+    private static final String DOMAIN_NAME = "DOMAIN";
     private static final String EVENT_HUB_NAMESPACE = "NAMESPACE";
     private static final String EVENT_HUB_NAME = "NAME";
     private static final String POLICY_NAME = "POLICY";
@@ -111,6 +117,47 @@ public class GetAzureEventHubTest {
         flowFile.assertAttributeEquals("eventhub.name", EVENT_HUB_NAME);
     }
 
+    @Test
+    public void testPrimaryNodeRevoked() {
+        setProperties();
+
+        final ProcessContext processContext = spy(testRunner.getProcessContext());
+        when(processContext.getExecutionNode()).thenReturn(ExecutionNode.PRIMARY);
+
+        testRunner.setIsConfiguredForClustering(true);
+        testRunner.setPrimaryNode(true);
+        final GetAzureEventHub processor = (GetAzureEventHub) testRunner.getProcessor();
+        processor.onScheduled(processContext);
+        processor.onPrimaryNodeStateChange(PrimaryNodeState.PRIMARY_NODE_REVOKED);
+
+        final PartitionEvent partitionEvent = createPartitionEvent();
+        partitionEvents.add(partitionEvent);
+
+        testRunner.run(1, true, false);
+        testRunner.assertAllFlowFilesTransferred(GetAzureEventHub.REL_SUCCESS, 0);
+    }
+
+    @Test
+    public void testPrimaryNodeRevokedThenElected() {
+        setProperties();
+
+        final ProcessContext processContext = spy(testRunner.getProcessContext());
+        when(processContext.getExecutionNode()).thenReturn(ExecutionNode.PRIMARY);
+
+        testRunner.setIsConfiguredForClustering(true);
+        testRunner.setPrimaryNode(true);
+        final GetAzureEventHub processor = (GetAzureEventHub) testRunner.getProcessor();
+        processor.onScheduled(processContext);
+        processor.onPrimaryNodeStateChange(PrimaryNodeState.PRIMARY_NODE_REVOKED);
+        processor.onPrimaryNodeStateChange(PrimaryNodeState.ELECTED_PRIMARY_NODE);
+
+        final PartitionEvent partitionEvent = createPartitionEvent();
+        partitionEvents.add(partitionEvent);
+
+        testRunner.run(1, true, false);
+        testRunner.assertAllFlowFilesTransferred(GetAzureEventHub.REL_SUCCESS, 1);
+    }
+
     private class MockGetAzureEventHub extends GetAzureEventHub {
 
         @Override
diff --git a/nifi-nar-bundles/nifi-azure-bundle/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/pom.xml
index db952a3f47..baaf79eb42 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/pom.xml
@@ -29,7 +29,7 @@
         <azure.sdk.bom.version>1.2.9</azure.sdk.bom.version>
         <microsoft.azure-storage.version>8.6.6</microsoft.azure-storage.version>
         <msal4j.version>1.13.3</msal4j.version>
-        <qpid.proton.version>0.34.0</qpid.proton.version>
+        <qpid.proton.version>0.34.1</qpid.proton.version>
     </properties>
 
     <modules>

Reply via email to