fateh288 commented on code in PR #447:
URL: https://github.com/apache/ranger/pull/447#discussion_r1882819088


##########
tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java:
##########
@@ -45,288 +45,250 @@
 import java.util.Properties;
 
 public class AtlasTagSource extends AbstractTagSource {
-       private static final Logger LOG = LoggerFactory.getLogger(AtlasTagSource.class);
-
-       public static final String TAGSYNC_ATLAS_PROPERTIES_FILE_NAME = "atlas-application.properties";
-
-       public static final String TAGSYNC_ATLAS_KAFKA_ENDPOINTS      = "atlas.kafka.bootstrap.servers";
-       public static final String TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT   = "atlas.kafka.zookeeper.connect";
-       public static final String TAGSYNC_ATLAS_CONSUMER_GROUP       = "atlas.kafka.entities.group.id";
-
-       public static final int    MAX_WAIT_TIME_IN_MILLIS = 1000;
-
-       private             int    maxBatchSize;
-
-       private ConsumerRunnable consumerTask;
-       private Thread myThread = null;
-
-       @Override
-       public boolean initialize(Properties properties) {
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> AtlasTagSource.initialize()");
-               }
-
-               Properties atlasProperties = new Properties();
-
-               boolean ret = 
AtlasResourceMapperUtil.initializeAtlasResourceMappers(properties);
-
-               if (ret) {
-
-                       InputStream inputStream = 
getClass().getClassLoader().getResourceAsStream(TAGSYNC_ATLAS_PROPERTIES_FILE_NAME);
-
-                       if (inputStream != null) {
-                               try {
-                                       atlasProperties.load(inputStream);
-                               } catch (Exception exception) {
-                                       ret = false;
-                                       LOG.error("Cannot load Atlas application properties file, file-name:" + TAGSYNC_ATLAS_PROPERTIES_FILE_NAME, exception);
-                               } finally {
-                                       try {
-                                               inputStream.close();
-                                       } catch (IOException ioException) {
-                                               LOG.error("Cannot close Atlas application properties file, file-name:" + TAGSYNC_ATLAS_PROPERTIES_FILE_NAME, ioException);
-                                       }
-                               }
-                       } else {
-                               ret = false;
-                               LOG.error("Cannot find Atlas application properties file");
-                       }
-               }
-
-               if (ret) {
-                       if 
(StringUtils.isBlank(atlasProperties.getProperty(TAGSYNC_ATLAS_KAFKA_ENDPOINTS)))
 {
-                               ret = false;
-                               LOG.error("Value of property '" + 
TAGSYNC_ATLAS_KAFKA_ENDPOINTS + "' is not specified!");
-                       }
-                       if 
(StringUtils.isBlank(atlasProperties.getProperty(TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT)))
 {
-                               ret = false;
-                               LOG.error("Value of property '" + 
TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT + "' is not specified!");
-                       }
-                       if 
(StringUtils.isBlank(atlasProperties.getProperty(TAGSYNC_ATLAS_CONSUMER_GROUP)))
 {
-                               ret = false;
-                               LOG.error("Value of property '" + 
TAGSYNC_ATLAS_CONSUMER_GROUP + "' is not specified!");
-                       }
-               }
-
-               if (ret) {
-                       NotificationInterface notification = 
NotificationProvider.get();
-                       List<NotificationConsumer<EntityNotification>> 
iterators = 
notification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 
1);
-
-                       consumerTask = new ConsumerRunnable(iterators.get(0));
-               }
-
-               maxBatchSize = TagSyncConfig.getSinkMaxBatchSize(properties);
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== AtlasTagSource.initialize(), result=" + 
ret);
-               }
-               return ret;
-       }
-
-       @Override
-       public boolean start() {
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> AtlasTagSource.start()");
-               }
-               if (consumerTask == null) {
-                       LOG.error("No consumerTask!!!");
-               } else {
-                       myThread = new Thread(consumerTask);
-                       myThread.setDaemon(true);
-                       myThread.start();
-               }
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== AtlasTagSource.start()");
-               }
-               return myThread != null;
-       }
-
-       @Override
-       public void stop() {
-               if (myThread != null && myThread.isAlive()) {
-                       myThread.interrupt();
-               }
-       }
-
-       private static String 
getPrintableEntityNotification(EntityNotificationWrapper notification) {
-               StringBuilder sb = new StringBuilder();
-
-               sb.append("{ Notification-Type: ").append(notification.getOpType()).append(", ");
+    public static final String TAGSYNC_ATLAS_PROPERTIES_FILE_NAME = "atlas-application.properties";
+    public static final String TAGSYNC_ATLAS_KAFKA_ENDPOINTS    = "atlas.kafka.bootstrap.servers";
+    public static final String TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = "atlas.kafka.zookeeper.connect";
+    public static final String TAGSYNC_ATLAS_CONSUMER_GROUP     = "atlas.kafka.entities.group.id";
+    public static final int MAX_WAIT_TIME_IN_MILLIS = 1000;
+    private static final Logger LOG = LoggerFactory.getLogger(AtlasTagSource.class);

Review Comment:
   Addressed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@ranger.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to