Repository: nifi Updated Branches: refs/heads/master c49933f03 -> 71cd497fe
NIFI-3942 Making IPLookupService reload the database file on the fly when detecting the file has changed Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f35e0ecd Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f35e0ecd Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f35e0ecd Branch: refs/heads/master Commit: f35e0ecdd0e0fd8940b1caa688405d2128f8eb61 Parents: c49933f Author: Bryan Bende <bbe...@apache.org> Authored: Fri May 19 15:06:48 2017 -0400 Committer: Mark Payne <marka...@hotmail.com> Committed: Mon May 22 09:06:42 2017 -0400 ---------------------------------------------------------------------- .../nifi/lookup/maxmind/IPLookupService.java | 135 +++++++++++++++---- 1 file changed, 109 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/f35e0ecd/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IPLookupService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IPLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IPLookupService.java index 1ac6b36..88c611e 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IPLookupService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IPLookupService.java @@ -17,16 +17,16 @@ package org.apache.nifi.lookup.maxmind; -import java.io.File; -import java.io.IOException; -import java.net.InetAddress; -import 
java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.TimeUnit; - +import com.maxmind.geoip2.model.AnonymousIpResponse; +import com.maxmind.geoip2.model.CityResponse; +import com.maxmind.geoip2.model.ConnectionTypeResponse; +import com.maxmind.geoip2.model.ConnectionTypeResponse.ConnectionType; +import com.maxmind.geoip2.model.DomainResponse; +import com.maxmind.geoip2.model.IspResponse; +import com.maxmind.geoip2.record.Country; +import com.maxmind.geoip2.record.Location; +import com.maxmind.geoip2.record.Subdivision; +import org.apache.commons.codec.digest.DigestUtils; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnEnabled; @@ -41,15 +41,19 @@ import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.util.StopWatch; -import com.maxmind.geoip2.model.AnonymousIpResponse; -import com.maxmind.geoip2.model.CityResponse; -import com.maxmind.geoip2.model.ConnectionTypeResponse; -import com.maxmind.geoip2.model.ConnectionTypeResponse.ConnectionType; -import com.maxmind.geoip2.model.DomainResponse; -import com.maxmind.geoip2.model.IspResponse; -import com.maxmind.geoip2.record.Country; -import com.maxmind.geoip2.record.Location; -import com.maxmind.geoip2.record.Subdivision; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; @Tags({"lookup", "enrich", "ip", "geo", "ipgeo", "maxmind", "isp", "domain", "cellular", "anonymous", "tor"}) 
@CapabilityDescription("A lookup service that provides several types of enrichment information for IP addresses. The service is configured by providing a MaxMind " @@ -57,7 +61,15 @@ import com.maxmind.geoip2.record.Subdivision; + "service to provide all of the available enrichment data may be slower than returning only a portion of the available enrichments. View the Usage of this component " + "and choose to view Additional Details for more information, such as the Schema that pertains to the information that is returned.") public class IPLookupService extends AbstractControllerService implements RecordLookupService { + + private volatile String databaseFile = null; private volatile DatabaseReader databaseReader = null; + private volatile String databaseChecksum = null; // MD5 hex of the file most recently loaded by loadDatabase + private volatile long databaseLastRefreshAttempt = -1; // System.currentTimeMillis() of the last refresh check; -1 = none recorded + + private final Lock dbWriteLock = new ReentrantLock(); // acquired only via tryLock in refreshDatabase; onEnabled loads without it + + static final long REFRESH_THRESHOLD_MS = 5 * 60 * 1000; static final PropertyDescriptor GEO_DATABASE_FILE = new PropertyDescriptor.Builder() .name("database-file") @@ -65,6 +77,7 @@ public class IPLookupService extends AbstractControllerService implements Record .description("Path to Maxmind IP Enrichment Database File") .required(true) .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) + .expressionLanguageSupported(true) .build(); static final PropertyDescriptor LOOKUP_CITY = new PropertyDescriptor.Builder() .name("lookup-city") @@ -128,13 +141,23 @@ public class IPLookupService extends AbstractControllerService implements Record @OnEnabled public void onEnabled(final ConfigurationContext context) throws IOException { - final String dbFileString = context.getProperty(GEO_DATABASE_FILE).getValue(); - final File dbFile = new File(dbFileString); - final StopWatch stopWatch = new StopWatch(true); - final DatabaseReader reader = new DatabaseReader.Builder(dbFile).build(); - stopWatch.stop(); - getLogger().info("Completed loading of Maxmind Database.
Elapsed time was {} milliseconds.", new Object[] {stopWatch.getDuration(TimeUnit.MILLISECONDS)}); - databaseReader = reader; + databaseFile = context.getProperty(GEO_DATABASE_FILE).evaluateAttributeExpressions().getValue(); + + final File dbFile = new File(databaseFile); + final String dbFileChecksum = getChecksum(dbFile); + loadDatabase(dbFile, dbFileChecksum); + + // initialize the last refresh attempt to the time the service was enabled + databaseLastRefreshAttempt = System.currentTimeMillis(); + } + + private String getChecksum(final File file) throws IOException { // MD5 hex digest of the whole file; used only to detect content changes between refreshes + String fileChecksum; + try (final InputStream in = new FileInputStream(file)){ + fileChecksum = DigestUtils.md5Hex(in); + } + + return fileChecksum; } @OnStopped @@ -143,6 +166,11 @@ public class IPLookupService extends AbstractControllerService implements Record if (reader != null) { reader.close(); } + + databaseFile = null; // NOTE(review): this cleanup lives in an @OnStopped method; verify NiFi invokes @OnStopped for controller services (their documented hook is @OnDisabled), otherwise the reader is never closed on disable + databaseReader = null; + databaseChecksum = null; + databaseLastRefreshAttempt = -1; } @Override @@ -151,6 +179,19 @@ public class IPLookupService extends AbstractControllerService implements Record return Optional.empty(); } + // determine if we should attempt to refresh the database based on exceeding a certain amount of time since last refresh + if (shouldAttemptDatabaseRefresh()) { + try { + refreshDatabase(); + } catch (IOException e) { + throw new LookupFailureException("Failed to refresh database file: " + e.getMessage(), e); + } + } + + // assign to a local so we don't need a read lock, this way another thread can update the member variable reference + // while the current thread continues using the local reference. NOTE(review): because lookups may still hold the old reference, loadDatabase cannot simply close the reader it replaces; doing so would likely break in-flight lookups + final DatabaseReader databaseReader = this.databaseReader; + final InetAddress inetAddress; try { inetAddress = InetAddress.getByName(key); @@ -239,6 +280,48 @@ public class IPLookupService extends AbstractControllerService implements Record return Optional.ofNullable(createContainerRecord(geoRecord, ispRecord, domainName, connectionType,
anonymousIpRecord)); } + // returns true if no refresh attempt has been recorded (timestamp is -1, its initial value and what onStopped resets it to) or if at least REFRESH_THRESHOLD_MS have elapsed since the last refresh attempt, whether or not that attempt actually reloaded the file + private boolean shouldAttemptDatabaseRefresh() { + return System.currentTimeMillis() - databaseLastRefreshAttempt >= REFRESH_THRESHOLD_MS; + } + + private void refreshDatabase() throws IOException { + // since this is the only place the write lock is used, if something else has it then we know another thread is + // already refreshing the database so we can just move on if we don't get the lock, no need to block + if (dbWriteLock.tryLock()) { + try { + // now that we have the lock check again to make sure we still need to refresh + if (shouldAttemptDatabaseRefresh()) { + final File dbFile = new File(databaseFile); + final String dbFileChecksum = getChecksum(dbFile); + if (!dbFileChecksum.equals(databaseChecksum)) { // reload only when the file's MD5 differs from the one last loaded + loadDatabase(dbFile, dbFileChecksum); + } else { + getLogger().debug("Checksum hasn't changed, database will not be reloaded"); + } + + // update the timestamp even if we didn't refresh so that we'll wait a full threshold again. NOTE(review): if getChecksum/loadDatabase throws, this line is skipped, so the next lookup retries the refresh immediately + databaseLastRefreshAttempt = System.currentTimeMillis(); + } else { + getLogger().debug("Acquired write lock, but no longer need to reload the database"); + } + } finally { + dbWriteLock.unlock(); + } + } else { + getLogger().debug("Unable to acquire write lock, skipping reload of database"); + } + } + + private void loadDatabase(final File dbFile, final String dbFileChecksum) throws IOException { // builds a new reader for dbFile, then publishes it together with its checksum + final StopWatch stopWatch = new StopWatch(true); + final DatabaseReader reader = new DatabaseReader.Builder(dbFile).build(); + stopWatch.stop(); + getLogger().info("Completed loading of Maxmind Database. Elapsed time was {} milliseconds.", new Object[]{stopWatch.getDuration(TimeUnit.MILLISECONDS)}); + databaseReader = reader; // NOTE(review): the reader being replaced is never closed; confirm whether superseded readers keep file handles or mappings open until GC + databaseChecksum = dbFileChecksum; + } + private Record createRecord(final CityResponse city) { if (city == null) { return null;