Repository: incubator-unomi Updated Branches: refs/heads/UNOMI-204 [created] 74f7172ef
UNOMI-204 : Add count method in QueryBuilder, adds cardinality/count aggregates, first optimization for PastEventCondition Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/74f7172e Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/74f7172e Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/74f7172e Branch: refs/heads/UNOMI-204 Commit: 74f7172ef4148d7ba25673f9a3d6a0c1c08528a1 Parents: 10e694e Author: tdraier <dra...@apache.org> Authored: Tue Oct 9 18:15:28 2018 +0200 Committer: tdraier <dra...@apache.org> Committed: Tue Oct 9 18:15:28 2018 +0200 ---------------------------------------------------------------------- .../ElasticSearchPersistenceServiceImpl.java | 25 ++++++- .../conditions/ConditionESQueryBuilder.java | 3 + .../ConditionESQueryBuilderDispatcher.java | 32 ++++++++ .../spi/aggregate/TermsAggregate.java | 18 +++++ .../PastEventConditionESQueryBuilder.java | 77 +++++++++++++++----- 5 files changed, 137 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/74f7172e/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java ---------------------------------------------------------------------- diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java index 46e7cbb..23cedfb 100644 --- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java +++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java @@ -61,6 
+61,7 @@ import org.elasticsearch.common.unit.DistanceUnit; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.query.IdsQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.RangeQueryBuilder; @@ -83,6 +84,8 @@ import org.elasticsearch.search.aggregations.bucket.missing.MissingAggregationBu import org.elasticsearch.search.aggregations.bucket.range.RangeAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.range.date.DateRangeAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.range.ip.IpRangeAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude; import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation; import org.elasticsearch.search.sort.GeoDistanceSortBuilder; import org.elasticsearch.search.sort.SortBuilders; @@ -1306,7 +1309,15 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, @Override public long queryCount(Condition query, String itemType) { - return queryCount(conditionESQueryBuilderDispatcher.buildFilter(query), itemType); + try { + return conditionESQueryBuilderDispatcher.count(query); + } catch (UnsupportedOperationException e) { + QueryBuilder filter = conditionESQueryBuilderDispatcher.buildFilter(query); + if (filter instanceof IdsQueryBuilder) { + return ((IdsQueryBuilder) filter).ids().size(); + } + return queryCount(filter, itemType); + } } private long queryCount(final QueryBuilder filter, final String itemType) { @@ -1564,6 +1575,12 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, //default if (fieldName != null) { bucketsAggregation =
AggregationBuilders.terms("buckets").field(fieldName).size(Integer.parseInt(aggregateQueryBucketSize)); + if (aggregate instanceof TermsAggregate) { + TermsAggregate termsAggregate = (TermsAggregate) aggregate; + if (termsAggregate.getPartition() > -1 && termsAggregate.getNumPartitions() > 0) { // numPartitions must be >= 1: Elasticsearch's IncludeExclude rejects a partition outside [0, numPartitions) + ((TermsAggregationBuilder) bucketsAggregation).includeExclude(new IncludeExclude(termsAggregate.getPartition(), termsAggregate.getNumPartitions())); + } + } } else { // field name could be null if no existing data exists } @@ -1781,6 +1798,12 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, case "max": filterAggregation.subAggregation(AggregationBuilders.max("max").field(field)); break; + case "card": + filterAggregation.subAggregation(AggregationBuilders.cardinality("card").field(field)); + break; + case "count": + filterAggregation.subAggregation(AggregationBuilders.count("count").field(field)); + break; } } } http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/74f7172e/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilder.java ---------------------------------------------------------------------- diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilder.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilder.java index 78a5cba..7a100d0 100644 --- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilder.java +++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilder.java @@ -26,4 +26,7 @@ public interface ConditionESQueryBuilder { QueryBuilder buildQuery(Condition condition, Map<String, Object> context, 
ConditionESQueryBuilderDispatcher dispatcher); + default long
count(Condition condition, Map<String, Object> context, ConditionESQueryBuilderDispatcher dispatcher) { + throw new UnsupportedOperationException(); + } } http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/74f7172e/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilderDispatcher.java ---------------------------------------------------------------------- diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilderDispatcher.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilderDispatcher.java index b29a631..d08e283 100644 --- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilderDispatcher.java +++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilderDispatcher.java @@ -86,5 +86,37 @@ public class ConditionESQueryBuilderDispatcher { return QueryBuilders.matchAllQuery(); } + public long count(Condition condition) { + return count(condition, new HashMap<>()); + } + + public long count(Condition condition, Map<String, Object> context) { + if(condition == null || condition.getConditionType() == null) { + throw new IllegalArgumentException("Condition is null or doesn't have type, impossible to build filter"); + } + + String queryBuilderKey = condition.getConditionType().getQueryBuilder(); + if (queryBuilderKey == null && condition.getConditionType().getParentCondition() != null) { + context.putAll(condition.getParameterValues()); + return count(condition.getConditionType().getParentCondition(), context); + } + + if (queryBuilderKey == null) { + throw new UnsupportedOperationException("No query builder defined for : " + condition.getConditionTypeId()); + } + if (queryBuilders.containsKey(queryBuilderKey)) { +
ConditionESQueryBuilder queryBuilder = queryBuilders.get(queryBuilderKey); + Condition contextualCondition = ConditionContextHelper.getContextualCondition(condition, context); + if (contextualCondition != null) { + return queryBuilder.count(contextualCondition, context, this); + } + } + + // if no matching + if (logger.isDebugEnabled()) { + logger.debug("No matching query builder for condition {} and context {}", condition, context); + } + throw new UnsupportedOperationException(); + } } http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/74f7172e/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/aggregate/TermsAggregate.java ---------------------------------------------------------------------- diff --git a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/aggregate/TermsAggregate.java b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/aggregate/TermsAggregate.java index a0591f5..3b9d741 100644 --- a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/aggregate/TermsAggregate.java +++ b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/aggregate/TermsAggregate.java @@ -18,7 +18,25 @@ package org.apache.unomi.persistence.spi.aggregate; public class TermsAggregate extends BaseAggregate{ + private int partition = -1; + private int numPartitions = -1; + + public TermsAggregate(String field) { super(field); } + + public TermsAggregate(String field, int partition, int numPartitions) { + super(field); + this.partition = partition; + this.numPartitions = numPartitions; + } + + public int getPartition() { + return partition; + } + + public int getNumPartitions() { + return numPartitions; + } } http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/74f7172e/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java ---------------------------------------------------------------------- diff --git
a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java index e51aaa8..b3c169c 100644 --- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java +++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java @@ -44,6 +44,64 @@ public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder } public QueryBuilder buildQuery(Condition condition, Map<String, Object> context, ConditionESQueryBuilderDispatcher dispatcher) { + Condition eventCondition = getEventCondition(condition, context); + //todo : Check behaviour with important number of profiles + Set<String> ids = new HashSet<String>(); + Integer minimumEventCount = condition.getParameter("minimumEventCount") == null ? 0 : (Integer) condition.getParameter("minimumEventCount"); + Integer maximumEventCount = condition.getParameter("maximumEventCount") == null ?
Integer.MAX_VALUE : (Integer) condition.getParameter("maximumEventCount"); + + Map<String, Double> m = persistenceService.getSingleValuesMetrics(eventCondition, new String[]{"card", "count"}, "profileId.keyword", Event.ITEM_TYPE); + long card = m.get("_card").longValue(); + + int numParts = (int) (card / 1000) + 1; // +1 so there is always at least one partition (card < 1000 would otherwise skip the loop); roughly <= 1000 profile ids per partition + for (int i = 0; i < numParts; i++) { + Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(eventCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE); + if (eventCountByProfile != null) { + for (Map.Entry<String, Long> entry : eventCountByProfile.entrySet()) { + if (!entry.getKey().startsWith("_")) { + if (entry.getValue() >= minimumEventCount && entry.getValue() <= maximumEventCount) { + ids.add(entry.getKey()); + } + } + } + } + } + + return QueryBuilders.idsQuery(Profile.ITEM_TYPE).addIds(ids.toArray(new String[ids.size()])); + } + + public long count(Condition condition, Map<String, Object> context, ConditionESQueryBuilderDispatcher dispatcher) { + Condition eventCondition = getEventCondition(condition, context); + + Integer minimumEventCount = condition.getParameter("minimumEventCount") == null ? 0 : (Integer) condition.getParameter("minimumEventCount"); + Integer maximumEventCount = condition.getParameter("maximumEventCount") == null ?
Integer.MAX_VALUE : (Integer) condition.getParameter("maximumEventCount"); + + Map<String, Double> m = persistenceService.getSingleValuesMetrics(eventCondition, new String[]{"card", "count"}, "profileId.keyword", Event.ITEM_TYPE); + long card = m.get("_card").longValue(); + long count = m.get("_count").longValue(); // NOTE(review): currently unused + + if (minimumEventCount != 0 || maximumEventCount != Integer.MAX_VALUE) { + int result = 0; + int numParts = (int) (card / 1000) + 1; // +1 so there is always at least one partition (card < 1000 would otherwise skip the loop); roughly <= 1000 profile ids per partition + for (int i = 0; i < numParts; i++) { + Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(eventCondition, new TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE); + if (eventCountByProfile != null) { + for (Map.Entry<String, Long> entry : eventCountByProfile.entrySet()) { + if (!entry.getKey().startsWith("_")) { + if (entry.getValue() >= minimumEventCount && entry.getValue() <= maximumEventCount) { + result ++; + } + } + } + } + } + return result; + } + + return card; // distinct profileId count from the cardinality ("card") metric, which Elasticsearch computes approximately + } + + private Condition getEventCondition(Condition condition, Map<String, Object> context) { Condition eventCondition; try { eventCondition = (Condition) condition.getParameter("eventCondition"); @@ -70,22 +128,7 @@ public class PastEventConditionESQueryBuilder implements ConditionESQueryBuilder numberOfDaysCondition.setParameter("propertyValueDateExpr", "now-" + numberOfDays + "d"); l.add(numberOfDaysCondition); } - //todo : Check behaviour with important number of profiles - Set<String> ids = new HashSet<String>(); - Integer minimumEventCount = condition.getParameter("minimumEventCount") == null ? 0 : (Integer) condition.getParameter("minimumEventCount"); - Integer maximumEventCount = condition.getParameter("maximumEventCount") == null ?
Integer.MAX_VALUE : (Integer) condition.getParameter("maximumEventCount"); - - Map<String, Long> eventCountByProfile = persistenceService.aggregateWithOptimizedQuery(andCondition, new TermsAggregate("profileId"), Event.ITEM_TYPE); - if (eventCountByProfile != null) { - for (Map.Entry<String, Long> entry : eventCountByProfile.entrySet()) { - if (!entry.getKey().startsWith("_")) { - if (entry.getValue() >= minimumEventCount && entry.getValue() <= maximumEventCount) { - ids.add(entry.getKey()); - } - } - } - } - - return QueryBuilders.idsQuery(Profile.ITEM_TYPE).addIds(ids.toArray(new String[ids.size()])); + return andCondition; } + }