Repository: camel Updated Branches: refs/heads/master a2bb3c905 -> 746e42634
CAMEL-8767 camel-elasticsearch - ElasticsearchConfiguration should be a plain pojo Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/461e15f9 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/461e15f9 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/461e15f9 Branch: refs/heads/master Commit: 461e15f91c48837a02095f1a76d7a499c25c038a Parents: a2bb3c9 Author: Andrea Cosentino <anco...@gmail.com> Authored: Tue May 12 22:58:20 2015 +0200 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Tue May 12 22:58:20 2015 +0200 ---------------------------------------------------------------------- .../elasticsearch/ElasticsearchComponent.java | 48 +- .../ElasticsearchConfiguration.java | 438 ++++++++----------- .../elasticsearch/ElasticsearchConstants.java | 48 ++ .../elasticsearch/ElasticsearchEndpoint.java | 13 +- .../elasticsearch/ElasticsearchProducer.java | 54 +-- .../ElasticsearchActionRequestConverter.java | 23 +- .../ElasticsearchComponentTest.java | 58 +-- .../ElasticsearchConfigurationTest.java | 204 --------- .../elasticsearch/SpringElasticsearchTest.java | 6 +- 9 files changed, 346 insertions(+), 546 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/461e15f9/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchComponent.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchComponent.java index 66423a0..2ccd896 100644 --- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchComponent.java +++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchComponent.java @@ -16,11 +16,15 @@ */ package org.apache.camel.component.elasticsearch; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.Map; import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; import org.apache.camel.impl.UriEndpointComponent; +import org.elasticsearch.common.transport.InetSocketTransportAddress; /** * Represents the component that manages {@link ElasticsearchEndpoint}. @@ -36,8 +40,48 @@ public class ElasticsearchComponent extends UriEndpointComponent { } protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { - Endpoint endpoint = new ElasticsearchEndpoint(uri, this, parameters); - setProperties(endpoint, parameters); + ElasticsearchConfiguration config = new ElasticsearchConfiguration(); + setProperties(config, parameters); + if (ElasticsearchConstants.LOCAL_NAME.equals(remaining)) { + config.setLocal(true); + config.setClusterName(null); + } else { + config.setLocal(false); + config.setClusterName(remaining); + } + + if (config.getData() == null) { + config.setData(config.isLocal()); + } + + if (config.isLocal() && !config.getData()) { + throw new IllegalArgumentException("invalid to use local node without data"); + } + + config.setTransportAddressesList(parseTransportAddresses(config.getTransportAddresses(), config)); + + Endpoint endpoint = new ElasticsearchEndpoint(uri, this, config); + return endpoint; } + + private List<InetSocketTransportAddress> parseTransportAddresses(String ipsString, ElasticsearchConfiguration config) { + if (ipsString == null || ipsString.isEmpty()) { + return null; + } + List<String> addressesStr = Arrays.asList(ipsString.split(ElasticsearchConstants.TRANSPORT_ADDRESSES_SEPARATOR_REGEX)); + List<InetSocketTransportAddress> addressesTrAd = new ArrayList<InetSocketTransportAddress>(addressesStr.size()); + for (String address : addressesStr) { + String[] split = address.split(ElasticsearchConstants.IP_PORT_SEPARATOR_REGEX); + String hostname; + if (split.length > 0) { + hostname = split[0]; + } else { + throw new IllegalArgumentException(); + } + Integer port = split.length > 1 ? Integer.parseInt(split[1]) : ElasticsearchConstants.DEFAULT_PORT; + addressesTrAd.add(new InetSocketTransportAddress(hostname, port)); + } + return addressesTrAd; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/461e15f9/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java index a2933b9..62d0acf 100644 --- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java +++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java @@ -38,31 +38,6 @@ import static org.elasticsearch.node.NodeBuilder.nodeBuilder; @UriParams public class ElasticsearchConfiguration { - public static final String PARAM_OPERATION = "operation"; - public static final String OPERATION_INDEX = "INDEX"; - public static final String OPERATION_BULK = "BULK"; - public static final String OPERATION_BULK_INDEX = "BULK_INDEX"; - public static final String OPERATION_GET_BY_ID = "GET_BY_ID"; - public static final String OPERATION_DELETE = "DELETE"; - public static final String OPERATION_SEARCH = "SEARCH"; - public static final String PARAM_INDEX_ID = "indexId"; - public static final String PARAM_DATA = "data"; - public static final String PARAM_INDEX_NAME = "indexName"; - public static final String PARAM_INDEX_TYPE = "indexType"; - public static final String PARAM_CONSISTENCY_LEVEL = "consistencyLevel"; - public static final String PARAM_REPLICATION_TYPE = "replicationType"; - public static final String TRANSPORT_ADDRESSES = "transportAddresses"; - public static final String PROTOCOL = "elasticsearch"; - private static final String LOCAL_NAME = "local"; - private static final String IP = "ip"; - private static final String PORT = "port"; - private static final Integer DEFAULT_PORT = 9300; - private static final WriteConsistencyLevel DEFAULT_CONSISTENCY_LEVEL = WriteConsistencyLevel.DEFAULT; - private static final ReplicationType DEFAULT_REPLICATION_TYPE = ReplicationType.DEFAULT; - private static final String TRANSPORT_ADDRESSES_SEPARATOR_REGEX = ","; - private static final String IP_PORT_SEPARATOR_REGEX = ":"; - - private URI uri; private boolean local; private List<InetSocketTransportAddress> transportAddressesList; @@ -75,9 +50,9 @@ public class ElasticsearchConfiguration { @UriParam private String indexType; @UriParam(defaultValue = "DEFAULT") - private WriteConsistencyLevel consistencyLevel = DEFAULT_CONSISTENCY_LEVEL; + private WriteConsistencyLevel consistencyLevel = ElasticsearchConstants.DEFAULT_CONSISTENCY_LEVEL; @UriParam(defaultValue = "DEFAULT") - private ReplicationType replicationType = DEFAULT_REPLICATION_TYPE; + private ReplicationType replicationType = ElasticsearchConstants.DEFAULT_REPLICATION_TYPE; @UriParam private Boolean data; @UriParam @@ -85,245 +60,174 @@ public class ElasticsearchConfiguration { @UriParam private String transportAddresses; @UriParam(defaultValue = "9300") - private int port = DEFAULT_PORT; - - public ElasticsearchConfiguration(URI uri, Map<String, Object> parameters) throws Exception { - String protocol = uri.getScheme(); - - if (!protocol.equalsIgnoreCase(PROTOCOL)) { - throw new IllegalArgumentException("unrecognized elasticsearch protocol: " + protocol + " for uri: " + uri); - } - setUri(uri); - if (!isValidAuthority(uri.getAuthority())) { - throw new URISyntaxException(uri.toASCIIString(), "incorrect URI syntax specified for the elasticsearch endpoint." - + "please specify the syntax as \"elasticsearch:[Cluster Name | 'local']?[Query]\""); - } - - if (LOCAL_NAME.equals(uri.getAuthority())) { - setLocal(true); - setClusterName(null); - } else { - setLocal(false); - setClusterName(uri.getAuthority()); - } - - data = toBoolean(parameters.remove(PARAM_DATA)); - - if (data == null) { - data = local; - } - - if (local && !data) { - throw new IllegalArgumentException("invalid to use local node without data"); - } - - indexName = (String)parameters.remove(PARAM_INDEX_NAME); - indexType = (String)parameters.remove(PARAM_INDEX_TYPE); - operation = (String)parameters.remove(PARAM_OPERATION); - consistencyLevel = parseConsistencyLevel(parameters); - replicationType = parseReplicationType(parameters); - - ip = (String)parameters.remove(IP); - transportAddresses = (String) parameters.remove(TRANSPORT_ADDRESSES); - transportAddressesList = parseTransportAddresses(transportAddresses); - - String portParam = (String) parameters.remove(PORT); - port = portParam == null ? DEFAULT_PORT : Integer.valueOf(portParam); - } - - private static boolean isValidAuthority(String authority) throws URISyntaxException { - if (authority.contains(":")) { - return false; - } - return true; - } - - private ReplicationType parseReplicationType(Map<String, Object> parameters) { - Object replicationTypeParam = parameters.remove(PARAM_REPLICATION_TYPE); - if (replicationTypeParam != null) { - return ReplicationType.valueOf(replicationTypeParam.toString()); - } else { - return DEFAULT_REPLICATION_TYPE; - } - } - - private WriteConsistencyLevel parseConsistencyLevel(Map<String, Object> parameters) { - Object consistencyLevelParam = parameters.remove(PARAM_CONSISTENCY_LEVEL); - if (consistencyLevelParam != null) { - return WriteConsistencyLevel.valueOf(consistencyLevelParam.toString()); - } else { - return DEFAULT_CONSISTENCY_LEVEL; - } - } - - private List<InetSocketTransportAddress> parseTransportAddresses(String ipsString) { - if (ipsString == null || ipsString.isEmpty()) { - return null; - } - List<String> addressesStr = Arrays.asList(ipsString.split(TRANSPORT_ADDRESSES_SEPARATOR_REGEX)); - List<InetSocketTransportAddress> addressesTrAd = new ArrayList<InetSocketTransportAddress>(addressesStr.size()); - for (String address : addressesStr) { - String[] split = address.split(IP_PORT_SEPARATOR_REGEX); - String hostname; - if (split.length > 0) { - hostname = split[0]; - } else { - throw new IllegalArgumentException(); - } - Integer port = split.length > 1 ? Integer.parseInt(split[1]) : DEFAULT_PORT; - addressesTrAd.add(new InetSocketTransportAddress(hostname, port)); - } - return addressesTrAd; - } - - protected Boolean toBoolean(Object string) { - if ("true".equals(string)) { - return true; - } else if ("false".equals(string)) { - return false; - } else { - return null; - } - } - - public Node buildNode() { - NodeBuilder builder = nodeBuilder().local(isLocal()).data(isData()); - if (!isLocal() && getClusterName() != null) { - builder.clusterName(getClusterName()); - } - return builder.node(); - } - - public URI getUri() { - return uri; - } - - public void setUri(URI uri) { - this.uri = uri; - } - - public String getClusterName() { - return clusterName; - } - - /** - * Name of cluster or use local for local mode - */ - public void setClusterName(String clusterName) { - this.clusterName = clusterName; - } - - public String getIndexName() { - return indexName; - } - - /** - * The name of the index to act against - */ - public void setIndexName(String indexName) { - this.indexName = indexName; - } - - public String getIndexType() { - return indexType; - } - - /** - * The type of the index to act against - */ - public void setIndexType(String indexType) { - this.indexType = indexType; - } - - public boolean isLocal() { - return local; - } - - public void setLocal(boolean local) { - this.local = local; - } - - public boolean isData() { - return data; - } - - /** - * Is the node going to be allowed to allocate data (shards) to it or not. This setting map to the <tt>node.data</tt> setting. - */ - public void setData(boolean data) { - this.data = data; - } - - /** - * What operation to perform - */ - public void setOperation(String operation) { - this.operation = operation; - } - - public String getOperation() { - return this.operation; - } - - public String getIp() { - return ip; - } - - /** - * The TransportClient remote host ip to use - */ - public void setIp(String ip) { - this.ip = ip; - } - - public List<InetSocketTransportAddress> getTransportAddressesList() { - return transportAddressesList; - } - - public String getTransportAddresses() { - return transportAddresses; - } - - /** - * Comma separated list with ip:port formatted remote transport addresses to use. - * The ip and port options must be left blank for transportAddresses to be considered instead. - */ - public void setTransportAddresses(String transportAddresses) { - this.transportAddresses = transportAddresses; - this.transportAddressesList = parseTransportAddresses(transportAddresses); - } - - public int getPort() { - return port; - } - - /** - * The TransportClient remote port to use (defaults to 9300) - */ - public void setPort(int port) { - this.port = port; - } - - /** - * The write consistency level to use with INDEX and BULK operations (can be any of ONE, QUORUM, ALL or DEFAULT) - */ - public void setConsistencyLevel(WriteConsistencyLevel consistencyLevel) { - this.consistencyLevel = consistencyLevel; - } - - public WriteConsistencyLevel getConsistencyLevel() { - return consistencyLevel; - } - - /** - * The replication type to use with INDEX and BULK operations (can be any of SYNC, ASYNC or DEFAULT) - */ - public void setReplicationType(ReplicationType replicationType) { - this.replicationType = replicationType; - } - - public ReplicationType getReplicationType() { - return replicationType; - } + private int port = ElasticsearchConstants.DEFAULT_PORT; + + + + public boolean isLocal() { + return local; + } + + + + + public void setLocal(boolean local) { + this.local = local; + } + + + + + public List<InetSocketTransportAddress> getTransportAddressesList() { + return transportAddressesList; + } + + + + + public void setTransportAddressesList( + List<InetSocketTransportAddress> transportAddressesList) { + this.transportAddressesList = transportAddressesList; + } + + + + + public String getClusterName() { + return clusterName; + } + + + + + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } + + + + + public String getOperation() { + return operation; + } + + + + + public void setOperation(String operation) { + this.operation = operation; + } + + + + + public String getIndexName() { + return indexName; + } + + + + + public void setIndexName(String indexName) { + this.indexName = indexName; + } + + + + + public String getIndexType() { + return indexType; + } + + + + + public void setIndexType(String indexType) { + this.indexType = indexType; + } + + + + + public WriteConsistencyLevel getConsistencyLevel() { + return consistencyLevel; + } + + + + + public void setConsistencyLevel(WriteConsistencyLevel consistencyLevel) { + this.consistencyLevel = consistencyLevel; + } + + + + + public ReplicationType getReplicationType() { + return replicationType; + } + + + + + public void setReplicationType(ReplicationType replicationType) { + this.replicationType = replicationType; + } + + + + + public Boolean getData() { + return data; + } + + + + + public void setData(Boolean data) { + this.data = data; + } + + + + + public String getIp() { + return ip; + } + + + + + public void setIp(String ip) { + this.ip = ip; + } + + + + + public String getTransportAddresses() { + return transportAddresses; + } + + + + + public void setTransportAddresses(String transportAddresses) { + this.transportAddresses = transportAddresses; + } + + + + + public int getPort() { + return port; + } + + + + + public void setPort(int port) { + this.port = port; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/461e15f9/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConstants.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConstants.java new file mode 100644 index 0000000..998047a --- /dev/null +++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConstants.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.elasticsearch; + +import org.elasticsearch.action.WriteConsistencyLevel; +import org.elasticsearch.action.support.replication.ReplicationType; + +public interface ElasticsearchConstants { + + String PARAM_OPERATION = "operation"; + String OPERATION_INDEX = "INDEX"; + String OPERATION_BULK = "BULK"; + String OPERATION_BULK_INDEX = "BULK_INDEX"; + String OPERATION_GET_BY_ID = "GET_BY_ID"; + String OPERATION_DELETE = "DELETE"; + String OPERATION_SEARCH = "SEARCH"; + String PARAM_INDEX_ID = "indexId"; + String PARAM_DATA = "data"; + String PARAM_INDEX_NAME = "indexName"; + String PARAM_INDEX_TYPE = "indexType"; + String PARAM_CONSISTENCY_LEVEL = "consistencyLevel"; + String PARAM_REPLICATION_TYPE = "replicationType"; + String TRANSPORT_ADDRESSES = "transportAddresses"; + String PROTOCOL = "elasticsearch"; + String LOCAL_NAME = "local"; + String IP = "ip"; + String PORT = "port"; + Integer DEFAULT_PORT = 9300; + WriteConsistencyLevel DEFAULT_CONSISTENCY_LEVEL = WriteConsistencyLevel.DEFAULT; + ReplicationType DEFAULT_REPLICATION_TYPE = ReplicationType.DEFAULT; + String TRANSPORT_ADDRESSES_SEPARATOR_REGEX = ","; + String IP_PORT_SEPARATOR_REGEX = ":"; +} http://git-wip-us.apache.org/repos/asf/camel/blob/461e15f9/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java index 61e5a4f..979a93a 100644 --- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java +++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchEndpoint.java @@ -16,6 +16,8 @@ */ package org.apache.camel.component.elasticsearch; +import static org.elasticsearch.node.NodeBuilder.nodeBuilder; + import java.net.URI; import java.util.ArrayList; import java.util.List; @@ -34,6 +36,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.node.Node; +import org.elasticsearch.node.NodeBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,9 +53,9 @@ public class ElasticsearchEndpoint extends DefaultEndpoint { @UriParam private ElasticsearchConfiguration configuration; - public ElasticsearchEndpoint(String uri, ElasticsearchComponent component, Map<String, Object> parameters) throws Exception { + public ElasticsearchEndpoint(String uri, ElasticsearchComponent component, ElasticsearchConfiguration config) throws Exception { super(uri, component); - this.configuration = new ElasticsearchConfiguration(new URI(uri), parameters); + this.configuration = config; } public Producer createProducer() throws Exception { @@ -89,7 +92,11 @@ public class ElasticsearchEndpoint extends DefaultEndpoint { this.client = new TransportClient(getSettings()) .addTransportAddresses(addresses.toArray(new TransportAddress[addresses.size()])); } else { - node = configuration.buildNode(); + NodeBuilder builder = nodeBuilder().local(configuration.isLocal()).data(configuration.getData()); + if (!configuration.isLocal() && configuration.getClusterName() != null) { + builder.clusterName(configuration.getClusterName()); + } + node = builder.node(); client = node.client(); } } http://git-wip-us.apache.org/repos/asf/camel/blob/461e15f9/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java index 0ededde..3a1afdb 100644 --- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java +++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java @@ -57,28 +57,28 @@ public class ElasticsearchProducer extends DefaultProducer { Object request = exchange.getIn().getBody(); if (request instanceof IndexRequest) { - return ElasticsearchConfiguration.OPERATION_INDEX; + return ElasticsearchConstants.OPERATION_INDEX; } else if (request instanceof GetRequest) { - return ElasticsearchConfiguration.OPERATION_GET_BY_ID; + return ElasticsearchConstants.OPERATION_GET_BY_ID; } else if (request instanceof BulkRequest) { // do we want bulk or bulk_index? if ("BULK_INDEX".equals(getEndpoint().getConfig().getOperation())) { - return ElasticsearchConfiguration.OPERATION_BULK_INDEX; + return ElasticsearchConstants.OPERATION_BULK_INDEX; } else { - return ElasticsearchConfiguration.OPERATION_BULK; + return ElasticsearchConstants.OPERATION_BULK; } } else if (request instanceof DeleteRequest) { - return ElasticsearchConfiguration.OPERATION_DELETE; + return ElasticsearchConstants.OPERATION_DELETE; } else if (request instanceof SearchRequest) { - return ElasticsearchConfiguration.OPERATION_SEARCH; + return ElasticsearchConstants.OPERATION_SEARCH; } - String operationConfig = exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_OPERATION, String.class); + String operationConfig = exchange.getIn().getHeader(ElasticsearchConstants.PARAM_OPERATION, String.class); if (operationConfig == null) { operationConfig = getEndpoint().getConfig().getOperation(); } if (operationConfig == null) { - throw new IllegalArgumentException(ElasticsearchConfiguration.PARAM_OPERATION + " value '" + operationConfig + "' is not supported"); + throw new IllegalArgumentException(ElasticsearchConstants.PARAM_OPERATION + " value '" + operationConfig + "' is not supported"); } return operationConfig; } @@ -100,58 +100,58 @@ public class ElasticsearchProducer extends DefaultProducer { // Set the index/type headers on the exchange if necessary. This is used // for type conversion. boolean configIndexName = false; - String indexName = message.getHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME, String.class); + String indexName = message.getHeader(ElasticsearchConstants.PARAM_INDEX_NAME, String.class); if (indexName == null) { - message.setHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME, getEndpoint().getConfig().getIndexName()); + message.setHeader(ElasticsearchConstants.PARAM_INDEX_NAME, getEndpoint().getConfig().getIndexName()); configIndexName = true; } boolean configIndexType = false; - String indexType = message.getHeader(ElasticsearchConfiguration.PARAM_INDEX_TYPE, String.class); + String indexType = message.getHeader(ElasticsearchConstants.PARAM_INDEX_TYPE, String.class); if (indexType == null) { - message.setHeader(ElasticsearchConfiguration.PARAM_INDEX_TYPE, getEndpoint().getConfig().getIndexType()); + message.setHeader(ElasticsearchConstants.PARAM_INDEX_TYPE, getEndpoint().getConfig().getIndexType()); configIndexType = true; } boolean configConsistencyLevel = false; - String consistencyLevel = message.getHeader(ElasticsearchConfiguration.PARAM_CONSISTENCY_LEVEL, String.class); + String consistencyLevel = message.getHeader(ElasticsearchConstants.PARAM_CONSISTENCY_LEVEL, String.class); if (consistencyLevel == null) { - message.setHeader(ElasticsearchConfiguration.PARAM_CONSISTENCY_LEVEL, getEndpoint().getConfig().getConsistencyLevel()); + message.setHeader(ElasticsearchConstants.PARAM_CONSISTENCY_LEVEL, getEndpoint().getConfig().getConsistencyLevel()); configConsistencyLevel = true; } boolean configReplicationType = false; - String replicationType = message.getHeader(ElasticsearchConfiguration.PARAM_REPLICATION_TYPE, String.class); + String replicationType = message.getHeader(ElasticsearchConstants.PARAM_REPLICATION_TYPE, String.class); if (replicationType == null) { - message.setHeader(ElasticsearchConfiguration.PARAM_REPLICATION_TYPE, getEndpoint().getConfig().getReplicationType()); + message.setHeader(ElasticsearchConstants.PARAM_REPLICATION_TYPE, getEndpoint().getConfig().getReplicationType()); configReplicationType = true; } Client client = getEndpoint().getClient(); - if (ElasticsearchConfiguration.OPERATION_INDEX.equals(operation)) { + if (ElasticsearchConstants.OPERATION_INDEX.equals(operation)) { IndexRequest indexRequest = message.getBody(IndexRequest.class); message.setBody(client.index(indexRequest).actionGet().getId()); - } else if (ElasticsearchConfiguration.OPERATION_GET_BY_ID.equals(operation)) { + } else if (ElasticsearchConstants.OPERATION_GET_BY_ID.equals(operation)) { GetRequest getRequest = message.getBody(GetRequest.class); message.setBody(client.get(getRequest)); - } else if (ElasticsearchConfiguration.OPERATION_BULK.equals(operation)) { + } else if (ElasticsearchConstants.OPERATION_BULK.equals(operation)) { BulkRequest bulkRequest = message.getBody(BulkRequest.class); message.setBody(client.bulk(bulkRequest).actionGet()); - } else if (ElasticsearchConfiguration.OPERATION_BULK_INDEX.equals(operation)) { + } else if (ElasticsearchConstants.OPERATION_BULK_INDEX.equals(operation)) { BulkRequest bulkRequest = message.getBody(BulkRequest.class); List<String> indexedIds = new ArrayList<String>(); for (BulkItemResponse response : client.bulk(bulkRequest).actionGet().getItems()) { indexedIds.add(response.getId()); } message.setBody(indexedIds); - } else if (ElasticsearchConfiguration.OPERATION_DELETE.equals(operation)) { + } else if (ElasticsearchConstants.OPERATION_DELETE.equals(operation)) { DeleteRequest deleteRequest = message.getBody(DeleteRequest.class); message.setBody(client.delete(deleteRequest).actionGet()); - } else if (ElasticsearchConfiguration.OPERATION_SEARCH.equals(operation)) { + } else if (ElasticsearchConstants.OPERATION_SEARCH.equals(operation)) { SearchRequest searchRequest = message.getBody(SearchRequest.class); message.setBody(client.search(searchRequest).actionGet()); } else { - throw new IllegalArgumentException(ElasticsearchConfiguration.PARAM_OPERATION + " value '" + operation + "' is not supported"); + throw new IllegalArgumentException(ElasticsearchConstants.PARAM_OPERATION + " value '" + operation + "' is not supported"); } // If we set params via the configuration on this exchange, remove them @@ -163,19 +163,19 @@ public class ElasticsearchProducer extends DefaultProducer { // elasticsearch endpoints would have the effect overriding any // subsequent endpoint index/type with the first endpoint index/type. if (configIndexName) { - message.removeHeader(ElasticsearchConfiguration.PARAM_INDEX_NAME); + message.removeHeader(ElasticsearchConstants.PARAM_INDEX_NAME); } if (configIndexType) { - message.removeHeader(ElasticsearchConfiguration.PARAM_INDEX_TYPE); + message.removeHeader(ElasticsearchConstants.PARAM_INDEX_TYPE); } if (configConsistencyLevel) { - message.removeHeader(ElasticsearchConfiguration.PARAM_CONSISTENCY_LEVEL); + message.removeHeader(ElasticsearchConstants.PARAM_CONSISTENCY_LEVEL); } if (configReplicationType) { - message.removeHeader(ElasticsearchConfiguration.PARAM_REPLICATION_TYPE); + message.removeHeader(ElasticsearchConstants.PARAM_REPLICATION_TYPE); } } http://git-wip-us.apache.org/repos/asf/camel/blob/461e15f9/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java index a64f843..26f0125 100644 --- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java +++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java @@ -21,6 +21,7 @@ import java.util.Map; import org.apache.camel.Converter; import org.apache.camel.Exchange; +import org.apache.camel.component.elasticsearch.ElasticsearchConstants; import org.apache.camel.component.elasticsearch.ElasticsearchConfiguration; import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.bulk.BulkRequest; @@ -54,27 +55,27 @@ public final class ElasticsearchActionRequestConverter { return indexRequest .consistencyLevel(exchange.getIn().getHeader( - ElasticsearchConfiguration.PARAM_CONSISTENCY_LEVEL, WriteConsistencyLevel.class)) + ElasticsearchConstants.PARAM_CONSISTENCY_LEVEL, WriteConsistencyLevel.class)) .replicationType(exchange.getIn().getHeader( - ElasticsearchConfiguration.PARAM_REPLICATION_TYPE, ReplicationType.class)) + ElasticsearchConstants.PARAM_REPLICATION_TYPE, ReplicationType.class)) .index(exchange.getIn().getHeader( - ElasticsearchConfiguration.PARAM_INDEX_NAME, String.class)) + ElasticsearchConstants.PARAM_INDEX_NAME, String.class)) .type(exchange.getIn().getHeader( - ElasticsearchConfiguration.PARAM_INDEX_TYPE, String.class)); + ElasticsearchConstants.PARAM_INDEX_TYPE, String.class)); } @Converter public static IndexRequest toIndexRequest(Object document, Exchange exchange) { return createIndexRequest(document, exchange) - .id(exchange.getIn().getHeader(ElasticsearchConfiguration.PARAM_INDEX_ID, String.class)); + .id(exchange.getIn().getHeader(ElasticsearchConstants.PARAM_INDEX_ID, String.class)); } @Converter public static GetRequest toGetRequest(String id, Exchange exchange) { return new GetRequest(exchange.getIn().getHeader( - ElasticsearchConfiguration.PARAM_INDEX_NAME, String.class)) + ElasticsearchConstants.PARAM_INDEX_NAME, String.class)) .type(exchange.getIn().getHeader( - ElasticsearchConfiguration.PARAM_INDEX_TYPE, + ElasticsearchConstants.PARAM_INDEX_TYPE, String.class)).id(id); } @@ -82,10 +83,10 @@ public final class ElasticsearchActionRequestConverter { public static DeleteRequest toDeleteRequest(String id, Exchange exchange) { return new DeleteRequest() .index(exchange.getIn().getHeader( - ElasticsearchConfiguration.PARAM_INDEX_NAME, + ElasticsearchConstants.PARAM_INDEX_NAME, String.class)) .type(exchange.getIn().getHeader( - ElasticsearchConfiguration.PARAM_INDEX_TYPE, + ElasticsearchConstants.PARAM_INDEX_TYPE, String.class)).id(id); } @@ -93,9 +94,9 @@ public final class ElasticsearchActionRequestConverter { public static SearchRequest toSearchRequest(Object queryObject, Exchange exchange) { Map<?, ?> query = exchange.getContext().getTypeConverter().convertTo(Map.class, queryObject); return new SearchRequest(exchange.getIn().getHeader( - ElasticsearchConfiguration.PARAM_INDEX_NAME, String.class)) + ElasticsearchConstants.PARAM_INDEX_NAME, String.class)) .types(exchange.getIn().getHeader( - ElasticsearchConfiguration.PARAM_INDEX_TYPE, + ElasticsearchConstants.PARAM_INDEX_TYPE, String.class)).source(query); } http://git-wip-us.apache.org/repos/asf/camel/blob/461e15f9/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java index 11357c4..78d3196 100644 --- a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java +++ b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java @@ -180,9 +180,9 @@ public class ElasticsearchComponentTest extends CamelTestSupport { public void testIndexWithHeaders() throws Exception { Map<String, String> map = createIndexedData(); Map<String, Object> headers = new HashMap<String, Object>(); - headers.put(ElasticsearchConfiguration.PARAM_OPERATION, ElasticsearchConfiguration.OPERATION_INDEX); - headers.put(ElasticsearchConfiguration.PARAM_INDEX_NAME, "twitter"); - headers.put(ElasticsearchConfiguration.PARAM_INDEX_TYPE, "tweet"); + headers.put(ElasticsearchConstants.PARAM_OPERATION, ElasticsearchConstants.OPERATION_INDEX); + headers.put(ElasticsearchConstants.PARAM_INDEX_NAME, "twitter"); + headers.put(ElasticsearchConstants.PARAM_INDEX_TYPE, "tweet"); String indexId = template.requestBodyAndHeaders("direct:start", map, headers, String.class); assertNotNull("indexId should be set", indexId); @@ -192,10 +192,10 @@ public class ElasticsearchComponentTest extends CamelTestSupport { public void testIndexWithIDInHeader() throws Exception { Map<String, String> map = createIndexedData(); Map<String, Object> headers = new HashMap<String, Object>(); - headers.put(ElasticsearchConfiguration.PARAM_OPERATION, ElasticsearchConfiguration.OPERATION_INDEX); - headers.put(ElasticsearchConfiguration.PARAM_INDEX_NAME, "twitter"); - headers.put(ElasticsearchConfiguration.PARAM_INDEX_TYPE, "tweet"); - headers.put(ElasticsearchConfiguration.PARAM_INDEX_ID, "123"); + headers.put(ElasticsearchConstants.PARAM_OPERATION, ElasticsearchConstants.OPERATION_INDEX); + headers.put(ElasticsearchConstants.PARAM_INDEX_NAME, "twitter"); + headers.put(ElasticsearchConstants.PARAM_INDEX_TYPE, "tweet"); + headers.put(ElasticsearchConstants.PARAM_INDEX_ID, "123"); String indexId = template.requestBodyAndHeaders("direct:start", map, headers, String.class); assertNotNull("indexId should be set", indexId); @@ -207,9 +207,9 @@ public class ElasticsearchComponentTest extends CamelTestSupport { public void indexWithIp() throws Exception { Map<String, String> map = createIndexedData(); Map<String, Object> headers = new HashMap<String, Object>(); - headers.put(ElasticsearchConfiguration.PARAM_OPERATION, ElasticsearchConfiguration.OPERATION_INDEX); - headers.put(ElasticsearchConfiguration.PARAM_INDEX_NAME, "twitter"); - headers.put(ElasticsearchConfiguration.PARAM_INDEX_TYPE, "tweet"); + headers.put(ElasticsearchConstants.PARAM_OPERATION, ElasticsearchConstants.OPERATION_INDEX); + headers.put(ElasticsearchConstants.PARAM_INDEX_NAME, "twitter"); + headers.put(ElasticsearchConstants.PARAM_INDEX_TYPE, "tweet"); String indexId = template.requestBodyAndHeaders("direct:indexWithIp", map, headers, String.class); assertNotNull("indexId should be set", indexId); @@ -220,9 +220,9 @@ public class ElasticsearchComponentTest extends CamelTestSupport { public void indexWithIpAndPort() throws Exception { Map<String, String> map = createIndexedData(); Map<String, Object> headers = new HashMap<String, Object>(); - headers.put(ElasticsearchConfiguration.PARAM_OPERATION, ElasticsearchConfiguration.OPERATION_INDEX); - headers.put(ElasticsearchConfiguration.PARAM_INDEX_NAME, "twitter"); - headers.put(ElasticsearchConfiguration.PARAM_INDEX_TYPE, "tweet"); + headers.put(ElasticsearchConstants.PARAM_OPERATION, ElasticsearchConstants.OPERATION_INDEX); + headers.put(ElasticsearchConstants.PARAM_INDEX_NAME, "twitter"); + headers.put(ElasticsearchConstants.PARAM_INDEX_TYPE, "tweet"); String indexId = template.requestBodyAndHeaders("direct:indexWithIpAndPort", map, headers, String.class); assertNotNull("indexId should be set", indexId); @@ -233,9 +233,9 @@ public class ElasticsearchComponentTest extends CamelTestSupport { public void indexWithTransportAddresses() throws Exception { Map<String, String> map = createIndexedData(); Map<String, Object> headers = new HashMap<String, Object>(); - headers.put(ElasticsearchConfiguration.PARAM_OPERATION, ElasticsearchConfiguration.OPERATION_INDEX); - headers.put(ElasticsearchConfiguration.PARAM_INDEX_NAME, "twitter"); - headers.put(ElasticsearchConfiguration.PARAM_INDEX_TYPE, "tweet"); + headers.put(ElasticsearchConstants.PARAM_OPERATION, ElasticsearchConstants.OPERATION_INDEX); + headers.put(ElasticsearchConstants.PARAM_INDEX_NAME, "twitter"); + headers.put(ElasticsearchConstants.PARAM_INDEX_TYPE, "tweet"); String indexId = template.requestBodyAndHeaders("direct:indexWithTransportAddresses", map, headers, String.class); assertNotNull("indexId should be set", indexId); @@ -246,9 +246,9 @@ public class ElasticsearchComponentTest extends CamelTestSupport { public void indexWithIpAndTransportAddresses() throws Exception { Map<String, String> map = createIndexedData(); Map<String, Object> headers = new HashMap<String, Object>(); - headers.put(ElasticsearchConfiguration.PARAM_OPERATION, ElasticsearchConfiguration.OPERATION_INDEX); - headers.put(ElasticsearchConfiguration.PARAM_INDEX_NAME, "twitter"); - headers.put(ElasticsearchConfiguration.PARAM_INDEX_TYPE, "tweet"); + headers.put(ElasticsearchConstants.PARAM_OPERATION, ElasticsearchConstants.OPERATION_INDEX); + headers.put(ElasticsearchConstants.PARAM_INDEX_NAME, "twitter"); + headers.put(ElasticsearchConstants.PARAM_INDEX_TYPE, "tweet"); //should ignore transport addresses configuration String indexId = template.requestBodyAndHeaders("direct:indexWithIpAndTransportAddresses", map, headers, String.class); @@ -260,14 +260,14 @@ public class ElasticsearchComponentTest extends CamelTestSupport { //first, INDEX a value Map<String, String> map = createIndexedData(); Map<String, Object> headers = new HashMap<String, Object>(); - headers.put(ElasticsearchConfiguration.PARAM_OPERATION, ElasticsearchConfiguration.OPERATION_INDEX); - headers.put(ElasticsearchConfiguration.PARAM_INDEX_NAME, "twitter"); - headers.put(ElasticsearchConfiguration.PARAM_INDEX_TYPE, "tweet"); + headers.put(ElasticsearchConstants.PARAM_OPERATION, ElasticsearchConstants.OPERATION_INDEX); + headers.put(ElasticsearchConstants.PARAM_INDEX_NAME, "twitter"); + headers.put(ElasticsearchConstants.PARAM_INDEX_TYPE, "tweet"); String indexId = template.requestBodyAndHeaders("direct:start", map, headers, String.class); //now, verify GET - headers.put(ElasticsearchConfiguration.PARAM_OPERATION, ElasticsearchConfiguration.OPERATION_GET_BY_ID); + headers.put(ElasticsearchConstants.PARAM_OPERATION, ElasticsearchConstants.OPERATION_GET_BY_ID); GetResponse response = template.requestBodyAndHeaders("direct:start", indexId, headers, GetResponse.class); assertNotNull("response should not be null", response); assertNotNull("response source should not be null", response.getSource()); @@ -278,25 +278,25 @@ public class ElasticsearchComponentTest extends CamelTestSupport { //first, INDEX a value Map<String, String> map = createIndexedData(); Map<String, Object> headers = new HashMap<String, Object>(); - headers.put(ElasticsearchConfiguration.PARAM_OPERATION, ElasticsearchConfiguration.OPERATION_INDEX); - headers.put(ElasticsearchConfiguration.PARAM_INDEX_NAME, "twitter"); - headers.put(ElasticsearchConfiguration.PARAM_INDEX_TYPE, "tweet"); + headers.put(ElasticsearchConstants.PARAM_OPERATION, ElasticsearchConstants.OPERATION_INDEX); + headers.put(ElasticsearchConstants.PARAM_INDEX_NAME, "twitter"); + headers.put(ElasticsearchConstants.PARAM_INDEX_TYPE, "tweet"); String indexId = template.requestBodyAndHeaders("direct:start", map, headers, String.class); //now, verify GET - headers.put(ElasticsearchConfiguration.PARAM_OPERATION, ElasticsearchConfiguration.OPERATION_GET_BY_ID); + headers.put(ElasticsearchConstants.PARAM_OPERATION, ElasticsearchConstants.OPERATION_GET_BY_ID); GetResponse response = template.requestBodyAndHeaders("direct:start", indexId, headers, GetResponse.class); assertNotNull("response should not be null", response); assertNotNull("response source should not be null", response.getSource()); //now, perform DELETE - headers.put(ElasticsearchConfiguration.PARAM_OPERATION, ElasticsearchConfiguration.OPERATION_DELETE); + headers.put(ElasticsearchConstants.PARAM_OPERATION, ElasticsearchConstants.OPERATION_DELETE); DeleteResponse deleteResponse = template.requestBodyAndHeaders("direct:start", indexId, headers, DeleteResponse.class); assertNotNull("response should not be null", deleteResponse); //now, verify GET fails to find the indexed value - headers.put(ElasticsearchConfiguration.PARAM_OPERATION, ElasticsearchConfiguration.OPERATION_GET_BY_ID); + headers.put(ElasticsearchConstants.PARAM_OPERATION, ElasticsearchConstants.OPERATION_GET_BY_ID); response = template.requestBodyAndHeaders("direct:start", indexId, headers, GetResponse.class); assertNotNull("response should not be null", response); assertNull("response source should be null", response.getSource()); http://git-wip-us.apache.org/repos/asf/camel/blob/461e15f9/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchConfigurationTest.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchConfigurationTest.java b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchConfigurationTest.java deleted file mode 100644 index 6690a5c..0000000 --- a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchConfigurationTest.java +++ /dev/null @@ -1,204 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.elasticsearch; - -import java.net.URI; -import java.util.Map; - -import org.apache.camel.test.junit4.CamelTestSupport; -import org.apache.camel.util.URISupport; -import org.elasticsearch.action.WriteConsistencyLevel; -import org.elasticsearch.action.support.replication.ReplicationType; - -import org.junit.Test; - -public class ElasticsearchConfigurationTest extends CamelTestSupport { - - @Test - public void localNode() throws Exception { - URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet"); - Map<String, Object> parameters = URISupport.parseParameters(uri); - ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters); - assertTrue(conf.isLocal()); - assertEquals("twitter", conf.getIndexName()); - assertEquals("tweet", conf.getIndexType()); - assertEquals("INDEX", conf.getOperation()); - assertTrue(conf.isData()); - assertNull(conf.getClusterName()); - } - - @Test(expected = IllegalArgumentException.class) - public void localNonDataNodeThrowsIllegalArgumentException() throws Exception { - URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet&data=false"); - Map<String, Object> parameters = URISupport.parseParameters(uri); - new ElasticsearchConfiguration(uri, parameters); - } - - @Test - public void localConfDefaultsToDataNode() throws Exception { - URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet"); - Map<String, Object> parameters = URISupport.parseParameters(uri); - ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters); - assertEquals("INDEX", conf.getOperation()); - assertTrue(conf.isLocal()); - assertTrue(conf.isData()); - } - - @Test - public void clusterConfDefaultsToNonDataNode() throws Exception { - URI uri = new URI("elasticsearch://clustername?operation=INDEX&indexName=twitter&indexType=tweet"); - Map<String, Object> parameters = URISupport.parseParameters(uri); - ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters); - assertEquals("clustername", conf.getClusterName()); - assertEquals("INDEX", conf.getOperation()); - assertFalse(conf.isLocal()); - assertFalse(conf.isData()); - } - - @Test - public void clusterConfWithIpAddress() throws Exception { - URI uri = new URI("elasticsearch://clustername?operation=INDEX&indexName=twitter&indexType=tweet&ip=127.0.0.1"); - Map<String, Object> parameters = URISupport.parseParameters(uri); - ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters); - assertEquals("clustername", conf.getClusterName()); - assertEquals("INDEX", conf.getOperation()); - assertFalse(conf.isLocal()); - assertFalse(conf.isData()); - assertEquals("127.0.0.1", conf.getIp()); - assertEquals(9300, conf.getPort()); - } - - @Test - public void localDataNode() throws Exception { - URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet&data=true"); - Map<String, Object> parameters = URISupport.parseParameters(uri); - ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters); - assertTrue(conf.isLocal()); - assertEquals("INDEX", conf.getOperation()); - assertEquals("twitter", conf.getIndexName()); - assertEquals("tweet", conf.getIndexType()); - assertTrue(conf.isData()); - assertNull(conf.getClusterName()); - } - - @Test - public void writeConsistencyLevelDefaultConfTest() throws Exception { - URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet"); - Map<String, Object> parameters = URISupport.parseParameters(uri); - ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters); - assertTrue(conf.isLocal()); - assertEquals("INDEX", conf.getOperation()); - assertEquals("twitter", conf.getIndexName()); - assertEquals("tweet", conf.getIndexType()); - assertEquals(WriteConsistencyLevel.DEFAULT, conf.getConsistencyLevel()); - assertNull(conf.getClusterName()); - } - - @Test - public void writeConsistencyLevelConfTest() throws Exception { - URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet&consistencyLevel=QUORUM"); - Map<String, Object> parameters = URISupport.parseParameters(uri); - ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters); - assertTrue(conf.isLocal()); - assertEquals("INDEX", conf.getOperation()); - assertEquals("twitter", conf.getIndexName()); - assertEquals("tweet", conf.getIndexType()); - assertEquals(WriteConsistencyLevel.QUORUM, conf.getConsistencyLevel()); - assertNull(conf.getClusterName()); - } - - @Test - public void replicationTypeConfTest() throws Exception { - URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet&replicationType=ASYNC"); - Map<String, Object> parameters = URISupport.parseParameters(uri); - ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters); - assertDefaultConfigurationParameters(conf); - assertEquals(ReplicationType.ASYNC, conf.getReplicationType()); - } - - @Test - public void replicationTypeDefaultConfTest() throws Exception { - URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet"); - Map<String, Object> parameters = URISupport.parseParameters(uri); - ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters); - assertDefaultConfigurationParameters(conf); - assertEquals(ReplicationType.DEFAULT, conf.getReplicationType()); - } - - @Test - public void transportAddressesSimpleHostnameTest() throws Exception { - URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&" - + "indexType=tweet&transportAddresses=127.0.0.1"); - Map<String, Object> parameters = URISupport.parseParameters(uri); - ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters); - assertDefaultConfigurationParameters(conf); - assertEquals(1, conf.getTransportAddressesList().size()); - assertEquals("127.0.0.1", conf.getTransportAddressesList().get(0).address().getHostString()); - assertEquals(9300, conf.getTransportAddressesList().get(0).address().getPort()); - } - - @Test - public void transportAddressesMultipleHostnameTest() throws Exception { - URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&" - + "indexType=tweet&transportAddresses=127.0.0.1,127.0.0.2"); - Map<String, Object> parameters = URISupport.parseParameters(uri); - ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters); - assertDefaultConfigurationParameters(conf); - assertEquals(2, conf.getTransportAddressesList().size()); - assertEquals("127.0.0.1", conf.getTransportAddressesList().get(0).address().getHostString()); - assertEquals(9300, conf.getTransportAddressesList().get(0).address().getPort()); - assertEquals("127.0.0.2", conf.getTransportAddressesList().get(1).address().getHostString()); - assertEquals(9300, conf.getTransportAddressesList().get(1).address().getPort()); - } - - @Test - public void transportAddressesSimpleHostnameAndPortTest() throws Exception { - URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&" - + "indexType=tweet&transportAddresses=127.0.0.1:9305"); - Map<String, Object> parameters = URISupport.parseParameters(uri); - ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters); - assertDefaultConfigurationParameters(conf); - assertEquals(1, conf.getTransportAddressesList().size()); - assertEquals("127.0.0.1", conf.getTransportAddressesList().get(0).address().getHostString()); - assertEquals(9305, conf.getTransportAddressesList().get(0).address().getPort()); - } - - @Test - public void transportAddressesMultipleHostnameAndPortTest() throws Exception { - URI uri = new URI("elasticsearch://local?operation=INDEX&indexName=twitter&" - + "indexType=tweet&transportAddresses=127.0.0.1:9400,127.0.0.2,127.0.0.3:9401"); - Map<String, Object> parameters = URISupport.parseParameters(uri); - ElasticsearchConfiguration conf = new ElasticsearchConfiguration(uri, parameters); - assertDefaultConfigurationParameters(conf); - assertEquals(3, conf.getTransportAddressesList().size()); - assertEquals("127.0.0.1", conf.getTransportAddressesList().get(0).address().getHostString()); - assertEquals(9400, conf.getTransportAddressesList().get(0).address().getPort()); - assertEquals("127.0.0.2", conf.getTransportAddressesList().get(1).address().getHostString()); - assertEquals(9300, conf.getTransportAddressesList().get(1).address().getPort()); - assertEquals("127.0.0.3", conf.getTransportAddressesList().get(2).address().getHostString()); - assertEquals(9401, conf.getTransportAddressesList().get(2).address().getPort()); - } - - private void assertDefaultConfigurationParameters(ElasticsearchConfiguration conf) { - assertTrue(conf.isLocal()); - assertEquals("INDEX", conf.getOperation()); - assertEquals("twitter", conf.getIndexName()); - assertEquals("tweet", conf.getIndexType()); - assertNull(conf.getClusterName()); - } - -} http://git-wip-us.apache.org/repos/asf/camel/blob/461e15f9/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/SpringElasticsearchTest.java ---------------------------------------------------------------------- diff --git a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/SpringElasticsearchTest.java b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/SpringElasticsearchTest.java index 17ee993..506175e 100644 --- a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/SpringElasticsearchTest.java +++ b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/SpringElasticsearchTest.java @@ -69,9 +69,9 @@ public class SpringElasticsearchTest extends CamelSpringTestSupport { body.put("content", "test"); Map<String, Object> headers = new HashMap<String, Object>(); - headers.put(ElasticsearchConfiguration.PARAM_OPERATION, ElasticsearchConfiguration.OPERATION_INDEX); - headers.put(ElasticsearchConfiguration.PARAM_INDEX_NAME, "twitter"); - headers.put(ElasticsearchConfiguration.PARAM_INDEX_TYPE, "tweet"); + headers.put(ElasticsearchConstants.PARAM_OPERATION, ElasticsearchConstants.OPERATION_INDEX); + headers.put(ElasticsearchConstants.PARAM_INDEX_NAME, "twitter"); + headers.put(ElasticsearchConstants.PARAM_INDEX_TYPE, "tweet"); producer.sendBodyAndHeaders(body, headers);