This is an automated email from the ASF dual-hosted git repository.

vel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ranger.git

The following commit(s) were added to refs/heads/master by this push:
     new a97f094  RANGER-2313: tagsync fails to authenticate with ranger in kerberized cluster when using ranger-tagsync-update.sh script
a97f094 is described below

commit a97f0947e192ea67cba64a9ae4be18f6375b2dc3
Author: Abhay Kulkarni <>
AuthorDate: Thu Dec 27 10:18:22 2018 -0800

    RANGER-2313: tagsync fails to authenticate with ranger in kerberized cluster when using ranger-tagsync-update.sh script

    Signed-off-by: Velmurugan Periasamy <v...@apache.org>
---
 .../ranger/tagsync/process/TagSynchronizer.java | 2 +-
 .../source/atlas/EntityNotificationWrapper.java | 4 +-
 .../source/atlasrest/AtlasRESTTagSource.java | 177 +++++++++++++--------
 .../ranger/tagsync/source/file/FileTagSource.java | 31 ++--
 4 files changed, 129 insertions(+), 85 deletions(-)

diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java index 49ff76f..8806c74 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java @@ -353,7 +353,7 @@ public class TagSynchronizer { return tagSource; } - private static boolean initializeKerberosIdentity(Properties props) { + public static boolean initializeKerberosIdentity(Properties props) { if (LOG.isDebugEnabled()) { LOG.debug("==> TagSynchronizer.initializeKerberosIdentity()"); } diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/EntityNotificationWrapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/EntityNotificationWrapper.java index adaa2f9..9781aa6 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/EntityNotificationWrapper.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/EntityNotificationWrapper.java @@ -93,7 +93,7 @@ public class EntityNotificationWrapper { isEntityTypeHandled = 
isEntityActive && AtlasResourceMapperUtil.isEntityTypeHandled(entityTypeName); isEntityDeleteOp = EntityNotificationV2.OperationType.ENTITY_DELETE == v2Notification.getOperationType(); isEntityCreateOp = EntityNotificationV2.OperationType.ENTITY_CREATE == v2Notification.getOperationType(); - isEmptyClassifications = CollectionUtils.isNotEmpty(atlasEntity.getClassifications()); + isEmptyClassifications = CollectionUtils.isEmpty(atlasEntity.getClassifications()); List<AtlasClassification> allClassifications = atlasEntity.getClassifications(); @@ -166,7 +166,7 @@ public class EntityNotificationWrapper { isEntityTypeHandled = isEntityActive && AtlasResourceMapperUtil.isEntityTypeHandled(entityTypeName); isEntityDeleteOp = EntityNotificationV1.OperationType.ENTITY_DELETE == v1Notification.getOperationType(); isEntityCreateOp = EntityNotificationV1.OperationType.ENTITY_CREATE == v1Notification.getOperationType(); - isEmptyClassifications = CollectionUtils.isNotEmpty(v1Notification.getAllTraits()); + isEmptyClassifications = CollectionUtils.isEmpty(v1Notification.getAllTraits()); List<Struct> allTraits = ((EntityNotificationV1) notification).getAllTraits(); diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java index 2b4a668..8b12aff 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java @@ -69,7 +69,8 @@ import java.util.TimeZone; public class AtlasRESTTagSource extends AbstractTagSource implements Runnable { private static final Log LOG = LogFactory.getLog(AtlasRESTTagSource.class); - private static final ThreadLocal<DateFormat> DATE_FORMATTER = new ThreadLocal<DateFormat>() { + private static final int REQUESTED_ENTITIES_LIMIT_MAX = 10000; + private static final ThreadLocal<DateFormat> 
DATE_FORMATTER = new ThreadLocal<DateFormat>() { @Override protected DateFormat initialValue() { SimpleDateFormat dateFormat = new SimpleDateFormat(AtlasBaseTypeDef.SERIALIZED_DATE_FORMAT_STR); @@ -97,26 +98,34 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable { TagSynchronizer.printConfigurationProperties(props); - TagSink tagSink = TagSynchronizer.initializeTagSink(props); + boolean ret = TagSynchronizer.initializeKerberosIdentity(props); - if (tagSink != null) { + if (ret) { - if (atlasRESTTagSource.initialize(props)) { - try { - tagSink.start(); - atlasRESTTagSource.setTagSink(tagSink); - atlasRESTTagSource.synchUp(); - } catch (Exception exception) { - LOG.error("ServiceTags upload failed : ", exception); + TagSink tagSink = TagSynchronizer.initializeTagSink(props); + + if (tagSink != null) { + + if (atlasRESTTagSource.initialize(props)) { + try { + tagSink.start(); + atlasRESTTagSource.setTagSink(tagSink); + atlasRESTTagSource.synchUp(); + } catch (Exception exception) { + LOG.error("ServiceTags upload failed : ", exception); + System.exit(1); + } + } else { + LOG.error("AtlasRESTTagSource initialization failed, exiting."); System.exit(1); } + } else { - LOG.error("AtlasRESTTagSource initialized failed, exiting."); + LOG.error("TagSink initialization failed, exiting."); System.exit(1); } - } else { - LOG.error("TagSink initialialization failed, exiting."); + LOG.error("Error initializing kerberos identity"); System.exit(1); } @@ -236,76 +245,104 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable { if (LOG.isDebugEnabled()) { LOG.debug("==> getAtlasActiveEntities()"); } - List<RangerAtlasEntityWithTags> ret = null; - - SearchParameters searchParams = new SearchParameters(); - AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry(); - AtlasTypeRegistry.AtlasTransientTypeRegistry tty = null; - AtlasSearchResult searchResult = null; + List<RangerAtlasEntityWithTags> ret = new ArrayList<>(); - 
searchParams.setClassification("*"); - searchParams.setIncludeClassificationAttributes(true); - searchParams.setOffset(0); - searchParams.setLimit(Integer.MAX_VALUE); - - boolean commitUpdates = false; + AtlasClientV2 atlasClient = null; try { - AtlasClientV2 atlasClient = getAtlasClient(); - searchResult = atlasClient.facetedSearch(searchParams); - AtlasTypesDef typesDef = atlasClient.getAllTypeDefs(new SearchFilter()); - tty = typeRegistry.lockTypeRegistryForUpdate(); - tty.addTypes(typesDef); - commitUpdates = true; - } catch (AtlasServiceException | AtlasBaseException | IOException excp) { - LOG.error("failed to download tags from Atlas", excp); - } catch (Exception unexpectedException) { - LOG.error("Failed to download tags from Atlas due to unexpected exception", unexpectedException); - } finally { - if (tty != null) { - typeRegistry.releaseTypeRegistryForUpdate(tty, commitUpdates); - } + atlasClient = getAtlasClient(); + } catch (IOException exception) { + LOG.error("Failed to get Atlas client.", exception); } - if (commitUpdates && searchResult != null) { - if (LOG.isDebugEnabled()) { - LOG.debug(AtlasType.toJson(searchResult)); - } - ret = new ArrayList<>(); - List<AtlasEntityHeader> entityHeaders = searchResult.getEntities(); - if (CollectionUtils.isNotEmpty(entityHeaders)) { - for (AtlasEntityHeader header : entityHeaders) { - if (!header.getStatus().equals(AtlasEntity.Status.ACTIVE)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skipping entity because it is not ACTIVE, header:[" + header + "]"); - } - continue; + if (atlasClient != null) { + + SearchParameters searchParams = new SearchParameters(); + + searchParams.setExcludeDeletedEntities(true); + searchParams.setClassification("*"); + //searchParams.setIncludeSubClassifications(true); + //searchParams.setIncludeSubTypes(true); + searchParams.setIncludeClassificationAttributes(true); + searchParams.setLimit(REQUESTED_ENTITIES_LIMIT_MAX); + + boolean isMoreData; + int nextStartIndex = 0; + + do { + 
AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry(); + AtlasTypeRegistry.AtlasTransientTypeRegistry tty = null; + AtlasSearchResult searchResult = null; + boolean commitUpdates = false; + + searchParams.setOffset(nextStartIndex); + isMoreData = false; + + try { + searchResult = atlasClient.facetedSearch(searchParams); + AtlasTypesDef typesDef = atlasClient.getAllTypeDefs(new SearchFilter()); + tty = typeRegistry.lockTypeRegistryForUpdate(); + tty.addTypes(typesDef); + commitUpdates = true; + } catch (AtlasServiceException | AtlasBaseException excp) { + LOG.error("failed to download tags from Atlas", excp); + ret = null; + } catch (Exception unexpectedException) { + LOG.error("Failed to download tags from Atlas due to unexpected exception", unexpectedException); + ret = null; + } finally { + if (tty != null) { + typeRegistry.releaseTypeRegistryForUpdate(tty, commitUpdates); } + } - String typeName = header.getTypeName(); - if (!AtlasResourceMapperUtil.isEntityTypeHandled(typeName)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Not fetching Atlas entities of type:[" + typeName + "]"); - } - continue; + if (commitUpdates && searchResult != null) { + if (LOG.isDebugEnabled()) { + LOG.debug(AtlasType.toJson(searchResult)); } - List<EntityNotificationWrapper.RangerAtlasClassification> allTagsForEntity = new ArrayList<>(); + List<AtlasEntityHeader> entityHeaders = searchResult.getEntities(); - for (AtlasClassification classification : header.getClassifications()) { - List<EntityNotificationWrapper.RangerAtlasClassification> tags = resolveTag(typeRegistry, classification); - if (tags != null) { - allTagsForEntity.addAll(tags); - } - } + if (CollectionUtils.isNotEmpty(entityHeaders)) { - if (CollectionUtils.isNotEmpty(allTagsForEntity)) { - RangerAtlasEntity entity = new RangerAtlasEntity(typeName, header.getGuid(), header.getAttributes()); - RangerAtlasEntityWithTags entityWithTags = new RangerAtlasEntityWithTags(entity, allTagsForEntity, typeRegistry); + 
nextStartIndex += entityHeaders.size(); + isMoreData = true; - ret.add(entityWithTags); + for (AtlasEntityHeader header : entityHeaders) { + if (!header.getStatus().equals(AtlasEntity.Status.ACTIVE)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping entity because it is not ACTIVE, header:[" + header + "]"); + } + continue; + } + + String typeName = header.getTypeName(); + if (!AtlasResourceMapperUtil.isEntityTypeHandled(typeName)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Not fetching Atlas entities of type:[" + typeName + "]"); + } + continue; + } + + List<EntityNotificationWrapper.RangerAtlasClassification> allTagsForEntity = new ArrayList<>(); + + for (AtlasClassification classification : header.getClassifications()) { + List<EntityNotificationWrapper.RangerAtlasClassification> tags = resolveTag(typeRegistry, classification); + if (tags != null) { + allTagsForEntity.addAll(tags); + } + } + + if (CollectionUtils.isNotEmpty(allTagsForEntity)) { + RangerAtlasEntity entity = new RangerAtlasEntity(typeName, header.getGuid(), header.getAttributes()); + RangerAtlasEntityWithTags entityWithTags = new RangerAtlasEntityWithTags(entity, allTagsForEntity, typeRegistry); + + ret.add(entityWithTags); + } + } } } - } + } while (isMoreData); + } if (LOG.isDebugEnabled()) { diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java index f0a3fd0..62a5f73 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/file/FileTagSource.java @@ -66,26 +66,33 @@ public class FileTagSource extends AbstractTagSource implements Runnable { TagSynchronizer.printConfigurationProperties(props); - TagSink tagSink = TagSynchronizer.initializeTagSink(props); + boolean ret = TagSynchronizer.initializeKerberosIdentity(props); - if (tagSink != null) { + if (ret) { + TagSink tagSink = 
TagSynchronizer.initializeTagSink(props); - if (fileTagSource.initialize(props)) { - try { - tagSink.start(); - fileTagSource.setTagSink(tagSink); - fileTagSource.synchUp(); - } catch (Exception exception) { - LOG.error("ServiceTags upload failed : ", exception); + if (tagSink != null) { + + if (fileTagSource.initialize(props)) { + try { + tagSink.start(); + fileTagSource.setTagSink(tagSink); + fileTagSource.synchUp(); + } catch (Exception exception) { + LOG.error("ServiceTags upload failed : ", exception); + System.exit(1); + } + } else { + LOG.error("FileTagSource initialized failed, exiting."); System.exit(1); } + } else { - LOG.error("FileTagSource initialized failed, exiting."); + LOG.error("TagSink initialialization failed, exiting."); System.exit(1); } - } else { - LOG.error("TagSink initialialization failed, exiting."); + LOG.error("Error initializing kerberos identity"); System.exit(1); }