Repository: incubator-ranger
Updated Branches:
  refs/heads/master 97f1d5d05 -> f9fb61102


 Ranger-466: PolicyRefresher should time out when Ranger Admin is non-responsive 
and should use the local cache for policy enforcement if present - add-on changes


Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/f9fb6110
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/f9fb6110
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/f9fb6110

Branch: refs/heads/master
Commit: f9fb6110274803c949b6262270b288b0274aa299
Parents: 97f1d5d
Author: rmani <rm...@hortonworks.com>
Authored: Thu May 14 15:12:05 2015 -0700
Committer: rmani <rm...@hortonworks.com>
Committed: Thu May 14 15:12:05 2015 -0700

----------------------------------------------------------------------
 .../ranger/plugin/util/PolicyRefresher.java     | 181 +++++++++++--------
 1 file changed, 104 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/f9fb6110/agents-common/src/main/java/org/apache/ranger/plugin/util/PolicyRefresher.java
----------------------------------------------------------------------
diff --git 
a/agents-common/src/main/java/org/apache/ranger/plugin/util/PolicyRefresher.java
 
b/agents-common/src/main/java/org/apache/ranger/plugin/util/PolicyRefresher.java
index 341a65e..0729339 100644
--- 
a/agents-common/src/main/java/org/apache/ranger/plugin/util/PolicyRefresher.java
+++ 
b/agents-common/src/main/java/org/apache/ranger/plugin/util/PolicyRefresher.java
@@ -45,9 +45,9 @@ public class PolicyRefresher extends Thread {
        private final String            cacheFile;
        private final Gson              gson;
 
-       private long    pollingIntervalMs     = 30 * 1000;
-       private long    lastKnownVersion          = -1;
-       private boolean policyCacheLoadedOnce = false;
+       private long    pollingIntervalMs   = 30 * 1000;
+       private long    lastKnownVersion    = -1;
+       private boolean policiesSetInPlugin = false;
 
 
        public PolicyRefresher(RangerBasePlugin plugIn, String serviceType, 
String appId, String serviceName, RangerAdminClient rangerAdmin, long 
pollingIntervalMs, String cacheDir) {
@@ -128,6 +128,9 @@ public class PolicyRefresher extends Thread {
 
 
        public void startRefresher() {
+
+               loadPolicy();
+
                super.start();
        }
 
@@ -142,122 +145,146 @@ public class PolicyRefresher extends Thread {
        }
 
        public void run() {
-               boolean loadFromCacheFlag = false;
 
                if(LOG.isDebugEnabled()) {
                        LOG.debug("==> PolicyRefresher(serviceName=" + 
serviceName + ").run()");
                }
 
                while(true) {
+                       loadPolicy();
                        try {
-                               ServicePolicies svcPolicies = 
rangerAdmin.getServicePoliciesIfUpdated(lastKnownVersion);
+                               Thread.sleep(pollingIntervalMs);
+                       } catch(InterruptedException excp) {
+                               LOG.info("PolicyRefresher(serviceName=" + 
serviceName + ").run(): interrupted! Exiting thread", excp);
+                               break;
+                       }
+               }
 
-                               boolean isUpdated = svcPolicies != null;
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== PolicyRefresher(serviceName=" + 
serviceName + ").run()");
+               }
+       }
 
-                               if(isUpdated) {
-                                       long newVersion = 
svcPolicies.getPolicyVersion() == null ? -1 : 
svcPolicies.getPolicyVersion().longValue();
+       private void loadPolicy() {
 
-                               if(!StringUtils.equals(serviceName, 
svcPolicies.getServiceName())) {
-                                       LOG.warn("PolicyRefresher(serviceName=" 
+ serviceName + "): ignoring unexpected serviceName '" + 
svcPolicies.getServiceName() + "' in service-store");
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> PolicyRefresher(serviceName=" + 
serviceName + ").loadPolicy()");
+               }
 
-                                       svcPolicies.setServiceName(serviceName);
-                               }
+               // load policy from Policy Admin
+               ServicePolicies svcPolicies = loadPolicyfromPolicyAdmin();
+
+               if ( svcPolicies == null) {
+                 // if the fetch from Policy Admin fails or returns no update, load from cache (only when no policies are set in the plugin yet)
+                 if (!policiesSetInPlugin) {
+                          svcPolicies = loadFromCache();
+                       }
+               } else {
+                       saveToCache(svcPolicies);
+               }
 
-                                       LOG.info("PolicyRefresher(serviceName=" 
+ serviceName + "): found updated version. lastKnownVersion=" + 
lastKnownVersion + "; newVersion=" + newVersion);
+               if (svcPolicies != null) {
+                       plugIn.setPolicies(svcPolicies);
+                       policiesSetInPlugin = true;
+               }
 
-                                       saveToCache(svcPolicies);
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== PolicyRefresher(serviceName=" + 
serviceName + ").loadPolicy()");
+               }
+       }
 
-                               lastKnownVersion = newVersion;
+       private ServicePolicies loadPolicyfromPolicyAdmin() { 
 
-                                       plugIn.setPolicies(svcPolicies);
-                               } else {
-                                       if(LOG.isDebugEnabled()) {
-                                               
LOG.debug("PolicyRefresher(serviceName=" + serviceName + ").run(): no update 
found. lastKnownVersion=" + lastKnownVersion);
-                                       }
-                               }
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> PolicyRefresher(serviceName=" + 
serviceName + ").loadPolicyfromPolicyAdmin()");
+               }
+
+               ServicePolicies svcPolicies = null;
+
+               try {
+                       svcPolicies = 
rangerAdmin.getServicePoliciesIfUpdated(lastKnownVersion);
+
+                       boolean isUpdated = svcPolicies != null;
 
-                               loadFromCacheFlag         = false;
+                       if(isUpdated) {
+                               long newVersion = 
svcPolicies.getPolicyVersion() == null ? -1 : 
svcPolicies.getPolicyVersion().longValue();
 
-                               policyCacheLoadedOnce = false;
+                               if(!StringUtils.equals(serviceName, 
svcPolicies.getServiceName())) {
+                                       LOG.warn("PolicyRefresher(serviceName=" 
+ serviceName + "): ignoring unexpected serviceName '" + 
svcPolicies.getServiceName() + "' in service-store");
 
-                       } catch(Exception excp) {
-                               loadFromCacheFlag = true;
-                               LOG.error("PolicyRefresher(serviceName=" + 
serviceName + "): failed to refresh policies. Will continue to use last known 
version of policies (" + lastKnownVersion + ")", excp);
-                       } finally {
-                               if (loadFromCacheFlag && 
!policyCacheLoadedOnce) {
-                                       //If ConnectionTime or PolicyAdmin 
down, fetch the Policy from Local Cache and load
-                                       LOG.info("PolicyRefresher(serviceName=" 
+ serviceName + "): failed to refresh policy from Policy Manager. Loading 
Policies from local Cache. lastKnownVersion=" + lastKnownVersion);
-                                       loadFromCache();
-                                       policyCacheLoadedOnce = true;
+                                       svcPolicies.setServiceName(serviceName);
                                }
-                       }
 
-                       try {
-                               Thread.sleep(pollingIntervalMs);
-                       } catch(InterruptedException excp) {
-                               LOG.info("PolicyRefresher(serviceName=" + 
serviceName + ").run(): interrupted! Exiting thread", excp);
+                               LOG.info("PolicyRefresher(serviceName=" + 
serviceName + "): found updated version. lastKnownVersion=" + lastKnownVersion 
+ "; newVersion=" + newVersion);
 
-                               break;
+                               lastKnownVersion = newVersion;
+
+                       } else {
+                               if(LOG.isDebugEnabled()) {
+                                       
LOG.debug("PolicyRefresher(serviceName=" + serviceName + ").run(): no update 
found. lastKnownVersion=" + lastKnownVersion);
+                               }
                        }
-               }
+                } catch(Exception excp) {
+                       LOG.error("PolicyRefresher(serviceName=" + serviceName 
+ "): failed to refresh policies. Will continue to use last known version of 
policies (" + lastKnownVersion + ")", excp);
+                }
 
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("<== PolicyRefresher(serviceName=" + 
serviceName + ").run()");
-               }
+                if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== PolicyRefresher(serviceName=" + 
serviceName + ").loadPolicyfromPolicyAdmin()");
+                }
+
+                return svcPolicies;
        }
 
-       private void loadFromCache() {
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("==> PolicyRefresher(serviceName=" + 
serviceName + ").loadFromCache()");
-               }
 
-               RangerBasePlugin plugIn = this.plugIn;
+       private ServicePolicies loadFromCache() {
 
-               if(plugIn != null) {
-               File cacheFile = StringUtils.isEmpty(this.cacheFile) ? null : 
new File(this.cacheFile);
+               ServicePolicies policies = null;
 
-               if(cacheFile != null && cacheFile.isFile() && 
cacheFile.canRead()) {
-                       Reader reader = null;
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> PolicyRefresher(serviceName=" + 
serviceName + ").loadFromCache()");
+               }
 
-                       try {
-                               reader = new FileReader(cacheFile);
+               File cacheFile = StringUtils.isEmpty(this.cacheFile) ? null : 
new File(this.cacheFile);
 
-                               ServicePolicies policies = 
gson.fromJson(reader, ServicePolicies.class);
+       if(cacheFile != null && cacheFile.isFile() && cacheFile.canRead()) {
+               Reader reader = null;
 
-                               if(policies != null) {
-                                       if(!StringUtils.equals(serviceName, 
policies.getServiceName())) {
-                                               LOG.warn("ignoring unexpected 
serviceName '" + policies.getServiceName() + "' in cache file '" + 
cacheFile.getAbsolutePath() + "'");
+               try {
+                       reader = new FileReader(cacheFile);
 
-                                               
policies.setServiceName(serviceName);
-                                       }
+                       policies = gson.fromJson(reader, ServicePolicies.class);
 
-                                       lastKnownVersion = 
policies.getPolicyVersion() == null ? -1 : 
policies.getPolicyVersion().longValue();
+                       if(policies != null) {
+                               if(!StringUtils.equals(serviceName, 
policies.getServiceName())) {
+                                       LOG.warn("ignoring unexpected 
serviceName '" + policies.getServiceName() + "' in cache file '" + 
cacheFile.getAbsolutePath() + "'");
 
-                                       plugIn.setPolicies(policies);
-                               }
-                       } catch (Exception excp) {
-                               LOG.error("failed to load policies from cache 
file " + cacheFile.getAbsolutePath(), excp);
-                       } finally {
-                               if(reader != null) {
-                                       try {
-                                               reader.close();
-                                       } catch(Exception excp) {
-                                               LOG.error("error while closing 
opened cache file " + cacheFile.getAbsolutePath(), excp);
-                                       }
+                                       policies.setServiceName(serviceName);
                                }
-                       }
-                       } else {
-                               LOG.warn("cache file does not exist or not 
readble '" + (cacheFile == null ? null : cacheFile.getAbsolutePath()) + "'");
-                       }
+
+                               lastKnownVersion = policies.getPolicyVersion() 
== null ? -1 : policies.getPolicyVersion().longValue();
+                        }
+               } catch (Exception excp) {
+                       LOG.error("failed to load policies from cache file " + 
cacheFile.getAbsolutePath(), excp);
+               } finally {
+                       if(reader != null) {
+                               try {
+                                       reader.close();
+                               } catch(Exception excp) {
+                                       LOG.error("error while closing opened 
cache file " + cacheFile.getAbsolutePath(), excp);
+                               }
+                       }
+               }
                } else {
-                       LOG.warn("policyEngine is null");
+                       LOG.warn("cache file does not exist or not readble '" + 
(cacheFile == null ? null : cacheFile.getAbsolutePath()) + "'");
                }
 
                if(LOG.isDebugEnabled()) {
                        LOG.debug("<== PolicyRefresher(serviceName=" + 
serviceName + ").loadFromCache()");
                }
-       }
 
+               return policies;
+       }
+       
        private void saveToCache(ServicePolicies policies) {
                if(LOG.isDebugEnabled()) {
                        LOG.debug("==> PolicyRefresher(serviceName=" + 
serviceName + ").saveToCache()");

Reply via email to