This is an automated email from the ASF dual-hosted git repository.

vel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ranger.git


The following commit(s) were added to refs/heads/master by this push:
     new a97f094  RANGER-2313: tagsync fails to authenticate with ranger in kerberized cluster when using ranger-tagsync-update.sh script
a97f094 is described below

commit a97f0947e192ea67cba64a9ae4be18f6375b2dc3
Author: Abhay Kulkarni <>
AuthorDate: Thu Dec 27 10:18:22 2018 -0800

    RANGER-2313: tagsync fails to authenticate with ranger in kerberized cluster when using ranger-tagsync-update.sh script
    
    Signed-off-by: Velmurugan Periasamy <v...@apache.org>
---
 .../ranger/tagsync/process/TagSynchronizer.java    |   2 +-
 .../source/atlas/EntityNotificationWrapper.java    |   4 +-
 .../source/atlasrest/AtlasRESTTagSource.java       | 177 +++++++++++++--------
 .../ranger/tagsync/source/file/FileTagSource.java  |  31 ++--
 4 files changed, 129 insertions(+), 85 deletions(-)

diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java 
b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
index 49ff76f..8806c74 100644
--- 
a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
@@ -353,7 +353,7 @@ public class TagSynchronizer {
                return tagSource;
        }
 
-       private static boolean initializeKerberosIdentity(Properties props) {
+       public static boolean initializeKerberosIdentity(Properties props) {
                if (LOG.isDebugEnabled()) {
                        LOG.debug("==> 
TagSynchronizer.initializeKerberosIdentity()");
                }
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/EntityNotificationWrapper.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/EntityNotificationWrapper.java
index adaa2f9..9781aa6 100644
--- 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/EntityNotificationWrapper.java
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/EntityNotificationWrapper.java
@@ -93,7 +93,7 @@ public class EntityNotificationWrapper {
                 isEntityTypeHandled    = isEntityActive && 
AtlasResourceMapperUtil.isEntityTypeHandled(entityTypeName);
                 isEntityDeleteOp       = 
EntityNotificationV2.OperationType.ENTITY_DELETE == 
v2Notification.getOperationType();
                 isEntityCreateOp       = 
EntityNotificationV2.OperationType.ENTITY_CREATE == 
v2Notification.getOperationType();
-                isEmptyClassifications = 
CollectionUtils.isNotEmpty(atlasEntity.getClassifications());
+                isEmptyClassifications = 
CollectionUtils.isEmpty(atlasEntity.getClassifications());
 
                 List<AtlasClassification> allClassifications = 
atlasEntity.getClassifications();
 
@@ -166,7 +166,7 @@ public class EntityNotificationWrapper {
                 isEntityTypeHandled    = isEntityActive && 
AtlasResourceMapperUtil.isEntityTypeHandled(entityTypeName);
                 isEntityDeleteOp       = 
EntityNotificationV1.OperationType.ENTITY_DELETE == 
v1Notification.getOperationType();
                 isEntityCreateOp       = 
EntityNotificationV1.OperationType.ENTITY_CREATE == 
v1Notification.getOperationType();
-                isEmptyClassifications = 
CollectionUtils.isNotEmpty(v1Notification.getAllTraits());
+                isEmptyClassifications = 
CollectionUtils.isEmpty(v1Notification.getAllTraits());
 
                 List<Struct> allTraits = ((EntityNotificationV1) 
notification).getAllTraits();
 
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
index 2b4a668..8b12aff 100644
--- 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
@@ -69,7 +69,8 @@ import java.util.TimeZone;
 public class AtlasRESTTagSource extends AbstractTagSource implements Runnable {
        private static final Log LOG = 
LogFactory.getLog(AtlasRESTTagSource.class);
 
-    private static final ThreadLocal<DateFormat> DATE_FORMATTER = new 
ThreadLocal<DateFormat>() {
+       private static final int REQUESTED_ENTITIES_LIMIT_MAX = 10000;
+       private static final ThreadLocal<DateFormat> DATE_FORMATTER = new 
ThreadLocal<DateFormat>() {
                @Override
                protected DateFormat initialValue() {
                        SimpleDateFormat dateFormat = new 
SimpleDateFormat(AtlasBaseTypeDef.SERIALIZED_DATE_FORMAT_STR);
@@ -97,26 +98,34 @@ public class AtlasRESTTagSource extends AbstractTagSource 
implements Runnable {
 
                TagSynchronizer.printConfigurationProperties(props);
 
-               TagSink tagSink = TagSynchronizer.initializeTagSink(props);
+               boolean ret = TagSynchronizer.initializeKerberosIdentity(props);
 
-               if (tagSink != null) {
+               if (ret) {
 
-                       if (atlasRESTTagSource.initialize(props)) {
-                               try {
-                                       tagSink.start();
-                                       atlasRESTTagSource.setTagSink(tagSink);
-                                       atlasRESTTagSource.synchUp();
-                               } catch (Exception exception) {
-                                       LOG.error("ServiceTags upload failed : 
", exception);
+                       TagSink tagSink = 
TagSynchronizer.initializeTagSink(props);
+
+                       if (tagSink != null) {
+
+                               if (atlasRESTTagSource.initialize(props)) {
+                                       try {
+                                               tagSink.start();
+                                               
atlasRESTTagSource.setTagSink(tagSink);
+                                               atlasRESTTagSource.synchUp();
+                                       } catch (Exception exception) {
+                                               LOG.error("ServiceTags upload 
failed : ", exception);
+                                               System.exit(1);
+                                       }
+                               } else {
+                                       LOG.error("AtlasRESTTagSource 
initialization failed, exiting.");
                                        System.exit(1);
                                }
+
                        } else {
-                               LOG.error("AtlasRESTTagSource initialized 
failed, exiting.");
+                               LOG.error("TagSink initialization failed, 
exiting.");
                                System.exit(1);
                        }
-
                } else {
-                       LOG.error("TagSink initialialization failed, exiting.");
+                       LOG.error("Error initializing kerberos identity");
                        System.exit(1);
                }
 
@@ -236,76 +245,104 @@ public class AtlasRESTTagSource extends 
AbstractTagSource implements Runnable {
         if (LOG.isDebugEnabled()) {
             LOG.debug("==> getAtlasActiveEntities()");
         }
-        List<RangerAtlasEntityWithTags> ret = null;
-
-        SearchParameters searchParams = new SearchParameters();
-        AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry();
-        AtlasTypeRegistry.AtlasTransientTypeRegistry tty = null;
-        AtlasSearchResult searchResult = null;
+        List<RangerAtlasEntityWithTags> ret         = new ArrayList<>();
 
-        searchParams.setClassification("*");
-        searchParams.setIncludeClassificationAttributes(true);
-        searchParams.setOffset(0);
-        searchParams.setLimit(Integer.MAX_VALUE);
-
-        boolean commitUpdates = false;
+        AtlasClientV2                   atlasClient = null;
         try {
-            AtlasClientV2 atlasClient = getAtlasClient();
-            searchResult = atlasClient.facetedSearch(searchParams);
-            AtlasTypesDef typesDef = atlasClient.getAllTypeDefs(new 
SearchFilter());
-            tty = typeRegistry.lockTypeRegistryForUpdate();
-            tty.addTypes(typesDef);
-            commitUpdates = true;
-        } catch (AtlasServiceException | AtlasBaseException | IOException 
excp) {
-            LOG.error("failed to download tags from Atlas", excp);
-        } catch (Exception unexpectedException) {
-            LOG.error("Failed to download tags from Atlas due to unexpected 
exception", unexpectedException);
-        } finally {
-            if (tty != null) {
-                typeRegistry.releaseTypeRegistryForUpdate(tty, commitUpdates);
-            }
+            atlasClient = getAtlasClient();
+        } catch (IOException exception) {
+            LOG.error("Failed to get Atlas client.", exception);
         }
 
-        if (commitUpdates && searchResult != null) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug(AtlasType.toJson(searchResult));
-            }
-            ret = new ArrayList<>();
-            List<AtlasEntityHeader> entityHeaders = searchResult.getEntities();
-            if (CollectionUtils.isNotEmpty(entityHeaders)) {
-                for (AtlasEntityHeader header : entityHeaders) {
-                    if (!header.getStatus().equals(AtlasEntity.Status.ACTIVE)) 
{
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("Skipping entity because it is not 
ACTIVE, header:[" + header + "]");
-                        }
-                        continue;
+        if (atlasClient != null) {
+
+            SearchParameters searchParams = new SearchParameters();
+
+            searchParams.setExcludeDeletedEntities(true);
+            searchParams.setClassification("*");
+            //searchParams.setIncludeSubClassifications(true);
+            //searchParams.setIncludeSubTypes(true);
+            searchParams.setIncludeClassificationAttributes(true);
+            searchParams.setLimit(REQUESTED_ENTITIES_LIMIT_MAX);
+
+            boolean isMoreData;
+            int     nextStartIndex = 0;
+
+            do {
+                AtlasTypeRegistry                            typeRegistry  = 
new AtlasTypeRegistry();
+                AtlasTypeRegistry.AtlasTransientTypeRegistry tty           = 
null;
+                AtlasSearchResult                            searchResult  = 
null;
+                boolean                                      commitUpdates = 
false;
+
+                searchParams.setOffset(nextStartIndex);
+                isMoreData = false;
+
+                try {
+                    searchResult = atlasClient.facetedSearch(searchParams);
+                    AtlasTypesDef typesDef = atlasClient.getAllTypeDefs(new 
SearchFilter());
+                    tty = typeRegistry.lockTypeRegistryForUpdate();
+                    tty.addTypes(typesDef);
+                    commitUpdates = true;
+                } catch (AtlasServiceException | AtlasBaseException excp) {
+                    LOG.error("failed to download tags from Atlas", excp);
+                    ret = null;
+                } catch (Exception unexpectedException) {
+                    LOG.error("Failed to download tags from Atlas due to 
unexpected exception", unexpectedException);
+                    ret = null;
+                } finally {
+                    if (tty != null) {
+                        typeRegistry.releaseTypeRegistryForUpdate(tty, 
commitUpdates);
                     }
+                }
 
-                    String typeName = header.getTypeName();
-                    if 
(!AtlasResourceMapperUtil.isEntityTypeHandled(typeName)) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("Not fetching Atlas entities of type:[" 
+ typeName + "]");
-                        }
-                        continue;
+                if (commitUpdates && searchResult != null) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(AtlasType.toJson(searchResult));
                     }
 
-                    List<EntityNotificationWrapper.RangerAtlasClassification>  
        allTagsForEntity       = new ArrayList<>();
+                    List<AtlasEntityHeader> entityHeaders = 
searchResult.getEntities();
 
-                    for (AtlasClassification classification : 
header.getClassifications()) {
-                        
List<EntityNotificationWrapper.RangerAtlasClassification> tags = 
resolveTag(typeRegistry, classification);
-                        if (tags != null) {
-                            allTagsForEntity.addAll(tags);
-                        }
-                    }
+                    if (CollectionUtils.isNotEmpty(entityHeaders)) {
 
-                    if (CollectionUtils.isNotEmpty(allTagsForEntity)) {
-                        RangerAtlasEntity         entity         = new 
RangerAtlasEntity(typeName, header.getGuid(), header.getAttributes());
-                        RangerAtlasEntityWithTags entityWithTags = new 
RangerAtlasEntityWithTags(entity, allTagsForEntity, typeRegistry);
+                        nextStartIndex += entityHeaders.size();
+                        isMoreData = true;
 
-                        ret.add(entityWithTags);
+                        for (AtlasEntityHeader header : entityHeaders) {
+                            if 
(!header.getStatus().equals(AtlasEntity.Status.ACTIVE)) {
+                                if (LOG.isDebugEnabled()) {
+                                    LOG.debug("Skipping entity because it is 
not ACTIVE, header:[" + header + "]");
+                                }
+                                continue;
+                            }
+
+                            String typeName = header.getTypeName();
+                            if 
(!AtlasResourceMapperUtil.isEntityTypeHandled(typeName)) {
+                                if (LOG.isDebugEnabled()) {
+                                    LOG.debug("Not fetching Atlas entities of 
type:[" + typeName + "]");
+                                }
+                                continue;
+                            }
+
+                            
List<EntityNotificationWrapper.RangerAtlasClassification> allTagsForEntity = 
new ArrayList<>();
+
+                            for (AtlasClassification classification : 
header.getClassifications()) {
+                                
List<EntityNotificationWrapper.RangerAtlasClassification> tags = 
resolveTag(typeRegistry, classification);
+                                if (tags != null) {
+                                    allTagsForEntity.addAll(tags);
+                                }
+                            }
+
+                            if (CollectionUtils.isNotEmpty(allTagsForEntity)) {
+                                RangerAtlasEntity entity = new 
RangerAtlasEntity(typeName, header.getGuid(), header.getAttributes());
+                                RangerAtlasEntityWithTags entityWithTags = new 
RangerAtlasEntityWithTags(entity, allTagsForEntity, typeRegistry);
+
+                                ret.add(entityWithTags);
+                            }
+                        }
                     }
                 }
-            }
+            } while (isMoreData);
+
         }
 
         if (LOG.isDebugEnabled()) {
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java
index f0a3fd0..62a5f73 100644
--- 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java
@@ -66,26 +66,33 @@ public class FileTagSource extends AbstractTagSource 
implements Runnable {
 
                TagSynchronizer.printConfigurationProperties(props);
 
-               TagSink tagSink = TagSynchronizer.initializeTagSink(props);
+               boolean ret = TagSynchronizer.initializeKerberosIdentity(props);
 
-               if (tagSink != null) {
+               if (ret) {
+                       TagSink tagSink = 
TagSynchronizer.initializeTagSink(props);
 
-                       if (fileTagSource.initialize(props)) {
-                               try {
-                                       tagSink.start();
-                                       fileTagSource.setTagSink(tagSink);
-                                       fileTagSource.synchUp();
-                               } catch (Exception exception) {
-                                       LOG.error("ServiceTags upload failed : 
", exception);
+                       if (tagSink != null) {
+
+                               if (fileTagSource.initialize(props)) {
+                                       try {
+                                               tagSink.start();
+                                               
fileTagSource.setTagSink(tagSink);
+                                               fileTagSource.synchUp();
+                                       } catch (Exception exception) {
+                                               LOG.error("ServiceTags upload 
failed : ", exception);
+                                               System.exit(1);
+                                       }
+                               } else {
+                                       LOG.error("FileTagSource initialized 
failed, exiting.");
                                        System.exit(1);
                                }
+
                        } else {
-                               LOG.error("FileTagSource initialized failed, 
exiting.");
+                               LOG.error("TagSink initialialization failed, 
exiting.");
                                System.exit(1);
                        }
-
                } else {
-                       LOG.error("TagSink initialialization failed, exiting.");
+                       LOG.error("Error initializing kerberos identity");
                        System.exit(1);
                }
 

Reply via email to