Batch write to the export stream and use a cleaner filename.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/2d56d813 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/2d56d813 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/2d56d813 Branch: refs/heads/master Commit: 2d56d813a9d99295ef72ab055e8600f62df364e9 Parents: 82e7ec5 Author: Michael Russo <russomich...@google.com> Authored: Mon Mar 27 18:48:49 2017 -0700 Committer: Michael Russo <russomich...@google.com> Committed: Mon Mar 27 18:48:49 2017 -0700 ---------------------------------------------------------------------- .../export/ExportServiceImpl.java | 153 ++++++++++++------- .../rest/applications/ApplicationResource.java | 4 +- 2 files changed, 104 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/2d56d813/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java index ebbcc58..47d6982 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java @@ -48,7 +48,9 @@ import rx.schedulers.Schedulers; import java.io.*; +import java.util.HashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; @@ -98,91 +100,138 @@ public class ExportServiceImpl implements ExportService { final ZipOutputStream zipOutputStream = new ZipOutputStream(stream); - //final AtomicInteger count = new AtomicInteger(); final ApplicationScope appScope = exportRequestBuilder.getApplicationScope().get(); final Observable<ApplicationScope> applicationScopes = Observable.just(appScope); - - //final String jobId = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() ); final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager(appScope); - final String rootPath = appScope.getApplication().getUuid().toString(); - GraphManager gm = managerCache.getGraphManager( appScope ); + final AtomicInteger entityFileCount = new AtomicInteger(); + final AtomicInteger connectionFileCount = new AtomicInteger(); + allEntityIdsObservable.getEdgesToEntities( applicationScopes, Optional.absent(), Optional.absent() ) - .doOnNext( edgeScope -> { + .buffer( 1000 ) + .doOnNext( edgeScopes -> { + try { - // load the entity and convert to a normal map - Entity entity = ecm.load(edgeScope.getEdge().getTargetNode()).toBlocking().lastOrDefault(null); - Map entityMap = CpEntityMapUtils.toMap(entity); + final String filenameWithPath = "entities/" + + "entities."+ entityFileCount.get() + ".json"; + + logger.debug("adding zip entry: {}", filenameWithPath); + zipOutputStream.putNextEntry(new ZipEntry(filenameWithPath)); + + edgeScopes.forEach( edgeScope -> { + + + try { + // load the entity and convert to a normal map + Entity entity = ecm.load(edgeScope.getEdge().getTargetNode()).toBlocking().lastOrDefault(null); + Map entityMap = CpEntityMapUtils.toMap(entity); + + if (entity != null) { + - if (entity != null) { - final String filenameWithPath = rootPath + "/" + - edgeScope.getEdge().getSourceNode().getUuid().toString() + "_" + - edgeScope.getEdge().getType() + "_" + - edgeScope.getEdge().getTargetNode().getUuid().toString() + ".json"; - logger.debug("adding zip entry: {}", filenameWithPath); - zipOutputStream.putNextEntry(new ZipEntry(filenameWithPath)); + logger.debug("writing and flushing entity to zip stream: {}", jsonSerializer.toString(entityMap)); + zipOutputStream.write(jsonSerializer.toString(entityMap).getBytes()); + zipOutputStream.write("\n".getBytes()); + + } else { + logger.warn("{} did not have corresponding entity, not writing", edgeScope.toString()); + } + + } catch (IOException e) { + logger.warn("Unable to create entry in zip export for edge {}", edgeScope); + } - logger.debug("writing and flushing entity to zip stream: {}", jsonSerializer.toString(entityMap)); - zipOutputStream.write(jsonSerializer.toString(entityMap).getBytes()); - zipOutputStream.closeEntry(); - zipOutputStream.flush(); - } else { - logger.warn("{} did not have corresponding entity, not writing", edgeScope.toString()); - } + entityFileCount.addAndGet(1); + + }); + + zipOutputStream.closeEntry(); + zipOutputStream.flush(); } catch (IOException e) { - logger.warn("Unable to create entry in zip export for edge {}", edgeScope); + logger.warn("Unable to create entry in zip export for batch entities"); } //writeStateMeta( jobId, Status.INPROGRESS, count.addAndGet(1), System.currentTimeMillis() ); }) - .flatMap( edgeScope -> { + .doOnNext( edgeScopes -> { - // find all connection types for the each entity emitted from the app - return gm.getEdgeTypesFromSource(CpNamingUtils.createConnectionTypeSearch(edgeScope.getEdge().getTargetNode())) - .flatMap(emittedEdgeType -> { + try{ - logger.debug("loading edges of type {} from node {}", emittedEdgeType, edgeScope.getEdge().getTargetNode()); - return gm.loadEdgesFromSource(new SimpleSearchByEdgeType( edgeScope.getEdge().getTargetNode(), - emittedEdgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent() )); + final String filenameWithPath = "connections/" + + "connections." + connectionFileCount.get() + ".json"; - }).doOnNext( markedEdge -> { + logger.debug("adding zip entry: {}", filenameWithPath); + zipOutputStream.putNextEntry(new ZipEntry(filenameWithPath)); - if (!markedEdge.isDeleted()){ + edgeScopes.forEach(edgeScope -> gm.getEdgeTypesFromSource(CpNamingUtils.createConnectionTypeSearch(edgeScope.getEdge().getTargetNode())) + .flatMap(emittedEdgeType -> { - // todo, probably don't need to load the target node itself since it would be loaded in normal collection walking - Entity entity = ecm.load(markedEdge.getTargetNode()).toBlocking().lastOrDefault(null); + logger.debug("loading edges of type {} from node {}", emittedEdgeType, edgeScope.getEdge().getTargetNode()); + return gm.loadEdgesFromSource(new SimpleSearchByEdgeType(edgeScope.getEdge().getTargetNode(), + emittedEdgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent())); - Map entityMap = CpEntityMapUtils.toMap(entity); + }) - try { - final String filenameWithPath = rootPath + "/" + - markedEdge.getSourceNode().getUuid().toString() + "_" + - markedEdge.getType() + "_" + - markedEdge.getTargetNode().getUuid().toString() + ".json"; + .buffer( 1000 ) + .doOnNext(markedEdges -> { - logger.debug("adding zip entry: {}", filenameWithPath); - zipOutputStream.putNextEntry(new ZipEntry(filenameWithPath)); - logger.debug("writing and flushing entity to zip stream: {}", jsonSerializer.toString(entityMap)); - zipOutputStream.write(jsonSerializer.toString(entityMap).getBytes()); - zipOutputStream.closeEntry(); - zipOutputStream.flush(); - } catch (IOException e) { - logger.warn("Unable to create entry in zip export for edge {}", markedEdge.toString()); - } - } + markedEdges.forEach( markedEdge -> { - }); + if (!markedEdge.isDeleted()) { + + // doing the load to just again make sure bad connections are not exported + Entity entity = ecm.load(markedEdge.getTargetNode()).toBlocking().lastOrDefault(null); + + if (entity != null) { + + try { + + final Map<String,String> connectionMap = new HashMap<String,String>(1){{ + put("sourceNodeUUID", markedEdge.getSourceNode().getUuid().toString() ); + put("relationship", CpNamingUtils.getConnectionNameFromEdgeName(markedEdge.getType()) ); + put("targetNodeUUID", markedEdge.getTargetNode().getUuid().toString()); + }}; + + logger.debug("writing and flushing connection to zip stream: {}", jsonSerializer.toString(connectionMap).getBytes()); + zipOutputStream.write(jsonSerializer.toString(connectionMap).getBytes()); + zipOutputStream.write("\n".getBytes()); + + + } catch (IOException e) { + logger.warn("Unable to create entry in zip export for edge {}", markedEdge.toString()); + } + } else { + logger.warn("Exported connection has a missing target node, not creating connection in export. Edge: {}", markedEdge); + } + } + + }); + + + + + }).toBlocking().lastOrDefault(null)); + + connectionFileCount.addAndGet(1); + + zipOutputStream.closeEntry(); + zipOutputStream.flush(); + + + } catch (IOException e) { + logger.warn("Unable to create entry in zip export for batch connections"); + } }) .doOnCompleted(() -> { http://git-wip-us.apache.org/repos/asf/usergrid/blob/2d56d813/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java ---------------------------------------------------------------------- diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java index 920287b..7479a90 100644 --- a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java +++ b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java @@ -31,6 +31,7 @@ import org.apache.usergrid.corepersistence.export.ExportService; import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilder; import org.apache.usergrid.corepersistence.index.ReIndexService; import org.apache.usergrid.management.ApplicationInfo; +import org.apache.usergrid.management.OrganizationInfo; import org.apache.usergrid.management.exceptions.DisabledAdminUserException; import org.apache.usergrid.management.exceptions.DisabledAppUserException; import org.apache.usergrid.management.exceptions.UnactivatedAdminUserException; @@ -705,6 +706,7 @@ public class ApplicationResource extends CollectionResource { throw new UnauthorizedException(); } + ApplicationInfo appInfo = management.getApplicationInfo(applicationId); final ExportRequestBuilder request = new ExportRequestBuilderImpl().withApplicationId( applicationId ); StreamingOutput stream = new StreamingOutput() { @@ -715,7 +717,7 @@ public class ApplicationResource extends CollectionResource { }; return Response .ok(stream) - .header("Content-Disposition", "attachment; filename=\"usergrid_export-"+System.currentTimeMillis()+".zip\"") + .header("Content-Disposition", "attachment; filename=\""+appInfo.getName().replace("/","_")+"_"+System.currentTimeMillis()+".zip\"") .build(); }