Github user snoopdave commented on a diff in the pull request:

    https://github.com/apache/incubator-usergrid/pull/277#discussion_r32363214
  
    --- Diff: stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java ---
    @@ -60,213 +82,393 @@ public void runTool( CommandLine line ) throws Exception {
             outputDir = createOutputParentDir();
             logger.info( "Export directory: " + outputDir.getAbsolutePath() );
     
    -        exportAdminUsers();
    -    }
    +        if (StringUtils.isNotEmpty( line.getOptionValue( READ_THREAD_COUNT 
) )) {
    +            try {
    +                readThreadCount = Integer.parseInt( line.getOptionValue( 
READ_THREAD_COUNT ) );
    +            } catch (NumberFormatException nfe) {
    +                logger.error( "-" + READ_THREAD_COUNT + " must be 
specified as an integer. Aborting..." );
    +                return;
    +            }
    +        } else {
    +            readThreadCount = 20;
    +        }
     
    +        // start write queue worker
     
    -    private void exportAdminUsers() throws Exception {
    +        BlockingQueue<AdminUserWriteTask> writeQueue = new 
LinkedBlockingQueue<AdminUserWriteTask>();
    +        AdminUserWriter adminUserWriter = new AdminUserWriter( writeQueue 
);
    +        Thread writeThread = new Thread( adminUserWriter );
    +        writeThread.start();
    +        logger.debug( "Write thread started" );
     
    -        int count = 0;
    +        // start read queue workers
     
    +        BlockingQueue<UUID> readQueue = new LinkedBlockingQueue<UUID>();
    +        List<AdminUserReader> readers = new ArrayList<AdminUserReader>();
    +        for (int i = 0; i < readThreadCount; i++) {
    +            AdminUserReader worker = new AdminUserReader( readQueue, 
writeQueue );
    +            Thread readerThread = new Thread( worker, "AdminUserReader-" + 
i );
    +            readerThread.start();
    +            readers.add( worker );
    +        }
    +        logger.debug( readThreadCount + " read worker threads started" );
    +
    +        // query for IDs, add each to read queue
    +
    +        Query query = new Query();
    +        query.setLimit( MAX_ENTITY_FETCH );
    +        query.setResultsLevel( Level.IDS );
             EntityManager em = emf.getEntityManager( 
CassandraService.MANAGEMENT_APPLICATION_ID );
    +        Results ids = em.searchCollection( em.getApplicationRef(), 
"users", query );
     
    -        // write one JSON file for management application users
    +        while (ids.size() > 0) {
    +            for (UUID uuid : ids.getIds()) {
    +                readQueue.add( uuid );
    +                logger.debug( "Added uuid to readQueue: " + uuid );
    +            }
    +            if (ids.getCursor() == null) {
    +                break;
    +            }
    +            query.setCursor( ids.getCursor() );
    +            ids = em.searchCollection( em.getApplicationRef(), "users", 
query );
    +        }
     
    -        JsonGenerator usersFile =
    -                getJsonGenerator( createOutputFile( ADMIN_USERS_PREFIX, 
em.getApplication().getName() ) );
    -        usersFile.writeStartArray();
    +        adminUserWriter.setDone( true );
    +        for (AdminUserReader aur : readers) {
    +            aur.setDone( true );
    +        }
     
    -        // write one JSON file for metadata: collections, connections and 
dictionaries of those users
    +        logger.debug( "Waiting for write thread to complete" );
    +        writeThread.join();
    +    }
     
    -        JsonGenerator metadataFile =
    -                getJsonGenerator( createOutputFile( 
ADMIN_USER_METADATA_PREFIX, em.getApplication().getName() ) );
    -        metadataFile.writeStartObject();
     
    -        // query for and loop through all users in management application
    +    @Override
    +    @SuppressWarnings("static-access")
    +    public Options createOptions() {
     
    -        Query query = new Query();
    -        query.setLimit( MAX_ENTITY_FETCH );
    -        query.setResultsLevel( Results.Level.ALL_PROPERTIES );
    +        Options options = super.createOptions();
     
    -        Results entities = em.searchCollection( em.getApplicationRef(), 
"users", query );
    +        Option readThreads = OptionBuilder
    +                .hasArg().withType(0).withDescription("Read Threads -" + 
READ_THREAD_COUNT).create(READ_THREAD_COUNT);
     
    -        while ( entities.size() > 0 ) {
    +        options.addOption( readThreads );
    +        return options;
    +    }
     
    -            for ( Entity entity : entities ) {
     
    -                // write user to application file
    -                usersFile.writeObject( entity );
    -                echo( entity );
    +    public class AdminUserReader implements Runnable {
     
    -                // write user's collections, connections, etc. to 
collections file
    -                saveEntityMetadata( metadataFile, em, null, entity );
    +        private boolean done = true;
     
    -                logger.debug("Exported user {}", entity.getProperty( 
"email" ));
    +        private final BlockingQueue<UUID> readQueue;
    +        private final BlockingQueue<AdminUserWriteTask> writeQueue;
    +
    +        public AdminUserReader( BlockingQueue<UUID> readQueue, 
BlockingQueue<AdminUserWriteTask> writeQueue ) {
    +            this.readQueue = readQueue;
    +            this.writeQueue = writeQueue;
    +        }
     
    -                count++;
    -                if ( count % 1000 == 0 ) {
    -                    logger.info("Exported {} admin users", count);
    -                }
     
    +        @Override
    +        public void run() {
    +            try {
    +                readAndQueueAdminUsers();
    +            } catch (Exception e) {
    +                logger.error("Error reading data for export", e);
                 }
    +        }
     
    -            if ( entities.getCursor() == null ) {
    -                break;
    +
    +        private void readAndQueueAdminUsers() throws Exception {
    +
    +            EntityManager em = emf.getEntityManager( 
CassandraService.MANAGEMENT_APPLICATION_ID );
    +
    +            while ( true ) {
    +
    +                UUID uuid = null;
    +                try {
    +                    uuid = readQueue.poll( 30, TimeUnit.SECONDS );
    +                    logger.debug("Got item from entityId queue: " + uuid );
    +
    +                    if ( uuid == null && done ) {
    +                        break;
    +                    }
    +
    +                    Entity entity = em.get( uuid );
    +
    +                    AdminUserWriteTask task = new AdminUserWriteTask();
    +                    task.adminUser = entity;
    +
    +                    addCollectionsToTask(   task, entity );
    +                    addDictionariesToTask(  task, entity );
    +                    addConnectionsToTask(   task, entity );
    +                    addOrganizationsToTask( task, entity );
    +
    +                    writeQueue.add( task );
    +
    +                } catch ( Exception e ) {
    +                    logger.error("Error reading data for user " + uuid, e 
);
    +                }
                 }
    -            query.setCursor( entities.getCursor() );
    -            entities = em.searchCollection( em.getApplicationRef(), 
"users", query );
             }
     
    -        metadataFile.writeEndObject();
    -        metadataFile.close();
     
    -        usersFile.writeEndArray();
    -        usersFile.close();
    +        private void addCollectionsToTask(AdminUserWriteTask task, Entity 
entity) throws Exception {
     
    -        logger.info("Exported total of {} admin users", count);
    -    }
    +            EntityManager em = emf.getEntityManager( 
CassandraService.MANAGEMENT_APPLICATION_ID );
    +            Set<String> collections = em.getCollections( entity );
    +            if ((collections == null) || collections.isEmpty()) {
    +                return;
    +            }
     
    +            task.collectionsByName = new HashMap<String, List<UUID>>();
     
    -    /**
    -     * Serialize and save the collection members of this 
<code>entity</code>
    -     *
    -     * @param em Entity Manager
    -     * @param application Application name
    -     * @param entity entity
    -     */
    -    private void saveEntityMetadata(
    -            JsonGenerator jg, EntityManager em, String application, Entity 
entity) throws Exception {
    +            for (String collectionName : collections) {
    +
    +                List<UUID> uuids = task.collectionsByName.get( 
collectionName );
    +                if ( uuids == null ) {
    +                    uuids = new ArrayList<UUID>();
    +                    task.collectionsByName.put( collectionName, uuids );
    +                }
     
    -        saveCollections( jg, em, entity );
    -        saveConnections( entity, em, jg );
    -        saveOrganizations( entity, em, jg );
    -        saveDictionaries( entity, em, jg );
    +                Results collectionMembers = em.getCollection( entity, 
collectionName, null, 100000, Level.IDS, false );
     
    -        // End the object if it was Started
    -        jg.writeEndObject();
    -    }
    +                List<UUID> entityIds = collectionMembers.getIds();
    +
    +                if ((entityIds != null) && !entityIds.isEmpty()) {
    +                    for (UUID childEntityUUID : entityIds) {
    +                        uuids.add( childEntityUUID );
    +                    }
    +                }
    +            }
    +        }
     
     
    -    private void saveCollections(JsonGenerator jg, EntityManager em, 
Entity entity) throws Exception {
    +        private void addDictionariesToTask(AdminUserWriteTask task, Entity 
entity) throws Exception {
    +            EntityManager em = emf.getEntityManager( 
CassandraService.MANAGEMENT_APPLICATION_ID );
     
    -        Set<String> collections = em.getCollections( entity );
    +            Set<String> dictionaries = em.getDictionaries( entity );
     
    -        // Only create entry for Entities that have collections
    -        if ( ( collections == null ) || collections.isEmpty() ) {
    -            return;
    +            task.dictionariesByName = new HashMap<String, Map<Object, 
Object>>();
    +
    +            for (String dictionary : dictionaries) {
    +                Map<Object, Object> dict = em.getDictionaryAsMap( entity, 
dictionary );
    +                task.dictionariesByName.put( dictionary, dict );
    +            }
             }
     
    -        jg.writeFieldName( entity.getUuid().toString() );
    -        jg.writeStartObject();
     
    -        for ( String collectionName : collections ) {
    +        private void addConnectionsToTask(AdminUserWriteTask task, Entity 
entity) throws Exception {
    +            EntityManager em = emf.getEntityManager( 
CassandraService.MANAGEMENT_APPLICATION_ID );
     
    -            jg.writeFieldName( collectionName );
    -            // Start collection array.
    -            jg.writeStartArray();
    +            task.connectionsByType = new HashMap<String, 
List<ConnectionRef>>();
    +
    +            Set<String> connectionTypes = em.getConnectionTypes( entity );
    +            for (String connectionType : connectionTypes) {
     
    -            Results collectionMembers = em.getCollection( entity, 
collectionName, null, 100000, Level.IDS, false );
    +                List<ConnectionRef> connRefs = task.connectionsByType.get( 
connectionType );
    +                if ( connRefs == null ) {
    +                    connRefs = new ArrayList<ConnectionRef>();
    +                }
     
    -            List<UUID> entityIds = collectionMembers.getIds();
    +                Results results = em.getConnectedEntities( 
entity.getUuid(), connectionType, null, Level.IDS );
    +                List<ConnectionRef> connections = results.getConnections();
     
    -            if ( ( entityIds != null ) && !entityIds.isEmpty() ) {
    -                for ( UUID childEntityUUID : entityIds ) {
    -                    jg.writeObject( childEntityUUID.toString() );
    +                for (ConnectionRef connectionRef : connections) {
    +                    connRefs.add( connectionRef );
                     }
                 }
    +        }
     
    -            // End collection array.
    -            jg.writeEndArray();
    +
    +        private void addOrganizationsToTask(AdminUserWriteTask task, 
Entity entity) throws Exception {
    +            task.orgNamesByUuid = 
managementService.getOrganizationsForAdminUser( entity.getUuid() );
    +        }
    +
    +        public void setDone(boolean done) {
    +            this.done = done;
             }
         }
     
    +    class AdminUserWriter implements Runnable {
     
    -    /**
    -     * Persists the connection for this entity.
    -     */
    -    private void saveDictionaries( Entity entity, EntityManager em, 
JsonGenerator jg ) throws Exception {
    +        private boolean done = false;
    +
    +        private final BlockingQueue<AdminUserWriteTask> taskQueue;
    +
    +        public AdminUserWriter( BlockingQueue<AdminUserWriteTask> 
taskQueue ) {
    +            this.taskQueue = taskQueue;
    +        }
    +
    +
    +        @Override
    +        public void run() {
    +            try {
    +                writeEntities();
    +            } catch (Exception e) {
    +                logger.error("Error writing export data", e);
    +            }
    +        }
    +
    +
    +        private void writeEntities() throws Exception {
    +            EntityManager em = emf.getEntityManager( 
CassandraService.MANAGEMENT_APPLICATION_ID );
     
    -        jg.writeFieldName( "dictionaries" );
    -        jg.writeStartObject();
    +            // write one JSON file for management application users
    +            JsonGenerator usersFile =
    +                    getJsonGenerator( createOutputFile( 
ADMIN_USERS_PREFIX, em.getApplication().getName() ) );
    +            usersFile.writeStartArray();
     
    -        Set<String> dictionaries = em.getDictionaries( entity );
    -        for ( String dictionary : dictionaries ) {
    +            // write one JSON file for metadata: collections, connections 
and dictionaries of those users
    +            JsonGenerator metadataFile =
    +                    getJsonGenerator( createOutputFile( 
ADMIN_USER_METADATA_PREFIX, em.getApplication().getName() ) );
    --- End diff --
    
    No, because all Admin Users belong to the one and only management app.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to