Clean up the unique value scanner and have it consistently take its consistency level from the standard read CL property (usergrid.read.cl).
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/ada914ac Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/ada914ac Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/ada914ac Branch: refs/heads/datastax-cass-driver Commit: ada914ac32ade7396f1b4e0f33d147ccbf328137 Parents: 3bba425 Author: Michael Russo <mru...@apigee.com> Authored: Fri Jul 1 14:55:08 2016 -0700 Committer: Michael Russo <mru...@apigee.com> Committed: Fri Jul 1 14:55:08 2016 -0700 ---------------------------------------------------------------------- .../usergrid/tools/UniqueValueScanner.java | 105 +++++++++++-------- 1 file changed, 60 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/ada914ac/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueScanner.java ---------------------------------------------------------------------- diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueScanner.java b/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueScanner.java index 182e692..68f366b 100644 --- a/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueScanner.java +++ b/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueScanner.java @@ -18,9 +18,11 @@ package org.apache.usergrid.tools; import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; import com.netflix.astyanax.model.Column; +import com.netflix.astyanax.model.ConsistencyLevel; import com.netflix.astyanax.util.RangeBuilder; import org.apache.usergrid.persistence.Entity; import org.apache.usergrid.persistence.EntityManager; @@ -131,6 +133,7 @@ public class UniqueValueScanner extends ToolBase { } logger.info("Staring Tool: UniqueValueScanner"); + logger.info("Using Cassandra consistency level: {}", 
System.getProperty("usergrid.read.cl", "CL_LOCAL_QUORUM")); keyspace = injector.getInstance(com.netflix.astyanax.Keyspace.class); @@ -142,6 +145,8 @@ public class UniqueValueScanner extends ToolBase { String entityType = line.getOptionValue(ENTITY_TYPE_ARG); String entityName = line.getOptionValue(ENTITY_NAME_ARG); + AtomicInteger count = new AtomicInteger(0); + if (entityName != null && !entityName.isEmpty()) { if(appToFilter == null){ @@ -157,11 +162,11 @@ public class UniqueValueScanner extends ToolBase { logger.info("Running entity unique load only"); - //do stuff + //do stuff w/o read repair UniqueValueSet uniqueValueSet = uniqueValueSerializationStrategy.load( new ApplicationScopeImpl( new SimpleId(appToFilter, "application" ) ), - entityType, - Collections.singletonList(new StringField( fieldType, entityName) )); + ConsistencyLevel.valueOf(System.getProperty("usergrid.read.cl", "CL_LOCAL_QUORUM")), entityType, + Collections.singletonList(new StringField( fieldType, entityName) ), false); StringBuilder stringBuilder = new StringBuilder(); @@ -194,89 +199,99 @@ public class UniqueValueScanner extends ToolBase { try { rows = keyspace.prepareQuery(CF_UNIQUE_VALUES) + .setConsistencyLevel(ConsistencyLevel.valueOf(System.getProperty("usergrid.read.cl", "CL_LOCAL_QUORUM"))) .getAllRows() - .withColumnRange(new RangeBuilder().build()) + .withColumnRange(new RangeBuilder().setLimit(1000).build()) .execute().getResult().iterator(); } catch (ConnectionException e) { + logger.error("Error connecting to cassandra", e); } UUID finalAppToFilter = appToFilter; - rows.forEachRemaining(row -> { - final String fieldName = row.getKey().getKey().getField().getName(); - final String fieldValue = row.getKey().getKey().getField().getValue().toString(); - final String scopeType = row.getKey().getScope().getType(); - final UUID scopeUUID = row.getKey().getScope().getUuid(); + if( rows != null) { + rows.forEachRemaining(row -> { + count.incrementAndGet(); - if 
(!fieldName.equalsIgnoreCase(fieldType) || - (finalAppToFilter != null && !finalAppToFilter.equals(scopeUUID)) - ) { - // do nothing + if(count.get() % 1000 == 0 ){ + logger.info("Scanned {} rows in {}", count.get(), CF_UNIQUE_VALUES.getName()); + } - } else { + final String fieldName = row.getKey().getKey().getField().getName(); + final String fieldValue = row.getKey().getKey().getField().getValue().toString(); + final String scopeType = row.getKey().getScope().getType(); + final UUID scopeUUID = row.getKey().getScope().getUuid(); - if (em == null && finalAppToFilter.equals(scopeUUID)) { - em = emf.getEntityManager(scopeUUID); - } - // if we have more than 1 column, let's check for a duplicate - if(row.getColumns().size() > 1) { + if (!fieldName.equalsIgnoreCase(fieldType) || + (finalAppToFilter != null && !finalAppToFilter.equals(scopeUUID)) + ) { + // do nothing - final List<EntityVersion> values = new ArrayList<>(row.getColumns().size()); + } else { - Iterator<Column<EntityVersion>> columns = row.getColumns().iterator(); - columns.forEachRemaining(column -> { + // if we have more than 1 column, let's check for a duplicate + if (row.getColumns() != null && row.getColumns().size() > 1) { + final List<EntityVersion> values = new ArrayList<>(row.getColumns().size()); - final EntityVersion entityVersion = column.getName(); + Iterator<Column<EntityVersion>> columns = row.getColumns().iterator(); + columns.forEachRemaining(column -> { + final EntityVersion entityVersion = column.getName(); - logger.trace( - scopeType + ": " + scopeUUID + ", " + - fieldName + ": " + fieldValue + ", " + - "entity type: " + entityVersion.getEntityId().getType() + ", " + - "entity uuid: " + entityVersion.getEntityId().getUuid() - ); + logger.trace( + scopeType + ": " + scopeUUID + ", " + + fieldName + ": " + fieldValue + ", " + + "entity type: " + entityVersion.getEntityId().getType() + ", " + + "entity uuid: " + entityVersion.getEntityId().getUuid() + ); - if (entityType != null && - 
entityVersion.getEntityId().getType().equalsIgnoreCase(entityType) - ) { - // add the first value into the list - if(values.size() == 0 ) { + if (entityType != null && + entityVersion.getEntityId().getType().equalsIgnoreCase(entityType) + ) { - values.add(entityVersion); + // add the first value into the list + if (values.size() == 0) { + values.add(entityVersion); - }else{ - if( !values.get(0).getEntityId().getUuid().equals(entityVersion.getEntityId().getUuid())){ + } else { - values.add(entityVersion); + if (!values.get(0).getEntityId().getUuid().equals(entityVersion.getEntityId().getUuid())) { + + values.add(entityVersion); - logger.error("Duplicate found for field [{}={}]. Entry 1: [{}], Entry 2: [{}]", - fieldName, fieldValue, values.get(0).getEntityId(), entityVersion.getEntityId()); + logger.error("Duplicate found for field [{}={}]. Entry 1: [{}], Entry 2: [{}]", + fieldName, fieldValue, values.get(0).getEntityId(), entityVersion.getEntityId()); + + } } + } + }); + } + } - } - }); - } - } + }); + }else{ + logger.warn("No rows returned from table: {}", CF_UNIQUE_VALUES.getName()); - }); + } } }