Clean up the unique value scanner and have it consistently take its Cassandra
consistency level from the standard read CL property (usergrid.read.cl).


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/ada914ac
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/ada914ac
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/ada914ac

Branch: refs/heads/datastax-cass-driver
Commit: ada914ac32ade7396f1b4e0f33d147ccbf328137
Parents: 3bba425
Author: Michael Russo <mru...@apigee.com>
Authored: Fri Jul 1 14:55:08 2016 -0700
Committer: Michael Russo <mru...@apigee.com>
Committed: Fri Jul 1 14:55:08 2016 -0700

----------------------------------------------------------------------
 .../usergrid/tools/UniqueValueScanner.java      | 105 +++++++++++--------
 1 file changed, 60 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/ada914ac/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueScanner.java
----------------------------------------------------------------------
diff --git 
a/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueScanner.java 
b/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueScanner.java
index 182e692..68f366b 100644
--- 
a/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueScanner.java
+++ 
b/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueScanner.java
@@ -18,9 +18,11 @@ package org.apache.usergrid.tools;
 
 
 import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import com.netflix.astyanax.model.Column;
+import com.netflix.astyanax.model.ConsistencyLevel;
 import com.netflix.astyanax.util.RangeBuilder;
 import org.apache.usergrid.persistence.Entity;
 import org.apache.usergrid.persistence.EntityManager;
@@ -131,6 +133,7 @@ public class UniqueValueScanner extends ToolBase {
         }
 
         logger.info("Staring Tool: UniqueValueScanner");
+        logger.info("Using Cassandra consistency level: {}", 
System.getProperty("usergrid.read.cl", "CL_LOCAL_QUORUM"));
 
 
         keyspace = injector.getInstance(com.netflix.astyanax.Keyspace.class);
@@ -142,6 +145,8 @@ public class UniqueValueScanner extends ToolBase {
         String entityType = line.getOptionValue(ENTITY_TYPE_ARG);
         String entityName = line.getOptionValue(ENTITY_NAME_ARG);
 
+        AtomicInteger count = new AtomicInteger(0);
+
         if (entityName != null && !entityName.isEmpty()) {
 
             if(appToFilter == null){
@@ -157,11 +162,11 @@ public class UniqueValueScanner extends ToolBase {
             logger.info("Running entity unique load only");
 
 
-            //do stuff
+            //do stuff w/o read repair
             UniqueValueSet uniqueValueSet = 
uniqueValueSerializationStrategy.load(
                 new ApplicationScopeImpl( new SimpleId(appToFilter, 
"application" ) ),
-                entityType,
-                Collections.singletonList(new StringField( fieldType, 
entityName) ));
+                
ConsistencyLevel.valueOf(System.getProperty("usergrid.read.cl", 
"CL_LOCAL_QUORUM")), entityType,
+                Collections.singletonList(new StringField( fieldType, 
entityName) ), false);
 
             StringBuilder stringBuilder = new StringBuilder();
 
@@ -194,89 +199,99 @@ public class UniqueValueScanner extends ToolBase {
             try {
 
                 rows = keyspace.prepareQuery(CF_UNIQUE_VALUES)
+                    
.setConsistencyLevel(ConsistencyLevel.valueOf(System.getProperty("usergrid.read.cl",
 "CL_LOCAL_QUORUM")))
                     .getAllRows()
-                    .withColumnRange(new RangeBuilder().build())
+                    .withColumnRange(new RangeBuilder().setLimit(1000).build())
                     .execute().getResult().iterator();
 
             } catch (ConnectionException e) {
 
+                logger.error("Error connecting to cassandra", e);
             }
 
 
             UUID finalAppToFilter = appToFilter;
-            rows.forEachRemaining(row -> {
 
-                final String fieldName = 
row.getKey().getKey().getField().getName();
-                final String fieldValue = 
row.getKey().getKey().getField().getValue().toString();
-                final String scopeType = row.getKey().getScope().getType();
-                final UUID scopeUUID = row.getKey().getScope().getUuid();
+            if( rows != null) {
+                rows.forEachRemaining(row -> {
 
+                    count.incrementAndGet();
 
-                if (!fieldName.equalsIgnoreCase(fieldType) ||
-                    (finalAppToFilter != null && 
!finalAppToFilter.equals(scopeUUID))
-                    ) {
-                    // do nothing
+                    if(count.get() % 1000 == 0 ){
+                        logger.info("Scanned {} rows in {}", count.get(), 
CF_UNIQUE_VALUES.getName());
+                    }
 
-                } else {
+                    final String fieldName = 
row.getKey().getKey().getField().getName();
+                    final String fieldValue = 
row.getKey().getKey().getField().getValue().toString();
+                    final String scopeType = row.getKey().getScope().getType();
+                    final UUID scopeUUID = row.getKey().getScope().getUuid();
 
-                    if (em == null && finalAppToFilter.equals(scopeUUID)) {
-                        em = emf.getEntityManager(scopeUUID);
-                    }
 
-                    // if we have more than 1 column, let's check for a 
duplicate
-                    if(row.getColumns().size() > 1) {
+                    if (!fieldName.equalsIgnoreCase(fieldType) ||
+                        (finalAppToFilter != null && 
!finalAppToFilter.equals(scopeUUID))
+                        ) {
+                        // do nothing
 
-                        final List<EntityVersion> values = new 
ArrayList<>(row.getColumns().size());
+                    } else {
 
-                        Iterator<Column<EntityVersion>> columns = 
row.getColumns().iterator();
-                        columns.forEachRemaining(column -> {
 
+                        // if we have more than 1 column, let's check for a 
duplicate
+                        if (row.getColumns() != null && 
row.getColumns().size() > 1) {
 
+                            final List<EntityVersion> values = new 
ArrayList<>(row.getColumns().size());
 
-                            final EntityVersion entityVersion = 
column.getName();
+                            Iterator<Column<EntityVersion>> columns = 
row.getColumns().iterator();
+                            columns.forEachRemaining(column -> {
 
 
+                                final EntityVersion entityVersion = 
column.getName();
 
-                            logger.trace(
-                                scopeType + ": " + scopeUUID + ", " +
-                                    fieldName + ": " + fieldValue + ", " +
-                                    "entity type: " + 
entityVersion.getEntityId().getType() + ", " +
-                                    "entity uuid: " + 
entityVersion.getEntityId().getUuid()
-                            );
 
+                                logger.trace(
+                                    scopeType + ": " + scopeUUID + ", " +
+                                        fieldName + ": " + fieldValue + ", " +
+                                        "entity type: " + 
entityVersion.getEntityId().getType() + ", " +
+                                        "entity uuid: " + 
entityVersion.getEntityId().getUuid()
+                                );
 
-                            if (entityType != null &&
-                                
entityVersion.getEntityId().getType().equalsIgnoreCase(entityType)
-                                ) {
 
-                                // add the first value into the list
-                                if(values.size() == 0 ) {
+                                if (entityType != null &&
+                                    
entityVersion.getEntityId().getType().equalsIgnoreCase(entityType)
+                                    ) {
 
-                                    values.add(entityVersion);
+                                    // add the first value into the list
+                                    if (values.size() == 0) {
 
+                                        values.add(entityVersion);
 
-                                }else{
 
-                                    if( 
!values.get(0).getEntityId().getUuid().equals(entityVersion.getEntityId().getUuid())){
+                                    } else {
 
-                                        values.add(entityVersion);
+                                        if 
(!values.get(0).getEntityId().getUuid().equals(entityVersion.getEntityId().getUuid()))
 {
+
+                                            values.add(entityVersion);
 
-                                        logger.error("Duplicate found for 
field [{}={}].  Entry 1: [{}], Entry 2: [{}]",
-                                            fieldName, fieldValue, 
values.get(0).getEntityId(), entityVersion.getEntityId());
+                                            logger.error("Duplicate found for 
field [{}={}].  Entry 1: [{}], Entry 2: [{}]",
+                                                fieldName, fieldValue, 
values.get(0).getEntityId(), entityVersion.getEntityId());
+
+                                        }
 
                                     }
 
+
                                 }
 
+                            });
+                        }
+                    }
 
-                            }
 
-                        });
-                    }
-                }
+                });
+            }else{
 
+                logger.warn("No rows returned from table: {}", 
CF_UNIQUE_VALUES.getName());
 
-            });
+            }
 
         }
     }

Reply via email to