This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 5b6427a687f265f8f1e511462e2ce3f6b66d0136
Author: Benoit Tellier <btell...@linagora.com>
AuthorDate: Thu Oct 10 10:44:44 2019 +0700

    JAMES-2917 Rely on ElasticSearch routing key
    
    Our queries are mostly bound to a mailbox or a user. We can easily
    limit the number of ElasticSearch nodes involved in a given query by
    grouping the underlying documents on the same node using a routing key.
    
    Without a routing key, each shard needs to execute the query. The
    coordinator also needs to wait for the slowest shard.
    
    This changeset unlocks a significant throughput enhancement (proportional
    to the number of shards) and possibly also an improvement of
    high-percentile latencies.
    
    However, a data reindex is needed.
---
 .../james/backends/es/DeleteByQueryPerformer.java  | 20 +++--
 ...{UpdatedRepresentation.java => DocumentId.java} | 44 ++++-------
 .../james/backends/es/ElasticSearchIndexer.java    | 39 ++++++----
 ...{UpdatedRepresentation.java => RoutingKey.java} | 46 +++++------
 .../james/backends/es/UpdatedRepresentation.java   |  8 +-
 .../apache/james/backends/es/DocumentIdTest.java   | 46 +++++++++++
 .../backends/es/ElasticSearchIndexerTest.java      | 90 ++++++++++++----------
 .../apache/james/backends/es/RoutingKeyTest.java   | 46 +++++++++++
 .../ElasticSearchListeningMessageSearchIndex.java  | 39 ++++++----
 .../search/ElasticSearchSearcher.java              | 11 ++-
 .../events/ElasticSearchQuotaMailboxListener.java  | 17 +++-
 11 files changed, 264 insertions(+), 142 deletions(-)

diff --git 
a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DeleteByQueryPerformer.java
 
b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DeleteByQueryPerformer.java
index 26376c6..f9ba528 100644
--- 
a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DeleteByQueryPerformer.java
+++ 
b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DeleteByQueryPerformer.java
@@ -44,23 +44,26 @@ public class DeleteByQueryPerformer {
     private final WriteAliasName aliasName;
 
     @VisibleForTesting
-    public DeleteByQueryPerformer(RestHighLevelClient client, int batchSize, 
WriteAliasName aliasName) {
+    DeleteByQueryPerformer(RestHighLevelClient client, int batchSize, 
WriteAliasName aliasName) {
         this.client = client;
         this.batchSize = batchSize;
         this.aliasName = aliasName;
     }
 
-    public Mono<Void> perform(QueryBuilder queryBuilder) {
-        return Flux.fromStream(new ScrolledSearch(client, 
prepareSearch(queryBuilder)).searchResponses())
-            .flatMap(searchResponse -> deleteRetrievedIds(client, 
searchResponse))
+    public Mono<Void> perform(QueryBuilder queryBuilder, RoutingKey 
routingKey) {
+        SearchRequest searchRequest = prepareSearch(queryBuilder, routingKey);
+
+        return Flux.fromStream(new ScrolledSearch(client, 
searchRequest).searchResponses())
+            .flatMap(searchResponse -> deleteRetrievedIds(client, 
searchResponse, routingKey))
             .thenEmpty(Mono.empty());
     }
 
-    private SearchRequest prepareSearch(QueryBuilder queryBuilder) {
+    private SearchRequest prepareSearch(QueryBuilder queryBuilder, RoutingKey 
routingKey) {
         return new SearchRequest(aliasName.getValue())
             .types(NodeMappingFactory.DEFAULT_MAPPING_NAME)
             .scroll(TIMEOUT)
-            .source(searchSourceBuilder(queryBuilder));
+            .source(searchSourceBuilder(queryBuilder))
+            .routing(routingKey.asString());
     }
 
     private SearchSourceBuilder searchSourceBuilder(QueryBuilder queryBuilder) 
{
@@ -69,14 +72,15 @@ public class DeleteByQueryPerformer {
             .size(batchSize);
     }
 
-    private Mono<BulkResponse> deleteRetrievedIds(RestHighLevelClient client, 
SearchResponse searchResponse) {
+    private Mono<BulkResponse> deleteRetrievedIds(RestHighLevelClient client, 
SearchResponse searchResponse, RoutingKey routingKey) {
         BulkRequest request = new BulkRequest();
 
         for (SearchHit hit : searchResponse.getHits()) {
             request.add(
                 new DeleteRequest(aliasName.getValue())
                     .type(NodeMappingFactory.DEFAULT_MAPPING_NAME)
-                    .id(hit.getId()));
+                    .id(hit.getId())
+                    .routing(routingKey.asString()));
         }
 
         return Mono.fromCallable(() -> client.bulk(request));
diff --git 
a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/UpdatedRepresentation.java
 
b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DocumentId.java
similarity index 56%
copy from 
backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/UpdatedRepresentation.java
copy to 
backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DocumentId.java
index 92ce8c8..7633208 100644
--- 
a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/UpdatedRepresentation.java
+++ 
b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DocumentId.java
@@ -16,54 +16,44 @@
  * specific language governing permissions and limitations      *
  * under the License.                                           *
  ****************************************************************/
+
 package org.apache.james.backends.es;
 
 import java.util.Objects;
 
 import org.elasticsearch.common.Strings;
 
-import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 
-public class UpdatedRepresentation {
-    private final String id;
-    private final String updatedDocumentPart;
+public class DocumentId {
 
-    public UpdatedRepresentation(String id, String updatedDocumentPart) {
-        Preconditions.checkArgument(!Strings.isNullOrEmpty(id), "Updated id 
must be specified " + id);
-        
Preconditions.checkArgument(!Strings.isNullOrEmpty(updatedDocumentPart), 
"Updated document must be specified");
-        this.id = id;
-        this.updatedDocumentPart = updatedDocumentPart;
+    public static DocumentId fromString(String value) {
+        return new DocumentId(value);
     }
 
-    public String getId() {
-        return id;
+    private final String value;
+
+    private DocumentId(String value) {
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(value), "DocumentId 
must be specified");
+        this.value = value;
     }
 
-    public String getUpdatedDocumentPart() {
-        return updatedDocumentPart;
+    public String asString() {
+        return value;
     }
 
     @Override
     public final boolean equals(Object o) {
-        if (o instanceof UpdatedRepresentation) {
-            UpdatedRepresentation other = (UpdatedRepresentation) o;
-            return Objects.equals(id, other.id)
-                && Objects.equals(updatedDocumentPart, 
other.updatedDocumentPart);
+        if (o instanceof DocumentId) {
+            DocumentId that = (DocumentId) o;
+
+            return Objects.equals(this.value, that.value);
         }
         return false;
     }
 
     @Override
     public final int hashCode() {
-        return Objects.hash(id, updatedDocumentPart);
-    }
-
-    @Override
-    public String toString() {
-        return MoreObjects.toStringHelper(this)
-            .add("id", id)
-            .add("updatedDocumentPart", updatedDocumentPart)
-            .toString();
+        return Objects.hash(value);
     }
-}
\ No newline at end of file
+}
diff --git 
a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchIndexer.java
 
b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchIndexer.java
index bc586a4..99a9f80 100644
--- 
a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchIndexer.java
+++ 
b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchIndexer.java
@@ -64,28 +64,34 @@ public class ElasticSearchIndexer {
         this.aliasName = aliasName;
     }
 
-    public IndexResponse index(String id, String content) throws IOException {
+    public IndexResponse index(DocumentId id, String content, RoutingKey 
routingKey) throws IOException {
         checkArgument(content);
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("Indexing {}: {}", id, StringUtils.left(content, 
DEBUG_MAX_LENGTH_CONTENT));
-        }
-        return client.index(
-            new IndexRequest(aliasName.getValue())
+        logContent(id, content);
+        return client.index(new IndexRequest(aliasName.getValue())
                 .type(NodeMappingFactory.DEFAULT_MAPPING_NAME)
-                .id(id)
-                .source(content, XContentType.JSON),
+                .id(id.asString())
+                .source(content, XContentType.JSON)
+                .routing(routingKey.asString()),
             RequestOptions.DEFAULT);
     }
 
-    public Optional<BulkResponse> update(List<UpdatedRepresentation> 
updatedDocumentParts) throws IOException {
+    private void logContent(DocumentId id, String content) {
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Indexing {}: {}", id.asString(), 
StringUtils.left(content, DEBUG_MAX_LENGTH_CONTENT));
+        }
+    }
+
+    public Optional<BulkResponse> update(List<UpdatedRepresentation> 
updatedDocumentParts, RoutingKey routingKey) throws IOException {
         try {
             Preconditions.checkNotNull(updatedDocumentParts);
+            Preconditions.checkNotNull(routingKey);
             BulkRequest request = new BulkRequest();
             updatedDocumentParts.forEach(updatedDocumentPart -> request.add(
                 new UpdateRequest(aliasName.getValue(),
                     NodeMappingFactory.DEFAULT_MAPPING_NAME,
-                    updatedDocumentPart.getId())
-                .doc(updatedDocumentPart.getUpdatedDocumentPart(), 
XContentType.JSON)));
+                    updatedDocumentPart.getId().asString())
+                .doc(updatedDocumentPart.getUpdatedDocumentPart(), 
XContentType.JSON)
+                .routing(routingKey.asString())));
             return Optional.of(client.bulk(request, RequestOptions.DEFAULT));
         } catch (ValidationException e) {
             LOGGER.warn("Error while updating index", e);
@@ -93,22 +99,23 @@ public class ElasticSearchIndexer {
         }
     }
 
-    public Optional<BulkResponse> delete(List<String> ids) throws IOException {
+    public Optional<BulkResponse> delete(List<DocumentId> ids, RoutingKey 
routingKey) throws IOException {
         try {
             BulkRequest request = new BulkRequest();
             ids.forEach(id -> request.add(
                 new DeleteRequest(aliasName.getValue())
                     .type(NodeMappingFactory.DEFAULT_MAPPING_NAME)
-                    .id(id)));
-            return Optional.of(client.bulk(request));
+                    .id(id.asString())
+                    .routing(routingKey.asString())));
+            return Optional.of(client.bulk(request, RequestOptions.DEFAULT));
         } catch (ValidationException e) {
             LOGGER.warn("Error while deleting index", e);
             return Optional.empty();
         }
     }
 
-    public void deleteAllMatchingQuery(QueryBuilder queryBuilder) {
-        deleteByQueryPerformer.perform(queryBuilder).block();
+    public void deleteAllMatchingQuery(QueryBuilder queryBuilder, RoutingKey 
routingKey) {
+        deleteByQueryPerformer.perform(queryBuilder, routingKey).block();
     }
 
     private void checkArgument(String content) {
diff --git 
a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/UpdatedRepresentation.java
 
b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/RoutingKey.java
similarity index 56%
copy from 
backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/UpdatedRepresentation.java
copy to 
backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/RoutingKey.java
index 92ce8c8..05021e5 100644
--- 
a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/UpdatedRepresentation.java
+++ 
b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/RoutingKey.java
@@ -16,54 +16,44 @@
  * specific language governing permissions and limitations      *
  * under the License.                                           *
  ****************************************************************/
+
 package org.apache.james.backends.es;
 
 import java.util.Objects;
 
 import org.elasticsearch.common.Strings;
 
-import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 
-public class UpdatedRepresentation {
-    private final String id;
-    private final String updatedDocumentPart;
-
-    public UpdatedRepresentation(String id, String updatedDocumentPart) {
-        Preconditions.checkArgument(!Strings.isNullOrEmpty(id), "Updated id 
must be specified " + id);
-        
Preconditions.checkArgument(!Strings.isNullOrEmpty(updatedDocumentPart), 
"Updated document must be specified");
-        this.id = id;
-        this.updatedDocumentPart = updatedDocumentPart;
+public class RoutingKey {
+    public static RoutingKey fromString(String value) {
+        return new RoutingKey(value);
     }
 
-    public String getId() {
-        return id;
+
+    private final String value;
+
+    private RoutingKey(String value) {
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(value), "RoutingKey 
must be specified");
+        this.value = value;
     }
 
-    public String getUpdatedDocumentPart() {
-        return updatedDocumentPart;
+    public String asString() {
+        return value;
     }
 
     @Override
     public final boolean equals(Object o) {
-        if (o instanceof UpdatedRepresentation) {
-            UpdatedRepresentation other = (UpdatedRepresentation) o;
-            return Objects.equals(id, other.id)
-                && Objects.equals(updatedDocumentPart, 
other.updatedDocumentPart);
+        if (o instanceof RoutingKey) {
+            RoutingKey that = (RoutingKey) o;
+
+            return Objects.equals(this.value, that.value);
         }
         return false;
     }
 
     @Override
     public final int hashCode() {
-        return Objects.hash(id, updatedDocumentPart);
-    }
-
-    @Override
-    public String toString() {
-        return MoreObjects.toStringHelper(this)
-            .add("id", id)
-            .add("updatedDocumentPart", updatedDocumentPart)
-            .toString();
+        return Objects.hash(value);
     }
-}
\ No newline at end of file
+}
diff --git 
a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/UpdatedRepresentation.java
 
b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/UpdatedRepresentation.java
index 92ce8c8..a14605c 100644
--- 
a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/UpdatedRepresentation.java
+++ 
b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/UpdatedRepresentation.java
@@ -26,17 +26,17 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 
 public class UpdatedRepresentation {
-    private final String id;
+    private final DocumentId id;
     private final String updatedDocumentPart;
 
-    public UpdatedRepresentation(String id, String updatedDocumentPart) {
-        Preconditions.checkArgument(!Strings.isNullOrEmpty(id), "Updated id 
must be specified " + id);
+    public UpdatedRepresentation(DocumentId id, String updatedDocumentPart) {
+        Preconditions.checkNotNull(id);
         
Preconditions.checkArgument(!Strings.isNullOrEmpty(updatedDocumentPart), 
"Updated document must be specified");
         this.id = id;
         this.updatedDocumentPart = updatedDocumentPart;
     }
 
-    public String getId() {
+    public DocumentId getId() {
         return id;
     }
 
diff --git 
a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/DocumentIdTest.java
 
b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/DocumentIdTest.java
new file mode 100644
index 0000000..bc9306b
--- /dev/null
+++ 
b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/DocumentIdTest.java
@@ -0,0 +1,46 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.backends.es;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import org.junit.jupiter.api.Test;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+
+class DocumentIdTest {
+    @Test
+    void documentIdShouldRespectBeanContract() {
+        EqualsVerifier.forClass(DocumentId.class)
+            .verify();
+    }
+
+    @Test
+    void fromStringShouldThrowWhenNull() {
+        assertThatThrownBy(() -> DocumentId.fromString(null))
+            .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void fromStringShouldThrowWhenEmpty() {
+        assertThatThrownBy(() -> DocumentId.fromString(""))
+            .isInstanceOf(IllegalArgumentException.class);
+    }
+}
\ No newline at end of file
diff --git 
a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java
 
b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java
index ba4e4f7..e8fefbc 100644
--- 
a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java
+++ 
b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java
@@ -20,6 +20,7 @@
 package org.apache.james.backends.es;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.awaitility.Duration.ONE_HUNDRED_MILLISECONDS;
 import static org.elasticsearch.index.query.QueryBuilders.termQuery;
@@ -43,6 +44,9 @@ import org.junit.Test;
 import com.google.common.collect.ImmutableList;
 
 public class ElasticSearchIndexerTest {
+    public static RoutingKey useDocumentId(DocumentId documentId) {
+        return RoutingKey.fromString(documentId.asString());
+    }
 
     private static final int MINIMUM_BATCH_SIZE = 1;
     private static final IndexName INDEX_NAME = new IndexName("index_name");
@@ -52,6 +56,8 @@ public class ElasticSearchIndexerTest {
         .with().pollInterval(ONE_HUNDRED_MILLISECONDS)
         .and().pollDelay(ONE_HUNDRED_MILLISECONDS)
         .await();
+    private static final RoutingKey ROUTING = RoutingKey.fromString("routing");
+    private static final DocumentId DOCUMENT_ID = DocumentId.fromString("1");
 
     @Rule
     public DockerElasticSearchRule elasticSearch = new 
DockerElasticSearchRule();
@@ -75,10 +81,10 @@ public class ElasticSearchIndexerTest {
 
     @Test
     public void indexMessageShouldWork() throws Exception {
-        String messageId = "1";
+        DocumentId documentId = DocumentId.fromString("1");
         String content = "{\"message\": \"trying out Elasticsearch\"}";
         
-        testee.index(messageId, content);
+        testee.index(documentId, content, useDocumentId(documentId));
         elasticSearch.awaitForElasticSearch();
         
         SearchResponse searchResponse = client.search(
@@ -90,19 +96,18 @@ public class ElasticSearchIndexerTest {
     
     @Test
     public void indexMessageShouldThrowWhenJsonIsNull() {
-        assertThatThrownBy(() -> testee.index("1", null))
+        assertThatThrownBy(() -> testee.index(DOCUMENT_ID, null, ROUTING))
             .isInstanceOf(IllegalArgumentException.class);
     }
     
     @Test
     public void updateMessages() throws Exception {
-        String messageId = "1";
         String content = "{\"message\": \"trying out 
Elasticsearch\",\"field\":\"Should be unchanged\"}";
 
-        testee.index(messageId, content);
+        testee.index(DOCUMENT_ID, content, useDocumentId(DOCUMENT_ID));
         elasticSearch.awaitForElasticSearch();
 
-        testee.update(ImmutableList.of(new UpdatedRepresentation(messageId, 
"{\"message\": \"mastering out Elasticsearch\"}")));
+        testee.update(ImmutableList.of(new UpdatedRepresentation(DOCUMENT_ID, 
"{\"message\": \"mastering out Elasticsearch\"}")), useDocumentId(DOCUMENT_ID));
         elasticSearch.awaitForElasticSearch();
 
 
@@ -121,37 +126,42 @@ public class ElasticSearchIndexerTest {
 
     @Test
     public void updateMessageShouldThrowWhenJsonIsNull() {
-        assertThatThrownBy(() -> testee.update(ImmutableList.of(new 
UpdatedRepresentation("1", null))))
+        assertThatThrownBy(() -> testee.update(ImmutableList.of(
+                new UpdatedRepresentation(DOCUMENT_ID, null)), ROUTING))
             .isInstanceOf(IllegalArgumentException.class);
     }
 
     @Test
     public void updateMessageShouldThrowWhenIdIsNull() {
-        assertThatThrownBy(() -> testee.update(ImmutableList.of(new 
UpdatedRepresentation(null, "{\"message\": \"mastering out Elasticsearch\"}"))))
-            .isInstanceOf(IllegalArgumentException.class);
+        assertThatThrownBy(() -> testee.update(ImmutableList.of(
+                new UpdatedRepresentation(null, "{\"message\": \"mastering out 
Elasticsearch\"}")), ROUTING))
+            .isInstanceOf(NullPointerException.class);
     }
 
     @Test
     public void updateMessageShouldThrowWhenJsonIsEmpty() {
-        assertThatThrownBy(() -> testee.update(ImmutableList.of(new 
UpdatedRepresentation("1", ""))))
+        assertThatThrownBy(() -> testee.update(ImmutableList.of(
+                new UpdatedRepresentation(DOCUMENT_ID, "")), ROUTING))
             .isInstanceOf(IllegalArgumentException.class);
     }
 
     @Test
-    public void updateMessageShouldThrowWhenIdIsEmpty() {
-        assertThatThrownBy(() -> testee.update(ImmutableList.of(new 
UpdatedRepresentation("", "{\"message\": \"mastering out Elasticsearch\"}"))))
-            .isInstanceOf(IllegalArgumentException.class);
+    public void updateMessageShouldThrowWhenRoutingKeyIsNull() {
+        assertThatThrownBy(() -> testee.update(ImmutableList.of(
+                new UpdatedRepresentation(DOCUMENT_ID, "{\"message\": 
\"mastering out Elasticsearch\"}")), null))
+            .isInstanceOf(NullPointerException.class);
     }
 
     @Test
     public void deleteByQueryShouldWorkOnSingleMessage() throws Exception {
-        String messageId = "1:2";
+        DocumentId documentId =  DocumentId.fromString("1:2");
         String content = "{\"message\": \"trying out Elasticsearch\", 
\"property\":\"1\"}";
+        RoutingKey routingKey = useDocumentId(documentId);
 
-        testee.index(messageId, content);
+        testee.index(documentId, content, routingKey);
         elasticSearch.awaitForElasticSearch();
         
-        testee.deleteAllMatchingQuery(termQuery("property", "1"));
+        testee.deleteAllMatchingQuery(termQuery("property", "1"), routingKey);
         elasticSearch.awaitForElasticSearch();
         
         CALMLY_AWAIT.atMost(Duration.TEN_SECONDS)
@@ -164,23 +174,23 @@ public class ElasticSearchIndexerTest {
 
     @Test
     public void deleteByQueryShouldWorkWhenMultipleMessages() throws Exception 
{
-        String messageId = "1:1";
+        DocumentId documentId = DocumentId.fromString("1:1");
         String content = "{\"message\": \"trying out Elasticsearch\", 
\"property\":\"1\"}";
         
-        testee.index(messageId, content);
-        
-        String messageId2 = "1:2";
+        testee.index(documentId, content, ROUTING);
+
+        DocumentId documentId2 = DocumentId.fromString("1:2");
         String content2 = "{\"message\": \"trying out Elasticsearch 2\", 
\"property\":\"1\"}";
         
-        testee.index(messageId2, content2);
-        
-        String messageId3 = "2:3";
+        testee.index(documentId2, content2, ROUTING);
+
+        DocumentId documentId3 = DocumentId.fromString("2:3");
         String content3 = "{\"message\": \"trying out Elasticsearch 3\", 
\"property\":\"2\"}";
         
-        testee.index(messageId3, content3);
+        testee.index(documentId3, content3, ROUTING);
         elasticSearch.awaitForElasticSearch();
 
-        testee.deleteAllMatchingQuery(termQuery("property", "1"));
+        testee.deleteAllMatchingQuery(termQuery("property", "1"), ROUTING);
         elasticSearch.awaitForElasticSearch();
         
         CALMLY_AWAIT.atMost(Duration.TEN_SECONDS)
@@ -193,13 +203,13 @@ public class ElasticSearchIndexerTest {
     
     @Test
     public void deleteMessage() throws Exception {
-        String messageId = "1:2";
+        DocumentId documentId = DocumentId.fromString("1:2");
         String content = "{\"message\": \"trying out Elasticsearch\"}";
 
-        testee.index(messageId, content);
+        testee.index(documentId, content, useDocumentId(documentId));
         elasticSearch.awaitForElasticSearch();
 
-        testee.delete(ImmutableList.of(messageId));
+        testee.delete(ImmutableList.of(documentId), useDocumentId(documentId));
         elasticSearch.awaitForElasticSearch();
         
         SearchResponse searchResponse = client.search(
@@ -211,23 +221,23 @@ public class ElasticSearchIndexerTest {
 
     @Test
     public void deleteShouldWorkWhenMultipleMessages() throws Exception {
-        String messageId = "1:1";
+        DocumentId documentId = DocumentId.fromString("1:1");
         String content = "{\"message\": \"trying out Elasticsearch\", 
\"mailboxId\":\"1\"}";
 
-        testee.index(messageId, content);
+        testee.index(documentId, content, ROUTING);
 
-        String messageId2 = "1:2";
+        DocumentId documentId2 = DocumentId.fromString("1:2");
         String content2 = "{\"message\": \"trying out Elasticsearch 2\", 
\"mailboxId\":\"1\"}";
 
-        testee.index(messageId2, content2);
+        testee.index(documentId2, content2, ROUTING);
 
-        String messageId3 = "2:3";
+        DocumentId documentId3 = DocumentId.fromString("2:3");
         String content3 = "{\"message\": \"trying out Elasticsearch 3\", 
\"mailboxId\":\"2\"}";
 
-        testee.index(messageId3, content3);
+        testee.index(documentId3, content3, ROUTING);
         elasticSearch.awaitForElasticSearch();
 
-        testee.delete(ImmutableList.of(messageId, messageId3));
+        testee.delete(ImmutableList.of(documentId, documentId3), ROUTING);
         elasticSearch.awaitForElasticSearch();
 
         SearchResponse searchResponse = client.search(
@@ -238,12 +248,14 @@ public class ElasticSearchIndexerTest {
     }
     
     @Test
-    public void updateMessagesShouldNotThrowWhenEmptyList() throws Exception {
-        testee.update(ImmutableList.of());
+    public void updateMessagesShouldNotThrowWhenEmptyList() {
+        assertThatCode(() -> testee.update(ImmutableList.of(), ROUTING))
+            .doesNotThrowAnyException();
     }
     
     @Test
-    public void deleteMessagesShouldNotThrowWhenEmptyList() throws Exception {
-        testee.delete(ImmutableList.of());
+    public void deleteMessagesShouldNotThrowWhenEmptyList() {
+        assertThatCode(() -> testee.delete(ImmutableList.of(), ROUTING))
+            .doesNotThrowAnyException();
     }
 }
diff --git 
a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/RoutingKeyTest.java
 
b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/RoutingKeyTest.java
new file mode 100644
index 0000000..cfa4d4e
--- /dev/null
+++ 
b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/RoutingKeyTest.java
@@ -0,0 +1,46 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.backends.es;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import org.junit.jupiter.api.Test;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+
+class RoutingKeyTest {
+    @Test
+    void routingKeyShouldRespectBeanContract() {
+        EqualsVerifier.forClass(RoutingKey.class)
+            .verify();
+    }
+
+    @Test
+    void fromStringShouldThrowWhenNull() {
+        assertThatThrownBy(() -> RoutingKey.fromString(null))
+            .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void fromStringShouldThrowWhenEmpty() {
+        assertThatThrownBy(() -> RoutingKey.fromString(""))
+            .isInstanceOf(IllegalArgumentException.class);
+    }
+}
\ No newline at end of file
diff --git 
a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
 
b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
index 657f670..c695b85 100644
--- 
a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
+++ 
b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
@@ -30,7 +30,9 @@ import java.util.stream.Stream;
 import javax.inject.Inject;
 import javax.inject.Named;
 
+import org.apache.james.backends.es.DocumentId;
 import org.apache.james.backends.es.ElasticSearchIndexer;
+import org.apache.james.backends.es.RoutingKey;
 import org.apache.james.backends.es.UpdatedRepresentation;
 import org.apache.james.mailbox.MailboxManager.MessageCapabilities;
 import org.apache.james.mailbox.MailboxManager.SearchCapabilities;
@@ -51,6 +53,7 @@ import org.apache.james.mailbox.store.SessionProvider;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex;
 import org.apache.james.util.OptionalUtils;
+import org.elasticsearch.index.query.TermQueryBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -139,7 +142,7 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe
 
         String jsonContent = generateIndexedJson(mailbox, message, session);
 
-        elasticSearchIndexer.index(indexIdFor(mailbox, message.getUid()), jsonContent);
+        elasticSearchIndexer.index(indexIdFor(mailbox, message.getUid()), jsonContent, toRoutingKey(mailbox.getMailboxId()));
     }
 
     private String generateIndexedJson(Mailbox mailbox, MailboxMessage message, MailboxSession session) throws JsonProcessingException {
@@ -161,26 +164,29 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe
             elasticSearchIndexer
                 .delete(expungedUids.stream()
                     .map(uid ->  indexIdFor(mailbox, uid))
-                    .collect(Guavate.toImmutableList()));
+                    .collect(Guavate.toImmutableList()),
+                    toRoutingKey(mailbox.getMailboxId()));
     }
 
     @Override
     public void deleteAll(MailboxSession session, MailboxId mailboxId) {
-            elasticSearchIndexer
-                .deleteAllMatchingQuery(
-                    termQuery(
-                        JsonMessageConstants.MAILBOX_ID,
-                        mailboxId.serialize()));
+        TermQueryBuilder queryBuilder = termQuery(
+            JsonMessageConstants.MAILBOX_ID,
+            mailboxId.serialize());
+
+        elasticSearchIndexer
+                .deleteAllMatchingQuery(queryBuilder, toRoutingKey(mailboxId));
     }
 
     @Override
     public void update(MailboxSession session, Mailbox mailbox, List<UpdatedFlags> updatedFlagsList) throws IOException {
-            elasticSearchIndexer
-                .update(updatedFlagsList.stream()
-                    .map(Throwing.<UpdatedFlags, UpdatedRepresentation>function(
-                            updatedFlags -> createUpdatedDocumentPartFromUpdatedFlags(mailbox, updatedFlags))
-                        .sneakyThrow())
-                    .collect(Guavate.toImmutableList()));
+        ImmutableList<UpdatedRepresentation> updates = updatedFlagsList.stream()
+            .map(Throwing.<UpdatedFlags, UpdatedRepresentation>function(
+                updatedFlags -> createUpdatedDocumentPartFromUpdatedFlags(mailbox, updatedFlags))
+                .sneakyThrow())
+            .collect(Guavate.toImmutableList());
+
+        elasticSearchIndexer.update(updates, toRoutingKey(mailbox.getMailboxId()));
     }
 
     private UpdatedRepresentation createUpdatedDocumentPartFromUpdatedFlags(Mailbox mailbox, UpdatedFlags updatedFlags) throws JsonProcessingException {
@@ -190,8 +196,8 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe
                     .getUpdatedJsonMessagePart(updatedFlags.getNewFlags(), updatedFlags.getModSeq()));
     }
 
-    private String indexIdFor(Mailbox mailbox, MessageUid uid) {
-        return String.join(ID_SEPARATOR, mailbox.getMailboxId().serialize(), String.valueOf(uid.asLong()));
+    private DocumentId indexIdFor(Mailbox mailbox, MessageUid uid) {
+        return DocumentId.fromString(String.join(ID_SEPARATOR, mailbox.getMailboxId().serialize(), String.valueOf(uid.asLong())));
     }
 
     private void logIfNoMessageId(SearchResult searchResult) {
@@ -200,4 +206,7 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe
         }
     }
 
+    private RoutingKey toRoutingKey(MailboxId mailboxId) {
+        return RoutingKey.fromString(mailboxId.serialize());
+    }
 }
diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java
index 9406b68..48b67e4 100644
--- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java
+++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java
@@ -82,9 +82,9 @@ public class ElasticSearchSearcher {
             .orElse(pairStream);
     }
 
-    private SearchRequest prepareSearch(Collection<MailboxId> users, SearchQuery query, Optional<Integer> limit) {
+    private SearchRequest prepareSearch(Collection<MailboxId> mailboxIds, SearchQuery query, Optional<Integer> limit) {
         SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
-            .query(queryConverter.from(users, query))
+            .query(queryConverter.from(mailboxIds, query))
             .size(computeRequiredSize(limit))
             .storedFields(STORED_FIELDS);
 
@@ -96,7 +96,12 @@ public class ElasticSearchSearcher {
         return new SearchRequest(aliasName.getValue())
             .types(NodeMappingFactory.DEFAULT_MAPPING_NAME)
             .scroll(TIMEOUT)
-            .source(searchSourceBuilder);
+            .source(searchSourceBuilder)
+            .routing(toRoutingKeys(mailboxIds));
+    }
+
+    private String[] toRoutingKeys(Collection<MailboxId> mailboxIds) {
+        return mailboxIds.stream().map(MailboxId::serialize).toArray(String[]::new);
     }
 
     private int computeRequiredSize(Optional<Integer> limit) {
diff --git a/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListener.java b/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListener.java
index 4fb96cb..7d27e53 100644
--- a/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListener.java
+++ b/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListener.java
@@ -23,7 +23,10 @@ import java.io.IOException;
 import javax.inject.Inject;
 import javax.inject.Named;
 
+import org.apache.james.backends.es.DocumentId;
 import org.apache.james.backends.es.ElasticSearchIndexer;
+import org.apache.james.backends.es.RoutingKey;
+import org.apache.james.core.User;
 import org.apache.james.mailbox.events.Event;
 import org.apache.james.mailbox.events.Group;
 import org.apache.james.mailbox.events.MailboxListener;
@@ -64,7 +67,17 @@ public class ElasticSearchQuotaMailboxListener implements MailboxListener.GroupM
     }
 
     private void handleEvent(QuotaUsageUpdatedEvent event) throws IOException {
-        indexer.index(event.getUser().asString(),
-            quotaRatioToElasticSearchJson.convertToJson(event));
+        User user = event.getUser();
+        indexer.index(toDocumentId(user),
+            quotaRatioToElasticSearchJson.convertToJson(event),
+            toRoutingKey(user));
+    }
+
+    private RoutingKey toRoutingKey(User user) {
+        return RoutingKey.fromString(user.asString());
+    }
+
+    private DocumentId toDocumentId(User user) {
+        return DocumentId.fromString(user.asString());
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to