ATLAS-1709: Unit tests for import and export APIs Signed-off-by: Sarath Subramanian <[email protected]> (cherry picked from commit 8a32ccaae6ce24f52371a2802c29973db59244d3)
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/07e7faa6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/07e7faa6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/07e7faa6 Branch: refs/heads/0.8-incubating Commit: 07e7faa6afbb55ba0302ccd93af95d612929f4f9 Parents: 6436f8c Author: ashutoshm <[email protected]> Authored: Wed Apr 12 14:32:29 2017 -0700 Committer: Sarath Subramanian <[email protected]> Committed: Wed Apr 12 14:35:53 2017 -0700 ---------------------------------------------------------------------- .../atlas/repository/impexp/ExportService.java | 692 +++++++++++++++++++ .../atlas/repository/impexp/ImportService.java | 168 +++++ .../repository/impexp/ZipExportFileNames.java | 34 + .../apache/atlas/repository/impexp/ZipSink.java | 103 +++ .../atlas/repository/impexp/ZipSource.java | 216 ++++++ .../repository/impexp/ExportServiceTest.java | 291 ++++++++ .../impexp/ImportServiceReportingTest.java | 52 ++ .../repository/impexp/ImportServiceTest.java | 85 +++ .../impexp/ZipFileResourceTestUtils.java | 158 +++++ .../atlas/repository/impexp/ZipSinkTest.java | 153 ++++ .../atlas/repository/impexp/ZipSourceTest.java | 106 +++ .../test/resources/dept-employee-test-utils.zip | Bin 0 -> 2932 bytes .../src/test/resources/logging-v1-full.zip | Bin 0 -> 4853 bytes .../src/test/resources/reporting-v1-full.zip | Bin 0 -> 10801 bytes repository/src/test/resources/sales-v1-full.zip | Bin 0 -> 10799 bytes repository/src/test/resources/stocks.zip | Bin 0 -> 5078 bytes .../atlas/web/resources/AdminResource.java | 14 +- .../atlas/web/resources/ExportService.java | 692 ------------------- .../atlas/web/resources/ImportService.java | 164 ----- .../atlas/web/resources/ZipExportFileNames.java | 34 - .../org/apache/atlas/web/resources/ZipSink.java | 103 --- .../apache/atlas/web/resources/ZipSource.java | 210 ------ 22 files changed, 2070 insertions(+), 1205 
deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/07e7faa6/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java new file mode 100644 index 0000000..73c3140 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java @@ -0,0 +1,692 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.repository.impexp; + +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.AtlasException; +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.TypeCategory; +import org.apache.atlas.model.impexp.AtlasExportRequest; +import org.apache.atlas.model.impexp.AtlasExportResult; +import org.apache.atlas.model.instance.AtlasClassification; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.model.typedef.AtlasBaseTypeDef; +import org.apache.atlas.model.typedef.AtlasClassificationDef; +import org.apache.atlas.model.typedef.AtlasEntityDef; +import org.apache.atlas.model.typedef.AtlasEnumDef; +import org.apache.atlas.model.typedef.AtlasStructDef; +import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; +import org.apache.atlas.model.typedef.AtlasTypesDef; +import org.apache.atlas.repository.graph.AtlasGraphProvider; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.store.graph.v1.EntityGraphRetriever; +import org.apache.atlas.type.AtlasArrayType; +import org.apache.atlas.type.AtlasClassificationType; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasEnumType; +import org.apache.atlas.type.AtlasMapType; +import org.apache.atlas.type.AtlasStructType; +import org.apache.atlas.type.AtlasStructType.AtlasAttribute; +import org.apache.atlas.type.AtlasType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.type.AtlasTypeUtil; +import org.apache.atlas.util.AtlasGremlinQueryProvider; +import org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import 
org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.script.ScriptEngine; +import javax.script.ScriptException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.atlas.model.impexp.AtlasExportRequest.*; + +public class ExportService { + private static final Logger LOG = LoggerFactory.getLogger(ExportService.class); + + private final AtlasTypeRegistry typeRegistry; + private final AtlasGraph atlasGraph; + private final EntityGraphRetriever entityGraphRetriever; + private final AtlasGremlinQueryProvider gremlinQueryProvider; + + public ExportService(final AtlasTypeRegistry typeRegistry) throws AtlasBaseException { + this.typeRegistry = typeRegistry; + this.entityGraphRetriever = new EntityGraphRetriever(this.typeRegistry); + this.atlasGraph = AtlasGraphProvider.getGraphInstance(); + this.gremlinQueryProvider = AtlasGremlinQueryProvider.INSTANCE; + } + + public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest request, String userName, String hostName, + String requestingIP) throws AtlasBaseException { + long startTime = System.currentTimeMillis(); + AtlasExportResult result = new AtlasExportResult(request, userName, requestingIP, hostName, startTime); + ExportContext context = new ExportContext(result, exportSink); + + try { + LOG.info("==> export(user={}, from={})", userName, requestingIP); + + for (AtlasObjectId item : request.getItemsToExport()) { + processObjectId(item, context); + } + + long endTime = System.currentTimeMillis(); + + AtlasTypesDef typesDef = context.result.getData().getTypesDef(); + + for (String entityType : context.entityTypes) { + AtlasEntityDef entityDef = typeRegistry.getEntityDefByName(entityType); + + typesDef.getEntityDefs().add(entityDef); + } + + for (String classificationType : context.classificationTypes) 
{ + AtlasClassificationDef classificationDef = typeRegistry.getClassificationDefByName(classificationType); + + typesDef.getClassificationDefs().add(classificationDef); + } + + for (String structType : context.structTypes) { + AtlasStructDef structDef = typeRegistry.getStructDefByName(structType); + + typesDef.getStructDefs().add(structDef); + } + + for (String enumType : context.enumTypes) { + AtlasEnumDef enumDef = typeRegistry.getEnumDefByName(enumType); + + typesDef.getEnumDefs().add(enumDef); + } + + context.sink.setExportOrder(context.result.getData().getEntityCreationOrder()); + context.sink.setTypesDef(context.result.getData().getTypesDef()); + context.result.setData(null); + context.result.setOperationStatus(AtlasExportResult.OperationStatus.SUCCESS); + context.result.incrementMeticsCounter("duration", (int) (endTime - startTime)); + + context.sink.setResult(context.result); + } catch(Exception ex) { + LOG.error("Operation failed: ", ex); + } finally { + atlasGraph.releaseGremlinScriptEngine(context.scriptEngine); + LOG.info("<== export(user={}, from={}): status {}", userName, requestingIP, context.result.getOperationStatus()); + context.clear(); + result.clear(); + } + + return context.result; + } + + private void processObjectId(AtlasObjectId item, ExportContext context) throws AtlasServiceException, AtlasException, AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> processObjectId({})", item); + } + + try { + List<AtlasEntityWithExtInfo> entities = getStartingEntity(item, context); + + for (AtlasEntityWithExtInfo entityWithExtInfo : entities) { + processEntity(entityWithExtInfo.getEntity().getGuid(), context); + } + + while (!context.guidsToProcess.isEmpty()) { + while (!context.guidsToProcess.isEmpty()) { + String guid = context.guidsToProcess.remove(0); + processEntity(guid, context); + } + + if (!context.guidsLineageToProcess.isEmpty()) { + context.guidsToProcess.addAll(context.guidsLineageToProcess); + 
context.guidsLineageToProcess.clear(); + } + } + } catch (AtlasBaseException excp) { + context.result.setOperationStatus(AtlasExportResult.OperationStatus.PARTIAL_SUCCESS); + + LOG.error("Fetching entity failed for: {}", item, excp); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== processObjectId({})", item); + } + } + + private List<AtlasEntityWithExtInfo> getStartingEntity(AtlasObjectId item, ExportContext context) throws AtlasBaseException { + List<AtlasEntityWithExtInfo> ret = new ArrayList<>(); + + if (StringUtils.isNotEmpty(item.getGuid())) { + AtlasEntityWithExtInfo entity = entityGraphRetriever.toAtlasEntityWithExtInfo(item); + + if (entity != null) { + ret = Collections.singletonList(entity); + } + } else if (StringUtils.isNotEmpty(item.getTypeName()) && MapUtils.isNotEmpty(item.getUniqueAttributes())) { + String typeName = item.getTypeName(); + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); + + if (entityType == null) { + throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_TYPENAME, typeName); + } + + final String queryTemplate; + if (StringUtils.equalsIgnoreCase(context.matchType, MATCH_TYPE_STARTS_WITH)) { + queryTemplate = gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_TYPE_STARTS_WITH); + } else if (StringUtils.equalsIgnoreCase(context.matchType, MATCH_TYPE_ENDS_WITH)) { + queryTemplate = gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_TYPE_ENDS_WITH); + } else if (StringUtils.equalsIgnoreCase(context.matchType, MATCH_TYPE_CONTAINS)) { + queryTemplate = gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_TYPE_CONTAINS); + } else if (StringUtils.equalsIgnoreCase(context.matchType, MATCH_TYPE_MATCHES)) { + queryTemplate = gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_TYPE_MATCHES); + } else { // default + queryTemplate = gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_TYPE_DEFAULT); + } + + for (Map.Entry<String, Object> e : item.getUniqueAttributes().entrySet()) { + String attrName = 
e.getKey(); + Object attrValue = e.getValue(); + + AtlasAttribute attribute = entityType.getAttribute(attrName); + + if (attribute == null || attrValue == null) { + continue; + } + + context.bindings.clear(); + context.bindings.put("typeName", typeName); + context.bindings.put("attrName", attribute.getQualifiedName()); + context.bindings.put("attrValue", attrValue); + + List<String> guids = executeGremlinQueryForGuids(queryTemplate, context); + + if (CollectionUtils.isNotEmpty(guids)) { + for (String guid : guids) { + AtlasEntityWithExtInfo entityWithExtInfo = entityGraphRetriever.toAtlasEntityWithExtInfo(guid); + + if (entityWithExtInfo == null) { + continue; + } + + ret.add(entityWithExtInfo); + } + } + + break; + } + + LOG.info("export(item={}; matchType={}, fetchType={}): found {} entities", item, context.matchType, context.fetchType, ret.size()); + } + + return ret; + } + + private void processEntity(String guid, ExportContext context) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> processEntity({})", guid); + } + + if (!context.guidsProcessed.contains(guid)) { + TraversalDirection direction = context.guidDirection.get(guid); + AtlasEntityWithExtInfo entityWithExtInfo = entityGraphRetriever.toAtlasEntityWithExtInfo(guid); + + context.result.getData().getEntityCreationOrder().add(entityWithExtInfo.getEntity().getGuid()); + + addEntity(entityWithExtInfo, context); + addTypes(entityWithExtInfo.getEntity(), context); + + context.guidsProcessed.add(entityWithExtInfo.getEntity().getGuid()); + getConntedEntitiesBasedOnOption(entityWithExtInfo.getEntity(), context, direction); + + if(entityWithExtInfo.getReferredEntities() != null) { + for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) { + addTypes(e, context); + getConntedEntitiesBasedOnOption(e, context, direction); + } + + context.guidsProcessed.addAll(entityWithExtInfo.getReferredEntities().keySet()); + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== 
processEntity({})", guid); + } + } + + private void getConntedEntitiesBasedOnOption(AtlasEntity entity, ExportContext context, TraversalDirection direction) throws AtlasBaseException { + switch (context.fetchType) { + case CONNECTED: + getEntityGuidsForConnectedFetch(entity, context, direction); + break; + + case FULL: + default: + getEntityGuidsForFullFetch(entity, context); + } + } + + private void getEntityGuidsForConnectedFetch(AtlasEntity entity, ExportContext context, TraversalDirection direction) throws AtlasBaseException { + if (direction == null || direction == TraversalDirection.UNKNOWN) { + getConnectedEntityGuids(entity, context, TraversalDirection.OUTWARD, TraversalDirection.INWARD); + } else { + if (isProcessEntity(entity)) { + direction = TraversalDirection.OUTWARD; + } + + getConnectedEntityGuids(entity, context, direction); + } + } + + private boolean isProcessEntity(AtlasEntity entity) throws AtlasBaseException { + String typeName = entity.getTypeName(); + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); + + return entityType.isSubTypeOf(AtlasBaseTypeDef.ATLAS_TYPE_PROCESS); + } + + private void getConnectedEntityGuids(AtlasEntity entity, ExportContext context, TraversalDirection... 
directions) { + if(directions == null) { + return; + } + + for (TraversalDirection direction : directions) { + String query = getQueryForTraversalDirection(direction); + + if (LOG.isDebugEnabled()) { + LOG.debug("==> getConnectedEntityGuids({}): guidsToProcess {} query {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size(), query); + } + + context.bindings.clear(); + context.bindings.put("startGuid", entity.getGuid()); + + List<HashMap<String, Object>> result = executeGremlinQuery(query, context); + + if (CollectionUtils.isEmpty(result)) { + continue; + } + + for (HashMap<String, Object> hashMap : result) { + String guid = (String) hashMap.get("__guid"); + TraversalDirection currentDirection = context.guidDirection.get(guid); + boolean isLineage = (boolean) hashMap.get("isProcess"); + + if (currentDirection == null) { + context.addToBeProcessed(isLineage, guid, direction); + + } else if (currentDirection == TraversalDirection.OUTWARD && direction == TraversalDirection.INWARD) { + // the entity should be reprocessed to get inward entities + context.guidsProcessed.remove(guid); + context.addToBeProcessed(isLineage, guid, direction); + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== getConnectedEntityGuids({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcess.size()); + } + } + } + + private String getQueryForTraversalDirection(TraversalDirection direction) { + switch (direction) { + case INWARD: + return this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_CONNECTED_IN_EDGE); + + default: + case OUTWARD: + return this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_CONNECTED_OUT_EDGE); + } + } + + private void getEntityGuidsForFullFetch(AtlasEntity entity, ExportContext context) { + if (LOG.isDebugEnabled()) { + LOG.debug("==> getEntityGuidsForFullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size()); + } + + String query 
= this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_FULL); + + context.bindings.clear(); + context.bindings.put("startGuid", entity.getGuid()); + + List<HashMap<String, Object>> result = executeGremlinQuery(query, context); + + if (CollectionUtils.isEmpty(result)) { + return; + } + + for (HashMap<String, Object> hashMap : result) { + String guid = (String) hashMap.get("__guid"); + boolean isLineage = (boolean) hashMap.get("isProcess"); + + if (!context.guidsProcessed.contains(guid)) { + context.addToBeProcessed(isLineage, guid, TraversalDirection.BOTH); + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== getEntityGuidsForFullFetch({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcess.size()); + } + } + + private void addEntity(AtlasEntityWithExtInfo entity, ExportContext context) throws AtlasBaseException { + context.sink.add(entity); + + context.result.incrementMeticsCounter(String.format("entity:%s", entity.getEntity().getTypeName())); + if(entity.getReferredEntities() != null) { + for (AtlasEntity e: entity.getReferredEntities().values()) { + context.result.incrementMeticsCounter(String.format("entity:%s", e.getTypeName())); + } + } + + context.result.incrementMeticsCounter("entity:withExtInfo"); + context.reportProgress(); + } + + private void addTypes(AtlasEntity entity, ExportContext context) { + addEntityType(entity.getTypeName(), context); + + if(CollectionUtils.isNotEmpty(entity.getClassifications())) { + for (AtlasClassification c : entity.getClassifications()) { + addClassificationType(c.getTypeName(), context); + } + } + } + + private void addType(String typeName, ExportContext context) { + AtlasType type = null; + + try { + type = typeRegistry.getType(typeName); + + addType(type, context); + } catch (AtlasBaseException excp) { + LOG.error("unknown type {}", typeName); + } + } + + private void addEntityType(String typeName, ExportContext context) { + if 
(!context.entityTypes.contains(typeName)) { + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); + + addEntityType(entityType, context); + } + } + + private void addClassificationType(String typeName, ExportContext context) { + if (!context.classificationTypes.contains(typeName)) { + AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(typeName); + + addClassificationType(classificationType, context); + } + } + + private void addType(AtlasType type, ExportContext context) { + if (type.getTypeCategory() == TypeCategory.PRIMITIVE) { + return; + } + + if (type instanceof AtlasArrayType) { + AtlasArrayType arrayType = (AtlasArrayType)type; + + addType(arrayType.getElementType(), context); + } else if (type instanceof AtlasMapType) { + AtlasMapType mapType = (AtlasMapType)type; + + addType(mapType.getKeyType(), context); + addType(mapType.getValueType(), context); + } else if (type instanceof AtlasEntityType) { + addEntityType((AtlasEntityType)type, context); + } else if (type instanceof AtlasClassificationType) { + addClassificationType((AtlasClassificationType)type, context); + } else if (type instanceof AtlasStructType) { + addStructType((AtlasStructType)type, context); + } else if (type instanceof AtlasEnumType) { + addEnumType((AtlasEnumType)type, context); + } + } + + private void addEntityType(AtlasEntityType entityType, ExportContext context) { + if (!context.entityTypes.contains(entityType.getTypeName())) { + context.entityTypes.add(entityType.getTypeName()); + + addAttributeTypes(entityType, context); + + if (CollectionUtils.isNotEmpty(entityType.getAllSuperTypes())) { + for (String superType : entityType.getAllSuperTypes()) { + addEntityType(superType, context); + } + } + } + } + + private void addClassificationType(AtlasClassificationType classificationType, ExportContext context) { + if (!context.classificationTypes.contains(classificationType.getTypeName())) { + 
context.classificationTypes.add(classificationType.getTypeName()); + + addAttributeTypes(classificationType, context); + + if (CollectionUtils.isNotEmpty(classificationType.getAllSuperTypes())) { + for (String superType : classificationType.getAllSuperTypes()) { + addClassificationType(superType, context); + } + } + } + } + + private void addStructType(AtlasStructType structType, ExportContext context) { + if (!context.structTypes.contains(structType.getTypeName())) { + context.structTypes.add(structType.getTypeName()); + + addAttributeTypes(structType, context); + } + } + + private void addEnumType(AtlasEnumType enumType, ExportContext context) { + if (!context.enumTypes.contains(enumType.getTypeName())) { + context.enumTypes.add(enumType.getTypeName()); + } + } + + private void addAttributeTypes(AtlasStructType structType, ExportContext context) { + for (AtlasAttributeDef attributeDef : structType.getStructDef().getAttributeDefs()) { + addType(attributeDef.getTypeName(), context); + } + } + + private List<HashMap<String, Object>> executeGremlinQuery(String query, ExportContext context) { + try { + return (List<HashMap<String, Object>>) atlasGraph.executeGremlinScript(context.scriptEngine, context.bindings, query, false); + } catch (ScriptException e) { + LOG.error("Script execution failed for query: ", query, e); + return null; + } + } + + private List<String> executeGremlinQueryForGuids(String query, ExportContext context) { + try { + return (List<String>) atlasGraph.executeGremlinScript(context.scriptEngine, context.bindings, query, false); + } catch (ScriptException e) { + LOG.error("Script execution failed for query: ", query, e); + return null; + } + } + + private enum TraversalDirection { + UNKNOWN, + INWARD, + OUTWARD, + BOTH; + } + + + public enum ExportFetchType { + FULL(FETCH_TYPE_FULL), + CONNECTED(FETCH_TYPE_CONNECTED); + + final String str; + ExportFetchType(String s) { + this.str = s; + } + + public static final ExportFetchType from(String s) { + 
for (ExportFetchType b : ExportFetchType.values()) { + if (b.str.equalsIgnoreCase(s)) { + return b; + } + } + + return FULL; + } + } + + private class UniqueList<T> { + private final List<T> list = new ArrayList<>(); + private final Set<T> set = new HashSet<>(); + + public void add(T e) { + if(set.contains(e)) { + return; + } + + list.add(e); + set.add(e); + } + + public void addAll(UniqueList<T> uniqueList) { + for (T item : uniqueList.list) { + if(set.contains(item)) continue; + + set.add(item); + list.add(item); + } + } + + public T remove(int index) { + T e = list.remove(index); + set.remove(e); + return e; + } + + public boolean contains(T e) { + return set.contains(e); + } + + public int size() { + return list.size(); + } + + public boolean isEmpty() { + return list.isEmpty(); + } + + public void clear() { + list.clear(); + set.clear(); + } + } + + + private class ExportContext { + final Set<String> guidsProcessed = new HashSet<>(); + final UniqueList<String> guidsToProcess = new UniqueList<>(); + final UniqueList<String> guidsLineageToProcess = new UniqueList<>(); + final Map<String, TraversalDirection> guidDirection = new HashMap<>(); + final Set<String> entityTypes = new HashSet<>(); + final Set<String> classificationTypes = new HashSet<>(); + final Set<String> structTypes = new HashSet<>(); + final Set<String> enumTypes = new HashSet<>(); + final AtlasExportResult result; + final ZipSink sink; + + private final ScriptEngine scriptEngine; + private final Map<String, Object> bindings; + private final ExportFetchType fetchType; + private final String matchType; + + private int progressReportCount = 0; + + ExportContext(AtlasExportResult result, ZipSink sink) throws AtlasBaseException { + this.result = result; + this.sink = sink; + + scriptEngine = atlasGraph.getGremlinScriptEngine(); + bindings = new HashMap<>(); + fetchType = getFetchType(result.getRequest()); + matchType = getMatchType(result.getRequest()); + } + + private ExportFetchType 
getFetchType(AtlasExportRequest request) { + Object fetchOption = request.getOptions() != null ? request.getOptions().get(OPTION_FETCH_TYPE) : null; + + if (fetchOption instanceof String) { + return ExportFetchType.from((String) fetchOption); + } else if (fetchOption instanceof ExportFetchType) { + return (ExportFetchType) fetchOption; + } + + return ExportFetchType.FULL; + } + + private String getMatchType(AtlasExportRequest request) { + String matchType = null; + + if (MapUtils.isNotEmpty(request.getOptions())) { + if (request.getOptions().get(OPTION_ATTR_MATCH_TYPE) != null) { + matchType = request.getOptions().get(OPTION_ATTR_MATCH_TYPE).toString(); + } + } + + return matchType; + } + + public void clear() { + guidsToProcess.clear(); + guidsProcessed.clear(); + guidDirection.clear(); + } + + public void addToBeProcessed(boolean isSuperTypeProcess, String guid, TraversalDirection direction) { + if(!isSuperTypeProcess) { + guidsToProcess.add(guid); + } + + if(isSuperTypeProcess) { + guidsLineageToProcess.add(guid); + } + + guidDirection.put(guid, direction); + } + + public void reportProgress() { + + if ((guidsProcessed.size() - progressReportCount) > 1000) { + progressReportCount = guidsProcessed.size(); + + LOG.info("export(): in progress.. 
number of entities exported: {}", this.guidsProcessed.size()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/07e7faa6/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java new file mode 100644 index 0000000..9ec15e0 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.repository.impexp; + +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.impexp.AtlasImportRequest; +import org.apache.atlas.model.impexp.AtlasImportResult; +import org.apache.atlas.model.typedef.AtlasClassificationDef; +import org.apache.atlas.model.typedef.AtlasEntityDef; +import org.apache.atlas.model.typedef.AtlasEnumDef; +import org.apache.atlas.model.typedef.AtlasStructDef; +import org.apache.atlas.model.typedef.AtlasTypesDef; +import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer; +import org.apache.atlas.repository.store.graph.AtlasEntityStore; +import org.apache.atlas.store.AtlasTypeDefStore; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileNotFoundException; + + +public class ImportService { + private static final Logger LOG = LoggerFactory.getLogger(ImportService.class); + + private final AtlasTypeDefStore typeDefStore; + private final AtlasEntityStore entityStore; + private final AtlasTypeRegistry typeRegistry; + + private long startTimestamp; + private long endTimestamp; + + + public ImportService(final AtlasTypeDefStore typeDefStore, final AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) { + this.typeDefStore = typeDefStore; + this.entityStore = entityStore; + this.typeRegistry = typeRegistry; + } + + public AtlasImportResult run(ZipSource source, AtlasImportRequest request, String userName, + String hostName, String requestingIP) throws AtlasBaseException { + AtlasImportResult result = new AtlasImportResult(request, userName, requestingIP, hostName, System.currentTimeMillis()); + + try { + LOG.info("==> import(user={}, from={})", userName, requestingIP); + + startTimestamp = 
System.currentTimeMillis(); + processTypes(source.getTypesDef(), result); + processEntities(source, result); + + result.setOperationStatus(AtlasImportResult.OperationStatus.SUCCESS); + } catch (AtlasBaseException excp) { + LOG.error("import(user={}, from={}): failed", userName, requestingIP, excp); + + throw excp; + } catch (Exception excp) { + LOG.error("import(user={}, from={}): failed", userName, requestingIP, excp); + + throw new AtlasBaseException(excp); + } finally { + source.close(); + LOG.info("<== import(user={}, from={}): status={}", userName, requestingIP, result.getOperationStatus()); + } + + return result; + } + + public AtlasImportResult run(AtlasImportRequest request, String userName, String hostName, String requestingIP) + throws AtlasBaseException { + String fileName = (String)request.getOptions().get("FILENAME"); + + if (StringUtils.isBlank(fileName)) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "FILENAME parameter not found"); + } + + AtlasImportResult result = null; + + try { + LOG.info("==> import(user={}, from={}, fileName={})", userName, requestingIP, fileName); + + File file = new File(fileName); + ZipSource source = new ZipSource(new ByteArrayInputStream(FileUtils.readFileToByteArray(file))); + + result = run(source, request, userName, hostName, requestingIP); + } catch (AtlasBaseException excp) { + LOG.error("import(user={}, from={}, fileName={}): failed", userName, requestingIP, excp); + + throw excp; + } catch (FileNotFoundException excp) { + LOG.error("import(user={}, from={}, fileName={}): file not found", userName, requestingIP, excp); + + throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, fileName + ": file not found"); + } catch (Exception excp) { + LOG.error("import(user={}, from={}, fileName={}): failed", userName, requestingIP, excp); + + throw new AtlasBaseException(excp); + } finally { + LOG.info("<== import(user={}, from={}, fileName={}): status={}", userName, requestingIP, fileName, + 
(result == null ? AtlasImportResult.OperationStatus.FAIL : result.getOperationStatus())); + } + + return result; + } + + private void processTypes(AtlasTypesDef typeDefinitionMap, AtlasImportResult result) throws AtlasBaseException { + setGuidToEmpty(typeDefinitionMap); + + AtlasTypesDef typesToCreate = AtlasTypeDefStoreInitializer.getTypesToCreate(typeDefinitionMap, this.typeRegistry); + + if (!typesToCreate.isEmpty()) { + typeDefStore.createTypesDef(typesToCreate); + + updateMetricsForTypesDef(typesToCreate, result); + } + } + + private void updateMetricsForTypesDef(AtlasTypesDef typeDefinitionMap, AtlasImportResult result) { + result.incrementMeticsCounter("typedef:classification", typeDefinitionMap.getClassificationDefs().size()); + result.incrementMeticsCounter("typedef:enum", typeDefinitionMap.getEnumDefs().size()); + result.incrementMeticsCounter("typedef:entitydef", typeDefinitionMap.getEntityDefs().size()); + result.incrementMeticsCounter("typedef:struct", typeDefinitionMap.getStructDefs().size()); + } + + private void setGuidToEmpty(AtlasTypesDef typesDef) { + for (AtlasEntityDef def: typesDef.getEntityDefs()) { + def.setGuid(null); + } + + for (AtlasClassificationDef def: typesDef.getClassificationDefs()) { + def.setGuid(null); + } + + for (AtlasEnumDef def: typesDef.getEnumDefs()) { + def.setGuid(null); + } + + for (AtlasStructDef def: typesDef.getStructDefs()) { + def.setGuid(null); + } + } + + private void processEntities(ZipSource importSource, AtlasImportResult result) throws AtlasBaseException { + this.entityStore.bulkImport(importSource, result); + + endTimestamp = System.currentTimeMillis(); + result.incrementMeticsCounter("duration", (int) (this.endTimestamp - this.startTimestamp)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/07e7faa6/repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java ---------------------------------------------------------------------- diff --git 
a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java new file mode 100644 index 0000000..351b475 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.repository.impexp; + +public enum ZipExportFileNames { + ATLAS_EXPORT_INFO_NAME("atlas-export-info"), + ATLAS_EXPORT_ORDER_NAME("atlas-export-order"), + ATLAS_TYPESDEF_NAME("atlas-typesdef"); + + public final String name; + ZipExportFileNames(String name) { + this.name = name; + } + + @Override + public String toString() { + return this.name; + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/07e7faa6/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSink.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSink.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSink.java new file mode 100644 index 0000000..4bb04da --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSink.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements; licensed under the Apache License,
 * Version 2.0 (http://www.apache.org/licenses/LICENSE-2.0).
 */
package org.apache.atlas.repository.impexp;

import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.type.AtlasType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

/**
 * Writes the artifacts of an Atlas export — entities, type definitions,
 * creation order and the overall result — as JSON entries of a single zip
 * stream. Each entry is named "{baseName}.json".
 *
 * Not thread-safe; callers must invoke {@link #close()} when finished.
 */
public class ZipSink {
    private static final Logger LOG = LoggerFactory.getLogger(ZipSink.class);

    private ZipOutputStream zipOutputStream;

    public ZipSink(OutputStream outputStream) {
        zipOutputStream = new ZipOutputStream(outputStream);
    }

    /** Adds one entity as a zip entry named by its GUID. */
    public void add(AtlasEntity entity) throws AtlasBaseException {
        String jsonData = convertToJSON(entity);
        saveToZip(entity.getGuid(), jsonData);
    }

    /** Adds an entity with its referenced entities, keyed by the root entity's GUID. */
    public void add(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) throws AtlasBaseException {
        String jsonData = convertToJSON(entityWithExtInfo);
        saveToZip(entityWithExtInfo.getEntity().getGuid(), jsonData);
    }

    /** Stores the export result summary under {@link ZipExportFileNames#ATLAS_EXPORT_INFO_NAME}. */
    public void setResult(AtlasExportResult result) throws AtlasBaseException {
        String jsonData = convertToJSON(result);
        saveToZip(ZipExportFileNames.ATLAS_EXPORT_INFO_NAME, jsonData);
    }

    /** Stores the exported type definitions under {@link ZipExportFileNames#ATLAS_TYPESDEF_NAME}. */
    public void setTypesDef(AtlasTypesDef typesDef) throws AtlasBaseException {
        String jsonData = convertToJSON(typesDef);
        saveToZip(ZipExportFileNames.ATLAS_TYPESDEF_NAME, jsonData);
    }

    /** Stores the entity creation order under {@link ZipExportFileNames#ATLAS_EXPORT_ORDER_NAME}. */
    public void setExportOrder(List<String> result) throws AtlasBaseException {
        String jsonData = convertToJSON(result);
        saveToZip(ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME, jsonData);
    }

    /** Closes the underlying zip stream; safe to call more than once. */
    public void close() {
        try {
            if (zipOutputStream != null) {
                zipOutputStream.close();
                zipOutputStream = null;
            }
        } catch (IOException e) {
            LOG.error("Error closing Zip file", e);
        }
    }

    private String convertToJSON(Object entity) {
        return AtlasType.toJson(entity);
    }

    private void saveToZip(ZipExportFileNames fileName, String jsonData) throws AtlasBaseException {
        saveToZip(fileName.toString(), jsonData);
    }

    private void saveToZip(String fileName, String jsonData) throws AtlasBaseException {
        try {
            // fileName is already a String — no extra toString() needed.
            addToZipStream(fileName + ".json", jsonData);
        } catch (IOException e) {
            throw new AtlasBaseException(String.format("Error writing file %s.", fileName), e);
        }
    }

    private void addToZipStream(String entryName, String payload) throws IOException {
        ZipEntry e = new ZipEntry(entryName);
        zipOutputStream.putNextEntry(e);

        // Encode explicitly as UTF-8; the no-arg getBytes() would use the
        // platform default charset and produce non-portable archives.
        zipOutputStream.write(payload.getBytes(StandardCharsets.UTF_8));
        zipOutputStream.closeEntry();
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements; licensed under the Apache License,
 * Version 2.0 (http://www.apache.org/licenses/LICENSE-2.0).
 */
package org.apache.atlas.repository.impexp;

import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.store.graph.v1.EntityImportStream;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

import static org.apache.atlas.AtlasErrorCode.JSON_ERROR_OBJECT_MAPPER_NULL_RETURNED;

/**
 * Reads an Atlas export zip archive (as produced by {@code ZipSink}) and
 * exposes its entities as an {@link EntityImportStream}.
 *
 * The entire archive is eagerly read into an in-memory map of
 * entryName -&gt; JSON on construction; entries are removed from the cache as
 * {@link #onImportComplete(String)} is invoked. Not thread-safe.
 */
public class ZipSource implements EntityImportStream {
    private static final Logger LOG = LoggerFactory.getLogger(ZipSource.class);

    // ObjectMapper is thread-safe once configured; cache a single instance
    // instead of allocating one per conversion.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    private final InputStream       inputStream;
    private List<String>            creationOrder;
    private Iterator<String>        iterator;
    private Map<String, String>     guidEntityJsonMap;

    public ZipSource(InputStream inputStream) throws IOException {
        this.inputStream  = inputStream;
        guidEntityJsonMap = new HashMap<>();

        updateGuidZipEntryMap();
        this.setCreationOrder();
    }

    /** Returns the type definitions stored in the archive. */
    public AtlasTypesDef getTypesDef() throws AtlasBaseException {
        final String fileName = ZipExportFileNames.ATLAS_TYPESDEF_NAME.toString();

        String s = getFromCache(fileName);
        return convertFromJson(AtlasTypesDef.class, s);
    }

    /** Returns the export result summary stored in the archive. */
    public AtlasExportResult getExportResult() throws AtlasBaseException {
        final String fileName = ZipExportFileNames.ATLAS_EXPORT_INFO_NAME.toString();

        String s = getFromCache(fileName);
        return convertFromJson(AtlasExportResult.class, s);
    }

    // Loads the GUID creation order and positions the iterator at its start.
    // A missing/corrupt order entry is logged but does not fail construction.
    private void setCreationOrder() {
        String fileName = ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString();

        try {
            String s = getFromCache(fileName);
            this.creationOrder = convertFromJson(new TypeReference<List<String>>(){}, s);
            this.iterator      = this.creationOrder.iterator();
        } catch (AtlasBaseException e) {
            LOG.error(String.format("Error retrieving '%s' from zip.", fileName), e);
        }
    }

    // Reads every zip entry into guidEntityJsonMap, keyed by entry name with
    // the ".json" suffix stripped.
    private void updateGuidZipEntryMap() throws IOException {
        // try-with-resources guarantees the stream is closed even if a read fails.
        try (ZipInputStream zipInputStream = new ZipInputStream(inputStream)) {
            ZipEntry zipEntry = zipInputStream.getNextEntry();

            while (zipEntry != null) {
                String entryName = zipEntry.getName().replace(".json", "");

                if (!guidEntityJsonMap.containsKey(entryName)) {
                    byte[]                buf = new byte[1024];
                    int                   n;
                    ByteArrayOutputStream bos = new ByteArrayOutputStream();

                    while ((n = zipInputStream.read(buf, 0, 1024)) > -1) {
                        bos.write(buf, 0, n);
                    }

                    // Decode explicitly as UTF-8 to match ZipSink's encoding
                    // instead of relying on the platform default charset.
                    guidEntityJsonMap.put(entryName, new String(bos.toByteArray(), StandardCharsets.UTF_8));
                }

                zipEntry = zipInputStream.getNextEntry();
            }
        }
    }

    public List<String> getCreationOrder() throws AtlasBaseException {
        return this.creationOrder;
    }

    public AtlasEntity.AtlasEntityWithExtInfo getEntityWithExtInfo(String guid) throws AtlasBaseException {
        String s = getFromCache(guid);
        return convertFromJson(AtlasEntity.AtlasEntityWithExtInfo.class, s);
    }

    private <T> T convertFromJson(TypeReference clazz, String jsonData) throws AtlasBaseException {
        try {
            T ret = MAPPER.readValue(jsonData, clazz);
            if (ret == null) {
                throw new AtlasBaseException(JSON_ERROR_OBJECT_MAPPER_NULL_RETURNED, clazz.toString());
            }

            return ret;
        } catch (Exception e) {
            throw new AtlasBaseException("Error converting file to JSON.", e);
        }
    }

    private <T> T convertFromJson(Class<T> clazz, String jsonData) throws AtlasBaseException {
        try {
            // NOTE(review): unlike the TypeReference overload, a null result is
            // passed through here — callers tolerate null.
            return MAPPER.readValue(jsonData, clazz);
        } catch (Exception e) {
            throw new AtlasBaseException("Error converting file to JSON.", e);
        }
    }

    private String getFromCache(String entryName) {
        return guidEntityJsonMap.get(entryName);
    }

    /** Releases the underlying stream and clears the in-memory cache. */
    public void close() {
        try {
            inputStream.close();
        } catch (IOException ex) {
            // Fixed: original logged "{}" with no argument, dropping the cause.
            LOG.warn("Error closing streams.", ex);
        } finally {
            // Clear the cache even if closing the stream failed.
            guidEntityJsonMap.clear();
        }
    }

    @Override
    public boolean hasNext() {
        return this.iterator.hasNext();
    }

    @Override
    public AtlasEntity next() {
        AtlasEntityWithExtInfo entityWithExtInfo = getNextEntityWithExtInfo();

        return entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
    }

    @Override
    public AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
        try {
            return getEntityWithExtInfo(this.iterator.next());
        } catch (AtlasBaseException e) {
            // Log instead of printStackTrace(); stream contract returns null on failure.
            LOG.error("getNextEntityWithExtInfo", e);
            return null;
        }
    }

    @Override
    public void reset() {
        try {
            getCreationOrder();
            this.iterator = this.creationOrder.iterator();
        } catch (AtlasBaseException e) {
            LOG.error("reset", e);
        }
    }

    @Override
    public AtlasEntity getByGuid(String guid) {
        try {
            return getEntity(guid);
        } catch (AtlasBaseException e) {
            // Log instead of printStackTrace(); null signals "not found/failed".
            LOG.error("getByGuid: {}", guid, e);
            return null;
        }
    }

    private AtlasEntity getEntity(String guid) throws AtlasBaseException {
        if (guidEntityJsonMap.containsKey(guid)) {
            AtlasEntityWithExtInfo extInfo = getEntityWithExtInfo(guid);
            return (extInfo != null) ? extInfo.getEntity() : null;
        }

        return null;
    }

    @Override
    public void onImportComplete(String guid) {
        // Drop processed entries so memory is released as the import progresses.
        guidEntityJsonMap.remove(guid);
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements; licensed under the Apache License,
 * Version 2.0 (http://www.apache.org/licenses/LICENSE-2.0).
 */
package org.apache.atlas.repository.impexp;


import com.google.inject.Inject;
import org.apache.atlas.RepositoryMetadataModule;
import org.apache.atlas.TestUtilsV2;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer;
import org.apache.atlas.repository.store.graph.v1.AtlasEntityChangeNotifier;
import org.apache.atlas.repository.store.graph.v1.AtlasEntityStoreV1;
import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream;
import org.apache.atlas.repository.store.graph.v1.DeleteHandlerV1;
import org.apache.atlas.repository.store.graph.v1.SoftDeleteHandlerV1;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import scala.actors.threadpool.Arrays;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.mockito.Mockito.mock;

/**
 * Exercises {@link ExportService} end-to-end against the in-memory repository:
 * exports the sample Dept/Employee data with various fetch/match options and
 * verifies the resulting zip contents via {@link ZipSource}.
 */
@Guice(modules = RepositoryMetadataModule.class)
public class ExportServiceTest {
    private static final Logger LOG = LoggerFactory.getLogger(ExportServiceTest.class);

    @Inject
    AtlasTypeRegistry typeRegistry;

    @Inject
    private AtlasTypeDefStore typeDefStore;

    ExportService exportService;
    private DeleteHandlerV1           deleteHandler      = mock(SoftDeleteHandlerV1.class);
    private AtlasEntityChangeNotifier mockChangeNotifier = mock(AtlasEntityChangeNotifier.class);
    private AtlasEntityStoreV1        entityStore;

    /** Registers the Dept/Employee type model and creates the sample "hr" department. */
    @BeforeClass
    public void setupSampleData() throws AtlasBaseException {
        entityStore = new AtlasEntityStoreV1(deleteHandler, typeRegistry, mockChangeNotifier);

        AtlasTypesDef sampleTypes   = TestUtilsV2.defineDeptEmployeeTypes();
        AtlasTypesDef typesToCreate = AtlasTypeDefStoreInitializer.getTypesToCreate(sampleTypes, typeRegistry);

        if (!typesToCreate.isEmpty()) {
            typeDefStore.createTypesDef(typesToCreate);
        }

        AtlasEntity.AtlasEntitiesWithExtInfo hrDept = TestUtilsV2.createDeptEg2();

        AtlasEntityStream entityStream = new AtlasEntityStream(hrDept);
        entityStore.createOrUpdate(entityStream, false);
        // Fixed: original format string lacked a "{}" placeholder, so the argument was dropped.
        LOG.debug("==> setupSampleData: {}", AtlasEntity.dumpObjects(hrDept.getEntities(), null).toString());
    }

    @BeforeTest
    public void setupExportService() throws AtlasBaseException {
        exportService = new ExportService(typeRegistry);
    }

    /** Request for a type/value pair that does not exist in the sample data. */
    private AtlasExportRequest getRequestForFullFetch() {
        AtlasExportRequest request = new AtlasExportRequest();

        List<AtlasObjectId> itemsToExport = new ArrayList<>();
        itemsToExport.add(new AtlasObjectId("hive_db", "qualifiedName", "default@cl1"));
        request.setItemsToExport(itemsToExport);

        return request;
    }

    /** Request for the sample "hr" Department with optional fetchType/matchType options. */
    private AtlasExportRequest getRequestForDept(boolean addFetchType, String fetchTypeValue,
                                                 boolean addMatchType, String matchTypeValue) {
        AtlasExportRequest request = new AtlasExportRequest();

        List<AtlasObjectId> itemsToExport = new ArrayList<>();
        itemsToExport.add(new AtlasObjectId("Department", "name", "hr"));
        request.setItemsToExport(itemsToExport);

        setOptionsMap(request, addFetchType, fetchTypeValue, addMatchType, matchTypeValue);
        return request;
    }

    /** Request for the sample Employee "Max" with CONNECTED fetch. */
    private AtlasExportRequest getRequestForEmployee() {
        AtlasExportRequest request = new AtlasExportRequest();

        List<AtlasObjectId> itemsToExport = new ArrayList<>();
        itemsToExport.add(new AtlasObjectId("Employee", "name", "Max"));
        request.setItemsToExport(itemsToExport);

        setOptionsMap(request, true, "CONNECTED", false, "");
        return request;
    }

    // Builds the options map once and sets it on the request a single time
    // (original called setOptions twice when fetchType was present).
    private void setOptionsMap(AtlasExportRequest request,
                               boolean addFetchType, String fetchTypeValue,
                               boolean addMatchType, String matchTypeValue) {
        Map<String, Object> optionsMap = null;

        if (addFetchType) {
            optionsMap = new HashMap<>();
            optionsMap.put("fetchType", fetchTypeValue);
        }

        if (addMatchType) {
            if (optionsMap == null) {
                optionsMap = new HashMap<>();
            }
            optionsMap.put("matchType", matchTypeValue);
        }

        if (optionsMap != null) {
            request.setOptions(optionsMap);
        }
    }

    /** Runs the export, asserts SUCCESS, and returns a ZipSource over the produced archive. */
    private ZipSource runExportWithParameters(AtlasExportRequest request) throws AtlasBaseException, IOException {
        final String requestingIP = "1.0.0.0";
        final String hostName     = "localhost";
        final String userName     = "admin";

        ByteArrayOutputStream baos    = new ByteArrayOutputStream();
        ZipSink               zipSink = new ZipSink(baos);
        AtlasExportResult     result  = exportService.run(zipSink, request, userName, hostName, requestingIP);
        Assert.assertEquals(result.getOperationStatus(), AtlasExportResult.OperationStatus.SUCCESS);

        zipSink.close();

        ByteArrayInputStream bis = new ByteArrayInputStream(baos.toByteArray());
        return new ZipSource(bis);
    }

    @Test
    public void exportType_Succeeds() throws AtlasBaseException, FileNotFoundException {
        String requestingIP = "1.0.0.0";
        String hostName     = "root";

        AtlasExportRequest    request = getRequestForFullFetch();
        ByteArrayOutputStream baos    = new ByteArrayOutputStream();
        ZipSink               zipSink = new ZipSink(baos);
        AtlasExportResult     result  = exportService.run(zipSink, request, "admin", hostName, requestingIP);

        Assert.assertNotNull(exportService);
        Assert.assertEquals(result.getHostName(), hostName);
        Assert.assertEquals(result.getClientIpAddress(), requestingIP);
        Assert.assertEquals(request, result.getRequest());
    }

    @Test
    public void requestingEntityNotFound_NoData() throws AtlasBaseException, IOException {
        String requestingIP = "1.0.0.0";
        String hostName     = "root";

        ByteArrayOutputStream baos    = new ByteArrayOutputStream();
        ZipSink               zipSink = new ZipSink(baos);
        AtlasExportResult     result  = exportService.run(
                zipSink, getRequestForFullFetch(), "admin", hostName, requestingIP);

        Assert.assertNull(result.getData());

        ByteArrayInputStream bais      = new ByteArrayInputStream(baos.toByteArray());
        ZipSource            zipSource = new ZipSource(bais);

        Assert.assertNotNull(exportService);
        Assert.assertNotNull(zipSource.getCreationOrder());
        Assert.assertFalse(zipSource.hasNext());
    }

    @Test
    public void requestingEntityFoundDefaultFetch_ContainsData() throws Exception {
        ZipSource source = runExportWithParameters(
                getRequestForDept(false, "", false, ""));
        verifyExportForHrData(source);
    }

    @Test
    public void requestingHrEntityWithMatchTypeContains_ContainsData() throws Exception {
        ZipSource source = runExportWithParameters(
                getRequestForDept(false, "", true, "CONTAINS"));
        verifyExportForHrData(source);
    }

    @Test
    public void requestingHrEntityWithMatchTypeEndsWith_ContainsData() throws Exception {
        ZipSource source = runExportWithParameters(
                getRequestForDept(false, "", true, "ENDSWITH"));
        verifyExportForHrData(source);
    }

    @Test
    public void requestingDeptEntityFoundFullFetch_ContainsData() throws Exception {
        ZipSource source = runExportWithParameters(
                getRequestForDept(true, "FULL", false, ""));
        verifyExportForHrData(source);
    }

    @Test
    public void requestingDeptEntityFoundConnectedFetch_ContainsData() throws Exception {
        ZipSource source = runExportWithParameters(
                getRequestForDept(true, "CONNECTED", false, ""));
        verifyExportForHrDataForConnected(source);
    }

    @Test
    public void requestingEmployeeEntityFoundConnectedFetch_ContainsData() throws Exception {
        ZipSource zipSource = runExportWithParameters(getRequestForEmployee());
        verifyExportForEmployeeData(zipSource);
    }

    private void verifyExportForEmployeeData(ZipSource zipSource) throws AtlasBaseException {
        final List<String> expectedEntityTypes = Arrays.asList(new String[]{"Manager", "Employee", "Department"});

        Assert.assertNotNull(zipSource.getCreationOrder());
        Assert.assertEquals(zipSource.getCreationOrder().size(), 2);
        Assert.assertTrue(zipSource.hasNext());

        while (zipSource.hasNext()) {
            AtlasEntity entity = zipSource.next();
            Assert.assertEquals(entity.getStatus(), AtlasEntity.Status.ACTIVE);
            Assert.assertTrue(expectedEntityTypes.contains(entity.getTypeName()));
        }

        verifyTypeDefs(zipSource);
    }

    private void verifyExportForHrData(ZipSource zipSource) throws IOException, AtlasBaseException {
        Assert.assertNotNull(zipSource.getCreationOrder());
        Assert.assertEquals(zipSource.getCreationOrder().size(), 1);
        Assert.assertTrue(zipSource.hasNext());

        AtlasEntity entity = zipSource.next();
        Assert.assertEquals(entity.getTypeName(), "Department");
        Assert.assertEquals(entity.getStatus(), AtlasEntity.Status.ACTIVE);
        verifyTypeDefs(zipSource);
    }

    private void verifyExportForHrDataForConnected(ZipSource zipSource) throws IOException, AtlasBaseException {
        Assert.assertNotNull(zipSource.getCreationOrder());
        Assert.assertEquals(zipSource.getCreationOrder().size(), 2);
        Assert.assertTrue(zipSource.hasNext());

        AtlasEntity entity = zipSource.next();
        Assert.assertEquals(entity.getTypeName(), "Department");
        Assert.assertEquals(entity.getStatus(), AtlasEntity.Status.ACTIVE);
        verifyTypeDefs(zipSource);
    }

    private void verifyTypeDefs(ZipSource zipSource) throws AtlasBaseException {
        // Parse the typesdef entry once instead of four times.
        AtlasTypesDef typesDef = zipSource.getTypesDef();

        Assert.assertEquals(typesDef.getEnumDefs().size(), 1);
        Assert.assertEquals(typesDef.getClassificationDefs().size(), 0);
        Assert.assertEquals(typesDef.getStructDefs().size(), 1);
        Assert.assertEquals(typesDef.getEntityDefs().size(), 4);
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements; licensed under the Apache License,
 * Version 2.0 (http://www.apache.org/licenses/LICENSE-2.0).
 */
package org.apache.atlas.repository.impexp;

import com.google.inject.Inject;
import org.apache.atlas.RepositoryMetadataModule;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;

import java.io.IOException;

import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getZipSource;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadModelFromJson;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runAndVerifyQuickStart_v1_Import;

/**
 * Harness for import tests over the "reporting" sample archive.
 * Currently only wires the injected collaborators; no test methods yet.
 */
@Guice(modules = RepositoryMetadataModule.class)
public class ImportServiceReportingTest {
    private static final Logger LOG = LoggerFactory.getLogger(ImportServiceReportingTest.class);

    // Injected by Guice via RepositoryMetadataModule.
    @Inject
    AtlasTypeRegistry typeRegistry;

    @Inject
    private AtlasTypeDefStore typeDefStore;

    @Inject
    private AtlasEntityStore entityStore;
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements; licensed under the Apache License,
 * Version 2.0 (http://www.apache.org/licenses/LICENSE-2.0).
 */
package org.apache.atlas.repository.impexp;

import com.google.inject.Inject;
import org.apache.atlas.RepositoryMetadataModule;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.ITestContext;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;

import java.io.IOException;

import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getZipSource;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadModelFromJson;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runAndVerifyQuickStart_v1_Import;

/**
 * Verifies {@link ImportService} against three pre-built quick-start v1
 * export archives (sales, reporting, logging) shipped as test resources.
 */
@Guice(modules = RepositoryMetadataModule.class)
public class ImportServiceTest {
    private static final Logger LOG = LoggerFactory.getLogger(ImportServiceTest.class);

    @Inject
    AtlasTypeRegistry typeRegistry;

    @Inject
    private AtlasTypeDefStore typeDefStore;

    @Inject
    private AtlasEntityStore entityStore;

    @DataProvider(name = "sales")
    public static Object[][] getDataFromQuickStart_v1_Sales(ITestContext context) throws IOException {
        return getZipSource("sales-v1-full.zip");
    }

    @Test(dataProvider = "sales")
    public void importDB1_Succeeds(ZipSource zipSource) throws AtlasBaseException, IOException {
        loadBaseModelAndImport(zipSource);
    }

    @DataProvider(name = "reporting")
    public static Object[][] getDataFromReporting() throws IOException {
        return getZipSource("reporting-v1-full.zip");
    }

    @Test(dataProvider = "reporting")
    public void importDB2_Succeeds(ZipSource zipSource) throws AtlasBaseException, IOException {
        loadBaseModelAndImport(zipSource);
    }

    @DataProvider(name = "logging")
    public static Object[][] getDataFromLogging(ITestContext context) throws IOException {
        return getZipSource("logging-v1-full.zip");
    }

    @Test(dataProvider = "logging")
    public void importDB3_Succeeds(ZipSource zipSource) throws AtlasBaseException, IOException {
        loadBaseModelAndImport(zipSource);
    }

    // Shared body of all three import tests: register the base model, then run
    // the import and verify the quick-start v1 expectations.
    private void loadBaseModelAndImport(ZipSource zipSource) throws AtlasBaseException, IOException {
        loadModelFromJson("0010-base_model.json", typeDefStore, typeRegistry);
        runAndVerifyQuickStart_v1_Import(new ImportService(typeDefStore, entityStore, typeRegistry), zipSource);
    }
}
// (diff hunk header continuation: +1,158 @@)
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.atlas.repository.impexp;

import com.google.common.collect.Sets;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.io.FileUtils;
import org.apache.solr.common.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Shared helpers for the import/export zip-file unit tests: resource loading,
 * type-model bootstrap, and verification of import results against export metrics.
 */
public class ZipFileResourceTestUtils {
    public static final Logger LOG = LoggerFactory.getLogger(ZipFileResourceTestUtils.class);

    /**
     * Opens a test resource under {@code src/test/resources}.
     * Returns {@code null} when the file is missing (the error is only logged);
     * NOTE(review): callers will then fail later with an NPE — consider failing fast.
     */
    public static FileInputStream getFileInputStream(String fileName) {
        final String userDir = System.getProperty("user.dir");
        String filePath = getFilePath(userDir, fileName);
        File f = new File(filePath);
        FileInputStream fs = null;
        try {
            fs = new FileInputStream(f);
        } catch (FileNotFoundException e) {
            // FIX: SLF4J uses "{}" placeholders, not printf-style "%s" —
            // the original logged the literal "%s" and never substituted filePath.
            LOG.error("File could not be found at: {}", filePath, e);
        }
        return fs;
    }

    // Builds the absolute path of a test resource relative to the module dir.
    private static String getFilePath(String startPath, String fileName) {
        return startPath + "/src/test/resources/" + fileName;
    }

    /**
     * Reads a model JSON from the addons/models directory (relative to the module).
     *
     * @throws IOException if the file cannot be read
     */
    public static String getModelJson(String fileName) throws IOException {
        final String userDir = System.getProperty("user.dir");
        String filePath = userDir + "/../addons/models/" + fileName;
        File f = new File(filePath);
        String s = FileUtils.readFileToString(f);
        // FIX: assertion messages are displayed on FAILURE; the original message
        // ("Model file read correctly!") described success and was misleading.
        Assert.assertFalse(StringUtils.isEmpty(s), "Model file should not be empty: " + filePath);

        return s;
    }

    /** Wraps a resource zip in a {@link ZipSource} shaped as a TestNG data-provider result. */
    public static Object[][] getZipSource(String fileName) throws IOException {
        FileInputStream fs = ZipFileResourceTestUtils.getFileInputStream(fileName);

        return new Object[][]{{new ZipSource(fs)}};
    }


    /** Asserts that every entity in the export creation order was processed by the import. */
    public static void verifyImportedEntities(List<String> creationOrder, List<String> processedEntities) {
        // FIX: Sets is already imported — dropped the redundant fully-qualified names.
        Set<String> lhs = Sets.newHashSet(creationOrder);
        Set<String> rhs = Sets.newHashSet(processedEntities);
        Set<String> difference = Sets.difference(lhs, rhs);

        Assert.assertNotNull(difference);
        Assert.assertEquals(difference.size(), 0);
    }

    /**
     * Compares export-side entity metrics with import-side metrics, skipping
     * keys that are known not to round-trip (withExtInfo, Column, StorageDesc).
     */
    public static void verifyImportedMetrics(AtlasExportResult exportResult, AtlasImportResult importResult) {
        Map<String, Integer> metricsForCompare = getImportMetricsForCompare(importResult);
        for (Map.Entry<String, Integer> entry : exportResult.getMetrics().entrySet()) {
            // FIX: replaced "== false" with idiomatic negation; behavior unchanged.
            if (!entry.getKey().startsWith("entity") ||
                    entry.getKey().contains("withExtInfo") ||
                    entry.getKey().contains("Column") ||
                    entry.getKey().contains("StorageDesc")) {
                continue;
            }

            Assert.assertTrue(metricsForCompare.containsKey(entry.getKey()));
            Assert.assertEquals(entry.getValue(), metricsForCompare.get(entry.getKey()));
        }
    }

    // Normalizes import metric keys by stripping the ":updated"/":created" suffixes
    // so they can be compared against export metric keys.
    private static Map<String,Integer> getImportMetricsForCompare(AtlasImportResult result) {
        Map<String, Integer> r = new HashMap<>();
        for (Map.Entry<String, Integer> entry : result.getMetrics().entrySet()) {
            r.put(entry.getKey().replace(":updated", "").replace(":created", ""), entry.getValue());
        }

        return r;
    }


    /** Loads a type model JSON file and registers any types not already present. */
    public static void loadModelFromJson(String fileName, AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry) throws IOException, AtlasBaseException {
        AtlasTypesDef typesFromJson = getAtlasTypesDefFromFile(fileName);
        createTypesAsNeeded(typesFromJson, typeDefStore, typeRegistry);
    }

    // Creates only the subset of types that the registry does not know yet.
    private static void createTypesAsNeeded(AtlasTypesDef typesFromJson, AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry) throws AtlasBaseException {
        AtlasTypesDef typesToCreate = AtlasTypeDefStoreInitializer.getTypesToCreate(typesFromJson, typeRegistry);

        if (!typesToCreate.isEmpty()) {
            typeDefStore.createTypesDef(typesToCreate);
        }
    }

    // Deserializes a model file into an AtlasTypesDef.
    private static AtlasTypesDef getAtlasTypesDefFromFile(String fileName) throws IOException {
        String sampleTypes = ZipFileResourceTestUtils.getModelJson(fileName);
        return AtlasType.fromJson(sampleTypes, AtlasTypesDef.class);
    }

    /** Returns an import request with default options. */
    public static AtlasImportRequest getDefaultImportRequest() {
        return new AtlasImportRequest();
    }


    /**
     * Runs an import with fixed test credentials/host info and asserts it succeeds.
     *
     * @return the import result for further verification
     */
    public static AtlasImportResult runImportWithParameters(ImportService importService, AtlasImportRequest request, ZipSource source) throws AtlasBaseException, IOException {
        final String requestingIP = "1.0.0.0";
        final String hostName = "localhost";
        final String userName = "admin";

        AtlasImportResult result = importService.run(source, request, userName, hostName, requestingIP);
        Assert.assertEquals(result.getOperationStatus(), AtlasImportResult.OperationStatus.SUCCESS);
        return result;
    }

    /** End-to-end helper: imports a QuickStart v1 zip and verifies metrics and entities. */
    public static void runAndVerifyQuickStart_v1_Import(ImportService importService, ZipSource zipSource) throws AtlasBaseException, IOException {
        AtlasExportResult exportResult = zipSource.getExportResult();
        List<String> creationOrder = zipSource.getCreationOrder();

        AtlasImportRequest request = getDefaultImportRequest();
        AtlasImportResult result = runImportWithParameters(importService, request, zipSource);

        Assert.assertNotNull(result);
        verifyImportedMetrics(exportResult, result);
        verifyImportedEntities(creationOrder, result.getProcessedEntities());
    }
}
// ----------------------------------------------------------------------
// http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/07e7faa6/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSinkTest.java
// diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSinkTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSinkTest.java
// new file mode 100644  index 0000000..635caf7  --- /dev/null  +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSinkTest.java
// @@ -0,0 +1,153 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
// (end of ASF license header — body on the preceding line of the patch)
package org.apache.atlas.repository.impexp;


import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.type.AtlasType;
import org.testng.Assert;
import org.testng.annotations.Test;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

/**
 * Unit tests for {@link ZipSink}: verifies that the sink writes the export-order
 * and export-result entries with the expected names and JSON contents.
 */
public class ZipSinkTest {
    private ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
    private ZipSink zipSink;
    private List<String> defaultExportOrder = new ArrayList<>(Arrays.asList("a", "b", "c", "d"));
    private AtlasExportResult defaultExportResult;

    // Writes only the export-order entry into byteArrayOutputStream and closes the sink.
    private void initZipSinkWithExportOrder() throws AtlasBaseException {
        zipSink = new ZipSink(byteArrayOutputStream);
        zipSink.setExportOrder(defaultExportOrder);
        zipSink.close();
    }

    // Builds a minimal export result (one hive_db item) and caches it in defaultExportResult.
    private AtlasExportResult getDefaultExportResult() {
        AtlasExportRequest request = new AtlasExportRequest();

        List<AtlasObjectId> itemsToExport = new ArrayList<>();
        itemsToExport.add(new AtlasObjectId("hive_db", "qualifiedName", "default"));
        request.setItemsToExport(itemsToExport);

        defaultExportResult = new AtlasExportResult(request, "admin", "1.0.0.0", "root", 100);
        return defaultExportResult;
    }

    // Re-reads the bytes produced by initZipSinkWithExportOrder as a zip stream.
    private ZipInputStream getZipInputStreamForDefaultExportOrder() throws AtlasBaseException {
        initZipSinkWithExportOrder();

        ByteArrayInputStream bis = new ByteArrayInputStream(byteArrayOutputStream.toByteArray());
        return new ZipInputStream(bis);
    }

    // Drains the current zip entry into a String.
    // NOTE(review): uses ByteArrayOutputStream.toString() with the platform default
    // charset — fine for the ASCII JSON used here; confirm before non-ASCII content.
    private String getZipEntryAsStream(ZipInputStream zis) throws IOException {
        byte[] buf = new byte[1024];
        int n = 0;
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        while ((n = zis.read(buf, 0, 1024)) > -1) {
            bos.write(buf, 0, n);
        }

        // FIX: removed the tautological Assert.assertNotNull(bos) — bos is
        // constructed two lines above and can never be null.
        return bos.toString();
    }

    @Test
    public void correctInit_succeeds() throws AtlasBaseException {
        initZipSinkWithExportOrder();
        // FIX: removed the no-op Assert.assertTrue(true); the meaningful check follows.
        Assert.assertNotNull(zipSink);
    }

    @Test
    public void zipWithExactlyOneEntry_succeeds() {

        try {
            ZipInputStream zis = getZipInputStreamForDefaultExportOrder();

            try {
                Assert.assertNotNull(zis.getNextEntry());
                Assert.assertNull(zis.getNextEntry());
            } catch (IOException e) {
                // FIX: Assert.fail carries a diagnostic message, unlike the
                // original Assert.assertTrue(false).
                Assert.fail("Unexpected IOException while reading zip entries.", e);
            }
        } catch (AtlasBaseException e) {

            Assert.fail("No exception should be thrown.", e);
        }
    }

    @Test
    public void verifyExportOrderEntryName_verifies() throws AtlasBaseException, IOException {

        ZipInputStream zis = getZipInputStreamForDefaultExportOrder();
        ZipEntry ze = zis.getNextEntry();

        Assert.assertEquals(ze.getName().replace(".json", ""), ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString());
    }

    @Test
    public void zipWithExactlyOneEntry_ContentsVerified() throws AtlasBaseException, IOException {

        ZipInputStream zis = getZipInputStreamForDefaultExportOrder();
        zis.getNextEntry();

        Assert.assertEquals(getZipEntryAsStream(zis).replace("\"", "'"), "['a','b','c','d']");
    }

    @Test
    public void zipWithExactlyTwoEntries_ContentsVerified() throws AtlasBaseException, IOException {

        ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
        useZipSinkToCreateZipWithTwoEntries(byteOutputStream);

        ByteArrayInputStream bis = new ByteArrayInputStream(byteOutputStream.toByteArray());
        ZipInputStream zipStream = new ZipInputStream(bis);
        ZipEntry entry = zipStream.getNextEntry();

        Assert.assertEquals(getZipEntryAsStream(zipStream), "[\"a\",\"b\",\"c\",\"d\"]");
        Assert.assertEquals(entry.getName().replace(".json", ""), ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString());

        entry = zipStream.getNextEntry();
        Assert.assertEquals(entry.getName().replace(".json", ""), ZipExportFileNames.ATLAS_EXPORT_INFO_NAME.toString());
        Assert.assertTrue(compareJsonWithObject(getZipEntryAsStream(zipStream), defaultExportResult));
    }

    // Writes both the export-order and export-result entries through a fresh sink.
    private void useZipSinkToCreateZipWithTwoEntries(ByteArrayOutputStream byteOutputStream) throws AtlasBaseException {
        ZipSink zs = new ZipSink(byteOutputStream);
        zs.setExportOrder(defaultExportOrder);
        zs.setResult(getDefaultExportResult());
        zs.close();
    }

    // Compares a zip entry's JSON payload against the serialized export result.
    private boolean compareJsonWithObject(String s, AtlasExportResult defaultExportResult) {
        String json = AtlasType.toJson(defaultExportResult);
        return json.equals(s);
    }
}
