This is an automated email from the ASF dual-hosted git repository.

dineshkumar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ranger.git


The following commit(s) were added to refs/heads/master by this push:
     new 433a2ffa4 RANGER-4151 : Create HA support for tagSync
433a2ffa4 is described below

commit 433a2ffa44c12b278806daf61dca432a70636e74
Author: Dineshkumar Yadav <dineshkumar.ya...@outlook.com>
AuthorDate: Sat Jun 3 18:40:30 2023 +0530

    RANGER-4151 : Create HA support for tagSync
    
    Signed-off-by: Dineshkumar Yadav <dineshkumar.ya...@outlook.com>
---
 distro/src/main/assembly/tagsync.xml               |   7 ++
 tagsync/conf/templates/ranger-tagsync-template.xml |  58 +++++++++++
 tagsync/pom.xml                                    |   6 +-
 .../tagsync/ha/TagSyncHAInitializerImpl.java       | 110 +++++++++++++++++++++
 .../ranger/tagsync/model/AbstractTagSource.java    |   7 +-
 .../ranger/tagsync/process/TagSyncConfig.java      |  17 +++-
 .../tagsync/process/TagSyncMetricsProducer.java    |  11 ++-
 .../ranger/tagsync/process/TagSynchronizer.java    |   9 ++
 .../tagsync/sink/tagadmin/TagAdminRESTSink.java    |  51 +++++-----
 .../tagsync/source/atlas/AtlasTagSource.java       |  10 +-
 .../source/atlasrest/AtlasRESTTagSource.java       |  23 +++--
 .../ranger/tagsync/source/file/FileTagSource.java  |   7 +-
 tagsync/src/main/resources/ranger-tagsync-site.xml |  58 ++++++++++-
 13 files changed, 329 insertions(+), 45 deletions(-)

diff --git a/distro/src/main/assembly/tagsync.xml 
b/distro/src/main/assembly/tagsync.xml
index 44a8233cc..a04026059 100644
--- a/distro/src/main/assembly/tagsync.xml
+++ b/distro/src/main/assembly/tagsync.xml
@@ -27,6 +27,7 @@
                        <useAllReactorProjects>true</useAllReactorProjects>
                        <includes>
                                
<include>org.apache.ranger:ranger-tagsync</include>
+                               
<include>org.apache.ranger:ranger-common-ha</include>
                        </includes>
                        <binaries>
                                <outputDirectory>dist</outputDirectory>
@@ -95,6 +96,12 @@
                                                        
<include>org.slf4j:log4j-over-slf4j:jar:${slf4j.version}</include>
                                                        
<include>ch.qos.logback:logback-classic:jar:${logback.version}</include>
                                                        
<include>ch.qos.logback:logback-core:jar:${logback.version}</include>
+                                                       
<include>org.apache.ranger:ranger-common-ha:jar:${project.version}</include>
+                                                       
<include>org.apache.curator:curator-framework:jar:${curator.version}</include>
+                                                       
<include>org.apache.curator:curator-recipes:jar:${curator.version}</include>
+                                                       
<include>org.apache.curator:curator-client:jar:${curator.version}</include>
+                                                       
<include>org.apache.zookeeper:zookeeper:jar:${zookeeper.version}</include>
+                                                       
<include>org.apache.zookeeper:zookeeper-jute:jar:${zookeeper.version}</include>
                                                </includes>
                                        </dependencySet>
                                </dependencySets>
diff --git a/tagsync/conf/templates/ranger-tagsync-template.xml 
b/tagsync/conf/templates/ranger-tagsync-template.xml
index 40bd3dbe6..464dfc6a9 100644
--- a/tagsync/conf/templates/ranger-tagsync-template.xml
+++ b/tagsync/conf/templates/ranger-tagsync-template.xml
@@ -123,4 +123,62 @@
                <name>ranger.tagsync.metrics.enabled</name>
                <value>false</value>
        </property>
+
+       <!-- HA property -->
+       <property>
+               <name>ranger.service.name</name>
+               <value>ranger-tagsync</value>
+       </property>
+       <property>
+               <name>ranger-tagsync.server.ha.enabled</name>
+               <value>false</value>
+       </property>
+       <property>
+               <name>ranger-tagsync.server.ha.zookeeper.zkroot</name>
+               <value>/ranger-tagsync</value>
+       </property>
+       <property>
+               <name>ranger-tagsync.server.ha.zookeeper.connect</name>
+               <value></value>
+       </property>
+       <property>
+               <name>ranger-tagsync.server.ha.ids</name>
+               <value>id1,id2</value>
+       </property>
+       <property>
+               <name>ranger-tagsync.server.ha.address.id1</name>
+               <value></value>
+       </property>
+       <property>
+               <name>ranger-tagsync.server.ha.address.id2</name>
+               <value></value>
+       </property>
+       <property>
+               
<name>ranger-tagsync.server.ha.zookeeper.retry.sleeptime.ms</name>
+               <value>1000</value>
+       </property>
+       <property>
+               
<name>ranger-tagsync.server.ha.zookeeper.session.timeout.ms</name>
+               <value>20000</value>
+       </property>
+       <property>
+               <name>ranger-tagsync.server.ha.zookeeper.num.retries</name>
+               <value>3</value>
+       </property>
+       <property>
+               <name>ranger-tagsync.server.ha.zookeeper.acl</name>
+               <value></value>
+       </property>
+       <property>
+               <name>ranger-tagsync.server.ha.zookeeper.auth</name>
+               <value></value>
+       </property>
+       <property>
+               <name>ranger-tagsync.server.ha.http.port</name>
+               <value></value>
+       </property>
+       <property>
+               <name>ranger-tagsync.server.ha.https.port</name>
+               <value></value>
+       </property>
 </configuration>
diff --git a/tagsync/pom.xml b/tagsync/pom.xml
index 1005a6d1c..ede5e838a 100644
--- a/tagsync/pom.xml
+++ b/tagsync/pom.xml
@@ -298,6 +298,10 @@
             <artifactId>jackson-annotations</artifactId>
             <version>${atlas.jackson.version}</version>
         </dependency>
-
+        <dependency>
+            <groupId>org.apache.ranger</groupId>
+            <artifactId>ranger-common-ha</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 </project>
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/ha/TagSyncHAInitializerImpl.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/ha/TagSyncHAInitializerImpl.java
new file mode 100644
index 000000000..016d6e5cc
--- /dev/null
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/ha/TagSyncHAInitializerImpl.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.tagsync.ha;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.ranger.RangerHAInitializer;
+import org.apache.ranger.ha.ActiveInstanceElectorService;
+import org.apache.ranger.ha.ActiveStateChangeHandler;
+import org.apache.ranger.ha.ServiceState;
+import org.apache.ranger.ha.service.HARangerService;
+import org.apache.ranger.ha.service.ServiceManager;
+import org.apache.ranger.tagsync.process.TagSyncConfig;
+import org.apache.log4j.Logger;
+
+
+public class TagSyncHAInitializerImpl extends RangerHAInitializer {
+       private static final Logger LOG = 
Logger.getLogger(TagSyncHAInitializerImpl.class);
+       ActiveInstanceElectorService activeInstanceElectorService       = null;
+       ActiveStateChangeHandler activeStateChangeHandler                       
= null;
+       List<HARangerService> haRangerService                                   
        = null;
+       ServiceManager serviceManager                                           
                = null;
+       private static TagSyncHAInitializerImpl theInstance = null;
+
+       private TagSyncHAInitializerImpl(Configuration configuration) {
+               if(LOG.isDebugEnabled()){
+                       LOG.info("==> 
TagSyncHAInitializerImpl.TagSyncHAInitializerImpl()");
+               }
+               try {
+                       LOG.info("Ranger TagSync server is HA enabled : 
"+configuration.getBoolean(TagSyncConfig.TAGSYNC_SERVER_HA_ENABLED_PARAM, 
false) );
+                       init(configuration);
+               } catch (Exception e) {
+                       LOG.error("TagSyncHAInitializerImpl initialization 
failed", e);
+               }
+               if(LOG.isDebugEnabled()){
+                       LOG.info("<== 
TagSyncHAInitializerImpl.TagSyncHAInitializerImpl()");
+               }
+       }
+
+       public void init(Configuration configuration) throws Exception {
+               super.init(configuration);
+               LOG.info("==> TagSyncHAInitializerImpl.init() initialization 
started");
+               Set<ActiveStateChangeHandler> activeStateChangeHandlerProviders 
= new HashSet<ActiveStateChangeHandler>();
+               activeInstanceElectorService = new 
ActiveInstanceElectorService(activeStateChangeHandlerProviders,
+                               curatorFactory, activeInstanceState, 
serviceState, configuration);
+
+               haRangerService = new ArrayList<HARangerService>();
+               haRangerService.add(activeInstanceElectorService);
+               serviceManager = new ServiceManager(haRangerService);
+               LOG.info("<== TagSyncHAInitializerImpl.init() initialization 
completed");
+       }
+
+
+       @Override
+       public void stop() {
+               if(LOG.isDebugEnabled()){
+                       LOG.debug("==> TagSyncHAInitializerImpl.stop() ");
+               }
+               if (serviceManager != null) {
+                       serviceManager.stop();
+               }
+               if(curatorFactory != null){
+                       curatorFactory.close();
+               }
+               if(LOG.isDebugEnabled()){
+                       LOG.debug("<== TagSyncHAInitializerImpl.stop() ");
+               }
+       }
+
+       public static TagSyncHAInitializerImpl getInstance(Configuration 
configuration) {
+               if(theInstance == null){
+                        synchronized(TagSyncHAInitializerImpl.class){
+                               if(theInstance == null){
+                                       theInstance =  new 
TagSyncHAInitializerImpl(configuration);
+                               }
+                       }
+               }
+               return theInstance;
+       }
+       public boolean isActive() {
+               try {
+                       // To let the curator thread a chance to run and set 
the active state if needed
+                       Thread.sleep(0L);
+               } catch (InterruptedException exception) {
+                       // Ignore
+               }
+               return 
serviceState.getState().equals(ServiceState.ServiceStateValue.ACTIVE);
+       }
+}
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java 
b/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java
index 09f3292e7..ff9937628 100644
--- 
a/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/model/AbstractTagSource.java
@@ -21,6 +21,7 @@ package org.apache.ranger.tagsync.model;
 
 import com.google.gson.Gson;
 import org.apache.ranger.plugin.util.ServiceTags;
+import org.apache.ranger.tagsync.process.TagSyncConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,14 +60,16 @@ public abstract  class AbstractTagSource implements 
TagSource {
                                LOG.debug("No ServiceTags to upload");
                        }
                } else {
+                       if(!TagSyncConfig.isTagSyncServiceActive()) {
+                               LOG.error("This TagSync server is not in active 
state. Cannot commit transaction!");
+                               throw new RuntimeException("This TagSync server 
is not in active state. Cannot commit transaction!");
+                       }
                        if (LOG.isDebugEnabled()) {
                                String toUploadJSON = new 
Gson().toJson(toUpload);
                                LOG.debug("Uploading serviceTags=" + 
toUploadJSON);
                        }
-
                        try {
                                ServiceTags uploaded = tagSink.upload(toUpload);
-
                                if (LOG.isDebugEnabled()) {
                                        String uploadedJSON = new 
Gson().toJson(uploaded);
                                        LOG.debug("Uploaded serviceTags=" + 
uploadedJSON);
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java 
b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java
index 590426cd0..87b655df5 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncConfig.java
@@ -22,6 +22,7 @@ package org.apache.ranger.tagsync.process;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.SecureClientLogin;
+import org.apache.ranger.tagsync.ha.TagSyncHAInitializerImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,6 +44,7 @@ import org.apache.ranger.plugin.util.RangerCommonConstants;
 public class TagSyncConfig extends Configuration {
        private static final Logger LOG = 
LoggerFactory.getLogger(TagSyncConfig.class);
 
+       private static TagSyncConfig instance = null;
        private static final String CONFIG_FILE = "ranger-tagsync-site.xml";
 
        private static final String DEFAULT_CONFIG_FILE = 
"ranger-tagsync-default.xml";
@@ -122,7 +124,7 @@ public class TagSyncConfig extends Configuration {
        private static final String  TAGSYNC_SINK_MAX_BATCH_SIZE_PROP    = 
"ranger.tagsync.dest.ranger.max.batch.size";
 
        private static final String 
TAGSYNC_ATLASREST_SOURCE_ENTITIES_BATCH_SIZE = 
"ranger.tagsync.source.atlasrest.entities.batch.size";
-
+       public static final String TAGSYNC_SERVER_HA_ENABLED_PARAM = 
"ranger-tagsync.server.ha.enabled";
 
        private Properties props;
 
@@ -135,7 +137,14 @@ public class TagSyncConfig extends Configuration {
        }
        
        public static TagSyncConfig getInstance() {
-               return new TagSyncConfig();
+               if(instance == null ){
+                       synchronized (TagSyncConfig.class){
+                               if(instance == null ){
+                                       instance = new TagSyncConfig();
+                               }
+                       }
+               }
+               return instance;
        }
 
        public Properties getProperties() {
@@ -215,6 +224,10 @@ public class TagSyncConfig extends Configuration {
                return ret;
        }
 
+       synchronized static public boolean isTagSyncServiceActive() {
+               return 
TagSyncHAInitializerImpl.getInstance(TagSyncConfig.getInstance()).isActive();
+       }
+
        @Override
        public String toString() {
                StringBuilder sb = new StringBuilder();
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncMetricsProducer.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncMetricsProducer.java
index a3b5d03e1..4b8decec3 100644
--- 
a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncMetricsProducer.java
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSyncMetricsProducer.java
@@ -55,7 +55,7 @@ public class TagSyncMetricsProducer implements Runnable {
                                                                + "] 
milliseconds before attempting to tagsync metrics information", e);
                                        }
                                        try {
-                                               
writeJVMMetrics(logFileNameWithPath);
+                                               
writeJVMMetrics(logFileNameWithPath, config);
                                        } catch (Throwable t) {
                                                LOG.error("Failed to write 
tagsync metrics into file. Error details: ", t);
                                        }
@@ -71,7 +71,7 @@ public class TagSyncMetricsProducer implements Runnable {
 
        }
 
-       private void writeJVMMetrics(String logFileNameWithPath) throws 
Throwable {
+       private void writeJVMMetrics(String logFileNameWithPath, TagSyncConfig 
config) throws Throwable {
                try {
                        File userMetricFile = null;
                        userMetricFile = new File(logFileNameWithPath);
@@ -79,6 +79,13 @@ public class TagSyncMetricsProducer implements Runnable {
                                userMetricFile.createNewFile();
                        }
                        RangerMetricsUtil rangerMetricsUtil = new 
RangerMetricsUtil();
+                       if 
(config.getBoolean(TagSyncConfig.TAGSYNC_SERVER_HA_ENABLED_PARAM, false)) {
+                               if(config.isTagSyncServiceActive()){
+                                       rangerMetricsUtil.setIsRoleActive(1);
+                               }else{
+                                       rangerMetricsUtil.setIsRoleActive(0);
+                               }
+                       }
                        rangerMetricsUtil.writeMetricsToFile(userMetricFile);
 
                } catch (Throwable t) {
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java 
b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
index 55cc0a8aa..7963f88c5 100644
--- 
a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
@@ -31,6 +31,7 @@ import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.security.SecureClientLogin;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ranger.tagsync.ha.TagSyncHAInitializerImpl;
 import org.apache.ranger.tagsync.model.TagSink;
 import org.apache.ranger.tagsync.model.TagSource;
 import org.slf4j.Logger;
@@ -52,6 +53,7 @@ public class TagSynchronizer {
 
        private final Object shutdownNotifier = new Object();
        private volatile boolean isShutdownInProgress = false;
+       private TagSyncHAInitializerImpl tagSyncHAinitializerImpl = null;
 
        public static void main(String[] args) {
                TagSynchronizer tagSynchronizer = new TagSynchronizer();
@@ -63,16 +65,23 @@ public class TagSynchronizer {
                tagSynchronizer.setProperties(props);
 
                boolean tagSynchronizerInitialized = 
tagSynchronizer.initialize();
+               tagSynchronizer.tagSyncHAinitializerImpl = 
TagSyncHAInitializerImpl.getInstance(config);
 
                if (tagSynchronizerInitialized) {
                        try {
                                tagSynchronizer.run();
                        } catch (Throwable t) {
                                LOG.error("main thread caught exception..:", t);
+                               if (tagSynchronizer.tagSyncHAinitializerImpl != 
null) {
+                                       
tagSynchronizer.tagSyncHAinitializerImpl.stop();
+                               }
                                System.exit(1);
                        }
                } else {
                        LOG.error("TagSynchronizer failed to initialize 
correctly, exiting..");
+                       if (tagSynchronizer.tagSyncHAinitializerImpl != null) {
+                               tagSynchronizer.tagSyncHAinitializerImpl.stop();
+                       }
                        System.exit(1);
                }
 
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java
index db76a678f..ac0069a93 100644
--- 
a/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java
@@ -366,40 +366,41 @@ public class TagAdminRESTSink implements TagSink, 
Runnable {
                }
 
                while (true) {
-                       UploadWorkItem uploadWorkItem;
+                       if (TagSyncConfig.isTagSyncServiceActive()) {
+                               UploadWorkItem uploadWorkItem;
 
-                       try {
-                               uploadWorkItem = uploadWorkItems.take();
+                               try {
+                                       uploadWorkItem = uploadWorkItems.take();
 
-                               ServiceTags toUpload = 
uploadWorkItem.getServiceTags();
+                                       ServiceTags toUpload = 
uploadWorkItem.getServiceTags();
 
-                               boolean doRetry;
+                                       boolean doRetry;
 
-                               do {
-                                       doRetry = false;
+                                       do {
+                                               doRetry = false;
 
-                                       try {
-                                               ServiceTags uploaded = 
doUpload(toUpload);
-                                               if (uploaded == null) { // 
Treat this as if an Exception is thrown by doUpload
+                                               try {
+                                                       ServiceTags uploaded = 
doUpload(toUpload);
+                                                       if (uploaded == null) { 
// Treat this as if an Exception is thrown by doUpload
+                                                               doRetry = true;
+                                                               
Thread.sleep(rangerAdminConnectionCheckInterval);
+                                                       } else {
+                                                               // ServiceTags 
uploaded successfully
+                                                               
uploadWorkItem.uploadCompleted(uploaded);
+                                                       }
+                                               } catch (InterruptedException 
interrupted) {
+                                                       LOG.error("Caught 
exception..: ", interrupted);
+                                                       return;
+                                               } catch (Exception exception) {
                                                        doRetry = true;
                                                        
Thread.sleep(rangerAdminConnectionCheckInterval);
-                                               } else {
-                                                       // ServiceTags uploaded 
successfully
-                                                       
uploadWorkItem.uploadCompleted(uploaded);
                                                }
-                                       } catch (InterruptedException 
interrupted) {
-                                               LOG.error("Caught exception..: 
", interrupted);
-                                               return;
-                                       } catch (Exception exception) {
-                                               doRetry = true;
-                                               
Thread.sleep(rangerAdminConnectionCheckInterval);
-                                       }
-                               } while (doRetry);
+                                       } while (doRetry);
 
-                       }
-                       catch (InterruptedException exception) {
-                               LOG.error("Interrupted..: ", exception);
-                               return;
+                               } catch (InterruptedException exception) {
+                                       LOG.error("Interrupted..: ", exception);
+                                       return;
+                               }
                        }
                }
 
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
index 34a39f73c..071f52c4a 100644
--- 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
@@ -184,9 +184,12 @@ public class AtlasTagSource extends AbstractTagSource {
                        }
 
                        while (true) {
-
-                               try {
-                                       
List<AtlasKafkaMessage<EntityNotification>> newMessages = 
consumer.receive(MAX_WAIT_TIME_IN_MILLIS);
+                               if (TagSyncConfig.isTagSyncServiceActive()) {
+                                       if (LOG.isDebugEnabled()) {
+                                               LOG.debug("==> 
ConsumerRunnable.run() is running as server is active");
+                                       }
+                                       try {
+                                               
List<AtlasKafkaMessage<EntityNotification>> newMessages = 
consumer.receive(MAX_WAIT_TIME_IN_MILLIS);
 
                                        if (newMessages.size() == 0) {
                                                if (LOG.isDebugEnabled()) {
@@ -251,6 +254,7 @@ public class AtlasTagSource extends AbstractTagSource {
                                                return;
                                        }
                                }
+                         }
                        }
                }
 
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
index 9063b03f5..792ced132 100644
--- 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
@@ -189,15 +189,23 @@ public class AtlasRESTTagSource extends AbstractTagSource 
implements Runnable {
         if (LOG.isDebugEnabled()) {
             LOG.debug("==> AtlasRESTTagSource.run()");
         }
-        while (true) {
-            try {
-                synchUp();
-
-                LOG.debug("Sleeping for [" + sleepTimeBetweenCycleInMillis + 
"] milliSeconds");
+            while (true) {
+                try {
+                    if (TagSyncConfig.isTagSyncServiceActive()) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("==> AtlasRESTTagSource.run() is running 
as server is Active");
+                        }
+                        synchUp();
+                    }else{
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("==> This server is running passive 
mode");
+                        }
+                    }
+                    LOG.debug("Sleeping for [" + sleepTimeBetweenCycleInMillis 
+ "] milliSeconds");
 
-                Thread.sleep(sleepTimeBetweenCycleInMillis);
+                    Thread.sleep(sleepTimeBetweenCycleInMillis);
 
-            } catch (InterruptedException exception) {
+                } catch (InterruptedException exception) {
                 LOG.error("Interrupted..: ", exception);
                 return;
             } catch (Exception e) {
@@ -208,7 +216,6 @@ public class AtlasRESTTagSource extends AbstractTagSource 
implements Runnable {
     }
 
        public void synchUp() throws Exception {
-
                List<RangerAtlasEntityWithTags> rangerAtlasEntities = 
getAtlasActiveEntities();
 
                if (CollectionUtils.isNotEmpty(rangerAtlasEntities)) {
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java
index 8af15f95a..65deccd14 100644
--- 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java
@@ -216,7 +216,12 @@ public class FileTagSource extends AbstractTagSource 
implements Runnable {
 
                while (true) {
                        try {
-                               synchUp();
+                               if (TagSyncConfig.isTagSyncServiceActive()) {
+                                       if (LOG.isDebugEnabled()) {
+                                               LOG.debug("==> FileTagSource is 
running as server is active");
+                                       }
+                                       synchUp();
+                               }
                        } catch (Exception e) {
                                LOG.error("Caught exception..", e);
                        } finally {
diff --git a/tagsync/src/main/resources/ranger-tagsync-site.xml 
b/tagsync/src/main/resources/ranger-tagsync-site.xml
index e17bdb213..6bda5f4ab 100644
--- a/tagsync/src/main/resources/ranger-tagsync-site.xml
+++ b/tagsync/src/main/resources/ranger-tagsync-site.xml
@@ -135,5 +135,61 @@
         <value>cl2_hdfs</value>
     </property>
     -->
-
+       <!-- HA property -->
+       <property>
+               <name>ranger.service.name</name>
+               <value>ranger-tagsync</value>
+       </property>
+       <property>
+               <name>ranger-tagsync.server.ha.enabled</name>
+               <value>false</value>
+       </property>
+       <property>
+               <name>ranger-tagsync.server.ha.zookeeper.zkroot</name>
+               <value>/ranger-tagsync</value>
+       </property>
+       <property>
+               <name>ranger-tagsync.server.ha.zookeeper.connect</name>
+               <value></value>
+       </property>
+       <property>
+               <name>ranger-tagsync.server.ha.ids</name>
+               <value>id1,id2</value>
+       </property>
+       <property>
+               <name>ranger-tagsync.server.ha.address.id1</name>
+               <value></value>
+       </property>
+       <property>
+               <name>ranger-tagsync.server.ha.address.id2</name>
+               <value></value>
+       </property>
+       <property>
+               
<name>ranger-tagsync.server.ha.zookeeper.retry.sleeptime.ms</name>
+               <value>1000</value>
+       </property>
+       <property>
+               
<name>ranger-tagsync.server.ha.zookeeper.session.timeout.ms</name>
+               <value>20000</value>
+       </property>
+       <property>
+               <name>ranger-tagsync.server.ha.zookeeper.num.retries</name>
+               <value>3</value>
+       </property>
+       <property>
+               <name>ranger-tagsync.server.ha.zookeeper.acl</name>
+               <value></value>
+       </property>
+       <property>
+               <name>ranger-tagsync.server.ha.zookeeper.auth</name>
+               <value></value>
+       </property>
+       <property>
+               <name>ranger-tagsync.server.ha.http.port</name>
+               <value></value>
+       </property>
+       <property>
+               <name>ranger-tagsync.server.ha.https.port</name>
+               <value></value>
+       </property>
 </configuration>

Reply via email to