This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 5bf49a7c57210a929203a4da28b2211cb4f36faf Author: Benoit Tellier <btell...@linagora.com> AuthorDate: Wed May 22 13:12:19 2019 +0700 JAMES-2777 James should close ElasticSearch scroll context --- .../james/backends/es/DeleteByQueryPerformer.java | 4 +- .../{ScrollIterable.java => ScrolledSearch.java} | 59 ++++++++---- ...llIterableTest.java => ScrolledSearchTest.java} | 24 ++--- .../org/apache/james/mailbox/MessageManager.java | 3 +- .../ElasticSearchListeningMessageSearchIndex.java | 27 +++--- .../search/ElasticSearchSearcher.java | 24 ++--- .../lucene/search/LuceneMessageSearchIndex.java | 6 +- .../LuceneMailboxMessageSearchIndexTest.java | 102 ++++++++++----------- .../james/vault/DeletedMessageVaultHookTest.java | 13 +-- .../elasticsearch/ElasticSearchQuotaSearcher.java | 17 ++-- .../james/mailbox/store/StoreMessageManager.java | 7 +- .../store/search/LazyMessageSearchIndex.java | 3 +- .../mailbox/store/search/MessageSearchIndex.java | 4 +- .../store/search/SimpleMessageSearchIndex.java | 5 +- .../search/AbstractMessageSearchIndexTest.java | 3 +- .../imap/processor/AbstractMailboxProcessor.java | 6 +- .../james/imap/processor/SearchProcessor.java | 51 ++++++----- .../imap/processor/base/SelectedMailboxImpl.java | 8 +- .../james/imap/processor/SearchProcessorTest.java | 3 +- .../processor/base/MailboxEventAnalyserTest.java | 5 +- .../processor/base/SelectedMailboxImplTest.java | 7 +- .../org/apache/james/FakeMessageSearchIndex.java | 4 +- 22 files changed, 198 insertions(+), 187 deletions(-) diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DeleteByQueryPerformer.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DeleteByQueryPerformer.java index c84c9fc..26376c6 100644 --- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DeleteByQueryPerformer.java +++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DeleteByQueryPerformer.java @@ -19,7 +19,7 @@ package org.apache.james.backends.es; -import org.apache.james.backends.es.search.ScrollIterable; +import org.apache.james.backends.es.search.ScrolledSearch; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; @@ -51,7 +51,7 @@ public class DeleteByQueryPerformer { } public Mono<Void> perform(QueryBuilder queryBuilder) { - return Flux.fromStream(new ScrollIterable(client, prepareSearch(queryBuilder)).stream()) + return Flux.fromStream(new ScrolledSearch(client, prepareSearch(queryBuilder)).searchResponses()) .flatMap(searchResponse -> deleteRetrievedIds(client, searchResponse)) .thenEmpty(Mono.empty()); } diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrollIterable.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrolledSearch.java similarity index 73% rename from backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrollIterable.java rename to backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrolledSearch.java index 1a8d693..bf015f6 100644 --- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrollIterable.java +++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrolledSearch.java @@ -19,39 +19,27 @@ package org.apache.james.backends.es.search; +import java.io.Closeable; +import java.io.IOException; +import java.util.Arrays; import java.util.Iterator; import java.util.concurrent.CompletableFuture; import java.util.stream.Stream; import org.apache.james.backends.es.ListenerToFuture; import org.apache.james.util.streams.Iterators; +import org.elasticsearch.action.search.ClearScrollRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.search.SearchHit; -public class ScrollIterable implements Iterable<SearchResponse> { - private static final TimeValue TIMEOUT = TimeValue.timeValueMinutes(1); - - private final RestHighLevelClient client; - private final SearchRequest searchRequest; - - public ScrollIterable(RestHighLevelClient client, SearchRequest searchRequest) { - this.client = client; - this.searchRequest = searchRequest; - } +import com.github.fge.lambdas.Throwing; - @Override - public Iterator<SearchResponse> iterator() { - return new ScrollIterator(client, searchRequest); - } - - public Stream<SearchResponse> stream() { - return Iterators.toStream(iterator()); - } - - public static class ScrollIterator implements Iterator<SearchResponse> { +public class ScrolledSearch { + private static class ScrollIterator implements Iterator<SearchResponse>, Closeable { private final RestHighLevelClient client; private CompletableFuture<SearchResponse> searchResponseFuture; @@ -64,6 +52,13 @@ public class ScrollIterable implements Iterable<SearchResponse> { } @Override + public void close() throws IOException { + ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); + clearScrollRequest.addScrollId(searchResponseFuture.join().getScrollId()); + client.clearScroll(clearScrollRequest); + } + + @Override public boolean hasNext() { SearchResponse join = searchResponseFuture.join(); return !allSearchResponsesConsumed(join); @@ -82,9 +77,33 @@ public class ScrollIterable implements Iterable<SearchResponse> { return result; } + public Stream<SearchResponse> stream() { + return Iterators.toStream(this) + .onClose(Throwing.runnable(this::close)); + } + private boolean allSearchResponsesConsumed(SearchResponse searchResponse) { return searchResponse.getHits().getHits().length == 0; } } + private static final TimeValue TIMEOUT = TimeValue.timeValueMinutes(1); + + private final RestHighLevelClient client; + private final SearchRequest searchRequest; + + public ScrolledSearch(RestHighLevelClient client, SearchRequest searchRequest) { + this.client = client; + this.searchRequest = searchRequest; + } + + public Stream<SearchHit> searchHits() { + return searchResponses() + .flatMap(searchResponse -> Arrays.stream(searchResponse.getHits().getHits())); + } + + public Stream<SearchResponse> searchResponses() { + return new ScrollIterator(client, searchRequest) + .stream(); + } } diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrollIterableTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrolledSearchTest.java similarity index 91% rename from backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrollIterableTest.java rename to backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrolledSearchTest.java index c52847c..f622a63 100644 --- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrollIterableTest.java +++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrolledSearchTest.java @@ -23,9 +23,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.stream.Collectors; import org.apache.james.backends.es.ClientProvider; import org.apache.james.backends.es.DockerElasticSearchRule; @@ -47,8 +44,7 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; -public class ScrollIterableTest { - +public class ScrolledSearchTest { private static final TimeValue TIMEOUT = TimeValue.timeValueMinutes(1); private static final int SIZE = 2; private static final String MESSAGE = "message"; @@ -80,7 +76,7 @@ public class ScrollIterableTest { .query(QueryBuilders.matchAllQuery()) .size(SIZE)); - assertThat(new ScrollIterable(client, searchRequest)) + assertThat(new ScrolledSearch(client, searchRequest).searchHits()) .isEmpty(); } } @@ -103,7 +99,8 @@ public class ScrollIterableTest { .query(QueryBuilders.matchAllQuery()) .size(SIZE)); - assertThat(convertToIdList(new ScrollIterable(client, searchRequest))) + assertThat(new ScrolledSearch(client, searchRequest).searchHits()) + .extracting(SearchHit::getId) .containsOnly(id); } } @@ -132,7 +129,8 @@ public class ScrollIterableTest { .query(QueryBuilders.matchAllQuery()) .size(SIZE)); - assertThat(convertToIdList(new ScrollIterable(client, searchRequest))) + assertThat(new ScrolledSearch(client, searchRequest).searchHits()) + .extracting(SearchHit::getId) .containsOnly(id1, id2); } } @@ -167,18 +165,12 @@ public class ScrollIterableTest { .query(QueryBuilders.matchAllQuery()) .size(SIZE)); - assertThat(convertToIdList(new ScrollIterable(client, searchRequest))) + assertThat(new ScrolledSearch(client, searchRequest).searchHits()) + .extracting(SearchHit::getId) .containsOnly(id1, id2, id3); } } - private List<String> convertToIdList(ScrollIterable scrollIterable) { - return scrollIterable.stream() - .flatMap(searchResponse -> Arrays.stream(searchResponse.getHits().getHits())) - .map(SearchHit::getId) - .collect(Collectors.toList()); - } - private void hasIdsInIndex(RestHighLevelClient client, String... ids) throws IOException { SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue()) .scroll(TIMEOUT) diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageManager.java index f2d13e0..29d17b0 100644 --- a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageManager.java +++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageManager.java @@ -29,6 +29,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.stream.Stream; import javax.mail.Flags; @@ -103,7 +104,7 @@ public interface MessageManager { * @throws MailboxException * when search fails for other reasons */ - Iterator<MessageUid> search(SearchQuery searchQuery, MailboxSession mailboxSession) throws MailboxException; + Stream<MessageUid> search(SearchQuery searchQuery, MailboxSession mailboxSession) throws MailboxException; /** * Expunges messages in the given range from this mailbox by first retrieving the messages to be deleted diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java index 1dd808c..23e2845 100644 --- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java +++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java @@ -23,9 +23,9 @@ import static org.elasticsearch.index.query.QueryBuilders.termQuery; import java.io.IOException; import java.util.Collection; import java.util.EnumSet; -import java.util.Iterator; import java.util.List; import java.util.Optional; +import java.util.stream.Stream; import javax.inject.Inject; import javax.inject.Named; @@ -50,6 +50,7 @@ import org.apache.james.mailbox.store.MailboxSessionMapperFactory; import org.apache.james.mailbox.store.SessionProvider; import org.apache.james.mailbox.store.mail.model.MailboxMessage; import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex; +import org.apache.james.util.OptionalUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -98,13 +99,13 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe } @Override - public Iterator<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) { + public Stream<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) { Preconditions.checkArgument(session != null, "'session' is mandatory"); Optional<Integer> noLimit = Optional.empty(); + return searcher - .search(ImmutableList.of(mailbox.getMailboxId()), searchQuery, noLimit) - .map(SearchResult::getMessageUid) - .iterator(); + .search(ImmutableList.of(mailbox.getMailboxId()), searchQuery, noLimit) + .map(SearchResult::getMessageUid); } @Override @@ -115,13 +116,15 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe return ImmutableList.of(); } - return searcher.search(mailboxIds, searchQuery, Optional.empty()) - .peek(this::logIfNoMessageId) - .map(SearchResult::getMessageId) - .map(Optional::get) - .distinct() - .limit(limit) - .collect(Guavate.toImmutableList()); + try (Stream<SearchResult> searchResults = searcher.search(mailboxIds, searchQuery, Optional.empty())) { + return searchResults + .peek(this::logIfNoMessageId) + .map(SearchResult::getMessageId) + .flatMap(OptionalUtils::toStream) + .distinct() + .limit(limit) + .collect(Guavate.toImmutableList()); + } } @Override diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java index 3863302..9406b68 100644 --- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java +++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java @@ -19,7 +19,6 @@ package org.apache.james.mailbox.elasticsearch.search; -import java.util.Arrays; import java.util.Collection; import java.util.Optional; import java.util.stream.Stream; @@ -27,7 +26,7 @@ import java.util.stream.Stream; import org.apache.james.backends.es.AliasName; import org.apache.james.backends.es.NodeMappingFactory; import org.apache.james.backends.es.ReadAliasName; -import org.apache.james.backends.es.search.ScrollIterable; +import org.apache.james.backends.es.search.ScrolledSearch; import org.apache.james.mailbox.MessageUid; import org.apache.james.mailbox.elasticsearch.json.JsonMessageConstants; import org.apache.james.mailbox.elasticsearch.query.QueryConverter; @@ -37,7 +36,6 @@ import org.apache.james.mailbox.model.MessageId; import org.apache.james.mailbox.model.SearchQuery; import org.apache.james.mailbox.store.search.MessageSearchIndex; import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.document.DocumentField; import org.elasticsearch.common.unit.TimeValue; @@ -49,9 +47,7 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.ImmutableList; public class ElasticSearchSearcher { - public static final int DEFAULT_SEARCH_SIZE = 100; - private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchSearcher.class); private static final TimeValue TIMEOUT = TimeValue.timeValueMinutes(1); private static final ImmutableList<String> STORED_FIELDS = ImmutableList.of(JsonMessageConstants.MAILBOX_ID, @@ -78,8 +74,9 @@ public class ElasticSearchSearcher { public Stream<MessageSearchIndex.SearchResult> search(Collection<MailboxId> mailboxIds, SearchQuery query, Optional<Integer> limit) { SearchRequest searchRequest = prepareSearch(mailboxIds, query, limit); - Stream<MessageSearchIndex.SearchResult> pairStream = new ScrollIterable(client, searchRequest).stream() - .flatMap(this::transformResponseToUidStream); + Stream<MessageSearchIndex.SearchResult> pairStream = new ScrolledSearch(client, searchRequest) + .searchHits() + .flatMap(this::extractContentFromHit); return limit.map(pairStream::limit) .orElse(pairStream); @@ -107,27 +104,20 @@ public class ElasticSearchSearcher { .orElse(size); } - private Stream<MessageSearchIndex.SearchResult> transformResponseToUidStream(SearchResponse searchResponse) { - return Arrays.stream(searchResponse.getHits().getHits()) - .map(this::extractContentFromHit) - .filter(Optional::isPresent) - .map(Optional::get); - } - - private Optional<MessageSearchIndex.SearchResult> extractContentFromHit(SearchHit hit) { + private Stream<MessageSearchIndex.SearchResult> extractContentFromHit(SearchHit hit) { DocumentField mailboxId = hit.field(JsonMessageConstants.MAILBOX_ID); DocumentField uid = hit.field(JsonMessageConstants.UID); Optional<DocumentField> id = retrieveMessageIdField(hit); if (mailboxId != null && uid != null) { Number uidAsNumber = uid.getValue(); - return Optional.of( + return Stream.of( new MessageSearchIndex.SearchResult( id.map(field -> messageIdFactory.fromString(field.getValue())), mailboxIdFactory.fromString(mailboxId.getValue()), MessageUid.of(uidAsNumber.longValue()))); } else { LOGGER.warn("Can not extract UID, MessageID and/or MailboxId for search result {}", hit.getId()); - return Optional.empty(); + return Stream.empty(); } } diff --git a/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java b/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java index 93d0893..8300b5c 100644 --- a/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java +++ b/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java @@ -36,6 +36,7 @@ import java.util.Locale; import java.util.Optional; import java.util.Set; import java.util.TimeZone; +import java.util.stream.Stream; import javax.annotation.PreDestroy; import javax.inject.Inject; @@ -463,13 +464,12 @@ public class LuceneMessageSearchIndex extends ListeningMessageSearchIndex { @Override - public Iterator<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException { + public Stream<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException { Preconditions.checkArgument(session != null, "'session' is mandatory"); return searchMultimap(ImmutableList.of(mailbox.getMailboxId()), searchQuery) .stream() - .map(SearchResult::getMessageUid) - .iterator(); + .map(SearchResult::getMessageUid); } @Override diff --git a/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java b/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java index cf57832..d407a8e 100644 --- a/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java +++ b/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java @@ -24,10 +24,10 @@ import java.nio.charset.Charset; import java.util.Calendar; import java.util.Date; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.stream.Stream; import javax.mail.Flags; import javax.mail.Flags.Flag; @@ -160,7 +160,7 @@ public class LuceneMailboxMessageSearchIndexTest { public void bodySearchShouldMatchPhraseInBody() throws Exception { SearchQuery query = new SearchQuery(); query.andCriteria(SearchQuery.bodyContains(CUSTARD)); - Iterator<MessageUid> result = index.search(session, mailbox3, query); + Stream<MessageUid> result = index.search(session, mailbox3, query); assertThat(result).containsExactly(uid5); } @@ -168,7 +168,7 @@ public class LuceneMailboxMessageSearchIndexTest { public void bodySearchShouldNotMatchAbsentPhraseInBody() throws Exception { SearchQuery query = new SearchQuery(); query.andCriteria(SearchQuery.bodyContains(CUSTARD + CUSTARD)); - Iterator<MessageUid> result = index.search(session, mailbox3, query); + Stream<MessageUid> result = index.search(session, mailbox3, query); assertThat(result).isEmpty(); } @@ -176,7 +176,7 @@ public class LuceneMailboxMessageSearchIndexTest { public void bodySearchShouldBeCaseInsensitive() throws Exception { SearchQuery query = new SearchQuery(); query.andCriteria(SearchQuery.bodyContains(RHUBARD)); - Iterator<MessageUid> result = index.search(session, mailbox3, query); + Stream<MessageUid> result = index.search(session, mailbox3, query); assertThat(result).containsExactly(uid5); } @@ -184,7 +184,7 @@ public class LuceneMailboxMessageSearchIndexTest { public void bodySearchNotMatchPhraseOnlyInFrom() throws Exception { SearchQuery query = new SearchQuery(); query.andCriteria(SearchQuery.bodyContains(FROM_ADDRESS)); - Iterator<MessageUid> result = index.search(session, mailbox3, query); + Stream<MessageUid> result = index.search(session, mailbox3, query); assertThat(result).isEmpty(); } @@ -192,7 +192,7 @@ public class LuceneMailboxMessageSearchIndexTest { public void bodySearchShouldNotMatchPhraseOnlyInSubject() throws Exception { SearchQuery query = new SearchQuery(); query.andCriteria(SearchQuery.bodyContains(SUBJECT_PART)); - Iterator<MessageUid> result = index.search(session, mailbox3, query); + Stream<MessageUid> result = index.search(session, mailbox3, query); assertThat(result).isEmpty(); } @@ -200,7 +200,7 @@ public class LuceneMailboxMessageSearchIndexTest { public void textSearchShouldMatchPhraseInBody() throws Exception { SearchQuery query = new SearchQuery(); query.andCriteria(SearchQuery.mailContains(CUSTARD)); - Iterator<MessageUid> result = index.search(session, mailbox3, query); + Stream<MessageUid> result = index.search(session, mailbox3, query); assertThat(result).containsExactly(uid5); } @@ -208,7 +208,7 @@ public class LuceneMailboxMessageSearchIndexTest { public void textSearchShouldNotAbsentMatchPhraseInBody() throws Exception { SearchQuery query = new SearchQuery(); query.andCriteria(SearchQuery.mailContains(CUSTARD + CUSTARD)); - Iterator<MessageUid> result = index.search(session, mailbox3, query); + Stream<MessageUid> result = index.search(session, mailbox3, query); assertThat(result).isEmpty(); } @@ -216,7 +216,7 @@ public class LuceneMailboxMessageSearchIndexTest { public void textSearchMatchShouldBeCaseInsensitive() throws Exception { SearchQuery query = new SearchQuery(); query.andCriteria(SearchQuery.mailContains(RHUBARD.toLowerCase(Locale.US))); - Iterator<MessageUid> result = index.search(session, mailbox3, query); + Stream<MessageUid> result = index.search(session, mailbox3, query); assertThat(result).containsExactly(uid5); } @@ -224,7 +224,7 @@ public class LuceneMailboxMessageSearchIndexTest { public void addressSearchShouldMatchToFullAddress() throws Exception { SearchQuery query = new SearchQuery(); query.andCriteria(SearchQuery.address(AddressType.To,FROM_ADDRESS)); - Iterator<MessageUid> result = index.search(session, mailbox3, query); + Stream<MessageUid> result = index.search(session, mailbox3, query); assertThat(result).containsExactly(uid5); } @@ -232,7 +232,7 @@ public class LuceneMailboxMessageSearchIndexTest { public void addressSearchShouldMatchToDisplayName() throws Exception { SearchQuery query = new SearchQuery(); query.andCriteria(SearchQuery.address(AddressType.To,"Harry")); - Iterator<MessageUid> result = index.search(session, mailbox3, query); + Stream<MessageUid> result = index.search(session, mailbox3, query); assertThat(result).containsExactly(uid5); } @@ -240,7 +240,7 @@ public class LuceneMailboxMessageSearchIndexTest { public void addressSearchShouldMatchToEmail() throws Exception { SearchQuery query = new SearchQuery(); query.andCriteria(SearchQuery.address(AddressType.To,"ha...@example.org")); - Iterator<MessageUid> result = index.search(session, mailbox3, query); + Stream<MessageUid> result = index.search(session, mailbox3, query); assertThat(result).containsExactly(uid5); } @@ -248,7 +248,7 @@ public class LuceneMailboxMessageSearchIndexTest { public void addressSearchShouldMatchFrom() throws Exception { SearchQuery query = new SearchQuery(); query.andCriteria(SearchQuery.address(AddressType.From,"ser-f...@domain.or")); - Iterator<MessageUid> result = index.search(session, mailbox3, query); + Stream<MessageUid> result = index.search(session, mailbox3, query); assertThat(result).containsExactly(uid5); } @@ -256,7 +256,7 @@ public class LuceneMailboxMessageSearchIndexTest { public void textSearchShouldMatchPhraseOnlyInToHeader() throws Exception { SearchQuery query = new SearchQuery(); query.andCriteria(SearchQuery.mailContains(FROM_ADDRESS)); - Iterator<MessageUid> result = index.search(session, mailbox3, query); + Stream<MessageUid> result = index.search(session, mailbox3, query); assertThat(result).containsExactly(uid5); } @@ -264,7 +264,7 @@ public class LuceneMailboxMessageSearchIndexTest { public void textSearchShouldMatchPhraseOnlyInSubjectHeader() throws Exception { SearchQuery query = new SearchQuery(); query.andCriteria(SearchQuery.mailContains(SUBJECT_PART)); - Iterator<MessageUid> result = index.search(session, mailbox3, query); + Stream<MessageUid> result = index.search(session, mailbox3, query); assertThat(result).containsExactly(uid5); } @@ -272,7 +272,7 @@ public class LuceneMailboxMessageSearchIndexTest { public void searchAllShouldMatchAllMailboxEmails() throws Exception { SearchQuery query = new SearchQuery(); query.andCriteria(SearchQuery.all()); - Iterator<MessageUid> result = index.search(session, mailbox2, query); + Stream<MessageUid> result = index.search(session, mailbox2, query); assertThat(result).containsExactly(uid2); } @@ -325,7 +325,7 @@ public class LuceneMailboxMessageSearchIndexTest { public void flagSearchShouldMatch() throws Exception { SearchQuery query = new SearchQuery(); query.andCriteria(SearchQuery.flagIsSet(Flag.DELETED)); - Iterator<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query); assertThat(result).containsExactly(uid3, uid4); } @@ -333,7 +333,7 @@ public class LuceneMailboxMessageSearchIndexTest { public void bodySearchShouldMatchSeveralEmails() throws Exception { SearchQuery query = new SearchQuery(); query.andCriteria(SearchQuery.bodyContains("body")); - Iterator<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query); assertThat(result).containsExactly(uid1, uid3, uid4); } @@ -341,7 +341,7 @@ public class LuceneMailboxMessageSearchIndexTest { public void textSearchShouldMatchSeveralEmails() throws Exception { SearchQuery query = new SearchQuery(); query.andCriteria(SearchQuery.mailContains("body")); - Iterator<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query); assertThat(result).containsExactly(uid1, uid3, uid4); } @@ -349,7 +349,7 @@ public class LuceneMailboxMessageSearchIndexTest { public void headerSearchShouldMatch() throws Exception { SearchQuery query = new SearchQuery(); query.andCriteria(SearchQuery.headerContains("Subject", "test")); - Iterator<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query); assertThat(result).containsExactly(uid1, uid4); } @@ -357,7 +357,7 @@ public class LuceneMailboxMessageSearchIndexTest { public void headerExistsShouldMatch() throws Exception { SearchQuery query = new SearchQuery(); query.andCriteria(SearchQuery.headerExists("Subject")); - Iterator<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query); assertThat(result).containsExactly(uid1, uid4); } @@ -365,7 +365,7 @@ public class LuceneMailboxMessageSearchIndexTest { public void flagUnsetShouldMatch() throws Exception { SearchQuery query = new SearchQuery(); query.andCriteria(SearchQuery.flagIsUnSet(Flag.DRAFT)); - Iterator<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query); assertThat(result).containsExactly(uid1, uid3, uid4); } @@ -376,7 +376,7 @@ public class LuceneMailboxMessageSearchIndexTest { cal.setTime(new Date()); query.andCriteria(SearchQuery.internalDateBefore(cal.getTime(), DateResolution.Day)); - Iterator<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query); assertThat(result).containsExactly(uid3); } @@ -387,7 +387,7 @@ public class LuceneMailboxMessageSearchIndexTest { Calendar cal = Calendar.getInstance(); cal.setTime(new Date()); query.andCriteria(SearchQuery.internalDateAfter(cal.getTime(), DateResolution.Day)); - Iterator<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query); assertThat(result).containsExactly(uid4); } @@ -399,7 +399,7 @@ public class LuceneMailboxMessageSearchIndexTest { Calendar cal = Calendar.getInstance(); cal.setTime(new Date()); query.andCriteria(SearchQuery.internalDateOn(cal.getTime(), DateResolution.Day)); - Iterator<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query); assertThat(result).containsExactly(uid1); } @@ -409,7 +409,7 @@ public class LuceneMailboxMessageSearchIndexTest { Calendar cal = Calendar.getInstance(); cal.setTime(new Date()); query.andCriteria(SearchQuery.uid(new SearchQuery.UidRange[] {new SearchQuery.UidRange(uid1)})); - Iterator<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query); assertThat(result).containsExactly(uid1); } @@ -419,7 +419,7 @@ public class LuceneMailboxMessageSearchIndexTest { Calendar cal = Calendar.getInstance(); cal.setTime(new Date()); query.andCriteria(SearchQuery.uid(new SearchQuery.UidRange[] {new SearchQuery.UidRange(uid1), new SearchQuery.UidRange(uid3,uid4)})); - Iterator<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query); assertThat(result).containsExactly(uid1, uid3, uid4); } @@ -427,7 +427,7 @@ public class LuceneMailboxMessageSearchIndexTest { public void sizeEqualsShouldMatch() throws Exception { SearchQuery query = new SearchQuery(); query.andCriteria(SearchQuery.sizeEquals(200)); - Iterator<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query); assertThat(result).containsExactly(uid1); } @@ -435,7 +435,7 @@ public class LuceneMailboxMessageSearchIndexTest { public void sizeLessThanShouldMatch() throws Exception { SearchQuery query = new SearchQuery(); query.andCriteria(SearchQuery.sizeLessThan(200)); - Iterator<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query); assertThat(result).containsExactly(uid3, uid4); } @@ -443,7 +443,7 @@ public class LuceneMailboxMessageSearchIndexTest { public void sizeGreaterThanShouldMatch() throws Exception { SearchQuery query = new SearchQuery(); query.andCriteria(SearchQuery.sizeGreaterThan(6)); - Iterator<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query); assertThat(result).containsExactly(uid1, uid3, uid4); } @@ -451,7 +451,7 @@ public class LuceneMailboxMessageSearchIndexTest { public void uidShouldBeSorted() throws Exception { SearchQuery query = new SearchQuery(); query.andCriteria(SearchQuery.all()); - Iterator<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query); assertThat(result).containsExactly(uid1, uid3, uid4); } @@ -460,7 +460,7 @@ public class LuceneMailboxMessageSearchIndexTest { SearchQuery query = new SearchQuery(SearchQuery.all()); query.setSorts(ImmutableList.of(new Sort(SortClause.Uid, Order.REVERSE))); - Iterator<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query); assertThat(result).containsExactly(uid4, uid3, uid1); } @@ -469,7 +469,7 @@ public class LuceneMailboxMessageSearchIndexTest { SearchQuery query = new SearchQuery(SearchQuery.all()); query.setSorts(ImmutableList.of(new Sort(SortClause.SentDate, Order.NATURAL))); - Iterator<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query); assertThat(result).containsExactly(uid3, uid4, uid1); } @@ -478,7 +478,7 @@ public class LuceneMailboxMessageSearchIndexTest { SearchQuery query = new SearchQuery(SearchQuery.all()); query.setSorts(ImmutableList.of(new Sort(SortClause.SentDate, Order.REVERSE))); - Iterator<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query); assertThat(result).containsExactly(uid1, uid4, uid3); } @@ -487,7 +487,7 @@ public class LuceneMailboxMessageSearchIndexTest { SearchQuery query = new SearchQuery(SearchQuery.all()); query.setSorts(ImmutableList.of(new Sort(SortClause.BaseSubject, Order.NATURAL))); - Iterator<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query); assertThat(result).containsExactly(uid3, uid1, uid4); } @@ -496,7 +496,7 @@ public class LuceneMailboxMessageSearchIndexTest { SearchQuery query = new SearchQuery(SearchQuery.all()); query.setSorts(ImmutableList.of(new Sort(SortClause.BaseSubject, Order.REVERSE))); - Iterator<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query); assertThat(result).containsExactly(uid4, uid1, uid3); } @@ -505,7 +505,7 @@ public class LuceneMailboxMessageSearchIndexTest { SearchQuery query = new SearchQuery(SearchQuery.all()); query.setSorts(ImmutableList.of(new Sort(SortClause.MailboxFrom, Order.NATURAL))); - Iterator<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query); assertThat(result).containsExactly(uid3, uid4, uid1); } @@ -514,7 +514,7 @@ public class LuceneMailboxMessageSearchIndexTest { SearchQuery query = new SearchQuery(SearchQuery.all()); query.setSorts(ImmutableList.of(new Sort(SortClause.MailboxFrom, Order.REVERSE))); - Iterator<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query); assertThat(result).containsExactly(uid1, uid4, uid3); } @@ -523,7 +523,7 @@ public class LuceneMailboxMessageSearchIndexTest { SearchQuery query = new SearchQuery(SearchQuery.all()); query.setSorts(ImmutableList.of(new Sort(SortClause.MailboxCc, Order.NATURAL))); - Iterator<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query); assertThat(result).containsExactly(uid1, uid3, uid4); } @@ -532,7 +532,7 @@ public class LuceneMailboxMessageSearchIndexTest { SearchQuery query = new SearchQuery(SearchQuery.all()); query.setSorts(ImmutableList.of(new Sort(SortClause.MailboxCc, Order.REVERSE))); - Iterator<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query); assertThat(result).containsExactly(uid3, uid4, uid1); } @@ -541,7 +541,7 @@ public class LuceneMailboxMessageSearchIndexTest { SearchQuery query = new SearchQuery(SearchQuery.all()); query.setSorts(ImmutableList.of(new Sort(SortClause.MailboxTo, Order.NATURAL))); - Iterator<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query); assertThat(result).containsExactly(uid4, uid1, uid3); } @@ -550,7 +550,7 @@ public class LuceneMailboxMessageSearchIndexTest { SearchQuery query = new SearchQuery(SearchQuery.all()); query.setSorts(ImmutableList.of(new Sort(SortClause.MailboxTo, Order.REVERSE))); - Iterator<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query); assertThat(result).containsExactly(uid3, uid1, uid4); } @@ -559,7 +559,7 @@ public class LuceneMailboxMessageSearchIndexTest { SearchQuery query = new SearchQuery(SearchQuery.all()); query.setSorts(ImmutableList.of(new Sort(SortClause.DisplayTo, Order.NATURAL))); - Iterator<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query); assertThat(result).containsExactly(uid4, uid1, uid3); } @@ -568,7 +568,7 @@ public class LuceneMailboxMessageSearchIndexTest { SearchQuery query = new SearchQuery(SearchQuery.all()); query.setSorts(ImmutableList.of(new Sort(SortClause.DisplayTo, Order.REVERSE))); - Iterator<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query); assertThat(result).containsExactly(uid3, uid1, uid4); } @@ -577,7 +577,7 @@ public class LuceneMailboxMessageSearchIndexTest { SearchQuery query = new SearchQuery(SearchQuery.all()); query.setSorts(ImmutableList.of(new Sort(SortClause.DisplayFrom, Order.NATURAL))); - Iterator<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query); assertThat(result).containsExactly(uid3, uid4, uid1); } @@ -586,7 +586,7 @@ public class LuceneMailboxMessageSearchIndexTest { SearchQuery query = new SearchQuery(SearchQuery.all()); query.setSorts(ImmutableList.of(new Sort(SortClause.DisplayFrom, Order.REVERSE))); - Iterator<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query); assertThat(result).containsExactly(uid1, uid4, uid3); } @@ -595,7 +595,7 @@ public class LuceneMailboxMessageSearchIndexTest { SearchQuery query = new SearchQuery(SearchQuery.all()); query.setSorts(ImmutableList.of(new Sort(SortClause.Arrival, Order.NATURAL))); - Iterator<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query); assertThat(result).containsExactly(uid3, uid1, uid4); } @@ -604,7 +604,7 @@ public class LuceneMailboxMessageSearchIndexTest { SearchQuery query = new SearchQuery(SearchQuery.all()); query.setSorts(ImmutableList.of(new Sort(SortClause.Arrival, Order.REVERSE))); - Iterator<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query); assertThat(result).containsExactly(uid4, uid1, uid3); } @@ -613,7 +613,7 @@ public class LuceneMailboxMessageSearchIndexTest { SearchQuery query = new SearchQuery(SearchQuery.all()); query.setSorts(ImmutableList.of(new Sort(SortClause.Size, Order.NATURAL))); - Iterator<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query); assertThat(result).containsExactly(uid3, uid4, uid1); } @@ -622,7 +622,7 @@ public class LuceneMailboxMessageSearchIndexTest { SearchQuery query = new SearchQuery(SearchQuery.all()); query.setSorts(ImmutableList.of(new Sort(SortClause.Size, Order.REVERSE))); - Iterator<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query); assertThat(result).containsExactly(uid1, uid3, uid4); } @@ -631,7 +631,7 @@ public class LuceneMailboxMessageSearchIndexTest { SearchQuery query = new SearchQuery(); query.andCriteria(SearchQuery.not(SearchQuery.uid(new SearchQuery.UidRange[] { new SearchQuery.UidRange(uid1)}))); - Iterator<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query); assertThat(result).containsExactly(uid3, uid4); } } diff --git a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java index 324f20c..7b22354 100644 --- a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java +++ b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java @@ -52,6 +52,7 @@ import org.apache.james.vault.search.Query; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import com.github.steveash.guavate.Guavate; import com.google.common.collect.ImmutableList; class DeletedMessageVaultHookTest { @@ -165,7 +166,7 @@ class DeletedMessageVaultHookTest { long messageSize = messageSize(bobMessageManager, composedMessageId); DeletedMessage deletedMessage = buildDeletedMessage(ImmutableList.of(aliceMailbox), messageId, ALICE, messageSize); - bobMessageManager.delete(ImmutableList.copyOf(bobMessageManager.search(searchQuery, bobSession)), bobSession); + bobMessageManager.delete(bobMessageManager.search(searchQuery, bobSession).collect(Guavate.toImmutableList()), bobSession); assertThat(messageVault.search(ALICE, Query.ALL).blockFirst()) .isEqualTo(deletedMessage); @@ -186,7 +187,7 @@ class DeletedMessageVaultHookTest { MessageManager bobMessageManager = mailboxManager.getMailbox(aliceMailbox, bobSession); appendMessage(aliceMessageManager); - bobMessageManager.delete(ImmutableList.copyOf(bobMessageManager.search(searchQuery, bobSession)), bobSession); + bobMessageManager.delete(bobMessageManager.search(searchQuery, bobSession).collect(Guavate.toImmutableList()), bobSession); assertThat(messageVault.search(BOB, Query.ALL).collectList().block()) .isEmpty(); @@ -212,7 +213,7 @@ class DeletedMessageVaultHookTest { long messageSize = messageSize(bobMessageManager, composedMessageId); DeletedMessage deletedMessage = buildDeletedMessage(ImmutableList.of(bobMailbox), messageId, BOB, messageSize); - bobMessageManager.delete(ImmutableList.copyOf(bobMessageManager.search(searchQuery, bobSession)), bobSession); + bobMessageManager.delete(bobMessageManager.search(searchQuery, bobSession).collect(Guavate.toImmutableList()), bobSession); assertThat(messageVault.search(BOB, Query.ALL).blockFirst()) .isEqualTo(deletedMessage); @@ -236,7 +237,7 @@ class DeletedMessageVaultHookTest { messageIdManager.setInMailboxes(messageId, ImmutableList.of(bobMailbox), bobSession); - bobMessageManager.delete(ImmutableList.copyOf(bobMessageManager.search(searchQuery, bobSession)), bobSession); + bobMessageManager.delete(bobMessageManager.search(searchQuery, bobSession).collect(Guavate.toImmutableList()), bobSession); assertThat(messageVault.search(ALICE, Query.ALL).collectList().block()) .isEmpty(); @@ -262,7 +263,7 @@ class DeletedMessageVaultHookTest { long messageSize = messageSize(bobMessageManager, composedMessageId); DeletedMessage deletedMessage = buildDeletedMessage(ImmutableList.of(bobMailbox), messageId, BOB, messageSize); - bobMessageManager.delete(ImmutableList.copyOf(bobMessageManager.search(searchQuery, bobSession)), bobSession); + bobMessageManager.delete(bobMessageManager.search(searchQuery, bobSession).collect(Guavate.toImmutableList()), bobSession); assertThat(messageVault.search(BOB, Query.ALL).blockFirst()) .isEqualTo(deletedMessage); @@ -286,7 +287,7 @@ class DeletedMessageVaultHookTest { messageIdManager.setInMailboxes(messageId, ImmutableList.of(aliceMailbox, bobMailbox), bobSession); - bobMessageManager.delete(ImmutableList.copyOf(bobMessageManager.search(searchQuery, bobSession)), bobSession); + bobMessageManager.delete(bobMessageManager.search(searchQuery, bobSession).collect(Guavate.toImmutableList()), bobSession); assertThat(messageVault.search(ALICE, Query.ALL).collectList().block()) .isEmpty(); diff --git a/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearcher.java b/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearcher.java index fd3cff7..7c02032 100644 --- a/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearcher.java +++ b/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearcher.java @@ -29,7 +29,7 @@ import java.util.stream.Stream; import org.apache.james.backends.es.AliasName; import org.apache.james.backends.es.NodeMappingFactory; import org.apache.james.backends.es.ReadAliasName; -import org.apache.james.backends.es.search.ScrollIterable; +import org.apache.james.backends.es.search.ScrolledSearch; import org.apache.james.core.User; import org.apache.james.quota.search.QuotaQuery; import org.apache.james.quota.search.QuotaSearcher; @@ -59,10 +59,12 @@ public class ElasticSearchQuotaSearcher implements QuotaSearcher { @Override public List<User> search(QuotaQuery query) { try { - return searchHits(query) - .map(SearchHit::getId) - .map(User::fromUsername) - .collect(Guavate.toImmutableList()); + try (Stream<SearchHit> searchHits = searchHits(query)) { + return searchHits + .map(SearchHit::getId) + .map(User::fromUsername) + .collect(Guavate.toImmutableList()); + } } catch (IOException e) { throw new RuntimeException("Unexpected exception while executing " + query, e); } @@ -92,13 +94,12 @@ public class ElasticSearchQuotaSearcher implements QuotaSearcher { } private Stream<SearchHit> executeScrolledSearch(QuotaQuery query) { - return new ScrollIterable(client, + return new ScrolledSearch(client, new SearchRequest(readAlias.getValue()) .types(NodeMappingFactory.DEFAULT_MAPPING_NAME) .source(searchSourceBuilder(query)) .scroll(TIMEOUT)) - .stream() - .flatMap(searchResponse -> Arrays.stream(searchResponse.getHits().getHits())) + .searchHits() .skip(query.getOffset().getValue()); } diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java index 40318dc..5d63d3b 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; +import java.util.stream.Stream; import javax.mail.Flags; import javax.mail.Flags.Flag; @@ -745,7 +746,7 @@ public class StoreMessageManager implements MessageManager { } @Override - public Iterator<MessageUid> search(SearchQuery query, MailboxSession mailboxSession) throws MailboxException { + public Stream<MessageUid> search(SearchQuery query, MailboxSession mailboxSession) throws MailboxException { if (query.equals(new SearchQuery(SearchQuery.all()))) { return listAllMessageUids(mailboxSession); } @@ -900,11 +901,11 @@ public class StoreMessageManager implements MessageManager { .getApplicableFlag(mailbox); } - private Iterator<MessageUid> listAllMessageUids(MailboxSession session) throws MailboxException { + private Stream<MessageUid> listAllMessageUids(MailboxSession session) throws MailboxException { final MessageMapper messageMapper = mapperFactory.getMessageMapper(session); return messageMapper.execute( - () -> messageMapper.listAllMessageUids(mailbox)); + () -> Iterators.toStream(messageMapper.listAllMessageUids(mailbox))); } @Override diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java index c3faf9a..61a2f9b 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java @@ -23,6 +23,7 @@ import java.util.EnumSet; import java.util.Iterator; import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Stream; import org.apache.james.mailbox.MailboxManager; import org.apache.james.mailbox.MailboxManager.SearchCapabilities; @@ -104,7 +105,7 @@ public class LazyMessageSearchIndex extends ListeningMessageSearchIndex { * */ @Override - public Iterator<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException { + public Stream<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException { Preconditions.checkArgument(session != null, "'session' is mandatory"); MailboxId id = mailbox.getMailboxId(); diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java index 9343ee0..eb1dcee 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java @@ -21,9 +21,9 @@ package org.apache.james.mailbox.store.search; import java.util.Collection; import java.util.EnumSet; -import java.util.Iterator; import java.util.List; import java.util.Optional; +import java.util.stream.Stream; import org.apache.james.mailbox.MailboxManager; import org.apache.james.mailbox.MailboxSession; @@ -45,7 +45,7 @@ public interface MessageSearchIndex { /** * Return all uids of the previous indexed {@link Mailbox}'s which match the {@link SearchQuery} */ - Iterator<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException; + Stream<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException; /** * Return all uids of all {@link Mailbox}'s the current user has access to which match the {@link SearchQuery} diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java index 2198506..26b602d 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java @@ -106,13 +106,12 @@ public class SimpleMessageSearchIndex implements MessageSearchIndex { } @Override - public Iterator<MessageUid> search(MailboxSession session, final Mailbox mailbox, SearchQuery query) throws MailboxException { + public Stream<MessageUid> search(MailboxSession session, final Mailbox mailbox, SearchQuery query) throws MailboxException { Preconditions.checkArgument(session != null, "'session' is mandatory"); return searchResults(session, ImmutableList.of(mailbox).stream(), query) .stream() .filter(searchResult -> searchResult.getMailboxId().equals(mailbox.getMailboxId())) - .map(SearchResult::getMessageUid) - .iterator(); + .map(SearchResult::getMessageUid); } private List<SearchResult> searchResults(MailboxSession session, Mailbox mailbox, SearchQuery query) throws MailboxException { diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java index 6df7e78..0f189f4 100644 --- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java +++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java @@ -62,7 +62,6 @@ import org.junit.Before; import org.junit.Test; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterators; import com.jayway.awaitility.Awaitility; import com.jayway.awaitility.Duration; @@ -1488,7 +1487,7 @@ public abstract class AbstractMessageSearchIndexTest { .await() .atMost(30, TimeUnit.SECONDS) .until( - () -> Iterators.size(messageSearchIndex.search(session, newBox.getMailboxEntity(), searchQuery)) == 9); + () -> messageSearchIndex.search(session, newBox.getMailboxEntity(), searchQuery).count() == 9); } @Test diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMailboxProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMailboxProcessor.java index fa43bb7..971bc44 100644 --- a/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMailboxProcessor.java +++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMailboxProcessor.java @@ -24,6 +24,7 @@ import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.stream.Stream; import javax.mail.Flags; @@ -584,9 +585,8 @@ public abstract class AbstractMailboxProcessor<M extends ImapRequest> extends Ab } searchQuery.andCriteria(SearchQuery.uid(nRanges)); searchQuery.andCriteria(SearchQuery.modSeqGreaterThan(changedSince)); - Iterator<MessageUid> uids = mailbox.search(searchQuery, session); - while (uids.hasNext()) { - vanishedUids.remove(uids.next()); + try (Stream<MessageUid> uids = mailbox.search(searchQuery, session)) { + uids.forEach(vanishedUids::remove); } UidRange[] vanishedIdRanges = uidRanges(MessageRange.toRanges(vanishedUids)); responder.respond(new VanishedResponse(vanishedIdRanges, true)); diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/SearchProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/SearchProcessor.java index f074eec..c01bd31 100644 --- a/protocols/imap/src/main/java/org/apache/james/imap/processor/SearchProcessor.java +++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/SearchProcessor.java @@ -23,10 +23,9 @@ import java.io.Closeable; import java.util.ArrayList; import java.util.Collection; import java.util.Date; -import java.util.Iterator; import java.util.List; import java.util.Optional; -import java.util.TreeSet; +import java.util.stream.Stream; import javax.mail.Flags.Flag; @@ -68,6 +67,7 @@ import org.apache.james.util.MDCBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.github.steveash.guavate.Guavate; import com.google.common.collect.ImmutableList; public class SearchProcessor extends AbstractMailboxProcessor<SearchRequest> implements CapabilityImplementingProcessor { @@ -94,27 +94,10 @@ public class SearchProcessor extends AbstractMailboxProcessor<SearchRequest> imp final SearchQuery query = toQuery(searchKey, session); MailboxSession msession = ImapSessionUtils.getMailboxSession(session); - final Iterator<MessageUid> it = mailbox.search(query, msession); - - final Collection<Long> results = new TreeSet<>(); - final Collection<MessageUid> uids = new TreeSet<>(); - - while (it.hasNext()) { - final MessageUid uid = it.next(); - final Long number; - if (useUids) { - uids.add(uid); - results.add(uid.asLong()); - } else { - final int msn = session.getSelected().msn(uid); - number = (long) msn; - if (number == SelectedMailbox.NO_SUCH_MESSAGE == false) { - results.add(number); - } - } - - } - + + final Collection<MessageUid> uids = performUidSearch(mailbox, query, msession); + final Collection<Long> results = asResults(session, useUids, uids); + // Check if the search did contain the MODSEQ searchkey. If so we need to include the highest mod in the response. // // See RFC4551: 3.4. MODSEQ Search Criterion in SEARCH @@ -220,7 +203,27 @@ public class SearchProcessor extends AbstractMailboxProcessor<SearchRequest> imp session.setAttribute(SEARCH_MODSEQ, null); } } - + + private Collection<Long> asResults(ImapSession session, boolean useUids, Collection<MessageUid> uids) { + if (useUids) { + return uids.stream() + .map(MessageUid::asLong) + .collect(Guavate.toImmutableList()); + } else { + return uids.stream() + .map(uid -> session.getSelected().msn(uid)) + .map(Integer::longValue) + .filter(msn -> msn != SelectedMailbox.NO_SUCH_MESSAGE) + .collect(Guavate.toImmutableList()); + } + } + + private Collection<MessageUid> performUidSearch(MessageManager mailbox, SearchQuery query, MailboxSession msession) throws MailboxException { + try (Stream<MessageUid> stream = mailbox.search(query, msession)) { + return stream.collect(Guavate.toImmutableList()); + } + } + private long[] toArray(Collection<Long> results) { return results.stream().mapToLong(x -> x).toArray(); } diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java index c1ec72d..63a4c77 100644 --- a/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java +++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Optional; import java.util.Set; import java.util.TreeSet; +import java.util.stream.Stream; import javax.mail.Flags; import javax.mail.Flags.Flag; @@ -49,7 +50,7 @@ import org.apache.james.mailbox.model.MailboxPath; import org.apache.james.mailbox.model.SearchQuery; import org.apache.james.mailbox.model.UpdatedFlags; -import com.google.common.collect.ImmutableList; +import com.github.steveash.guavate.Guavate; /** * Default implementation of {@link SelectedMailbox} @@ -92,8 +93,9 @@ public class SelectedMailboxImpl implements SelectedMailbox, MailboxListener { registration = eventBus.register(this, new MailboxIdRegistrationKey(mailboxId)); applicableFlags = messageManager.getApplicableFlags(mailboxSession); - uidMsnConverter.addAll(ImmutableList.copyOf( - messageManager.search(new SearchQuery(SearchQuery.all()), mailboxSession))); + try (Stream<MessageUid> stream = messageManager.search(new SearchQuery(SearchQuery.all()), mailboxSession)) { + uidMsnConverter.addAll(stream.collect(Guavate.toImmutableList())); + } } @Override diff --git a/protocols/imap/src/test/java/org/apache/james/imap/processor/SearchProcessorTest.java b/protocols/imap/src/test/java/org/apache/james/imap/processor/SearchProcessorTest.java index b3d4911..3e5c0a2 100644 --- a/protocols/imap/src/test/java/org/apache/james/imap/processor/SearchProcessorTest.java +++ b/protocols/imap/src/test/java/org/apache/james/imap/processor/SearchProcessorTest.java @@ -33,6 +33,7 @@ import java.util.List; import java.util.Locale; import java.util.Optional; import java.util.TimeZone; +import java.util.stream.Stream; import javax.mail.Flags; import javax.mail.Flags.Flag; @@ -469,7 +470,7 @@ public class SearchProcessorTest { private void check(SearchKey key, final SearchQuery query) throws Exception { when(session.getAttribute(SearchProcessor.SEARCH_MODSEQ)).thenReturn(null); when(session.getAttribute(ImapSessionUtils.MAILBOX_SESSION_ATTRIBUTE_SESSION_KEY)).thenReturn(mailboxSession); - when(mailbox.search(query, mailboxSession)).thenReturn(new ArrayList<MessageUid>().iterator()); + when(mailbox.search(query, mailboxSession)).thenReturn(Stream.empty()); when(selectedMailbox.getApplicableFlags()).thenReturn(new Flags()); when(selectedMailbox.hasNewApplicableFlags()).thenReturn(false); diff --git a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/MailboxEventAnalyserTest.java b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/MailboxEventAnalyserTest.java index a22744a..7bbeaef 100644 --- a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/MailboxEventAnalyserTest.java +++ b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/MailboxEventAnalyserTest.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.util.Date; +import java.util.stream.Stream; import javax.mail.Flags; @@ -56,8 +57,6 @@ import org.apache.james.metrics.api.NoopMetricFactory; import org.junit.Before; import org.junit.Test; -import com.google.common.collect.ImmutableList; - public class MailboxEventAnalyserTest { private static final MessageUid UID = MessageUid.of(900); private static final UpdatedFlags ADD_RECENT_UPDATED_FLAGS = UpdatedFlags.builder() @@ -152,7 +151,7 @@ public class MailboxEventAnalyserTest { when(messageManager.getApplicableFlags(any())).thenReturn(new Flags()); when(messageManager.getId()).thenReturn(MAILBOX_ID); when(messageManager.search(any(), any())) - .thenReturn(ImmutableList.of(MESSAGE_UID).iterator()); + .thenReturn(Stream.of(MESSAGE_UID)); when(messageManager.getMessages(any(), any(), any())) .thenReturn(new SingleMessageResultIterator(messageResult)); diff --git a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java index 64fbb7d..7d7df56 100644 --- a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java +++ b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java @@ -32,6 +32,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; import javax.mail.Flags; @@ -61,8 +62,6 @@ import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.ImmutableList; - public class SelectedMailboxImplTest { @@ -146,10 +145,10 @@ public class SelectedMailboxImplTest { .isEqualTo(1); } - private Answer<Iterator<MessageUid>> delayedSearchAnswer() { + private Answer<Stream<MessageUid>> delayedSearchAnswer() { return invocation -> { Thread.sleep(1000); - return ImmutableList.of(MessageUid.of(1), MessageUid.of(3)).iterator(); + return Stream.of(MessageUid.of(1), MessageUid.of(3)); }; } diff --git a/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java b/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java index fdd7cea..1ab3a1d 100644 --- a/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java +++ b/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java @@ -21,8 +21,8 @@ package org.apache.james; import java.util.Collection; import java.util.EnumSet; -import java.util.Iterator; import java.util.List; +import java.util.stream.Stream; import org.apache.commons.lang.NotImplementedException; import org.apache.james.mailbox.MailboxManager; @@ -73,7 +73,7 @@ public class FakeMessageSearchIndex extends ListeningMessageSearchIndex { } @Override - public Iterator<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException { + public Stream<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException { throw new NotImplementedException(); } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org