Repository: usergrid
Updated Branches:
  refs/heads/master 628ad0129 -> 6af6f17a3


Tool to perform read repair on Cassandra and to repair entities with a
missing unique value entry.
It has a dry-run option that only detects entities with a missing unique
value entry, without repairing them.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/d12d82c2
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/d12d82c2
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/d12d82c2

Branch: refs/heads/master
Commit: d12d82c2b3f31d9d9b4625422f016486a257e245
Parents: abec1d9
Author: Chetan Burse <cbu...@google.com>
Authored: Mon Nov 26 13:17:55 2018 -0800
Committer: Chetan Burse <cbu...@google.com>
Committed: Thu Dec 13 17:20:53 2018 -0800

----------------------------------------------------------------------
 .../usergrid/tools/UniqueValueRepairer.java     | 448 +++++++++++++++++++
 1 file changed, 448 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/d12d82c2/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueRepairer.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueRepairer.java b/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueRepairer.java
new file mode 100644
index 0000000..04915be
--- /dev/null
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/UniqueValueRepairer.java
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.tools;
+
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
+import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.management.OrganizationInfo;
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import 
org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import 
org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
+import 
org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.field.StringField;
+import org.apache.usergrid.utils.ConversionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Session;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.google.common.base.Optional;
+import com.google.common.collect.BiMap;
+
+import rx.Observable;
+import rx.observables.ConnectableObservable;
+import rx.schedulers.Schedulers;
+
+public class UniqueValueRepairer extends ExportingToolBase {
+
+       static final Logger logger = 
LoggerFactory.getLogger(UniqueValueRepairer.class);
+
+       // NOTE(review): jsonFactory and LAST_ID are not referenced in the visible
+       // part of this file; confirm they are used elsewhere before keeping them.
+       JsonFactory jsonFactory = new JsonFactory();
+       public static final String LAST_ID = "lastId";
+
+       // Command-line option names registered in createOptions().
+       public static final String FIND_MISSING_UNIQUE_VALUES = 
"findMissingUniqueValues";
+       public static final String FIX_MISSING_VALUES = "fixUniqueValues";
+
+       // Mode switches, presumably populated from the options above by
+       // applyInputParams() (not visible here). The scan only reaches the
+       // repair step when findMissingUniqueValues is also true.
+       private boolean findMissingUniqueValues = false;
+       private boolean fixMissingValue = false;
+
+       // Source of collection edges (each edge's target node is an entity id);
+       // resolved from the injector in runTool().
+       private AllEntityIdsObservable allEntityIdsObs;
+       // Seek edge for the edge scan; Optional.absent() is passed when null.
+       // NOTE(review): never assigned in the visible code.
+       private SimpleEdge lastEdge = null;
+
+       // Worker pools: entityFetcher loads entity batches (10 threads);
+       // uniqueValueChecker (50 threads) is passed to subscribeOn(), whose
+       // returned Observable is discarded at the call site.
+       // NOTE(review): neither pool is shut down in the visible code.
+       private ExecutorService entityFetcher = 
Executors.newFixedThreadPool(10);
+       private ExecutorService uniqueValueChecker = 
Executors.newFixedThreadPool(50);
+
+       // Cassandra access resolved from the injector in runTool(); the repair
+       // path uses these to load entities and delete/rewrite unique value rows.
+       private Session session;
+       private UniqueValueSerializationStrategy 
uniqueValueSerializationStrategy;
+       private MvccEntitySerializationStrategy mvccEntitySerializationStrategy;
+
+       /**
+        * Extends the base tool options with the two unique value switches:
+        * {@code -findMissingUniqueValues} (detect entities with a missing unique
+        * value entry) and {@code -fixUniqueValues} (fix such entities).
+        *
+        * @return the base options plus the two switches
+        */
+       @Override
+       @SuppressWarnings("static-access")
+       public Options createOptions() {
+               final Options opts = super.createOptions();
+
+               // OptionBuilder is configured through static calls, so each option
+               // is built and registered in one chained expression.
+               opts.addOption(OptionBuilder
+                               .withDescription("Find entities with missing unique value entry  -findMissingUniqueValues")
+                               .create(FIND_MISSING_UNIQUE_VALUES));
+               opts.addOption(OptionBuilder
+                               .withDescription("Fix entities with missing unique value entry  -fixUniqueValues")
+                               .create(FIX_MISSING_VALUES));
+
+               return opts;
+       }
+
+       @Override
+       public void runTool(CommandLine line) throws Exception {
+
+               startSpring();
+               setVerbose(line);
+
+               this.allEntityIdsObs = 
injector.getInstance(AllEntityIdsObservable.class);
+               applyInputParams(line);
+
+               mvccEntitySerializationStrategy = 
injector.getInstance(MvccEntitySerializationStrategy.class);
+               uniqueValueSerializationStrategy = 
injector.getInstance(UniqueValueSerializationStrategy.class);
+               session = injector.getInstance(Session.class);
+
+               startEntityScan();
+
+               logger.info("Finished checking entities. Waiting for threads to 
complete execution.");
+
+               while (true) {
+                       try {
+                               // Spinning to prevent program execution from 
ending.
+                               // Need to replace with some kind of countdown 
latch or task tracker
+                               Thread.sleep(10000);
+                       } catch (InterruptedException e) {
+                               logger.error("Exception while waiting for 
unique check to complete.", e);
+                       }
+               }
+       }
+
+       /**
+        * Scans the applications of every organization returned by getOrgs(),
+        * skipping the organization whose name equals the
+        * {@code usergrid.test-account.organization} property.
+        */
+       private void startEntityScan() throws Exception, UnsupportedEncodingException {
+
+               for (Entry<UUID, String> organization : getOrgs().entrySet()) {
+
+                       // Let's skip the test entities. The org *name* is the entry value;
+                       // comparing the Map.Entry itself to a String can never match.
+                       String testOrganization = properties.getProperty("usergrid.test-account.organization");
+                       if (testOrganization != null && testOrganization.equals(organization.getValue())) {
+                               continue;
+                       }
+                       fetchApplicationsForOrgs(organization.getKey(), organization.getValue());
+               }
+       }
+
+       /**
+        * Resolves the organizations to scan, keyed by organization UUID.
+        * <p>
+        * When neither {@code orgId} nor a non-blank {@code orgName} is set, the
+        * map from managementService.getOrganizations() is returned as is.
+        * Otherwise the single matching organization is looked up ({@code orgId}
+        * takes precedence over {@code orgName}) and returned as a one-entry map
+        * of UUID to name. If it cannot be found, an error is logged and the
+        * JVM exits with status 1.
+        */
+       private Map<UUID, String> getOrgs() throws Exception {
+
+               boolean unfiltered = orgId == null && (orgName == null || orgName.trim().equals(""));
+               if (unfiltered) {
+                       // Loop through all the organizations.
+                       return managementService.getOrganizations();
+               }
+
+               OrganizationInfo info = (orgId != null)
+                               ? managementService.getOrganizationByUuid(orgId)
+                               : managementService.getOrganizationByName(orgName);
+
+               if (info == null) {
+                       logger.error("Organization info is null!");
+                       System.exit(1);
+               }
+
+               Map<UUID, String> selected = new HashMap<UUID, String>();
+               selected.put(info.getUuid(), info.getName());
+               return selected;
+       }
+
+       /**
+        * Scans the applications of one organization.
+        * <p>
+        * With no application filter, every application of the organization is
+        * scanned; a failure on one application is logged and the rest continue.
+        * With an {@code applicationId} or {@code applicationName} filter, the
+        * application is scanned only if it belongs to this organization. This
+        * means a filtered run over many organizations processes it once, not
+        * once per organization.
+        *
+        * @param orgId   UUID of the organization
+        * @param orgName organization name, used to build "org/app" lookup keys
+        */
+       private void fetchApplicationsForOrgs(UUID orgId, String orgName) throws Exception {
+
+               logger.info("Fetch applications for {} : {} ", orgId, orgName);
+
+               // Loop through the applications per organization; values are "org/app" names.
+               BiMap<UUID, String> applications = managementService.getApplicationsForOrganization(orgId);
+
+               if (applicationId == null && (applicationName == null || applicationName.trim().equals(""))) {
+                       // export all apps as appId or name is not provided
+                       for (Entry<UUID, String> appEntry : applications.entrySet()) {
+                               UUID appId = appEntry.getKey();
+                               String appName = appEntry.getValue();
+                               try {
+                                       // Name parsing stays inside the try so one malformed name
+                                       // cannot abort the scan of the remaining applications.
+                                       appName = shortApplicationName(appName);
+                                       fetchApplications(appId, appName);
+                               } catch (Exception e) {
+                                       logger.error("There was an exception fetching application {} : {}", appName, appId, e);
+                               }
+                       }
+                       return;
+               }
+
+               UUID appId = applicationId;
+               String appName = applicationName;
+
+               if (applicationId != null) {
+                       String fullName = applications.get(appId);
+                       // Use the same short "app" form as the unfiltered branch.
+                       appName = fullName == null ? null : shortApplicationName(fullName);
+               } else {
+                       appId = applications.inverse().get(orgName + '/' + appName);
+               }
+
+               // The requested application is not part of this organization.
+               if (appId == null || appName == null) {
+                       logger.info("Application {} : {} not found in organization {} : {}, skipping",
+                                       appName, appId, orgId, orgName);
+                       return;
+               }
+
+               try {
+                       fetchApplications(appId, appName);
+               } catch (Exception e) {
+                       logger.error("There was an exception fetching application {} : {}", appName, appId, e);
+               }
+       }
+
+       /**
+        * Strips the organization prefix from an "org/app" application name
+        * (returns the second '/'-separated part), or returns the name unchanged
+        * when it has no such part.
+        */
+       private static String shortApplicationName(String fullName) {
+               String[] parts = fullName.split("/");
+               return parts.length > 1 ? parts[1] : fullName;
+       }
+
+       private void fetchApplications(UUID appId, String appName) throws 
Exception {
+
+               logger.info("Fetching application for {} : {} ", appName, 
appId);
+
+               EntityManager em = emf.getEntityManager(appId);
+
+               Set<String> collections = em.getApplicationCollections();
+
+               if (collNames == null || collNames.length <= 0) {
+                       logger.info("Please pass collection name ( 
-collectionName testCollection ) ");
+               } else {
+                       Observable.from(collNames).subscribe(collectionName -> {
+                               if (collections.contains(collectionName)) {
+                                       fetchCollections(appId, collectionName, 
em);
+                               }
+                       });
+               }
+
+       }
+
+       /**
+        * Scans one collection of an application by delegating to
+        * extractEntitiesForCollection().
+        * <p>
+        * The {@code em} argument is not used here; the scan obtains its own
+        * entity manager for the application.
+        */
+       private void fetchCollections(UUID appId, String collectionName, EntityManager em) {
+               this.extractEntitiesForCollection(appId, collectionName);
+       }
+
+       private void extractEntitiesForCollection(UUID applicationId, String 
collectionName) {
+
+               AtomicInteger batch = new AtomicInteger(1);
+
+               final EntityManager rootEm = 
emf.getEntityManager(applicationId);
+
+               ExecutorService edgeScopeFetcher = 
Executors.newFixedThreadPool(1);
+               allEntityIdsObs
+                               
.getEdgesToEntities(Observable.just(CpNamingUtils.getApplicationScope(applicationId)),
+                                               Optional.fromNullable(
+                                                               
CpNamingUtils.getEdgeTypeFromCollectionName(collectionName.toLowerCase())),
+                                               (lastEdge == null ? 
Optional.absent() : Optional.fromNullable(lastEdge)))
+                               .buffer(1000).finallyDo(() -> {
+                                       edgeScopeFetcher.shutdown();
+                                       logger.info("Finished fetching entity 
ids for {}. Shutting down entity edge scope fetcher ",
+                                                       collectionName);
+                                       while 
(!edgeScopeFetcher.isTerminated()) {
+                                               try {
+                                                       
edgeScopeFetcher.awaitTermination(10, TimeUnit.SECONDS);
+                                               } catch (InterruptedException 
e) {
+                                               }
+                                       }
+                                       logger.info("Entity edge scope fetcher 
terminated after shutdown for {}", collectionName);
+                               }).subscribe(edges -> {
+
+                                       logger.info("For collection {}", 
collectionName);
+                                       Integer batchId = 
batch.getAndIncrement();
+                                       logger.info("Started fetching details 
for collection {} batch {} ", collectionName, batchId);
+                                       
Observable.just(edges).subscribeOn(Schedulers.from(edgeScopeFetcher)).subscribe(edgeScopes
 -> {
+
+                                               List<UUID> entityIds = new 
ArrayList<UUID>(1000);
+
+                                               for (EdgeScope edgeScope : 
edgeScopes) {
+                                                       Id entityId = 
edgeScope.getEdge().getTargetNode();
+                                                       if (entityId != null) {
+                                                               
entityIds.add(entityId.getUuid());
+                                                       } else {
+                                                               
edgeScopes.remove(edgeScope);
+                                                       }
+                                               }
+                                               try {
+                                                       String type = 
edgeScopes.get(0).getEdge().getTargetNode().getType();
+
+                                                       
Observable.just(entityIds).subscribeOn(Schedulers.from(entityFetcher)) // 
change to
+                                                                       
.subscribe(entIds -> {
+
+                                                                               
logger.info("Fetched {} entity id's of type {} for batch ID {}", entIds.size(),
+                                                                               
                type, batchId);
+                                                                               
Results entities = rootEm.getEntities(entIds, type);
+                                                                               
logger.info("Fetched {} entities of type {} for batch ID {}", entities.size(),
+                                                                               
                type, batchId);
+                                                                               
try {
+
+                                                                               
        ConnectableObservable<Entity> entityObs = Observable
+                                                                               
                        .from(entities.getEntities()).publish();
+                                                                               
        entityObs.subscribeOn(Schedulers.from(uniqueValueChecker));
+                                                                               
        entityObs.subscribe(t -> {
+                                                                               
                logger.info("Fetched entity with UUID : {}", t.getUuid());
+                                                                               
                if (findMissingUniqueValues) {
+                                                                               
                        String fieldValue = null;
+                                                                               
                        //We can search entity with UUID or name/email based on 
the entity type. 
+                                                                               
                        //This mapping between unique value field(name/email 
etc) and UUID,
+                                                                               
                        //is stored in unique value table. This can either be 
name / email or any other type.
+                                                                               
                        //This value is being passed as field type. 
+                                                                               
            //The code below takes the parameter and retrieves the value of the 
field using the getter method. 
+                                                                               
                        if (fieldType == null || fieldType.equals("")
+                                                                               
                                        || fieldType.equals("name")) {
+                                                                               
                                fieldType = "name";
+                                                                               
                                fieldValue = t.getName();
+                                                                               
                        } else {
+                                                                               
                                try {
+                                                                               
                                        Method method = t.getClass()
+                                                                               
                                                        .getMethod("get"
+                                                                               
                                                                        + 
fieldType.substring(0, 1).toUpperCase()
+                                                                               
                                                                        + 
fieldType.substring(1));
+                                                                               
                                        fieldValue = (String) method.invoke(t);
+                                                                               
                                } catch (Exception e1) {
+                                                                               
                                        logger.error(
+                                                                               
                                                        "Exception while trying 
to fetch field value of type {} for entity {} batch {}",
+                                                                               
                                                        fieldType, t.getUuid(), 
batchId, e1);
+                                                                               
                                }
+                                                                               
                        }
+                                                                               
                        try {
+                                                                               
                                if (fieldValue != null) {
+
+                                                                               
                                        Entity e = 
rootEm.getUniqueEntityFromAlias(t.getType(),
+                                                                               
                                                        fieldValue, false);
+
+                                                                               
                                        if (e == null) {
+                                                                               
                                                logger.info(
+                                                                               
                                                                "No entity 
found for field type {} and field value {} but exists for UUID {}",
+                                                                               
                                                                fieldType, 
fieldValue, t.getUuid());
+                                                                               
                                                if (fixMissingValue) {
+                                                                               
                                                        logger.info(
+                                                                               
                                                                        "Trying 
to repair unique value mapping for {} ",
+                                                                               
                                                                        
t.getUuid());
+                                                                               
                                                        UniqueValueSet 
uniqueValueSet = uniqueValueSerializationStrategy
+                                                                               
                                                                        
.load(new ApplicationScopeImpl(new SimpleId(
+                                                                               
                                                                                
        applicationId, "application")),
+                                                                               
                                                                                
        ConsistencyLevel
+                                                                               
                                                                                
                        .valueOf(System.getProperty(
+                                                                               
                                                                                
                                        "usergrid.read.cl",
+                                                                               
                                                                                
                                        "LOCAL_QUORUM")),
+                                                                               
                                                                                
        t.getType(),
+                                                                               
                                                                                
        Collections.singletonList(
+                                                                               
                                                                                
                        new StringField(fieldType,
+                                                                               
                                                                                
                                        fieldValue)),
+                                                                               
                                                                                
        false);
+
+                                                                               
                                                        ApplicationScope 
applicationScope = new ApplicationScopeImpl(
+                                                                               
                                                                        new 
SimpleId(applicationId, "application"));
+                                                                               
                                                        
com.google.common.base.Optional<MvccEntity> entity = 
mvccEntitySerializationStrategy
+                                                                               
                                                                        
.load(applicationScope, new SimpleId(
+                                                                               
                                                                                
        t.getUuid(), t.getType()));
+
+                                                                               
                                                        if (!entity.isPresent()
+                                                                               
                                                                        || 
!entity.get().getEntity().isPresent()) {
+                                                                               
                                                                throw new 
RuntimeException(
+                                                                               
                                                                                
"Unable to update unique value index because supplied UUID "
+                                                                               
                                                                                
                + t.getUuid()
+                                                                               
                                                                                
                + " does not exist");
+                                                                               
                                                        }
+                                                                               
                                                        logger.info("Delete 
unique value: {}",
+                                                                               
                                                                        
uniqueValueSet.getValue(fieldType));
+                                                                               
                                                        try {
+                                                                               
                                                                
session.execute(uniqueValueSerializationStrategy
+                                                                               
                                                                                
.deleteCQL(applicationScope,
+                                                                               
                                                                                
                uniqueValueSet
+                                                                               
                                                                                
                                .getValue(fieldType)));
+                                                                               
                                                        } catch (Exception ex) {
+                                                                               
                                                                logger.error(
+                                                                               
                                                                                
"Exception while trying to delete the Unique value for {}. Will proceed with 
creating new entry",
+                                                                               
                                                                                
t.getUuid(), ex);
+                                                                               
                                                        }
+
+                                                                               
                                                        UniqueValue 
newUniqueValue = new UniqueValueImpl(
+                                                                               
                                                                        new 
StringField(fieldType, fieldValue),
+                                                                               
                                                                        
entity.get().getId(),
+                                                                               
                                                                        
entity.get().getVersion());
+                                                                               
                                                        logger.info("Writing 
new unique value: {}",
+                                                                               
                                                                        
newUniqueValue);
+                                                                               
                                                        
session.execute(uniqueValueSerializationStrategy
+                                                                               
                                                                        
.writeCQL(applicationScope, newUniqueValue,
+                                                                               
                                                                                
        -1));
+                                                                               
                                                }
+
+                                                                               
                                        } else {
+                                                                               
                                                logger.info(
+                                                                               
                                                                "Found entity 
{} for field type {} and field value {}",
+                                                                               
                                                                e.getUuid(), 
fieldType, fieldValue);
+                                                                               
                                        }
+                                                                               
                                } else {
+                                                                               
                                        logger.info("No value found for field 
{} for entity {}",
+                                                                               
                                                        fieldType, t.getUuid());
+                                                                               
                                }
+                                                                               
                        } catch (Exception e) {
+                                                                               
                                logger.error(
+                                                                               
                                                "Error while checking unique 
values for batch id : {} for entity {}",
+                                                                               
                                                batchId, t.getUuid(), e);
+                                                                               
                        }
+                                                                               
                }
+                                                                               
        });
+                                                                               
        entityObs.connect();
+
+                                                                               
} catch (Exception e) {
+                                                                               
        logger.error(
+                                                                               
                        "Error while checking unique values for batch id : {} 
for collection {}",
+                                                                               
                        batchId, collectionName, e);
+                                                                               
}
+                                                                       });
+
+                                               } catch (Exception e) {
+                                                       logger.error("Exception 
while traversing entities " + edgeScopes.get(0).getEdge(), e);
+                                                       System.exit(0);
+                                               }
+                                       });
+                                       logger.info("Finished entity walk for 
collection {} for batch {}", collectionName, batchId);
+                               });
+               logger.info("Exiting extractEntitiesForCollection() method.");
+       }
+
+       protected void applyInputParams(CommandLine line) {
+
+               if (line.hasOption(ORG_ID)) {
+                       orgId = 
ConversionUtils.uuid(line.getOptionValue(ORG_ID));
+               } else if (line.hasOption(ORG_NAME)) {
+                       orgName = line.getOptionValue(ORG_NAME);
+               }
+
+               if (line.hasOption(APP_ID)) {
+                       applicationId = 
ConversionUtils.uuid(line.getOptionValue(APP_ID));
+               } else if (line.hasOption(APP_NAME)) {
+                       applicationName = line.getOptionValue(APP_NAME);
+               }
+               if (line.hasOption(COLL_NAMES)) {
+                       collNames = line.getOptionValue(COLL_NAMES).split(",");
+               }
+               if (line.hasOption(COLLECTION_NAME)) {
+                       collNames = new String[] { 
line.getOptionValue(COLLECTION_NAME) };
+               }
+               findMissingUniqueValues = 
line.hasOption(FIND_MISSING_UNIQUE_VALUES);
+               fixMissingValue = line.hasOption(FIX_MISSING_VALUES);
+
+       }
+}

Reply via email to