RANGER-1897: TagSync should replace use of V1 Atlas APIs with V2 APIs for efficient tag-download from Atlas
Project: http://git-wip-us.apache.org/repos/asf/ranger/repo Commit: http://git-wip-us.apache.org/repos/asf/ranger/commit/e8afb9fa Tree: http://git-wip-us.apache.org/repos/asf/ranger/tree/e8afb9fa Diff: http://git-wip-us.apache.org/repos/asf/ranger/diff/e8afb9fa Branch: refs/heads/master Commit: e8afb9faad81e7042877aa528635848a0043cb0c Parents: c7260c3 Author: Abhay Kulkarni <akulka...@hortonworks.com> Authored: Thu Dec 7 17:29:41 2017 -0800 Committer: Abhay Kulkarni <akulka...@hortonworks.com> Committed: Thu Dec 7 17:29:41 2017 -0800 ---------------------------------------------------------------------- .../atlas/authorizer/RangerAtlasResource.java | 4 + pom.xml | 4 +- src/main/assembly/tagsync.xml | 30 +- tagsync/pom.xml | 77 ++--- .../ranger/tagsync/process/TagSynchronizer.java | 2 +- .../tagsync/sink/tagadmin/TagAdminRESTSink.java | 11 +- .../source/atlas/AtlasEntityWithTraits.java | 98 ------ .../source/atlas/AtlasHbaseResourceMapper.java | 12 +- .../source/atlas/AtlasHdfsResourceMapper.java | 13 +- .../source/atlas/AtlasHiveResourceMapper.java | 11 +- .../source/atlas/AtlasKafkaResourceMapper.java | 17 +- .../source/atlas/AtlasNotificationMapper.java | 142 ++++---- .../source/atlas/AtlasResourceMapper.java | 26 +- .../source/atlas/AtlasResourceMapperUtil.java | 66 ++-- .../source/atlas/AtlasStormResourceMapper.java | 10 +- .../tagsync/source/atlas/AtlasTagSource.java | 97 +++--- .../source/atlasrest/AtlasRESTTagSource.java | 238 ++++++++++++-- .../tagsync/source/atlasrest/AtlasRESTUtil.java | 325 ------------------- .../source/atlasrest/RangerAtlasEntity.java | 60 ++++ .../atlasrest/RangerAtlasEntityWithTags.java | 118 +++++++ .../process/TestHbaseResourceMapper.java | 56 ++-- .../tagsync/process/TestHdfsResourceMapper.java | 24 +- .../tagsync/process/TestHiveResourceMapper.java | 28 +- .../process/TestKafkaResourceMapper.java | 16 +- 24 files changed, 679 insertions(+), 806 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ranger/blob/e8afb9fa/plugin-atlas/src/main/java/org/apache/ranger/authorization/atlas/authorizer/RangerAtlasResource.java ---------------------------------------------------------------------- diff --git a/plugin-atlas/src/main/java/org/apache/ranger/authorization/atlas/authorizer/RangerAtlasResource.java b/plugin-atlas/src/main/java/org/apache/ranger/authorization/atlas/authorizer/RangerAtlasResource.java index f056f3e..4367c5e 100644 --- a/plugin-atlas/src/main/java/org/apache/ranger/authorization/atlas/authorizer/RangerAtlasResource.java +++ b/plugin-atlas/src/main/java/org/apache/ranger/authorization/atlas/authorizer/RangerAtlasResource.java @@ -28,8 +28,10 @@ public class RangerAtlasResource extends RangerAccessResourceImpl { public static final String KEY_TYPE = "type"; public static final String KEY_ENTITY = "entity"; public static final String KEY_OPERATION = "operation"; + /* public static final String KEY_TAXONOMY = "taxonomy"; public static final String KEY_TERM = "term"; + */ private static final Logger LOG = LoggerFactory.getLogger(RangerAtlasResource.class); @@ -44,12 +46,14 @@ public class RangerAtlasResource extends RangerAccessResourceImpl { case OPERATION: setValue(KEY_OPERATION, atlasResource); break; + /* case TAXONOMY: setValue(KEY_TAXONOMY, atlasResource); break; case TERM: setValue(KEY_TERM, atlasResource); break; + */ default: LOG.warn("Invalid Resource : " + atlasResource); break; http://git-wip-us.apache.org/repos/asf/ranger/blob/e8afb9fa/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index a73a52f..3086a1e 100644 --- a/pom.xml +++ b/pom.xml @@ -125,9 +125,10 @@ <apacheds.version>2.0.0-M22</apacheds.version> <asm.all.version>3.2</asm.all.version> <aspectj.version>1.8.2</aspectj.version> - <atlas.version>0.8.1</atlas.version> + <atlas.version>1.0.0-SNAPSHOT</atlas.version> <atlas.guava.version>14.0</atlas.guava.version> <atlas.gson.version>2.5</atlas.gson.version> + <atlas.jackson.version>2.9.2</atlas.jackson.version> <atlas.jettison.version>1.3.7</atlas.jettison.version> <atlas.commons.logging.version>1.1.3</atlas.commons.logging.version> <bouncycastle.version>1.55</bouncycastle.version> @@ -210,6 +211,7 @@ <springframework.version>3.2.10.RELEASE</springframework.version> <sqoop.version>1.99.7</sqoop.version> <storm.version>1.0.2</storm.version> + <sun-jersey-bundle.version>1.19</sun-jersey-bundle.version> <tomcat.embed.version>7.0.82</tomcat.embed.version> <velocity.version>1.7</velocity.version> <zookeeper.version>3.4.6</zookeeper.version> http://git-wip-us.apache.org/repos/asf/ranger/blob/e8afb9fa/src/main/assembly/tagsync.xml ---------------------------------------------------------------------- diff --git a/src/main/assembly/tagsync.xml b/src/main/assembly/tagsync.xml index 0b17151..c929395 100644 --- a/src/main/assembly/tagsync.xml +++ b/src/main/assembly/tagsync.xml @@ -37,14 +37,13 @@ <include>com.101tec:zkclient</include> <include>com.google.code.gson:gson:jar:${gson.version}</include> <include>com.google.guava:guava:jar:${google.guava.version}</include> - <include>com.google.inject:guice:jar:${guice.version}</include> - <include>com.google.inject.extensions:guice-multibindings:jar:${guice.version}</include> <include>com.sun.jersey:jersey-bundle:jar:${jersey-bundle.version}</include> - <include>com.thoughtworks.paranamer:paranamer:jar:${paranamer.version}</include> - <include>com.yammer.metrics:metrics-core</include> + <include>com.sun.jersey.contribs:jersey-multipart:jar:${sun-jersey-bundle.version}</include> <include>org.apache.atlas:atlas-notification:jar:${atlas.version}</include> - <include>org.apache.atlas:atlas-typesystem:jar:${atlas.version}</include> - <include>org.apache.atlas:atlas-client:jar:${atlas.version}</include> + <include>org.apache.atlas:atlas-intg:jar:${atlas.version}</include> + <include>org.apache.atlas:atlas-client-v1:jar:${atlas.version}</include> + <include>org.apache.atlas:atlas-client-v2:jar:${atlas.version}</include> + <include>org.apache.atlas:atlas-client-common:jar:${atlas.version}</include> <include>org.apache.atlas:atlas-common:jar:${atlas.version}</include> <include>org.apache.hadoop:hadoop-auth</include> <include>org.apache.hadoop:hadoop-common</include> @@ -55,20 +54,15 @@ <include>org.apache.ranger:ranger-plugins-common</include> <include>org.apache.ranger:ranger-util</include> <include>org.apache.zookeeper:zookeeper:jar:${zookeeper.version}</include> - <include>org.codehaus.jackson:jackson-core-asl</include> - <include>org.codehaus.jackson:jackson-jaxrs</include> - <include>org.codehaus.jackson:jackson-mapper-asl</include> - <include>org.codehaus.jackson:jackson-xc</include> + <include>com.fasterxml.jackson.core:jackson-annotations:jar:${atlas.jackson.version}</include> + <include>com.fasterxml.jackson.core:jackson-core:jar:${atlas.jackson.version}</include> + <include>com.fasterxml.jackson.core:jackson-databind:jar:${atlas.jackson.version}</include> + <include>com.fasterxml.jackson.jaxrs:jackson-jaxrs-base:jar:${atlas.jackson.version}</include> + <include>com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:jar:${atlas.jackson.version}</include> + <include>org.codehaus.jackson:jackson-core-asl:jar:${codehaus.jackson.version}</include> + <include>org.codehaus.jackson:jackson-jaxrs:jar:${codehaus.jackson.version}</include> <include>org.codehaus.jettison:jettison:jar:${jettison.version}</include> - <include>org.json4s:json4s-native_${scala.binary.version}:jar:${json4s.version}</include> - <include>org.json4s:json4s-core_${scala.binary.version}:jar:${json4s.version}</include> - <include>org.json4s:json4s-ast_${scala.binary.version}:jar:${json4s.version}</include> <include>org.scala-lang:scala-library:jar:${scala.version}</include> - <include>org.scala-lang:scalap:jar:${scala.version}</include> - <include>org.scala-lang:scala-compiler:jar:${scala.version}</include> - <include>org.scala-lang:scala-reflect:jar:${scala.version}</include> - <include>org.scala-lang.modules:scala-xml_${scala.binary.version}:jar:${scala.xml.version}</include> - <include>org.scala-lang.modules:scala-parser-combinators_${scala.binary.version}:jar:${scala.xml.version}</include> <include>org.slf4j:slf4j-api</include> <include>aopalliance:aopalliance:jar:${aopalliance.version}</include> <include>commons-cli:commons-cli:jar:${commons.cli.version}</include> http://git-wip-us.apache.org/repos/asf/ranger/blob/e8afb9fa/tagsync/pom.xml ---------------------------------------------------------------------- diff --git a/tagsync/pom.xml b/tagsync/pom.xml index 74ff155..7e53641 100644 --- a/tagsync/pom.xml +++ b/tagsync/pom.xml @@ -55,6 +55,11 @@ <version>${jersey-bundle.version}</version> </dependency> <dependency> + <groupId>com.sun.jersey.contribs</groupId> + <artifactId>jersey-multipart</artifactId> + <version>${sun-jersey-bundle.version}</version> + </dependency> + <dependency> <groupId>commons-cli</groupId> <artifactId>commons-cli</artifactId> <version>${commons.cli.version}</version> @@ -95,14 +100,14 @@ <version>${project.version}</version> </dependency> <dependency> - <groupId>com.google.inject</groupId> - <artifactId>guice</artifactId> - <version>${guice.version}</version> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-core-asl</artifactId> + <version>${codehaus.jackson.version}</version> </dependency> <dependency> - <groupId>com.google.inject.extensions</groupId> - <artifactId>guice-multibindings</artifactId> - <version>${guice.version}</version> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-jaxrs</artifactId> + <version>${codehaus.jackson.version}</version> </dependency> <dependency> <groupId>org.codehaus.jettison</groupId> @@ -116,12 +121,17 @@ </dependency> <dependency> <groupId>org.apache.atlas</groupId> - <artifactId>atlas-typesystem</artifactId> + <artifactId>atlas-intg</artifactId> + <version>${atlas.version}</version> + </dependency> + <dependency> + <groupId>org.apache.atlas</groupId> + <artifactId>atlas-client-v1</artifactId> <version>${atlas.version}</version> </dependency> <dependency> <groupId>org.apache.atlas</groupId> - <artifactId>atlas-client</artifactId> + <artifactId>atlas-client-v2</artifactId> <version>${atlas.version}</version> </dependency> <dependency> @@ -160,53 +170,34 @@ </dependency> <dependency> <groupId>org.scala-lang</groupId> - <artifactId>scala-compiler</artifactId> - <version>${scala.version}</version> - </dependency> - <dependency> - <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scalap</artifactId> - <version>${scala.version}</version> - </dependency> - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-reflect</artifactId> - <version>${scala.version}</version> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + <version>${atlas.jackson.version}</version> </dependency> <dependency> - <groupId>org.scala-lang.modules</groupId> - <artifactId>scala-xml_${scala.binary.version}</artifactId> - <version>${scala.xml.version}</version> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>${atlas.jackson.version}</version> </dependency> <dependency> - <groupId>org.scala-lang.modules</groupId> - <artifactId>scala-parser-combinators_${scala.binary.version}</artifactId> - <version>${scala.xml.version}</version> + <groupId>com.fasterxml.jackson.jaxrs</groupId> + <artifactId>jackson-jaxrs-base</artifactId> + <version>${atlas.jackson.version}</version> </dependency> <dependency> - <groupId>org.json4s</groupId> - <artifactId>json4s-core_${scala.binary.version}</artifactId> - <version>${json4s.version}</version> + <groupId>com.fasterxml.jackson.jaxrs</groupId> + <artifactId>jackson-jaxrs-json-provider</artifactId> + <version>${atlas.jackson.version}</version> </dependency> <dependency> - <groupId>org.json4s</groupId> - <artifactId>json4s-native_${scala.binary.version}</artifactId> - <version>${json4s.version}</version> - </dependency> - <dependency> - <groupId>org.json4s</groupId> - <artifactId>json4s-ast_${scala.binary.version}</artifactId> - <version>${json4s.version}</version> - </dependency> - <dependency> - <groupId>com.thoughtworks.paranamer</groupId> - <artifactId>paranamer</artifactId> - <version>${paranamer.version}</version> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + <version>${atlas.jackson.version}</version> </dependency> + </dependencies> </project> http://git-wip-us.apache.org/repos/asf/ranger/blob/e8afb9fa/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java index b07cd34..45997e4 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java @@ -107,7 +107,7 @@ public class TagSynchronizer { if (ret) { LOG.info("Initializing TAG source and sink"); - + ret = false; tagSink = initializeTagSink(properties); if (tagSink != null) { http://git-wip-us.apache.org/repos/asf/ranger/blob/e8afb9fa/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java b/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java index c34b6ea..a1dc8f5 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java @@ -65,13 +65,13 @@ public class TagAdminRESTSink implements TagSink, Runnable { @Override public boolean initialize(Properties properties) { - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("==> TagAdminRESTSink.initialize()"); } boolean ret = false; - String restUrl = TagSyncConfig.getTagAdminRESTUrl(properties); + String restUrl = TagSyncConfig.getTagAdminRESTUrl(properties); String sslConfigFile = TagSyncConfig.getTagAdminRESTSslConfigFile(properties); String userName = TagSyncConfig.getTagAdminUserName(properties); String password = TagSyncConfig.getTagAdminPassword(properties); @@ -89,16 +89,19 @@ public class TagAdminRESTSink implements TagSink, Runnable { if (StringUtils.isNotBlank(restUrl)) { tagRESTClient = new RangerRESTClient(restUrl, sslConfigFile); - if(!isKerberized) { + if (!isKerberized) { tagRESTClient.setBasicAuthInfo(userName, password); } + // Build and cache REST client. This will catch any errors in building REST client up-front + tagRESTClient.getClient(); + uploadWorkItems = new LinkedBlockingQueue<UploadWorkItem>(); ret = true; } else { LOG.error("No value specified for property 'ranger.tagsync.tagadmin.rest.url'!"); } - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("<== TagAdminRESTSink.initialize(), result=" + ret); } http://git-wip-us.apache.org/repos/asf/ranger/blob/e8afb9fa/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasEntityWithTraits.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasEntityWithTraits.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasEntityWithTraits.java deleted file mode 100644 index 77dee01..0000000 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasEntityWithTraits.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.ranger.tagsync.source.atlas; - -import org.apache.atlas.AtlasException; -import org.apache.atlas.typesystem.IReferenceableInstance; -import org.apache.atlas.typesystem.IStruct; - -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -public class AtlasEntityWithTraits { - - private final IReferenceableInstance entity; - private final List<IStruct> traits; - - public AtlasEntityWithTraits(IReferenceableInstance entity, List<IStruct> traits) { - this.entity = entity; - this.traits = traits; - } - - public IReferenceableInstance getEntity() { - return entity; - } - - public List<IStruct> getAllTraits() { - return traits == null ? new LinkedList<IStruct>() : traits; - } - - @Override - public String toString( ) { - StringBuilder sb = new StringBuilder(); - - toString(sb); - - return sb.toString(); - } - - public void toString(StringBuilder sb) { - - sb.append("AtlasEntityWithTraits={ "); - - sb.append("Entity-Id: " + entity.getId()._getId()).append(", ") - .append("Entity-Type: " + entity.getTypeName()).append(", ") - .append("Entity-Version: " + entity.getId().getVersion()).append(", ") - .append("Entity-State: " + entity.getId().getStateAsString()).append(", "); - - sb.append("Entity-Values={ "); - try { - for (Map.Entry<String, Object> entry : entity.getValuesMap().entrySet()) { - sb.append("{").append(entry.getKey()).append(", ").append(entry.getValue()).append("}, "); - } - } catch (AtlasException exception) { - // Ignore - } - sb.append(" }"); - - sb.append(", Entity-Traits={ "); - for (IStruct trait : traits) { - try { - sb.append("{traitType=").append(trait.getTypeName()).append(", "); - Map<String, Object> traitValues = trait.getValuesMap(); - sb.append("{"); - for (Map.Entry<String, Object> valueEntry : traitValues.entrySet()) { - sb.append("{").append(valueEntry.getKey()).append(", ").append(valueEntry.getValue()).append("}"); - } - sb.append("}"); - - sb.append(" }"); - } catch (AtlasException exception) { - // Ignore - } - } - sb.append(" }"); - - sb.append(" }"); - - } - -} http://git-wip-us.apache.org/repos/asf/ranger/blob/e8afb9fa/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHbaseResourceMapper.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHbaseResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHbaseResourceMapper.java index 8b36a31..33e804a 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHbaseResourceMapper.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHbaseResourceMapper.java @@ -22,10 +22,10 @@ package org.apache.ranger.tagsync.source.atlas; import java.util.Map; import java.util.HashMap; -import org.apache.atlas.typesystem.IReferenceableInstance; import org.apache.commons.lang.StringUtils; import org.apache.ranger.plugin.model.RangerPolicy.RangerPolicyResource; import org.apache.ranger.plugin.model.RangerServiceResource; +import org.apache.ranger.tagsync.source.atlasrest.RangerAtlasEntity; public class AtlasHbaseResourceMapper extends AtlasResourceMapper { public static final String ENTITY_TYPE_HBASE_TABLE = "hbase_table"; @@ -36,10 +36,6 @@ public class AtlasHbaseResourceMapper extends AtlasResourceMapper { public static final String RANGER_TYPE_HBASE_COLUMN_FAMILY = "column-family"; public static final String RANGER_TYPE_HBASE_COLUMN = "column"; - public static final String ENTITY_ATTRIBUTE_QUALIFIED_NAME = "qualifiedName"; - public static final String QUALIFIED_NAME_DELIMITER = "\\."; - public static final Character QUALIFIED_NAME_DELIMITER_CHAR = '.'; - public static final String[] SUPPORTED_ENTITY_TYPES = { ENTITY_TYPE_HBASE_TABLE, ENTITY_TYPE_HBASE_COLUMN_FAMILY, ENTITY_TYPE_HBASE_COLUMN }; public AtlasHbaseResourceMapper() { @@ -47,8 +43,8 @@ public class AtlasHbaseResourceMapper extends AtlasResourceMapper { } @Override - public RangerServiceResource buildResource(final IReferenceableInstance entity) throws Exception { - String qualifiedName = getEntityAttribute(entity, ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class); + public RangerServiceResource buildResource(final RangerAtlasEntity entity) throws Exception { + String qualifiedName = (String)entity.getAttributes().get(AtlasResourceMapper.ENTITY_ATTRIBUTE_QUALIFIED_NAME); if (StringUtils.isEmpty(qualifiedName)) { throw new Exception("attribute '" + ENTITY_ATTRIBUTE_QUALIFIED_NAME + "' not found in entity"); } @@ -64,7 +60,7 @@ public class AtlasHbaseResourceMapper extends AtlasResourceMapper { } String entityType = entity.getTypeName(); - String entityGuid = entity.getId() != null ? entity.getId()._getId() : null; + String entityGuid = entity.getGuid(); String serviceName = getRangerServiceName(clusterName); Map<String, RangerPolicyResource> elements = new HashMap<String, RangerPolicyResource>(); http://git-wip-us.apache.org/repos/asf/ranger/blob/e8afb9fa/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHdfsResourceMapper.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHdfsResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHdfsResourceMapper.java index 06bff90..378542c 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHdfsResourceMapper.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHdfsResourceMapper.java @@ -22,12 +22,12 @@ package org.apache.ranger.tagsync.source.atlas; import java.util.HashMap; import java.util.Map; -import org.apache.atlas.typesystem.IReferenceableInstance; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.Path; import org.apache.ranger.plugin.model.RangerPolicy; import org.apache.ranger.plugin.model.RangerPolicy.RangerPolicyResource; import org.apache.ranger.plugin.model.RangerServiceResource; +import org.apache.ranger.tagsync.source.atlasrest.RangerAtlasEntity; public class AtlasHdfsResourceMapper extends AtlasResourceMapper { public static final String ENTITY_TYPE_HDFS_PATH = "hdfs_path"; @@ -35,7 +35,6 @@ public class AtlasHdfsResourceMapper extends AtlasResourceMapper { public static final String ENTITY_ATTRIBUTE_PATH = "path"; public static final String ENTITY_ATTRIBUTE_CLUSTER_NAME = "clusterName"; - public static final String ENTITY_ATTRIBUTE_QUALIFIED_NAME = "qualifiedName"; public static final String[] SUPPORTED_ENTITY_TYPES = { ENTITY_TYPE_HDFS_PATH }; @@ -56,10 +55,10 @@ public class AtlasHdfsResourceMapper extends AtlasResourceMapper { } @Override - public RangerServiceResource buildResource(final IReferenceableInstance entity) throws Exception { - String path = getEntityAttribute(entity, ENTITY_ATTRIBUTE_PATH, String.class); - String clusterName = getEntityAttribute(entity, ENTITY_ATTRIBUTE_CLUSTER_NAME, String.class); - String qualifiedName = getEntityAttribute(entity, ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class); + public RangerServiceResource buildResource(final RangerAtlasEntity entity) throws Exception { + String path = (String)entity.getAttributes().get(ENTITY_ATTRIBUTE_PATH); + String clusterName = (String)entity.getAttributes().get(ENTITY_ATTRIBUTE_CLUSTER_NAME); + String qualifiedName = (String)entity.getAttributes().get(AtlasResourceMapper.ENTITY_ATTRIBUTE_QUALIFIED_NAME); if(StringUtils.isEmpty(path)) { path = getResourceNameFromQualifiedName(qualifiedName); @@ -81,7 +80,7 @@ public class AtlasHdfsResourceMapper extends AtlasResourceMapper { } } - String entityGuid = entity.getId() != null ? entity.getId()._getId() : null; + String entityGuid = entity.getGuid(); String serviceName = getRangerServiceName(clusterName); Boolean isExcludes = Boolean.FALSE; Boolean isRecursive = Boolean.TRUE; http://git-wip-us.apache.org/repos/asf/ranger/blob/e8afb9fa/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHiveResourceMapper.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHiveResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHiveResourceMapper.java index a359622..3e0a97f 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHiveResourceMapper.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHiveResourceMapper.java @@ -22,10 +22,10 @@ package org.apache.ranger.tagsync.source.atlas; import java.util.Map; import java.util.HashMap; -import org.apache.atlas.typesystem.IReferenceableInstance; import org.apache.commons.lang.StringUtils; import org.apache.ranger.plugin.model.RangerPolicy.RangerPolicyResource; import org.apache.ranger.plugin.model.RangerServiceResource; +import org.apache.ranger.tagsync.source.atlasrest.RangerAtlasEntity; public class AtlasHiveResourceMapper extends AtlasResourceMapper { public static final String ENTITY_TYPE_HIVE_DB = "hive_db"; @@ -36,9 +36,6 @@ public class AtlasHiveResourceMapper extends AtlasResourceMapper { public static final String RANGER_TYPE_HIVE_TABLE = "table"; public static final String RANGER_TYPE_HIVE_COLUMN = "column"; - public static final String ENTITY_ATTRIBUTE_QUALIFIED_NAME = "qualifiedName"; - public static final String QUALIFIED_NAME_DELIMITER = "\\."; - public static final String[] SUPPORTED_ENTITY_TYPES = { ENTITY_TYPE_HIVE_DB, ENTITY_TYPE_HIVE_TABLE, ENTITY_TYPE_HIVE_COLUMN }; public AtlasHiveResourceMapper() { @@ -46,8 +43,8 @@ public class AtlasHiveResourceMapper extends AtlasResourceMapper { } @Override - public RangerServiceResource buildResource(final IReferenceableInstance entity) throws Exception { - String qualifiedName = getEntityAttribute(entity, ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class); + public RangerServiceResource buildResource(final RangerAtlasEntity entity) throws Exception { + String qualifiedName = (String)entity.getAttributes().get(AtlasResourceMapper.ENTITY_ATTRIBUTE_QUALIFIED_NAME); if (StringUtils.isEmpty(qualifiedName)) { throw new Exception("attribute '" + ENTITY_ATTRIBUTE_QUALIFIED_NAME + "' not found in entity"); } @@ -63,7 +60,7 @@ public class AtlasHiveResourceMapper extends AtlasResourceMapper { } String entityType = entity.getTypeName(); - String entityGuid = entity.getId() != null ? entity.getId()._getId() : null; + String entityGuid = entity.getGuid(); String serviceName = getRangerServiceName(clusterName); String[] resources = resourceStr.split(QUALIFIED_NAME_DELIMITER); String dbName = resources.length > 0 ? resources[0] : null; http://git-wip-us.apache.org/repos/asf/ranger/blob/e8afb9fa/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasKafkaResourceMapper.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasKafkaResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasKafkaResourceMapper.java index 09ae5d1..86e37c3 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasKafkaResourceMapper.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasKafkaResourceMapper.java @@ -22,18 +22,16 @@ package org.apache.ranger.tagsync.source.atlas; import java.util.HashMap; import java.util.Map; -import org.apache.atlas.typesystem.IReferenceableInstance; import org.apache.commons.lang.StringUtils; import org.apache.ranger.plugin.model.RangerPolicy; import org.apache.ranger.plugin.model.RangerPolicy.RangerPolicyResource; import org.apache.ranger.plugin.model.RangerServiceResource; +import org.apache.ranger.tagsync.source.atlasrest.RangerAtlasEntity; public class AtlasKafkaResourceMapper extends AtlasResourceMapper { public static final String ENTITY_TYPE_KAFKA_TOPIC = "kafka_topic"; public static final String RANGER_TYPE_KAFKA_TOPIC = "topic"; - public static final String ENTITY_ATTRIBUTE_QUALIFIED_NAME = "qualifiedName"; - public static final String[] SUPPORTED_ENTITY_TYPES = { ENTITY_TYPE_KAFKA_TOPIC }; public AtlasKafkaResourceMapper() { @@ -41,12 +39,8 @@ public class AtlasKafkaResourceMapper extends AtlasResourceMapper { } @Override - public RangerServiceResource buildResource(final IReferenceableInstance entity) throws Exception { - String qualifiedName = getEntityAttribute(entity, ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class); - - if(StringUtils.isEmpty(qualifiedName)) { - throwExceptionWithMessage("attribute '" + ENTITY_ATTRIBUTE_QUALIFIED_NAME + "' not found in entity"); - } + public RangerServiceResource buildResource(final RangerAtlasEntity entity) throws Exception { + String qualifiedName = (String)entity.getAttributes().get(AtlasResourceMapper.ENTITY_ATTRIBUTE_QUALIFIED_NAME); String topic = getResourceNameFromQualifiedName(qualifiedName); @@ -61,16 +55,17 @@ public class AtlasKafkaResourceMapper extends AtlasResourceMapper { } if(StringUtils.isEmpty(clusterName)) { - throwExceptionWithMessage("Cluster name not found in attribute '" + ENTITY_ATTRIBUTE_QUALIFIED_NAME + "'"); + throwExceptionWithMessage("attribute '" + ENTITY_ATTRIBUTE_QUALIFIED_NAME + "' not found in entity"); } + Map<String, RangerPolicyResource> elements = new HashMap<String, RangerPolicy.RangerPolicyResource>(); Boolean isExcludes = Boolean.FALSE; Boolean isRecursive = Boolean.TRUE; elements.put(RANGER_TYPE_KAFKA_TOPIC, new RangerPolicyResource(topic, isExcludes, isRecursive)); - String entityGuid = entity.getId() != null ? entity.getId()._getId() : null; + String entityGuid = entity.getGuid(); String serviceName = getRangerServiceName(clusterName); return new RangerServiceResource(entityGuid, serviceName, elements); http://git-wip-us.apache.org/repos/asf/ranger/blob/e8afb9fa/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java index f007ae5..91cf606 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java @@ -19,11 +19,9 @@ package org.apache.ranger.tagsync.source.atlas; -import org.apache.atlas.AtlasException; -import org.apache.atlas.notification.entity.EntityNotification; -import org.apache.atlas.typesystem.IReferenceableInstance; -import org.apache.atlas.typesystem.IStruct; -import org.apache.atlas.typesystem.persistence.Id; +import org.apache.atlas.v1.model.notification.EntityNotificationV1; +import org.apache.atlas.v1.model.instance.Id; +import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; @@ -34,9 +32,10 @@ import org.apache.ranger.plugin.model.RangerTag; import org.apache.ranger.plugin.model.RangerTagDef; import org.apache.ranger.plugin.model.RangerTagDef.RangerTagAttributeDef; import org.apache.ranger.plugin.util.ServiceTags; +import org.apache.ranger.tagsync.source.atlasrest.RangerAtlasEntity; +import org.apache.ranger.tagsync.source.atlasrest.RangerAtlasEntityWithTags; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -47,7 +46,7 @@ public class AtlasNotificationMapper { private static Map<String, Long> unhandledEventTypes = new HashMap<String, Long>(); - private static void logUnhandledEntityNotification(EntityNotification entityNotification) { + private static void logUnhandledEntityNotification(EntityNotificationV1 entityNotification) { final int REPORTING_INTERVAL_FOR_UNHANDLED_ENTITYTYPE_IN_MILLIS = 5 * 60 * 1000; // 5 minutes @@ -77,39 +76,30 @@ public class AtlasNotificationMapper { } @SuppressWarnings("unchecked") - public static ServiceTags processEntityNotification(EntityNotification entityNotification) { + public static ServiceTags processEntityNotification(EntityNotificationV1 entityNotification) { ServiceTags ret = null; if (isNotificationHandled(entityNotification)) { try { - IReferenceableInstance entity = entityNotification.getEntity(); - - if (entity != null && AtlasResourceMapperUtil.isEntityTypeHandled(entity.getTypeName())) { - AtlasEntityWithTraits entityWithTraits = new AtlasEntityWithTraits(entity, entityNotification.getAllTraits()); - if (entityNotification.getOperationType() == EntityNotification.OperationType.ENTITY_DELETE) { - ret = buildServiceTagsForEntityDeleteNotification(entityWithTraits); - } else { - if (entity.getId().getState() == Id.EntityState.ACTIVE) { - ret = buildServiceTags(entityWithTraits, null); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Ignoring entityNotification for entity that is not ACTIVE: " + entityWithTraits); - } - } - } + RangerAtlasEntityWithTags entityWithTags = new RangerAtlasEntityWithTags(entityNotification); + + if (entityNotification.getOperationType() == EntityNotificationV1.OperationType.ENTITY_DELETE) { + ret = buildServiceTagsForEntityDeleteNotification(entityWithTags); } else { - logUnhandledEntityNotification(entityNotification); + ret = buildServiceTags(entityWithTags, null); } } catch (Exception exception) { LOG.error("createServiceTags() failed!! ", exception); } + } else { + logUnhandledEntityNotification(entityNotification); } return ret; } - public static Map<String, ServiceTags> processAtlasEntities(List<AtlasEntityWithTraits> atlasEntities) { + public static Map<String, ServiceTags> processAtlasEntities(List<RangerAtlasEntityWithTags> atlasEntities) { Map<String, ServiceTags> ret = null; try { @@ -121,17 +111,16 @@ public class AtlasNotificationMapper { return ret; } - static private boolean isNotificationHandled(EntityNotification entityNotification) { + static private boolean isNotificationHandled(EntityNotificationV1 entityNotification) { boolean ret = false; - EntityNotification.OperationType opType = entityNotification.getOperationType(); + EntityNotificationV1.OperationType opType = entityNotification.getOperationType(); - if(opType != null) { + if (opType != null) { switch (opType) { - case ENTITY_CREATE: { - LOG.debug("ENTITY_CREATE notification is not handled, as Ranger will get necessary information from any subsequent TRAIT_ADDED notification"); + case ENTITY_CREATE: + ret = CollectionUtils.isNotEmpty(entityNotification.getAllTraits()); break; - } case ENTITY_UPDATE: case ENTITY_DELETE: case TRAIT_ADD: @@ -142,30 +131,38 @@ public class AtlasNotificationMapper { } default: LOG.error(opType + ": unknown notification received - not handled"); + break; + } + if (ret) { + final Referenceable entity = entityNotification.getEntity(); + + ret = entity != null + && entity.getId().getState() == Id.EntityState.ACTIVE + && AtlasResourceMapperUtil.isEntityTypeHandled(entity.getTypeName()); } } return ret; } - static private ServiceTags buildServiceTagsForEntityDeleteNotification(AtlasEntityWithTraits entityWithTraits) throws Exception { + static private ServiceTags buildServiceTagsForEntityDeleteNotification(RangerAtlasEntityWithTags entityWithTags) throws Exception { final ServiceTags ret; - IReferenceableInstance entity = entityWithTraits.getEntity(); + RangerAtlasEntity entity = entityWithTags.getEntity(); - String guid = entity.getId()._getId(); + String guid = entity.getGuid(); if (StringUtils.isNotBlank(guid)) { ret = new ServiceTags(); RangerServiceResource serviceResource = new RangerServiceResource(); serviceResource.setGuid(guid); ret.getServiceResources().add(serviceResource); } else { - ret = buildServiceTags(entityWithTraits, null); + ret = buildServiceTags(entityWithTags, null); if (ret != null) { // tag-definitions should NOT be deleted as part of service-resource delete - ret.setTagDefinitions(Collections.<Long, RangerTagDef>emptyMap()); + ret.setTagDefinitions(MapUtils.EMPTY_MAP); // Ranger deletes tags associated with deleted service-resource - ret.setTags(Collections.<Long, RangerTag>emptyMap()); + ret.setTags(MapUtils.EMPTY_MAP); } } @@ -176,13 +173,13 @@ public class AtlasNotificationMapper { return ret; } - static private Map<String, ServiceTags> buildServiceTags(List<AtlasEntityWithTraits> entitiesWithTraits) throws Exception { + static private Map<String, ServiceTags> buildServiceTags(List<RangerAtlasEntityWithTags> entitiesWithTags) throws Exception { Map<String, ServiceTags> ret = new HashMap<String, ServiceTags>(); - for (AtlasEntityWithTraits element : entitiesWithTraits) { - IReferenceableInstance entity = element.getEntity(); - if (entity != null && entity.getId().getState() == Id.EntityState.ACTIVE) { + for (RangerAtlasEntityWithTags element : entitiesWithTags) { + RangerAtlasEntity entity = element.getEntity(); + if (entity != null) { buildServiceTags(element, ret); } else { if (LOG.isDebugEnabled()) { @@ -241,15 +238,15 @@ public class AtlasNotificationMapper { return ret; } - static private ServiceTags buildServiceTags(AtlasEntityWithTraits entityWithTraits, Map<String, ServiceTags> serviceTagsMap) throws Exception { + static private ServiceTags buildServiceTags(RangerAtlasEntityWithTags entityWithTags, Map<String, ServiceTags> serviceTagsMap) throws Exception { ServiceTags ret = null; - IReferenceableInstance entity = entityWithTraits.getEntity(); + RangerAtlasEntity entity = entityWithTags.getEntity(); RangerServiceResource serviceResource = AtlasResourceMapperUtil.getRangerServiceResource(entity); if (serviceResource != null) { - List<RangerTag> tags = getTags(entityWithTraits); - List<RangerTagDef> tagDefs = getTagDefs(entityWithTraits); + List<RangerTag> tags = getTags(entityWithTags); + List<RangerTagDef> tagDefs = getTagDefs(entityWithTags); String serviceName = serviceResource.getServiceName(); ret = createOrGetServiceTags(serviceTagsMap, serviceName); @@ -279,12 +276,12 @@ public class AtlasNotificationMapper { } } else { if (LOG.isDebugEnabled()) { - LOG.debug("Entity " + entityWithTraits + " does not have any tags associated with it when full-sync is being done."); + LOG.debug("Entity " + entityWithTags + " does not have any tags associated with it when full-sync is being done."); LOG.debug("Will not add this entity to serviceTags, so that this entity, if exists, will be removed from ranger"); } } } else { - LOG.error("Failed to build serviceResource for entity:" + entity.getId()._getId()); + LOG.error("Failed to build serviceResource for entity:" + entity.getGuid()); } return ret; @@ -307,58 +304,33 @@ public class AtlasNotificationMapper { return ret; } - static private List<RangerTag> getTags(AtlasEntityWithTraits entityWithTraits) { + static private List<RangerTag> getTags(RangerAtlasEntityWithTags entityWithTags) { List<RangerTag> ret = new ArrayList<RangerTag>(); - if(entityWithTraits != null && CollectionUtils.isNotEmpty(entityWithTraits.getAllTraits())) { - List<IStruct> traits = entityWithTraits.getAllTraits(); - - for (IStruct trait : traits) { - Map<String, String> tagAttrs = new HashMap<String, String>(); + if (entityWithTags != null && MapUtils.isNotEmpty(entityWithTags.getTags())) { + Map<String, Map<String, String>> tags = entityWithTags.getTags(); - try { - Map<String, Object> attrs = trait.getValuesMap(); - - if(MapUtils.isNotEmpty(attrs)) { - for (Map.Entry<String, Object> attrEntry : attrs.entrySet()) { - String attrName = attrEntry.getKey(); - Object attrValue = attrEntry.getValue(); - - tagAttrs.put(attrName, attrValue != null ? attrValue.toString() : null); - } - } - } catch (AtlasException exception) { - LOG.error("Could not get values for trait:" + trait.getTypeName(), exception); - } - - ret.add(new RangerTag(null, trait.getTypeName(), tagAttrs, RangerTag.OWNER_SERVICERESOURCE)); + for (Map.Entry<String, Map<String, String>> tag : tags.entrySet()) { + ret.add(new RangerTag(null, tag.getKey(), tag.getValue(), RangerTag.OWNER_SERVICERESOURCE)); } } return ret; } - static private List<RangerTagDef> getTagDefs(AtlasEntityWithTraits entityWithTraits) { + static private List<RangerTagDef> getTagDefs(RangerAtlasEntityWithTags entityWithTags) { List<RangerTagDef> ret = new ArrayList<RangerTagDef>(); - if(entityWithTraits != null && CollectionUtils.isNotEmpty(entityWithTraits.getAllTraits())) { - List<IStruct> traits = entityWithTraits.getAllTraits(); - - for (IStruct trait : traits) { - RangerTagDef tagDef = new RangerTagDef(trait.getTypeName(), "Atlas"); - - try { - Map<String, Object> attrs = trait.getValuesMap(); + if (entityWithTags != null && MapUtils.isNotEmpty(entityWithTags.getTags())) { + Map<String, Map<String, String>> tags = entityWithTags.getTags(); - if(MapUtils.isNotEmpty(attrs)) { - for (String attrName : attrs.keySet()) { - tagDef.getAttributeDefs().add(new RangerTagAttributeDef(attrName, "string")); - } + for (Map.Entry<String, Map<String, String>> tag : tags.entrySet()) { + RangerTagDef tagDef = new RangerTagDef(tag.getKey(), "Atlas"); + if (MapUtils.isNotEmpty(tag.getValue())) { + for (String attributeName : tag.getValue().keySet()) { + tagDef.getAttributeDefs().add(new RangerTagAttributeDef(attributeName, entityWithTags.getTagAttributeType(tag.getKey(), attributeName))); } - } catch (AtlasException exception) { - LOG.error("Could not get values for trait:" + trait.getTypeName(), exception); } - ret.add(tagDef); } } http://git-wip-us.apache.org/repos/asf/ranger/blob/e8afb9fa/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapper.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapper.java index 8ececdf..5d067a5 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapper.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapper.java @@ -20,19 +20,20 @@ package org.apache.ranger.tagsync.source.atlas; import java.util.Properties; -import java.util.Map; -import org.apache.atlas.AtlasException; -import org.apache.atlas.typesystem.IReferenceableInstance; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.ranger.plugin.model.RangerServiceResource; +import org.apache.ranger.tagsync.source.atlasrest.RangerAtlasEntity; public abstract class AtlasResourceMapper { private static final Log LOG = LogFactory.getLog(AtlasResourceMapper.class); public static final String TAGSYNC_DEFAULT_CLUSTER_NAME = "ranger.tagsync.atlas.default.cluster.name"; + public static final String ENTITY_ATTRIBUTE_QUALIFIED_NAME = "qualifiedName"; + public static final String QUALIFIED_NAME_DELIMITER = "\\."; + public static final Character QUALIFIED_NAME_DELIMITER_CHAR = '.'; protected static final String TAGSYNC_SERVICENAME_MAPPER_PROP_PREFIX = "ranger.tagsync.atlas."; protected static final String TAGSYNC_SERVICENAME_MAPPER_PROP_SUFFIX = ".ranger.service"; @@ -73,7 +74,7 @@ public abstract class AtlasResourceMapper { this.defaultClusterName = properties != null ? properties.getProperty(TAGSYNC_DEFAULT_CLUSTER_NAME) : null; } - abstract public RangerServiceResource buildResource(final IReferenceableInstance entity) throws Exception; + abstract public RangerServiceResource buildResource(final RangerAtlasEntity entity) throws Exception; protected String getCustomRangerServiceName(String atlasInstanceName) { if(properties != null) { @@ -118,21 +119,4 @@ public abstract class AtlasResourceMapper { throw new Exception(msg); } - - static protected <T> T getEntityAttribute(IReferenceableInstance entity, String name, Class<T> type) { - T ret = null; - - try { - Map<String, Object> valueMap = entity.getValuesMap(); - ret = getAttribute(valueMap, name, type); - } catch (AtlasException exception) { - LOG.error("Cannot get map of values for entity: " + entity.getId()._getId(), exception); - } - - return ret; - } - - static protected <T> T getAttribute(Map<String, Object> map, String name, Class<T> type) { - return type.cast(map.get(name)); - } } http://git-wip-us.apache.org/repos/asf/ranger/blob/e8afb9fa/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java index 40a639b..cd2cb63 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java @@ -19,7 +19,6 @@ package org.apache.ranger.tagsync.source.atlas; -import org.apache.atlas.typesystem.IReferenceableInstance; import org.apache.commons.lang.StringUtils; import org.apache.ranger.plugin.model.RangerServiceResource; @@ -28,14 +27,13 @@ import java.util.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.ranger.tagsync.process.TagSyncConfig; +import org.apache.ranger.tagsync.source.atlasrest.RangerAtlasEntity; public class AtlasResourceMapperUtil { private static final Log LOG = LogFactory.getLog(AtlasResourceMapperUtil.class); private static Map<String, AtlasResourceMapper> atlasResourceMappers = new HashMap<String, AtlasResourceMapper>(); - private static final String MAPPER_NAME_DELIMITER = ","; - public static boolean isEntityTypeHandled(String entityTypeName) { if (LOG.isDebugEnabled()) { LOG.debug("==> isEntityTypeHandled(entityTypeName=" + entityTypeName + ")"); @@ -52,9 +50,9 @@ public class AtlasResourceMapperUtil { return ret; } - public static RangerServiceResource getRangerServiceResource(IReferenceableInstance atlasEntity) { + public static RangerServiceResource getRangerServiceResource(RangerAtlasEntity atlasEntity) { if (LOG.isDebugEnabled()) { - LOG.debug("==> getRangerServiceResource(" + atlasEntity.getId()._getId() +")"); + LOG.debug("==> getRangerServiceResource(" + atlasEntity.getGuid() +")"); } RangerServiceResource resource = null; @@ -65,59 +63,63 @@ public class AtlasResourceMapperUtil { try { resource = mapper.buildResource(atlasEntity); } catch (Exception exception) { - LOG.error("Could not get serviceResource for atlas entity:" + atlasEntity.getId()._getId() + ": ", exception); + LOG.error("Could not get serviceResource for atlas entity:" + atlasEntity.getGuid() + ": ", exception); } } if (LOG.isDebugEnabled()) { - LOG.debug("<== getRangerServiceResource(" + atlasEntity.getId()._getId() +"): resource=" + resource); + LOG.debug("<== getRangerServiceResource(" + atlasEntity.getGuid() +"): resource=" + resource); } return resource; } static public boolean initializeAtlasResourceMappers(Properties properties) { + final String MAPPER_NAME_DELIMITER = ","; + String customMapperNames = TagSyncConfig.getCustomAtlasResourceMappers(properties); if (LOG.isDebugEnabled()) { LOG.debug("==> initializeAtlasResourceMappers.initializeAtlasResourceMappers(" + customMapperNames + ")"); } + boolean ret = true; - // Initialize the default mappers - initializeAtlasResourceMapper(new AtlasHiveResourceMapper(), properties); - initializeAtlasResourceMapper(new AtlasHdfsResourceMapper(), properties); - initializeAtlasResourceMapper(new AtlasHbaseResourceMapper(), properties); - initializeAtlasResourceMapper(new AtlasKafkaResourceMapper(), properties); - initializeAtlasResourceMapper(new AtlasStormResourceMapper(), properties); + List<String> mapperNames = new ArrayList<String>(); + mapperNames.add("org.apache.ranger.tagsync.source.atlas.AtlasHiveResourceMapper"); + mapperNames.add("org.apache.ranger.tagsync.source.atlas.AtlasHdfsResourceMapper"); + mapperNames.add("org.apache.ranger.tagsync.source.atlas.AtlasHbaseResourceMapper"); + mapperNames.add("org.apache.ranger.tagsync.source.atlas.AtlasKafkaResourceMapper"); - // Initialize the custom mappers - boolean ret = true; if (StringUtils.isNotBlank(customMapperNames)) { for (String customMapperName : customMapperNames.split(MAPPER_NAME_DELIMITER)) { - try { - Class<?> clazz = Class.forName(customMapperName); - AtlasResourceMapper resourceMapper = (AtlasResourceMapper) clazz.newInstance(); - - initializeAtlasResourceMapper(resourceMapper, properties); - } catch (Exception exception) { - LOG.error("Failed to create AtlasResourceMapper:" + customMapperName + ": ", exception); - ret = false; - } + mapperNames.add(customMapperName.trim()); + } + } + + for (String mapperName : mapperNames) { + try { + Class<?> clazz = Class.forName(mapperName); + AtlasResourceMapper resourceMapper = (AtlasResourceMapper) clazz.newInstance(); + + resourceMapper.initialize(properties); + + for (String entityTypeName : resourceMapper.getSupportedEntityTypes()) { + add(entityTypeName, resourceMapper); + } + + } catch (Exception exception) { + LOG.error("Failed to create AtlasResourceMapper:" + mapperName + ": ", exception); + ret = false; } } if (LOG.isDebugEnabled()) { - LOG.debug("<== initializeAtlasResourceMappers.initializeAtlasResourceMappers(" + customMapperNames + "): " + ret); + LOG.debug("<== initializeAtlasResourceMappers.initializeAtlasResourceMappers(" + mapperNames + "): " + ret); } return ret; } - private static void initializeAtlasResourceMapper(AtlasResourceMapper resourceMapper, Properties properties) { - resourceMapper.initialize(properties); - - for (String entityTypeName : resourceMapper.getSupportedEntityTypes()) { - atlasResourceMappers.put(entityTypeName, resourceMapper); - } + private static void add(String entityType, AtlasResourceMapper mapper) { + atlasResourceMappers.put(entityType, mapper); } - } http://git-wip-us.apache.org/repos/asf/ranger/blob/e8afb9fa/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasStormResourceMapper.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasStormResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasStormResourceMapper.java index 4ed01ca..650968d 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasStormResourceMapper.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasStormResourceMapper.java @@ -22,17 +22,15 @@ package org.apache.ranger.tagsync.source.atlas; import java.util.HashMap; import java.util.Map; -import org.apache.atlas.typesystem.IReferenceableInstance; import org.apache.commons.lang.StringUtils; import org.apache.ranger.plugin.model.RangerPolicy.RangerPolicyResource; import org.apache.ranger.plugin.model.RangerServiceResource; +import org.apache.ranger.tagsync.source.atlasrest.RangerAtlasEntity; public class AtlasStormResourceMapper extends AtlasResourceMapper { public static final String ENTITY_TYPE_STORM_TOPOLOGY = "storm_topology"; public static final String RANGER_TYPE_STORM_TOPOLOGY = "topology"; - public static final String ENTITY_ATTRIBUTE_QUALIFIED_NAME = "qualifiedName"; - public static final String[] SUPPORTED_ENTITY_TYPES = { ENTITY_TYPE_STORM_TOPOLOGY }; public AtlasStormResourceMapper() { @@ -40,8 +38,8 @@ public class AtlasStormResourceMapper extends AtlasResourceMapper { } @Override - public RangerServiceResource buildResource(final IReferenceableInstance entity) throws Exception { - String qualifiedName = getEntityAttribute(entity, ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class); + public RangerServiceResource buildResource(final RangerAtlasEntity entity) throws Exception { + String qualifiedName = (String)entity.getAttributes().get(AtlasResourceMapper.ENTITY_ATTRIBUTE_QUALIFIED_NAME); String topology = getResourceNameFromQualifiedName(qualifiedName); @@ -65,7 +63,7 @@ public class AtlasStormResourceMapper extends AtlasResourceMapper { elements.put(RANGER_TYPE_STORM_TOPOLOGY, new RangerPolicyResource(topology, isExcludes, isRecursive)); - String entityGuid = entity.getId() != null ? entity.getId()._getId() : null; + String entityGuid = entity.getGuid(); String serviceName = getRangerServiceName(clusterName); return new RangerServiceResource(entityGuid, serviceName, elements); http://git-wip-us.apache.org/repos/asf/ranger/blob/e8afb9fa/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java index c382db0..8c15ee5 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java @@ -20,24 +20,24 @@ package org.apache.ranger.tagsync.source.atlas; -import org.apache.atlas.kafka.AtlasKafkaMessage; import org.apache.atlas.kafka.NotificationProvider; +import org.apache.atlas.notification.NotificationConsumer; +import org.apache.atlas.notification.NotificationInterface; +import org.apache.atlas.v1.model.notification.EntityNotificationV1; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - -import org.apache.atlas.notification.NotificationConsumer; -import org.apache.atlas.notification.NotificationInterface; -import org.apache.atlas.notification.entity.EntityNotification; - -import org.apache.kafka.common.TopicPartition; -import org.apache.ranger.tagsync.model.AbstractTagSource; import org.apache.ranger.plugin.util.ServiceTags; +import org.apache.ranger.tagsync.model.AbstractTagSource; +import org.apache.atlas.kafka.AtlasKafkaMessage; +import org.apache.kafka.common.TopicPartition; +import org.apache.ranger.tagsync.source.atlasrest.RangerAtlasEntityWithTags; + import java.io.IOException; import java.io.InputStream; -import java.util.Properties; import java.util.List; +import java.util.Properties; public class AtlasTagSource extends AbstractTagSource { private static final Log LOG = LogFactory.getLog(AtlasTagSource.class); @@ -100,10 +100,11 @@ public class AtlasTagSource extends AbstractTagSource { } if (ret) { - NotificationInterface notification = NotificationProvider.get(); - List<NotificationConsumer<Object>> iterators = notification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1); + NotificationInterface notification = NotificationProvider.get(); + List<NotificationConsumer<EntityNotificationV1>> iterators = notification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1); + + consumerTask = new ConsumerRunnable(iterators.get(0)); - consumerTask = new ConsumerRunnable(iterators.get(0)); } if (LOG.isDebugEnabled()) { @@ -137,63 +138,59 @@ public class AtlasTagSource extends AbstractTagSource { } } - private static String getPrintableEntityNotification(EntityNotification notification) { + private static String getPrintableEntityNotification(EntityNotificationV1 notification) { StringBuilder sb = new StringBuilder(); sb.append("{ Notification-Type: ").append(notification.getOperationType()).append(", "); - AtlasEntityWithTraits entityWithTraits = new AtlasEntityWithTraits(notification.getEntity(), notification.getAllTraits()); - sb.append(entityWithTraits.toString()); + RangerAtlasEntityWithTags entityWithTags = new RangerAtlasEntityWithTags(notification); + sb.append(entityWithTags.toString()); + sb.append("}"); return sb.toString(); } private class ConsumerRunnable implements Runnable { - private final NotificationConsumer<Object> consumer; + private final NotificationConsumer<EntityNotificationV1> consumer; - private ConsumerRunnable(NotificationConsumer<Object> consumer) { + private ConsumerRunnable(NotificationConsumer<EntityNotificationV1> consumer) { this.consumer = consumer; } + @Override public void run() { if (LOG.isDebugEnabled()) { LOG.debug("==> ConsumerRunnable.run()"); } while (true) { - try { - List<AtlasKafkaMessage<Object>> messages = consumer.receive(1000L); - for (AtlasKafkaMessage<Object> message : messages) { - Object kafkaMessage = message != null ? message.getMessage() : null; - - if (kafkaMessage != null) { - EntityNotification notification = null; - if (kafkaMessage instanceof EntityNotification) { - notification = (EntityNotification) kafkaMessage; - } else { - LOG.warn("Received Kafka notification of unexpected type:[" + kafkaMessage.getClass().toString() + "], Ignoring..."); - } - if (notification != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Notification=" + getPrintableEntityNotification(notification)); - } - - ServiceTags serviceTags = AtlasNotificationMapper.processEntityNotification(notification); - if (serviceTags != null) { - updateSink(serviceTags); - } - } - TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition()); - consumer.commit(partition, message.getOffset()); - } else { - LOG.error("Null message received from Kafka!! Ignoring.."); - } - } - } catch (Exception exception) { - LOG.error("Caught exception..: ", exception); - return; - } - } + try { + List<AtlasKafkaMessage<EntityNotificationV1>> messages = consumer.receive(1000L); + + for (AtlasKafkaMessage<EntityNotificationV1> message : messages) { + EntityNotificationV1 notification = message != null ? message.getMessage() : null; + + if (notification != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Notification=" + getPrintableEntityNotification(notification)); + } + + ServiceTags serviceTags = AtlasNotificationMapper.processEntityNotification(notification); + if (serviceTags != null) { + updateSink(serviceTags); + } + + TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition()); + consumer.commit(partition, message.getOffset()); + } else { + LOG.error("Null entityNotification received from Kafka!! Ignoring.."); + } + } + } catch (Exception exception) { + LOG.error("Caught exception..: ", exception); + return; + } + } } } } http://git-wip-us.apache.org/repos/asf/ranger/blob/e8afb9fa/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java index 4e0ae90..b715869 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java @@ -20,35 +20,67 @@ package org.apache.ranger.tagsync.source.atlasrest; import com.google.gson.Gson; - import com.google.gson.GsonBuilder; - +import org.apache.atlas.AtlasClientV2; +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.SearchFilter; +import org.apache.atlas.model.discovery.AtlasSearchResult; +import org.apache.atlas.model.discovery.SearchParameters; +import org.apache.atlas.model.instance.AtlasClassification; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.model.typedef.AtlasBaseTypeDef; +import org.apache.atlas.model.typedef.AtlasTypesDef; +import org.apache.atlas.type.AtlasBuiltInTypes; +import org.apache.atlas.type.AtlasClassificationType; +import org.apache.atlas.type.AtlasStructType; +import org.apache.atlas.type.AtlasType; +import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.ranger.plugin.util.RangerRESTClient; -import org.apache.ranger.tagsync.model.AbstractTagSource; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.ranger.plugin.util.ServiceTags; +import org.apache.ranger.tagsync.model.AbstractTagSource; import org.apache.ranger.tagsync.model.TagSink; import org.apache.ranger.tagsync.process.TagSyncConfig; import org.apache.ranger.tagsync.process.TagSynchronizer; -import org.apache.ranger.tagsync.source.atlas.AtlasEntityWithTraits; import org.apache.ranger.tagsync.source.atlas.AtlasNotificationMapper; import org.apache.ranger.tagsync.source.atlas.AtlasResourceMapperUtil; +import java.io.IOException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; +import java.util.TimeZone; public class AtlasRESTTagSource extends AbstractTagSource implements Runnable { private static final Log LOG = LogFactory.getLog(AtlasRESTTagSource.class); - private long sleepTimeBetweenCycleInMillis; + private static final ThreadLocal<DateFormat> DATE_FORMATTER = new ThreadLocal<DateFormat>() { + @Override + protected DateFormat initialValue() { + SimpleDateFormat dateFormat = new SimpleDateFormat(AtlasBaseTypeDef.SERIALIZED_DATE_FORMAT_STR); + + dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); - private AtlasRESTUtil atlasRESTUtil = null; + return dateFormat; + } + }; + + private long sleepTimeBetweenCycleInMillis; + private String[] restUrls = null; + private boolean isKerberized = false; + private String[] userNamePassword = null; private Thread myThread = null; @@ -95,30 +127,26 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable { boolean ret = AtlasResourceMapperUtil.initializeAtlasResourceMappers(properties); sleepTimeBetweenCycleInMillis = TagSyncConfig.getTagSourceAtlasDownloadIntervalInMillis(properties); - final boolean isKerberized = TagSyncConfig.getTagsyncKerberosIdentity(properties) != null; + isKerberized = TagSyncConfig.getTagsyncKerberosIdentity(properties) != null; - String restUrl = TagSyncConfig.getAtlasRESTEndpoint(properties); + String restEndpoint = TagSyncConfig.getAtlasRESTEndpoint(properties); String sslConfigFile = TagSyncConfig.getAtlasRESTSslConfigFile(properties); - String userName = TagSyncConfig.getAtlasRESTUserName(properties); - String password = TagSyncConfig.getAtlasRESTPassword(properties); + this.userNamePassword = new String[] { TagSyncConfig.getAtlasRESTUserName(properties), TagSyncConfig.getAtlasRESTPassword(properties) }; if (LOG.isDebugEnabled()) { - LOG.debug("restUrl=" + restUrl); + LOG.debug("restUrl=" + restEndpoint); LOG.debug("sslConfigFile=" + sslConfigFile); - LOG.debug("userName=" + userName); + LOG.debug("userName=" + userNamePassword[0]); LOG.debug("kerberized=" + isKerberized); } - - if (StringUtils.isNotEmpty(restUrl)) { - if (!restUrl.endsWith("/")) { - restUrl += "/"; - } - RangerRESTClient atlasRESTClient = new RangerRESTClient(restUrl, sslConfigFile); - - if (!isKerberized) { - atlasRESTClient.setBasicAuthInfo(userName, password); - } - atlasRESTUtil = new AtlasRESTUtil(atlasRESTClient, isKerberized); + if (StringUtils.isNotEmpty(restEndpoint)) { + this.restUrls = restEndpoint.split(","); + + for (int i = 0; i < restUrls.length; i++) { + if (!restUrls[i].endsWith("/")) { + restUrls[i] += "/"; + } + } } else { LOG.info("AtlasEndpoint not specified, Initial download of Atlas-entities cannot be done."); ret = false; @@ -174,16 +202,15 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable { public void synchUp() { - List<AtlasEntityWithTraits> atlasEntities = atlasRESTUtil.getAtlasEntities(); + List<RangerAtlasEntityWithTags> rangerAtlasEntities = getAtlasActiveEntities(); - if (CollectionUtils.isNotEmpty(atlasEntities)) { + if (CollectionUtils.isNotEmpty(rangerAtlasEntities)) { if (LOG.isDebugEnabled()) { - for (AtlasEntityWithTraits element : atlasEntities) { + for (RangerAtlasEntityWithTags element : rangerAtlasEntities) { LOG.debug(element); } } - - Map<String, ServiceTags> serviceTagsMap = AtlasNotificationMapper.processAtlasEntities(atlasEntities); + Map<String, ServiceTags> serviceTagsMap = AtlasNotificationMapper.processAtlasEntities(rangerAtlasEntities); if (MapUtils.isNotEmpty(serviceTagsMap)) { for (Map.Entry<String, ServiceTags> entry : serviceTagsMap.entrySet()) { @@ -202,5 +229,158 @@ public class AtlasRESTTagSource extends AbstractTagSource implements Runnable { } + private List<RangerAtlasEntityWithTags> getAtlasActiveEntities() { + if (LOG.isDebugEnabled()) { + LOG.debug("==> getAtlasActiveEntities()"); + } + List<RangerAtlasEntityWithTags> ret = null; + + SearchParameters searchParams = new SearchParameters(); + AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry(); + AtlasTypeRegistry.AtlasTransientTypeRegistry tty = null; + AtlasSearchResult searchResult = null; + + searchParams.setClassification("*"); + searchParams.setIncludeClassificationAttributes(true); + searchParams.setOffset(0); + searchParams.setLimit(Integer.MAX_VALUE); + + boolean commitUpdates = false; + try { + AtlasClientV2 atlasClient = getAtlasClient(); + searchResult = atlasClient.facetedSearch(searchParams); + AtlasTypesDef typesDef = atlasClient.getAllTypeDefs(new SearchFilter()); + tty = typeRegistry.lockTypeRegistryForUpdate(); + tty.addTypes(typesDef); + commitUpdates = true; + } catch (AtlasServiceException | AtlasBaseException | IOException excp) { + LOG.error("failed to download tags from Atlas", excp); + } catch (Exception unexpectedException) { + LOG.error("Failed to download tags from Atlas due to unexpected exception", unexpectedException); + } finally { + if (tty != null) { + typeRegistry.releaseTypeRegistryForUpdate(tty, commitUpdates); + } + } + + if (commitUpdates && searchResult != null) { + if (LOG.isDebugEnabled()) { + LOG.debug(AtlasType.toJson(searchResult)); + } + ret = new ArrayList<>(); + List<AtlasEntityHeader> entityHeaders = searchResult.getEntities(); + if (CollectionUtils.isNotEmpty(entityHeaders)) { + for (AtlasEntityHeader header : entityHeaders) { + if (!header.getStatus().equals(AtlasEntity.Status.ACTIVE)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping entity because it is not ACTIVE, header:[" + header + "]"); + } + continue; + } + + String typeName = header.getTypeName(); + if (!AtlasResourceMapperUtil.isEntityTypeHandled(typeName)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Not fetching Atlas entities of type:[" + typeName + "]"); + } + continue; + } + + Map<String, Map<String, String>> allTagsForEntity = new HashMap<>(); + + for (AtlasClassification classification : header.getClassifications()) { + Map<String, Map<String, String>> tags = resolveTag(typeRegistry, classification.getTypeName(), classification.getAttributes()); + if (tags != null) { + allTagsForEntity.putAll(tags); + } + } + + if (MapUtils.isNotEmpty(allTagsForEntity)) { + + RangerAtlasEntity entity = new RangerAtlasEntity(typeName, header.getGuid(), header.getAttributes()); + RangerAtlasEntityWithTags entityWithTags = new RangerAtlasEntityWithTags(entity, allTagsForEntity, typeRegistry); + ret.add(entityWithTags); + } + } + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== getAtlasActiveEntities()"); + } + + return ret; + } + + /* + * Returns a map of <tag-name, List<attributeName, [attributeValue, attributeType]>> + */ + private Map<String, Map<String, String>> resolveTag(AtlasTypeRegistry typeRegistry, String typeName, Map<String, Object> attributes) { + Map<String, Map<String, String>> ret = new HashMap<>(); + + try { + AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(typeName); + if (classificationType != null) { + Map<String, String> allAttributes = new HashMap<>(); + if (MapUtils.isNotEmpty(attributes) && MapUtils.isNotEmpty(classificationType.getAllAttributes())) { + for (Map.Entry<String, Object> attribute : attributes.entrySet()) { + String name = attribute.getKey(); + Object value = attribute.getValue(); + if (value != null) { + String stringValue = value.toString(); + AtlasStructType.AtlasAttribute atlasAttribute = classificationType.getAttribute(name); + if (atlasAttribute != null) { + if (value instanceof Number) { + if (atlasAttribute.getAttributeType() instanceof AtlasBuiltInTypes.AtlasDateType) { + stringValue = DATE_FORMATTER.get().format(value); + } + } + allAttributes.put(name, stringValue); + } + } + } + } + // Put most derived classificationType with all attributes + ret.put(typeName, allAttributes); + + // Find base classification types + Set<String> superTypeNames = classificationType.getAllSuperTypes(); + for (String superTypeName : superTypeNames) { + AtlasClassificationType superType = typeRegistry.getClassificationTypeByName(superTypeName); + if (superType != null) { + Map<String, String> attributeMap = new HashMap<>(); + if (MapUtils.isNotEmpty(attributes) && MapUtils.isNotEmpty(superType.getAllAttributes())) { + for (String name : superType.getAllAttributes().keySet()) { + String stringValue = allAttributes.get(name); + if (stringValue != null) { + attributeMap.put(name, stringValue); + } + } + } + ret.put(superTypeName, attributeMap); + } + } + } + } catch (Exception exception) { + LOG.error("Error in resolving tags for type:[" + typeName + "]", exception); + } + return ret; + } + + private AtlasClientV2 getAtlasClient() throws IOException { + final AtlasClientV2 ret; + + if (isKerberized) { + UserGroupInformation ugi = UserGroupInformation.getLoginUser(); + + ugi.checkTGTAndReloginFromKeytab(); + + ret = new AtlasClientV2(ugi, ugi.getShortUserName(), restUrls); + } else { + ret = new AtlasClientV2(restUrls, userNamePassword); + } + + return ret; + } } http://git-wip-us.apache.org/repos/asf/ranger/blob/e8afb9fa/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java deleted file mode 100644 index 00a101e..0000000 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java +++ /dev/null @@ -1,325 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ranger.tagsync.source.atlasrest; - -import com.google.gson.Gson; -import com.sun.jersey.api.client.ClientResponse; -import com.sun.jersey.api.client.WebResource; -import org.apache.atlas.typesystem.IReferenceableInstance; -import org.apache.atlas.typesystem.IStruct; -import org.apache.atlas.typesystem.Struct; -import org.apache.atlas.typesystem.json.InstanceSerialization; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.collections.MapUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.log4j.Logger; -import org.apache.ranger.admin.client.datatype.RESTResponse; -import org.apache.ranger.plugin.util.RangerRESTClient; -import org.apache.ranger.tagsync.source.atlas.AtlasEntityWithTraits; -import org.apache.ranger.tagsync.source.atlas.AtlasResourceMapperUtil; - -import java.io.IOException; -import java.security.PrivilegedAction; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -@SuppressWarnings("unchecked") -public class AtlasRESTUtil { - private static final Logger LOG = Logger.getLogger(AtlasRESTUtil.class); - - private static final String REST_MIME_TYPE_JSON = "application/json"; - private static final String API_ATLAS_TYPES = "api/atlas/types"; - private static final String API_ATLAS_ENTITIES = "api/atlas/entities?type="; - private static final String API_ATLAS_ENTITY = "api/atlas/entities/"; - private static final String API_ATLAS_TYPE = "api/atlas/types/"; - - private static final String RESULTS_ATTRIBUTE = "results"; - private static final String DEFINITION_ATTRIBUTE = "definition"; - private static final String VALUES_ATTRIBUTE = "values"; - private static final String TRAITS_ATTRIBUTE = "traits"; - private static final String TYPE_NAME_ATTRIBUTE = "typeName"; - private static final String TRAIT_TYPES_ATTRIBUTE = "traitTypes"; - private static final String SUPER_TYPES_ATTRIBUTE = "superTypes"; - private static final String ATTRIBUTE_DEFINITIONS_ATTRIBUTE = "attributeDefinitions"; - private static final String NAME_ATTRIBUTE = "name"; - - private final Gson gson = new Gson(); - - private final RangerRESTClient atlasRESTClient; - - private final boolean isKerberized; - - public AtlasRESTUtil(RangerRESTClient atlasRESTClient, boolean isKerberized) { - if (LOG.isDebugEnabled()) { - LOG.debug("==> AtlasRESTUtil()"); - } - - this.atlasRESTClient = atlasRESTClient; - - this.isKerberized = isKerberized; - - if (LOG.isDebugEnabled()) { - LOG.debug("<== AtlasRESTUtil()"); - } - } - - public List<AtlasEntityWithTraits> getAtlasEntities() { - if (LOG.isDebugEnabled()) { - LOG.debug("==> getAtlasEntities()"); - } - - List<AtlasEntityWithTraits> ret = new ArrayList<AtlasEntityWithTraits>(); - - Map<String, Object> typesResponse = atlasAPI(API_ATLAS_TYPES); - - List<String> types = getAttribute(typesResponse, RESULTS_ATTRIBUTE, List.class); - - if (CollectionUtils.isNotEmpty(types)) { - - for (String type : types) { - - if (!AtlasResourceMapperUtil.isEntityTypeHandled(type)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Not fetching Atlas entities of type: " + type); - } - continue; - } - - Map<String, Object> entitiesResponse = atlasAPI(API_ATLAS_ENTITIES + type); - - List<String> guids = getAttribute(entitiesResponse, RESULTS_ATTRIBUTE, List.class); - - if (CollectionUtils.isEmpty(guids)) { - if (LOG.isDebugEnabled()) { - LOG.debug("No Atlas entities for type: " + type); - } - continue; - } - - for (String guid : guids) { - - Map<String, Object> entityResponse = atlasAPI(API_ATLAS_ENTITY + guid); - - Map<String, Object> definition = getAttribute(entityResponse, DEFINITION_ATTRIBUTE, Map.class); - - Map<String, Object> traitsAttribute = getAttribute(definition, TRAITS_ATTRIBUTE, Map.class); - - List<IStruct> allTraits = new LinkedList<>(); - - if (MapUtils.isNotEmpty(traitsAttribute)) { - - for (Map.Entry<String, Object> entry : traitsAttribute.entrySet()) { - - Map<String, Object> trait = (Map<String, Object>) entry.getValue(); - - Map<String, Object> traitValues = getAttribute(trait, VALUES_ATTRIBUTE, Map.class); - String traitTypeName = getAttribute(trait, TYPE_NAME_ATTRIBUTE, String.class); - - if (StringUtils.isEmpty(traitTypeName)) { - continue; - } - - List<IStruct> superTypes = getTraitSuperTypes(getTraitType(traitTypeName), traitValues); - - Struct trait1 = new Struct(traitTypeName, traitValues); - - allTraits.add(trait1); - allTraits.addAll(superTypes); - } - } - - IReferenceableInstance entity = InstanceSerialization.fromJsonReferenceable(gson.toJson(definition), true); - - if (entity != null) { - AtlasEntityWithTraits atlasEntity = new AtlasEntityWithTraits(entity, allTraits); - ret.add(atlasEntity); - } else { - if (LOG.isInfoEnabled()) { - LOG.info("Could not create Atlas entity from its definition, type=" + type + ", guid=" + guid); - } - } - - } - - } - if (LOG.isDebugEnabled()) { - LOG.debug("<== getAtlasEntities()"); - } - } - - return ret; - } - - private Map<String, Object> getTraitType(String traitName) { - if (LOG.isDebugEnabled()) { - LOG.debug("==> getTraitType(" + traitName + ")"); - } - Map<String, Object> ret = null; - - Map<String, Object> typeResponse = atlasAPI(API_ATLAS_TYPE + traitName); - - Map<String, Object> definition = getAttribute(typeResponse, DEFINITION_ATTRIBUTE, Map.class); - - List traitTypes = getAttribute(definition, TRAIT_TYPES_ATTRIBUTE, List.class); - - if (CollectionUtils.isNotEmpty(traitTypes)) { - ret = (Map<String, Object>) traitTypes.get(0); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("<== getTraitType(" + traitName + ")"); - } - return ret; - } - - private List<IStruct> getTraitSuperTypes(Map<String, Object> traitType, Map<String, Object> values) { - - if (LOG.isDebugEnabled()) { - LOG.debug("==> getTraitSuperTypes()"); - } - List<IStruct> ret = new LinkedList<>(); - - if (traitType != null) { - - List<String> superTypeNames = getAttribute(traitType, SUPER_TYPES_ATTRIBUTE, List.class); - - if (CollectionUtils.isNotEmpty(superTypeNames)) { - for (String superTypeName : superTypeNames) { - - Map<String, Object> superTraitType = getTraitType(superTypeName); - - if (superTraitType != null) { - List<Map<String, Object>> attributeDefinitions = (List) superTraitType.get(ATTRIBUTE_DEFINITIONS_ATTRIBUTE); - - Map<String, Object> superTypeValues = new HashMap<>(); - for (Map<String, Object> attributeDefinition : attributeDefinitions) { - - String attributeName = attributeDefinition.get(NAME_ATTRIBUTE).toString(); - if (values.containsKey(attributeName)) { - superTypeValues.put(attributeName, values.get(attributeName)); - } - } - - List<IStruct> superTraits = getTraitSuperTypes(getTraitType(superTypeName), values); - - Struct superTrait = new Struct(superTypeName, superTypeValues); - - ret.add(superTrait); - ret.addAll(superTraits); - } - } - } - } - if (LOG.isDebugEnabled()) { - LOG.debug("<== getTraitSuperTypes()"); - } - return ret; - } - - private Map<String, Object> atlasAPI(final String endpoint) { - - if (LOG.isDebugEnabled()) { - LOG.debug("==> atlasAPI(" + endpoint + ")"); - } - Map<String, Object> ret = new HashMap<String, Object>(); - - try { - UserGroupInformation userGroupInformation = null; - if (isKerberized) { - userGroupInformation = UserGroupInformation.getLoginUser(); - - try { - userGroupInformation.checkTGTAndReloginFromKeytab(); - } catch (IOException ioe) { - LOG.error("Error renewing TGT and relogin", ioe); - userGroupInformation = null; - } - } - if (userGroupInformation != null) { - LOG.debug("Using kerberos authentication"); - if(LOG.isDebugEnabled()) { - LOG.debug("Using Principal = "+ userGroupInformation.getUserName()); - } - ret = userGroupInformation.doAs(new PrivilegedAction<Map<String, Object>>() { - @Override - public Map<String, Object> run() { - try{ - return executeAtlasAPI(endpoint); - }catch (Exception e) { - LOG.error("Atlas API failed with message : ", e); - } - return null; - } - }); - } else { - LOG.debug("Using basic authentication"); - ret = executeAtlasAPI(endpoint); - } - } catch (Exception exception) { - LOG.error("Exception when fetching Atlas objects.", exception); - ret = null; - } - - if (LOG.isDebugEnabled()) { - LOG.debug("<== atlasAPI(" + endpoint + ")"); - } - return ret; - } - - private Map<String, Object> executeAtlasAPI(final String endpoint) { - - if (LOG.isDebugEnabled()) { - LOG.debug("==> executeAtlasAPI(" + endpoint + ")"); - } - - Map<String, Object> ret = new HashMap<String, Object>(); - - try { - final WebResource webResource = atlasRESTClient.getResource(endpoint); - - ClientResponse response = webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON).get(ClientResponse.class); - - if (response != null && response.getStatus() == 200) { - ret = response.getEntity(ret.getClass()); - } else { - RESTResponse resp = RESTResponse.fromClientResponse(response); - LOG.error("Error getting atlas data request=" + webResource.toString() - + ", response=" + resp.toString()); - } - } catch (Exception exception) { - LOG.error("Exception when fetching Atlas objects.", exception); - ret = null; - } - - if (LOG.isDebugEnabled()) { - LOG.debug("<== executeAtlasAPI(" + endpoint + ")"); - } - - return ret; - } - - private <T> T getAttribute(Map<String, Object> map, String name, Class<T> type) { - return MapUtils.isNotEmpty(map) ? type.cast(map.get(name)) : null; - } - -}