CAMEL-9648: Create new IronMQ component
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c3cfb652 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c3cfb652 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c3cfb652 Branch: refs/heads/master Commit: c3cfb6521706118472cdd6d07763aeb1d9490bad Parents: 3f6aa3e Author: Preben Asmussen <[email protected]> Authored: Sat Feb 27 14:39:33 2016 +0100 Committer: Claus Ibsen <[email protected]> Committed: Sat Feb 27 17:10:42 2016 +0100 ---------------------------------------------------------------------- components/camel-ironmq/.gitignore | 4 + components/camel-ironmq/pom.xml | 67 +++++++ .../camel-ironmq/src/main/docs/ironmq.adoc | 149 ++++++++++++++ .../apache/camel/component/ironmq/GsonUtil.java | 69 +++++++ .../camel/component/ironmq/IronMQComponent.java | 58 ++++++ .../component/ironmq/IronMQConfiguration.java | 199 +++++++++++++++++++ .../camel/component/ironmq/IronMQConstants.java | 27 +++ .../camel/component/ironmq/IronMQConsumer.java | 164 +++++++++++++++ .../camel/component/ironmq/IronMQEndpoint.java | 137 +++++++++++++ .../camel/component/ironmq/IronMQProducer.java | 80 ++++++++ .../services/org/apache/camel/component/ironmq | 18 ++ components/camel-ironmq/src/test/data/test.txt | 1 + .../component/ironmq/FromQueueToQueueTest.java | 98 +++++++++ .../ironmq/IronMQBatchConsumerTest.java | 88 ++++++++ .../ironmq/IronMQBatchDeleteConsumerTest.java | 89 +++++++++ .../ironmq/IronMQBatchProducerTest.java | 79 ++++++++ .../component/ironmq/IronMQClientMock.java | 44 ++++ .../IronMQComponentConfigurationTest.java | 96 +++++++++ .../ironmq/IronMQComponentSpringTest.java | 79 ++++++++ .../component/ironmq/IronMQComponentTest.java | 117 +++++++++++ .../ironmq/IronMQPreserveHeadersTest.java | 67 +++++++ .../camel/component/ironmq/MockQueue.java | 141 +++++++++++++ .../ConcurrentConsumerLoadTest.java | 100 ++++++++++ .../ironmq/integrationtest/FileCopyExample.java | 59 ++++++ .../integrationtest/IronMQComponentTest.java | 61 ++++++ .../ironmq/integrationtest/IronMQFIFOTest.java | 74 +++++++ .../IronMQRackspaceComponentTest.java | 61 ++++++ .../ironmq/integrationtest/LoadTest.java | 89 +++++++++ .../integrationtest/Queue2QueueExample.java | 66 ++++++ .../src/test/resources/log4j.properties | 41 ++++ .../IronMQComponentSpringTest-context.xml | 39 ++++ components/pom.xml | 1 + parent/pom.xml | 1 + 33 files changed, 2463 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/c3cfb652/components/camel-ironmq/.gitignore ---------------------------------------------------------------------- diff --git a/components/camel-ironmq/.gitignore b/components/camel-ironmq/.gitignore new file mode 100644 index 0000000..c708c36 --- /dev/null +++ b/components/camel-ironmq/.gitignore @@ -0,0 +1,4 @@ +/target +/.settings +/.classpath +/.project http://git-wip-us.apache.org/repos/asf/camel/blob/c3cfb652/components/camel-ironmq/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-ironmq/pom.xml b/components/camel-ironmq/pom.xml new file mode 100644 index 0000000..e525394 --- /dev/null +++ b/components/camel-ironmq/pom.xml @@ -0,0 +1,67 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.camel</groupId> + <artifactId>components</artifactId> + <version>2.17-SNAPSHOT</version> + </parent> + + <artifactId>camel-ironmq</artifactId> + <packaging>bundle</packaging> + + <name>Camel :: IronMQ</name> + <description>Camel IronMQ component</description> + + <properties> + <camel.osgi.export.pkg>org.apache.camel.component.ironmq.*</camel.osgi.export.pkg> + <camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=ironmq</camel.osgi.export.service> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-core</artifactId> + </dependency> + <dependency> + <groupId>io.iron.ironmq</groupId> + <artifactId>ironmq</artifactId> + <version>${ironmq-version}</version> + </dependency> + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + <version>${gson-version}</version> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <version>${commons-lang3-version}</version> + </dependency> + <!-- logging --> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <scope>test</scope> + </dependency> + <!-- testing --> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test-spring</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/camel/blob/c3cfb652/components/camel-ironmq/src/main/docs/ironmq.adoc ---------------------------------------------------------------------- diff --git a/components/camel-ironmq/src/main/docs/ironmq.adoc b/components/camel-ironmq/src/main/docs/ironmq.adoc new file mode 100644 index 0000000..b32489c --- /dev/null +++ b/components/camel-ironmq/src/main/docs/ironmq.adoc @@ -0,0 +1,149 @@ +[[IronMQ-IronMQComponent]] +IronMQ Component +~~~~~~~~~~~~~~~~ + +*Available as of Camel 2.17* + +The IronMQ component provides integration with http://www.iron.io/products/mq[IronMQ] an elastic and durable hosted message queue as a service. + +The component uses the +https://github.com/iron-io/iron_mq_java[IronMQ java client] +library. + +To run it requires a IronMQ account, and a project id and token. + +Maven users will need to add the following dependency to their `pom.xml` +for this component: + +[source,java] +------------------------------------------------------------ +<dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-ironmq</artifactId> + <version>x.x.x</version> + <!-- use the same version as your Camel core version --> +</dependency> +------------------------------------------------------------ + +[[IronMQ-URIformat]] +URI format +^^^^^^^^^^ + +[source,java] +----------------------------- + ironmq:queueName[?options] +----------------------------- +Where **queueName** identifies the IronMQ queue you want to publish or consume messages from. + +[[IronMQ-Options]] +Options +^^^^^^^ + + +// endpoint options: START +The ironmq component supports 31 endpoint options which are listed below: + +[width="100%",cols="2s,1,1m,1m,5",options="header"] +|======================================================================= +| Name | Group | Default | Java Type | Description +| queueName | common | | String | *Required* The name of the IronMQ queue +| client | common | | Client | Reference to a io.iron.ironmq.Client in the Registry. +| ironMQCloud | common | https://mq-aws-us-east-1.iron.io | String | IronMq Cloud url. See http://dev.iron.io/mq/reference/clouds/ for valid options +| preserveHeaders | common | false | boolean | Should camel message headers be preserved when publishing messages +| projectId | common | | String | IronMq projectId +| token | common | | String | IronMq token +| batchDelete | consumer | false | boolean | Shold messages be deleted in one batch or one at the time +| bridgeErrorHandler | consumer | false | boolean | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN/ERROR level and ignored. +| concurrentConsumers | consumer | 1 | Integer | The number of concurrent consumers. +| maxMessagesPerPoll | consumer | 1 | int | Number of messages to poll pr. call +| sendEmptyMessageWhenIdle | consumer | false | boolean | If the polling consumer did not poll any files you can enable this option to send an empty message (no body) instead. +| timeout | consumer | 60 | int | sets the timeout +| wait | consumer | | int | Sets the wait +| exceptionHandler | consumer (advanced) | | ExceptionHandler | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN/ERROR level and ignored. +| pollStrategy | consumer (advanced) | | PollingConsumerPollStrategy | A pluggable org.apache.camel.PollingConsumerPollingStrategy allowing you to provide your custom implementation to control error handling usually occurred during the poll operation before an Exchange have been created and being routed in Camel. +| visibilityDelay | producer | | int | Set's the visibility delay in seconds. +| exchangePattern | advanced | InOnly | ExchangePattern | Sets the default exchange pattern when creating an exchange +| synchronous | advanced | false | boolean | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported). +| backoffErrorThreshold | scheduler | | int | The number of subsequent error polls (failed due some error) that should happen before the backoffMultipler should kick-in. +| backoffIdleThreshold | scheduler | | int | The number of subsequent idle polls that should happen before the backoffMultipler should kick-in. +| backoffMultiplier | scheduler | | int | To let the scheduled polling consumer backoff if there has been a number of subsequent idles/errors in a row. The multiplier is then the number of polls that will be skipped before the next actual attempt is happening again. When this option is in use then backoffIdleThreshold and/or backoffErrorThreshold must also be configured. +| delay | scheduler | 500 | long | Milliseconds before the next poll. +| greedy | scheduler | false | boolean | If greedy is enabled then the ScheduledPollConsumer will run immediately again if the previous run polled 1 or more messages. +| initialDelay | scheduler | 1000 | long | Milliseconds before the first poll starts. +| runLoggingLevel | scheduler | TRACE | LoggingLevel | The consumer logs a start/complete log line when it polls. This option allows you to configure the logging level for that. +| scheduledExecutorService | scheduler | | ScheduledExecutorService | Allows for configuring a custom/shared thread pool to use for the consumer. By default each consumer has its own single threaded thread pool. +| scheduler | scheduler | none | ScheduledPollConsumerScheduler | To use a cron scheduler from either camel-spring or camel-quartz2 component +| schedulerProperties | scheduler | | Map | To configure additional properties when using a custom scheduler or any of the Quartz2 Spring based scheduler. +| startScheduler | scheduler | true | boolean | Whether the scheduler should be auto started. +| timeUnit | scheduler | MILLISECONDS | TimeUnit | Time unit for initialDelay and delay options. +| useFixedDelay | scheduler | true | boolean | Controls if fixed delay or fixed rate is used. See ScheduledExecutorService in JDK for details. +|======================================================================= +// endpoint options: END + +[[IronMQ-IronMQComponentOptions]] +IronMQComponent Options +^^^^^^^^^^^^^^^^^^^^^^^ + + + +// component options: START +The ironmq component has no options. + + +[width="100%",cols="2s,1m,8",options="header"] +|======================================================================= +| Name | Java Type | Description +|======================================================================= +// component options: END + + + + +[[IronMQ-Messagebody]] +Message Body +^^^^^^^^^^^^ +Should be either a String or a array of Strings. In the latter case the batch of strings will be send to IronMQ as one request, creating one message pr. element in the array. + +[[IronMQ-MessageHeaders]] +Producer message headers +^^^^^^^^^^^^^^^^^^^^^^^^ + +[width="100%",cols="10%,10%,80%",options="header",] +|======================================================================= +|Header |Type | Description +|CamelIronMQOperation |String|If value set to 'CamelIronMQClearQueue' the queue is cleared of unconsumed messages. +|CamelIronMQMessageId |String or io.iron.ironmq.Ids|The id of the IronMQ message as a String when sending a single message, or a Ids object when sending a array of strings. +|======================================================================= + +Consumer message headers +^^^^^^^^^^^^^^^^^^^^^^^^ + +[width="100%",cols="10%,10%,80%",options="header",] +|======================================================================= +|Header |Type | Description +|CamelIronMQMessageId |String|The id of the message. +|CamelIronMQReservationId|String|The reservation id of the message. +|CamelIronMQReservedCount|String|The number of times this message has been reserved. +|======================================================================= + + +Consumer example +^^^^^^^^^^^^^^^^ + +Consume 50 messages pr. poll from the queue 'testqueue' on aws eu, and save the messages to files. + +[source,java] +-------------------------------------------------- +from("ironmq:testqueue?ironMQCloud=mq-aws-eu-west-1.iron.io&projectId=myIronMQProjectid&token=myIronMQToken&maxMessagesPerPoll=50") + .to("file:somefolder); +-------------------------------------------------- + +Producer example +^^^^^^^^^^^^^^^^ +Dequeue from activemq jms and enqueue the messages on IronMQ. + +[source,java] +-------------------------------------------------- +from("activemq:foo) + .to("ironmq:testqueue?projectId=myIronMQProjectid&token=myIronMQToken"); +-------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/c3cfb652/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/GsonUtil.java ---------------------------------------------------------------------- diff --git a/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/GsonUtil.java b/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/GsonUtil.java new file mode 100644 index 0000000..47dab61 --- /dev/null +++ b/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/GsonUtil.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ironmq; + +import java.util.HashMap; +import java.util.Map; + +import com.google.gson.Gson; + +import org.apache.camel.Message; + +public final class GsonUtil { + private static final Gson GSON = new Gson(); + + private GsonUtil() { + } + + static class IronMqMessage { + private Map<String, Object> headers = new HashMap<String, Object>(); + private String body; + + public IronMqMessage(String body, Map<String, Object> headers) { + super(); + this.headers = headers; + this.body = body; + } + + public Map<String, Object> getHeaders() { + return headers; + } + + public String getBody() { + return body; + } + } + + static String getBodyFromMessage(Message message) { + IronMqMessage ironMessage = new IronMqMessage(message.getBody(String.class), message.getHeaders()); + return GSON.toJson(ironMessage); + } + + static void copyFrom(io.iron.ironmq.Message source, Message target) { + IronMqMessage ironMqMessage = GSON.fromJson(source.getBody(), IronMqMessage.class); + target.setBody(ironMqMessage.getBody()); + if (ironMqMessage.getHeaders() != null) { + for (Map.Entry<String, Object> entry : ironMqMessage.getHeaders().entrySet()) { + String key = entry.getKey(); + Object value = entry.getValue(); + if (target.getHeader(key) == null) { + target.setHeader(key, value); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/c3cfb652/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQComponent.java b/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQComponent.java new file mode 100644 index 0000000..4f74dbe --- /dev/null +++ b/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQComponent.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ironmq; + +import java.util.Map; + +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; +import org.apache.camel.impl.ScheduledPollEndpoint; +import org.apache.camel.impl.UriEndpointComponent; + +/** + * Represents the component that manages {@link IronMQEndpoint}. + */ +public class IronMQComponent extends UriEndpointComponent { + + public IronMQComponent(CamelContext context) { + super(context, IronMQEndpoint.class); + } + + public IronMQComponent() { + super(IronMQEndpoint.class); + } + + @Override + protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { + IronMQConfiguration ironMQConfiguration = new IronMQConfiguration(); + setProperties(ironMQConfiguration, parameters); + if (remaining == null || remaining.trim().length() == 0) { + throw new IllegalArgumentException("Queue name must be specified."); + } + + ironMQConfiguration.setQueueName(remaining); + + if (ironMQConfiguration.getClient() == null && (ironMQConfiguration.getProjectId() == null || ironMQConfiguration.getToken() == null)) { + throw new IllegalArgumentException("Client or project and token must be specified."); + } + + Endpoint endpoint = new IronMQEndpoint(uri, this, ironMQConfiguration); + ((ScheduledPollEndpoint)endpoint).setConsumerProperties(parameters); + + return endpoint; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/c3cfb652/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConfiguration.java b/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConfiguration.java new file mode 100644 index 0000000..7bd160b --- /dev/null +++ b/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConfiguration.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ironmq; + +import io.iron.ironmq.Client; + +import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.UriParam; +import org.apache.camel.spi.UriParams; +import org.apache.camel.spi.UriPath; + +@UriParams +public class IronMQConfiguration { + // common properties + + @UriParam + private String projectId; + + @UriParam + private String token; + + @UriPath @Metadata(required = "true") + private String queueName; + + @UriParam(defaultValue = "https://mq-aws-us-east-1.iron.io") + private String ironMQCloud = "https://mq-aws-us-east-1.iron.io"; + + @UriParam + private boolean preserveHeaders; + + @UriParam + private Client client; + + // producer properties + @UriParam(label = "producer") + private int visibilityDelay; + + // consumer properties + @UriParam(defaultValue = "1", label = "consumer") + private Integer concurrentConsumers = 1; + + @UriParam(label = "consumer") + private boolean batchDelete; + + @UriParam(defaultValue = "1", label = "consumer") + private int maxMessagesPerPoll = 1; + + @UriParam(defaultValue = "60", label = "consumer") + private int timeout = 60; + + @UriParam(label = "consumer") + private int wait; + + public Client getClient() { + return client; + } + + /** + * Reference to a io.iron.ironmq.Client in the Registry. + */ + public void setClient(Client client) { + this.client = client; + } + + public Integer getConcurrentConsumers() { + return concurrentConsumers; + } + + /** + * The number of concurrent consumers. + */ + public void setConcurrentConsumers(Integer concurrentConsumers) { + this.concurrentConsumers = concurrentConsumers; + } + + public String getProjectId() { + return projectId; + } + + /** + * IronMq projectId + */ + public void setProjectId(String projectId) { + this.projectId = projectId; + } + + public String getToken() { + return token; + } + + /** + * IronMq token + */ + public void setToken(String token) { + this.token = token; + } + + /** + * The name of the IronMQ queue + */ + public void setQueueName(String queueName) { + this.queueName = queueName; + } + + public String getQueueName() { + return queueName; + } + + /** + * IronMq Cloud url. See http://dev.iron.io/mq/reference/clouds/ for valid options + */ + public void setIronMQCloud(String ironMQCloud) { + this.ironMQCloud = ironMQCloud; + } + + public String getIronMQCloud() { + return ironMQCloud; + } + + public int getTimeout() { + return timeout; + } + + /** + * sets the timeout + */ + public void setTimeout(int timeout) { + this.timeout = timeout; + } + + public int getMaxMessagesPerPoll() { + return maxMessagesPerPoll; + } + + /** + * Number of messages to poll pr. call + */ + public void setMaxMessagesPerPoll(int maxMessagesPerPoll) { + this.maxMessagesPerPoll = maxMessagesPerPoll; + } + + public int getVisibilityDelay() { + return visibilityDelay; + } + + /** + * Set's the visibility delay in seconds. + */ + public void setVisibilityDelay(int visibilityDelay) { + this.visibilityDelay = visibilityDelay; + } + + public boolean isPreserveHeaders() { + return preserveHeaders; + } + + /** + * Should camel message headers be preserved when publishing messages + */ + public void setPreserveHeaders(boolean preserveHeaders) { + this.preserveHeaders = preserveHeaders; + } + + public boolean isBatchDelete() { + return batchDelete; + } + + /** + * Shold messages be deleted in one batch or one at the time + */ + public void setBatchDelete(boolean batchDelete) { + this.batchDelete = batchDelete; + } + + public int getWait() { + return wait; + } + + /** + * Sets the wait + */ + public void setWait(int wait) { + this.wait = wait; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/c3cfb652/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConstants.java b/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConstants.java new file mode 100644 index 0000000..e600f74 --- /dev/null +++ b/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConstants.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ironmq; + +public interface IronMQConstants { + + String MESSAGE_ID = "CamelIronMQMessageId"; + String MESSAGE_RESERVATION_ID = "CamelIronMQReservationId"; + String MESSAGE_RESERVED_COUNT = "CamelIronMQReservedCount"; + String OPERATION = "CamelIronMQOperation"; + String CLEARQUEUE = "CamelIronMQClearQueue"; + +} http://git-wip-us.apache.org/repos/asf/camel/blob/c3cfb652/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConsumer.java b/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConsumer.java new file mode 100644 index 0000000..356545b --- /dev/null +++ b/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConsumer.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ironmq; + +import java.util.LinkedList; +import java.util.Queue; + +import io.iron.ironmq.EmptyQueueException; +import io.iron.ironmq.Message; +import io.iron.ironmq.Messages; + +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.impl.ScheduledBatchPollingConsumer; +import org.apache.camel.spi.Synchronization; +import org.apache.camel.util.CastUtils; +import org.apache.camel.util.ExchangeHelper; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The IronMQ consumer. + */ +public class IronMQConsumer extends ScheduledBatchPollingConsumer { + private static final Logger LOG = LoggerFactory.getLogger(IronMQConsumer.class); + + private final io.iron.ironmq.Queue ironQueue; + + public IronMQConsumer(Endpoint endpoint, Processor processor, io.iron.ironmq.Queue ironQueue) { + super(endpoint, processor); + this.ironQueue = ironQueue; + } + + @Override + protected int poll() throws Exception { + // must reset for each poll + shutdownRunningTask = null; + pendingExchanges = 0; + try { + Messages messages = null; + LOG.trace("Receiving messages with request [messagePerPoll {}, timeout {}]...", getMaxMessagesPerPoll(), getEndpoint().getConfiguration().getTimeout()); + messages = this.ironQueue.reserve(getMaxMessagesPerPoll(), getEndpoint().getConfiguration().getTimeout(), getEndpoint().getConfiguration().getWait()); + LOG.trace("Received {} messages", messages.getSize()); + + Queue<Exchange> exchanges = createExchanges(messages.getMessages()); + int noProcessed = processBatch(CastUtils.cast(exchanges)); + // delete all processed messages in one batch; + if (getEndpoint().getConfiguration().isBatchDelete()) { + LOG.trace("Batch deleting {} messages", messages.getSize()); + this.ironQueue.deleteMessages(messages); + } + return noProcessed; + } catch (EmptyQueueException e) { + return 0; + } + } + + protected Queue<Exchange> createExchanges(Message[] messages) { + LOG.trace("Received {} messages in this poll", messages.length); + + Queue<Exchange> answer = new LinkedList<Exchange>(); + for (int i = 0; i < messages.length; i++) { + Exchange exchange = getEndpoint().createExchange(messages[i]); + answer.add(exchange); + } + return answer; + } + + @Override + public int processBatch(Queue<Object> exchanges) throws Exception { + int total = exchanges.size(); + + for (int index = 0; index < total && isBatchAllowed(); index++) { + // only loop if we are started (allowed to run) + final Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll()); + // add current index and total as properties + exchange.setProperty(Exchange.BATCH_INDEX, index); + exchange.setProperty(Exchange.BATCH_SIZE, total); + exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1); + + // update pending number of exchanges + pendingExchanges = total - index - 1; + + // add on completion to handle after work when the exchange is done + // if batchDelete is not enabled + if (!getEndpoint().getConfiguration().isBatchDelete()) { + exchange.addOnCompletion(new Synchronization() { + final String reservationId = ExchangeHelper.getMandatoryHeader(exchange, IronMQConstants.MESSAGE_RESERVATION_ID, String.class); + final String messageid = ExchangeHelper.getMandatoryHeader(exchange, IronMQConstants.MESSAGE_ID, String.class); + + public void onComplete(Exchange exchange) { + processCommit(exchange, messageid, reservationId); + } + + public void onFailure(Exchange exchange) { + processRollback(exchange); + } + + @Override + public String toString() { + return "IronMQConsumerOnCompletion"; + } + }); + } + + LOG.trace("Processing exchange [{}]...", exchange); + + getProcessor().process(exchange); + } + + return total; + } + + /** + * Strategy to delete the message after being processed. + * + * @param exchange the exchange + */ + protected void processCommit(Exchange exchange, String messageid, String reservationId) { + try { + LOG.trace("Deleting message with messageId {} and reservationId {}...", messageid, reservationId); + this.ironQueue.deleteMessage(messageid, reservationId); + LOG.trace("Message deleted"); + } catch (Exception e) { + getExceptionHandler().handleException("Error occurred during delete of message. This exception is ignored.", exchange, e); + } + } + + /** + * Strategy when processing the exchange failed. + * + * @param exchange the exchange + */ + protected void processRollback(Exchange exchange) { + Exception cause = exchange.getException(); + if (cause != null) { + LOG.warn("Exchange failed, so rolling back message status: " + exchange, cause); + } else { + LOG.warn("Exchange failed, so rolling back message status: {}", exchange); + } + } + + @Override + public IronMQEndpoint getEndpoint() { + return (IronMQEndpoint)super.getEndpoint(); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/c3cfb652/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQEndpoint.java b/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQEndpoint.java new file mode 100644 index 0000000..5742a98 --- /dev/null +++ b/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQEndpoint.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ironmq; + +import java.net.MalformedURLException; + +import io.iron.ironmq.Client; +import io.iron.ironmq.Cloud; + +import org.apache.camel.Consumer; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.impl.DefaultExchange; +import org.apache.camel.impl.DefaultScheduledPollConsumerScheduler; +import org.apache.camel.impl.ScheduledPollEndpoint; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Represents a IronMQ endpoint. + */ +@UriEndpoint(scheme = "ironmq", syntax = "ironmq:queue", title = "ironmq", consumerClass = IronMQConsumer.class, label = "cloud,messaging") +public class IronMQEndpoint extends ScheduledPollEndpoint { + private static final Logger LOG = LoggerFactory.getLogger(IronMQEndpoint.class); + + @UriParam + private IronMQConfiguration configuration; + + private Client client; + + public IronMQEndpoint(String uri, IronMQComponent component, IronMQConfiguration ironMQConfiguration) { + super(uri, component); + this.configuration = ironMQConfiguration; + } + + public Producer createProducer() throws Exception { + return new IronMQProducer(this, getClient().queue(configuration.getQueueName())); + } + + public Consumer createConsumer(Processor processor) throws Exception { + IronMQConsumer ironMQConsumer = new IronMQConsumer(this, processor, getClient().queue(configuration.getQueueName())); + configureConsumer(ironMQConsumer); + ironMQConsumer.setMaxMessagesPerPoll(configuration.getMaxMessagesPerPoll()); + DefaultScheduledPollConsumerScheduler scheduler = new DefaultScheduledPollConsumerScheduler(); + scheduler.setConcurrentTasks(configuration.getConcurrentConsumers()); + ironMQConsumer.setScheduler(scheduler); + + return ironMQConsumer; + } + + public Exchange createExchange(io.iron.ironmq.Message msg) { + return createExchange(getExchangePattern(), msg); + } + + private Exchange createExchange(ExchangePattern pattern, io.iron.ironmq.Message msg) { + Exchange exchange = new DefaultExchange(this, pattern); + Message message = exchange.getIn(); + if (configuration.isPreserveHeaders()) { + GsonUtil.copyFrom(msg, message); + } else { + message.setBody(msg.getBody()); + } + message.setHeader(IronMQConstants.MESSAGE_ID, msg.getId()); + message.setHeader(IronMQConstants.MESSAGE_RESERVATION_ID, msg.getReservationId()); + message.setHeader(IronMQConstants.MESSAGE_RESERVED_COUNT, msg.getReservedCount()); + return exchange; + } + + public boolean isSingleton() { + return true; + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + client = getConfiguration().getClient() != null ? getConfiguration().getClient() : getClient(); + } + + @Override + protected void doStop() throws Exception { + client = null; + super.doStop(); + } + + public Client getClient() { + if (client == null) { + client = createClient(); + } + return client; + } + + public void setClient(Client client) { + this.client = client; + } + + /** + * Provide the possibility to override this method for an mock + * implementation + * + * @return Client + */ + Client createClient() { + Cloud cloud; + try { + cloud = new Cloud(configuration.getIronMQCloud()); + } catch (MalformedURLException e) { + cloud = Cloud.ironAWSUSEast; + LOG.warn("Unable to parse ironMQCloud {} will use {}", configuration.getIronMQCloud(), cloud.getHost()); + } + client = new Client(configuration.getProjectId(), configuration.getToken(), cloud); + return client; + } + + public IronMQConfiguration getConfiguration() { + return configuration; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/c3cfb652/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQProducer.java b/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQProducer.java new file mode 100644 index 0000000..b483fb2 --- /dev/null +++ b/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQProducer.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ironmq; + +import io.iron.ironmq.Queue; + +import org.apache.camel.Exchange; +import org.apache.camel.InvalidPayloadException; +import org.apache.camel.Message; +import org.apache.camel.impl.DefaultProducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The IronMQ producer. + */ +public class IronMQProducer extends DefaultProducer { + private static final Logger LOG = LoggerFactory.getLogger(IronMQProducer.class); + + private final Queue ironQueue; + + public IronMQProducer(IronMQEndpoint endpoint, Queue ironQueue) { + super(endpoint); + this.ironQueue = ironQueue; + } + + public void process(Exchange exchange) throws Exception { + IronMQConfiguration configuration = getEndpoint().getConfiguration(); + if (IronMQConstants.CLEARQUEUE.equals(exchange.getIn().getHeader(IronMQConstants.OPERATION, String.class))) { + this.ironQueue.clear(); + } else { + Object messageId = null; + Object body = exchange.getIn().getBody(); + if (body instanceof String[]) { + messageId = this.ironQueue.pushMessages((String[])body, configuration.getVisibilityDelay()); + } else if (body instanceof String) { + if (configuration.isPreserveHeaders()) { + body = GsonUtil.getBodyFromMessage(exchange.getIn()); + } + messageId = this.ironQueue.push((String)body, configuration.getVisibilityDelay()); + } else { + throw new InvalidPayloadException(exchange, String.class); + } + LOG.trace("Send request [{}] from exchange [{}]...", body, exchange); + LOG.trace("Received messageId [{}]", messageId); + Message message = getMessageForResponse(exchange); + message.setHeader(IronMQConstants.MESSAGE_ID, messageId); + } + } + + private Message getMessageForResponse(Exchange exchange) { + if (exchange.getPattern().isOutCapable()) { + Message out = exchange.getOut(); + out.copyFrom(exchange.getIn()); + return out; + } + + return exchange.getIn(); + } + + @Override + public IronMQEndpoint getEndpoint() { + return (IronMQEndpoint)super.getEndpoint(); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/c3cfb652/components/camel-ironmq/src/main/resources/META-INF/services/org/apache/camel/component/ironmq ---------------------------------------------------------------------- diff --git a/components/camel-ironmq/src/main/resources/META-INF/services/org/apache/camel/component/ironmq b/components/camel-ironmq/src/main/resources/META-INF/services/org/apache/camel/component/ironmq new file mode 100644 index 0000000..8f703b2 --- /dev/null +++ b/components/camel-ironmq/src/main/resources/META-INF/services/org/apache/camel/component/ironmq @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +class=org.apache.camel.component.ironmq.IronMQComponent http://git-wip-us.apache.org/repos/asf/camel/blob/c3cfb652/components/camel-ironmq/src/test/data/test.txt ---------------------------------------------------------------------- diff --git a/components/camel-ironmq/src/test/data/test.txt b/components/camel-ironmq/src/test/data/test.txt new file mode 100644 index 0000000..bac6b57 --- /dev/null +++ b/components/camel-ironmq/src/test/data/test.txt @@ -0,0 +1 @@ +Some random text \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/c3cfb652/components/camel-ironmq/src/test/java/org/apache/camel/component/ironmq/FromQueueToQueueTest.java ---------------------------------------------------------------------- diff --git a/components/camel-ironmq/src/test/java/org/apache/camel/component/ironmq/FromQueueToQueueTest.java b/components/camel-ironmq/src/test/java/org/apache/camel/component/ironmq/FromQueueToQueueTest.java new file mode 100644 index 0000000..acc2e97 --- /dev/null +++ b/components/camel-ironmq/src/test/java/org/apache/camel/component/ironmq/FromQueueToQueueTest.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ironmq; + +import java.io.IOException; + +import io.iron.ironmq.EmptyQueueException; + +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.JndiRegistry; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class FromQueueToQueueTest extends CamelTestSupport { + + @EndpointInject(uri = "mock:result") + private MockEndpoint result; + + @EndpointInject(uri = "ironmq:testqueue?client=#ironMock") + private IronMQEndpoint queue1; + + @EndpointInject(uri = "ironmq:testqueue2?client=#ironMock") + private IronMQEndpoint queue2; + + @Test + public void shouldDeleteMessageFromQueue1() throws Exception { + + result.setExpectedMessageCount(1); + + template.send("direct:start", ExchangePattern.InOnly, new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setBody("This is my message text."); + } + }); + + assertMockEndpointsSatisfied(); + + try { + queue1.getClient().queue("testqueue").reserve(); + fail("Message was in the first queue!"); + } catch (IOException e) { + if (!(e instanceof EmptyQueueException)) { + // Unexpected exception. + throw e; + } + } + + try { + queue2.getClient().queue("testqueue1").reserve(); + fail("Message remained in second queue!"); + } catch (IOException e) { + if (!(e instanceof EmptyQueueException)) { + // Unexpected exception. + throw e; + } + } + } + + @Override + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry registry = super.createRegistry(); + registry.bind("ironMock", new IronMQClientMock("dummy", "dummy")); + return registry; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:start").to("ironmq:testqueue?client=#ironMock"); + from("ironmq:testqueue?client=#ironMock").to("ironmq:testqueue2?client=#ironMock"); + from("ironmq:testqueue2?client=#ironMock").to("mock:result"); + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/c3cfb652/components/camel-ironmq/src/test/java/org/apache/camel/component/ironmq/IronMQBatchConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-ironmq/src/test/java/org/apache/camel/component/ironmq/IronMQBatchConsumerTest.java b/components/camel-ironmq/src/test/java/org/apache/camel/component/ironmq/IronMQBatchConsumerTest.java new file mode 100644 index 0000000..80222c6 --- /dev/null +++ b/components/camel-ironmq/src/test/java/org/apache/camel/component/ironmq/IronMQBatchConsumerTest.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ironmq; + +import java.util.HashMap; +import java.util.Map; + +import io.iron.ironmq.Message; + +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + + +public class IronMQBatchConsumerTest extends CamelTestSupport { + + private IronMQEndpoint endpoint; + + @Test + public void testConsumeBatchDelete() throws Exception { + for (int counter = 0; counter <= 5; counter++) { + Message message = new Message(); + message.setBody("{\"body\": \"Message " + counter + "\"}"); + message.setId("" + counter); + ((MockQueue)endpoint.getClient().queue("testqueue")).add(message); + } + + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(5); + assertMockEndpointsSatisfied(); + + mock.message(0).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(0); + mock.message(1).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(1); + mock.message(2).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(2); + mock.message(3).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(3); + mock.message(4).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(4); + mock.message(0).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false); + mock.message(1).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false); + mock.message(2).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false); + mock.message(3).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false); + mock.message(3).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false); + mock.message(4).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(true); + mock.expectedPropertyReceived(Exchange.BATCH_SIZE, 5); + } + + @Override + protected CamelContext createCamelContext() throws Exception { + + CamelContext context = super.createCamelContext(); + IronMQComponent component = new IronMQComponent(context); + Map<String, Object> parameters = new HashMap<String, Object>(); + parameters.put("projectId", "dummy"); + parameters.put("token", "dummy"); + parameters.put("maxMessagesPerPoll", "5"); + parameters.put("batchDelete", "true"); + endpoint = (IronMQEndpoint)component.createEndpoint("ironmq", "testqueue", parameters); + endpoint.setClient(new IronMQClientMock("dummy", "dummy")); + context.addComponent("ironmq", component); + return context; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() { + from(endpoint).to("mock:result"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/c3cfb652/components/camel-ironmq/src/test/java/org/apache/camel/component/ironmq/IronMQBatchDeleteConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-ironmq/src/test/java/org/apache/camel/component/ironmq/IronMQBatchDeleteConsumerTest.java b/components/camel-ironmq/src/test/java/org/apache/camel/component/ironmq/IronMQBatchDeleteConsumerTest.java new file mode 100644 index 0000000..e8b8823 --- /dev/null +++ b/components/camel-ironmq/src/test/java/org/apache/camel/component/ironmq/IronMQBatchDeleteConsumerTest.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ironmq; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import io.iron.ironmq.Message; + +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + + +public class IronMQBatchDeleteConsumerTest extends CamelTestSupport { + + private IronMQEndpoint endpoint; + + @Test + public void testConsumeBatch() throws Exception { + for (int counter = 0; counter <= 5; counter++) { + Message message = new Message(); + message.setBody("{\"body\": \"Message " + counter + "\"}"); + message.setId("" + counter); + message.setReservationId("" + counter); + ((MockQueue)endpoint.getClient().queue("testqueue22")).add(message); + } + + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(5); + assertMockEndpointsSatisfied(30, TimeUnit.SECONDS); + + mock.message(0).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(0); + mock.message(1).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(1); + mock.message(2).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(2); + mock.message(3).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(3); + mock.message(4).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(4); + mock.message(0).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false); + mock.message(1).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false); + mock.message(2).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false); + mock.message(3).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false); + mock.message(3).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false); + mock.message(4).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(true); + mock.expectedPropertyReceived(Exchange.BATCH_SIZE, 5); + } + + @Override + protected CamelContext createCamelContext() throws Exception { + + CamelContext context = super.createCamelContext(); + IronMQComponent component = new IronMQComponent(context); + Map<String, Object> parameters = new HashMap<String, Object>(); + parameters.put("projectId", "dummy"); + parameters.put("token", "dummy"); + parameters.put("maxMessagesPerPoll", "5"); + endpoint = (IronMQEndpoint)component.createEndpoint("ironmq", "testqueue22", parameters); + endpoint.setClient(new IronMQClientMock("dummy", "dummy")); + context.addComponent("ironmq", component); + return context; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() { + from(endpoint).to("mock:result"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/c3cfb652/components/camel-ironmq/src/test/java/org/apache/camel/component/ironmq/IronMQBatchProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-ironmq/src/test/java/org/apache/camel/component/ironmq/IronMQBatchProducerTest.java b/components/camel-ironmq/src/test/java/org/apache/camel/component/ironmq/IronMQBatchProducerTest.java new file mode 100644 index 0000000..275f0fb --- /dev/null +++ b/components/camel-ironmq/src/test/java/org/apache/camel/component/ironmq/IronMQBatchProducerTest.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ironmq; + + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import io.iron.ironmq.Ids; + +import org.apache.camel.CamelContext; +import org.apache.camel.CamelExecutionException; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.equalTo; + +public class IronMQBatchProducerTest extends CamelTestSupport { + + private IronMQEndpoint endpoint; + + @Test + public void testProduceBatch() throws Exception { + String[] messages = new String[] {"{foo:bar}", "{foo2:bar2}"}; + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(1); + template.sendBody("direct:start", messages); + assertMockEndpointsSatisfied(); + assertThat(mock.getReceivedExchanges().size(), equalTo(1)); + Object header = mock.getReceivedExchanges().get(0).getIn().getHeader(IronMQConstants.MESSAGE_ID); + assertIsInstanceOf(Ids.class, header); + assertThat(((Ids)header).getSize(), equalTo(2)); + } + + @Test(expected = CamelExecutionException.class) + public void testProduceBatchWithIllegalPayload() throws Exception { + template.sendBody("direct:start", Arrays.asList("foo", "bar")); + } + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + IronMQComponent component = new IronMQComponent(context); + Map<String, Object> parameters = new HashMap<String, Object>(); + parameters.put("projectId", "dummy"); + parameters.put("token", "dummy"); + endpoint = (IronMQEndpoint)component.createEndpoint("ironmq", "testqueue", parameters); + endpoint.setClient(new IronMQClientMock("dummy", "dummy")); + context.addComponent("ironmq", component); + return context; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:start").to(endpoint).to("mock:result"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/c3cfb652/components/camel-ironmq/src/test/java/org/apache/camel/component/ironmq/IronMQClientMock.java ---------------------------------------------------------------------- diff --git a/components/camel-ironmq/src/test/java/org/apache/camel/component/ironmq/IronMQClientMock.java b/components/camel-ironmq/src/test/java/org/apache/camel/component/ironmq/IronMQClientMock.java new file mode 100644 index 0000000..474c24f --- /dev/null +++ b/components/camel-ironmq/src/test/java/org/apache/camel/component/ironmq/IronMQClientMock.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ironmq; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import io.iron.ironmq.Client; +import io.iron.ironmq.Queue; + +public class IronMQClientMock extends Client { + private final Map<String, Queue> memQueues = new ConcurrentHashMap<String, Queue>(); + + public IronMQClientMock(String projectId, String token) { + super(projectId, token); + } + + @Override + public Queue queue(String name) { + Queue answer = null; + if (memQueues.containsKey(name)) { + answer = memQueues.get(name); + } else { + answer = new MockQueue(this, name); + memQueues.put(name, answer); + } + return answer; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/c3cfb652/components/camel-ironmq/src/test/java/org/apache/camel/component/ironmq/IronMQComponentConfigurationTest.java ---------------------------------------------------------------------- diff --git a/components/camel-ironmq/src/test/java/org/apache/camel/component/ironmq/IronMQComponentConfigurationTest.java b/components/camel-ironmq/src/test/java/org/apache/camel/component/ironmq/IronMQComponentConfigurationTest.java new file mode 100644 index 0000000..34326df --- /dev/null +++ b/components/camel-ironmq/src/test/java/org/apache/camel/component/ironmq/IronMQComponentConfigurationTest.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ironmq; + +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class IronMQComponentConfigurationTest extends CamelTestSupport { + + @Test + public void createEndpointWithMinimalConfiguration() throws Exception { + IronMQComponent component = new IronMQComponent(context); + IronMQEndpoint endpoint = (IronMQEndpoint)component.createEndpoint("ironmq://TestQueue?projectId=xxx&token=yyy"); + + assertEquals("TestQueue", endpoint.getConfiguration().getQueueName()); + assertEquals("xxx", endpoint.getConfiguration().getProjectId()); + assertEquals("yyy", endpoint.getConfiguration().getToken()); + assertEquals(0, endpoint.getConfiguration().getVisibilityDelay()); + assertEquals(1, endpoint.getConfiguration().getMaxMessagesPerPoll()); + assertEquals(60, endpoint.getConfiguration().getTimeout()); + assertFalse(endpoint.getConfiguration().isPreserveHeaders()); + assertFalse(endpoint.getConfiguration().isBatchDelete()); + assertEquals(0, endpoint.getConfiguration().getWait()); + } + + @Test + public void createEndpointWithMinimalConfigurationAndIronMQCloud() throws Exception { + IronMQComponent component = new IronMQComponent(context); + + IronMQEndpoint endpoint = (IronMQEndpoint)component.createEndpoint("ironmq://TestQueue?projectId=xxx&token=yyy&ironMQCloud=https://iron.foo"); + + assertEquals("TestQueue", endpoint.getConfiguration().getQueueName()); + assertEquals("xxx", endpoint.getConfiguration().getProjectId()); + assertEquals("yyy", endpoint.getConfiguration().getToken()); + assertEquals(0, endpoint.getConfiguration().getVisibilityDelay()); + assertEquals(1, endpoint.getConfiguration().getMaxMessagesPerPoll()); + assertEquals(60, endpoint.getConfiguration().getTimeout()); + assertEquals("https://iron.foo", endpoint.getConfiguration().getIronMQCloud()); + } + + @Test + public void createEndpointWithMaximalConfiguration() throws Exception { + IronMQComponent component = new IronMQComponent(context); + IronMQEndpoint endpoint = (IronMQEndpoint)component + .createEndpoint("ironmq://TestQueue?projectId=xxx&token=yyy&timeout=120&visibilityDelay=5&maxMessagesPerPoll=20&preserveHeaders=true&wait=30" + + "&ironMQCloud=https://iron.foo&batchDelete=true"); + assertEquals("TestQueue", endpoint.getConfiguration().getQueueName()); + assertEquals("xxx", endpoint.getConfiguration().getProjectId()); + assertEquals("yyy", endpoint.getConfiguration().getToken()); + assertEquals(20, endpoint.getConfiguration().getMaxMessagesPerPoll()); + assertEquals(120, endpoint.getConfiguration().getTimeout()); + assertEquals(5, endpoint.getConfiguration().getVisibilityDelay()); + assertTrue(endpoint.getConfiguration().isPreserveHeaders()); + assertEquals(30, endpoint.getConfiguration().getWait()); + assertTrue(endpoint.getConfiguration().isBatchDelete()); + assertEquals("https://iron.foo", endpoint.getConfiguration().getIronMQCloud()); + } + + @Test + public void createEndpointWithPollConsumerConfiguration() throws Exception { + IronMQComponent component = new IronMQComponent(context); + IronMQEndpoint endpoint = (IronMQEndpoint)component + .createEndpoint("ironmq://TestQueue?projectId=xxx&token=yyy&initialDelay=200&delay=400&timeout=120&maxMessagesPerPoll=20"); + IronMQConsumer consumer = (IronMQConsumer)endpoint.createConsumer(null); + + assertEquals(200, consumer.getInitialDelay()); + assertEquals(400, consumer.getDelay()); + assertEquals(20, consumer.getMaxMessagesPerPoll()); + } + + @Test(expected = IllegalArgumentException.class) + public void createEndpointWithoutTokenConfiguration() throws Exception { + IronMQComponent component = new IronMQComponent(context); + component.createEndpoint("ironmq://testqueue?projectId=yyy"); + } + + @Test(expected = IllegalArgumentException.class) + public void createEndpointWithoutProjectIdConfiguration() throws Exception { + IronMQComponent component = new IronMQComponent(context); + component.createEndpoint("ironmq://MyQueue?token=xxx"); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/c3cfb652/components/camel-ironmq/src/test/java/org/apache/camel/component/ironmq/IronMQComponentSpringTest.java ---------------------------------------------------------------------- diff --git a/components/camel-ironmq/src/test/java/org/apache/camel/component/ironmq/IronMQComponentSpringTest.java b/components/camel-ironmq/src/test/java/org/apache/camel/component/ironmq/IronMQComponentSpringTest.java new file mode 100644 index 0000000..8000987 --- /dev/null +++ b/components/camel-ironmq/src/test/java/org/apache/camel/component/ironmq/IronMQComponentSpringTest.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ironmq; + +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Processor; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.spring.CamelSpringTestSupport; +import org.junit.Test; +import org.springframework.context.support.ClassPathXmlApplicationContext; + +public class IronMQComponentSpringTest extends CamelSpringTestSupport { + + @EndpointInject(uri = "direct:start") + private ProducerTemplate template; + + @EndpointInject(uri = "mock:result") + private MockEndpoint result; + + @Test + public void sendInOnly() throws Exception { + result.expectedMessageCount(1); + + Exchange exchange = template.send("direct:start", ExchangePattern.InOnly, new Processor() { + public void process(Exchange exchange) throws Exception { + exchange.getIn().setBody("This is my message text."); + } + }); + + assertMockEndpointsSatisfied(); + + Exchange resultExchange = result.getExchanges().get(0); + assertEquals("This is my message text.", resultExchange.getIn().getBody()); + assertNotNull(resultExchange.getIn().getHeader(IronMQConstants.MESSAGE_ID)); + + assertNotNull(exchange.getIn().getHeader(IronMQConstants.MESSAGE_ID)); + } + + @Test + public void sendInOut() throws Exception { + result.expectedMessageCount(1); + + Exchange exchange = template.send("direct:start", ExchangePattern.InOut, new Processor() { + public void process(Exchange exchange) throws Exception { + exchange.getIn().setBody("This is my message text."); + } + }); + + assertMockEndpointsSatisfied(); + + Exchange resultExchange = result.getExchanges().get(0); + assertEquals("This is my message text.", resultExchange.getIn().getBody()); + assertNotNull(resultExchange.getIn().getHeader(IronMQConstants.MESSAGE_ID)); + + assertNotNull(exchange.getOut().getHeader(IronMQConstants.MESSAGE_ID)); + } + + @Override + protected ClassPathXmlApplicationContext createApplicationContext() { + return new ClassPathXmlApplicationContext("org/apache/camel/component/ironmq/IronMQComponentSpringTest-context.xml"); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/c3cfb652/components/camel-ironmq/src/test/java/org/apache/camel/component/ironmq/IronMQComponentTest.java ---------------------------------------------------------------------- diff --git a/components/camel-ironmq/src/test/java/org/apache/camel/component/ironmq/IronMQComponentTest.java b/components/camel-ironmq/src/test/java/org/apache/camel/component/ironmq/IronMQComponentTest.java new file mode 100644 index 0000000..2aeae86 --- /dev/null +++ b/components/camel-ironmq/src/test/java/org/apache/camel/component/ironmq/IronMQComponentTest.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ironmq; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.camel.CamelContext; +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Assert; +import org.junit.Test; + +public class IronMQComponentTest extends CamelTestSupport { + + private IronMQEndpoint endpoint; + + @EndpointInject(uri = "mock:result") + private MockEndpoint result; + + @Test + public void testIronMQ() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMinimumMessageCount(1); + template.sendBody("direct:start", "some payload"); + + assertMockEndpointsSatisfied(); + Message in = mock.getExchanges().get(0).getIn(); + Assert.assertNotNull(in.getHeader(IronMQConstants.MESSAGE_ID)); + Assert.assertNotNull(in.getHeader(IronMQConstants.MESSAGE_RESERVATION_ID)); + Assert.assertNotNull(in.getHeader(IronMQConstants.MESSAGE_RESERVED_COUNT)); + } + + @Test + public void sendInOnly() throws Exception { + result.expectedMessageCount(1); + + Exchange exchange = template.send("direct:start", ExchangePattern.InOnly, new Processor() { + public void process(Exchange exchange) throws Exception { + exchange.getIn().setBody("This is my message text."); + } + }); + + assertMockEndpointsSatisfied(); + + Exchange resultExchange = result.getExchanges().get(0); + assertEquals("This is my message text.", resultExchange.getIn().getBody()); + assertNotNull(resultExchange.getIn().getHeader(IronMQConstants.MESSAGE_ID)); + + assertEquals("This is my message text.", exchange.getIn().getBody()); + assertNotNull(exchange.getIn().getHeader(IronMQConstants.MESSAGE_ID)); + } + + @Test + public void sendInOut() throws Exception { + result.expectedMessageCount(1); + + Exchange exchange = template.send("direct:start", ExchangePattern.InOut, new Processor() { + public void process(Exchange exchange) throws Exception { + exchange.getIn().setBody("This is my message text."); + } + }); + + assertMockEndpointsSatisfied(); + + Exchange resultExchange = result.getExchanges().get(0); + assertEquals("This is my message text.", resultExchange.getIn().getBody()); + assertNotNull(resultExchange.getIn().getHeader(IronMQConstants.MESSAGE_ID)); + + assertEquals("This is my message text.", exchange.getOut().getBody()); + assertNotNull(exchange.getOut().getHeader(IronMQConstants.MESSAGE_ID)); + } + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + IronMQComponent component = new IronMQComponent(context); + Map<String, Object> parameters = new HashMap<String, Object>(); + parameters.put("projectId", "dummy"); + parameters.put("token", "dummy"); + endpoint = (IronMQEndpoint)component.createEndpoint("ironmq", "testqueue", parameters); + endpoint.setClient(new IronMQClientMock("dummy", "dummy")); + context.addComponent("ironmq", component); + return context; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() { + from("direct:start").to(endpoint); + + from(endpoint).to("mock:result"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/c3cfb652/components/camel-ironmq/src/test/java/org/apache/camel/component/ironmq/IronMQPreserveHeadersTest.java ---------------------------------------------------------------------- diff --git a/components/camel-ironmq/src/test/java/org/apache/camel/component/ironmq/IronMQPreserveHeadersTest.java b/components/camel-ironmq/src/test/java/org/apache/camel/component/ironmq/IronMQPreserveHeadersTest.java new file mode 100644 index 0000000..2dfe09a --- /dev/null +++ b/components/camel-ironmq/src/test/java/org/apache/camel/component/ironmq/IronMQPreserveHeadersTest.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ironmq; + +import org.apache.camel.CamelContext; +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Assert; +import org.junit.Test; + +public class IronMQPreserveHeadersTest extends CamelTestSupport { + + private IronMQEndpoint endpoint; + + @EndpointInject(uri = "mock:result") + private MockEndpoint result; + + @Test + public void testPreserveHeaders() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMinimumMessageCount(1); + mock.expectedBodiesReceived("some payload"); + mock.expectedHeaderReceived("MyHeader", "HeaderValue"); + template.sendBodyAndHeader("direct:start", "some payload", "MyHeader", "HeaderValue"); + + assertMockEndpointsSatisfied(); + String id = mock.getExchanges().get(0).getIn().getHeader(IronMQConstants.MESSAGE_ID, String.class); + Assert.assertNotNull(id); + } + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + IronMQComponent component = new IronMQComponent(context); + endpoint = (IronMQEndpoint)component.createEndpoint("ironmq://TestQueue?projectId=xxx&token=yyy&preserveHeaders=true"); + endpoint.setClient(new IronMQClientMock("dummy", "dummy")); + context.addComponent("ironmq", component); + return context; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() { + from("direct:start").to(endpoint); + + from(endpoint).to("mock:result"); + } + }; + } +}
