This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 3ce2deb706 NIFI-12944 - Add PeerAddress as Attribute into the flowfile
3ce2deb706 is described below

commit 3ce2deb706247bbc17bc65304f0b41f4c2fabb5c
Author: Ricardo Ferreira <ricardo.g.ferre...@nokia.com>
AuthorDate: Thu Mar 21 16:34:21 2024 +0000

    NIFI-12944 - Add PeerAddress as Attribute into the flowfile
    
    Signed-off-by: Pierre Villard <pierre.villard...@gmail.com>
    
    This closes #8557.
---
 .../nifi/snmp/operations/SNMPTrapReceiver.java     | 10 ++++++++--
 .../nifi/snmp/operations/SNMPTrapReceiverTest.java | 23 ++++++++++++++++++++++
 2 files changed, 31 insertions(+), 2 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-snmp-bundle/nifi-snmp-processors/src/main/java/org/apache/nifi/snmp/operations/SNMPTrapReceiver.java
 
b/nifi-nar-bundles/nifi-snmp-bundle/nifi-snmp-processors/src/main/java/org/apache/nifi/snmp/operations/SNMPTrapReceiver.java
index d2eaf19528..c93bb83e46 100644
--- 
a/nifi-nar-bundles/nifi-snmp-bundle/nifi-snmp-processors/src/main/java/org/apache/nifi/snmp/operations/SNMPTrapReceiver.java
+++ 
b/nifi-nar-bundles/nifi-snmp-bundle/nifi-snmp-processors/src/main/java/org/apache/nifi/snmp/operations/SNMPTrapReceiver.java
@@ -25,6 +25,7 @@ import org.snmp4j.CommandResponder;
 import org.snmp4j.CommandResponderEvent;
 import org.snmp4j.PDU;
 import org.snmp4j.PDUv1;
+import org.snmp4j.smi.Address;
 
 import java.util.Map;
 
@@ -46,7 +47,7 @@ public class SNMPTrapReceiver implements CommandResponder {
         final PDU pdu = event.getPDU();
         if (isValidTrapPdu(pdu)) {
             final ProcessSession processSession = 
processSessionFactory.createSession();
-            final FlowFile flowFile = createFlowFile(processSession, pdu);
+            final FlowFile flowFile = createFlowFile(processSession,event);
             processSession.getProvenanceReporter().create(flowFile, 
event.getPeerAddress() + "/" + pdu.getRequestID());
             if (pdu.getErrorStatus() == PDU.noError) {
                 processSession.transfer(flowFile, REL_SUCCESS);
@@ -59,14 +60,19 @@ public class SNMPTrapReceiver implements CommandResponder {
         }
     }
 
-    private FlowFile createFlowFile(final ProcessSession processSession, final 
PDU pdu) {
+    private FlowFile createFlowFile(final ProcessSession processSession, final 
 CommandResponderEvent event) {
         FlowFile flowFile = processSession.create();
         final Map<String, String> attributes;
+        final PDU pdu = event.getPDU();
+        final Address peerAddress = event.getPeerAddress();
         if (pdu instanceof PDUv1) {
             attributes = SNMPUtils.getV1TrapPduAttributeMap((PDUv1) pdu);
         } else {
             attributes = SNMPUtils.getPduAttributeMap(pdu);
         }
+        if (peerAddress.isValid()) {
+            processSession.putAttribute(flowFile, SNMPUtils.SNMP_PROP_PREFIX + 
"peerAddress", peerAddress.toString());
+        }
         flowFile = processSession.putAllAttributes(flowFile, attributes);
         return flowFile;
     }
diff --git 
a/nifi-nar-bundles/nifi-snmp-bundle/nifi-snmp-processors/src/test/java/org/apache/nifi/snmp/operations/SNMPTrapReceiverTest.java
 
b/nifi-nar-bundles/nifi-snmp-bundle/nifi-snmp-processors/src/test/java/org/apache/nifi/snmp/operations/SNMPTrapReceiverTest.java
index eeca40eefe..0d273c3921 100644
--- 
a/nifi-nar-bundles/nifi-snmp-bundle/nifi-snmp-processors/src/test/java/org/apache/nifi/snmp/operations/SNMPTrapReceiverTest.java
+++ 
b/nifi-nar-bundles/nifi-snmp-bundle/nifi-snmp-processors/src/test/java/org/apache/nifi/snmp/operations/SNMPTrapReceiverTest.java
@@ -29,6 +29,7 @@ import org.junit.jupiter.api.Test;
 import org.snmp4j.CommandResponderEvent;
 import org.snmp4j.PDU;
 import org.snmp4j.PDUv1;
+import org.snmp4j.smi.Address;
 import org.snmp4j.smi.OID;
 import org.snmp4j.smi.VariableBinding;
 
@@ -95,9 +96,15 @@ class SNMPTrapReceiverTest {
         when(mockV1Pdu.getType()).thenReturn(PDU.V1TRAP);
         when(mockV1Pdu.getEnterprise()).thenReturn(new 
OID("1.3.6.1.2.1.1.1.0"));
         when(mockV1Pdu.getSpecificTrap()).thenReturn(4);
+
+        final Address mockAddress = mock(Address.class);
+        when(mockAddress.toString()).thenReturn("127.0.0.1/62");
+        when(mockAddress.isValid()).thenReturn(true);
+
         final Vector<VariableBinding> vbs = new Vector<>();
         doReturn(vbs).when(mockV1Pdu).getVariableBindings();
         when(mockEvent.getPDU()).thenReturn(mockV1Pdu);
+        when(mockEvent.getPeerAddress()).thenReturn(mockAddress);
         
when(mockProcessSessionFactory.createSession()).thenReturn(mockProcessSession);
 
         snmpTrapReceiver.processPdu(mockEvent);
@@ -107,6 +114,8 @@ class SNMPTrapReceiverTest {
 
         assertEquals("1.3.6.1.2.1.1.1.0", 
flowFile.getAttribute("snmp$enterprise"));
         assertEquals(String.valueOf(4), 
flowFile.getAttribute("snmp$specificTrapType"));
+        assertEquals("127.0.0.1/62", 
flowFile.getAttribute("snmp$peerAddress"));
+
     }
 
     @Test
@@ -117,8 +126,15 @@ class SNMPTrapReceiverTest {
         when(mockPdu.getErrorIndex()).thenReturn(123);
         when(mockPdu.getErrorStatusText()).thenReturn("test error status 
text");
         final Vector<VariableBinding> vbs = new Vector<>();
+
+        final Address mockAddress = mock(Address.class);
+        when(mockAddress.toString()).thenReturn("127.0.0.1/62");
+        when(mockAddress.isValid()).thenReturn(true);
+
         doReturn(vbs).when(mockPdu).getVariableBindings();
         when(mockEvent.getPDU()).thenReturn(mockPdu);
+        when(mockEvent.getPeerAddress()).thenReturn(mockAddress);
+
         
when(mockProcessSessionFactory.createSession()).thenReturn(mockProcessSession);
 
         snmpTrapReceiver.processPdu(mockEvent);
@@ -128,6 +144,7 @@ class SNMPTrapReceiverTest {
 
         assertEquals(String.valueOf(123), 
flowFile.getAttribute("snmp$errorIndex"));
         assertEquals("test error status text", 
flowFile.getAttribute("snmp$errorStatusText"));
+        assertEquals("127.0.0.1/62", 
flowFile.getAttribute("snmp$peerAddress"));
     }
 
     @Test
@@ -136,9 +153,14 @@ class SNMPTrapReceiverTest {
 
         when(mockPdu.getType()).thenReturn(PDU.TRAP);
         when(mockPdu.getErrorStatus()).thenReturn(PDU.badValue);
+
+        final Address mockAddress = mock(Address.class);
+        when(mockAddress.isValid()).thenReturn(false);
+
         final Vector<VariableBinding> vbs = new Vector<>();
         doReturn(vbs).when(mockPdu).getVariableBindings();
         when(mockEvent.getPDU()).thenReturn(mockPdu);
+        when(mockEvent.getPeerAddress()).thenReturn(mockAddress);
         
when(mockProcessSessionFactory.createSession()).thenReturn(mockProcessSession);
 
         snmpTrapReceiver.processPdu(mockEvent);
@@ -149,5 +171,6 @@ class SNMPTrapReceiverTest {
         final FlowFile flowFile = flowFiles.get(0);
 
         assertEquals(String.valueOf(PDU.badValue), 
flowFile.getAttribute("snmp$errorStatus"));
+        assertEquals(null, flowFile.getAttribute("snmp$peerAddress"));
     }
 }

Reply via email to