Use a constant score query when searching the index for all edge docs and node docs in order to delete entities/edges from the index.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/9a45c492 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/9a45c492 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/9a45c492 Branch: refs/heads/master Commit: 9a45c4923a1873f2efbf3f285df409f2e729f169 Parents: 3b6eb07 Author: Michael Russo <mru...@apigee.com> Authored: Wed Jul 20 10:27:10 2016 +0100 Committer: Michael Russo <mru...@apigee.com> Committed: Wed Jul 20 10:27:10 2016 +0100 ---------------------------------------------------------------------- .../index/impl/EsEntityIndexImpl.java | 53 +++++++++++--------- 1 file changed, 29 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/9a45c492/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java index d2aff7e..6e04bed 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java @@ -56,12 +56,12 @@ import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse; import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.search.SearchScrollRequestBuilder; import org.elasticsearch.client.AdminClient; import org.elasticsearch.common.settings.ImmutableSettings; 
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.*; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; @@ -110,7 +110,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData { private static final String VERIFY_TYPE = "entity"; private static final ImmutableMap<String, Object> DEFAULT_PAYLOAD = - ImmutableMap.<String, Object>builder().put(IndexingUtils.ENTITY_ID_FIELDNAME, UUIDGenerator.newTimeUUID().toString()).build(); + ImmutableMap.<String, Object>builder().put(IndexingUtils.ENTITY_ID_FIELDNAME, UUIDGenerator.newTimeUUID().toString()).build(); private final ApplicationScope applicationScope; @@ -196,7 +196,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData { Settings settings = ImmutableSettings.settingsBuilder() .put("index.number_of_shards", numberOfShards) .put("index.number_of_replicas", numberOfReplicas) - //dont' allow unmapped queries, and don't allow dynamic mapping + //dont' allow unmapped queries, and don't allow dynamic mapping .put("index.query.parse.allow_unmapped_fields", false) .put("index.mapper.dynamic", false) .put("action.write_consistency", writeConsistency) @@ -205,9 +205,9 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData { //Added For Graphite Metrics Timer.Context timeNewIndexCreation = addTimer.time(); final CreateIndexResponse cir = admin.indices().prepareCreate(indexName) - .setSettings(settings) + .setSettings(settings) .execute() - .actionGet(); + .actionGet(); timeNewIndexCreation.stop(); //create the mappings @@ -301,11 +301,11 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData { final String tempId = UUIDGenerator.newTimeUUID().toString(); esProvider.getClient().prepareIndex( alias.getWriteAlias(), VERIFY_TYPE, tempId 
) - .setSource(DEFAULT_PAYLOAD).get(); + .setSource(DEFAULT_PAYLOAD).get(); if (logger.isTraceEnabled()) { logger.trace("Successfully created new document with docId {} in index read {} write {} and type {}", - tempId, alias.getReadAlias(), alias.getWriteAlias(), VERIFY_TYPE); + tempId, alias.getReadAlias(), alias.getWriteAlias(), VERIFY_TYPE); } // delete all types, this way if we miss one it will get cleaned up @@ -313,7 +313,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData { if (logger.isTraceEnabled()) { logger.trace("Successfully deleted documents in read {} write {} and type {} with id {}", - alias.getReadAlias(), alias.getWriteAlias(), VERIFY_TYPE, tempId); + alias.getReadAlias(), alias.getWriteAlias(), VERIFY_TYPE, tempId); } return true; @@ -333,7 +333,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData { //Added For Graphite Metrics Timer.Context timePutIndex = mappingTimer.time(); PutMappingResponse pitr = esProvider.getClient().admin().indices().preparePutMapping( indexName ).setType( "entity" ).setSource( - getMappingsContent() ).execute().actionGet(); + getMappingsContent() ).execute().actionGet(); timePutIndex.stop(); if ( !pitr.isAcknowledged() ) { throw new IndexException( "Unable to create default mappings" ); @@ -381,7 +381,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData { } if (logger.isTraceEnabled()) { logger.trace("Refreshed indexes: {},success:{} failed:{} ", StringUtils.join(indexes, ", "), - successfulShards, failedShards); + successfulShards, failedShards); } IndexRefreshCommandInfo refreshResults = new IndexRefreshCommandInfo(failedShards == 0, @@ -431,7 +431,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData { searchTypes.getTypeNames( applicationScope ), srb ); } - //Added For Graphite Metrics + //Added For Graphite Metrics final Timer.Context timerContext = searchTimer.time(); try { @@ -493,11 +493,13 @@ public class EsEntityIndexImpl 
implements EntityIndex,VersionedData { .rangeQuery(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME) .gte(queryTimestamp); - QueryBuilder finalQuery = QueryBuilders - .boolQuery() - .must(entityIdQuery) - .must(nodeIdQuery) - .must(timestampQuery); + QueryBuilder finalQuery = QueryBuilders.constantScoreQuery( + QueryBuilders + .boolQuery() + .must(entityIdQuery) + .must(nodeIdQuery) + .must(timestampQuery) + ); searchResponse = srb .setQuery(finalQuery) @@ -552,10 +554,13 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData { .gte(queryTimestamp) .lt(markedTimestamp); - QueryBuilder finalQuery = QueryBuilders - .boolQuery() - .must(timestampQuery) - .must(nodeQuery); + QueryBuilder finalQuery = QueryBuilders.constantScoreQuery( + QueryBuilders + .boolQuery() + .must(timestampQuery) + .must(nodeQuery) + ); + searchResponse = srb .setQuery(finalQuery) @@ -684,7 +689,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData { candidateResult.getVersion(), markedVersion, candidateResult.getId() - ); + ); } candidates.add(candidateResult); @@ -745,7 +750,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData { try { ClusterHealthResponse chr = esProvider.getClient().admin() - .cluster().health(new ClusterHealthRequest()).get(); + .cluster().health(new ClusterHealthRequest()).get(); return Health.valueOf( chr.getStatus().name() ); } catch ( Exception ex ) { @@ -765,8 +770,8 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData { try { String[] indexNames = this.getIndexes(); - final ActionFuture<ClusterHealthResponse> future = esProvider.getClient().admin().cluster().health( - new ClusterHealthRequest( indexNames ) ); + final ActionFuture<ClusterHealthResponse> future = esProvider.getClient().admin().cluster().health( + new ClusterHealthRequest( indexNames ) ); //only wait 2 seconds max ClusterHealthResponse chr = future.actionGet(2000);