MAILBOX-266 Introduce a scroll Iterable

Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/ebeafcf0
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/ebeafcf0
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/ebeafcf0

Branch: refs/heads/master
Commit: ebeafcf0e58691961c1ce4a7bb7394bfaab164a7
Parents: a032429
Author: Benoit Tellier <btell...@linagora.com>
Authored: Mon Mar 28 10:16:48 2016 +0700
Committer: Benoit Tellier <btell...@linagora.com>
Committed: Wed Apr 6 16:18:45 2016 +0700

----------------------------------------------------------------------
 .../elasticsearch/NodeMappingFactory.java       |   6 +-
 .../search/ElasticSearchSearcher.java           |  17 +-
 .../elasticsearch/search/ScrollIterable.java    |  81 +++++++++
 .../search/ScrollIterableTest.java              | 173 +++++++++++++++++++
 4 files changed, 267 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/ebeafcf0/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/NodeMappingFactory.java
----------------------------------------------------------------------
diff --git 
a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/NodeMappingFactory.java
 
b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/NodeMappingFactory.java
index 5a2e90f..758f3c2 100644
--- 
a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/NodeMappingFactory.java
+++ 
b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/NodeMappingFactory.java
@@ -43,12 +43,16 @@ public class NodeMappingFactory {
     public static final String NESTED = "nested";
 
     public static ClientProvider applyMapping(ClientProvider clientProvider) {
+        return applyMapping(clientProvider, getMappingContent());
+    }
+
+    public static ClientProvider applyMapping(ClientProvider clientProvider, 
XContentBuilder mappingsSources) {
         try (Client client = clientProvider.get()) {
             client.admin()
                 .indices()
                 .preparePutMapping(ElasticSearchIndexer.MAILBOX_INDEX)
                 .setType(ElasticSearchIndexer.MESSAGE_TYPE)
-                .setSource(getMappingContent())
+                .setSource(mappingsSources)
                 .execute()
                 .actionGet();
         }

http://git-wip-us.apache.org/repos/asf/james-project/blob/ebeafcf0/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java
----------------------------------------------------------------------
diff --git 
a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java
 
b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java
index b0f1734..96d90b5 100644
--- 
a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java
+++ 
b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java
@@ -21,8 +21,11 @@ package org.apache.james.mailbox.elasticsearch.search;
 
 import java.util.Iterator;
 import java.util.Optional;
+import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
+import javax.inject.Inject;
+
 import org.apache.james.mailbox.elasticsearch.ClientProvider;
 import org.apache.james.mailbox.elasticsearch.ElasticSearchIndexer;
 import org.apache.james.mailbox.elasticsearch.json.JsonMessageConstants;
@@ -40,8 +43,6 @@ import org.elasticsearch.search.SearchHit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.inject.Inject;
-
 public class ElasticSearchSearcher<Id extends MailboxId> {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ElasticSearchSearcher.class);
@@ -57,9 +58,9 @@ public class ElasticSearchSearcher<Id extends MailboxId> {
 
     public Iterator<Long> search(Mailbox<Id> mailbox, SearchQuery searchQuery) 
throws MailboxException {
         try (Client client = clientProvider.get()) {
-            return 
transformResponseToUidIterator(getSearchRequestBuilder(client, mailbox, 
searchQuery)
-                .get()
-            );
+            return new ScrollIterable(client, getSearchRequestBuilder(client, 
mailbox, searchQuery)).stream()
+                .flatMap(this::transformResponseToUidStream)
+                .iterator();
         }
     }
 
@@ -76,13 +77,11 @@ public class ElasticSearchSearcher<Id extends MailboxId> {
                 (partialResult1, partialResult2) -> partialResult1);
     }
 
-    private Iterator<Long> transformResponseToUidIterator(SearchResponse 
searchResponse) {
+    private Stream<Long> transformResponseToUidStream(SearchResponse 
searchResponse) {
         return StreamSupport.stream(searchResponse.getHits().spliterator(), 
false)
             .map(this::extractUidFromHit)
             .filter(Optional::isPresent)
-            .map(Optional::get)
-            .iterator();
-
+            .map(Optional::get);
     }
 
     private Optional<Long> extractUidFromHit(SearchHit hit) {

http://git-wip-us.apache.org/repos/asf/james-project/blob/ebeafcf0/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ScrollIterable.java
----------------------------------------------------------------------
diff --git 
a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ScrollIterable.java
 
b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ScrollIterable.java
new file mode 100644
index 0000000..76acce1
--- /dev/null
+++ 
b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ScrollIterable.java
@@ -0,0 +1,81 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.elasticsearch.search;
+
+import java.util.Iterator;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import org.elasticsearch.action.ListenableActionFuture;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.TimeValue;
+
+public class ScrollIterable implements Iterable<SearchResponse> {
+
+    private static final TimeValue TIMEOUT = new TimeValue(60000);
+    private final Client client;
+    private final SearchRequestBuilder searchRequestBuilder;
+
+    public ScrollIterable(Client client, SearchRequestBuilder 
searchRequestBuilder) {
+        this.client = client;
+        this.searchRequestBuilder = searchRequestBuilder;
+    }
+
+    @Override
+    public Iterator<SearchResponse> iterator() {
+        return new ScrollIterator(client, searchRequestBuilder);
+    }
+
+    public Stream<SearchResponse> stream() {
+        return StreamSupport.stream(spliterator(), false);
+    }
+
+    public static class ScrollIterator implements Iterator<SearchResponse> {
+
+        private final Client client;
+        private ListenableActionFuture<SearchResponse> searchResponseFuture;
+
+        public ScrollIterator(Client client, SearchRequestBuilder 
searchRequestBuilder) {
+            this.client = client;
+            this.searchResponseFuture = searchRequestBuilder.execute();
+        }
+
+        @Override
+        public boolean hasNext() {
+            return 
!allSearchResponsesConsumed(searchResponseFuture.actionGet());
+        }
+
+        @Override
+        public SearchResponse next() {
+            SearchResponse result = searchResponseFuture.actionGet();
+            searchResponseFuture =  
client.prepareSearchScroll(result.getScrollId())
+                .setScroll(TIMEOUT)
+                .execute();
+            return result;
+        }
+
+        private boolean allSearchResponsesConsumed(SearchResponse 
searchResponse) {
+            return searchResponse.getHits().getHits().length == 0;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/ebeafcf0/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/search/ScrollIterableTest.java
----------------------------------------------------------------------
diff --git 
a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/search/ScrollIterableTest.java
 
b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/search/ScrollIterableTest.java
new file mode 100644
index 0000000..2d574f0
--- /dev/null
+++ 
b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/search/ScrollIterableTest.java
@@ -0,0 +1,173 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.elasticsearch.search;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.james.mailbox.elasticsearch.ClientProvider;
+import org.apache.james.mailbox.elasticsearch.ElasticSearchIndexer;
+import org.apache.james.mailbox.elasticsearch.EmbeddedElasticSearch;
+import org.apache.james.mailbox.elasticsearch.IndexCreationFactory;
+import org.apache.james.mailbox.elasticsearch.NodeMappingFactory;
+import org.apache.james.mailbox.elasticsearch.utils.TestingClientProvider;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.RuleChain;
+import org.junit.rules.TemporaryFolder;
+
+public class ScrollIterableTest {
+
+    public static final TimeValue TIMEOUT = new TimeValue(6000);
+    public static final int SIZE = 2;
+    public static final String MESSAGE = "message";
+
+    private TemporaryFolder temporaryFolder = new TemporaryFolder();
+    private EmbeddedElasticSearch embeddedElasticSearch= new 
EmbeddedElasticSearch(temporaryFolder);
+
+    @Rule
+    public RuleChain ruleChain = 
RuleChain.outerRule(temporaryFolder).around(embeddedElasticSearch);
+
+    private ClientProvider clientProvider;
+
+    @Before
+    public void setUp() throws Exception {
+        clientProvider = new 
TestingClientProvider(embeddedElasticSearch.getNode());
+        IndexCreationFactory.createIndex(clientProvider);
+        embeddedElasticSearch.awaitForElasticSearch();
+        NodeMappingFactory.applyMapping(clientProvider, getMappingsSources());
+    }
+
+    private XContentBuilder getMappingsSources() throws IOException {
+        return jsonBuilder()
+            .startObject()
+                .startObject(ElasticSearchIndexer.MESSAGE_TYPE)
+                    .startObject(NodeMappingFactory.PROPERTIES)
+                        .startObject(MESSAGE)
+                            .field(NodeMappingFactory.TYPE, 
NodeMappingFactory.STRING)
+                        .endObject()
+                    .endObject()
+                .endObject()
+            .endObject();
+    }
+
+    @Test
+    public void scrollIterableShouldWorkWhenEmpty() {
+        try (Client client = clientProvider.get()) {
+            SearchRequestBuilder searchRequestBuilder = 
client.prepareSearch(ElasticSearchIndexer.MAILBOX_INDEX)
+                .setTypes(ElasticSearchIndexer.MESSAGE_TYPE)
+                .setScroll(TIMEOUT)
+                .setQuery(matchAllQuery())
+                .setSize(SIZE);
+            assertThat(new ScrollIterable(client, 
searchRequestBuilder)).isEmpty();
+        }
+    }
+
+    @Test
+    public void scrollIterableShouldWorkWhenOneElement() {
+        try (Client client = clientProvider.get()) {
+            String id = "1";
+            client.prepareIndex(ElasticSearchIndexer.MAILBOX_INDEX, 
ElasticSearchIndexer.MESSAGE_TYPE, id)
+                .setSource(MESSAGE, "Sample message")
+                .execute();
+
+            embeddedElasticSearch.awaitForElasticSearch();
+
+            SearchRequestBuilder searchRequestBuilder = 
client.prepareSearch(ElasticSearchIndexer.MAILBOX_INDEX)
+                .setTypes(ElasticSearchIndexer.MESSAGE_TYPE)
+                .setScroll(TIMEOUT)
+                .setQuery(matchAllQuery())
+                .setSize(SIZE);
+            assertThat(convertToIdList(new ScrollIterable(client, 
searchRequestBuilder))).containsOnly(id);
+        }
+    }
+
+    @Test
+    public void scrollIterableShouldWorkWhenSizeElement() {
+        try (Client client = clientProvider.get()) {
+            String id1 = "1";
+            client.prepareIndex(ElasticSearchIndexer.MAILBOX_INDEX, 
ElasticSearchIndexer.MESSAGE_TYPE, id1)
+                .setSource(MESSAGE, "Sample message")
+                .execute();
+
+            String id2 = "2";
+            client.prepareIndex(ElasticSearchIndexer.MAILBOX_INDEX, 
ElasticSearchIndexer.MESSAGE_TYPE, id2)
+                .setSource(MESSAGE, "Sample message")
+                .execute();
+
+            embeddedElasticSearch.awaitForElasticSearch();
+
+            SearchRequestBuilder searchRequestBuilder = 
client.prepareSearch(ElasticSearchIndexer.MAILBOX_INDEX)
+                .setTypes(ElasticSearchIndexer.MESSAGE_TYPE)
+                .setScroll(TIMEOUT)
+                .setQuery(matchAllQuery())
+                .setSize(SIZE);
+            assertThat(convertToIdList(new ScrollIterable(client, 
searchRequestBuilder))).containsOnly(id1, id2);
+        }
+    }
+
+    @Test
+    public void scrollIterableShouldWorkWhenMoreThanSizeElement() {
+        try (Client client = clientProvider.get()) {
+            String id1 = "1";
+            client.prepareIndex(ElasticSearchIndexer.MAILBOX_INDEX, 
ElasticSearchIndexer.MESSAGE_TYPE, id1)
+                .setSource(MESSAGE, "Sample message")
+                .execute();
+
+            String id2 = "2";
+            client.prepareIndex(ElasticSearchIndexer.MAILBOX_INDEX, 
ElasticSearchIndexer.MESSAGE_TYPE, id2)
+                .setSource(MESSAGE, "Sample message")
+                .execute();
+
+            String id3 = "3";
+            client.prepareIndex(ElasticSearchIndexer.MAILBOX_INDEX, 
ElasticSearchIndexer.MESSAGE_TYPE, id3)
+                .setSource(MESSAGE, "Sample message")
+                .execute();
+
+            embeddedElasticSearch.awaitForElasticSearch();
+
+            SearchRequestBuilder searchRequestBuilder = 
client.prepareSearch(ElasticSearchIndexer.MAILBOX_INDEX)
+                .setTypes(ElasticSearchIndexer.MESSAGE_TYPE)
+                .setScroll(TIMEOUT)
+                .setQuery(matchAllQuery())
+                .setSize(SIZE);
+            assertThat(convertToIdList(new ScrollIterable(client, 
searchRequestBuilder))).containsOnly(id1, id2, id3);
+        }
+    }
+
+    private List<String> convertToIdList(ScrollIterable scrollIterable) {
+        return scrollIterable.stream()
+            .flatMap(searchResponse -> 
Arrays.asList(searchResponse.getHits().getHits()).stream())
+            .map(SearchHit::getId)
+            .collect(Collectors.toList());
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to