Repository: incubator-atlas Updated Branches: refs/heads/master 41839141b -> 2dd0f0709
ATLAS-1684: export should include super-type definitions, import should preserve system attribute values Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/2dd0f070 Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/2dd0f070 Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/2dd0f070 Branch: refs/heads/master Commit: 2dd0f0709cbfb4dd9fbd8421640ee4067b730629 Parents: 4183914 Author: Madhan Neethiraj <[email protected]> Authored: Sat Mar 25 02:58:19 2017 -0700 Committer: Madhan Neethiraj <[email protected]> Committed: Sun Mar 26 18:21:38 2017 -0700 ---------------------------------------------------------------------- .../graph/v1/AtlasEntityChangeNotifier.java | 3 + .../store/graph/v1/AtlasEntityStoreV1.java | 9 +- .../store/graph/v1/EntityGraphMapper.java | 23 +++ .../atlas/web/resources/ExportService.java | 157 ++++++++++++++++--- .../atlas/web/resources/ImportService.java | 7 +- 5 files changed, 173 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2dd0f070/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java index 8dd3556..0439ada 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityChangeNotifier.java @@ -233,6 +233,9 @@ public class AtlasEntityChangeNotifier { } AtlasVertex atlasVertex = AtlasGraphUtilsV1.findByGuid(entityId); + if(atlasVertex == null) { + return; + } if (atlasVertex == null) { LOG.warn("updateFullTextMapping(): no entity exists with guid {}", entityId); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2dd0f070/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java index fa4c051..32b1ea8 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java @@ -170,10 +170,6 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { EntityMutationResponse resp = createOrUpdate(oneEntityStream, false, true); - if(CollectionUtils.isNotEmpty(entity.getClassifications())) { - addClassifications(entity.getGuid(), entity.getClassifications()); - } - updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult); updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult); updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult); @@ -567,6 +563,11 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { context.addCreated(guid, entity, entityType, vertex); } + + // during import, update the system attributes + if (entityStream instanceof EntityImportStream) { + entityGraphMapper.updateSystemAttributes(vertex, entity); + } } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2dd0f070/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java index 29bda93..9d11aa5 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java @@ -51,6 +51,7 @@ import org.apache.atlas.type.AtlasTypeUtil; import org.apache.atlas.type.AtlasStructType.AtlasAttribute; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -112,6 +113,28 @@ public class EntityGraphMapper { return ret; } + public void updateSystemAttributes(AtlasVertex vertex, AtlasEntity entity) { + if (entity.getStatus() != null) { + AtlasGraphUtilsV1.setProperty(vertex, Constants.STATE_PROPERTY_KEY, entity.getStatus().name()); + } + + if (entity.getCreateTime() != null) { + AtlasGraphUtilsV1.setProperty(vertex, Constants.TIMESTAMP_PROPERTY_KEY, entity.getCreateTime().getTime()); + } + + if (entity.getUpdateTime() != null) { + AtlasGraphUtilsV1.setProperty(vertex, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, entity.getUpdateTime().getTime()); + } + + if (StringUtils.isNotEmpty(entity.getCreatedBy())) { + AtlasGraphUtilsV1.setProperty(vertex, Constants.CREATED_BY_KEY, entity.getCreatedBy()); + } + + if (StringUtils.isNotEmpty(entity.getUpdatedBy())) { + AtlasGraphUtilsV1.setProperty(vertex, Constants.MODIFIED_BY_KEY, entity.getUpdatedBy()); + } + } + public EntityMutationResponse mapAttributesAndClassifications(EntityMutationContext context, final boolean isPartialUpdate, final boolean replaceClassifications) throws AtlasBaseException { EntityMutationResponse resp = new EntityMutationResponse(); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2dd0f070/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java b/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java index 54faee0..ffdbfac 100644 --- a/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java @@ -21,6 +21,7 @@ import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasServiceException; import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.TypeCategory; import org.apache.atlas.model.impexp.AtlasExportRequest; import org.apache.atlas.model.impexp.AtlasExportResult; import org.apache.atlas.model.instance.AtlasClassification; @@ -29,13 +30,22 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.typedef.AtlasBaseTypeDef; import org.apache.atlas.model.typedef.AtlasClassificationDef; +import org.apache.atlas.model.typedef.AtlasEnumDef; import org.apache.atlas.model.typedef.AtlasEntityDef; +import org.apache.atlas.model.typedef.AtlasStructDef; +import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.repository.graph.AtlasGraphProvider; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.store.graph.v1.EntityGraphRetriever; +import org.apache.atlas.type.AtlasArrayType; +import org.apache.atlas.type.AtlasClassificationType; +import org.apache.atlas.type.AtlasEnumType; import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasMapType; +import org.apache.atlas.type.AtlasStructType; import org.apache.atlas.type.AtlasStructType.AtlasAttribute; +import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeUtil; import org.apache.atlas.util.AtlasGremlinQueryProvider; @@ -88,6 +98,32 @@ public class ExportService { long endTime = System.currentTimeMillis(); + AtlasTypesDef typesDef = context.result.getData().getTypesDef(); + + for (String entityType : context.entityTypes) { + AtlasEntityDef entityDef = typeRegistry.getEntityDefByName(entityType); + + typesDef.getEntityDefs().add(entityDef); + } + + for (String classificationType : context.classificationTypes) { + AtlasClassificationDef classificationDef = typeRegistry.getClassificationDefByName(classificationType); + + typesDef.getClassificationDefs().add(classificationDef); + } + + for (String structType : context.structTypes) { + AtlasStructDef structDef = typeRegistry.getStructDefByName(structType); + + typesDef.getStructDefs().add(structDef); + } + + for (String enumType : context.enumTypes) { + AtlasEnumDef enumDef = typeRegistry.getEnumDefByName(enumType); + + typesDef.getEnumDefs().add(enumDef); + } + context.sink.setExportOrder(context.result.getData().getEntityCreationOrder()); context.sink.setTypesDef(context.result.getData().getTypesDef()); context.result.setData(null); @@ -221,16 +257,14 @@ public class ExportService { context.result.getData().getEntityCreationOrder().add(entityWithExtInfo.getEntity().getGuid()); addEntity(entityWithExtInfo, context); - addTypesAsNeeded(entityWithExtInfo.getEntity().getTypeName(), context); - addClassificationsAsNeeded(entityWithExtInfo.getEntity(), context); + addTypes(entityWithExtInfo.getEntity(), context); context.guidsProcessed.add(entityWithExtInfo.getEntity().getGuid()); getConntedEntitiesBasedOnOption(entityWithExtInfo.getEntity(), context, direction); if(entityWithExtInfo.getReferredEntities() != null) { for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) { - addTypesAsNeeded(e.getTypeName(), context); - addClassificationsAsNeeded(e, context); + addTypes(e, context); getConntedEntitiesBasedOnOption(e, context, direction); } @@ -371,33 +405,114 @@ public class ExportService { context.reportProgress(); } - private void addClassificationsAsNeeded(AtlasEntity entity, ExportContext context) { - AtlasExportResult result = context.result; - AtlasTypesDef typesDef = result.getData().getTypesDef(); + private void addTypes(AtlasEntity entity, ExportContext context) { + addEntityType(entity.getTypeName(), context); if(CollectionUtils.isNotEmpty(entity.getClassifications())) { for (AtlasClassification c : entity.getClassifications()) { - if (typesDef.hasClassificationDef(c.getTypeName())) { - continue; + addClassificationType(c.getTypeName(), context); + } + } + } + + private void addType(String typeName, ExportContext context) { + AtlasType type = null; + + try { + type = typeRegistry.getType(typeName); + + addType(type, context); + } catch (AtlasBaseException excp) { + LOG.error("unknown type {}", typeName); + } + } + + private void addEntityType(String typeName, ExportContext context) { + if (!context.entityTypes.contains(typeName)) { + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); + + addEntityType(entityType, context); + } + } + + private void addClassificationType(String typeName, ExportContext context) { + if (!context.classificationTypes.contains(typeName)) { + AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(typeName); + + addClassificationType(classificationType, context); + } + } + + private void addType(AtlasType type, ExportContext context) { + if (type.getTypeCategory() == TypeCategory.PRIMITIVE) { + return; + } + + if (type instanceof AtlasArrayType) { + AtlasArrayType arrayType = (AtlasArrayType)type; + + addType(arrayType.getElementType(), context); + } else if (type instanceof AtlasMapType) { + AtlasMapType mapType = (AtlasMapType)type; + + addType(mapType.getKeyType(), context); + addType(mapType.getValueType(), context); + } else if (type instanceof AtlasEntityType) { + addEntityType((AtlasEntityType)type, context); + } else if (type instanceof AtlasClassificationType) { + addClassificationType((AtlasClassificationType)type, context); + } else if (type instanceof AtlasStructType) { + addStructType((AtlasStructType)type, context); + } else if (type instanceof AtlasEnumType) { + addEnumType((AtlasEnumType)type, context); + } + } + + private void addEntityType(AtlasEntityType entityType, ExportContext context) { + if (!context.entityTypes.contains(entityType.getTypeName())) { + context.entityTypes.add(entityType.getTypeName()); + + addAttributeTypes(entityType, context); + + if (CollectionUtils.isNotEmpty(entityType.getAllSuperTypes())) { + for (String superType : entityType.getAllSuperTypes()) { + addEntityType(superType, context); } + } + } + } + + private void addClassificationType(AtlasClassificationType classificationType, ExportContext context) { + if (!context.classificationTypes.contains(classificationType.getTypeName())) { + context.classificationTypes.add(classificationType.getTypeName()); - AtlasClassificationDef cd = typeRegistry.getClassificationDefByName(c.getTypeName()); + addAttributeTypes(classificationType, context); - typesDef.getClassificationDefs().add(cd); - result.incrementMeticsCounter("typedef:classification"); + if (CollectionUtils.isNotEmpty(classificationType.getAllSuperTypes())) { + for (String superType : classificationType.getAllSuperTypes()) { + addClassificationType(superType, context); + } } } } - private void addTypesAsNeeded(String typeName, ExportContext context) { - AtlasExportResult result = context.result; - AtlasTypesDef typesDef = result.getData().getTypesDef(); + private void addStructType(AtlasStructType structType, ExportContext context) { + if (!context.structTypes.contains(structType.getTypeName())) { + context.structTypes.add(structType.getTypeName()); + + addAttributeTypes(structType, context); + } + } - if(!typesDef.hasEntityDef(typeName)) { - AtlasEntityDef typeDefinition = typeRegistry.getEntityDefByName(typeName); + private void addEnumType(AtlasEnumType enumType, ExportContext context) { + if (!context.enumTypes.contains(enumType.getTypeName())) { + context.enumTypes.add(enumType.getTypeName()); + } + } - typesDef.getEntityDefs().add(typeDefinition); - result.incrementMeticsCounter("typedef:" + typeDefinition.getName()); + private void addAttributeTypes(AtlasStructType structType, ExportContext context) { + for (AtlasAttributeDef attributeDef : structType.getStructDef().getAttributeDefs()) { + addType(attributeDef.getTypeName(), context); } } @@ -499,6 +614,10 @@ public class ExportService { final UniqueList<String> guidsToProcess = new UniqueList<>(); final UniqueList<String> guidsLineageToProcess = new UniqueList<>(); final Map<String, TraversalDirection> guidDirection = new HashMap<>(); + final Set<String> entityTypes = new HashSet<>(); + final Set<String> classificationTypes = new HashSet<>(); + final Set<String> structTypes = new HashSet<>(); + final Set<String> enumTypes = new HashSet<>(); final AtlasExportResult result; final ZipSink sink; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2dd0f070/webapp/src/main/java/org/apache/atlas/web/resources/ImportService.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ImportService.java b/webapp/src/main/java/org/apache/atlas/web/resources/ImportService.java index 857553d..eb81e3c 100644 --- a/webapp/src/main/java/org/apache/atlas/web/resources/ImportService.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/ImportService.java @@ -120,13 +120,14 @@ public class ImportService { private void processTypes(AtlasTypesDef typeDefinitionMap, AtlasImportResult result) throws AtlasBaseException { setGuidToEmpty(typeDefinitionMap); + AtlasTypesDef typesToCreate = AtlasTypeDefStoreInitializer.getTypesToCreate(typeDefinitionMap, this.typeRegistry); + if (!typesToCreate.isEmpty()) { typeDefStore.createTypesDef(typesToCreate); - } - typeDefStore.updateTypesDef(typeDefinitionMap); - updateMetricsForTypesDef(typeDefinitionMap, result); + updateMetricsForTypesDef(typesToCreate, result); + } } private void updateMetricsForTypesDef(AtlasTypesDef typeDefinitionMap, AtlasImportResult result) {
