ATLAS-1503: optimization of export implementation

Signed-off-by: Madhan Neethiraj <[email protected]>
(cherry picked from commit 7154e12d13fe4ccba4207bbf6f411353411f06b2)
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/5b59acd7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/5b59acd7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/5b59acd7 Branch: refs/heads/0.8-incubating Commit: 5b59acd7d3b435521b28e0a36968275750738132 Parents: 2cf954a Author: ashutoshm <[email protected]> Authored: Fri Mar 10 10:50:51 2017 -0800 Committer: Madhan Neethiraj <[email protected]> Committed: Fri Mar 17 00:35:16 2017 -0700 ---------------------------------------------------------------------- .../atlas/model/impexp/AtlasExportResult.java | 20 +++++++ .../atlas/web/resources/AdminResource.java | 10 ++-- .../atlas/web/resources/ExportService.java | 62 +++++++++++++++----- .../org/apache/atlas/web/resources/ZipSink.java | 18 +----- 4 files changed, 74 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/5b59acd7/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java index e6a967e..8f3075e 100644 --- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java +++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java @@ -180,6 +180,16 @@ public class AtlasExportResult implements Serializable { return toString(new StringBuilder()).toString(); } + public void clear() { + if(this.data != null) { + this.data.clear(); + } + + if(this.metrics != null) { + this.metrics.clear(); + } + } + @JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE) @JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) 
@JsonIgnoreProperties(ignoreUnknown=true) @@ -234,5 +244,15 @@ public class AtlasExportResult implements Serializable { public String toString() { return toString(new StringBuilder()).toString(); } + + public void clear() { + if(this.typesDef!= null) { + this.typesDef.clear(); + } + + if(this.entityCreationOrder != null) { + this.entityCreationOrder.clear(); + } + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/5b59acd7/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java index 31a4cf9..0dfdeb2 100755 --- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java @@ -51,7 +51,6 @@ import org.springframework.security.core.GrantedAuthority; import org.springframework.security.core.context.SecurityContextHolder; import javax.inject.Singleton; -import javax.servlet.ServletOutputStream; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.ws.rs.*; @@ -309,7 +308,7 @@ public class AdminResource { ZipSink exportSink = null; try { - exportSink = new ZipSink(); + exportSink = new ZipSink(httpServletResponse.getOutputStream()); ExportService exportService = new ExportService(this.typeRegistry); AtlasExportResult result = exportService.run(exportSink, request, Servlets.getUserName(httpServletRequest), @@ -318,14 +317,13 @@ public class AdminResource { exportSink.close(); - ServletOutputStream outStream = httpServletResponse.getOutputStream(); - exportSink.writeTo(outStream); - + httpServletResponse.addHeader("Content-Encoding","gzip"); httpServletResponse.setContentType("application/zip"); httpServletResponse.setHeader("Content-Disposition", 
"attachment; filename=" + result.getClass().getSimpleName()); + httpServletResponse.setHeader("Transfer-Encoding", "chunked"); - outStream.flush(); + httpServletResponse.getOutputStream().flush(); return Response.ok().build(); } catch (IOException excp) { LOG.error("export() failed", excp); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/5b59acd7/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java b/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java index c1891e0..e123ff7 100644 --- a/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java @@ -105,8 +105,9 @@ public class ExportService { LOG.error("Operation failed: ", ex); } finally { atlasGraph.releaseGremlinScriptEngine(context.scriptEngine); - LOG.info("<== export(user={}, from={}): status {}", userName, requestingIP, context.result.getOperationStatus()); + context.clear(); + result.clear(); } return context.result; @@ -124,8 +125,8 @@ public class ExportService { processEntity(entity, context, TraversalDirection.UNKNOWN); } - while (!context.guidsToProcess.isEmpty()) { - String guid = context.guidsToProcess.remove(0); + while (!context.guidsToProcessIsEmpty()) { + String guid = context.guidsToProcessRemove(0); TraversalDirection direction = context.guidDirection.get(guid); AtlasEntity entity = entityGraphRetriever.toAtlasEntity(guid); @@ -245,7 +246,7 @@ public class ExportService { private void getEntityGuidsForConnectedFetch(AtlasEntity entity, ExportContext context, TraversalDirection direction) throws AtlasBaseException { if (direction == TraversalDirection.UNKNOWN) { - getConnectedEntityGuids(entity, context, TraversalDirection.OUTWARD, TraversalDirection.OUTWARD); + getConnectedEntityGuids(entity, context, 
TraversalDirection.OUTWARD, TraversalDirection.INWARD); } else { if (isProcessEntity(entity)) { direction = TraversalDirection.OUTWARD; @@ -271,7 +272,7 @@ public class ExportService { String query = getQueryForTraversalDirection(direction); if (LOG.isDebugEnabled()) { - LOG.debug("==> getConnectedEntityGuids({}): guidsToProcess {} query {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size(), query); + LOG.debug("==> getConnectedEntityGuids({}): guidsToProcess {} query {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcessSize(), query); } context.bindings.clear(); @@ -289,8 +290,8 @@ public class ExportService { if (currentDirection == null) { context.guidDirection.put(guid, direction); - if (!context.guidsToProcess.contains(guid)) { - context.guidsToProcess.add(guid); + if (!context.guidsToProcessContains(guid)) { + context.guidsToProcessAdd(guid); } } else if (currentDirection == TraversalDirection.OUTWARD && direction == TraversalDirection.INWARD) { context.guidDirection.put(guid, direction); @@ -298,14 +299,14 @@ public class ExportService { // the entity should be reprocessed to get inward entities context.guidsProcessed.remove(guid); - if (!context.guidsToProcess.contains(guid)) { - context.guidsToProcess.add(guid); + if (!context.guidsToProcessContains(guid)) { + context.guidsToProcessAdd(guid); } } } if (LOG.isDebugEnabled()) { - LOG.debug("<== getConnectedEntityGuids({}): found {} guids; guidsToProcess {}", entity.getGuid(), guids.size(), context.guidsToProcess.size()); + LOG.debug("<== getConnectedEntityGuids({}): found {} guids; guidsToProcess {}", entity.getGuid(), guids.size(), context.guidsToProcessSize()); } } } @@ -323,7 +324,7 @@ public class ExportService { private void getEntityGuidsForFullFetch(AtlasEntity entity, ExportContext context) { if (LOG.isDebugEnabled()) { - LOG.debug("==> getEntityGuidsForFullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcess.size()); + 
LOG.debug("==> getEntityGuidsForFullFetch({}): guidsToProcess {}", AtlasTypeUtil.getAtlasObjectId(entity), context.guidsToProcessSize()); } String query = this.gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_BY_GUID_FULL); @@ -339,8 +340,8 @@ public class ExportService { for (String guid : result) { if (!context.guidsProcessed.contains(guid)) { - if (!context.guidsToProcess.contains(guid)) { - context.guidsToProcess.add(guid); + if (!context.guidsToProcessContains(guid)) { + context.guidsToProcessAdd(guid); } context.guidDirection.put(guid, TraversalDirection.BOTH); @@ -348,7 +349,7 @@ public class ExportService { } if (LOG.isDebugEnabled()) { - LOG.debug("<== getEntityGuidsForFullFetch({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcess.size()); + LOG.debug("<== getEntityGuidsForFullFetch({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcessSize()); } } @@ -434,7 +435,8 @@ public class ExportService { private class ExportContext { final Set<String> guidsProcessed = new HashSet<>(); - final List<String> guidsToProcess = new ArrayList<>(); + private final List<String> guidsToProcessList = new ArrayList<>(); + private final Set<String> guidsToProcessSet = new HashSet<>(); final Map<String, TraversalDirection> guidDirection = new HashMap<>(); final AtlasExportResult result; final ZipSink sink; @@ -477,5 +479,35 @@ public class ExportService { return matchType; } + + public void clear() { + guidsToProcessList.clear(); + guidsToProcessSet.clear(); + guidsProcessed.clear(); + guidDirection.clear(); + } + + public boolean guidsToProcessIsEmpty() { + return this.guidsToProcessList.isEmpty(); + } + + public String guidsToProcessRemove(int i) { + String s = this.guidsToProcessList.remove(i); + guidsToProcessSet.remove(s); + return s; + } + + public int guidsToProcessSize() { + return this.guidsToProcessList.size(); + } + + public boolean guidsToProcessContains(String guid) { + 
return guidsToProcessSet.contains(guid); + } + + public void guidsToProcessAdd(String guid) { + this.guidsToProcessList.add(guid); + guidsToProcessSet.add(guid); + } } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/5b59acd7/webapp/src/main/java/org/apache/atlas/web/resources/ZipSink.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ZipSink.java b/webapp/src/main/java/org/apache/atlas/web/resources/ZipSink.java index 2e4cb01..37d9eb5 100644 --- a/webapp/src/main/java/org/apache/atlas/web/resources/ZipSink.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/ZipSink.java @@ -18,18 +18,16 @@ package org.apache.atlas.web.resources; import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.impexp.AtlasExportResult; +import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.type.AtlasType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; import java.util.List; -import java.util.Map; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; @@ -37,15 +35,9 @@ public class ZipSink { private static final Logger LOG = LoggerFactory.getLogger(ZipSink.class); private ZipOutputStream zipOutputStream; - private ByteArrayOutputStream byteArrayOutputStream; - - public ZipSink() { - init(); - } - private void init() { - byteArrayOutputStream = new ByteArrayOutputStream(); - zipOutputStream = new ZipOutputStream(byteArrayOutputStream); + public ZipSink(OutputStream outputStream) { + zipOutputStream = new ZipOutputStream(outputStream); } public void add(AtlasEntity entity) throws AtlasBaseException { @@ -68,10 +60,6 @@ public class ZipSink { saveToZip(ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME, jsonData); } - 
public void writeTo(OutputStream stream) throws IOException { - byteArrayOutputStream.writeTo(stream); - } - public void close() { try { if(zipOutputStream != null) {
