Github user snoopdave commented on a diff in the pull request:

    https://github.com/apache/incubator-usergrid/pull/275#discussion_r32321377
  
    --- Diff: 
stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java ---
    @@ -133,84 +163,129 @@ private void importAdminUsers() throws Exception {
          *
          * @param fileName Name of admin user data file.
          */
    -    private void importAdminUsers( String fileName ) throws Exception {
    +    private void importAdminUsers(final String fileName,
    +                                  final int writeThreadCount,
    +                                  final int auditThreadCount) throws 
Exception {
     
             int count = 0;
     
    -        File adminUsersFile = new File( importDir, fileName );
    +        File adminUsersFile = new File(importDir, fileName);
    +
    +        logger.info("----- Loading file: " + 
adminUsersFile.getAbsolutePath());
    +        JsonParser jp = getJsonParserForFile(adminUsersFile);
    +
    +        int loopCounter = 0;
     
    -        logger.info( "----- Loading file: " + 
adminUsersFile.getAbsolutePath() );
    -        JsonParser jp = getJsonParserForFile( adminUsersFile );
    +        BlockingQueue<Map<String, Object>> workQueue = new 
LinkedBlockingQueue<Map<String, Object>>();
    +        BlockingQueue<Map<String, Object>> auditQueue = new 
LinkedBlockingQueue<Map<String, Object>>();
    +
    +        startAdminWorkers(workQueue, auditQueue, writeThreadCount);
    +        startAdminAuditors(auditQueue, auditThreadCount);
     
             JsonToken token = jp.nextToken();
    -        validateStartArray( token );
    +        validateStartArray(token);
     
    -        EntityManager em = emf.getEntityManager( MANAGEMENT_APPLICATION_ID 
);
    +        while (jp.nextValue() != JsonToken.END_ARRAY) {
    +            loopCounter += 1;
     
    -        while ( jp.nextValue() != JsonToken.END_ARRAY ) {
    +            @SuppressWarnings("unchecked")
    +            Map<String, Object> entityProps = 
jp.readValueAs(HashMap.class);
    +            if (loopCounter % 100 == 1)
    +                logger.info("Publishing to queue... counter=" + 
loopCounter);
     
    -            @SuppressWarnings( "unchecked" )
    -            Map<String, Object> entityProps = jp.readValueAs( 
HashMap.class );
    +            workQueue.add(entityProps);
    +        }
     
    -            // Import/create the entity
    -            UUID uuid = getId( entityProps );
    -            String type = getType( entityProps );
    +        waitForQueueAndMeasure(workQueue, adminWriteThreads, "Admin 
Write");
    +        waitForQueueAndMeasure(auditQueue, adminAuditThreads, "Admin 
Audit");
     
    +        logger.info("----- End: Imported {} admin users from file {}",
    +                count, adminUsersFile.getAbsolutePath());
     
    -            try {
    -                em.create( uuid, type, entityProps );
    +        jp.close();
    +    }
     
    -                logger.debug( "Imported admin user {} {}", uuid, 
entityProps.get( "username" ) );
    -                count++;
    -                if ( count % 1000 == 0 ) {
    -                    logger.info("Imported {} admin users", count);
    -                }
    -            }
    -            catch ( DuplicateUniquePropertyExistsException de ) {
    -                logger.warn( "Unable to create entity. It appears to be a 
duplicate: " +
    -                    "id={}, type={}, name={}, username={}",
    -                    new Object[] { uuid, type, entityProps.get("name"), 
entityProps.get("username")});
    -                if ( logger.isDebugEnabled() ) {
    -                    logger.debug( "Exception" , de );
    -                }
    -                continue;
    -            }
    +    private static void waitForQueueAndMeasure(final BlockingQueue 
workQueue,
    +                                               final Map<Stoppable, 
Thread> threadMap,
    +                                               final String identifier) 
throws InterruptedException {
    +        double rateAverageSum = 0;
    +        int iterationCounter = 0;
     
    -            if ( em.get( uuid ) == null ) {
    -                logger.error( "Holy hell, we wrote an entity and it's 
missing.  " +
    -                                "Entity Id was {} and type is {}", uuid, 
type );
    -                System.exit( 1 );
    -            }
    -            echo( entityProps );
    +        while (!workQueue.isEmpty()) {
    +            iterationCounter += 1;
    +
    +            int sizeLast = workQueue.size();
    +            long lastTime = System.currentTimeMillis();
    +            logger.info("Queue {} is not empty, remaining size={}, 
waiting...", identifier, sizeLast);
    +            Thread.sleep(5000);
    +
    +            long timeNow = System.currentTimeMillis();
    +            int sizeNow = workQueue.size();
    +
    +            int processed = sizeLast - sizeNow;
    +
    +            long timeDelta = timeNow - lastTime;
    +
    +            double rateLast = (double) processed / (timeDelta / 1000);
    +            rateAverageSum += rateLast;
    +
    +            long timeRemaining = sizeLast / (long) (rateAverageSum / 
iterationCounter);
    --- End diff --
    
    I had to change this to:
    
    long timeRemaining = (long) ( sizeLast / (rateAverageSum / 
iterationCounter) );
    
    Otherwise the unit tests throws a divide by zero error.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to