Repository: incubator-ranger Updated Branches: refs/heads/master 97f1d5d05 -> f9fb61102
Ranger-466: PolicyRefresher should timeout when Ranger Admin is non responsive and should use local cache for policy enforcement if present -Add on Changes Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/f9fb6110 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/f9fb6110 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/f9fb6110 Branch: refs/heads/master Commit: f9fb6110274803c949b6262270b288b0274aa299 Parents: 97f1d5d Author: rmani <rm...@hortonworks.com> Authored: Thu May 14 15:12:05 2015 -0700 Committer: rmani <rm...@hortonworks.com> Committed: Thu May 14 15:12:05 2015 -0700 ---------------------------------------------------------------------- .../ranger/plugin/util/PolicyRefresher.java | 181 +++++++++++-------- 1 file changed, 104 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/f9fb6110/agents-common/src/main/java/org/apache/ranger/plugin/util/PolicyRefresher.java ---------------------------------------------------------------------- diff --git a/agents-common/src/main/java/org/apache/ranger/plugin/util/PolicyRefresher.java b/agents-common/src/main/java/org/apache/ranger/plugin/util/PolicyRefresher.java index 341a65e..0729339 100644 --- a/agents-common/src/main/java/org/apache/ranger/plugin/util/PolicyRefresher.java +++ b/agents-common/src/main/java/org/apache/ranger/plugin/util/PolicyRefresher.java @@ -45,9 +45,9 @@ public class PolicyRefresher extends Thread { private final String cacheFile; private final Gson gson; - private long pollingIntervalMs = 30 * 1000; - private long lastKnownVersion = -1; - private boolean policyCacheLoadedOnce = false; + private long pollingIntervalMs = 30 * 1000; + private long lastKnownVersion = -1; + private boolean policiesSetInPlugin = false; public PolicyRefresher(RangerBasePlugin plugIn, String serviceType, String appId, String serviceName, RangerAdminClient rangerAdmin, long pollingIntervalMs, String cacheDir) { @@ -128,6 +128,9 @@ public class PolicyRefresher extends Thread { public void startRefresher() { + + loadPolicy(); + super.start(); } @@ -142,122 +145,146 @@ public class PolicyRefresher extends Thread { } public void run() { - boolean loadFromCacheFlag = false; if(LOG.isDebugEnabled()) { LOG.debug("==> PolicyRefresher(serviceName=" + serviceName + ").run()"); } while(true) { + loadPolicy(); try { - ServicePolicies svcPolicies = rangerAdmin.getServicePoliciesIfUpdated(lastKnownVersion); + Thread.sleep(pollingIntervalMs); + } catch(InterruptedException excp) { + LOG.info("PolicyRefresher(serviceName=" + serviceName + ").run(): interrupted! Exiting thread", excp); + break; + } + } - boolean isUpdated = svcPolicies != null; + if(LOG.isDebugEnabled()) { + LOG.debug("<== PolicyRefresher(serviceName=" + serviceName + ").run()"); + } + } - if(isUpdated) { - long newVersion = svcPolicies.getPolicyVersion() == null ? -1 : svcPolicies.getPolicyVersion().longValue(); + private void loadPolicy() { - if(!StringUtils.equals(serviceName, svcPolicies.getServiceName())) { - LOG.warn("PolicyRefresher(serviceName=" + serviceName + "): ignoring unexpected serviceName '" + svcPolicies.getServiceName() + "' in service-store"); + if(LOG.isDebugEnabled()) { + LOG.debug("==> PolicyRefresher(serviceName=" + serviceName + ").loadPolicy()"); + } - svcPolicies.setServiceName(serviceName); - } + //load policy from PolicyAmdin + ServicePolicies svcPolicies = loadPolicyfromPolicyAdmin(); + + if ( svcPolicies == null) { + //if Policy fetch from Policy Admin Fails, load from cache + if (!policiesSetInPlugin) { + svcPolicies = loadFromCache(); + } + } else { + saveToCache(svcPolicies); + } - LOG.info("PolicyRefresher(serviceName=" + serviceName + "): found updated version. lastKnownVersion=" + lastKnownVersion + "; newVersion=" + newVersion); + if (svcPolicies != null) { + plugIn.setPolicies(svcPolicies); + policiesSetInPlugin = true; + } - saveToCache(svcPolicies); + if(LOG.isDebugEnabled()) { + LOG.debug("<== PolicyRefresher(serviceName=" + serviceName + ").loadPolicy()"); + } + } - lastKnownVersion = newVersion; + private ServicePolicies loadPolicyfromPolicyAdmin() { - plugIn.setPolicies(svcPolicies); - } else { - if(LOG.isDebugEnabled()) { - LOG.debug("PolicyRefresher(serviceName=" + serviceName + ").run(): no update found. lastKnownVersion=" + lastKnownVersion); - } - } + if(LOG.isDebugEnabled()) { + LOG.debug("==> PolicyRefresher(serviceName=" + serviceName + ").loadPolicyfromPolicyAdmin()"); + } + + ServicePolicies svcPolicies = null; + + try { + svcPolicies = rangerAdmin.getServicePoliciesIfUpdated(lastKnownVersion); + + boolean isUpdated = svcPolicies != null; - loadFromCacheFlag = false; + if(isUpdated) { + long newVersion = svcPolicies.getPolicyVersion() == null ? -1 : svcPolicies.getPolicyVersion().longValue(); - policyCacheLoadedOnce = false; + if(!StringUtils.equals(serviceName, svcPolicies.getServiceName())) { + LOG.warn("PolicyRefresher(serviceName=" + serviceName + "): ignoring unexpected serviceName '" + svcPolicies.getServiceName() + "' in service-store"); - } catch(Exception excp) { - loadFromCacheFlag = true; - LOG.error("PolicyRefresher(serviceName=" + serviceName + "): failed to refresh policies. Will continue to use last known version of policies (" + lastKnownVersion + ")", excp); - } finally { - if (loadFromCacheFlag && !policyCacheLoadedOnce) { - //If ConnectionTime or PolicyAdmin down, fetch the Policy from Local Cache and load - LOG.info("PolicyRefresher(serviceName=" + serviceName + "): failed to refresh policy from Policy Manager. Loading Policies from local Cache. lastKnownVersion=" + lastKnownVersion); - loadFromCache(); - policyCacheLoadedOnce = true; + svcPolicies.setServiceName(serviceName); } - } - try { - Thread.sleep(pollingIntervalMs); - } catch(InterruptedException excp) { - LOG.info("PolicyRefresher(serviceName=" + serviceName + ").run(): interrupted! Exiting thread", excp); + LOG.info("PolicyRefresher(serviceName=" + serviceName + "): found updated version. lastKnownVersion=" + lastKnownVersion + "; newVersion=" + newVersion); - break; + lastKnownVersion = newVersion; + + } else { + if(LOG.isDebugEnabled()) { + LOG.debug("PolicyRefresher(serviceName=" + serviceName + ").run(): no update found. lastKnownVersion=" + lastKnownVersion); + } } - } + } catch(Exception excp) { + LOG.error("PolicyRefresher(serviceName=" + serviceName + "): failed to refresh policies. Will continue to use last known version of policies (" + lastKnownVersion + ")", excp); + } - if(LOG.isDebugEnabled()) { - LOG.debug("<== PolicyRefresher(serviceName=" + serviceName + ").run()"); - } + if(LOG.isDebugEnabled()) { + LOG.debug("<== PolicyRefresher(serviceName=" + serviceName + ").loadPolicyfromPolicyAdmin()"); + } + + return svcPolicies; } - private void loadFromCache() { - if(LOG.isDebugEnabled()) { - LOG.debug("==> PolicyRefresher(serviceName=" + serviceName + ").loadFromCache()"); - } - RangerBasePlugin plugIn = this.plugIn; + private ServicePolicies loadFromCache() { - if(plugIn != null) { - File cacheFile = StringUtils.isEmpty(this.cacheFile) ? null : new File(this.cacheFile); + ServicePolicies policies = null; - if(cacheFile != null && cacheFile.isFile() && cacheFile.canRead()) { - Reader reader = null; + if(LOG.isDebugEnabled()) { + LOG.debug("==> PolicyRefresher(serviceName=" + serviceName + ").loadFromCache()"); + } - try { - reader = new FileReader(cacheFile); + File cacheFile = StringUtils.isEmpty(this.cacheFile) ? null : new File(this.cacheFile); - ServicePolicies policies = gson.fromJson(reader, ServicePolicies.class); + if(cacheFile != null && cacheFile.isFile() && cacheFile.canRead()) { + Reader reader = null; - if(policies != null) { - if(!StringUtils.equals(serviceName, policies.getServiceName())) { - LOG.warn("ignoring unexpected serviceName '" + policies.getServiceName() + "' in cache file '" + cacheFile.getAbsolutePath() + "'"); + try { + reader = new FileReader(cacheFile); - policies.setServiceName(serviceName); - } + policies = gson.fromJson(reader, ServicePolicies.class); - lastKnownVersion = policies.getPolicyVersion() == null ? -1 : policies.getPolicyVersion().longValue(); + if(policies != null) { + if(!StringUtils.equals(serviceName, policies.getServiceName())) { + LOG.warn("ignoring unexpected serviceName '" + policies.getServiceName() + "' in cache file '" + cacheFile.getAbsolutePath() + "'"); - plugIn.setPolicies(policies); - } - } catch (Exception excp) { - LOG.error("failed to load policies from cache file " + cacheFile.getAbsolutePath(), excp); - } finally { - if(reader != null) { - try { - reader.close(); - } catch(Exception excp) { - LOG.error("error while closing opened cache file " + cacheFile.getAbsolutePath(), excp); - } + policies.setServiceName(serviceName); } - } - } else { - LOG.warn("cache file does not exist or not readble '" + (cacheFile == null ? null : cacheFile.getAbsolutePath()) + "'"); - } + + lastKnownVersion = policies.getPolicyVersion() == null ? -1 : policies.getPolicyVersion().longValue(); + } + } catch (Exception excp) { + LOG.error("failed to load policies from cache file " + cacheFile.getAbsolutePath(), excp); + } finally { + if(reader != null) { + try { + reader.close(); + } catch(Exception excp) { + LOG.error("error while closing opened cache file " + cacheFile.getAbsolutePath(), excp); + } + } + } } else { - LOG.warn("policyEngine is null"); + LOG.warn("cache file does not exist or not readble '" + (cacheFile == null ? null : cacheFile.getAbsolutePath()) + "'"); } if(LOG.isDebugEnabled()) { LOG.debug("<== PolicyRefresher(serviceName=" + serviceName + ").loadFromCache()"); } - } + return policies; + } + private void saveToCache(ServicePolicies policies) { if(LOG.isDebugEnabled()) { LOG.debug("==> PolicyRefresher(serviceName=" + serviceName + ").saveToCache()");