Repository: nifi
Updated Branches:
  refs/heads/master c49933f03 -> 71cd497fe


NIFI-3942 Making IPLookupService reload the database file on the fly when it
detects that the file has changed


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f35e0ecd
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f35e0ecd
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f35e0ecd

Branch: refs/heads/master
Commit: f35e0ecdd0e0fd8940b1caa688405d2128f8eb61
Parents: c49933f
Author: Bryan Bende <bbe...@apache.org>
Authored: Fri May 19 15:06:48 2017 -0400
Committer: Mark Payne <marka...@hotmail.com>
Committed: Mon May 22 09:06:42 2017 -0400

----------------------------------------------------------------------
 .../nifi/lookup/maxmind/IPLookupService.java    | 135 +++++++++++++++----
 1 file changed, 109 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/f35e0ecd/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IPLookupService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IPLookupService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IPLookupService.java
index 1ac6b36..88c611e 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IPLookupService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IPLookupService.java
@@ -17,16 +17,16 @@
 
 package org.apache.nifi.lookup.maxmind;
 
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-
+import com.maxmind.geoip2.model.AnonymousIpResponse;
+import com.maxmind.geoip2.model.CityResponse;
+import com.maxmind.geoip2.model.ConnectionTypeResponse;
+import com.maxmind.geoip2.model.ConnectionTypeResponse.ConnectionType;
+import com.maxmind.geoip2.model.DomainResponse;
+import com.maxmind.geoip2.model.IspResponse;
+import com.maxmind.geoip2.record.Country;
+import com.maxmind.geoip2.record.Location;
+import com.maxmind.geoip2.record.Subdivision;
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
@@ -41,15 +41,19 @@ import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.util.StopWatch;
 
-import com.maxmind.geoip2.model.AnonymousIpResponse;
-import com.maxmind.geoip2.model.CityResponse;
-import com.maxmind.geoip2.model.ConnectionTypeResponse;
-import com.maxmind.geoip2.model.ConnectionTypeResponse.ConnectionType;
-import com.maxmind.geoip2.model.DomainResponse;
-import com.maxmind.geoip2.model.IspResponse;
-import com.maxmind.geoip2.record.Country;
-import com.maxmind.geoip2.record.Location;
-import com.maxmind.geoip2.record.Subdivision;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 @Tags({"lookup", "enrich", "ip", "geo", "ipgeo", "maxmind", "isp", "domain", 
"cellular", "anonymous", "tor"})
 @CapabilityDescription("A lookup service that provides several types of 
enrichment information for IP addresses. The service is configured by providing 
a MaxMind "
@@ -57,7 +61,15 @@ import com.maxmind.geoip2.record.Subdivision;
     + "service to provide all of the available enrichment data may be slower 
than returning only a portion of the available enrichments. View the Usage of 
this component "
     + "and choose to view Additional Details for more information, such as the 
Schema that pertains to the information that is returned.")
 public class IPLookupService extends AbstractControllerService implements 
RecordLookupService {
+
+    // path to the MaxMind database file, evaluated from GEO_DATABASE_FILE when the service is enabled; cleared on stop
+    private volatile String databaseFile = null;
+    // reader used by lookups; lookups copy it to a local, so loadDatabase can swap in a new instance at any time
     private volatile DatabaseReader databaseReader = null;
+    // MD5 hex digest (DigestUtils.md5Hex) of the file contents the current reader was loaded from
+    private volatile String databaseChecksum = null;
+    // System.currentTimeMillis() of the last refresh check; -1 before the service is enabled and again after it stops
+    private volatile long databaseLastRefreshAttempt = -1;
+
+    // taken only while refreshing the database, and only via tryLock, so lookups never block waiting for it
+    private final Lock dbWriteLock = new ReentrantLock();
+
+    // minimum interval between checks of the database file for changes (5 minutes)
+    static final long REFRESH_THRESHOLD_MS = 5 * 60 * 1000;
 
     static final PropertyDescriptor GEO_DATABASE_FILE = new 
PropertyDescriptor.Builder()
         .name("database-file")
@@ -65,6 +77,7 @@ public class IPLookupService extends 
AbstractControllerService implements Record
         .description("Path to Maxmind IP Enrichment Database File")
         .required(true)
         .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+        .expressionLanguageSupported(true)
         .build();
     static final PropertyDescriptor LOOKUP_CITY = new 
PropertyDescriptor.Builder()
         .name("lookup-city")
@@ -128,13 +141,23 @@ public class IPLookupService extends 
AbstractControllerService implements Record
 
+    /**
+     * Evaluates the database file property, computes the file's checksum and loads the MaxMind database.
+     * It then stamps the refresh clock, so no reload check is attempted until REFRESH_THRESHOLD_MS after enabling.
+     *
+     * @param context the configuration context supplying GEO_DATABASE_FILE
+     * @throws IOException if the file cannot be read or the database cannot be built from it
+     */
     @OnEnabled
     public void onEnabled(final ConfigurationContext context) throws 
IOException {
-        final String dbFileString = 
context.getProperty(GEO_DATABASE_FILE).getValue();
-        final File dbFile = new File(dbFileString);
-        final StopWatch stopWatch = new StopWatch(true);
-        final DatabaseReader reader = new 
DatabaseReader.Builder(dbFile).build();
-        stopWatch.stop();
-        getLogger().info("Completed loading of Maxmind Database.  Elapsed time 
was {} milliseconds.", new Object[] 
{stopWatch.getDuration(TimeUnit.MILLISECONDS)});
-        databaseReader = reader;
+        databaseFile = 
context.getProperty(GEO_DATABASE_FILE).evaluateAttributeExpressions().getValue();
+
+        // record the checksum of what we load so later refresh checks can tell whether the file changed
+        final File dbFile = new File(databaseFile);
+        final String dbFileChecksum = getChecksum(dbFile);
+        loadDatabase(dbFile, dbFileChecksum);
+
+        // initialize the last refresh attempt to the time the service was enabled
+        databaseLastRefreshAttempt = System.currentTimeMillis();
+    }
+
+    /**
+     * Computes the MD5 digest of the given file's full contents as a hex string (DigestUtils.md5Hex).
+     *
+     * @param file the file to hash
+     * @return the hex-encoded MD5 digest of the file's bytes
+     * @throws IOException if the file cannot be opened or read
+     */
+    private String getChecksum(final File file) throws IOException {
+        // try-with-resources closes the stream once the digest has been computed (or if hashing fails)
+        try (final InputStream fileContents = new FileInputStream(file)) {
+            return DigestUtils.md5Hex(fileContents);
+        }
     }
 
     @OnStopped
@@ -143,6 +166,11 @@ public class IPLookupService extends 
AbstractControllerService implements Record
         if (reader != null) {
             reader.close();
         }
+
+        databaseFile = null;
+        databaseReader = null;
+        databaseChecksum = null;
+        databaseLastRefreshAttempt = -1;
     }
 
     @Override
@@ -151,6 +179,19 @@ public class IPLookupService extends 
AbstractControllerService implements Record
             return Optional.empty();
         }
 
+        // determine if we should attempt to refresh the database based on 
exceeding a certain amount of time since last refresh
+        if (shouldAttemptDatabaseRefresh()) {
+            try {
+                refreshDatabase();
+            } catch (IOException e) {
+                throw new LookupFailureException("Failed to refresh database 
file: " + e.getMessage(), e);
+            }
+        }
+
+        // assign to a local so we don't need a read lock, this way another 
thread can update the member variable reference
+        // while the current thread continues using the local reference
+        final DatabaseReader databaseReader = this.databaseReader;
+
         final InetAddress inetAddress;
         try {
             inetAddress = InetAddress.getByName(key);
@@ -239,6 +280,48 @@ public class IPLookupService extends 
AbstractControllerService implements Record
         return Optional.ofNullable(createContainerRecord(geoRecord, ispRecord, 
domainName, connectionType, anonymousIpRecord));
     }
 
+    /**
+     * Returns true once at least REFRESH_THRESHOLD_MS milliseconds have passed since the last refresh attempt.
+     * The attempt timestamp is -1 before the service is enabled and after it stops, which makes the elapsed
+     * time huge, so this also returns true in that state.
+     */
+    private boolean shouldAttemptDatabaseRefresh() {
+        final long elapsedSinceLastAttempt = System.currentTimeMillis() - databaseLastRefreshAttempt;
+        return elapsedSinceLastAttempt >= REFRESH_THRESHOLD_MS;
+    }
+
+    /**
+     * Reloads the MaxMind database if the file's checksum differs from the one recorded at the last load.
+     *
+     * Only one thread performs the check at a time. The write lock is taken with tryLock, so a thread that
+     * loses the race skips the refresh and keeps using the current reader instead of blocking.
+     *
+     * The last-attempt timestamp is updated in a finally block. Even when hashing or loading the file fails,
+     * the next attempt therefore waits a full REFRESH_THRESHOLD_MS. Without this, a file that is missing or
+     * cannot be loaded would make every subsequent lookup re-hash the entire file and fail. That would happen
+     * even though loadDatabase only replaces the reader on success, so the previous reader is still usable.
+     *
+     * @throws IOException if the database file cannot be read or the new database cannot be built
+     */
+    private void refreshDatabase() throws IOException {
+        // since this is the only place the write lock is used, if something else has it then we know another thread is
+        // already refreshing the database so we can just move on if we don't get the lock, no need to block
+        if (!dbWriteLock.tryLock()) {
+            getLogger().debug("Unable to acquire write lock, skipping reload of database");
+            return;
+        }
+
+        try {
+            // now that we have the lock check again to make sure we still need to refresh
+            if (!shouldAttemptDatabaseRefresh()) {
+                getLogger().debug("Acquired write lock, but no longer need to reload the database");
+                return;
+            }
+
+            try {
+                final File dbFile = new File(databaseFile);
+                final String dbFileChecksum = getChecksum(dbFile);
+                if (!dbFileChecksum.equals(databaseChecksum)) {
+                    loadDatabase(dbFile, dbFileChecksum);
+                } else {
+                    getLogger().debug("Checksum hasn't changed, database will not be reloaded");
+                }
+            } finally {
+                // record the attempt whether we reloaded, skipped, or failed, so that we wait a full threshold again
+                databaseLastRefreshAttempt = System.currentTimeMillis();
+            }
+        } finally {
+            dbWriteLock.unlock();
+        }
+    }
+
+    /**
+     * Builds a new DatabaseReader for the given file and logs how long the load took. It then publishes
+     * the reader and its checksum to the volatile fields used by lookups and refresh checks.
+     *
+     * The fields are assigned only after the build succeeds, so a failed load leaves the current reader in place.
+     * NOTE(review): the previously published reader is not closed here, because lookups may still hold a local
+     * reference to it. Confirm that leaving it to garbage collection is acceptable for its file resources.
+     *
+     * @param dbFile         the MaxMind database file to load
+     * @param dbFileChecksum checksum of dbFile, recorded so later refreshes can detect changes
+     * @throws IOException if the database cannot be built from the file
+     */
+    private void loadDatabase(final File dbFile, final String dbFileChecksum) throws IOException {
+        final StopWatch loadTimer = new StopWatch(true);
+        final DatabaseReader freshReader = new DatabaseReader.Builder(dbFile).build();
+        loadTimer.stop();
+
+        final long elapsedMillis = loadTimer.getDuration(TimeUnit.MILLISECONDS);
+        getLogger().info("Completed loading of Maxmind Database.  Elapsed time was {} milliseconds.", new Object[]{elapsedMillis});
+
+        databaseReader = freshReader;
+        databaseChecksum = dbFileChecksum;
+    }
+
     private Record createRecord(final CityResponse city) {
         if (city == null) {
             return null;

Reply via email to