Repository: incubator-unomi
Updated Branches:
  refs/heads/UNOMI-204 [created] 74f7172ef


UNOMI-204: Add a count method to ConditionESQueryBuilder, add cardinality/count
aggregates, and apply a first optimization to PastEventCondition


Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/74f7172e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/74f7172e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/74f7172e

Branch: refs/heads/UNOMI-204
Commit: 74f7172ef4148d7ba25673f9a3d6a0c1c08528a1
Parents: 10e694e
Author: tdraier <dra...@apache.org>
Authored: Tue Oct 9 18:15:28 2018 +0200
Committer: tdraier <dra...@apache.org>
Committed: Tue Oct 9 18:15:28 2018 +0200

----------------------------------------------------------------------
 .../ElasticSearchPersistenceServiceImpl.java    | 25 ++++++-
 .../conditions/ConditionESQueryBuilder.java     |  3 +
 .../ConditionESQueryBuilderDispatcher.java      | 32 ++++++++
 .../spi/aggregate/TermsAggregate.java           | 18 +++++
 .../PastEventConditionESQueryBuilder.java       | 77 +++++++++++++++-----
 5 files changed, 137 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/74f7172e/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
 
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 46e7cbb..23cedfb 100644
--- 
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ 
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -61,6 +61,7 @@ import org.elasticsearch.common.unit.DistanceUnit;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.index.query.IdsQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.query.RangeQueryBuilder;
@@ -83,6 +84,8 @@ import 
org.elasticsearch.search.aggregations.bucket.missing.MissingAggregationBu
 import 
org.elasticsearch.search.aggregations.bucket.range.RangeAggregationBuilder;
 import 
org.elasticsearch.search.aggregations.bucket.range.date.DateRangeAggregationBuilder;
 import 
org.elasticsearch.search.aggregations.bucket.range.ip.IpRangeAggregationBuilder;
+import 
org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
+import 
org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude;
 import 
org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
 import org.elasticsearch.search.sort.GeoDistanceSortBuilder;
 import org.elasticsearch.search.sort.SortBuilders;
@@ -1306,7 +1309,15 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService,
 
     @Override
     public long queryCount(Condition query, String itemType) {
-        return 
queryCount(conditionESQueryBuilderDispatcher.buildFilter(query), itemType);
+        try {
+            return conditionESQueryBuilderDispatcher.count(query);
+        } catch (UnsupportedOperationException e) {
+            QueryBuilder filter = 
conditionESQueryBuilderDispatcher.buildFilter(query);
+            if (filter instanceof IdsQueryBuilder) {
+                return ((IdsQueryBuilder) filter).ids().size();
+            }
+            return queryCount(filter, itemType);
+        }
     }
 
     private long queryCount(final QueryBuilder filter, final String itemType) {
@@ -1564,6 +1575,12 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService,
                         //default
                         if (fieldName != null) {
                             bucketsAggregation = 
AggregationBuilders.terms("buckets").field(fieldName).size(Integer.parseInt(aggregateQueryBucketSize));
+                            if (aggregate instanceof TermsAggregate) {
+                                TermsAggregate termsAggregate = 
(TermsAggregate) aggregate;
+                                if (termsAggregate.getPartition() > -1 && 
termsAggregate.getNumPartitions() > -1) {
+                                    ((TermsAggregationBuilder) 
bucketsAggregation).includeExclude(new 
IncludeExclude(termsAggregate.getPartition(), 
termsAggregate.getNumPartitions()));
+                                }
+                            }
                         } else {
                             // field name could be null if no existing data 
exists
                         }
@@ -1781,6 +1798,12 @@ public class ElasticSearchPersistenceServiceImpl 
implements PersistenceService,
                             case "max":
                                 
filterAggregation.subAggregation(AggregationBuilders.max("max").field(field));
                                 break;
+                            case "card":
+                                
filterAggregation.subAggregation(AggregationBuilders.cardinality("card").field(field));
+                                break;
+                            case "count":
+                                
filterAggregation.subAggregation(AggregationBuilders.count("count").field(field));
+                                break;
                         }
                     }
                 }

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/74f7172e/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilder.java
----------------------------------------------------------------------
diff --git 
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilder.java
 
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilder.java
index 78a5cba..7a100d0 100644
--- 
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilder.java
+++ 
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilder.java
@@ -26,4 +26,7 @@ public interface ConditionESQueryBuilder {
 
     QueryBuilder buildQuery(Condition condition, Map<String, Object> context, 
ConditionESQueryBuilderDispatcher dispatcher);
 
+    default long count(Condition condition, Map<String, Object> context, 
ConditionESQueryBuilderDispatcher dispatcher) {
+        throw new UnsupportedOperationException();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/74f7172e/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilderDispatcher.java
----------------------------------------------------------------------
diff --git 
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilderDispatcher.java
 
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilderDispatcher.java
index b29a631..d08e283 100644
--- 
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilderDispatcher.java
+++ 
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/conditions/ConditionESQueryBuilderDispatcher.java
@@ -86,5 +86,37 @@ public class ConditionESQueryBuilderDispatcher {
         return QueryBuilders.matchAllQuery();
     }
 
+    public long count(Condition condition) {
+        return count(condition, new HashMap<>());
+    }
+
+    public long count(Condition condition, Map<String, Object> context) {
+        if(condition == null || condition.getConditionType() == null) {
+            throw new IllegalArgumentException("Condition is null or doesn't 
have type, impossible to build filter");
+        }
+
+        String queryBuilderKey = 
condition.getConditionType().getQueryBuilder();
+        if (queryBuilderKey == null && 
condition.getConditionType().getParentCondition() != null) {
+            context.putAll(condition.getParameterValues());
+            return count(condition.getConditionType().getParentCondition(), 
context);
+        }
+
+        if (queryBuilderKey == null) {
+            throw new UnsupportedOperationException("No query builder defined 
for : " + condition.getConditionTypeId());
+        }
 
+        if (queryBuilders.containsKey(queryBuilderKey)) {
+            ConditionESQueryBuilder queryBuilder = 
queryBuilders.get(queryBuilderKey);
+            Condition contextualCondition = 
ConditionContextHelper.getContextualCondition(condition, context);
+            if (contextualCondition != null) {
+                return queryBuilder.count(contextualCondition, context, this);
+            }
+        }
+
+        // if no matching
+        if (logger.isDebugEnabled()) {
+            logger.debug("No matching query builder for condition {} and 
context {}", condition, context);
+        }
+        throw new UnsupportedOperationException();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/74f7172e/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/aggregate/TermsAggregate.java
----------------------------------------------------------------------
diff --git 
a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/aggregate/TermsAggregate.java
 
b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/aggregate/TermsAggregate.java
index a0591f5..3b9d741 100644
--- 
a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/aggregate/TermsAggregate.java
+++ 
b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/aggregate/TermsAggregate.java
@@ -18,7 +18,25 @@
 package org.apache.unomi.persistence.spi.aggregate;
 
 public class TermsAggregate extends BaseAggregate{
+    private int partition = -1;
+    private int numPartitions = -1;
+
+
     public TermsAggregate(String field) {
         super(field);
     }
+
+    public TermsAggregate(String field, int partition, int numPartitions) {
+        super(field);
+        this.partition = partition;
+        this.numPartitions = numPartitions;
+    }
+
+    public int getPartition() {
+        return partition;
+    }
+
+    public int getNumPartitions() {
+        return numPartitions;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/74f7172e/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
----------------------------------------------------------------------
diff --git 
a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
 
b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
index e51aaa8..b3c169c 100644
--- 
a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
+++ 
b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PastEventConditionESQueryBuilder.java
@@ -44,6 +44,64 @@ public class PastEventConditionESQueryBuilder implements 
ConditionESQueryBuilder
     }
 
     public QueryBuilder buildQuery(Condition condition, Map<String, Object> 
context, ConditionESQueryBuilderDispatcher dispatcher) {
+        Condition eventCondition = getEventCondition(condition, context);
+        //todo : Check behaviour with important number of profiles
+        Set<String> ids = new HashSet<String>();
+        Integer minimumEventCount = 
condition.getParameter("minimumEventCount") == null ? 0 : (Integer) 
condition.getParameter("minimumEventCount");
+        Integer maximumEventCount = 
condition.getParameter("maximumEventCount") == null ? Integer.MAX_VALUE : 
(Integer) condition.getParameter("maximumEventCount");
+
+        Map<String, Double> m = 
persistenceService.getSingleValuesMetrics(eventCondition, new String[]{"card", 
"count"}, "profileId.keyword", Event.ITEM_TYPE);
+        long card = m.get("_card").longValue();
+
+        int numParts = (int) (card / 1000);
+        for (int i = 0; i < numParts; i++) {
+            Map<String, Long> eventCountByProfile = 
persistenceService.aggregateWithOptimizedQuery(eventCondition, new 
TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE);
+            if (eventCountByProfile != null) {
+                for (Map.Entry<String, Long> entry : 
eventCountByProfile.entrySet()) {
+                    if (!entry.getKey().startsWith("_")) {
+                        if (entry.getValue() >= minimumEventCount && 
entry.getValue() <= maximumEventCount) {
+                            ids.add(entry.getKey());
+                        }
+                    }
+                }
+            }
+        }
+
+        return 
QueryBuilders.idsQuery(Profile.ITEM_TYPE).addIds(ids.toArray(new 
String[ids.size()]));
+    }
+
+    public long count(Condition condition, Map<String, Object> context, 
ConditionESQueryBuilderDispatcher dispatcher) {
+        Condition eventCondition = getEventCondition(condition, context);
+
+        Integer minimumEventCount = 
condition.getParameter("minimumEventCount") == null ? 0 : (Integer) 
condition.getParameter("minimumEventCount");
+        Integer maximumEventCount = 
condition.getParameter("maximumEventCount") == null ? Integer.MAX_VALUE : 
(Integer) condition.getParameter("maximumEventCount");
+
+        Map<String, Double> m = 
persistenceService.getSingleValuesMetrics(eventCondition, new String[]{"card", 
"count"}, "profileId.keyword", Event.ITEM_TYPE);
+        long card = m.get("_card").longValue();
+        long count = m.get("_count").longValue();
+
+        if (minimumEventCount != 0 || maximumEventCount != Integer.MAX_VALUE) {
+            int result = 0;
+            int numParts = (int) (card / 1000);
+            for (int i = 0; i < numParts; i++) {
+                Map<String, Long> eventCountByProfile = 
persistenceService.aggregateWithOptimizedQuery(eventCondition, new 
TermsAggregate("profileId", i, numParts), Event.ITEM_TYPE);
+                if (eventCountByProfile != null) {
+                    for (Map.Entry<String, Long> entry : 
eventCountByProfile.entrySet()) {
+                        if (!entry.getKey().startsWith("_")) {
+                            if (entry.getValue() >= minimumEventCount && 
entry.getValue() <= maximumEventCount) {
+                                result ++;
+                            }
+                        }
+                    }
+                }
+            }
+            return result;
+        }
+
+        return card;
+    }
+
+    private Condition getEventCondition(Condition condition, Map<String, 
Object> context) {
         Condition eventCondition;
         try {
             eventCondition = (Condition) 
condition.getParameter("eventCondition");
@@ -70,22 +128,7 @@ public class PastEventConditionESQueryBuilder implements 
ConditionESQueryBuilder
             numberOfDaysCondition.setParameter("propertyValueDateExpr", "now-" 
+ numberOfDays + "d");
             l.add(numberOfDaysCondition);
         }
-        //todo : Check behaviour with important number of profiles
-        Set<String> ids = new HashSet<String>();
-        Integer minimumEventCount = 
condition.getParameter("minimumEventCount") == null ? 0 : (Integer) 
condition.getParameter("minimumEventCount");
-        Integer maximumEventCount = 
condition.getParameter("maximumEventCount") == null  ? Integer.MAX_VALUE : 
(Integer) condition.getParameter("maximumEventCount");
-
-        Map<String, Long> eventCountByProfile = 
persistenceService.aggregateWithOptimizedQuery(andCondition, new 
TermsAggregate("profileId"), Event.ITEM_TYPE);
-        if (eventCountByProfile != null) {
-            for (Map.Entry<String, Long> entry : 
eventCountByProfile.entrySet()) {
-                if (!entry.getKey().startsWith("_")) {
-                    if (entry.getValue() >= minimumEventCount && 
entry.getValue() <= maximumEventCount) {
-                        ids.add(entry.getKey());
-                    }
-                }
-            }
-        }
-
-        return 
QueryBuilders.idsQuery(Profile.ITEM_TYPE).addIds(ids.toArray(new 
String[ids.size()]));
+        return andCondition;
     }
+
 }

Reply via email to