MAILBOX-266 Introduce a scroll Iterable
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/ebeafcf0 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/ebeafcf0 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/ebeafcf0 Branch: refs/heads/master Commit: ebeafcf0e58691961c1ce4a7bb7394bfaab164a7 Parents: a032429 Author: Benoit Tellier <btell...@linagora.com> Authored: Mon Mar 28 10:16:48 2016 +0700 Committer: Benoit Tellier <btell...@linagora.com> Committed: Wed Apr 6 16:18:45 2016 +0700 ---------------------------------------------------------------------- .../elasticsearch/NodeMappingFactory.java | 6 +- .../search/ElasticSearchSearcher.java | 17 +- .../elasticsearch/search/ScrollIterable.java | 81 +++++++++ .../search/ScrollIterableTest.java | 173 +++++++++++++++++++ 4 files changed, 267 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/ebeafcf0/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/NodeMappingFactory.java ---------------------------------------------------------------------- diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/NodeMappingFactory.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/NodeMappingFactory.java index 5a2e90f..758f3c2 100644 --- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/NodeMappingFactory.java +++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/NodeMappingFactory.java @@ -43,12 +43,16 @@ public class NodeMappingFactory { public static final String NESTED = "nested"; public static ClientProvider applyMapping(ClientProvider clientProvider) { + return applyMapping(clientProvider, getMappingContent()); + } + + public static ClientProvider applyMapping(ClientProvider clientProvider, XContentBuilder mappingsSources) { try (Client client = clientProvider.get()) { client.admin() .indices() .preparePutMapping(ElasticSearchIndexer.MAILBOX_INDEX) .setType(ElasticSearchIndexer.MESSAGE_TYPE) - .setSource(getMappingContent()) + .setSource(mappingsSources) .execute() .actionGet(); } http://git-wip-us.apache.org/repos/asf/james-project/blob/ebeafcf0/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java ---------------------------------------------------------------------- diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java index b0f1734..96d90b5 100644 --- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java +++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java @@ -21,8 +21,11 @@ package org.apache.james.mailbox.elasticsearch.search; import java.util.Iterator; import java.util.Optional; +import java.util.stream.Stream; import java.util.stream.StreamSupport; +import javax.inject.Inject; + import org.apache.james.mailbox.elasticsearch.ClientProvider; import org.apache.james.mailbox.elasticsearch.ElasticSearchIndexer; import org.apache.james.mailbox.elasticsearch.json.JsonMessageConstants; @@ -40,8 +43,6 @@ import org.elasticsearch.search.SearchHit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.inject.Inject; - public class ElasticSearchSearcher<Id extends MailboxId> { private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchSearcher.class); @@ -57,9 +58,9 @@ public class ElasticSearchSearcher<Id extends MailboxId> { public Iterator<Long> search(Mailbox<Id> mailbox, SearchQuery searchQuery) throws MailboxException { try (Client client = clientProvider.get()) { - return transformResponseToUidIterator(getSearchRequestBuilder(client, mailbox, searchQuery) - .get() - ); + return new ScrollIterable(client, getSearchRequestBuilder(client, mailbox, searchQuery)).stream() + .flatMap(this::transformResponseToUidStream) + .iterator(); } } @@ -76,13 +77,11 @@ public class ElasticSearchSearcher<Id extends MailboxId> { (partialResult1, partialResult2) -> partialResult1); } - private Iterator<Long> transformResponseToUidIterator(SearchResponse searchResponse) { + private Stream<Long> transformResponseToUidStream(SearchResponse searchResponse) { return StreamSupport.stream(searchResponse.getHits().spliterator(), false) .map(this::extractUidFromHit) .filter(Optional::isPresent) - .map(Optional::get) - .iterator(); - + .map(Optional::get); } private Optional<Long> extractUidFromHit(SearchHit hit) { http://git-wip-us.apache.org/repos/asf/james-project/blob/ebeafcf0/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ScrollIterable.java ---------------------------------------------------------------------- diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ScrollIterable.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ScrollIterable.java new file mode 100644 index 0000000..76acce1 --- /dev/null +++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ScrollIterable.java @@ -0,0 +1,81 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.elasticsearch.search; + +import java.util.Iterator; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.unit.TimeValue; + +public class ScrollIterable implements Iterable<SearchResponse> { + + private static final TimeValue TIMEOUT = new TimeValue(60000); + private final Client client; + private final SearchRequestBuilder searchRequestBuilder; + + public ScrollIterable(Client client, SearchRequestBuilder searchRequestBuilder) { + this.client = client; + this.searchRequestBuilder = searchRequestBuilder; + } + + @Override + public Iterator<SearchResponse> iterator() { + return new ScrollIterator(client, searchRequestBuilder); + } + + public Stream<SearchResponse> stream() { + return StreamSupport.stream(spliterator(), false); + } + + public static class ScrollIterator implements Iterator<SearchResponse> { + + private final Client client; + private ListenableActionFuture<SearchResponse> searchResponseFuture; + + public ScrollIterator(Client client, SearchRequestBuilder searchRequestBuilder) { + this.client = client; + this.searchResponseFuture = searchRequestBuilder.execute(); + } + + @Override + public boolean hasNext() { + return !allSearchResponsesConsumed(searchResponseFuture.actionGet()); + } + + @Override + public SearchResponse next() { + SearchResponse result = searchResponseFuture.actionGet(); + searchResponseFuture = client.prepareSearchScroll(result.getScrollId()) + .setScroll(TIMEOUT) + .execute(); + return result; + } + + private boolean allSearchResponsesConsumed(SearchResponse searchResponse) { + return searchResponse.getHits().getHits().length == 0; + } + } + +} http://git-wip-us.apache.org/repos/asf/james-project/blob/ebeafcf0/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/search/ScrollIterableTest.java ---------------------------------------------------------------------- diff --git a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/search/ScrollIterableTest.java b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/search/ScrollIterableTest.java new file mode 100644 index 0000000..2d574f0 --- /dev/null +++ b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/search/ScrollIterableTest.java @@ -0,0 +1,173 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.elasticsearch.search; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.james.mailbox.elasticsearch.ClientProvider; +import org.apache.james.mailbox.elasticsearch.ElasticSearchIndexer; +import org.apache.james.mailbox.elasticsearch.EmbeddedElasticSearch; +import org.apache.james.mailbox.elasticsearch.IndexCreationFactory; +import org.apache.james.mailbox.elasticsearch.NodeMappingFactory; +import org.apache.james.mailbox.elasticsearch.utils.TestingClientProvider; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.search.SearchHit; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.RuleChain; +import org.junit.rules.TemporaryFolder; + +public class ScrollIterableTest { + + public static final TimeValue TIMEOUT = new TimeValue(6000); + public static final int SIZE = 2; + public static final String MESSAGE = "message"; + + private TemporaryFolder temporaryFolder = new TemporaryFolder(); + private EmbeddedElasticSearch embeddedElasticSearch= new EmbeddedElasticSearch(temporaryFolder); + + @Rule + public RuleChain ruleChain = RuleChain.outerRule(temporaryFolder).around(embeddedElasticSearch); + + private ClientProvider clientProvider; + + @Before + public void setUp() throws Exception { + clientProvider = new TestingClientProvider(embeddedElasticSearch.getNode()); + IndexCreationFactory.createIndex(clientProvider); + embeddedElasticSearch.awaitForElasticSearch(); + NodeMappingFactory.applyMapping(clientProvider, getMappingsSources()); + } + + private XContentBuilder getMappingsSources() throws IOException { + return jsonBuilder() + .startObject() + .startObject(ElasticSearchIndexer.MESSAGE_TYPE) + .startObject(NodeMappingFactory.PROPERTIES) + .startObject(MESSAGE) + .field(NodeMappingFactory.TYPE, NodeMappingFactory.STRING) + .endObject() + .endObject() + .endObject() + .endObject(); + } + + @Test + public void scrollIterableShouldWorkWhenEmpty() { + try (Client client = clientProvider.get()) { + SearchRequestBuilder searchRequestBuilder = client.prepareSearch(ElasticSearchIndexer.MAILBOX_INDEX) + .setTypes(ElasticSearchIndexer.MESSAGE_TYPE) + .setScroll(TIMEOUT) + .setQuery(matchAllQuery()) + .setSize(SIZE); + assertThat(new ScrollIterable(client, searchRequestBuilder)).isEmpty(); + } + } + + @Test + public void scrollIterableShouldWorkWhenOneElement() { + try (Client client = clientProvider.get()) { + String id = "1"; + client.prepareIndex(ElasticSearchIndexer.MAILBOX_INDEX, ElasticSearchIndexer.MESSAGE_TYPE, id) + .setSource(MESSAGE, "Sample message") + .execute(); + + embeddedElasticSearch.awaitForElasticSearch(); + + SearchRequestBuilder searchRequestBuilder = client.prepareSearch(ElasticSearchIndexer.MAILBOX_INDEX) + .setTypes(ElasticSearchIndexer.MESSAGE_TYPE) + .setScroll(TIMEOUT) + .setQuery(matchAllQuery()) + .setSize(SIZE); + assertThat(convertToIdList(new ScrollIterable(client, searchRequestBuilder))).containsOnly(id); + } + } + + @Test + public void scrollIterableShouldWorkWhenSizeElement() { + try (Client client = clientProvider.get()) { + String id1 = "1"; + client.prepareIndex(ElasticSearchIndexer.MAILBOX_INDEX, ElasticSearchIndexer.MESSAGE_TYPE, id1) + .setSource(MESSAGE, "Sample message") + .execute(); + + String id2 = "2"; + client.prepareIndex(ElasticSearchIndexer.MAILBOX_INDEX, ElasticSearchIndexer.MESSAGE_TYPE, id2) + .setSource(MESSAGE, "Sample message") + .execute(); + + embeddedElasticSearch.awaitForElasticSearch(); + + SearchRequestBuilder searchRequestBuilder = client.prepareSearch(ElasticSearchIndexer.MAILBOX_INDEX) + .setTypes(ElasticSearchIndexer.MESSAGE_TYPE) + .setScroll(TIMEOUT) + .setQuery(matchAllQuery()) + .setSize(SIZE); + assertThat(convertToIdList(new ScrollIterable(client, searchRequestBuilder))).containsOnly(id1, id2); + } + } + + @Test + public void scrollIterableShouldWorkWhenMoreThanSizeElement() { + try (Client client = clientProvider.get()) { + String id1 = "1"; + client.prepareIndex(ElasticSearchIndexer.MAILBOX_INDEX, ElasticSearchIndexer.MESSAGE_TYPE, id1) + .setSource(MESSAGE, "Sample message") + .execute(); + + String id2 = "2"; + client.prepareIndex(ElasticSearchIndexer.MAILBOX_INDEX, ElasticSearchIndexer.MESSAGE_TYPE, id2) + .setSource(MESSAGE, "Sample message") + .execute(); + + String id3 = "3"; + client.prepareIndex(ElasticSearchIndexer.MAILBOX_INDEX, ElasticSearchIndexer.MESSAGE_TYPE, id3) + .setSource(MESSAGE, "Sample message") + .execute(); + + embeddedElasticSearch.awaitForElasticSearch(); + + SearchRequestBuilder searchRequestBuilder = client.prepareSearch(ElasticSearchIndexer.MAILBOX_INDEX) + .setTypes(ElasticSearchIndexer.MESSAGE_TYPE) + .setScroll(TIMEOUT) + .setQuery(matchAllQuery()) + .setSize(SIZE); + assertThat(convertToIdList(new ScrollIterable(client, searchRequestBuilder))).containsOnly(id1, id2, id3); + } + } + + private List<String> convertToIdList(ScrollIterable scrollIterable) { + return scrollIterable.stream() + .flatMap(searchResponse -> Arrays.asList(searchResponse.getHits().getHits()).stream()) + .map(SearchHit::getId) + .collect(Collectors.toList()); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org