JAMES-1901 Create a ElasticSearch backend project
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/1688bc6f Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/1688bc6f Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/1688bc6f Branch: refs/heads/master Commit: 1688bc6f98c217fdf82475e47d1636c7b78d3533 Parents: ea114d7 Author: Benoit Tellier <[email protected]> Authored: Mon Jan 23 18:51:49 2017 +0700 Committer: Benoit Tellier <[email protected]> Committed: Fri Feb 3 16:43:39 2017 +0700 ---------------------------------------------------------------------- backends-common/elasticsearch/pom.xml | 222 ++++++++++++++++ .../james/backends/es/ClientProvider.java | 26 ++ .../james/backends/es/ClientProviderImpl.java | 48 ++++ .../backends/es/DeleteByQueryPerformer.java | 90 +++++++ .../james/backends/es/ElasticSearchIndexer.java | 102 ++++++++ .../james/backends/es/IndexCreationFactory.java | 84 ++++++ .../james/backends/es/NodeMappingFactory.java | 55 ++++ .../backends/es/search/ScrollIterable.java | 81 ++++++ .../backends/es/ElasticSearchIndexerTest.java | 244 +++++++++++++++++ .../backends/es/EmbeddedElasticSearch.java | 118 +++++++++ .../backends/es/search/ScrollIterableTest.java | 174 +++++++++++++ .../es/utils/TestingClientProvider.java | 37 +++ backends-common/pom.xml | 13 + mailbox/elasticsearch/pom.xml | 15 +- .../mailbox/elasticsearch/ClientProvider.java | 26 -- .../elasticsearch/ClientProviderImpl.java | 48 ---- .../elasticsearch/DeleteByQueryPerformer.java | 86 ------ .../elasticsearch/ElasticSearchIndexer.java | 100 ------- .../elasticsearch/IndexCreationFactory.java | 84 ------ .../MailboxElasticsearchConstants.java | 25 ++ .../elasticsearch/MailboxMappingFactory.java | 229 ++++++++++++++++ .../elasticsearch/NodeMappingFactory.java | 259 ------------------- ...lasticSearchListeningMessageSearchIndex.java | 2 +- .../elasticsearch/query/SortConverter.java | 3 +- .../search/ElasticSearchSearcher.java | 7 +- .../elasticsearch/search/ScrollIterable.java | 81 ------ .../elasticsearch/ElasticSearchIndexerTest.java | 238 ----------------- .../ElasticSearchIntegrationTest.java | 27 +- .../elasticsearch/EmbeddedElasticSearch.java | 116 --------- ...hListeningMailboxMessageSearchIndexTest.java | 2 +- .../search/ScrollIterableTest.java | 173 ------------- .../utils/TestingClientProvider.java | 37 --- mailbox/pom.xml | 19 ++ mpt/impl/imap-mailbox/elasticsearch/pom.xml | 18 +- .../host/ElasticSearchHostSystem.java | 23 +- mpt/impl/smtp/cassandra/pom.xml | 6 +- .../smtp/host/CassandraJamesSmtpHostSystem.java | 5 +- mpt/pom.xml | 7 + server/container/guice/cassandra-guice/pom.xml | 6 + .../mailbox/ElasticSearchMailboxModule.java | 22 +- .../apache/james/EmbeddedElasticSearchRule.java | 6 +- .../james/JamesCapabilitiesServerTest.java | 5 +- .../modules/CassandraJmapServerModule.java | 2 +- .../james/modules/TestElasticSearchModule.java | 20 +- server/pom.xml | 11 + .../cassandra-jmap-integration-testing/pom.xml | 6 + .../cassandra/cucumber/CassandraStepdefs.java | 5 +- .../protocols/jmap-integration-testing/pom.xml | 1 + .../protocols/webadmin-integration-test/pom.xml | 6 + 49 files changed, 1717 insertions(+), 1303 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/1688bc6f/backends-common/elasticsearch/pom.xml ---------------------------------------------------------------------- diff --git a/backends-common/elasticsearch/pom.xml b/backends-common/elasticsearch/pom.xml new file mode 100644 index 0000000..624ec16 --- /dev/null +++ b/backends-common/elasticsearch/pom.xml @@ -0,0 +1,222 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>james-backends-common</artifactId> + <groupId>org.apache.james</groupId> + <version>3.0.0-beta6-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>apache-james-backends-es</artifactId> + + <profiles> + <profile> + <id>disable-build-for-older-jdk</id> + <activation> + <jdk>(,1.8)</jdk> + </activation> + <build> + <plugins> + <plugin> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <id>default-jar</id> + <phase>none</phase> + </execution> + <execution> + <id>jar</id> + <phase>none</phase> + </execution> + <execution> + <id>test-jar</id> + <phase>none</phase> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <executions> + <execution> + <id>default-compile</id> + <phase>none</phase> + </execution> + <execution> + <id>default-testCompile</id> + <phase>none</phase> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <executions> + <execution> + <id>default-test</id> + <phase>none</phase> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-source-plugin</artifactId> + <executions> + <execution> + <id>attach-sources</id> + <phase>none</phase> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-install-plugin</artifactId> + <executions> + <execution> + <id>default-install</id> + <phase>none</phase> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-resources-plugin</artifactId> + <executions> + <execution> + <id>default-resources</id> + <phase>none</phase> + </execution> + <execution> + <id>default-testResources</id> + <phase>none</phase> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-site-plugin</artifactId> + <executions> + <execution> + <id>attach-descriptor</id> + <phase>none</phase> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + <profile> + <id>build-for-jdk-8</id> + <activation> + <jdk>[1.8,)</jdk> + </activation> + <dependencies> + <dependency> + <groupId>org.apache.james</groupId> + <artifactId>james-server-util-java8</artifactId> + </dependency> + <dependency> + <groupId>com.jayway.awaitility</groupId> + <artifactId>awaitility</artifactId> + <version>1.6.3</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + </dependency> + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <version>${assertj-3.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.elasticsearch</groupId> + <artifactId>elasticsearch</artifactId> + <version>2.2.1</version> + </dependency> + <dependency> + <groupId>org.elasticsearch</groupId> + <artifactId>elasticsearch</artifactId> + <version>2.2.1</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <archive> + <manifest> + <mainClass>fully.qualified.MainClass</mainClass> + </manifest> + </archive> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> + </plugins> + </build> + </profile> + <profile> + <id>animal-sniffer-java-8</id> + <activation> + <jdk>[1.8,)</jdk> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>animal-sniffer-maven-plugin</artifactId> + <configuration> + <signature> + <groupId>org.codehaus.mojo.signature</groupId> + <artifactId>java18</artifactId> + <version>1.0</version> + </signature> + </configuration> + <executions> + <execution> + <id>check_java_8</id> + <phase>test</phase> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/james-project/blob/1688bc6f/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java ---------------------------------------------------------------------- diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java new file mode 100644 index 0000000..81ed92f --- /dev/null +++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java @@ -0,0 +1,26 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.backends.es; + +import org.elasticsearch.client.Client; + +public interface ClientProvider { + + Client get(); +} http://git-wip-us.apache.org/repos/asf/james-project/blob/1688bc6f/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProviderImpl.java ---------------------------------------------------------------------- diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProviderImpl.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProviderImpl.java new file mode 100644 index 0000000..118d077 --- /dev/null +++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProviderImpl.java @@ -0,0 +1,48 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.backends.es; + +import java.net.InetAddress; +import java.net.UnknownHostException; + +import org.elasticsearch.client.Client; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.transport.InetSocketTransportAddress; + +import com.google.common.base.Throwables; + +public class ClientProviderImpl implements ClientProvider { + + private final String host; + private final int port; + + public ClientProviderImpl(String host, int port) { + this.host = host; + this.port = port; + } + + public Client get() { + try { + return TransportClient.builder().build() + .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port)); + } catch (UnknownHostException e) { + throw Throwables.propagate(e); + } + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/1688bc6f/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DeleteByQueryPerformer.java ---------------------------------------------------------------------- diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DeleteByQueryPerformer.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DeleteByQueryPerformer.java new file mode 100644 index 0000000..065fe19 --- /dev/null +++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DeleteByQueryPerformer.java @@ -0,0 +1,90 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.backends.es; + +import java.util.concurrent.ExecutorService; + +import javax.inject.Inject; +import javax.inject.Named; + +import org.apache.james.backends.es.search.ScrollIterable; +import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.search.SearchHit; + +import com.google.common.annotations.VisibleForTesting; + +public class DeleteByQueryPerformer { + public static final int DEFAULT_BATCH_SIZE = 100; + public static final TimeValue TIMEOUT = new TimeValue(60000); + + private final Client client; + private final ExecutorService executor; + private final int batchSize; + private final String indexName; + private final String typeName; + + @Inject + public DeleteByQueryPerformer(Client client, @Named("AsyncExecutor") ExecutorService executor, String indexName, String typeName) { + this(client, executor, DEFAULT_BATCH_SIZE, indexName, typeName); + } + + @VisibleForTesting + public DeleteByQueryPerformer(Client client, @Named("AsyncExecutor") ExecutorService executor, int batchSize, String indexName, String typeName) { + this.client = client; + this.executor = executor; + this.batchSize = batchSize; + this.indexName = indexName; + this.typeName = typeName; + } + + public void perform(QueryBuilder queryBuilder) { + executor.execute(() -> doDeleteByQuery(queryBuilder)); + } + + protected void doDeleteByQuery(QueryBuilder queryBuilder) { + new ScrollIterable(client, + client.prepareSearch(indexName) + .setTypes(typeName) + .setScroll(TIMEOUT) + .setNoFields() + .setQuery(queryBuilder) + .setSize(batchSize)) + .stream() + .forEach(searchResponse -> deleteRetrievedIds(client, searchResponse)); + } + + private ListenableActionFuture<BulkResponse> deleteRetrievedIds(Client client, SearchResponse searchResponse) { + BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); + for (SearchHit hit : searchResponse.getHits()) { + bulkRequestBuilder.add(client.prepareDelete() + .setIndex(indexName) + .setType(typeName) + .setId(hit.getId())); + } + return bulkRequestBuilder.execute(); + } + +} http://git-wip-us.apache.org/repos/asf/james-project/blob/1688bc6f/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchIndexer.java ---------------------------------------------------------------------- diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchIndexer.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchIndexer.java new file mode 100644 index 0000000..5cbf1f4 --- /dev/null +++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchIndexer.java @@ -0,0 +1,102 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.backends.es; + +import java.util.List; + +import javax.inject.Inject; + +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.Strings; +import org.elasticsearch.index.query.QueryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +public class ElasticSearchIndexer { + + public static class UpdatedRepresentation { + private final String id; + private final String updatedDocumentPart; + + public UpdatedRepresentation(String id, String updatedDocumentPart) { + Preconditions.checkArgument(!Strings.isNullOrEmpty(id), "Updated id must be specified " + id); + Preconditions.checkArgument(!Strings.isNullOrEmpty(updatedDocumentPart), "Updated document must be specified"); + this.id = id; + this.updatedDocumentPart = updatedDocumentPart; + } + + public String getId() { + return id; + } + + public String getUpdatedDocumentPart() { + return updatedDocumentPart; + } + } + + private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchIndexer.class); + + private final Client client; + private final DeleteByQueryPerformer deleteByQueryPerformer; + private final String indexName; + private final String typeName; + + @Inject + public ElasticSearchIndexer(Client client, DeleteByQueryPerformer deleteByQueryPerformer, String indexName, String typeName) { + this.client = client; + this.deleteByQueryPerformer = deleteByQueryPerformer; + this.indexName = indexName; + this.typeName = typeName; + } + + public IndexResponse indexMessage(String id, String content) { + checkArgument(content); + LOGGER.debug(String.format("Indexing %s: %s", id, content)); + return client.prepareIndex(indexName, typeName, id) + .setSource(content) + .get(); + } + + public BulkResponse updateMessages(List<UpdatedRepresentation> updatedDocumentParts) { + Preconditions.checkNotNull(updatedDocumentParts); + BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); + updatedDocumentParts.forEach(updatedDocumentPart -> bulkRequestBuilder.add(client.prepareUpdate(indexName, typeName, updatedDocumentPart.getId()) + .setDoc(updatedDocumentPart.getUpdatedDocumentPart()))); + return bulkRequestBuilder.get(); + } + + public BulkResponse deleteMessages(List<String> ids) { + BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); + ids.forEach(id -> bulkRequestBuilder.add(client.prepareDelete(indexName, typeName, id))); + return bulkRequestBuilder.get(); + } + + public void deleteAllMatchingQuery(QueryBuilder queryBuilder) { + deleteByQueryPerformer.perform(queryBuilder); + } + + private void checkArgument(String content) { + Preconditions.checkArgument(content != null, "content should be provided"); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/1688bc6f/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/IndexCreationFactory.java ---------------------------------------------------------------------- diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/IndexCreationFactory.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/IndexCreationFactory.java new file mode 100644 index 0000000..cab6021 --- /dev/null +++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/IndexCreationFactory.java @@ -0,0 +1,84 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.backends.es; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; + +import java.io.IOException; + +import org.elasticsearch.client.Client; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.indices.IndexAlreadyExistsException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class IndexCreationFactory { + + private static final Logger LOGGER = LoggerFactory.getLogger(IndexCreationFactory.class); + private static final int DEFAULT_NB_SHARDS = 1; + private static final int DEFAULT_NB_REPLICA = 0; + public static final String CASE_INSENSITIVE = "case_insensitive"; + + public static Client createIndex(Client client, String name, int nbShards, int nbReplica) { + try { + return createIndex(client, name, generateSetting(nbShards, nbReplica)); + } catch (IOException e) { + LOGGER.error("Error while creating index : ", e); + return client; + } + } + + public static Client createIndex(Client client, String name) { + return createIndex(client, name, DEFAULT_NB_SHARDS, DEFAULT_NB_REPLICA); + } + + private static Client createIndex(Client client, String name, XContentBuilder settings) { + try { + client.admin() + .indices() + .prepareCreate(name) + .setSettings(settings) + .execute() + .actionGet(); + } catch (IndexAlreadyExistsException exception) { + LOGGER.info("Index [" + name + "] already exist"); + } + return client; + } + + private static XContentBuilder generateSetting(int nbShards, int nbReplica) throws IOException { + return jsonBuilder() + .startObject() + .field("number_of_shards", nbShards) + .field("number_of_replicas", nbReplica) + .startObject("analysis") + .startObject("analyzer") + .startObject(CASE_INSENSITIVE) + .field("tokenizer", "keyword") + .startArray("filter") + .value("lowercase") + .endArray() + .endObject() + .endObject() + .endObject() + .endObject(); + } + +} http://git-wip-us.apache.org/repos/asf/james-project/blob/1688bc6f/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/NodeMappingFactory.java ---------------------------------------------------------------------- diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/NodeMappingFactory.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/NodeMappingFactory.java new file mode 100644 index 0000000..876c741 --- /dev/null +++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/NodeMappingFactory.java @@ -0,0 +1,55 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.backends.es; + +import org.elasticsearch.client.Client; +import org.elasticsearch.common.xcontent.XContentBuilder; + +public class NodeMappingFactory { + + public static final String BOOLEAN = "boolean"; + public static final String TYPE = "type"; + public static final String LONG = "long"; + public static final String INDEX = "index"; + public static final String NOT_ANALYZED = "not_analyzed"; + public static final String STRING = "string"; + public static final String PROPERTIES = "properties"; + public static final String DATE = "date"; + public static final String FORMAT = "format"; + public static final String NESTED = "nested"; + public static final String FIELDS = "fields"; + public static final String RAW = "raw"; + public static final String ANALYZER = "analyzer"; + public static final String SNOWBALL = "snowball"; + public static final String IGNORE_ABOVE = "ignore_above"; + public static final int LUCENE_LIMIT = 32766; + + public static Client applyMapping(Client client, String indexName, String typeName, XContentBuilder mappingsSources) { + client.admin() + .indices() + .preparePutMapping(indexName) + .setType(typeName) + .setSource(mappingsSources) + .execute() + .actionGet(); + return client; + } + +} http://git-wip-us.apache.org/repos/asf/james-project/blob/1688bc6f/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrollIterable.java ---------------------------------------------------------------------- diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrollIterable.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrollIterable.java new file mode 100644 index 0000000..a43160f --- /dev/null +++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrollIterable.java @@ -0,0 +1,81 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.backends.es.search; + +import java.util.Iterator; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.unit.TimeValue; + +public class ScrollIterable implements Iterable<SearchResponse> { + + private static final TimeValue TIMEOUT = new TimeValue(60000); + private final Client client; + private final SearchRequestBuilder searchRequestBuilder; + + public ScrollIterable(Client client, SearchRequestBuilder searchRequestBuilder) { + this.client = client; + this.searchRequestBuilder = searchRequestBuilder; + } + + @Override + public Iterator<SearchResponse> iterator() { + return new ScrollIterator(client, searchRequestBuilder); + } + + public Stream<SearchResponse> stream() { + return StreamSupport.stream(spliterator(), false); + } + + public static class ScrollIterator implements Iterator<SearchResponse> { + + private final Client client; + private ListenableActionFuture<SearchResponse> searchResponseFuture; + + public ScrollIterator(Client client, SearchRequestBuilder searchRequestBuilder) { + this.client = client; + this.searchResponseFuture = searchRequestBuilder.execute(); + } + + @Override + public boolean hasNext() { + return !allSearchResponsesConsumed(searchResponseFuture.actionGet()); + } + + @Override + public SearchResponse next() { + SearchResponse result = searchResponseFuture.actionGet(); + searchResponseFuture = client.prepareSearchScroll(result.getScrollId()) + .setScroll(TIMEOUT) + .execute(); + return result; + } + + private boolean allSearchResponsesConsumed(SearchResponse searchResponse) { + return searchResponse.getHits().getHits().length == 0; + } + } + +} http://git-wip-us.apache.org/repos/asf/james-project/blob/1688bc6f/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java ---------------------------------------------------------------------- diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java new file mode 100644 index 0000000..cb42956 --- /dev/null +++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java @@ -0,0 +1,244 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.backends.es; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.elasticsearch.index.query.QueryBuilders.termQuery; + +import java.io.IOException; +import java.util.concurrent.Executors; + +import org.apache.james.backends.es.utils.TestingClientProvider; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.node.Node; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.RuleChain; +import org.junit.rules.TemporaryFolder; + +import com.google.common.collect.Lists; + +public class ElasticSearchIndexerTest { + + private static final int MINIMUM_BATCH_SIZE = 1; + private static final String CONTENT = "content"; + private static final String INDEX_NAME = "index_name"; + private static final String TYPE_NAME = "type_name"; + private TemporaryFolder temporaryFolder = new TemporaryFolder(); + private EmbeddedElasticSearch embeddedElasticSearch= new EmbeddedElasticSearch(temporaryFolder, INDEX_NAME); + + @Rule + public RuleChain ruleChain = RuleChain.outerRule(temporaryFolder).around(embeddedElasticSearch); + + private Node node; + private ElasticSearchIndexer testee; + + @Before + public void setup() throws IOException { + node = embeddedElasticSearch.getNode(); + TestingClientProvider clientProvider = new TestingClientProvider(node); + DeleteByQueryPerformer deleteByQueryPerformer = new DeleteByQueryPerformer(clientProvider.get(), + Executors.newSingleThreadExecutor(), + MINIMUM_BATCH_SIZE, + INDEX_NAME, + TYPE_NAME) { + @Override + public void perform(QueryBuilder queryBuilder) { + doDeleteByQuery(queryBuilder); + } + }; + testee = new ElasticSearchIndexer(clientProvider.get(), deleteByQueryPerformer, INDEX_NAME, TYPE_NAME); + } + + @Test + public void indexMessageShouldWork() throws Exception { + String messageId = "1"; + String content = "{\"message\": \"trying out Elasticsearch\"}"; + + testee.indexMessage(messageId, content); + embeddedElasticSearch.awaitForElasticSearch(); + + try (Client client = node.client()) { + SearchResponse searchResponse = client.prepareSearch(INDEX_NAME) + .setTypes(TYPE_NAME) + .setQuery(QueryBuilders.matchQuery("message", "trying")) + .get(); + assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1); + } + } + + @Test(expected=IllegalArgumentException.class) + public void indexMessageShouldThrowWhenJsonIsNull() throws InterruptedException { + testee.indexMessage("1", null); + } + + @Test + public void updateMessages() throws Exception { + String messageId = "1"; + String content = "{\"message\": \"trying out Elasticsearch\",\"field\":\"Should be unchanged\"}"; + + testee.indexMessage(messageId, content); + embeddedElasticSearch.awaitForElasticSearch(); + + testee.updateMessages(Lists.newArrayList(new ElasticSearchIndexer.UpdatedRepresentation(messageId, "{\"message\": \"mastering out Elasticsearch\"}"))); + embeddedElasticSearch.awaitForElasticSearch(); + + try (Client client = node.client()) { + SearchResponse searchResponse = client.prepareSearch(INDEX_NAME) + .setTypes(TYPE_NAME) + .setQuery(QueryBuilders.matchQuery("message", "mastering")) + .get(); + assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1); + } + + try (Client client = node.client()) { + SearchResponse searchResponse = client.prepareSearch(INDEX_NAME) + .setTypes(TYPE_NAME) + .setQuery(QueryBuilders.matchQuery("field", "unchanged")) + .get(); + assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1); + } + } + + @Test(expected=IllegalArgumentException.class) + public void updateMessageShouldThrowWhenJsonIsNull() throws InterruptedException { + testee.updateMessages(Lists.newArrayList(new ElasticSearchIndexer.UpdatedRepresentation("1", null))); + } + + @Test(expected=IllegalArgumentException.class) + public void updateMessageShouldThrowWhenIdIsNull() throws InterruptedException { + testee.updateMessages(Lists.newArrayList(new ElasticSearchIndexer.UpdatedRepresentation(null, "{\"message\": \"mastering out Elasticsearch\"}"))); + } + + @Test(expected=IllegalArgumentException.class) + public void updateMessageShouldThrowWhenJsonIsEmpty() throws InterruptedException { + testee.updateMessages(Lists.newArrayList(new ElasticSearchIndexer.UpdatedRepresentation("1", ""))); + } + + @Test(expected=IllegalArgumentException.class) + public void updateMessageShouldThrowWhenIdIsEmpty() throws InterruptedException { + testee.updateMessages(Lists.newArrayList(new ElasticSearchIndexer.UpdatedRepresentation("", "{\"message\": \"mastering out Elasticsearch\"}"))); + } + + @Test + public void deleteByQueryShouldWorkOnSingleMessage() throws Exception { + String messageId = "1:2"; + String content = "{\"message\": \"trying out Elasticsearch\", \"property\":\"1\"}"; + + testee.indexMessage(messageId, content); + embeddedElasticSearch.awaitForElasticSearch(); + + testee.deleteAllMatchingQuery(termQuery("property", "1")); + embeddedElasticSearch.awaitForElasticSearch(); + + try (Client client = node.client()) { + SearchResponse searchResponse = client.prepareSearch(INDEX_NAME) + .setTypes(TYPE_NAME) + .setQuery(QueryBuilders.matchAllQuery()) + .get(); + assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(0); + } + } + + @Test + public void deleteByQueryShouldWorkWhenMultipleMessages() throws Exception { + String messageId = "1:1"; + String content = "{\"message\": \"trying out Elasticsearch\", \"property\":\"1\"}"; + + testee.indexMessage(messageId, content); + + String messageId2 = "1:2"; + String content2 = "{\"message\": \"trying out Elasticsearch 2\", \"property\":\"1\"}"; + + testee.indexMessage(messageId2, content2); + + String messageId3 = "2:3"; + String content3 = "{\"message\": \"trying out Elasticsearch 3\", \"property\":\"2\"}"; + + testee.indexMessage(messageId3, content3); + embeddedElasticSearch.awaitForElasticSearch(); + + testee.deleteAllMatchingQuery(termQuery("property", "1")); + embeddedElasticSearch.awaitForElasticSearch(); + + try (Client client = node.client()) { + SearchResponse searchResponse = client.prepareSearch(INDEX_NAME) + .setTypes(TYPE_NAME) + .setQuery(QueryBuilders.matchAllQuery()) + .get(); + assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1); + } + } + + @Test + public void deleteMessage() throws Exception { + String messageId = "1:2"; + String content = "{\"message\": \"trying out Elasticsearch\"}"; + + testee.indexMessage(messageId, content); + embeddedElasticSearch.awaitForElasticSearch(); + + testee.deleteMessages(Lists.newArrayList(messageId)); + embeddedElasticSearch.awaitForElasticSearch(); + + try (Client client = node.client()) { + SearchResponse searchResponse = client.prepareSearch(INDEX_NAME) + .setTypes(TYPE_NAME) + .setQuery(QueryBuilders.matchAllQuery()) + .get(); + assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(0); + } + } + + @Test + public void deleteShouldWorkWhenMultipleMessages() throws Exception { + String messageId = "1:1"; + String content = "{\"message\": \"trying out Elasticsearch\", \"mailboxId\":\"1\"}"; + + testee.indexMessage(messageId, content); + + String messageId2 = "1:2"; + String content2 = "{\"message\": \"trying out Elasticsearch 2\", \"mailboxId\":\"1\"}"; + + testee.indexMessage(messageId2, content2); + + String messageId3 = "2:3"; + String content3 = "{\"message\": \"trying out Elasticsearch 3\", \"mailboxId\":\"2\"}"; + + testee.indexMessage(messageId3, content3); + embeddedElasticSearch.awaitForElasticSearch(); + + testee.deleteMessages(Lists.newArrayList(messageId, messageId3)); + embeddedElasticSearch.awaitForElasticSearch(); + + try (Client client = node.client()) { + SearchResponse searchResponse = client.prepareSearch(INDEX_NAME) + .setTypes(TYPE_NAME) + .setQuery(QueryBuilders.matchAllQuery()) + .get(); + assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1); + } + } + +} http://git-wip-us.apache.org/repos/asf/james-project/blob/1688bc6f/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/EmbeddedElasticSearch.java ---------------------------------------------------------------------- diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/EmbeddedElasticSearch.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/EmbeddedElasticSearch.java new file mode 100644 index 0000000..2902cde --- /dev/null +++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/EmbeddedElasticSearch.java @@ -0,0 +1,118 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.backends.es; + +import static com.jayway.awaitility.Awaitility.await; +import static org.elasticsearch.node.NodeBuilder.nodeBuilder; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.function.Supplier; + +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.admin.indices.flush.FlushAction; +import org.elasticsearch.action.admin.indices.flush.FlushRequestBuilder; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.node.Node; +import org.junit.rules.ExternalResource; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Throwables; +import com.jayway.awaitility.Duration; + +public class EmbeddedElasticSearch extends ExternalResource { + + private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedElasticSearch.class); + + private final Supplier<Path> folder; + private final String indexName; + private Node node; + + private static Path createTempDir(TemporaryFolder temporaryFolder) { + try { + return temporaryFolder.newFolder().toPath(); + } catch (IOException e) { + throw Throwables.propagate(e); + } + } + + public EmbeddedElasticSearch(TemporaryFolder temporaryFolder, String indexName) { + this(() -> EmbeddedElasticSearch.createTempDir(temporaryFolder), indexName); + } + + public EmbeddedElasticSearch(Path folder, String indexName) { + this(() -> folder, indexName); + } + + private EmbeddedElasticSearch(Supplier<Path> folder, String indexName) { + this.folder = folder; + this.indexName = indexName; + } + + @Override + public void before() throws IOException { + node = nodeBuilder().local(true) + .settings(Settings.builder() + .put("path.home", folder.get().toAbsolutePath()) + .build()) + .node(); + node.start(); + awaitForElasticSearch(); + } + + @Override + public void after() { + awaitForElasticSearch(); + try (Client client = node.client()) { + client.admin() + .indices() + .delete(new DeleteIndexRequest(indexName)) + .actionGet(); + } catch (Exception e) { + LOGGER.warn("Error while closing ES connection", e); + } + node.close(); + } + + public Node getNode() { + return node; + } + + /** + * Sometimes, tests are too fast. + * This method ensure that ElasticSearch service is up and indices are updated + */ + public void awaitForElasticSearch() { + await().atMost(Duration.TEN_SECONDS).until(this::flush); + } + + private boolean flush() { + try (Client client = node.client()) { + new FlushRequestBuilder(client, FlushAction.INSTANCE).setForce(true).get(); + return true; + } catch (Exception e) { + return false; + } + } + +} http://git-wip-us.apache.org/repos/asf/james-project/blob/1688bc6f/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrollIterableTest.java ---------------------------------------------------------------------- diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrollIterableTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrollIterableTest.java new file mode 100644 index 0000000..5b53a76 --- /dev/null +++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrollIterableTest.java @@ -0,0 +1,174 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.backends.es.search; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.james.backends.es.ClientProvider; +import org.apache.james.backends.es.EmbeddedElasticSearch; +import org.apache.james.backends.es.IndexCreationFactory; +import org.apache.james.backends.es.NodeMappingFactory; +import org.apache.james.backends.es.utils.TestingClientProvider; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.search.SearchHit; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.RuleChain; +import org.junit.rules.TemporaryFolder; + +public class ScrollIterableTest { + + public static final TimeValue TIMEOUT = new TimeValue(6000); + public static final int SIZE = 2; + public static final String MESSAGE = "message"; + public static final String INDEX_NAME = "index"; + public static final String MESSAGES = "messages"; + + private TemporaryFolder temporaryFolder = new TemporaryFolder(); + private EmbeddedElasticSearch embeddedElasticSearch= new EmbeddedElasticSearch(temporaryFolder, INDEX_NAME); + + @Rule + public RuleChain ruleChain = RuleChain.outerRule(temporaryFolder).around(embeddedElasticSearch); + + private ClientProvider clientProvider; + + @Before + public void setUp() throws Exception { + clientProvider = new TestingClientProvider(embeddedElasticSearch.getNode()); + IndexCreationFactory.createIndex(clientProvider.get(), INDEX_NAME); + embeddedElasticSearch.awaitForElasticSearch(); + NodeMappingFactory.applyMapping(clientProvider.get(), INDEX_NAME, MESSAGES, getMappingsSources()); + } + + private XContentBuilder getMappingsSources() throws IOException { + return jsonBuilder() + .startObject() + .startObject(MESSAGES) + .startObject(NodeMappingFactory.PROPERTIES) + .startObject(MESSAGE) + .field(NodeMappingFactory.TYPE, NodeMappingFactory.STRING) + .endObject() + .endObject() + .endObject() + .endObject(); + } + + @Test + public void scrollIterableShouldWorkWhenEmpty() { + try (Client client = clientProvider.get()) { + SearchRequestBuilder searchRequestBuilder = client.prepareSearch(INDEX_NAME) + .setTypes(MESSAGES) + .setScroll(TIMEOUT) + .setQuery(matchAllQuery()) + .setSize(SIZE); + assertThat(new ScrollIterable(client, searchRequestBuilder)).isEmpty(); + } + } + + @Test + public void scrollIterableShouldWorkWhenOneElement() { + try (Client client = clientProvider.get()) { + String id = "1"; + client.prepareIndex(INDEX_NAME, MESSAGES, id) + .setSource(MESSAGE, "Sample message") + .execute(); + + embeddedElasticSearch.awaitForElasticSearch(); + + SearchRequestBuilder searchRequestBuilder = client.prepareSearch(INDEX_NAME) + .setTypes(MESSAGES) + .setScroll(TIMEOUT) + .setQuery(matchAllQuery()) + .setSize(SIZE); + assertThat(convertToIdList(new ScrollIterable(client, searchRequestBuilder))).containsOnly(id); + } + } + + @Test + public void scrollIterableShouldWorkWhenSizeElement() { + try (Client client = clientProvider.get()) { + String id1 = "1"; + client.prepareIndex(INDEX_NAME, MESSAGES, id1) + .setSource(MESSAGE, "Sample message") + .execute(); + + String id2 = "2"; + client.prepareIndex(INDEX_NAME, MESSAGES, id2) + .setSource(MESSAGE, "Sample message") + .execute(); + + embeddedElasticSearch.awaitForElasticSearch(); + + SearchRequestBuilder searchRequestBuilder = client.prepareSearch(INDEX_NAME) + .setTypes(MESSAGES) + .setScroll(TIMEOUT) + .setQuery(matchAllQuery()) + .setSize(SIZE); + assertThat(convertToIdList(new ScrollIterable(client, searchRequestBuilder))).containsOnly(id1, id2); + } + } + + @Test + public void scrollIterableShouldWorkWhenMoreThanSizeElement() { + try (Client client = clientProvider.get()) { + String id1 = "1"; + client.prepareIndex(INDEX_NAME, MESSAGES, id1) + .setSource(MESSAGE, "Sample message") + .execute(); + + String id2 = "2"; + client.prepareIndex(INDEX_NAME, MESSAGES, id2) + .setSource(MESSAGE, "Sample message") + .execute(); + + String id3 = "3"; + client.prepareIndex(INDEX_NAME, MESSAGES, id3) + .setSource(MESSAGE, "Sample message") + .execute(); + + embeddedElasticSearch.awaitForElasticSearch(); + + SearchRequestBuilder searchRequestBuilder = client.prepareSearch(INDEX_NAME) + .setTypes(MESSAGES) + .setScroll(TIMEOUT) + .setQuery(matchAllQuery()) + .setSize(SIZE); + assertThat(convertToIdList(new ScrollIterable(client, searchRequestBuilder))).containsOnly(id1, id2, id3); + } + } + + private List<String> convertToIdList(ScrollIterable scrollIterable) { + return scrollIterable.stream() + .flatMap(searchResponse -> Arrays.stream(searchResponse.getHits().getHits())) + .map(SearchHit::getId) + .collect(Collectors.toList()); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/1688bc6f/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/utils/TestingClientProvider.java ---------------------------------------------------------------------- diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/utils/TestingClientProvider.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/utils/TestingClientProvider.java new file mode 100644 index 0000000..2de4fe5 --- /dev/null +++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/utils/TestingClientProvider.java @@ -0,0 +1,37 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.backends.es.utils; + +import org.apache.james.backends.es.ClientProvider; +import org.elasticsearch.client.Client; +import org.elasticsearch.node.Node; + +public class TestingClientProvider implements ClientProvider { + + private final Node node; + + public TestingClientProvider(Node node) { + this.node = node; + } + + @Override + public Client get() { + return node.client(); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/1688bc6f/backends-common/pom.xml ---------------------------------------------------------------------- diff --git a/backends-common/pom.xml b/backends-common/pom.xml index 34df193..aaaaf3e 100644 --- a/backends-common/pom.xml +++ b/backends-common/pom.xml @@ -35,11 +35,14 @@ <properties> <assertj-3.version>3.2.0</assertj-3.version> + <junit.version>4.11</junit.version> + <slf4j.version>1.7.7</slf4j.version> </properties> <modules> <module>cassandra</module> <module>jpa</module> + <module>elasticsearch</module> </modules> <dependencyManagement> @@ -59,6 +62,16 @@ <artifactId>javax.inject</artifactId> <version>1</version> </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>${junit.version}</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>${slf4j.version}</version> + </dependency> </dependencies> </dependencyManagement> http://git-wip-us.apache.org/repos/asf/james-project/blob/1688bc6f/mailbox/elasticsearch/pom.xml ---------------------------------------------------------------------- diff --git a/mailbox/elasticsearch/pom.xml b/mailbox/elasticsearch/pom.xml index bfe0c5b..dd2698d 100644 --- a/mailbox/elasticsearch/pom.xml +++ b/mailbox/elasticsearch/pom.xml @@ -159,6 +159,16 @@ </build> <dependencies> <dependency> + <groupId>org.apache.james</groupId> + <artifactId>apache-james-backends-es</artifactId> + </dependency> + <dependency> + <groupId>org.apache.james</groupId> + <artifactId>apache-james-backends-es</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> <groupId>${project.groupId}</groupId> <artifactId>apache-james-mailbox-api</artifactId> </dependency> @@ -205,11 +215,6 @@ <artifactId>guava</artifactId> </dependency> <dependency> - <groupId>com.jayway.awaitility</groupId> - <artifactId>awaitility</artifactId> - <version>1.6.3</version> - </dependency> - <dependency> <groupId>javax.inject</groupId> <artifactId>javax.inject</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/james-project/blob/1688bc6f/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/ClientProvider.java ---------------------------------------------------------------------- diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/ClientProvider.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/ClientProvider.java deleted file mode 100644 index cd512e5..0000000 --- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/ClientProvider.java +++ /dev/null @@ -1,26 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ -package org.apache.james.mailbox.elasticsearch; - -import org.elasticsearch.client.Client; - -public interface ClientProvider { - - Client get(); -} http://git-wip-us.apache.org/repos/asf/james-project/blob/1688bc6f/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/ClientProviderImpl.java ---------------------------------------------------------------------- diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/ClientProviderImpl.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/ClientProviderImpl.java deleted file mode 100644 index 3e088af..0000000 --- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/ClientProviderImpl.java +++ /dev/null @@ -1,48 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ -package org.apache.james.mailbox.elasticsearch; - -import java.net.InetAddress; -import java.net.UnknownHostException; - -import org.elasticsearch.client.Client; -import org.elasticsearch.client.transport.TransportClient; -import org.elasticsearch.common.transport.InetSocketTransportAddress; - -import com.google.common.base.Throwables; - -public class ClientProviderImpl implements ClientProvider { - - private final String host; - private final int port; - - public ClientProviderImpl(String host, int port) { - this.host = host; - this.port = port; - } - - public Client get() { - try { - return TransportClient.builder().build() - .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port)); - } catch (UnknownHostException e) { - throw Throwables.propagate(e); - } - } -} http://git-wip-us.apache.org/repos/asf/james-project/blob/1688bc6f/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/DeleteByQueryPerformer.java ---------------------------------------------------------------------- diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/DeleteByQueryPerformer.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/DeleteByQueryPerformer.java deleted file mode 100644 index be81654..0000000 --- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/DeleteByQueryPerformer.java +++ /dev/null @@ -1,86 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ - -package org.apache.james.mailbox.elasticsearch; - -import java.util.concurrent.ExecutorService; - -import javax.inject.Inject; -import javax.inject.Named; - -import org.apache.james.mailbox.elasticsearch.search.ScrollIterable; -import org.elasticsearch.action.ListenableActionFuture; -import org.elasticsearch.action.bulk.BulkRequestBuilder; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.search.SearchHit; - -import com.google.common.annotations.VisibleForTesting; - -public class DeleteByQueryPerformer { - public static final int DEFAULT_BATCH_SIZE = 100; - public static final TimeValue TIMEOUT = new TimeValue(60000); - - private final Client client; - private final ExecutorService executor; - private final int batchSize; - - @Inject - public DeleteByQueryPerformer(Client client, @Named("AsyncExecutor") ExecutorService executor) { - this(client, executor, DEFAULT_BATCH_SIZE); - } - - @VisibleForTesting - DeleteByQueryPerformer(Client client, @Named("AsyncExecutor") ExecutorService executor, int batchSize) { - this.client = client; - this.executor = executor; - this.batchSize = batchSize; - } - - public void perform(QueryBuilder queryBuilder) { - executor.execute(() -> doDeleteByQuery(queryBuilder)); - } - - protected void doDeleteByQuery(QueryBuilder queryBuilder) { - new ScrollIterable(client, - client.prepareSearch(ElasticSearchIndexer.MAILBOX_INDEX) - .setTypes(ElasticSearchIndexer.MESSAGE_TYPE) - .setScroll(TIMEOUT) - .setNoFields() - .setQuery(queryBuilder) - .setSize(batchSize)) - .stream() - .forEach(searchResponse -> deleteRetrievedIds(client, searchResponse)); - } - - private ListenableActionFuture<BulkResponse> deleteRetrievedIds(Client client, SearchResponse searchResponse) { - BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); - for (SearchHit hit : searchResponse.getHits()) { - bulkRequestBuilder.add(client.prepareDelete() - .setIndex(ElasticSearchIndexer.MAILBOX_INDEX) - .setType(ElasticSearchIndexer.MESSAGE_TYPE) - .setId(hit.getId())); - } - return bulkRequestBuilder.execute(); - } - -} http://git-wip-us.apache.org/repos/asf/james-project/blob/1688bc6f/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIndexer.java ---------------------------------------------------------------------- diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIndexer.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIndexer.java deleted file mode 100644 index 523fa22..0000000 --- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIndexer.java +++ /dev/null @@ -1,100 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ -package org.apache.james.mailbox.elasticsearch; - -import java.util.List; - -import javax.inject.Inject; - -import org.elasticsearch.action.bulk.BulkRequestBuilder; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.common.Strings; -import org.elasticsearch.index.query.QueryBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Preconditions; - -public class ElasticSearchIndexer { - - public static class UpdatedRepresentation { - private final String id; - private final String updatedDocumentPart; - - public UpdatedRepresentation(String id, String updatedDocumentPart) { - Preconditions.checkArgument(!Strings.isNullOrEmpty(id), "Updated id must be specified " + id); - Preconditions.checkArgument(!Strings.isNullOrEmpty(updatedDocumentPart), "Updated document must be specified"); - this.id = id; - this.updatedDocumentPart = updatedDocumentPart; - } - - public String getId() { - return id; - } - - public String getUpdatedDocumentPart() { - return updatedDocumentPart; - } - } - - private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchIndexer.class); - public static final String MAILBOX_INDEX = "mailbox"; - public static final String MESSAGE_TYPE = "message"; - - private final Client client; - private final DeleteByQueryPerformer deleteByQueryPerformer; - - @Inject - public ElasticSearchIndexer(Client client, DeleteByQueryPerformer deleteByQueryPerformer) { - this.client = client; - this.deleteByQueryPerformer = deleteByQueryPerformer; - } - - public IndexResponse indexMessage(String id, String content) { - checkArgument(content); - LOGGER.debug(String.format("Indexing %s: %s", id, content)); - return client.prepareIndex(MAILBOX_INDEX, MESSAGE_TYPE, id) - .setSource(content) - .get(); - } - - public BulkResponse updateMessages(List<UpdatedRepresentation> updatedDocumentParts) { - Preconditions.checkNotNull(updatedDocumentParts); - BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); - updatedDocumentParts.forEach(updatedDocumentPart -> bulkRequestBuilder.add(client.prepareUpdate(MAILBOX_INDEX, MESSAGE_TYPE, updatedDocumentPart.getId()) - .setDoc(updatedDocumentPart.getUpdatedDocumentPart()))); - return bulkRequestBuilder.get(); - } - - public BulkResponse deleteMessages(List<String> ids) { - BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); - ids.forEach(id -> bulkRequestBuilder.add(client.prepareDelete(MAILBOX_INDEX, MESSAGE_TYPE, id))); - return bulkRequestBuilder.get(); - } - - public void deleteAllMatchingQuery(QueryBuilder queryBuilder) { - deleteByQueryPerformer.perform(queryBuilder); - } - - private void checkArgument(String content) { - Preconditions.checkArgument(content != null, "content should be provided"); - } -} http://git-wip-us.apache.org/repos/asf/james-project/blob/1688bc6f/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/IndexCreationFactory.java ---------------------------------------------------------------------- diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/IndexCreationFactory.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/IndexCreationFactory.java deleted file mode 100644 index c4a8060..0000000 --- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/IndexCreationFactory.java +++ /dev/null @@ -1,84 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ - -package org.apache.james.mailbox.elasticsearch; - -import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; - -import java.io.IOException; - -import org.elasticsearch.client.Client; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.indices.IndexAlreadyExistsException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class IndexCreationFactory { - - private static final Logger LOGGER = LoggerFactory.getLogger(IndexCreationFactory.class); - private static final int DEFAULT_NB_SHARDS = 1; - private static final int DEFAULT_NB_REPLICA = 0; - public static final String CASE_INSENSITIVE = "case_insensitive"; - - public static Client createIndex(Client client, int nbShards, int nbReplica) { - try { - return createIndex(client, generateSetting(nbShards, nbReplica)); - } catch (IOException e) { - LOGGER.error("Error while creating index : ", e); - return client; - } - } - - public static Client createIndex(Client client) { - return createIndex(client, DEFAULT_NB_SHARDS, DEFAULT_NB_REPLICA); - } - - private static Client createIndex(Client client, XContentBuilder settings) { - try { - client.admin() - .indices() - .prepareCreate(ElasticSearchIndexer.MAILBOX_INDEX) - .setSettings(settings) - .execute() - .actionGet(); - } catch (IndexAlreadyExistsException exception) { - LOGGER.info("Index [" + ElasticSearchIndexer.MAILBOX_INDEX + "] already exist"); - } - return client; - } - - private static XContentBuilder generateSetting(int nbShards, int nbReplica) throws IOException { - return jsonBuilder() - .startObject() - .field("number_of_shards", nbShards) - .field("number_of_replicas", nbReplica) - .startObject("analysis") - .startObject("analyzer") - .startObject(CASE_INSENSITIVE) - .field("tokenizer", "keyword") - .startArray("filter") - .value("lowercase") - .endArray() - .endObject() - .endObject() - .endObject() - .endObject(); - } - -} http://git-wip-us.apache.org/repos/asf/james-project/blob/1688bc6f/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/MailboxElasticsearchConstants.java ---------------------------------------------------------------------- diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/MailboxElasticsearchConstants.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/MailboxElasticsearchConstants.java new file mode 100644 index 0000000..4977950 --- /dev/null +++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/MailboxElasticsearchConstants.java @@ -0,0 +1,25 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.elasticsearch; + +public interface MailboxElasticsearchConstants { + String MAILBOX_INDEX = "mailbox"; + String MESSAGE_TYPE = "message"; +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
