Repository: incubator-atlas Updated Branches: refs/heads/master 42426a1b7 -> 8a32ccaae
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8a32ccaa/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSourceTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSourceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSourceTest.java new file mode 100644 index 0000000..e124ffc --- /dev/null +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSourceTest.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.impexp; + +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.typedef.AtlasTypesDef; +import org.testng.Assert; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.io.ByteArrayInputStream; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.List; + +public class ZipSourceTest { + @DataProvider(name = "zipFileStocks") + public static Object[][] getDataFromZipFile() throws IOException { + FileInputStream fs = ZipFileResourceTestUtils.getFileInputStream("stocks.zip"); + + return new Object[][] {{ new ZipSource(fs) }}; + } + + @Test + public void improperInit_ReturnsNullCreationOrder() throws IOException, AtlasBaseException { + byte bytes[] = new byte[10]; + ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + ZipSource zs = new ZipSource(bais); + List<String> s = zs.getCreationOrder(); + Assert.assertNull(s); + } + + @Test(dataProvider = "zipFileStocks") + public void examineContents_BehavesAsExpected(ZipSource zipSource) throws IOException, AtlasBaseException { + List<String> creationOrder = zipSource.getCreationOrder(); + Assert.assertNotNull(creationOrder); + Assert.assertEquals(creationOrder.size(), 4); + + AtlasTypesDef typesDef = zipSource.getTypesDef(); + Assert.assertNotNull(typesDef); + Assert.assertEquals(typesDef.getEntityDefs().size(), 6); + + useCreationOrderToFetchEntitiesWithExtInfo(zipSource, creationOrder); + useCreationOrderToFetchEntities(zipSource, creationOrder); + attemptToFetchNonExistentGuid_ReturnsNull(zipSource, "non-existent-guid"); + verifyGuidRemovalOnImportComplete(zipSource, creationOrder.get(0)); + } + + private void useCreationOrderToFetchEntities(ZipSource zipSource, List<String> creationOrder) { + for (String guid : creationOrder) { + AtlasEntity e = zipSource.getByGuid(guid); + Assert.assertNotNull(e); + } + } + + private void verifyGuidRemovalOnImportComplete(ZipSource zipSource, String guid) { + AtlasEntity e = zipSource.getByGuid(guid); + Assert.assertNotNull(e); + + zipSource.onImportComplete(guid); + + e = zipSource.getByGuid(guid); + Assert.assertNull(e); + } + + private void attemptToFetchNonExistentGuid_ReturnsNull(ZipSource zipSource, String guid) { + AtlasEntity e = zipSource.getByGuid(guid); + Assert.assertNull(e); + } + + private void useCreationOrderToFetchEntitiesWithExtInfo(ZipSource zipSource, List<String> creationOrder) throws AtlasBaseException { + for (String guid : creationOrder) { + AtlasEntity.AtlasEntityExtInfo e = zipSource.getEntityWithExtInfo(guid); + Assert.assertNotNull(e); + } + } + + @Test(dataProvider = "zipFileStocks") + public void iteratorBehavor_WorksAsExpected(ZipSource zipSource) throws IOException, AtlasBaseException { + Assert.assertTrue(zipSource.hasNext()); + + List<String> creationOrder = zipSource.getCreationOrder(); + for (int i = 0; i < creationOrder.size(); i++) { + AtlasEntity e = zipSource.next(); + Assert.assertEquals(e.getGuid(), creationOrder.get(i)); + } + + Assert.assertFalse(zipSource.hasNext()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8a32ccaa/repository/src/test/resources/dept-employee-test-utils.zip ---------------------------------------------------------------------- diff --git a/repository/src/test/resources/dept-employee-test-utils.zip b/repository/src/test/resources/dept-employee-test-utils.zip new file mode 100644 index 0000000..e6f36f8 Binary files /dev/null and b/repository/src/test/resources/dept-employee-test-utils.zip differ http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8a32ccaa/repository/src/test/resources/logging-v1-full.zip ---------------------------------------------------------------------- diff --git a/repository/src/test/resources/logging-v1-full.zip b/repository/src/test/resources/logging-v1-full.zip new file mode 100644 index 0000000..69c54ee Binary files /dev/null and b/repository/src/test/resources/logging-v1-full.zip differ http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8a32ccaa/repository/src/test/resources/reporting-v1-full.zip ---------------------------------------------------------------------- diff --git a/repository/src/test/resources/reporting-v1-full.zip b/repository/src/test/resources/reporting-v1-full.zip new file mode 100644 index 0000000..c2fc9c2 Binary files /dev/null and b/repository/src/test/resources/reporting-v1-full.zip differ http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8a32ccaa/repository/src/test/resources/sales-v1-full.zip ---------------------------------------------------------------------- diff --git a/repository/src/test/resources/sales-v1-full.zip b/repository/src/test/resources/sales-v1-full.zip new file mode 100644 index 0000000..07afbf6 Binary files /dev/null and b/repository/src/test/resources/sales-v1-full.zip differ http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8a32ccaa/repository/src/test/resources/stocks.zip ---------------------------------------------------------------------- diff --git a/repository/src/test/resources/stocks.zip b/repository/src/test/resources/stocks.zip new file mode 100644 index 0000000..4737ac3 Binary files /dev/null and b/repository/src/test/resources/stocks.zip differ http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8a32ccaa/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java index 097589f..9fe1624 100755 --- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java @@ -31,6 +31,10 @@ import org.apache.atlas.model.impexp.AtlasExportResult; import org.apache.atlas.model.impexp.AtlasImportRequest; import org.apache.atlas.model.impexp.AtlasImportResult; import org.apache.atlas.model.metrics.AtlasMetrics; +import org.apache.atlas.repository.impexp.ExportService; +import org.apache.atlas.repository.impexp.ImportService; +import org.apache.atlas.repository.impexp.ZipSink; +import org.apache.atlas.repository.impexp.ZipSource; import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.services.MetricsService; import org.apache.atlas.store.AtlasTypeDefStore; @@ -53,7 +57,13 @@ import org.springframework.security.core.context.SecurityContextHolder; import javax.inject.Singleton; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import javax.ws.rs.*; +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; @@ -359,7 +369,7 @@ public class AdminResource { try { AtlasImportRequest request = new AtlasImportRequest(Servlets.getParameterMap(httpServletRequest)); ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes); - ImportService importService = new ImportService(this.typesDefStore, this.entityStore, this.typeRegistry); + ImportService importService = new ImportService(this.typesDefStore, this.entityStore, this.typeRegistry); ZipSource zipSource = new ZipSource(inputStream); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8a32ccaa/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java b/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java deleted file mode 100644 index 159369c..0000000 --- a/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java +++ /dev/null @@ -1,692 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.web.resources; - -import org.apache.atlas.AtlasErrorCode; -import org.apache.atlas.AtlasException; -import org.apache.atlas.AtlasServiceException; -import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.TypeCategory; -import org.apache.atlas.model.impexp.AtlasExportRequest; -import org.apache.atlas.model.impexp.AtlasExportResult; -import org.apache.atlas.model.instance.AtlasClassification; -import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; -import org.apache.atlas.model.instance.AtlasObjectId; -import org.apache.atlas.model.typedef.AtlasBaseTypeDef; -import org.apache.atlas.model.typedef.AtlasClassificationDef; -import org.apache.atlas.model.typedef.AtlasEnumDef; -import org.apache.atlas.model.typedef.AtlasEntityDef; -import org.apache.atlas.model.typedef.AtlasStructDef; -import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; -import org.apache.atlas.model.typedef.AtlasTypesDef; -import org.apache.atlas.repository.graph.AtlasGraphProvider; -import org.apache.atlas.repository.graphdb.AtlasGraph; -import org.apache.atlas.repository.store.graph.v1.EntityGraphRetriever; -import org.apache.atlas.type.AtlasArrayType; -import org.apache.atlas.type.AtlasClassificationType; -import org.apache.atlas.type.AtlasEnumType; -import org.apache.atlas.type.AtlasEntityType; -import org.apache.atlas.type.AtlasMapType; -import org.apache.atlas.type.AtlasStructType; -import org.apache.atlas.type.AtlasStructType.AtlasAttribute; -import org.apache.atlas.type.AtlasType; -import org.apache.atlas.type.AtlasTypeRegistry; -import org.apache.atlas.type.AtlasTypeUtil; -import org.apache.atlas.util.AtlasGremlinQueryProvider; -import org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.collections.MapUtils; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.script.ScriptEngine; -import javax.script.ScriptException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import static org.apache.atlas.model.impexp.AtlasExportRequest.*; - -public class ExportService { - private static final Logger LOG = LoggerFactory.getLogger(ExportService.class); - - private final AtlasTypeRegistry typeRegistry; - private final AtlasGraph atlasGraph; - private final EntityGraphRetriever entityGraphRetriever; - private final AtlasGremlinQueryProvider gremlinQueryProvider; - - public ExportService(final AtlasTypeRegistry typeRegistry) throws AtlasBaseException { - this.typeRegistry = typeRegistry; - this.entityGraphRetriever = new EntityGraphRetriever(this.typeRegistry); - this.atlasGraph = AtlasGraphProvider.getGraphInstance(); - this.gremlinQueryProvider = AtlasGremlinQueryProvider.INSTANCE; - } - - public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest request, String userName, String hostName, - String requestingIP) throws AtlasBaseException { - long startTime = System.currentTimeMillis(); - AtlasExportResult result = new AtlasExportResult(request, userName, hostName, requestingIP, startTime); - ExportContext context = new ExportContext(result, exportSink); - - try { - LOG.info("==> export(user={}, from={})", userName, requestingIP); - - for (AtlasObjectId item : request.getItemsToExport()) { - processObjectId(item, context); - } - - long endTime = System.currentTimeMillis(); - - AtlasTypesDef typesDef = context.result.getData().getTypesDef(); - - for (String entityType : context.entityTypes) { - AtlasEntityDef entityDef = typeRegistry.getEntityDefByName(entityType); - - typesDef.getEntityDefs().add(entityDef); - } - - for (String classificationType : context.classificationTypes) { - AtlasClassificationDef classificationDef = typeRegistry.getClassificationDefByName(classificationType); - - typesDef.getClassificationDefs().add(classificationDef); - } - - for (String structType : context.structTypes) { - AtlasStructDef structDef = typeRegistry.getStructDefByName(structType); - - typesDef.getStructDefs().add(structDef); - } - - for (String enumType : context.enumTypes) { - AtlasEnumDef enumDef = typeRegistry.getEnumDefByName(enumType); - - typesDef.getEnumDefs().add(enumDef); - } - - context.sink.setExportOrder(context.result.getData().getEntityCreationOrder()); - context.sink.setTypesDef(context.result.getData().getTypesDef()); - context.result.setData(null); - context.result.setOperationStatus(AtlasExportResult.OperationStatus.SUCCESS); - context.result.incrementMeticsCounter("duration", (int) (endTime - startTime)); - - context.sink.setResult(context.result); - } catch(Exception ex) { - LOG.error("Operation failed: ", ex); - } finally { - atlasGraph.releaseGremlinScriptEngine(context.scriptEngine); - LOG.info("<== export(user={}, from={}): status {}", userName, requestingIP, context.result.getOperationStatus()); - context.clear(); - result.clear(); - } - - return context.result; - } - - private void processObjectId(AtlasObjectId item, ExportContext context) throws AtlasServiceException, AtlasException, AtlasBaseException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> processObjectId({})", item); - } - - try { - List<AtlasEntityWithExtInfo> entities = getStartingEntity(item, context); - - for (AtlasEntityWithExtInfo entityWithExtInfo : entities) { - processEntity(entityWithExtInfo.getEntity().getGuid(), context); - } - - while (!context.guidsToProcess.isEmpty()) { - while (!context.guidsToProcess.isEmpty()) { - String guid = context.guidsToProcess.remove(0); - processEntity(guid, context); - } - - if (!context.guidsLineageToProcess.isEmpty()) { - context.guidsToProcess.addAll(context.guidsLineageToProcess); - context.guidsLineageToProcess.clear(); - } - } - } catch (AtlasBaseException excp) { - context.result.setOperationStatus(AtlasExportResult.OperationStatus.PARTIAL_SUCCESS); - - LOG.error("Fetching entity failed for: {}", item, excp); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("<== processObjectId({})", item); - } - } - - private List<AtlasEntityWithExtInfo> getStartingEntity(AtlasObjectId item, ExportContext context) throws AtlasBaseException { - List<AtlasEntityWithExtInfo> ret = new ArrayList<>(); - - if (StringUtils.isNotEmpty(item.getGuid())) { - AtlasEntityWithExtInfo entity = entityGraphRetriever.toAtlasEntityWithExtInfo(item); - - if (entity != null) { - ret = Collections.singletonList(entity); - } - } else if (StringUtils.isNotEmpty(item.getTypeName()) && MapUtils.isNotEmpty(item.getUniqueAttributes())) { - String typeName = item.getTypeName(); - AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); - - if (entityType == null) { - throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_TYPENAME, typeName); - } - - final String queryTemplate; - if (StringUtils.equalsIgnoreCase(context.matchType, MATCH_TYPE_STARTS_WITH)) { - queryTemplate = gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_TYPE_STARTS_WITH); - } else if (StringUtils.equalsIgnoreCase(context.matchType, MATCH_TYPE_ENDS_WITH)) { - queryTemplate = gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_TYPE_ENDS_WITH); - } else if (StringUtils.equalsIgnoreCase(context.matchType, MATCH_TYPE_CONTAINS)) { - queryTemplate = gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_TYPE_CONTAINS); - } else if (StringUtils.equalsIgnoreCase(context.matchType, MATCH_TYPE_MATCHES)) { - queryTemplate = gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_TYPE_MATCHES); - } else { // default - queryTemplate = gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_TYPE_DEFAULT); - } - - for (Map.Entry<String, Object> e : item.getUniqueAttributes().entrySet()) { - String attrName = e.getKey(); - Object attrValue = e.getValue(); - - AtlasAttribute attribute = entityType.getAttribute(attrName); - - if (attribute == null || attrValue == null) { - continue; - } - - context.bindings.clear(); - context.bindings.put("typeName", typeName); - context.bindings.put("attrName", attribute.getQualifiedName()); - context.bindings.put("attrValue", attrValue); - - List<String> guids = executeGremlinQueryForGuids(queryTemplate, context); - - if (CollectionUtils.isNotEmpty(guids)) { - for (String guid : guids) { - AtlasEntityWithExtInfo entityWithExtInfo = entityGraphRetriever.toAtlasEntityWithExtInfo(guid); - - if (entityWithExtInfo == null) { - continue; - } - - ret.add(entityWithExtInfo); - } - } - - break; - } - - LOG.info("export(item={}; matchType={}, fetchType={}): found {} entities", item, context.matchType, context.fetchType, ret.size()); - } - - return ret; - } - - private void processEntity(String guid, ExportContext context) throws AtlasBaseException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> processEntity({})", guid); - } - - if (!context.guidsProcessed.contains(guid)) { - TraversalDirection direction = context.guidDirection.get(guid); - AtlasEntityWithExtInfo entityWithExtInfo = entityGraphRetriever.toAtlasEntityWithExtInfo(guid); - - context.result.getData().getEntityCreationOrder().add(entityWithExtInfo.getEntity().getGuid()); - - addEntity(entityWithExtInfo, context); - addTypes(entityWithExtInfo.getEntity(), context); - - context.guidsProcessed.add(entityWithExtInfo.getEntity().getGuid()); - getConntedEntitiesBasedOnOption(entityWithExtInfo.getEntity(), context, direction); - - if(entityWithExtInfo.getReferredEntities() != null) { - for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) { - addTypes(e, context); - getConntedEntitiesBasedOnOption(e, context, direction); - } - - context.guidsProcessed.addAll(entityWithExtInfo.getReferredEntities().keySet()); - } - } - - if (LOG.isDebugEnabled()) { - LOG.debug("<== processEntity({})", guid); - } - } - - private void getConntedEntitiesBasedOnOption(AtlasEntity entity, ExportContext context, TraversalDirection direction) throws AtlasBaseException { - switch (context.fetchType) { - case CONNECTED: - getEntityGuidsForConnectedFetch(entity, context, direction); - break; - - case FULL: - default: - getEntityGuidsForFullFetch(entity, context); - } - } - - private void getEntityGuidsForConnectedFetch(AtlasEntity entity, ExportContext context, TraversalDirection direction) throws AtlasBaseException { - if (direction == null || direction == TraversalDirection.UNKNOWN) { - getConnectedEntityGuids(entity, context, TraversalDirection.OUTWARD, TraversalDirection.INWARD); - } else { - if (isProcessEntity(entity)) { - direction = TraversalDirection.OUTWARD; - } - - getConnectedEntityGuids(entity, context, direction); - } - } - - private boolean isProcessEntity(AtlasEntity entity) throws AtlasBaseException { - String typeName = entity.getTypeName(); - AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); - - return entityType.isSubTypeOf(AtlasBaseTypeDef.ATLAS_TYPE_PROCESS); - } - - private void getConnectedEntityGuids(AtlasEntity entity, ExportContext context, TraversalDirection... directions) { - if(directions == null) { - return; - } - - for (TraversalDirection direction : directions) { - String query = getQueryForTraversalDirection(direction); - - if (LOG.isDebugEnabled()) { - LOG.debug("==> getConnectedEntityGuids({}): guidsToProcess {} query {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size(), query); - } - - context.bindings.clear(); - context.bindings.put("startGuid", entity.getGuid()); - - List<HashMap<String, Object>> result = executeGremlinQuery(query, context); - - if (CollectionUtils.isEmpty(result)) { - continue; - } - - for (HashMap<String, Object> hashMap : result) { - String guid = (String) hashMap.get("__guid"); - TraversalDirection currentDirection = context.guidDirection.get(guid); - boolean isLineage = (boolean) hashMap.get("isProcess"); - - if (currentDirection == null) { - context.addToBeProcessed(isLineage, guid, direction); - - } else if (currentDirection == TraversalDirection.OUTWARD && direction == TraversalDirection.INWARD) { - // the entity should be reprocessed to get inward entities - context.guidsProcessed.remove(guid); - context.addToBeProcessed(isLineage, guid, direction); - } - } - - if (LOG.isDebugEnabled()) { - LOG.debug("<== getConnectedEntityGuids({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcess.size()); - } - } - } - - private String getQueryForTraversalDirection(TraversalDirection direction) { - switch (direction) { - case INWARD: - return this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_CONNECTED_IN_EDGE); - - default: - case OUTWARD: - return this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_CONNECTED_OUT_EDGE); - } - } - - private void getEntityGuidsForFullFetch(AtlasEntity entity, ExportContext context) { - if (LOG.isDebugEnabled()) { - LOG.debug("==> getEntityGuidsForFullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size()); - } - - String query = this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_FULL); - - context.bindings.clear(); - context.bindings.put("startGuid", entity.getGuid()); - - List<HashMap<String, Object>> result = executeGremlinQuery(query, context); - - if (CollectionUtils.isEmpty(result)) { - return; - } - - for (HashMap<String, Object> hashMap : result) { - String guid = (String) hashMap.get("__guid"); - boolean isLineage = (boolean) hashMap.get("isProcess"); - - if (!context.guidsProcessed.contains(guid)) { - context.addToBeProcessed(isLineage, guid, TraversalDirection.BOTH); - } - } - - if (LOG.isDebugEnabled()) { - LOG.debug("<== getEntityGuidsForFullFetch({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcess.size()); - } - } - - private void addEntity(AtlasEntityWithExtInfo entity, ExportContext context) throws AtlasBaseException { - context.sink.add(entity); - - context.result.incrementMeticsCounter(String.format("entity:%s", entity.getEntity().getTypeName())); - if(entity.getReferredEntities() != null) { - for (AtlasEntity e: entity.getReferredEntities().values()) { - context.result.incrementMeticsCounter(String.format("entity:%s", e.getTypeName())); - } - } - - context.result.incrementMeticsCounter("entity:withExtInfo"); - context.reportProgress(); - } - - private void addTypes(AtlasEntity entity, ExportContext context) { - addEntityType(entity.getTypeName(), context); - - if(CollectionUtils.isNotEmpty(entity.getClassifications())) { - for (AtlasClassification c : entity.getClassifications()) { - addClassificationType(c.getTypeName(), context); - } - } - } - - private void addType(String typeName, ExportContext context) { - AtlasType type = null; - - try { - type = typeRegistry.getType(typeName); - - addType(type, context); - } catch (AtlasBaseException excp) { - LOG.error("unknown type {}", typeName); - } - } - - private void addEntityType(String typeName, ExportContext context) { - if (!context.entityTypes.contains(typeName)) { - AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); - - addEntityType(entityType, context); - } - } - - private void addClassificationType(String typeName, ExportContext context) { - if (!context.classificationTypes.contains(typeName)) { - AtlasClassificationType classificationType = typeRegistry.getClassificationTypeByName(typeName); - - addClassificationType(classificationType, context); - } - } - - private void addType(AtlasType type, ExportContext context) { - if (type.getTypeCategory() == TypeCategory.PRIMITIVE) { - return; - } - - if (type instanceof AtlasArrayType) { - AtlasArrayType arrayType = (AtlasArrayType)type; - - addType(arrayType.getElementType(), context); - } else if (type instanceof AtlasMapType) { - AtlasMapType mapType = (AtlasMapType)type; - - addType(mapType.getKeyType(), context); - addType(mapType.getValueType(), context); - } else if (type instanceof AtlasEntityType) { - addEntityType((AtlasEntityType)type, context); - } else if (type instanceof AtlasClassificationType) { - addClassificationType((AtlasClassificationType)type, context); - } else if (type instanceof AtlasStructType) { - addStructType((AtlasStructType)type, context); - } else if (type instanceof AtlasEnumType) { - addEnumType((AtlasEnumType)type, context); - } - } - - private void addEntityType(AtlasEntityType entityType, ExportContext context) { - if (!context.entityTypes.contains(entityType.getTypeName())) { - context.entityTypes.add(entityType.getTypeName()); - - addAttributeTypes(entityType, context); - - if (CollectionUtils.isNotEmpty(entityType.getAllSuperTypes())) { - for (String superType : entityType.getAllSuperTypes()) { - addEntityType(superType, context); - } - } - } - } - - private void addClassificationType(AtlasClassificationType classificationType, ExportContext context) { - if (!context.classificationTypes.contains(classificationType.getTypeName())) { - context.classificationTypes.add(classificationType.getTypeName()); - - addAttributeTypes(classificationType, context); - - if (CollectionUtils.isNotEmpty(classificationType.getAllSuperTypes())) { - for (String superType : classificationType.getAllSuperTypes()) { - addClassificationType(superType, context); - } - } - } - } - - private void addStructType(AtlasStructType structType, ExportContext context) { - if (!context.structTypes.contains(structType.getTypeName())) { - context.structTypes.add(structType.getTypeName()); - - addAttributeTypes(structType, context); - } - } - - private void addEnumType(AtlasEnumType enumType, ExportContext context) { - if (!context.enumTypes.contains(enumType.getTypeName())) { - context.enumTypes.add(enumType.getTypeName()); - } - } - - private void addAttributeTypes(AtlasStructType structType, ExportContext context) { - for (AtlasAttributeDef attributeDef : structType.getStructDef().getAttributeDefs()) { - addType(attributeDef.getTypeName(), context); - } - } - - private List<HashMap<String, Object>> executeGremlinQuery(String query, ExportContext context) { - try { - return (List<HashMap<String, Object>>) atlasGraph.executeGremlinScript(context.scriptEngine, context.bindings, query, false); - } catch (ScriptException e) { - LOG.error("Script execution failed for query: ", query, e); - return null; - } - } - - private List<String> executeGremlinQueryForGuids(String query, ExportContext context) { - try { - return (List<String>) atlasGraph.executeGremlinScript(context.scriptEngine, context.bindings, query, false); - } catch (ScriptException e) { - LOG.error("Script execution failed for query: ", query, e); - return null; - } - } - - private enum TraversalDirection { - UNKNOWN, - INWARD, - OUTWARD, - BOTH; - } - - - public enum ExportFetchType { - FULL(FETCH_TYPE_FULL), - CONNECTED(FETCH_TYPE_CONNECTED); - - final String str; - ExportFetchType(String s) { - this.str = s; - } - - public static final ExportFetchType from(String s) { - for (ExportFetchType b : ExportFetchType.values()) { - if (b.str.equalsIgnoreCase(s)) { - return b; - } - } - - return FULL; - } - } - - private class UniqueList<T> { - private final List<T> list = new ArrayList<>(); - private final Set<T> set = new HashSet<>(); - - public void add(T e) { - if(set.contains(e)) { - return; - } - - list.add(e); - set.add(e); - } - - public void addAll(UniqueList<T> uniqueList) { - for (T item : uniqueList.list) { - if(set.contains(item)) continue; - - set.add(item); - list.add(item); - } - } - - public T remove(int index) { - T e = list.remove(index); - set.remove(e); - return e; - } - - public boolean contains(T e) { - return set.contains(e); - } - - public int size() { - return list.size(); - } - - public boolean isEmpty() { - return list.isEmpty(); - } - - public void clear() { - list.clear(); - set.clear(); - } - } - - - private class ExportContext { - final Set<String> guidsProcessed = new HashSet<>(); - final UniqueList<String> guidsToProcess = new UniqueList<>(); - final UniqueList<String> guidsLineageToProcess = new UniqueList<>(); - final Map<String, TraversalDirection> guidDirection = new HashMap<>(); - final Set<String> entityTypes = new HashSet<>(); - final Set<String> classificationTypes = new HashSet<>(); - final Set<String> structTypes = new HashSet<>(); - final Set<String> enumTypes = new HashSet<>(); - final AtlasExportResult result; - final ZipSink sink; - - private final ScriptEngine scriptEngine; - private final Map<String, Object> bindings; - private final ExportFetchType fetchType; - private final String matchType; - - private int progressReportCount = 0; - - ExportContext(AtlasExportResult result, ZipSink sink) throws AtlasBaseException { - this.result = result; - this.sink = sink; - - scriptEngine = atlasGraph.getGremlinScriptEngine(); - bindings = new HashMap<>(); - fetchType = getFetchType(result.getRequest()); - matchType = getMatchType(result.getRequest()); - } - - private ExportFetchType getFetchType(AtlasExportRequest request) { - Object fetchOption = request.getOptions() != null ? request.getOptions().get(OPTION_FETCH_TYPE) : null; - - if (fetchOption instanceof String) { - return ExportFetchType.from((String) fetchOption); - } else if (fetchOption instanceof ExportFetchType) { - return (ExportFetchType) fetchOption; - } - - return ExportFetchType.FULL; - } - - private String getMatchType(AtlasExportRequest request) { - String matchType = null; - - if (MapUtils.isNotEmpty(request.getOptions())) { - if (request.getOptions().get(OPTION_ATTR_MATCH_TYPE) != null) { - matchType = request.getOptions().get(OPTION_ATTR_MATCH_TYPE).toString(); - } - } - - return matchType; - } - - public void clear() { - guidsToProcess.clear(); - guidsProcessed.clear(); - guidDirection.clear(); - } - - public void addToBeProcessed(boolean isSuperTypeProcess, String guid, TraversalDirection direction) { - if(!isSuperTypeProcess) { - guidsToProcess.add(guid); - } - - if(isSuperTypeProcess) { - guidsLineageToProcess.add(guid); - } - - guidDirection.put(guid, direction); - } - - public void reportProgress() { - - if ((guidsProcessed.size() - progressReportCount) > 1000) { - progressReportCount = guidsProcessed.size(); - - LOG.info("export(): in progress.. number of entities exported: {}", this.guidsProcessed.size()); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8a32ccaa/webapp/src/main/java/org/apache/atlas/web/resources/ImportService.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ImportService.java b/webapp/src/main/java/org/apache/atlas/web/resources/ImportService.java deleted file mode 100644 index eb81e3c..0000000 --- a/webapp/src/main/java/org/apache/atlas/web/resources/ImportService.java +++ /dev/null @@ -1,164 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.web.resources; - -import org.apache.atlas.AtlasErrorCode; -import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.impexp.AtlasImportRequest; -import org.apache.atlas.model.impexp.AtlasImportResult; -import org.apache.atlas.model.typedef.*; -import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer; -import org.apache.atlas.repository.store.graph.AtlasEntityStore; -import org.apache.atlas.store.AtlasTypeDefStore; -import org.apache.atlas.type.AtlasTypeRegistry; -import org.apache.commons.io.FileUtils; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.ByteArrayInputStream; -import java.io.File; -import java.io.FileNotFoundException; - - -public class ImportService { - private static final Logger LOG = LoggerFactory.getLogger(ImportService.class); - - private final AtlasTypeDefStore typeDefStore; - private final AtlasEntityStore entityStore; - private final AtlasTypeRegistry typeRegistry; - - private long startTimestamp; - private long endTimestamp; - - - public ImportService(final AtlasTypeDefStore typeDefStore, final AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) { - this.typeDefStore = typeDefStore; - this.entityStore = entityStore; - this.typeRegistry = typeRegistry; - } - - public AtlasImportResult run(ZipSource source, AtlasImportRequest request, String userName, - String hostName, String requestingIP) throws AtlasBaseException { - AtlasImportResult result = new AtlasImportResult(request, userName, requestingIP, hostName, System.currentTimeMillis()); - - try { - LOG.info("==> import(user={}, from={})", userName, requestingIP); - - startTimestamp = System.currentTimeMillis(); - processTypes(source.getTypesDef(), result); - processEntities(source, result); - - result.setOperationStatus(AtlasImportResult.OperationStatus.SUCCESS); - } catch (AtlasBaseException excp) { - LOG.error("import(user={}, from={}): failed", userName, requestingIP, excp); - - throw excp; - } catch (Exception excp) { - LOG.error("import(user={}, from={}): failed", userName, requestingIP, excp); - - throw new AtlasBaseException(excp); - } finally { - source.close(); - LOG.info("<== import(user={}, from={}): status={}", userName, requestingIP, result.getOperationStatus()); - } - - return result; - } - - public AtlasImportResult run(AtlasImportRequest request, String userName, String hostName, String requestingIP) - throws AtlasBaseException { - String fileName = (String)request.getOptions().get("FILENAME"); - - if (StringUtils.isBlank(fileName)) { - throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "FILENAME parameter not found"); - } - - AtlasImportResult result = null; - - try { - LOG.info("==> import(user={}, from={}, fileName={})", userName, requestingIP, fileName); - - File file = new File(fileName); - ZipSource source = new ZipSource(new ByteArrayInputStream(FileUtils.readFileToByteArray(file))); - - result = run(source, request, userName, hostName, requestingIP); - } catch (AtlasBaseException excp) { - LOG.error("import(user={}, from={}, fileName={}): failed", userName, requestingIP, excp); - - throw excp; - } catch (FileNotFoundException excp) { - LOG.error("import(user={}, from={}, fileName={}): file not found", userName, requestingIP, excp); - - throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, fileName + ": file not found"); - } catch (Exception excp) { - LOG.error("import(user={}, from={}, fileName={}): failed", userName, requestingIP, excp); - - throw new AtlasBaseException(excp); - } finally { - LOG.info("<== import(user={}, from={}, fileName={}): status={}", userName, requestingIP, fileName, - (result == null ? AtlasImportResult.OperationStatus.FAIL : result.getOperationStatus())); - } - - return result; - } - - private void processTypes(AtlasTypesDef typeDefinitionMap, AtlasImportResult result) throws AtlasBaseException { - setGuidToEmpty(typeDefinitionMap); - - AtlasTypesDef typesToCreate = AtlasTypeDefStoreInitializer.getTypesToCreate(typeDefinitionMap, this.typeRegistry); - - if (!typesToCreate.isEmpty()) { - typeDefStore.createTypesDef(typesToCreate); - - updateMetricsForTypesDef(typesToCreate, result); - } - } - - private void updateMetricsForTypesDef(AtlasTypesDef typeDefinitionMap, AtlasImportResult result) { - result.incrementMeticsCounter("typedef:classification", typeDefinitionMap.getClassificationDefs().size()); - result.incrementMeticsCounter("typedef:enum", typeDefinitionMap.getEnumDefs().size()); - result.incrementMeticsCounter("typedef:entitydef", typeDefinitionMap.getEntityDefs().size()); - result.incrementMeticsCounter("typedef:struct", typeDefinitionMap.getStructDefs().size()); - } - - private void setGuidToEmpty(AtlasTypesDef typesDef) { - for (AtlasEntityDef def: typesDef.getEntityDefs()) { - def.setGuid(null); - } - - for (AtlasClassificationDef def: typesDef.getClassificationDefs()) { - def.setGuid(null); - } - - for (AtlasEnumDef def: typesDef.getEnumDefs()) { - def.setGuid(null); - } - - for (AtlasStructDef def: typesDef.getStructDefs()) { - def.setGuid(null); - } - } - - private void processEntities(ZipSource importSource, AtlasImportResult result) throws AtlasBaseException { - this.entityStore.bulkImport(importSource, result); - - endTimestamp = System.currentTimeMillis(); - result.incrementMeticsCounter("duration", (int) (this.endTimestamp - this.startTimestamp)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8a32ccaa/webapp/src/main/java/org/apache/atlas/web/resources/ZipExportFileNames.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ZipExportFileNames.java b/webapp/src/main/java/org/apache/atlas/web/resources/ZipExportFileNames.java deleted file mode 100644 index c41ff56..0000000 --- a/webapp/src/main/java/org/apache/atlas/web/resources/ZipExportFileNames.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.web.resources; - -public enum ZipExportFileNames { - ATLAS_EXPORT_INFO_NAME("atlas-export-info"), - ATLAS_EXPORT_ORDER_NAME("atlas-export-order"), - ATLAS_TYPESDEF_NAME("atlas-typesdef"); - - public final String name; - ZipExportFileNames(String name) { - this.name = name; - } - - @Override - public String toString() { - return this.name; - } -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8a32ccaa/webapp/src/main/java/org/apache/atlas/web/resources/ZipSink.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ZipSink.java b/webapp/src/main/java/org/apache/atlas/web/resources/ZipSink.java deleted file mode 100644 index c197d41..0000000 --- a/webapp/src/main/java/org/apache/atlas/web/resources/ZipSink.java +++ /dev/null @@ -1,103 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.web.resources; - -import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.impexp.AtlasExportResult; -import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.model.typedef.AtlasTypesDef; -import org.apache.atlas.type.AtlasType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.List; -import java.util.zip.ZipEntry; -import java.util.zip.ZipOutputStream; - -public class ZipSink { - private static final Logger LOG = LoggerFactory.getLogger(ZipSink.class); - - private ZipOutputStream zipOutputStream; - - public ZipSink(OutputStream outputStream) { - zipOutputStream = new ZipOutputStream(outputStream); - } - - public void add(AtlasEntity entity) throws AtlasBaseException { - String jsonData = convertToJSON(entity); - saveToZip(entity.getGuid(), jsonData); - } - - public void add(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) throws AtlasBaseException { - String jsonData = convertToJSON(entityWithExtInfo); - saveToZip(entityWithExtInfo.getEntity().getGuid(), jsonData); - } - - public void setResult(AtlasExportResult result) throws AtlasBaseException { - String jsonData = convertToJSON(result); - saveToZip(ZipExportFileNames.ATLAS_EXPORT_INFO_NAME, jsonData); - } - - public void setTypesDef(AtlasTypesDef typesDef) throws AtlasBaseException { - String jsonData = convertToJSON(typesDef); - saveToZip(ZipExportFileNames.ATLAS_TYPESDEF_NAME, jsonData); - } - - public void setExportOrder(List<String> result) throws AtlasBaseException { - String jsonData = convertToJSON(result); - saveToZip(ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME, jsonData); - } - - public void close() { - try { - if(zipOutputStream != null) { - zipOutputStream.close(); - zipOutputStream = null; - } - } catch (IOException e) { - LOG.error("Error closing Zip file", e); - } - } - - private String convertToJSON(Object entity) { - return AtlasType.toJson(entity); - } - - private void saveToZip(ZipExportFileNames fileName, String jsonData) throws AtlasBaseException { - saveToZip(fileName.toString(), jsonData); - } - - private void saveToZip(String fileName, String jsonData) throws AtlasBaseException { - try { - addToZipStream(fileName.toString() + ".json", jsonData); - } catch (IOException e) { - throw new AtlasBaseException(String.format("Error writing file %s.", fileName), e); - } - } - - private void addToZipStream(String entryName, String payload) throws IOException { - - ZipEntry e = new ZipEntry(entryName); - zipOutputStream.putNextEntry(e); - - zipOutputStream.write(payload.getBytes()); - zipOutputStream.closeEntry(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8a32ccaa/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java b/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java deleted file mode 100644 index 8b2b9df..0000000 --- a/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java +++ /dev/null @@ -1,210 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.web.resources; - -import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; -import org.apache.atlas.model.typedef.AtlasTypesDef; -import org.apache.atlas.repository.store.graph.v1.EntityImportStream; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.type.TypeReference; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.zip.ZipEntry; -import java.util.zip.ZipInputStream; - -import static org.apache.atlas.AtlasErrorCode.JSON_ERROR_OBJECT_MAPPER_NULL_RETURNED; - - -public class ZipSource implements EntityImportStream { - private static final Logger LOG = LoggerFactory.getLogger(ZipSource.class); - - private final ByteArrayInputStream inputStream; - private List<String> creationOrder; - private Iterator<String> iterator; - private Map<String, String> guidEntityJsonMap; - - public ZipSource(ByteArrayInputStream inputStream) throws IOException { - this.inputStream = inputStream; - guidEntityJsonMap = new HashMap<>(); - - updateGuidZipEntryMap(); - this.setCreationOrder(); - } - - public AtlasTypesDef getTypesDef() throws AtlasBaseException { - final String fileName = ZipExportFileNames.ATLAS_TYPESDEF_NAME.toString(); - - String s = (String) getFromCache(fileName); - return convertFromJson(AtlasTypesDef.class, s); - } - - private void setCreationOrder() { - String fileName = ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString(); - - try { - String s = getFromCache(fileName); - this.creationOrder = convertFromJson(new TypeReference<List<String>>(){}, s); - this.iterator = this.creationOrder.iterator(); - } catch (AtlasBaseException e) { - LOG.error(String.format("Error retrieving '%s' from zip.", fileName), e); - } - } - - private void updateGuidZipEntryMap() throws IOException { - - inputStream.reset(); - - ZipInputStream zipInputStream = new ZipInputStream(inputStream); - ZipEntry zipEntry = zipInputStream.getNextEntry(); - while (zipEntry != null) { - String entryName = zipEntry.getName().replace(".json", ""); - - if (guidEntityJsonMap.containsKey(entryName)) continue; - - byte[] buf = new byte[1024]; - - int n = 0; - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - while ((n = zipInputStream.read(buf, 0, 1024)) > -1) { - bos.write(buf, 0, n); - } - - guidEntityJsonMap.put(entryName, bos.toString()); - zipEntry = zipInputStream.getNextEntry(); - - } - - zipInputStream.close(); - } - - public List<String> getCreationOrder() throws AtlasBaseException { - return this.creationOrder; - } - - public AtlasEntity.AtlasEntityWithExtInfo getEntityWithExtInfo(String guid) throws AtlasBaseException { - String s = (String) getFromCache(guid); - AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = convertFromJson(AtlasEntity.AtlasEntityWithExtInfo.class, s); - return entityWithExtInfo; - } - - private <T> T convertFromJson(TypeReference clazz, String jsonData) throws AtlasBaseException { - try { - ObjectMapper mapper = new ObjectMapper(); - - T ret = mapper.readValue(jsonData, clazz); - if(ret == null) { - throw new AtlasBaseException(JSON_ERROR_OBJECT_MAPPER_NULL_RETURNED, clazz.toString()); - } - - return ret; - } catch (Exception e) { - throw new AtlasBaseException("Error converting file to JSON.", e); - } - } - - private <T> T convertFromJson(Class<T> clazz, String jsonData) throws AtlasBaseException { - try { - ObjectMapper mapper = new ObjectMapper(); - - return mapper.readValue(jsonData, clazz); - - } catch (Exception e) { - throw new AtlasBaseException("Error converting file to JSON.", e); - } - } - - private String getFromCache(String entryName) { - return guidEntityJsonMap.get(entryName); - } - - public void close() { - try { - inputStream.close(); - guidEntityJsonMap.clear(); - } - catch(IOException ex) { - LOG.warn("{}: Error closing streams."); - } - } - - @Override - public boolean hasNext() { - return this.iterator.hasNext(); - } - - @Override - public AtlasEntity next() { - AtlasEntityWithExtInfo entityWithExtInfo = getNextEntityWithExtInfo(); - - return entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null; - } - - @Override - public AtlasEntityWithExtInfo getNextEntityWithExtInfo() { - try { - return getEntityWithExtInfo(this.iterator.next()); - } catch (AtlasBaseException e) { - e.printStackTrace(); - return null; - } - } - - @Override - public void reset() { - try { - getCreationOrder(); - this.iterator = this.creationOrder.iterator(); - } catch (AtlasBaseException e) { - LOG.error("reset", e); - } - } - - @Override - public AtlasEntity getByGuid(String guid) { - try { - return getEntity(guid); - } catch (AtlasBaseException e) { - e.printStackTrace(); - return null; - } - } - - private AtlasEntity getEntity(String guid) throws AtlasBaseException { - if(guidEntityJsonMap.containsKey(guid)) { - AtlasEntityWithExtInfo extInfo = getEntityWithExtInfo(guid); - return (extInfo != null) ? extInfo.getEntity() : null; - } - - return null; - } - - @Override - public void onImportComplete(String guid) { - guidEntityJsonMap.remove(guid); - } -}
