Repository: incubator-atlas Updated Branches: refs/heads/master 852a71183 -> 1d85e95fa
ATLAS-1503: optimization of import API implementation Signed-off-by: Madhan Neethiraj <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/1d85e95f Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/1d85e95f Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/1d85e95f Branch: refs/heads/master Commit: 1d85e95fa06a98d70417c9671cd4d0a0b33a9ed1 Parents: 852a711 Author: ashutoshm <[email protected]> Authored: Fri Feb 17 10:47:22 2017 -0800 Committer: Madhan Neethiraj <[email protected]> Committed: Fri Feb 17 14:38:53 2017 -0800 ---------------------------------------------------------------------- .../java/org/apache/atlas/AtlasErrorCode.java | 1 + .../atlas/model/impexp/AtlasImportResult.java | 25 +++-- .../atlas/model/instance/EntityMutations.java | 2 +- .../test/java/org/apache/atlas/TestUtilsV2.java | 1 + .../store/graph/AtlasEntityStore.java | 9 ++ .../store/graph/v1/AtlasEntityStoreV1.java | 77 +++++++++++-- .../store/graph/v1/AtlasEntityStream.java | 16 ++- .../graph/v1/AtlasEntityStreamForImport.java | 30 ++++++ .../store/graph/v1/EntityGraphMapper.java | 26 ++--- .../store/graph/v1/AtlasEntityStoreV1Test.java | 65 ++++++----- .../atlas/web/resources/AdminResource.java | 43 ++++++-- .../atlas/web/resources/ExportService.java | 11 +- .../atlas/web/resources/ImportService.java | 31 +++--- .../apache/atlas/web/resources/ZipSource.java | 107 ++++++++----------- 14 files changed, 290 insertions(+), 154 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1d85e95f/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java index 584bf25..ce5fea3 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java +++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java @@ -88,6 +88,7 @@ public enum AtlasErrorCode { INDEX_CREATION_FAILED(500, "ATLAS5002E", "Index creation failed for {0}"), INDEX_ROLLBACK_FAILED(500, "ATLAS5003E", "Index rollback failed for {0}"), FAILED_TO_OBTAIN_TYPE_UPDATE_LOCK(500, "ATLAS5004E", "Failed to get the lock; another type update might be in progress. Please try again"), + FAILED_TO_OBTAIN_IMPORT_EXPORT_LOCK(500, "ATLAS5005E", "Another import or export is in progress. Please try again"), NOTIFICATION_FAILED(500, "ATLAS5005E", "Failed to notify for change {0}"); private String errorCode; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1d85e95f/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java index a5eeef1..bfb7637 100644 --- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java +++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java @@ -26,7 +26,9 @@ import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlRootElement; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.NONE; @@ -50,6 +52,7 @@ public class AtlasImportResult { private String hostName; private long timeStamp; private Map<String, Integer> metrics; + private List<String> processedEntities; private OperationStatus operationStatus; public AtlasImportResult() { @@ -58,13 +61,14 @@ public class AtlasImportResult { public AtlasImportResult(AtlasImportRequest request, String userName, String clientIpAddress, String hostName, long timeStamp) { - this.request = request; - this.userName = userName; - this.clientIpAddress = clientIpAddress; - this.hostName = hostName; - this.timeStamp = timeStamp; - this.metrics = new HashMap<>(); - this.operationStatus = OperationStatus.FAIL; + this.request = request; + this.userName = userName; + this.clientIpAddress = clientIpAddress; + this.hostName = hostName; + this.timeStamp = timeStamp; + this.metrics = new HashMap<>(); + this.operationStatus = OperationStatus.FAIL; + this.processedEntities = new ArrayList<>(); } public AtlasImportRequest getRequest() { @@ -133,6 +137,10 @@ public class AtlasImportResult { metrics.put(key, currentValue + incrementBy); } + public void setProcessedEntities(List<String> processedEntities) { this.processedEntities = processedEntities; } + + public List<String> getProcessedEntities() { return this.processedEntities; } + public StringBuilder toString(StringBuilder sb) { if (sb == null) { sb = new StringBuilder(); @@ -149,6 +157,9 @@ public class AtlasImportResult { sb.append("}"); sb.append(", operationStatus='").append(operationStatus).append("'"); + sb.append(", processedEntities=["); + AtlasBaseTypeDef.dumpObjects(processedEntities, sb); + sb.append("]"); sb.append("}"); return sb; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1d85e95f/intg/src/main/java/org/apache/atlas/model/instance/EntityMutations.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/model/instance/EntityMutations.java b/intg/src/main/java/org/apache/atlas/model/instance/EntityMutations.java index 74e3c57..b509420 100644 --- a/intg/src/main/java/org/apache/atlas/model/instance/EntityMutations.java +++ b/intg/src/main/java/org/apache/atlas/model/instance/EntityMutations.java @@ -46,7 +46,7 @@ public class EntityMutations implements Serializable { CREATE, UPDATE, PARTIAL_UPDATE, - DELETE, + DELETE } public static final class EntityMutation implements Serializable { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1d85e95f/intg/src/test/java/org/apache/atlas/TestUtilsV2.java ---------------------------------------------------------------------- diff --git a/intg/src/test/java/org/apache/atlas/TestUtilsV2.java b/intg/src/test/java/org/apache/atlas/TestUtilsV2.java index ea56dd6..6d85672 100755 --- a/intg/src/test/java/org/apache/atlas/TestUtilsV2.java +++ b/intg/src/test/java/org/apache/atlas/TestUtilsV2.java @@ -562,6 +562,7 @@ public final class TestUtilsV2 { AtlasTypeUtil.createUniqueRequiredAttrDef(NAME, "string"), AtlasTypeUtil.createOptionalAttrDef("isReplicated", "boolean"), AtlasTypeUtil.createOptionalAttrDef("created", "string"), + AtlasTypeUtil.createOptionalAttrDef("parameters", "map<string,string>"), AtlasTypeUtil.createRequiredAttrDef("description", "string")); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1d85e95f/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java index 1cd4375..3a037cc 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java @@ -19,6 +19,7 @@ package org.apache.atlas.repository.store.graph; import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.impexp.AtlasImportResult; import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; @@ -69,6 +70,14 @@ public interface AtlasEntityStore { EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean isPartialUpdate) throws AtlasBaseException; /** + * Create or update entities in the stream using repeated commits of connected entities + * @param entityStream AtlasEntityStream + * @return EntityMutationResponse Entity mutations operations with the corresponding set of entities on which these operations were performed + * @throws AtlasBaseException + */ + EntityMutationResponse bulkImport(EntityStream entityStream, AtlasImportResult importResult) throws AtlasBaseException; + + /** * Update a single entity * @param entityType type of the entity * @param uniqAttributes Attributes that uniquely identify the entity http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1d85e95f/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java index 4312287..4684bfe 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java @@ -24,13 +24,10 @@ import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.GraphTransaction; import org.apache.atlas.RequestContextV1; import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.instance.AtlasClassification; -import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.impexp.AtlasImportResult; +import org.apache.atlas.model.instance.*; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; -import org.apache.atlas.model.instance.AtlasObjectId; -import org.apache.atlas.model.instance.EntityMutationResponse; -import org.apache.atlas.model.instance.EntityMutations; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.repository.store.graph.EntityGraphDiscovery; @@ -43,10 +40,9 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; +import java.util.*; + +import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.*; @Singleton @@ -130,6 +126,65 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { } @Override + public EntityMutationResponse bulkImport(EntityStream entityStream, AtlasImportResult importResult) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> bulkImport()"); + } + + if (entityStream == null || !entityStream.hasNext()) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update."); + } + + EntityMutationResponse ret = new EntityMutationResponse(); + ret.setGuidAssignments(new HashMap<String, String>()); + + Set<String> processedGuids = new HashSet<>(); + int progressReportedAtCount = 0; + + while (entityStream.hasNext()) { + AtlasEntity entity = entityStream.next(); + + if(processedGuids.contains(entity.getGuid())) { + continue; + } + + AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entity, entityStream); + + EntityMutationResponse resp = createOrUpdate(oneEntityStream, false); + + updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult); + updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult); + updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult); + + if ((processedGuids.size() - progressReportedAtCount) > 10) { + progressReportedAtCount = processedGuids.size(); + + LOG.info("bulkImport(): in progress.. number of entities imported: {}", progressReportedAtCount); + } + + if (resp.getGuidAssignments() != null) { + ret.getGuidAssignments().putAll(resp.getGuidAssignments()); + } + } + + importResult.getProcessedEntities().addAll(processedGuids); + LOG.info("bulkImport(): done. Number of entities imported: {}", processedGuids.size()); + + return ret; + } + + private void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) { + if (list == null) { + return; + } + + for (AtlasEntityHeader h : list) { + processedGuids.add(h.getGuid()); + importResult.incrementMeticsCounter(String.format(prefix, h.getTypeName())); + } + } + + @Override @GraphTransaction public EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean isPartialUpdate) throws AtlasBaseException { if (LOG.isDebugEnabled()) { @@ -323,11 +378,11 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { deleteHandler.deleteEntities(deletionCandidates); RequestContextV1 req = RequestContextV1.get(); for (AtlasObjectId id : req.getDeletedEntityIds()) { - response.addEntity(EntityMutations.EntityOperation.DELETE, EntityGraphMapper.constructHeader(id)); + response.addEntity(DELETE, EntityGraphMapper.constructHeader(id)); } for (AtlasObjectId id : req.getUpdatedEntityIds()) { - response.addEntity(EntityMutations.EntityOperation.UPDATE, EntityGraphMapper.constructHeader(id)); + response.addEntity(UPDATE, EntityGraphMapper.constructHeader(id)); } return response; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1d85e95f/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStream.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStream.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStream.java index 010b626..5d9a7d4 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStream.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStream.java @@ -24,11 +24,10 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import java.util.Iterator; public class AtlasEntityStream implements EntityStream { - private AtlasEntitiesWithExtInfo entitiesWithExtInfo = new AtlasEntitiesWithExtInfo(); - private Iterator<AtlasEntity> iterator; + private final AtlasEntitiesWithExtInfo entitiesWithExtInfo; + private final EntityStream entityStream; + private Iterator<AtlasEntity> iterator; - public AtlasEntityStream() { - } public AtlasEntityStream(AtlasEntity entity) { this(new AtlasEntitiesWithExtInfo(entity)); @@ -41,6 +40,13 @@ public class AtlasEntityStream implements EntityStream { public AtlasEntityStream(AtlasEntitiesWithExtInfo entitiesWithExtInfo) { this.entitiesWithExtInfo = entitiesWithExtInfo; this.iterator = this.entitiesWithExtInfo.getEntities().iterator(); + this.entityStream = null; + } + + public AtlasEntityStream(AtlasEntity entity, EntityStream entityStream) { + this.entitiesWithExtInfo = new AtlasEntitiesWithExtInfo(entity); + this.iterator = this.entitiesWithExtInfo.getEntities().iterator(); + this.entityStream = entityStream; } @Override @@ -60,7 +66,7 @@ public class AtlasEntityStream implements EntityStream { @Override public AtlasEntity getByGuid(String guid) { - return entitiesWithExtInfo.getEntity(guid); + return entityStream != null ? entityStream.getByGuid(guid) : entitiesWithExtInfo.getEntity(guid); } @Override http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1d85e95f/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java new file mode 100644 index 0000000..c0b4d8d --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.store.graph.v1; + +import org.apache.atlas.model.instance.AtlasEntity; + +public class AtlasEntityStreamForImport extends AtlasEntityStream implements EntityImportStream { + public AtlasEntityStreamForImport(AtlasEntity entity) { + super(entity); + } + + public AtlasEntityStreamForImport(AtlasEntity entity, EntityStream entityStream) { + super(entity, entityStream); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1d85e95f/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java index 2e71ab8..8c96c7b 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java @@ -739,28 +739,18 @@ public class EntityGraphMapper { private AtlasEntityHeader constructHeader(AtlasEntity entity, final AtlasEntityType type, AtlasVertex vertex) { - //TODO - enhance to return only selective attributes - AtlasEntityHeader header = new AtlasEntityHeader(entity.getTypeName(), AtlasGraphUtilsV1.getIdFromVertex(vertex), entity.getAttributes()); - final Map<String, AtlasStructType.AtlasAttribute> allAttributes = type.getAllAttributes(); - for (String attribute : allAttributes.keySet()) { - AtlasType attributeType = allAttributes.get(attribute).getAttributeType(); - AtlasAttributeDef attributeDef = allAttributes.get(attribute).getAttributeDef(); - if ( header.getAttribute(attribute) == null && (TypeCategory.PRIMITIVE == attributeType.getTypeCategory())) { - - if ( attributeDef.getIsOptional()) { - header.setAttribute(attribute, attributeType.createOptionalDefaultValue()); - } else { - header.setAttribute(attribute, attributeType.createDefaultValue()); - } - } + AtlasEntityHeader header = new AtlasEntityHeader(entity.getTypeName()); + + header.setGuid(AtlasGraphUtilsV1.getIdFromVertex(vertex)); + + for (AtlasAttribute attribute : type.getUniqAttributes().values()) { + header.setAttribute(attribute.getName(), entity.getAttribute(attribute.getName())); } + return header; } public static AtlasEntityHeader constructHeader(AtlasObjectId id) { - AtlasEntityHeader entity = new AtlasEntityHeader(id.getTypeName()); - entity.setGuid(id.getGuid()); - - return entity; + return new AtlasEntityHeader(id.getTypeName(), id.getGuid(), id.getUniqueAttributes()); } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1d85e95f/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1Test.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1Test.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1Test.java index 1d10461..dd82cb2 100644 --- a/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1Test.java +++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1Test.java @@ -74,6 +74,7 @@ import static org.apache.atlas.TestUtils.NAME; import static org.apache.atlas.TestUtils.randomString; import static org.apache.atlas.TestUtilsV2.TABLE_TYPE; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; @Guice(modules = RepositoryMetadataModule.class) public class AtlasEntityStoreV1Test { @@ -199,8 +200,8 @@ public class AtlasEntityStoreV1Test { init(); EntityMutationResponse response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false); - AtlasEntityHeader updatedTable = response.getFirstUpdatedEntityByTypeName(tableEntity.getTypeName()); - validateEntity(entitiesInfo, getEntityFromStore(updatedTable)); + AtlasEntityHeader updatedTableHeader = response.getFirstUpdatedEntityByTypeName(tableEntity.getTypeName()); + validateEntity(entitiesInfo, getEntityFromStore(updatedTableHeader)); //Complete update. Add array elements - col3,col4 AtlasEntity col3 = TestUtilsV2.createColumnEntity(tableEntity); @@ -219,8 +220,8 @@ public class AtlasEntityStoreV1Test { init(); response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false); - updatedTable = response.getFirstUpdatedEntityByTypeName(tableEntity.getTypeName()); - validateEntity(entitiesInfo, getEntityFromStore(updatedTable)); + updatedTableHeader = response.getFirstUpdatedEntityByTypeName(tableEntity.getTypeName()); + validateEntity(entitiesInfo, getEntityFromStore(updatedTableHeader)); //Swap elements columns.clear(); @@ -231,8 +232,10 @@ public class AtlasEntityStoreV1Test { init(); response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false); - updatedTable = response.getFirstUpdatedEntityByTypeName(tableEntity.getTypeName()); - Assert.assertEquals(((List<AtlasObjectId>) updatedTable.getAttribute(COLUMNS_ATTR_NAME)).size(), 2); + updatedTableHeader = response.getFirstUpdatedEntityByTypeName(tableEntity.getTypeName()); + AtlasEntity updatedEntity = getEntityFromStore(updatedTableHeader); + // deleted columns are also included in "columns" attribute + Assert.assertTrue(((List<AtlasObjectId>) updatedEntity.getAttribute(COLUMNS_ATTR_NAME)).size() >= 2); assertEquals(response.getEntitiesByOperation(EntityMutations.EntityOperation.DELETE).size(), 2); // col1, col2 are deleted @@ -242,8 +245,8 @@ public class AtlasEntityStoreV1Test { init(); response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false); - updatedTable = response.getFirstUpdatedEntityByTypeName(tableEntity.getTypeName()); - validateEntity(entitiesInfo, getEntityFromStore(updatedTable)); + updatedTableHeader = response.getFirstUpdatedEntityByTypeName(tableEntity.getTypeName()); + validateEntity(entitiesInfo, getEntityFromStore(updatedTableHeader)); assertEquals(response.getEntitiesByOperation(EntityMutations.EntityOperation.DELETE).size(), 2); } @@ -261,9 +264,10 @@ public class AtlasEntityStoreV1Test { EntityMutationResponse response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false); AtlasEntityHeader tableDefinition1 = response.getFirstUpdatedEntityByTypeName(TABLE_TYPE); - validateEntity(entitiesInfo, getEntityFromStore(tableDefinition1)); + AtlasEntity updatedTableDef1 = getEntityFromStore(tableDefinition1); + validateEntity(entitiesInfo, updatedTableDef1); - Assert.assertTrue(partsMap.get("part0").equals(((Map<String, AtlasStruct>) tableDefinition1.getAttribute("partitionsMap")).get("part0"))); + Assert.assertTrue(partsMap.get("part0").equals(((Map<String, AtlasStruct>) updatedTableDef1.getAttribute("partitionsMap")).get("part0"))); //update map - add a map key partsMap.put("part1", new AtlasStruct(TestUtils.PARTITION_STRUCT_TYPE, TestUtilsV2.NAME, "test1")); @@ -273,10 +277,11 @@ public class AtlasEntityStoreV1Test { response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false); AtlasEntityHeader tableDefinition2 = response.getFirstUpdatedEntityByTypeName(TABLE_TYPE); - validateEntity(entitiesInfo, getEntityFromStore(tableDefinition2)); + AtlasEntity updatedTableDef2 = getEntityFromStore(tableDefinition2); + validateEntity(entitiesInfo, updatedTableDef2); - assertEquals(((Map<String, AtlasStruct>) tableDefinition2.getAttribute("partitionsMap")).size(), 2); - Assert.assertTrue(partsMap.get("part1").equals(((Map<String, AtlasStruct>) tableDefinition2.getAttribute("partitionsMap")).get("part1"))); + assertEquals(((Map<String, AtlasStruct>) updatedTableDef2.getAttribute("partitionsMap")).size(), 2); + Assert.assertTrue(partsMap.get("part1").equals(((Map<String, AtlasStruct>) updatedTableDef2.getAttribute("partitionsMap")).get("part1"))); //update map - remove a key and add another key partsMap.remove("part0"); @@ -287,11 +292,12 @@ public class AtlasEntityStoreV1Test { response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false); AtlasEntityHeader tableDefinition3 = response.getFirstUpdatedEntityByTypeName(TABLE_TYPE); - validateEntity(entitiesInfo, getEntityFromStore(tableDefinition3)); + AtlasEntity updatedTableDef3 = getEntityFromStore(tableDefinition3); + validateEntity(entitiesInfo, updatedTableDef3); - assertEquals(((Map<String, AtlasStruct>) tableDefinition3.getAttribute("partitionsMap")).size(), 2); - Assert.assertNull(((Map<String, AtlasStruct>) tableDefinition3.getAttribute("partitionsMap")).get("part0")); - Assert.assertTrue(partsMap.get("part2").equals(((Map<String, AtlasStruct>) tableDefinition3.getAttribute("partitionsMap")).get("part2"))); + assertEquals(((Map<String, AtlasStruct>) updatedTableDef3.getAttribute("partitionsMap")).size(), 2); + Assert.assertNull(((Map<String, AtlasStruct>) updatedTableDef3.getAttribute("partitionsMap")).get("part0")); + Assert.assertTrue(partsMap.get("part2").equals(((Map<String, AtlasStruct>) updatedTableDef3.getAttribute("partitionsMap")).get("part2"))); //update struct value for existing map key AtlasStruct partition2 = partsMap.get("part2"); @@ -301,11 +307,12 @@ public class AtlasEntityStoreV1Test { response = entityStore.createOrUpdate(new AtlasEntityStream(entitiesInfo), false); AtlasEntityHeader tableDefinition4 = response.getFirstUpdatedEntityByTypeName(TABLE_TYPE); - validateEntity(entitiesInfo, getEntityFromStore(tableDefinition4)); + AtlasEntity updatedTableDef4 = getEntityFromStore(tableDefinition4); + validateEntity(entitiesInfo, updatedTableDef4); - assertEquals(((Map<String, AtlasStruct>) tableDefinition4.getAttribute("partitionsMap")).size(), 2); - Assert.assertNull(((Map<String, AtlasStruct>) tableDefinition4.getAttribute("partitionsMap")).get("part0")); - Assert.assertTrue(partsMap.get("part2").equals(((Map<String, AtlasStruct>) tableDefinition4.getAttribute("partitionsMap")).get("part2"))); + assertEquals(((Map<String, AtlasStruct>) updatedTableDef4.getAttribute("partitionsMap")).size(), 2); + Assert.assertNull(((Map<String, AtlasStruct>) updatedTableDef4.getAttribute("partitionsMap")).get("part0")); + Assert.assertTrue(partsMap.get("part2").equals(((Map<String, AtlasStruct>) updatedTableDef4.getAttribute("partitionsMap")).get("part2"))); //Test map pointing to a class @@ -523,8 +530,9 @@ public class AtlasEntityStoreV1Test { response = entityStore.createOrUpdate(new InMemoryMapEntityStream(tableCloneMap), false); final AtlasEntityHeader tableDefinition = response.getFirstUpdatedEntityByTypeName(TABLE_TYPE); - Assert.assertNotNull(tableDefinition.getAttribute("database")); - Assert.assertEquals(((AtlasObjectId) tableDefinition.getAttribute("database")).getGuid(), dbCreated.getGuid()); + AtlasEntity updatedTableDefinition = getEntityFromStore(tableDefinition); + Assert.assertNotNull(updatedTableDefinition.getAttribute("database")); + Assert.assertEquals(((AtlasObjectId) updatedTableDefinition.getAttribute("database")).getGuid(), dbCreated.getGuid()); } @Test @@ -534,7 +542,7 @@ public class AtlasEntityStoreV1Test { init(); EntityMutationResponse response = entityStore.createOrUpdate(new AtlasEntityStream(dbEntity), false); - AtlasEntityHeader firstEntityCreated = response.getFirstCreatedEntityByTypeName(TestUtilsV2.DATABASE_TYPE); + AtlasEntity firstEntityCreated = getEntityFromStore(response.getFirstCreatedEntityByTypeName(TestUtilsV2.DATABASE_TYPE)); //The optional boolean attribute should have a non-null value final String isReplicatedAttr = "isReplicated"; @@ -552,7 +560,7 @@ public class AtlasEntityStoreV1Test { init(); response = entityStore.createOrUpdate(new AtlasEntityStream(dbEntity), false); - AtlasEntityHeader firstEntityUpdated = response.getFirstUpdatedEntityByTypeName(TestUtilsV2.DATABASE_TYPE); + AtlasEntity firstEntityUpdated = getEntityFromStore(response.getFirstUpdatedEntityByTypeName(TestUtilsV2.DATABASE_TYPE)); Assert.assertNotNull(firstEntityUpdated); Assert.assertNotNull(firstEntityUpdated.getAttribute(isReplicatedAttr)); @@ -736,8 +744,9 @@ public class AtlasEntityStoreV1Test { tblHeader = response.getFirstEntityPartialUpdated(); AtlasEntity updatedTblEntity = getEntityFromStore(tblHeader); - columns = (List<AtlasObjectId>) tblHeader.getAttribute(TestUtilsV2.COLUMNS_ATTR_NAME); - assertEquals(columns.size(), 2); + columns = (List<AtlasObjectId>) updatedTblEntity.getAttribute(TestUtilsV2.COLUMNS_ATTR_NAME); + // deleted columns are included in the attribute; hence use >= + assertTrue(columns.size() >= 2); } @Test @@ -867,7 +876,7 @@ public class AtlasEntityStoreV1Test { if (MapUtils.isNotEmpty(expectedMap)) { Assert.assertTrue(MapUtils.isNotEmpty(actualMap)); - //actual map could have deleted entities. Hence size may not match. + // deleted entries are included in the attribute; hence use >= Assert.assertTrue(actualMap.size() >= expectedMap.size()); for (Object key : expectedMap.keySet()) { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1d85e95f/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java index c8c0099..8ff3396 100755 --- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java @@ -20,7 +20,7 @@ package org.apache.atlas.web.resources; import com.google.inject.Inject; import org.apache.atlas.AtlasClient; -import org.apache.atlas.AtlasException; +import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.impexp.AtlasExportRequest; import org.apache.atlas.model.impexp.AtlasExportResult; @@ -59,6 +59,7 @@ import javax.ws.rs.core.Response; import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.*; +import java.util.concurrent.locks.ReentrantLock; import static org.apache.atlas.repository.converters.AtlasInstanceConverter.toAtlasBaseException; @@ -76,6 +77,8 @@ public class AdminResource { @Context private HttpServletResponse httpServletResponse; + private final ReentrantLock importExportOperationLock; + private static final String isCSRF_ENABLED = "atlas.rest-csrf.enabled"; private static final String BROWSER_USER_AGENT_PARAM = "atlas.rest-csrf.browser-useragents-regex"; private static final String CUSTOM_METHODS_TO_IGNORE_PARAM = "atlas.rest-csrf.methods-to-ignore"; @@ -97,11 +100,12 @@ public class AdminResource { public AdminResource(ServiceState serviceState, MetricsService metricsService, AtlasTypeRegistry typeRegistry, AtlasTypeDefStore typeDefStore, AtlasEntityStore entityStore) { - this.serviceState = serviceState; - this.metricsService = metricsService; - this.typeRegistry = typeRegistry; - this.typesDefStore = typeDefStore; - this.entityStore = entityStore; + this.serviceState = serviceState; + this.metricsService = metricsService; + this.typeRegistry = typeRegistry; + this.typesDefStore = typeDefStore; + this.entityStore = entityStore; + this.importExportOperationLock = new ReentrantLock(); } /** @@ -275,6 +279,10 @@ public class AdminResource { return metrics; } + private void releaseExportImportLock() { + importExportOperationLock.unlock(); + } + @POST @Path("/export") @Consumes(Servlets.JSON_MEDIA_TYPE) @@ -283,6 +291,8 @@ public class AdminResource { LOG.debug("==> AdminResource.export()"); } + acquireExportImportLock("export"); + ZipSink exportSink = null; try { exportSink = new ZipSink(); @@ -308,6 +318,8 @@ public class AdminResource { throw new AtlasBaseException(excp); } finally { + releaseExportImportLock(); + if (exportSink != null) { exportSink.close(); } @@ -327,6 +339,8 @@ public class AdminResource { LOG.debug("==> AdminResource.importData(bytes.length={})", bytes.length); } + acquireExportImportLock("import"); + AtlasImportResult result; try { @@ -344,6 +358,8 @@ public class AdminResource { throw new AtlasBaseException(excp); } finally { + releaseExportImportLock(); + if (LOG.isDebugEnabled()) { LOG.debug("<== AdminResource.importData(binary)"); } @@ -360,6 +376,8 @@ public class AdminResource { LOG.debug("==> AdminResource.importFile()"); } + acquireExportImportLock("importFile"); + AtlasImportResult result; try { @@ -374,6 +392,8 @@ public class AdminResource { throw new AtlasBaseException(excp); } finally { + releaseExportImportLock(); + if (LOG.isDebugEnabled()) { LOG.debug("<== AdminResource.importFile()"); } @@ -407,4 +427,15 @@ public class AdminResource { return ret; } + + private void acquireExportImportLock(String activity) throws AtlasBaseException { + boolean alreadyLocked = importExportOperationLock.isLocked(); + if (alreadyLocked) { + LOG.warn("Another export or import is currently in progress..aborting this " + activity, Thread.currentThread().getName()); + + throw new AtlasBaseException(AtlasErrorCode.FAILED_TO_OBTAIN_IMPORT_EXPORT_LOCK); + } + + importExportOperationLock.lock(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1d85e95f/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java b/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java index 7d3d442..04bb4d3 100644 --- a/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java @@ -75,7 +75,7 @@ public class ExportService { public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest request, String userName, String hostName, String requestingIP) throws AtlasBaseException { - + long startTimestamp = System.currentTimeMillis(); ExportContext context = new ExportContext(new AtlasExportResult(request, userName, hostName, requestingIP, System.currentTimeMillis()), exportSink); @@ -90,6 +90,9 @@ public class ExportService { context.sink.setTypesDef(context.result.getData().getTypesDef()); context.result.setData(null); context.result.setOperationStatus(AtlasExportResult.OperationStatus.SUCCESS); + + long endTimestamp = System.currentTimeMillis(); + context.result.incrementMeticsCounter("duration", (int) (endTimestamp - startTimestamp)); context.sink.setResult(context.result); } catch(Exception ex) { LOG.error("Operation failed: ", ex); @@ -175,7 +178,7 @@ public class ExportService { context.sink.add(entity); context.result.incrementMeticsCounter(String.format("entity:%s", entity.getTypeName())); - context.result.incrementMeticsCounter("Entities"); + context.result.incrementMeticsCounter("entities"); if (context.guidsProcessed.size() % 10 == 0) { LOG.info("export(): in progress.. number of entities exported: {}", context.guidsProcessed.size()); @@ -195,7 +198,7 @@ public class ExportService { AtlasClassificationDef cd = typeRegistry.getClassificationDefByName(c.getTypeName()); typesDef.getClassificationDefs().add(cd); - result.incrementMeticsCounter("Classification"); + result.incrementMeticsCounter("typedef:classification"); } } } @@ -208,7 +211,7 @@ public class ExportService { AtlasEntityDef typeDefinition = typeRegistry.getEntityDefByName(typeName); typesDef.getEntityDefs().add(typeDefinition); - result.incrementMeticsCounter("Type(s)"); + result.incrementMeticsCounter("typedef:" + typeDefinition.getName()); } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1d85e95f/webapp/src/main/java/org/apache/atlas/web/resources/ImportService.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ImportService.java b/webapp/src/main/java/org/apache/atlas/web/resources/ImportService.java index 7554cdb..7b0c887 100644 --- a/webapp/src/main/java/org/apache/atlas/web/resources/ImportService.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/ImportService.java @@ -20,6 +20,7 @@ package org.apache.atlas.web.resources; import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.model.impexp.AtlasImportRequest; import org.apache.atlas.model.impexp.AtlasImportResult; +import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.typedef.*; import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.commons.io.FileUtils; @@ -31,6 +32,7 @@ import org.slf4j.LoggerFactory; import java.io.*; import java.util.List; +import java.util.concurrent.TimeUnit; public class ImportService { @@ -39,6 +41,9 @@ public class ImportService { private final AtlasTypeDefStore typeDefStore; private final AtlasEntityStore entityStore; + private long startTimestamp; + private long endTimestamp; + public ImportService(final AtlasTypeDefStore typeDefStore, final AtlasEntityStore entityStore) { this.typeDefStore = typeDefStore; @@ -52,6 +57,7 @@ public class ImportService { try { LOG.info("==> import(user={}, from={})", userName, requestingIP); + startTimestamp = System.currentTimeMillis(); processTypes(source.getTypesDef(), result); processEntities(source, result); @@ -65,12 +71,7 @@ public class ImportService { throw new AtlasBaseException(excp); } finally { - try { - source.close(); - } catch (IOException e) { - // ignore - } - + source.close(); LOG.info("<== import(user={}, from={}): status={}", userName, requestingIP, result.getOperationStatus()); } @@ -118,10 +119,14 @@ public class ImportService { setGuidToEmpty(typeDefinitionMap.getEntityDefs()); typeDefStore.updateTypesDef(typeDefinitionMap); - result.incrementMeticsCounter("Enum(s)", typeDefinitionMap.getEnumDefs().size()); - result.incrementMeticsCounter("Struct(s)", typeDefinitionMap.getStructDefs().size()); - result.incrementMeticsCounter("Classification(s)", typeDefinitionMap.getClassificationDefs().size()); - result.incrementMeticsCounter("Entity definition(s)", typeDefinitionMap.getEntityDefs().size()); + updateMetricsForTypesDef(typeDefinitionMap, result); + } + + private void updateMetricsForTypesDef(AtlasTypesDef typeDefinitionMap, AtlasImportResult result) { + result.incrementMeticsCounter("typedef:classification", typeDefinitionMap.getClassificationDefs().size()); + result.incrementMeticsCounter("typedef:enum", typeDefinitionMap.getEnumDefs().size()); + result.incrementMeticsCounter("typedef:entitydef", typeDefinitionMap.getEntityDefs().size()); + result.incrementMeticsCounter("typedef:struct", typeDefinitionMap.getStructDefs().size()); } private void setGuidToEmpty(List<AtlasEntityDef> entityDefList) { @@ -131,7 +136,9 @@ public class ImportService { } private void processEntities(ZipSource importSource, AtlasImportResult result) throws AtlasBaseException { - this.entityStore.createOrUpdate(importSource, false); - result.incrementMeticsCounter("Entities", importSource.getCreationOrder().size()); + this.entityStore.bulkImport(importSource, result); + + endTimestamp = System.currentTimeMillis(); + result.incrementMeticsCounter("Duration", (int) (this.endTimestamp - this.startTimestamp)); } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1d85e95f/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java b/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java index ea62862..e69a139 100644 --- a/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java @@ -19,7 +19,6 @@ package org.apache.atlas.web.resources; import org.codehaus.jackson.type.TypeReference; import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.impexp.AtlasExportResult; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.repository.store.graph.v1.EntityImportStream; @@ -28,8 +27,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.*; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; @@ -37,103 +38,73 @@ import java.util.zip.ZipInputStream; public class ZipSource implements EntityImportStream { private static final Logger LOG = LoggerFactory.getLogger(ZipSource.class); - private final ByteArrayInputStream inputStream; - private List<String> creationOrder; - private Iterator<String> iterator; + private final ByteArrayInputStream inputStream; + private List<String> creationOrder; + private Iterator<String> iterator; + private Map<String, String> guidEntityJsonMap; - public ZipSource(ByteArrayInputStream inputStream) { + public ZipSource(ByteArrayInputStream inputStream) throws IOException { this.inputStream = inputStream; + guidEntityJsonMap = new HashMap<>(); + updateGuidZipEntryMap(); this.setCreationOrder(); } public AtlasTypesDef getTypesDef() throws AtlasBaseException { final String fileName = ZipExportFileNames.ATLAS_TYPESDEF_NAME.toString(); - try { - String s = get(fileName); - return convertFromJson(AtlasTypesDef.class, s); - } catch (IOException e) { - LOG.error(String.format("Error retrieving '%s' from zip.", fileName), e); - return null; - } + String s = getFromCache(fileName); + return convertFromJson(AtlasTypesDef.class, s); } - public AtlasExportResult getExportResult() throws AtlasBaseException { - String fileName = ZipExportFileNames.ATLAS_EXPORT_INFO_NAME.toString(); - try { - String s = get(fileName); - return convertFromJson(AtlasExportResult.class, s); - } catch (IOException e) { - LOG.error(String.format("Error retrieving '%s' from zip.", fileName), e); - return null; - } - } - - private void setCreationOrder() { String fileName = ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString(); try { - String s = get(fileName); + String s = getFromCache(fileName); this.creationOrder = convertFromJson(new TypeReference<List<String>>(){}, s); this.iterator = this.creationOrder.iterator(); - } catch (IOException e) { - LOG.error(String.format("Error retrieving '%s' from zip.", fileName), e); } catch (AtlasBaseException e) { LOG.error(String.format("Error retrieving '%s' from zip.", fileName), e); } } - public List<String> getCreationOrder() throws AtlasBaseException { - return this.creationOrder; - } - - public AtlasEntity getEntity(String guid) throws AtlasBaseException { - try { - String s = get(guid); - return convertFromJson(AtlasEntity.class, s); - } catch (IOException e) { - LOG.error(String.format("Error retrieving '%s' from zip.", guid), e); - return null; - } - } - - private String get(String entryName) throws IOException { - String ret = ""; + private void updateGuidZipEntryMap() throws IOException { inputStream.reset(); ZipInputStream zipInputStream = new ZipInputStream(inputStream); - ZipEntry zipEntry = zipInputStream.getNextEntry(); - - entryName = entryName + ".json"; - + ZipEntry zipEntry = zipInputStream.getNextEntry(); while (zipEntry != null) { - if (zipEntry.getName().equals(entryName)) { - break; - } + String entryName = zipEntry.getName().replace(".json", ""); - zipEntry = zipInputStream.getNextEntry(); - } + if (guidEntityJsonMap.containsKey(entryName)) continue; + if (zipEntry == null) continue; - if (zipEntry != null) { - ByteArrayOutputStream os = new ByteArrayOutputStream(); - byte[] buf = new byte[1024]; + byte[] buf = new byte[1024]; int n = 0; + ByteArrayOutputStream bos = new ByteArrayOutputStream(); while ((n = zipInputStream.read(buf, 0, 1024)) > -1) { - os.write(buf, 0, n); + bos.write(buf, 0, n); } - ret = os.toString(); - } else { - LOG.warn("{}: no such entry in zip file", entryName); + guidEntityJsonMap.put(entryName, bos.toString()); + zipEntry = zipInputStream.getNextEntry(); + } zipInputStream.close(); + } + + public List<String> getCreationOrder() throws AtlasBaseException { + return this.creationOrder; + } - return ret; + public AtlasEntity getEntity(String guid) throws AtlasBaseException { + String s = getFromCache(guid); + return convertFromJson(AtlasEntity.class, s); } private <T> T convertFromJson(TypeReference clazz, String jsonData) throws AtlasBaseException { @@ -158,8 +129,20 @@ public class ZipSource implements EntityImportStream { } } - public void close() throws IOException { - inputStream.close(); + private String getFromCache(String entryName) { + if(!guidEntityJsonMap.containsKey(entryName)) return ""; + + return guidEntityJsonMap.get(entryName).toString(); + } + + public void close() { + try { + inputStream.close(); + guidEntityJsonMap.clear(); + } + catch(IOException ex) { + LOG.warn("{}: Error closing streams."); + } } @Override
