This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 184757fede NIFI-12230 Add configurable Log Level for IP not found in GeoEnrichIP
184757fede is described below

commit 184757fede3e253ba47552ab8e6fe8bd75f8f49f
Author: Pierre Villard <pierre.villard...@gmail.com>
AuthorDate: Thu Oct 19 22:12:34 2023 +0200

    NIFI-12230 Add configurable Log Level for IP not found in GeoEnrichIP
    
    NIFI-12253 Route to not found relationship instead of rolling back in GeoEnrichIPRecord
    
    This closes #7909
    
    Signed-off-by: David Handermann <exceptionfact...@apache.org>
---
 .../apache/nifi/processors/AbstractEnrichIP.java   | 15 ++++++++++
 .../org/apache/nifi/processors/GeoEnrichIP.java    | 22 +++++++++++++++
 .../apache/nifi/processors/GeoEnrichIPRecord.java  | 32 +++++++++++++++++++---
 .../nifi/processors/TestGeoEnrichIPRecord.java     | 10 +++++--
 4 files changed, 73 insertions(+), 6 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/AbstractEnrichIP.java
 
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/AbstractEnrichIP.java
index 616d8dc3ae..0da9e224b7 100644
--- 
a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/AbstractEnrichIP.java
+++ 
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/AbstractEnrichIP.java
@@ -70,6 +70,16 @@ public abstract class AbstractEnrichIP extends 
AbstractProcessor {
             
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
             .build();
 
+    public static final PropertyDescriptor LOG_LEVEL = new 
PropertyDescriptor.Builder()
+            .name("Log Level")
+            .displayName("Log Level")
+            .required(true)
+            .description("The Log Level to use when an IP is not found in the 
database. Accepted values: INFO, DEBUG, WARN, ERROR.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue(MessageLogLevel.WARN.toString())
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
     public static final Relationship REL_FOUND = new Relationship.Builder()
             .name("found")
             .description("Where to route flow files after successfully 
enriching attributes with data provided by database")
@@ -80,6 +90,10 @@ public abstract class AbstractEnrichIP extends 
AbstractProcessor {
             .description("Where to route flow files after unsuccessfully 
enriching attributes because no data was found")
             .build();
 
+    enum MessageLogLevel {
+        DEBUG, INFO, WARN, ERROR
+    }
+
     private Set<Relationship> relationships;
     private List<PropertyDescriptor> propertyDescriptors;
     final AtomicReference<DatabaseReader> databaseReaderRef = new 
AtomicReference<>(null);
@@ -134,6 +148,7 @@ public abstract class AbstractEnrichIP extends 
AbstractProcessor {
         final List<PropertyDescriptor> props = new ArrayList<>();
         props.add(GEO_DATABASE_FILE);
         props.add(IP_ADDRESS_ATTRIBUTE);
+        props.add(LOG_LEVEL);
         this.propertyDescriptors = Collections.unmodifiableList(props);
     }
 
diff --git 
a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java
 
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java
index 96ae4ca8ef..dfcd410ac9 100644
--- 
a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java
+++ 
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java
@@ -18,6 +18,7 @@ package org.apache.nifi.processors;
 
 import com.maxmind.db.InvalidDatabaseException;
 import com.maxmind.geoip2.DatabaseReader;
+import com.maxmind.geoip2.exception.AddressNotFoundException;
 import com.maxmind.geoip2.exception.GeoIp2Exception;
 import com.maxmind.geoip2.model.CityResponse;
 import com.maxmind.geoip2.record.Subdivision;
@@ -93,6 +94,7 @@ public class GeoEnrichIP extends AbstractEnrichIP {
         }
 
         DatabaseReader dbReader = databaseReaderRef.get();
+        final MessageLogLevel logLevel = 
MessageLogLevel.valueOf(context.getProperty(LOG_LEVEL).evaluateAttributeExpressions(flowFile).getValue().toUpperCase());
         final String ipAttributeName = 
context.getProperty(IP_ADDRESS_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue();
         final String ipAttributeValue = flowFile.getAttribute(ipAttributeName);
 
@@ -130,6 +132,26 @@ public class GeoEnrichIP extends AbstractEnrichIP {
             getLogger().warn("Failure while trying to load enrichment data for 
{} due to {}, rolling back session "
                     + "and will reload the database on the next run", 
flowFile, idbe.getMessage());
             session.rollback();
+            return;
+        } catch (AddressNotFoundException anfe) {
+            session.transfer(flowFile, REL_NOT_FOUND);
+
+            switch (logLevel) {
+                case INFO:
+                    getLogger().info("Address not found in the database", 
anfe);
+                    break;
+                case WARN:
+                    getLogger().warn("Address not found in the database", 
anfe);
+                    break;
+                case ERROR:
+                    getLogger().error("Address not found in the database", 
anfe);
+                    break;
+                case DEBUG:
+                default:
+                    getLogger().debug("Address not found in the database", 
anfe);
+                    break;
+            }
+
             return;
         } catch (GeoIp2Exception | IOException ex) {
             // Note IOException is captured again as dbReader also makes 
InetAddress.getByName() calls.
diff --git 
a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIPRecord.java
 
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIPRecord.java
index d8c6aec07e..e917b01047 100644
--- 
a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIPRecord.java
+++ 
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIPRecord.java
@@ -18,6 +18,7 @@ package org.apache.nifi.processors;
 
 import com.maxmind.db.InvalidDatabaseException;
 import com.maxmind.geoip2.DatabaseReader;
+import com.maxmind.geoip2.exception.AddressNotFoundException;
 import com.maxmind.geoip2.model.CityResponse;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -160,7 +161,7 @@ public class GeoEnrichIPRecord extends AbstractEnrichIP {
 
     private static final List<PropertyDescriptor> DESCRIPTORS = 
Collections.unmodifiableList(Arrays.asList(
             GEO_DATABASE_FILE, READER, WRITER, SPLIT_FOUND_NOT_FOUND, 
IP_RECORD_PATH, GEO_CITY, GEO_LATITUDE,
-            GEO_LONGITUDE, GEO_COUNTRY, GEO_COUNTRY_ISO, GEO_POSTAL_CODE
+            GEO_LONGITUDE, GEO_COUNTRY, GEO_COUNTRY_ISO, GEO_POSTAL_CODE, 
LOG_LEVEL
     ));
 
     @Override
@@ -231,6 +232,8 @@ public class GeoEnrichIPRecord extends AbstractEnrichIP {
             }
 
             String rawIpPath = 
context.getProperty(IP_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+            final MessageLogLevel logLevel = 
MessageLogLevel.valueOf(context.getProperty(LOG_LEVEL).evaluateAttributeExpressions(input).getValue().toUpperCase());
+
             RecordPath ipPath = cache.getCompiled(rawIpPath);
 
             RecordReader reader = readerFactory.createRecordReader(input, is, 
getLogger());
@@ -249,7 +252,7 @@ public class GeoEnrichIPRecord extends AbstractEnrichIP {
             int notFoundCount = 0;
             while ((record = reader.nextRecord()) != null) {
                 CityResponse response;
-                response = geocode(ipPath, record, dbReader);
+                response = geocode(ipPath, record, dbReader, logLevel);
                 boolean wasEnriched = enrichRecord(response, record, paths);
                 if (wasEnriched) {
                     targetRelationship = REL_FOUND;
@@ -314,7 +317,7 @@ public class GeoEnrichIPRecord extends AbstractEnrichIP {
         return retVal;
     }
 
-    private CityResponse geocode(RecordPath ipPath, Record record, 
DatabaseReader reader) throws Exception {
+    private CityResponse geocode(RecordPath ipPath, Record record, 
DatabaseReader reader, MessageLogLevel logLevel) throws Exception {
         RecordPathResult result = ipPath.evaluate(record);
         Optional<FieldValue> ipField = result.getSelectedFields().findFirst();
         if (ipField.isPresent()) {
@@ -326,7 +329,28 @@ public class GeoEnrichIPRecord extends AbstractEnrichIP {
             String realValue = val.toString();
             InetAddress address = InetAddress.getByName(realValue);
 
-            return reader.city(address);
+            try {
+                return reader.city(address);
+            } catch (AddressNotFoundException anfe) {
+
+                switch (logLevel) {
+                    case INFO:
+                        getLogger().info("Address not found in the database", 
anfe);
+                        break;
+                    case WARN:
+                        getLogger().warn("Address not found in the database", 
anfe);
+                        break;
+                    case ERROR:
+                        getLogger().error("Address not found in the database", 
anfe);
+                        break;
+                    case DEBUG:
+                    default:
+                        getLogger().debug("Address not found in the database", 
anfe);
+                        break;
+                }
+
+                return null;
+            }
         } else {
             return null;
         }
diff --git 
a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestGeoEnrichIPRecord.java
 
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestGeoEnrichIPRecord.java
index c7aebb00d1..20a39098b9 100644
--- 
a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestGeoEnrichIPRecord.java
+++ 
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestGeoEnrichIPRecord.java
@@ -55,8 +55,10 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class TestGeoEnrichIPRecord {
+
     private TestRunner runner;
     private DatabaseReader reader;
+
     @BeforeEach
     public void setup() throws Exception {
         reader = mock(DatabaseReader.class);
@@ -96,6 +98,7 @@ public class TestGeoEnrichIPRecord {
         runner.setProperty(GeoEnrichIPRecord.GEO_POSTAL_CODE, 
"/geo/country_postal");
         runner.setProperty(GeoEnrichIPRecord.GEO_LATITUDE, "/geo/lat");
         runner.setProperty(GeoEnrichIPRecord.GEO_LONGITUDE, "/geo/lon");
+        runner.setProperty(AbstractEnrichIP.LOG_LEVEL, "WARN");
         runner.assertValid();
     }
 
@@ -129,7 +132,7 @@ public class TestGeoEnrichIPRecord {
         byte[] raw = runner.getContentAsByteArray(ff);
         String content = new String(raw);
         ObjectMapper mapper = new ObjectMapper();
-        List<Map<String, Object>> result = (List<Map<String, 
Object>>)mapper.readValue(content, List.class);
+        List<Map<String, Object>> result = mapper.readValue(content, 
List.class);
 
         assertNotNull(result);
         assertEquals(1, result.size());
@@ -152,9 +155,11 @@ public class TestGeoEnrichIPRecord {
         @Override
         protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
             return Collections.unmodifiableList(Arrays.asList(
-                    READER, WRITER, IP_RECORD_PATH, SPLIT_FOUND_NOT_FOUND, 
GEO_CITY, GEO_LATITUDE, GEO_LONGITUDE, GEO_COUNTRY, GEO_COUNTRY_ISO, 
GEO_POSTAL_CODE
+                    READER, WRITER, IP_RECORD_PATH, SPLIT_FOUND_NOT_FOUND, 
GEO_CITY, GEO_LATITUDE, GEO_LONGITUDE,
+                    GEO_COUNTRY, GEO_COUNTRY_ISO, GEO_POSTAL_CODE, LOG_LEVEL
             ));
         }
+        @Override
         @OnScheduled
         public void onScheduled(ProcessContext context) {
             databaseReaderRef.set(reader);
@@ -162,6 +167,7 @@ public class TestGeoEnrichIPRecord {
             writerFactory = 
context.getProperty(WRITER).asControllerService(RecordSetWriterFactory.class);
             splitOutput = 
context.getProperty(SPLIT_FOUND_NOT_FOUND).asBoolean();
         }
+        @Override
         protected void loadDatabaseFile() {
             //  Do nothing, the mock database reader is used
         }

Reply via email to