This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ranger.git


The following commit(s) were added to refs/heads/master by this push:
     new 2da8a1f55 RANGER-4375: updated to log plugin activities asynchronously - #2
2da8a1f55 is described below

commit 2da8a1f55928ff57d8b4b9cefb8e82109943aa83
Author: Madhan Neethiraj <mad...@apache.org>
AuthorDate: Fri Aug 25 00:50:41 2023 -0700

    RANGER-4375: updated to log plugin activities asynchronously - #2
---
 .../main/java/org/apache/ranger/biz/AssetMgr.java  | 93 ++++++++++++----------
 .../ranger/service/RangerTransactionService.java   | 57 ++++++++++++-
 2 files changed, 108 insertions(+), 42 deletions(-)

diff --git a/security-admin/src/main/java/org/apache/ranger/biz/AssetMgr.java 
b/security-admin/src/main/java/org/apache/ranger/biz/AssetMgr.java
index 1dc3d372d..8bbeba783 100644
--- a/security-admin/src/main/java/org/apache/ranger/biz/AssetMgr.java
+++ b/security-admin/src/main/java/org/apache/ranger/biz/AssetMgr.java
@@ -44,7 +44,6 @@ import org.apache.ranger.common.AppConstants;
 import org.apache.ranger.common.DateUtil;
 import org.apache.ranger.common.JSONUtil;
 import org.apache.ranger.common.MessageEnums;
-import org.apache.ranger.common.PropertiesUtil;
 import org.apache.ranger.common.RangerCommonEnums;
 import org.apache.ranger.common.RangerConstants;
 import org.apache.ranger.common.SearchCriteria;
@@ -75,6 +74,9 @@ import org.springframework.stereotype.Component;
 
 @Component
 public class AssetMgr extends AssetMgrBase {
+       private static final String PROP_RANGER_LOG_SC_NOT_MODIFIED          = 
"ranger.log.SC_NOT_MODIFIED";
+       private static final String PROP_PLUGIN_ACTIVITY_AUDIT_NOT_MODIFIED  = 
"ranger.plugin.activity.audit.not.modified";
+       private static final String PROP_PLUGIN_ACTIVITY_AUDIT_COMMIT_INLINE = 
"ranger.plugin.activity.audit.commit.inline";
 
        @Autowired
        XPermMapService xPermMapService;
@@ -136,7 +138,9 @@ public class AssetMgr extends AssetMgrBase {
        @Autowired
        ServiceMgr serviceMgr;
 
-       boolean pluginActivityAuditCommitInline = false;
+       boolean rangerLogNotModified              = false;
+       boolean pluginActivityAuditLogNotModified = false;
+       boolean pluginActivityAuditCommitInline   = false;
 
        private static final Logger logger = 
LoggerFactory.getLogger(AssetMgr.class);
 
@@ -146,9 +150,13 @@ public class AssetMgr extends AssetMgrBase {
        public void init() {
                logger.info("==> AssetMgr.init()");
 
-               pluginActivityAuditCommitInline = 
RangerAdminConfig.getInstance().getBoolean("ranger.plugin.activity.audit.commit.inline",
 false);
+               rangerLogNotModified              = 
RangerAdminConfig.getInstance().getBoolean(PROP_RANGER_LOG_SC_NOT_MODIFIED, 
false);
+               pluginActivityAuditLogNotModified = 
RangerAdminConfig.getInstance().getBoolean(PROP_PLUGIN_ACTIVITY_AUDIT_NOT_MODIFIED,
 false);
+               pluginActivityAuditCommitInline   = 
RangerAdminConfig.getInstance().getBoolean(PROP_PLUGIN_ACTIVITY_AUDIT_COMMIT_INLINE,
 false);
 
-               logger.info("ranger.plugin.activity.audit.commit.inline={}", 
pluginActivityAuditCommitInline);
+               logger.info("{}={}", PROP_RANGER_LOG_SC_NOT_MODIFIED, 
rangerLogNotModified);
+               logger.info("{}={}", PROP_PLUGIN_ACTIVITY_AUDIT_NOT_MODIFIED, 
pluginActivityAuditLogNotModified);
+               logger.info("{}={}", PROP_PLUGIN_ACTIVITY_AUDIT_COMMIT_INLINE, 
pluginActivityAuditCommitInline);
 
                logger.info("<== AssetMgr.init()");
        }
@@ -662,13 +670,10 @@ public class AssetMgr extends AssetMgrBase {
        public void createPolicyAudit(final XXPolicyExportAudit 
xXPolicyExportAudit) {
                final Runnable commitWork;
                if (xXPolicyExportAudit.getHttpRetCode() == 
HttpServletResponse.SC_NOT_MODIFIED) {
-                       boolean logNotModified = 
PropertiesUtil.getBooleanProperty("ranger.log.SC_NOT_MODIFIED", false);
-                       if (!logNotModified) {
-                               commitWork = null;
+                       if (!rangerLogNotModified) {
+                               logger.debug("Not logging HttpServletResponse. 
SC_NOT_MODIFIED. To enable, set configuration: {}=true", 
PROP_RANGER_LOG_SC_NOT_MODIFIED);
 
-                               logger.debug("Not logging HttpServletResponse."
-                                               + "SC_NOT_MODIFIED, to enable, 
update "
-                                               + ": 
ranger.log.SC_NOT_MODIFIED");
+                               commitWork = null;
                        } else {
                                // Create PolicyExportAudit record after 
transaction is completed. If it is created in-line here
                                // then the TransactionManager will roll-back 
the changes because the HTTP return code is
@@ -762,34 +767,40 @@ public class AssetMgr extends AssetMgrBase {
                final Runnable commitWork;
 
                if (httpCode == HttpServletResponse.SC_NOT_MODIFIED) {
-                       // Create or update PluginInfo record after transaction 
is completed. If it is created in-line here
-                       // then the TransactionManager will roll-back the 
changes because the HTTP return code is
-                       // HttpServletResponse.SC_NOT_MODIFIED
-
-                       switch (entityType) {
-                               case RangerPluginInfo.ENTITY_TYPE_POLICIES:
-                                       isTagVersionResetNeeded = 
rangerDaoManager.getXXService().findAssociatedTagService(pluginInfo.getServiceName())
 == null;
-                                       break;
-                               case RangerPluginInfo.ENTITY_TYPE_TAGS:
-                                       isTagVersionResetNeeded = false;
-                                       break;
-                               case RangerPluginInfo.ENTITY_TYPE_ROLES:
-                                       isTagVersionResetNeeded = false;
-                                       break;
-                               case RangerPluginInfo.ENTITY_TYPE_USERSTORE:
-                                       isTagVersionResetNeeded = false;
-                                       break;
-                               default:
-                                       isTagVersionResetNeeded = false;
-                                       break;
-                       }
+                       if (!pluginActivityAuditLogNotModified) {
+                               logger.debug("Not logging HttpServletResponse. 
SC_NOT_MODIFIED. To enable, set configuration: {}=true", 
PROP_PLUGIN_ACTIVITY_AUDIT_NOT_MODIFIED);
 
-                       commitWork = new Runnable() {
-                               @Override
-                               public void run() {
-                                       
doCreateOrUpdateXXPluginInfo(pluginInfo, entityType, isTagVersionResetNeeded, 
clusterName);
+                               commitWork = null;
+                       } else {
+                               // Create or update PluginInfo record after 
transaction is completed. If it is created in-line here
+                               // then the TransactionManager will roll-back 
the changes because the HTTP return code is
+                               // HttpServletResponse.SC_NOT_MODIFIED
+
+                               switch (entityType) {
+                                       case 
RangerPluginInfo.ENTITY_TYPE_POLICIES:
+                                               isTagVersionResetNeeded = 
rangerDaoManager.getXXService().findAssociatedTagService(pluginInfo.getServiceName())
 == null;
+                                               break;
+                                       case RangerPluginInfo.ENTITY_TYPE_TAGS:
+                                               isTagVersionResetNeeded = false;
+                                               break;
+                                       case RangerPluginInfo.ENTITY_TYPE_ROLES:
+                                               isTagVersionResetNeeded = false;
+                                               break;
+                                       case 
RangerPluginInfo.ENTITY_TYPE_USERSTORE:
+                                               isTagVersionResetNeeded = false;
+                                               break;
+                                       default:
+                                               isTagVersionResetNeeded = false;
+                                               break;
                                }
-                       };
+
+                               commitWork = new Runnable() {
+                                       @Override
+                                       public void run() {
+                                               
doCreateOrUpdateXXPluginInfo(pluginInfo, entityType, isTagVersionResetNeeded, 
clusterName);
+                                       }
+                               };
+                       }
                } else if (httpCode == HttpServletResponse.SC_NOT_FOUND) {
                        if ((isPolicyDownloadRequest(entityType) && 
(pluginInfo.getPolicyActiveVersion() == null || 
pluginInfo.getPolicyActiveVersion() == -1))
                                        || (isTagDownloadRequest(entityType) && 
(pluginInfo.getTagActiveVersion() == null || pluginInfo.getTagActiveVersion() 
== -1))
@@ -820,10 +831,12 @@ public class AssetMgr extends AssetMgrBase {
                        };
                }
 
-               if (pluginActivityAuditCommitInline) {
-                       
transactionSynchronizationAdapter.executeOnTransactionCompletion(commitWork);
-               } else {
-                       
transactionSynchronizationAdapter.executeAsyncOnTransactionComplete(commitWork);
+               if (commitWork != null) {
+                       if (pluginActivityAuditCommitInline) {
+                               
transactionSynchronizationAdapter.executeOnTransactionCompletion(commitWork);
+                       } else {
+                               
transactionSynchronizationAdapter.executeAsyncOnTransactionComplete(commitWork);
+                       }
                }
 
                if (logger.isDebugEnabled()) {
diff --git 
a/security-admin/src/main/java/org/apache/ranger/service/RangerTransactionService.java
 
b/security-admin/src/main/java/org/apache/ranger/service/RangerTransactionService.java
index 49d07fcd0..0e7ae7daa 100644
--- 
a/security-admin/src/main/java/org/apache/ranger/service/RangerTransactionService.java
+++ 
b/security-admin/src/main/java/org/apache/ranger/service/RangerTransactionService.java
@@ -19,6 +19,7 @@
 
 package org.apache.ranger.service;
 
+import org.apache.ranger.authorization.hadoop.config.RangerAdminConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -35,22 +36,41 @@ import javax.annotation.PreDestroy;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
 @Service
 public class RangerTransactionService {
+    private static final String PROP_THREADPOOL_SIZE          = 
"ranger.admin.transaction.service.threadpool.size";
+    private static final String PROP_SUMMARY_LOG_INTERVAL_SEC = 
"ranger.admin.transaction.service.summary.log.interval.sec";
+
     @Autowired
     @Qualifier(value = "transactionManager")
     PlatformTransactionManager txManager;
 
     private static final Logger LOG = 
LoggerFactory.getLogger(RangerTransactionService.class);
 
-    private ScheduledExecutorService scheduler = null;
+    private ScheduledExecutorService scheduler            = null;
+    private AtomicLong               scheduledTaskCount   = new AtomicLong(0);
+    private AtomicLong               executedTaskCount    = new AtomicLong(0);
+    private AtomicLong               failedTaskCount      = new AtomicLong(0);
+    private long                     summaryLogIntervalMs = 5 * 60 * 1000;
+    private long                     nextLogSummaryTime   = 
System.currentTimeMillis() + summaryLogIntervalMs;
 
     @PostConstruct
     public void init() {
-        scheduler = Executors.newScheduledThreadPool(1);
+        RangerAdminConfig config = RangerAdminConfig.getInstance();
+
+        int  numOfThreads          = config.getInt(PROP_THREADPOOL_SIZE, 1);
+        long summaryLogIntervalSec = 
config.getInt(PROP_SUMMARY_LOG_INTERVAL_SEC, 5 * 60);
+
+        scheduler            = Executors.newScheduledThreadPool(numOfThreads);
+        summaryLogIntervalMs = summaryLogIntervalSec * 1000;
+        nextLogSummaryTime   = System.currentTimeMillis() + 
summaryLogIntervalMs; // offset must be in ms (not sec) to match currentTimeMillis()
+
+        LOG.info("{}={}", PROP_THREADPOOL_SIZE, numOfThreads);
+        LOG.info("{}={}", PROP_SUMMARY_LOG_INTERVAL_SEC, 
summaryLogIntervalSec);
     }
 
     @PreDestroy
@@ -59,6 +79,8 @@ public class RangerTransactionService {
             LOG.info("attempt to shutdown RangerTransactionService");
             scheduler.shutdown();
             scheduler.awaitTermination(5, TimeUnit.SECONDS);
+
+            logSummary();
         }
         catch (InterruptedException e) {
             LOG.error("RangerTransactionService tasks interrupted");
@@ -90,16 +112,47 @@ public class RangerTransactionService {
                                 }
                             });
                         } catch (Exception e) {
+                            failedTaskCount.getAndIncrement();
+
                             LOG.error("Failed to commit TransactionService 
transaction", e);
                             LOG.error("Ignoring...");
+                        } finally {
+                            executedTaskCount.getAndIncrement();
+                            logSummaryIfNeeded();
                         }
                     }
                 }
             }, delayInMillis, MILLISECONDS);
+
+            scheduledTaskCount.getAndIncrement();
+
+            logSummaryIfNeeded();
         } catch (Exception e) {
             LOG.error("Failed to schedule TransactionService transaction:", e);
             LOG.error("Ignroing...");
         }
     }
 
+    private void logSummaryIfNeeded() {
+        long now = System.currentTimeMillis();
+
+        if (summaryLogIntervalMs > 0 && now > nextLogSummaryTime) {
+            synchronized (this) {
+                if (now > nextLogSummaryTime) {
+                    nextLogSummaryTime = now + summaryLogIntervalMs;
+
+                    logSummary();
+                }
+            }
+        }
+    }
+
+    private void logSummary() {
+        long scheduled = scheduledTaskCount.get();
+        long executed  = executedTaskCount.get();
+        long failed    = failedTaskCount.get();
+        long pending   = scheduled - executed;
+
+        LOG.info("RangerTransactionService: tasks(scheduled={}, executed={}, 
failed={}, pending={})", scheduled, executed, failed, pending);
+    }
 }
\ No newline at end of file

Reply via email to