Repository: incubator-atlas Updated Branches: refs/heads/master 82142995f -> 98990b8d0
ATLAS-1622: Full text mapping using v2 model Signed-off-by: Madhan Neethiraj <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/98990b8d Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/98990b8d Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/98990b8d Branch: refs/heads/master Commit: 98990b8d0cf6b90916c300e31b6aa4ad8e7f0d4c Parents: 8214299 Author: apoorvnaik <[email protected]> Authored: Thu Mar 2 16:18:22 2017 -0800 Committer: Madhan Neethiraj <[email protected]> Committed: Thu Mar 2 17:57:18 2017 -0800 ---------------------------------------------------------------------- .../graph/DefaultGraphPersistenceStrategy.java | 2 +- .../atlas/repository/graph/FullTextMapper.java | 2 +- .../repository/graph/FullTextMapperV2.java | 177 +++++++++++++++++++ .../graph/GraphBackedMetadataRepository.java | 4 +- .../graph/GraphToTypedInstanceMapper.java | 4 +- .../graph/v1/AtlasEntityChangeNotifier.java | 19 +- .../java/org/apache/atlas/RequestContext.java | 41 ++++- 7 files changed, 223 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98990b8d/repository/src/main/java/org/apache/atlas/discovery/graph/DefaultGraphPersistenceStrategy.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/discovery/graph/DefaultGraphPersistenceStrategy.java b/repository/src/main/java/org/apache/atlas/discovery/graph/DefaultGraphPersistenceStrategy.java index 1f2a754..9b0aa4c 100755 --- a/repository/src/main/java/org/apache/atlas/discovery/graph/DefaultGraphPersistenceStrategy.java +++ b/repository/src/main/java/org/apache/atlas/discovery/graph/DefaultGraphPersistenceStrategy.java @@ -183,7 +183,7 @@ public class DefaultGraphPersistenceStrategy implements 
GraphPersistenceStrategi AtlasVertex classVertex = (AtlasVertex) value; String guid = classVertex.getProperty(Constants.GUID_PROPERTY_KEY, String.class); // Check if the instance we need was previously loaded. - ITypedReferenceableInstance classInstance = RequestContext.get().getInstance(guid); + ITypedReferenceableInstance classInstance = RequestContext.get().getInstanceV1(guid); if (classInstance == null) { classInstance = metadataRepository.getGraphToInstanceMapper().mapGraphToTypedInstance(guid, classVertex); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98990b8d/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapper.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapper.java b/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapper.java index 89f48ed..d0a9ade 100644 --- a/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapper.java @@ -55,7 +55,7 @@ public class FullTextMapper { String guid = GraphHelper.getGuid(instanceVertex); ITypedReferenceableInstance typedReference; RequestContext context = RequestContext.get(); - typedReference = context.getInstance(guid); + typedReference = context.getInstanceV1(guid); if (typedReference != null) { if (LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98990b8d/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java b/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java new file mode 100644 index 0000000..2eb2d52 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/graph/FullTextMapperV2.java 
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.graph;
+
+import com.google.inject.Singleton;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityExtInfo;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.AtlasStruct;
+import org.apache.atlas.repository.store.graph.v1.EntityGraphRetriever;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Builds the text indexed for full-text search of an entity, using the v2 (AtlasEntity) model.
+ * The text is a space-delimited sequence of the entity's type name, attribute names and values,
+ * and its classifications' type names and attributes. When the configuration property
+ * atlas.search.fulltext.followReferences is true, entities referenced via AtlasObjectId that are
+ * present in the entity's ext-info are mapped into the same text.
+ */
+@Singleton
+public class FullTextMapperV2 {
+    private static final Logger LOG = LoggerFactory.getLogger(FullTextMapperV2.class);
+
+    private static final String FULL_TEXT_DELIMITER         = " ";
+    private static final String FULL_TEXT_FOLLOW_REFERENCES = "atlas.search.fulltext.followReferences";
+
+    private final EntityGraphRetriever entityGraphRetriever;
+    private final boolean              followReferences;
+
+    @Inject
+    public FullTextMapperV2(AtlasTypeRegistry typeRegistry) {
+        this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry);
+
+        Configuration configuration = null;
+
+        try {
+            configuration = ApplicationProperties.get();
+        } catch (Throwable e) {
+            // Best-effort: without application properties, references are simply not followed.
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("AtlasApplication properties couldn't be loaded", e);
+            }
+        }
+
+        followReferences = configuration != null && configuration.getBoolean(FULL_TEXT_FOLLOW_REFERENCES, false);
+    }
+
+    /**
+     * Returns the full-text string for the entity with the given guid.
+     * The entity is taken from the per-request RequestContext cache when present; otherwise it
+     * is loaded through EntityGraphRetriever and added to that cache.
+     *
+     * @param guid guid of the entity to map
+     * @return the full-text string, or null if no entity was retrieved for the guid
+     * @throws AtlasBaseException propagated from EntityGraphRetriever
+     */
+    public String map(String guid) throws AtlasBaseException {
+        String                 ret     = null;
+        RequestContext         context = RequestContext.get();
+        AtlasEntityWithExtInfo entity  = context.getInstanceV2(guid);
+
+        if (entity == null) {
+            entity = entityGraphRetriever.toAtlasEntityWithExtInfo(guid);
+
+            if (entity != null) {
+                context.cache(entity);
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Cache miss -> GUID = {}", guid);
+                }
+            }
+        }
+
+        if (entity != null) {
+            StringBuilder sb = new StringBuilder();
+
+            map(entity.getEntity(), entity, sb, new HashSet<String>());
+
+            ret = sb.toString();
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("FullTextMapperV2.map({}): {}", guid, ret);
+        }
+
+        return ret;
+    }
+
+    /**
+     * Appends the type name, attributes and classifications of the given entity to sb.
+     * An entity whose guid is already in processedGuids is skipped, so reference cycles do not
+     * recurse forever when references are followed.
+     */
+    private void map(AtlasEntity entity, AtlasEntityExtInfo entityExtInfo, StringBuilder sb, Set<String> processedGuids) throws AtlasBaseException {
+        if (entity == null || processedGuids.contains(entity.getGuid())) {
+            return;
+        }
+
+        processedGuids.add(entity.getGuid());
+
+        sb.append(entity.getTypeName()).append(FULL_TEXT_DELIMITER);
+
+        mapAttributes(entity.getAttributes(), entityExtInfo, sb, processedGuids);
+
+        List<AtlasClassification> classifications = entity.getClassifications();
+        if (CollectionUtils.isNotEmpty(classifications)) {
+            for (AtlasClassification classification : classifications) {
+                sb.append(classification.getTypeName()).append(FULL_TEXT_DELIMITER);
+
+                mapAttributes(classification.getAttributes(), entityExtInfo, sb, processedGuids);
+            }
+        }
+    }
+
+    /**
+     * Appends "name value" text for every attribute whose value is non-null.
+     * A null or empty attribute map contributes nothing (instead of throwing a
+     * NullPointerException, as iterating a null map would).
+     */
+    private void mapAttributes(Map<String, Object> attributes, AtlasEntityExtInfo entityExtInfo, StringBuilder sb, Set<String> processedGuids) throws AtlasBaseException {
+        if (attributes == null || attributes.isEmpty()) {
+            return;
+        }
+
+        for (Map.Entry<String, Object> attributeEntry : attributes.entrySet()) {
+            String attribKey = attributeEntry.getKey();
+            Object attrValue = attributeEntry.getValue();
+
+            if (attrValue == null) {
+                continue;
+            }
+
+            sb.append(attribKey).append(FULL_TEXT_DELIMITER);
+
+            mapAttribute(attrValue, entityExtInfo, sb, processedGuids);
+        }
+    }
+
+    /**
+     * Appends the text for a single attribute value:
+     * <ul>
+     *   <li>AtlasObjectId - the referenced entity is mapped, only if followReferences is enabled
+     *       and the entity is present in entityExtInfo;</li>
+     *   <li>List / Map - each element (for maps: each key and each value) is mapped recursively;</li>
+     *   <li>Enum - its name();</li>
+     *   <li>AtlasStruct - its attributes, exactly like entity attributes;</li>
+     *   <li>anything else - String.valueOf(value).</li>
+     * </ul>
+     * Null values (e.g. null list elements or map values) contribute nothing, so the literal text
+     * "null" is never indexed.
+     */
+    private void mapAttribute(Object value, AtlasEntityExtInfo entityExtInfo, StringBuilder sb, Set<String> processedGuids) throws AtlasBaseException {
+        if (value == null) {
+            return;
+        }
+
+        if (value instanceof AtlasObjectId) {
+            if (followReferences) {
+                AtlasObjectId objectId = (AtlasObjectId) value;
+                AtlasEntity   entity   = entityExtInfo.getEntity(objectId.getGuid());
+
+                if (entity != null) {
+                    map(entity, entityExtInfo, sb, processedGuids);
+                }
+            }
+        } else if (value instanceof List) {
+            for (Object listElement : (List<?>) value) {
+                mapAttribute(listElement, entityExtInfo, sb, processedGuids);
+            }
+        } else if (value instanceof Map) {
+            for (Map.Entry<?, ?> mapEntry : ((Map<?, ?>) value).entrySet()) {
+                mapAttribute(mapEntry.getKey(), entityExtInfo, sb, processedGuids);
+                mapAttribute(mapEntry.getValue(), entityExtInfo, sb, processedGuids);
+            }
+        } else if (value instanceof Enum) {
+            sb.append(((Enum<?>) value).name()).append(FULL_TEXT_DELIMITER);
+        } else if (value instanceof AtlasStruct) {
+            // Reuse mapAttributes so a struct with a null attribute map does not throw.
+            mapAttributes(((AtlasStruct) value).getAttributes(), entityExtInfo, sb, processedGuids);
+        } else {
+            sb.append(String.valueOf(value)).append(FULL_TEXT_DELIMITER);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98990b8d/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java index 4461447..0faafb0 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java @@ -197,7 +197,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository { String guid = guids[i]; // First, check the cache. - ITypedReferenceableInstance cached = context.getInstance(guid); + ITypedReferenceableInstance cached = context.getInstanceV1(guid); if (cached != null) { result[i] = cached; } else { @@ -254,7 +254,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository { Constants.STATE_PROPERTY_KEY, Id.EntityState.ACTIVE.name()); String guid = GraphHelper.getGuid(instanceVertex); - ITypedReferenceableInstance cached = RequestContext.get().getInstance(guid); + ITypedReferenceableInstance cached = RequestContext.get().getInstanceV1(guid); if(cached != null) { return cached; } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98990b8d/repository/src/main/java/org/apache/atlas/repository/graph/GraphToTypedInstanceMapper.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphToTypedInstanceMapper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphToTypedInstanceMapper.java index dfa4407..9e54c2d 100644 --- 
a/repository/src/main/java/org/apache/atlas/repository/graph/GraphToTypedInstanceMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphToTypedInstanceMapper.java @@ -73,7 +73,7 @@ public final class GraphToTypedInstanceMapper { //We don't do a cache check here since we want that to be at a higher level //where the vertex lookup can also be avoided. However, this is a convenient //place to add a check to see if there are any places that were missed. - if(RequestContext.get().getInstance(guid) != null) { + if(RequestContext.get().getInstanceV1(guid) != null) { LOG.warn("Looking up previously cached guid at: ", new Exception()); } @@ -216,7 +216,7 @@ public final class GraphToTypedInstanceMapper { if (attributeInfo.isComposite) { //Also, when you retrieve a type's instance, you get the complete object graph of the composites LOG.debug("Found composite, mapping vertex to instance"); - ITypedReferenceableInstance cached = RequestContext.get().getInstance(guid); + ITypedReferenceableInstance cached = RequestContext.get().getInstanceV1(guid); if(cached != null) { return cached; } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98990b8d/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java index e112b64..5243f36 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java @@ -51,23 +51,15 @@ public class AtlasEntityChangeNotifier { private final Set<EntityChangeListener> entityChangeListeners; private final AtlasInstanceConverter instanceConverter; - private 
final FullTextMapper fullTextMapper; @Inject - private DeleteHandler deleteHandler; + private FullTextMapperV2 fullTextMapperV2; @Inject public AtlasEntityChangeNotifier(Set<EntityChangeListener> entityChangeListeners, AtlasInstanceConverter instanceConverter) { this.entityChangeListeners = entityChangeListeners; this.instanceConverter = instanceConverter; - - // This is only needed for the Legacy FullTextMapper, once the V2 changes are in place this can be replaced/removed - AtlasGraphProvider graphProvider = new AtlasGraphProvider(); - GraphToTypedInstanceMapper graphToTypedInstanceMapper = new GraphToTypedInstanceMapper(graphProvider); - TypedInstanceToGraphMapper typedInstanceToGraphMapper = new TypedInstanceToGraphMapper(graphToTypedInstanceMapper, deleteHandler); - - this.fullTextMapper = new FullTextMapper(typedInstanceToGraphMapper, graphToTypedInstanceMapper); } public void onEntitiesMutated(EntityMutationResponse entityMutationResponse) throws AtlasBaseException { @@ -208,18 +200,19 @@ public class AtlasEntityChangeNotifier { } for (AtlasEntityHeader atlasEntityHeader : atlasEntityHeaders) { - AtlasVertex atlasVertex = AtlasGraphUtilsV1.findByGuid(atlasEntityHeader.getGuid()); + String guid = atlasEntityHeader.getGuid(); + AtlasVertex atlasVertex = AtlasGraphUtilsV1.findByGuid(guid); if(atlasVertex == null) { continue; } try { - String fullText = fullTextMapper.mapRecursive(atlasVertex, true); + String fullText = fullTextMapperV2.map(guid); GraphHelper.setProperty(atlasVertex, Constants.ENTITY_TEXT_PROPERTY_KEY, fullText); - } catch (AtlasException e) { - LOG.error("FullText mapping failed for Vertex[ guid = {} ]", atlasEntityHeader.getGuid(), e); + } catch (AtlasBaseException e) { + LOG.error("FullText mapping failed for Vertex[ guid = {} ]", guid, e); } } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98990b8d/server-api/src/main/java/org/apache/atlas/RequestContext.java 
---------------------------------------------------------------------- diff --git a/server-api/src/main/java/org/apache/atlas/RequestContext.java b/server-api/src/main/java/org/apache/atlas/RequestContext.java index bb11d67..cf56f08 100644 --- a/server-api/src/main/java/org/apache/atlas/RequestContext.java +++ b/server-api/src/main/java/org/apache/atlas/RequestContext.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Set; import org.apache.atlas.metrics.Metrics; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.typesystem.ITypedReferenceableInstance; import org.apache.atlas.typesystem.persistence.Id; import org.apache.atlas.typesystem.types.ClassType; @@ -43,12 +44,13 @@ public class RequestContext { private Set<String> updatedEntityIds = new LinkedHashSet<>(); private Set<String> deletedEntityIds = new LinkedHashSet<>(); private List<ITypedReferenceableInstance> deletedEntities = new ArrayList<>(); - private Map<String,ITypedReferenceableInstance> entityCache = new HashMap<>(); + private Map<String,ITypedReferenceableInstance> entityCacheV1 = new HashMap<>(); + private Map<String,AtlasEntityWithExtInfo> entityCacheV2 = new HashMap<>(); private String user; private long requestTime; - TypeSystem typeSystem = TypeSystem.getInstance(); + private TypeSystem typeSystem = TypeSystem.getInstance(); private Metrics metrics = new Metrics(); private RequestContext() { @@ -83,7 +85,28 @@ public class RequestContext { * */ public void cache(ITypedReferenceableInstance instance) { - entityCache.put(instance.getId()._getId(), instance); + entityCacheV1.put(instance.getId()._getId(), instance); + } + + /** + * Adds the specified instance to the cache + * + */ + public void cache(AtlasEntityWithExtInfo entity) { + if (entity != null && entity.getEntity() != null && entity.getEntity().getGuid() != null) { + entityCacheV2.put(entity.getEntity().getGuid(), entity); + } + } + + /** + * Checks if an instance with the given 
guid is in the cache for this request. Either returns the instance + * or null if it is not in the cache. + * + * @param guid the guid to find + * @return Either the instance or null if it is not in the cache. + */ + public ITypedReferenceableInstance getInstanceV1(String guid) { + return entityCacheV1.get(guid); } /** @@ -93,16 +116,20 @@ public class RequestContext { * @param guid the guid to find * @return Either the instance or null if it is not in the cache. */ - public ITypedReferenceableInstance getInstance(String guid) { - return entityCache.get(guid); + public AtlasEntityWithExtInfo getInstanceV2(String guid) { + return entityCacheV2.get(guid); } public static void clear() { RequestContext instance = CURRENT_CONTEXT.get(); if (instance != null) { - if (instance.entityCache != null) { - instance.entityCache.clear(); + if (instance.entityCacheV1 != null) { + instance.entityCacheV1.clear(); + } + + if (instance.entityCacheV2 != null) { + instance.entityCacheV2.clear(); } }
