Repository: incubator-atlas Updated Branches: refs/heads/master 9bddaeb3c -> 525082baa
ATLAS-1618: updated export to support scope option - full/connected Signed-off-by: Madhan Neethiraj <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/525082ba Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/525082ba Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/525082ba Branch: refs/heads/master Commit: 525082baa6f48085c5f436e7b193d3f2c4f58fed Parents: 9bddaeb Author: ashutoshm <[email protected]> Authored: Tue Feb 28 17:34:26 2017 -0800 Committer: Madhan Neethiraj <[email protected]> Committed: Thu Mar 2 20:17:39 2017 -0800 ---------------------------------------------------------------------- .../atlas/model/impexp/AtlasExportRequest.java | 24 +- .../store/graph/AtlasEntityStore.java | 5 +- .../store/graph/v1/AtlasEntityStoreV1.java | 8 +- .../graph/v1/AtlasEntityStreamForImport.java | 8 + .../store/graph/v1/EntityImportStream.java | 3 + .../atlas/util/AtlasGremlin2QueryProvider.java | 6 +- .../atlas/util/AtlasGremlinQueryProvider.java | 4 +- .../atlas/typesystem/types/TypeUtils.java | 23 +- .../atlas/web/resources/AdminResource.java | 23 +- .../atlas/web/resources/ExportService.java | 303 ++++++++++++++----- .../atlas/web/resources/ImportService.java | 43 ++- .../apache/atlas/web/resources/ZipSource.java | 8 + 12 files changed, 336 insertions(+), 122 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/525082ba/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java index dcb2765..a015e9b 100644 --- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java +++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java @@ -18,12 +18,6 @@ package org.apache.atlas.model.impexp; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.typedef.AtlasBaseTypeDef; import org.codehaus.jackson.annotate.JsonAutoDetect; @@ -33,6 +27,11 @@ import org.codehaus.jackson.map.annotate.JsonSerialize; import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlRootElement; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.NONE; import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONLY; @@ -44,12 +43,17 @@ import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONL @XmlRootElement @XmlAccessorType(XmlAccessType.PROPERTY) public class AtlasExportRequest implements Serializable { - private static final long serialVersionUID = 1L; - public static final String EXPORT_PARAM_OPTION = "FORMAT"; - public static final String EXPORT_PARAM_OPTION_FORMAT_JSON = "JSON"; - public static final String EXPORT_PARAM_OPTION_FORMAT_ZIP = "ZIP"; + private static final long serialVersionUID = 1L; + public static final String OPTION_FETCH_TYPE = "fetchType"; + public static final String OPTION_ATTR_MATCH_TYPE = "matchType"; + public static final String FETCH_TYPE_FULL = "full"; + public static final String FETCH_TYPE_CONNECTED = "connected"; + public static final String MATCH_TYPE_STARTS_WITH = "startsWith"; + public static final String MATCH_TYPE_ENDS_WITH = "endsWith"; + public static final String MATCH_TYPE_CONTAINS = "contains"; + public static final String MATCH_TYPE_MATCHES = "matches"; private List<AtlasObjectId> itemsToExport = new ArrayList<>(); private Map<String, Object> options = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/525082ba/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java index 96ca59c..c256ae2 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java @@ -23,9 +23,10 @@ import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.impexp.AtlasImportResult; import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.EntityMutationResponse; +import org.apache.atlas.repository.store.graph.v1.EntityImportStream; import org.apache.atlas.repository.store.graph.v1.EntityStream; import org.apache.atlas.type.AtlasEntityType; @@ -76,7 +77,7 @@ public interface AtlasEntityStore { * @return EntityMutationResponse Entity mutations operations with the corresponding set of entities on which these operations were performed * @throws AtlasBaseException */ - EntityMutationResponse bulkImport(EntityStream entityStream, AtlasImportResult importResult) throws AtlasBaseException; + EntityMutationResponse bulkImport(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException; /** * Update a single entity http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/525082ba/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java index 80c0c37..518b52b 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java @@ -142,7 +142,7 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { } @Override - public EntityMutationResponse bulkImport(EntityStream entityStream, AtlasImportResult importResult) throws AtlasBaseException { + public EntityMutationResponse bulkImport(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException { if (LOG.isDebugEnabled()) { LOG.debug("==> bulkImport()"); } @@ -168,6 +168,10 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { EntityMutationResponse resp = createOrUpdate(oneEntityStream, false, true); + if(CollectionUtils.isNotEmpty(entity.getClassifications())) { + addClassifications(entity.getGuid(), entity.getClassifications()); + } + updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult); updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult); updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult); @@ -181,6 +185,8 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { if (resp.getGuidAssignments() != null) { ret.getGuidAssignments().putAll(resp.getGuidAssignments()); } + + entityStream.onImportComplete(entity.getGuid()); } importResult.getProcessedEntities().addAll(processedGuids); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/525082ba/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java index c0b4d8d..8cb36ac 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStreamForImport.java @@ -18,6 +18,9 @@ package org.apache.atlas.repository.store.graph.v1; import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntityHeader; + +import java.util.List; public class AtlasEntityStreamForImport extends AtlasEntityStream implements EntityImportStream { public AtlasEntityStreamForImport(AtlasEntity entity) { @@ -27,4 +30,9 @@ public class AtlasEntityStreamForImport extends AtlasEntityStream implements Ent public AtlasEntityStreamForImport(AtlasEntity entity, EntityStream entityStream) { super(entity, entityStream); } + + @Override + public void onImportComplete(String guid) { + + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/525082ba/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java index 51ae312..73994b9 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java @@ -17,5 +17,8 @@ */ package org.apache.atlas.repository.store.graph.v1; + public interface EntityImportStream extends EntityStream { + + void onImportComplete(String guid); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/525082ba/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java index 798ce38..e4777be 100644 --- a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java +++ b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java @@ -37,8 +37,12 @@ public class AtlasGremlin2QueryProvider extends AtlasGremlinQueryProvider { return "g.V().has('__superTypeNames', T.in, ['Referenceable']).has('__traitNames').count()"; case ENTITIES_FOR_TAG_METRIC: return "g.V().has('__typeName', T.in, g.V().has('__type', 'typeSystem').filter{it.'__type.category'.name() == 'TRAIT'}.'__type.name'.toSet()).groupCount{it.'__typeName'}.cap.toList()"; - case EXPORT_BY_GUID: + case EXPORT_BY_GUID_FULL: return "g.V('__guid', startGuid).bothE().bothV().has('__guid').__guid.dedup().toList()"; + case EXPORT_BY_GUID_CONNECTED_IN_EDGE: + return "g.V('__guid', startGuid).inE().outV().has('__guid').__guid.dedup().toList()"; + case EXPORT_BY_GUID_CONNECTED_OUT_EDGE: + return "g.V('__guid', startGuid).outE().inV().has('__guid').__guid.dedup().toList()"; case EXPORT_TYPE_STARTS_WITH: return "g.V().has('__typeName','%s').filter({it.'%s'.startsWith(attrValue)}).has('__guid').__guid.toList()"; case EXPORT_TYPE_ENDS_WITH: http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/525082ba/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java b/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java index ad22bf7..f076750 100644 --- a/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java +++ b/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java @@ -42,7 +42,9 @@ public abstract class AtlasGremlinQueryProvider { ENTITIES_FOR_TAG_METRIC, // Import Export related Queries - EXPORT_BY_GUID, + EXPORT_BY_GUID_FULL, + EXPORT_BY_GUID_CONNECTED_IN_EDGE, + EXPORT_BY_GUID_CONNECTED_OUT_EDGE, EXPORT_TYPE_STARTS_WITH, EXPORT_TYPE_ENDS_WITH, EXPORT_TYPE_CONTAINS, http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/525082ba/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeUtils.java ---------------------------------------------------------------------- diff --git a/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeUtils.java b/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeUtils.java index f5c2ce9..6a14dc4 100755 --- a/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeUtils.java +++ b/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeUtils.java @@ -22,12 +22,7 @@ import com.google.common.collect.ImmutableList; import org.apache.atlas.AtlasException; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -82,6 +77,22 @@ public class TypeUtils { public static <L, R> Pair<L, R> of(L left, R right) { return new Pair<>(left, right); } + + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + Pair p = (Pair)o; + + return Objects.equals(left, p.left) && Objects.equals(right, p.right); + } + + public int hashCode() { return Objects.hash(left, right); } } /** http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/525082ba/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java index f11d2d1..31a4cf9 100755 --- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java @@ -22,7 +22,9 @@ import com.google.inject.Inject; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasErrorCode; -import org.apache.atlas.AtlasException; +import org.apache.atlas.authorize.AtlasActionTypes; +import org.apache.atlas.authorize.AtlasResourceTypes; +import org.apache.atlas.authorize.simple.AtlasAuthorizationUtils; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.impexp.AtlasExportRequest; import org.apache.atlas.model.impexp.AtlasExportResult; @@ -31,14 +33,12 @@ import org.apache.atlas.model.impexp.AtlasImportResult; import org.apache.atlas.model.metrics.AtlasMetrics; import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.services.MetricsService; -import org.apache.atlas.authorize.AtlasActionTypes; -import org.apache.atlas.authorize.AtlasResourceTypes; -import org.apache.atlas.authorize.simple.AtlasAuthorizationUtils; import org.apache.atlas.store.AtlasTypeDefStore; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.web.filters.AtlasCSRFPreventionFilter; import org.apache.atlas.web.service.ServiceState; import org.apache.atlas.web.util.Servlets; +import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.commons.lang.StringUtils; @@ -54,13 +54,7 @@ import javax.inject.Singleton; import javax.servlet.ServletOutputStream; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import javax.ws.rs.Consumes; -import javax.ws.rs.GET; -import javax.ws.rs.POST; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.WebApplicationException; +import javax.ws.rs.*; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; @@ -70,9 +64,6 @@ import java.util.Collection; import java.util.HashSet; import java.util.Set; import java.util.concurrent.locks.ReentrantLock; -import org.apache.commons.configuration.Configuration; - -import static org.apache.atlas.repository.converters.AtlasInstanceConverter.toAtlasBaseException; /** @@ -369,7 +360,7 @@ public class AdminResource { try { AtlasImportRequest request = new AtlasImportRequest(Servlets.getParameterMap(httpServletRequest)); ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes); - ImportService importService = new ImportService(this.typesDefStore, this.entityStore); + ImportService importService = new ImportService(this.typesDefStore, this.entityStore, this.typeRegistry); ZipSource zipSource = new ZipSource(inputStream); @@ -405,7 +396,7 @@ public class AdminResource { try { AtlasImportRequest request = new AtlasImportRequest(Servlets.getParameterMap(httpServletRequest)); - ImportService importService = new ImportService(this.typesDefStore, this.entityStore); + ImportService importService = new ImportService(this.typesDefStore, this.entityStore, this.typeRegistry); result = importService.run(request, Servlets.getUserName(httpServletRequest), Servlets.getHostName(httpServletRequest), http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/525082ba/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java b/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java index df8bf33..9c06b4b 100644 --- a/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java @@ -27,6 +27,7 @@ import org.apache.atlas.model.impexp.AtlasExportResult; import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.model.typedef.AtlasBaseTypeDef; import org.apache.atlas.model.typedef.AtlasClassificationDef; import org.apache.atlas.model.typedef.AtlasEntityDef; import org.apache.atlas.model.typedef.AtlasTypesDef; @@ -47,65 +48,44 @@ import org.slf4j.LoggerFactory; import javax.script.Bindings; import javax.script.ScriptContext; -import javax.script.ScriptEngine; import javax.script.ScriptException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import static org.apache.atlas.model.impexp.AtlasExportRequest.OPTION_FETCH_TYPE; +import static org.apache.atlas.model.impexp.AtlasExportRequest.OPTION_ATTR_MATCH_TYPE; +import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_FULL; +import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_CONNECTED; +import static org.apache.atlas.model.impexp.AtlasExportRequest.MATCH_TYPE_STARTS_WITH; +import static org.apache.atlas.model.impexp.AtlasExportRequest.MATCH_TYPE_CONTAINS; +import static org.apache.atlas.model.impexp.AtlasExportRequest.MATCH_TYPE_MATCHES; +import static org.apache.atlas.model.impexp.AtlasExportRequest.MATCH_TYPE_ENDS_WITH; public class ExportService { private static final Logger LOG = LoggerFactory.getLogger(ExportService.class); - public static final String OPTION_ATTR_MATCH_TYPE = "matchType"; - public static final String MATCH_TYPE_STARTS_WITH = "startsWith"; - public static final String MATCH_TYPE_ENDS_WITH = "endsWith"; - public static final String MATCH_TYPE_CONTAINS = "contains"; - public static final String MATCH_TYPE_MATCHES = "matches"; - - private final AtlasTypeRegistry typeRegistry; - private final AtlasGraph atlasGraph; - private final EntityGraphRetriever entityGraphRetriever; + private final AtlasTypeRegistry typeRegistry; + private final AtlasGraph atlasGraph; + private final EntityGraphRetriever entityGraphRetriever; private final AtlasGremlinQueryProvider gremlinQueryProvider; - // query engine support - private final ScriptEngine scriptEngine; - private final Bindings bindings; - public ExportService(final AtlasTypeRegistry typeRegistry) throws AtlasBaseException { this.typeRegistry = typeRegistry; this.entityGraphRetriever = new EntityGraphRetriever(this.typeRegistry); this.atlasGraph = AtlasGraphProvider.getGraphInstance(); this.gremlinQueryProvider = AtlasGremlinQueryProvider.INSTANCE; - - this.scriptEngine = new GremlinGroovyScriptEngine(); - - //Do not cache script compilations due to memory implications - scriptEngine.getContext().setAttribute("#jsr223.groovy.engine.keep.globals", "phantom", ScriptContext.ENGINE_SCOPE); - - bindings = scriptEngine.getBindings(ScriptContext.ENGINE_SCOPE); - } - - private class ExportContext { - final Set<String> guidsProcessed = new HashSet<>(); - final List<String> guidsToProcess = new ArrayList<>(); - final AtlasExportResult result; - final ZipSink sink; - - ExportContext(AtlasExportResult result, ZipSink sink) { - this.result = result; - this.sink = sink; - } } public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest request, String userName, String hostName, String requestingIP) throws AtlasBaseException { - long startTimestamp = System.currentTimeMillis(); - ExportContext context = new ExportContext(new AtlasExportResult(request, userName, hostName, requestingIP, - System.currentTimeMillis()), exportSink); + long startTime = System.currentTimeMillis(); + AtlasExportResult result = new AtlasExportResult(request, userName, hostName, requestingIP, startTime); + ExportContext context = new ExportContext(result, exportSink); try { LOG.info("==> export(user={}, from={})", userName, requestingIP); @@ -114,13 +94,14 @@ public class ExportService { processObjectId(item, context); } + long endTime = System.currentTimeMillis(); + context.sink.setExportOrder(context.result.getData().getEntityCreationOrder()); context.sink.setTypesDef(context.result.getData().getTypesDef()); context.result.setData(null); context.result.setOperationStatus(AtlasExportResult.OperationStatus.SUCCESS); + context.result.incrementMeticsCounter("duration", (int) (endTime - startTime)); - long endTimestamp = System.currentTimeMillis(); - context.result.incrementMeticsCounter("duration", (int) (endTimestamp - startTimestamp)); context.sink.setResult(context.result); } catch(Exception ex) { LOG.error("Operation failed: ", ex); @@ -140,15 +121,15 @@ public class ExportService { List<AtlasEntity> entities = getStartingEntity(item, context); for (AtlasEntity entity: entities) { - processEntity(entity, context); + processEntity(entity, context, TraversalDirection.UNKNOWN); } while (!context.guidsToProcess.isEmpty()) { - String guid = context.guidsToProcess.remove(0); + String guid = context.guidsToProcess.remove(0); + TraversalDirection direction = context.guidDirection.get(guid); + AtlasEntity entity = entityGraphRetriever.toAtlasEntity(guid); - AtlasEntity e = entityGraphRetriever.toAtlasEntity(guid); - - processEntity(e, context); + processEntity(entity, context, direction); } } catch (AtlasBaseException excp) { context.result.setOperationStatus(AtlasExportResult.OperationStatus.PARTIAL_SUCCESS); @@ -178,23 +159,14 @@ public class ExportService { throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_TYPENAME, typeName); } - AtlasExportRequest request = context.result.getRequest(); - String matchType = null; - - if (MapUtils.isNotEmpty(request.getOptions())) { - if (request.getOptions().get(OPTION_ATTR_MATCH_TYPE) != null) { - matchType = request.getOptions().get(OPTION_ATTR_MATCH_TYPE).toString(); - } - } - final String queryTemplate; - if (StringUtils.equalsIgnoreCase(matchType, MATCH_TYPE_STARTS_WITH)) { + if (StringUtils.equalsIgnoreCase(context.matchType, MATCH_TYPE_STARTS_WITH)) { queryTemplate = gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_TYPE_STARTS_WITH); - } else if (StringUtils.equalsIgnoreCase(matchType, MATCH_TYPE_ENDS_WITH)) { + } else if (StringUtils.equalsIgnoreCase(context.matchType, MATCH_TYPE_ENDS_WITH)) { queryTemplate = gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_TYPE_ENDS_WITH); - } else if (StringUtils.equalsIgnoreCase(matchType, MATCH_TYPE_CONTAINS)) { + } else if (StringUtils.equalsIgnoreCase(context.matchType, MATCH_TYPE_CONTAINS)) { queryTemplate = gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_TYPE_CONTAINS); - } else if (StringUtils.equalsIgnoreCase(matchType, MATCH_TYPE_MATCHES)) { + } else if (StringUtils.equalsIgnoreCase(context.matchType, MATCH_TYPE_MATCHES)) { queryTemplate = gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_TYPE_MATCHES); } else { // default queryTemplate = gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_TYPE_DEFAULT); @@ -211,7 +183,7 @@ public class ExportService { } String query = String.format(queryTemplate, typeName, attribute.getQualifiedName()); - List<String> guids = executeGremlinScriptFor(query, "attrValue", attrValue.toString()); + List<String> guids = executeGremlinQuery(query, "attrValue", attrValue.toString(), context); if (CollectionUtils.isNotEmpty(guids)) { for (String guid : guids) { @@ -228,13 +200,13 @@ public class ExportService { break; } - LOG.info("export(item={}; matchType={}): found {} entities", item, matchType, ret.size()); + LOG.info("export(item={}; matchType={}, fetchType={}): found {} entities", item, context.matchType, context.fetchType, ret.size()); } return ret; } - private void processEntity(AtlasEntity entity, ExportContext context) throws AtlasBaseException { + private void processEntity(AtlasEntity entity, ExportContext context, TraversalDirection direction) throws AtlasBaseException { if (LOG.isDebugEnabled()) { LOG.debug("==> processEntity({})", AtlasTypeUtil.getAtlasObjectId(entity)); } @@ -247,7 +219,7 @@ public class ExportService { addClassificationsAsNeeded(entity, context); addEntity(entity, context); - getConnectedEntityGuids(entity, context); + getConntedEntitiesBasedOnOption(entity, context, direction); } if (LOG.isDebugEnabled()) { @@ -255,26 +227,125 @@ public class ExportService { } } - private void getConnectedEntityGuids(AtlasEntity entity, ExportContext context) { + private void getConntedEntitiesBasedOnOption(AtlasEntity entity, ExportContext context, TraversalDirection direction) throws AtlasBaseException { + switch (context.fetchType) { + case CONNECTED: + getEntityGuidsForConnectedFetch(entity, context, direction); + break; + + case FULL: + default: + getEntityGuidsForFullFetch(entity, context); + } + } + + private void getEntityGuidsForConnectedFetch(AtlasEntity entity, ExportContext context, TraversalDirection direction) throws AtlasBaseException { + if (direction == TraversalDirection.UNKNOWN) { + getConnectedEntityGuids(entity, context, TraversalDirection.OUTWARD, TraversalDirection.OUTWARD); + } else { + if (isProcessEntity(entity)) { + direction = TraversalDirection.OUTWARD; + } + + getConnectedEntityGuids(entity, context, direction); + } + } + + private boolean isProcessEntity(AtlasEntity entity) throws AtlasBaseException { + String typeName = entity.getTypeName(); + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); + + return entityType.isSubTypeOf(AtlasBaseTypeDef.ATLAS_TYPE_PROCESS); + } + + private void getConnectedEntityGuids(AtlasEntity entity, ExportContext context, TraversalDirection... directions) { + if(directions == null) { + return; + } + + try { + for (TraversalDirection direction : directions) { + String query = getQueryForTraversalDirection(direction); + + if (LOG.isDebugEnabled()) { + LOG.debug("==> getConnectedEntityGuids({}): guidsToProcess {} query {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size(), query); + } + + List<String> guids = executeGremlinQuery(query, entity.getGuid(), context); + + if (CollectionUtils.isEmpty(guids)) { + continue; + } + + for (String guid : guids) { + TraversalDirection currentDirection = context.guidDirection.get(guid); + + if (currentDirection == null) { + context.guidDirection.put(guid, direction); + + if (!context.guidsToProcess.contains(guid)) { + context.guidsToProcess.add(guid); + } + } else if (currentDirection == TraversalDirection.OUTWARD && direction == TraversalDirection.INWARD) { + context.guidDirection.put(guid, direction); + + // the entity should be reprocessed to get inward entities + context.guidsProcessed.remove(guid); + + if (!context.guidsToProcess.contains(guid)) { + context.guidsToProcess.add(guid); + } + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== getConnectedEntityGuids({}): found {} guids; guidsToProcess {}", entity.getGuid(), guids.size(), context.guidsToProcess.size()); + } + } + + } catch (ScriptException e) { + LOG.error("Child entities could not be added for %s", entity.getGuid()); + } + } + + private String getQueryForTraversalDirection(TraversalDirection direction) { + switch (direction) { + case INWARD: + return this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_CONNECTED_IN_EDGE); + + default: + case OUTWARD: + return this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_CONNECTED_OUT_EDGE); + } + } + + private void getEntityGuidsForFullFetch(AtlasEntity entity, ExportContext context) { try { + String query = this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_FULL); + if (LOG.isDebugEnabled()) { - LOG.debug("==> getConnectedEntityGuids({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size()); + LOG.debug("==> getEntityGuidsForFullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size()); } - List<String> result = executeGremlinScriptForHive(entity.getGuid()); - if(result == null) { + List<String> result = executeGremlinQuery(query, entity.getGuid(), context); + + if (result == null) { return; } for (String guid : result) { if (!context.guidsProcessed.contains(guid)) { - context.guidsToProcess.add(guid); + if (!context.guidsToProcess.contains(guid)) { + context.guidsToProcess.add(guid); + } + + context.guidDirection.put(guid, TraversalDirection.BOTH); } } if (LOG.isDebugEnabled()) { - LOG.debug("<== getConnectedEntityGuids({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcess.size()); + LOG.debug("<== getEntityGuidsForFullFetch({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcess.size()); } } catch (ScriptException e) { LOG.error("Child entities could not be added for %s", entity.getGuid()); @@ -322,16 +393,19 @@ public class ExportService { } } - private List<String> executeGremlinScriptForHive(String guid) throws ScriptException { - String queryByGuid = gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID); - return executeGremlinScriptFor(queryByGuid, "startGuid", guid); + private List<String> executeGremlinQuery(String query, String guid, ExportContext context) throws ScriptException { + context.bindings.put("startGuid", guid); + return (List<String>) atlasGraph.executeGremlinScript(context.scriptEngine, + context.bindings, + query, + false); } - private List<String> executeGremlinScriptFor(String query, String parameterName, String parameterValue) { - bindings.put(parameterName, parameterValue); + private List<String> executeGremlinQuery(String query, String parameterName, String parameterValue, ExportContext context) { + context.bindings.put(parameterName, parameterValue); try { - return (List<String>) atlasGraph.executeGremlinScript(this.scriptEngine, - this.bindings, + return (List<String>) atlasGraph.executeGremlinScript(context.scriptEngine, + context.bindings, query, false); } catch (ScriptException e) { @@ -339,4 +413,87 @@ public class ExportService { return null; } } + + + private enum TraversalDirection { + UNKNOWN, + INWARD, + OUTWARD, + BOTH; + } + + + public enum ExportFetchType { + FULL(FETCH_TYPE_FULL), + CONNECTED(FETCH_TYPE_CONNECTED); + + final String str; + ExportFetchType(String s) { + this.str = s; + } + + public static final ExportFetchType from(String s) { + for (ExportFetchType b : ExportFetchType.values()) { + if (b.str.equalsIgnoreCase(s)) { + return b; + } + } + + return FULL; + } + } + + + private class ExportContext { + final Set<String> guidsProcessed = new HashSet<>(); + final List<String> guidsToProcess = new ArrayList<>(); + final Map<String, TraversalDirection> guidDirection = new HashMap<>(); + final AtlasExportResult result; + final ZipSink sink; + + private final GremlinGroovyScriptEngine scriptEngine; + private final Bindings bindings; + private final ExportFetchType fetchType; + private final String matchType; + + ExportContext(AtlasExportResult result, ZipSink sink) { + this.result = result; + this.sink = sink; + + this.scriptEngine = new GremlinGroovyScriptEngine(); + + //Do not cache script compilations due to memory implications + scriptEngine.getContext().setAttribute("#jsr223.groovy.engine.keep.globals", + "phantom", + ScriptContext.ENGINE_SCOPE); + + bindings = scriptEngine.getBindings(ScriptContext.ENGINE_SCOPE); + fetchType = getFetchType(result.getRequest()); + matchType = getMatchType(result.getRequest()); + } + + private ExportFetchType getFetchType(AtlasExportRequest request) { + Object fetchOption = request.getOptions() != null ? request.getOptions().get(OPTION_FETCH_TYPE) : null; + + if (fetchOption instanceof String) { + return ExportFetchType.from((String) fetchOption); + } else if (fetchOption instanceof ExportFetchType) { + return (ExportFetchType) fetchOption; + } + + return ExportFetchType.FULL; + } + + private String getMatchType(AtlasExportRequest request) { + String matchType = null; + + if (MapUtils.isNotEmpty(request.getOptions())) { + if (request.getOptions().get(OPTION_ATTR_MATCH_TYPE) != null) { + matchType = request.getOptions().get(OPTION_ATTR_MATCH_TYPE).toString(); + } + } + + return matchType; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/525082ba/webapp/src/main/java/org/apache/atlas/web/resources/ImportService.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ImportService.java b/webapp/src/main/java/org/apache/atlas/web/resources/ImportService.java index 1bd705d..857553d 100644 --- a/webapp/src/main/java/org/apache/atlas/web/resources/ImportService.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/ImportService.java @@ -18,21 +18,22 @@ package org.apache.atlas.web.resources; import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.impexp.AtlasImportRequest; import org.apache.atlas.model.impexp.AtlasImportResult; -import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.typedef.*; +import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer; import org.apache.atlas.repository.store.graph.AtlasEntityStore; -import org.apache.commons.io.FileUtils; -import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.store.AtlasTypeDefStore; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.*; -import java.util.List; -import java.util.concurrent.TimeUnit; +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileNotFoundException; public class ImportService { @@ -40,14 +41,16 @@ public class ImportService { private final AtlasTypeDefStore typeDefStore; private final AtlasEntityStore entityStore; + private final AtlasTypeRegistry typeRegistry; private long startTimestamp; private long endTimestamp; - public ImportService(final AtlasTypeDefStore typeDefStore, final AtlasEntityStore entityStore) { + public ImportService(final AtlasTypeDefStore typeDefStore, final AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) { this.typeDefStore = typeDefStore; this.entityStore = entityStore; + this.typeRegistry = typeRegistry; } public AtlasImportResult run(ZipSource source, AtlasImportRequest request, String userName, @@ -116,9 +119,13 @@ public class ImportService { } private void processTypes(AtlasTypesDef typeDefinitionMap, AtlasImportResult result) throws AtlasBaseException { - setGuidToEmpty(typeDefinitionMap.getEntityDefs()); - typeDefStore.updateTypesDef(typeDefinitionMap); + setGuidToEmpty(typeDefinitionMap); + AtlasTypesDef typesToCreate = AtlasTypeDefStoreInitializer.getTypesToCreate(typeDefinitionMap, this.typeRegistry); + if (!typesToCreate.isEmpty()) { + typeDefStore.createTypesDef(typesToCreate); + } + typeDefStore.updateTypesDef(typeDefinitionMap); updateMetricsForTypesDef(typeDefinitionMap, result); } @@ -129,9 +136,21 @@ public class ImportService { result.incrementMeticsCounter("typedef:struct", typeDefinitionMap.getStructDefs().size()); } - private void setGuidToEmpty(List<AtlasEntityDef> entityDefList) { - for (AtlasEntityDef edf: entityDefList) { - edf.setGuid(""); + private void setGuidToEmpty(AtlasTypesDef typesDef) { + for (AtlasEntityDef def: typesDef.getEntityDefs()) { + def.setGuid(null); + } + + for (AtlasClassificationDef def: typesDef.getClassificationDefs()) { + def.setGuid(null); + } + + for (AtlasEnumDef def: typesDef.getEnumDefs()) { + def.setGuid(null); + } + + for (AtlasStructDef def: typesDef.getStructDefs()) { + def.setGuid(null); } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/525082ba/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java b/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java index 4596084..a69f7fa 100644 --- a/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java @@ -17,6 +17,7 @@ */ package org.apache.atlas.web.resources; +import org.apache.atlas.model.instance.AtlasEntityHeader; import org.codehaus.jackson.type.TypeReference; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.instance.AtlasEntity; @@ -184,4 +185,11 @@ public class ZipSource implements EntityImportStream { return null; } } + + @Override + public void onImportComplete(String guid) { + if(guid != null) { + guidEntityJsonMap.remove(guid); + } + } }
