Updated Branches: refs/heads/master 51a46bfad -> a9a2b5bc8
The OpenStack Marconi Claim API. Project: http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/repo Commit: http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/commit/a9a2b5bc Tree: http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/tree/a9a2b5bc Diff: http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/diff/a9a2b5bc Branch: refs/heads/master Commit: a9a2b5bc8f910d589d531e1b088e89729f7286ee Parents: 51a46bf Author: Everett Toews <[email protected]> Authored: Wed Dec 11 21:55:41 2013 -0600 Committer: Everett Toews <[email protected]> Committed: Thu Dec 12 12:36:37 2013 -0600 ---------------------------------------------------------------------- .../openstack/marconi/v1/MarconiApi.java | 23 ++- .../openstack/marconi/v1/domain/Claim.java | 164 +++++++++++++++++++ .../openstack/marconi/v1/domain/Message.java | 31 +++- .../openstack/marconi/v1/features/ClaimApi.java | 144 ++++++++++++++++ .../marconi/v1/features/MessageApi.java | 30 +++- .../marconi/v1/functions/ParseClaim.java | 78 +++++++++ .../v1/functions/ParseMessagesCreated.java | 4 +- .../v1/functions/ParseMessagesToStream.java | 47 +++++- .../marconi/v1/functions/ParseQueueStats.java | 6 +- .../v1/options/StreamMessagesOptions.java | 16 ++ .../marconi/v1/features/ClaimApiLiveTest.java | 134 +++++++++++++++ .../marconi/v1/features/ClaimApiMockTest.java | 126 ++++++++++++++ .../marconi/v1/features/MessageApiLiveTest.java | 15 ++ .../marconi/v1/features/MessageApiMockTest.java | 23 +++ .../us/CloudQueuesUSClaimApiLiveTest.java | 30 ++++ 15 files changed, 844 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/MarconiApi.java ---------------------------------------------------------------------- diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/MarconiApi.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/MarconiApi.java index fe4c048..3583bbe 100644 --- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/MarconiApi.java +++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/MarconiApi.java @@ -20,15 +20,18 @@ import com.google.inject.Provides; import org.jclouds.javax.annotation.Nullable; import org.jclouds.location.Zone; import org.jclouds.location.functions.ZoneToEndpoint; +import org.jclouds.openstack.marconi.v1.features.ClaimApi; import org.jclouds.openstack.marconi.v1.features.MessageApi; import org.jclouds.openstack.marconi.v1.features.QueueApi; import org.jclouds.rest.annotations.Delegate; import org.jclouds.rest.annotations.EndpointParam; +import javax.ws.rs.HeaderParam; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import java.io.Closeable; import java.util.Set; +import java.util.UUID; /** * Marconi is a robust, web-scale message queuing service to support the distributed nature of large web applications. @@ -55,11 +58,29 @@ public interface MarconiApi extends Closeable { /** * Provides access to Message features. * - * @param zone The zone where this queue will live. + * @param zone The zone where this queue lives. * @param name Name of the queue. */ @Delegate @Path("/queues/{name}") MessageApi getMessageApiForZoneAndQueue( @EndpointParam(parser = ZoneToEndpoint.class) @Nullable String zone, @PathParam("name") String name); + + /** + * Provides access to Claim features. + * + * @param zone The zone where this queue lives. + * @param clientId A UUID for each client instance. The UUID must be submitted in its canonical form (for example, + * 3381af92-2b9e-11e3-b191-71861300734c). The client generates the Client-ID once. Client-ID + * persists between restarts of the client so the client should reuse that same Client-ID. All + * message-related operations require the use of Client-ID in the headers to ensure that messages + * are not echoed back to the client that posted them, unless the client explicitly requests this. + * @param name Name of the queue. + */ + @Delegate + @Path("/queues/{name}") + ClaimApi getClaimApiForZoneAndClientAndQueue( + @EndpointParam(parser = ZoneToEndpoint.class) @Nullable String zone, + @HeaderParam("Client-ID") UUID clientId, + @PathParam("name") String name); } http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Claim.java ---------------------------------------------------------------------- diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Claim.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Claim.java new file mode 100644 index 0000000..71945b4 --- /dev/null +++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Claim.java @@ -0,0 +1,164 @@ +/* + * Licensed to jclouds, Inc. (jclouds) under one or more + * contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. jclouds licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.jclouds.openstack.marconi.v1.domain; + +import com.google.common.base.Objects; +import com.google.common.collect.ImmutableList; +import org.jclouds.javax.annotation.Nullable; + +import java.util.List; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A claim for messages in a queue. + * + * @author Everett Toews + */ +public class Claim { + + private final String id; + private final int ttl; + private final int age; + private final List<Message> messages; + + protected Claim(String id, int ttl, int age, @Nullable List<Message> messages) { + this.id = checkNotNull(id, "id required"); + this.ttl = ttl; + this.age = age; + this.messages = messages == null ? ImmutableList.<Message>of() : messages; + } + + /** + * @return The id of this message. + */ + public String getId() { + return id; + } + + /** + * @see CreateMessage.Builder#ttl(int) + */ + public int getTTL() { + return ttl; + } + + /** + * @return Age of this message in seconds. + */ + public int getAge() { + return age; + } + + /** + * @return The messages that are associated with this claim. + */ + public List<Message> getMessages() { + return messages; + } + + @Override + public int hashCode() { + return Objects.hashCode(id); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + Claim that = Claim.class.cast(obj); + return Objects.equal(this.id, that.id); + } + + protected Objects.ToStringHelper string() { + return Objects.toStringHelper(this) + .add("id", id).add("ttl", ttl).add("age", age).add("messages", messages); + } + + @Override + public String toString() { + return string().toString(); + } + + public static Builder builder() { + return new ConcreteBuilder(); + } + + public Builder toBuilder() { + return new ConcreteBuilder().fromMessage(this); + } + + public static abstract class Builder { + protected abstract Builder self(); + + protected String id; + protected int ttl; + protected int age; + protected List<Message> messages; + + /** + * @see Claim#getId() + */ + public Builder id(String id) { + this.id = id; + return self(); + } + + /** + * @see CreateMessage.Builder#ttl(int) + */ + public Builder ttl(int ttl) { + this.ttl = ttl; + return self(); + } + + /** + * @see Claim#getAge() + */ + public Builder age(int age) { + this.age = age; + return self(); + } + + /** + * @see Claim#getMessages() + */ + public Builder messages(List<Message> messages) { + this.messages = messages; + return self(); + } + + public Claim build() { + return new Claim(id, ttl, age, messages); + } + + public Builder fromMessage(Claim in) { + return this.id(in.getId()).ttl(in.getTTL()).age(in.getAge()).messages(in.getMessages()); + } + } + + private static class ConcreteBuilder extends Builder { + @Override + protected ConcreteBuilder self() { + return this; + } + } + +} http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Message.java ---------------------------------------------------------------------- diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Message.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Message.java index a118603..184ef7b 100644 --- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Message.java +++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Message.java @@ -19,6 +19,8 @@ package org.jclouds.openstack.marconi.v1.domain; import com.google.common.base.Objects; +import com.google.common.base.Optional; +import org.jclouds.javax.annotation.Nullable; import static com.google.common.base.Preconditions.checkNotNull; @@ -33,12 +35,14 @@ public class Message { private final int ttl; private final String body; private final int age; + private final String claimId; - protected Message(String id, int ttl, String body, int age) { + protected Message(String id, int ttl, String body, int age, @Nullable String claimId) { this.id = checkNotNull(id, "id required"); this.ttl = ttl; this.body = checkNotNull(body, "body required"); this.age = age; + this.claimId = claimId; } /** @@ -69,6 +73,13 @@ public class Message { return age; } + /** + * @return The claim id of this message. + */ + public Optional<String> getClaimId() { + return Optional.fromNullable(claimId); + } + @Override public int hashCode() { return Objects.hashCode(id); @@ -83,8 +94,8 @@ public class Message { } protected Objects.ToStringHelper string() { - return Objects.toStringHelper(this) - .add("id", id).add("ttl", ttl).add("body", body).add("age", age); + return Objects.toStringHelper(this).omitNullValues() + .add("id", id).add("ttl", ttl).add("body", body).add("age", age).add("claimId", claimId); } @Override @@ -107,6 +118,7 @@ public class Message { protected int ttl; protected String body; protected int age; + protected String claimId; /** * @see Message#getId() @@ -140,12 +152,21 @@ public class Message { return self(); } + /** + * @see Message#getClaimId() + */ + public Builder claimId(String claimId) { + this.claimId = claimId; + return self(); + } + public Message build() { - return new Message(id, ttl, body, age); + return new Message(id, ttl, body, age, claimId); } public Builder fromMessage(Message in) { - return this.id(in.getId()).ttl(in.getTTL()).body(in.getBody()).age(in.getAge()); + return this.id(in.getId()).ttl(in.getTTL()).body(in.getBody()).age(in.getAge()) + .claimId(in.getClaimId().orNull()); } } http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/ClaimApi.java ---------------------------------------------------------------------- diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/ClaimApi.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/ClaimApi.java new file mode 100644 index 0000000..f14deb5 --- /dev/null +++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/ClaimApi.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jclouds.openstack.marconi.v1.features; + +import org.jclouds.openstack.keystone.v2_0.filters.AuthenticateRequest; +import org.jclouds.openstack.marconi.v1.domain.Claim; +import org.jclouds.openstack.marconi.v1.domain.Message; +import org.jclouds.openstack.marconi.v1.functions.ParseClaim; +import org.jclouds.openstack.marconi.v1.functions.ParseMessagesToList; +import org.jclouds.rest.annotations.Fallback; +import org.jclouds.rest.annotations.PATCH; +import org.jclouds.rest.annotations.Payload; +import org.jclouds.rest.annotations.PayloadParam; +import org.jclouds.rest.annotations.RequestFilters; +import org.jclouds.rest.annotations.ResponseParser; +import org.jclouds.rest.annotations.SkipEncoding; + +import javax.inject.Named; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.MediaType; +import java.util.List; + +import static org.jclouds.Fallbacks.EmptyListOnNotFoundOr404; +import static org.jclouds.Fallbacks.FalseOnNotFoundOr404; +import static org.jclouds.Fallbacks.NullOnNotFoundOr404; + +/** + * Provides access to Messages via their REST API. + * + * @author Everett Toews + */ +@SkipEncoding({'/', '='}) +@RequestFilters(AuthenticateRequest.class) +public interface ClaimApi { + /** + * This operation claims a set of messages (up to the value of the limit parameter) from oldest to newest and skips + * any messages that are already claimed. If no unclaimed messages are available, an empty List is returned. + * </p> + * When a client (worker) finishes processing a message, it should delete the message before the claim expires to + * ensure that the message is processed only once. As part of the delete operation, workers should specify the claim + * ID. If workers perform these actions and a claim simply expires, the server can return an error and notify the + * worker of the race condition. This action gives the worker a chance to roll back its own processing of the given + * message because another worker can claim the message and process it. + * </p> + * The age given for a claim is relative to the server's clock. The claim's age is useful for determining how + * quickly messages are getting processed and whether a given message's claim is about to expire. + * </p> + * When a claim expires, it is released. If the original worker failed to process the message, another client worker + * can then claim the message. + * </p> + * Note that claim creation is best-effort, meaning the worker may claim and return less than the requested number + * of messages. + * </p> + * To deal with workers that have stopped responding (for up to 1209600 seconds or 14 days, including claim + * lifetime), the server extends the lifetime of claimed messages to be at least as long as the lifetime of the + * claim itself, plus the specified grace period. If a claimed message would normally live longer than the grace + * period, its expiration is not adjusted. + * + * @param ttl The ttl attribute specifies how long the server waits before releasing the claim. The ttl value + * must be between 60 and 43200 seconds (12 hours). You must include a value for this attribute in + * your request. + * @param grace The grace attribute specifies the message grace period in seconds. The value of grace value must + * be between 60 and 43200 seconds (12 hours). You must include a value for thia attribute in your + * request. + * @param limit Specifies the number of messages to return, up to 20 messages. + */ + @Named("claim:claim") + @POST + @Path("/claims") + @ResponseParser(ParseMessagesToList.class) + @Fallback(EmptyListOnNotFoundOr404.class) + @Payload("%7B\"ttl\":{ttl},\"grace\":{grace}%7D") + List<Message> claim(@PayloadParam("ttl") int ttl, + @PayloadParam("grace") int grace, + @QueryParam("limit") int limit); + + /** + * Gets a specific claim and the associated messages. + * + * @param claimId Specific claim ID of the message to get. + */ + @Named("claim:get") + @GET + @ResponseParser(ParseClaim.class) + @Consumes(MediaType.APPLICATION_JSON) + @Path("/claims/{claim_id}") + @Fallback(NullOnNotFoundOr404.class) + Claim get(@PathParam("claim_id") String claimId); + + /** + * Clients should periodically renew claims during long-running batches of work to avoid losing a claim while + * processing a message. The client can renew a claim by including a new TTL for the claim (which can be different + * from the original TTL). The server resets the age of the claim and applies the new TTL. + * + * @param claimId Specific claim ID of the message to get. + * @param ttl The ttl attribute specifies how long the server waits before releasing the claim. The ttl value + * must be between 60 and 43200 seconds (12 hours). You must include a value for this attribute in + * your request. + */ + // TODO: revisit this when we figure out what's wrong with PATCH + // @Named("claim:update") + // @PATCH + // @Path("/claims/{claim_id}") + // @Fallback(FalseOnNotFoundOr404.class) + // @Payload("%7B\"ttl\":{ttl}%7D") + // @Produces(MediaType.APPLICATION_JSON) + // boolean update(@PathParam("claim_id") String claimId, + // @PayloadParam("ttl") int ttl); + + /** + * This operation immediately releases a claim, making any remaining, undeleted) messages that are associated with + * the claim available to other workers. This operation is useful when a worker is performing a graceful shutdown, + * fails to process one or more messages, or is taking longer than expected to process messages, and wants to make + * the remainder of the messages available to other workers. + * + * @param claimId Specific claim ID of the message to get. + */ + @Named("claim:delete") + @DELETE + @Path("/claims/{claim_id}") + @Fallback(FalseOnNotFoundOr404.class) + boolean release(@PathParam("claim_id") String claimId); +} http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/MessageApi.java ---------------------------------------------------------------------- diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/MessageApi.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/MessageApi.java index 5dc0aa9..645602c 100644 --- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/MessageApi.java +++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/MessageApi.java @@ -42,6 +42,7 @@ import javax.ws.rs.HeaderParam; import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.PathParam; +import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; import java.util.List; import java.util.UUID; @@ -83,7 +84,7 @@ public interface MessageApi { * messages indefinitely. * * @param clientId A UUID for each client instance. - * @param options Options for streaming messages to your client. + * @param options Options for streaming messages to your client. */ @Named("message:stream") @GET @@ -98,7 +99,7 @@ public interface MessageApi { * Lists specific messages. Unlike the stream method, a client's own messages are always returned in this operation. * * @param clientId A UUID for each client instance. - * @param ids Specifies the IDs of the messages to get. + * @param ids Specifies the IDs of the messages to list. */ @Named("message:list") @GET @@ -109,13 +110,11 @@ public interface MessageApi { List<Message> list(@HeaderParam("Client-ID") UUID clientId, @BinderParam(BindIdsToQueryParam.class) Iterable<String> ids); - // TODO: list by claim id when claim API done - /** * Gets a specific message. Unlike the stream method, a client's own messages are always returned in this operation. * * @param clientId A UUID for each client instance. - * @param id Specific ID of the message to get. + * @param id Specific ID of the message to get. */ @Named("message:get") @GET @@ -131,7 +130,7 @@ public interface MessageApi { * remaining valid messages IDs are deleted. * * @param clientId A UUID for each client instance. - * @param ids Specifies the IDs of the messages to delete. + * @param ids Specifies the IDs of the messages to delete. */ @Named("message:delete") @DELETE @@ -141,5 +140,22 @@ public interface MessageApi { boolean delete(@HeaderParam("Client-ID") UUID clientId, @BinderParam(BindIdsToQueryParam.class) Iterable<String> ids); - // TODO: delete by claim id when claim API done + /** + * The claimId parameter specifies that the message is deleted only if it has the specified claim ID and that claim + * has not expired. This specification is useful for ensuring only one worker processes any given message. When a + * worker's claim expires before it can delete a message that it has processed, the worker must roll back any + * actions it took based on that message because another worker can now claim and process the same message. + * + * @param clientId A UUID for each client instance. + * @param id Specific ID of the message to delete. + * @param claimId Specific claim ID of the message to delete. + */ + @Named("message:delete") + @DELETE + @Consumes(MediaType.APPLICATION_JSON) + @Path("/messages/{message_id}") + @Fallback(FalseOnNotFoundOr404.class) + boolean deleteByClaim(@HeaderParam("Client-ID") UUID clientId, + @PathParam("message_id") String id, + @QueryParam("claim_id") String claimId); } http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseClaim.java ---------------------------------------------------------------------- diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseClaim.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseClaim.java new file mode 100644 index 0000000..c03358f --- /dev/null +++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseClaim.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jclouds.openstack.marconi.v1.functions; + +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import org.jclouds.http.HttpResponse; +import org.jclouds.http.functions.ParseJson; +import org.jclouds.openstack.marconi.v1.domain.Claim; +import org.jclouds.openstack.marconi.v1.domain.Message; + +import javax.inject.Inject; +import java.beans.ConstructorProperties; +import java.util.List; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.collect.Iterables.transform; +import static com.google.common.collect.Lists.newArrayList; +import static org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream.MessageWithHref; +import static org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream.TO_ID_FROM_HREF; +import static org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream.TO_MESSAGE; + +/** + * @author Everett Toews + */ +public class ParseClaim implements Function<HttpResponse, Claim> { + + protected static final Function<ClaimWithHref, Claim> TO_CLAIM = new Function<ClaimWithHref, Claim>() { + @Override + public Claim apply(ClaimWithHref claimWithHref) { + List<Message> messages = newArrayList(transform(claimWithHref.messagesWithHref, TO_MESSAGE)); + String claimId = TO_ID_FROM_HREF.apply(claimWithHref.getId()); + + return claimWithHref.toBuilder() + .id(claimId) + .messages(messages) + .build(); + } + }; + private final ParseJson<ClaimWithHref> json; + + @Inject + ParseClaim(ParseJson<ClaimWithHref> json) { + this.json = checkNotNull(json, "json"); + } + + @Override + public Claim apply(HttpResponse response) { + ClaimWithHref claimWithHref = json.apply(response); + Claim claim = TO_CLAIM.apply(claimWithHref); + + return claim; + } + + private static class ClaimWithHref extends Claim { + private final List<MessageWithHref> messagesWithHref; + + @ConstructorProperties({"href", "ttl", "age", "messages"}) + protected ClaimWithHref(String href, int ttl, int age, List<MessageWithHref> messagesWithHref) { + super(href, ttl, age, null); + this.messagesWithHref = messagesWithHref; + } + } +} http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesCreated.java ---------------------------------------------------------------------- diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesCreated.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesCreated.java index 9be3722..3d38092 100644 --- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesCreated.java +++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesCreated.java @@ -26,7 +26,7 @@ import org.jclouds.openstack.marconi.v1.domain.MessagesCreated; import java.util.List; import static com.google.common.base.Preconditions.checkNotNull; -import static org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream.TO_MESSAGE_ID; +import static org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream.TO_ID_FROM_HREF; /** * This parses the messages created on a queue. @@ -44,7 +44,7 @@ public class ParseMessagesCreated implements Function<HttpResponse, MessagesCrea public MessagesCreated apply(HttpResponse from) { MessagesCreated rawMessagesCreated = json.apply(from); - List<String> messageIds = Lists.transform(rawMessagesCreated.getMessageIds(), TO_MESSAGE_ID); + List<String> messageIds = Lists.transform(rawMessagesCreated.getMessageIds(), TO_ID_FROM_HREF); MessagesCreated messagesCreated = MessagesCreated.builder() .messageIds(messageIds) http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesToStream.java ---------------------------------------------------------------------- diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesToStream.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesToStream.java index 76a9ba7..42e2b02 100644 --- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesToStream.java +++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesToStream.java @@ -18,7 +18,7 @@ package org.jclouds.openstack.marconi.v1.functions; import com.google.common.base.Function; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; +import com.google.common.collect.Multimap; import org.jclouds.http.HttpResponse; import org.jclouds.http.functions.ParseJson; import org.jclouds.openstack.marconi.v1.domain.Message; @@ -30,6 +30,9 @@ import javax.inject.Inject; import java.beans.ConstructorProperties; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.collect.Iterables.getOnlyElement; +import static com.google.common.collect.Iterables.transform; +import static org.jclouds.http.utils.Queries.queryParser; /** * @author Everett Toews @@ -51,27 +54,53 @@ public class ParseMessagesToStream implements Function<HttpResponse, MessageStre } MessagesWithHref messagesWithHref = json.apply(response); - Iterable<Message> messages = Iterables.transform(messagesWithHref, TO_MESSAGE); + Iterable<Message> messages = transform(messagesWithHref, TO_MESSAGE); return new Messages(messages, messagesWithHref.getLinks()); } - private static String getMessageId(String rawMessageHref) { - // strip off everything but the message id - return rawMessageHref.substring(rawMessageHref.lastIndexOf('/')+1); + /** + * Strip off everything but the message id. + */ + private static String getIdFromHref(String rawMessageHref) { + int indexOfQuestionMark = rawMessageHref.indexOf('?'); + int lastIndexOfSlash = rawMessageHref.lastIndexOf('/') + 1; + + if (indexOfQuestionMark > 0) { + return rawMessageHref.substring(lastIndexOfSlash, indexOfQuestionMark); + } + else { + return rawMessageHref.substring(lastIndexOfSlash); + } + } + + private static String getClaimIdFromHref(String rawMessageHref) { + int indexOfQuestionMark = rawMessageHref.indexOf('?') + 1; + + if (indexOfQuestionMark > 0) { + Multimap<String, String> queryParams = queryParser().apply(rawMessageHref.substring(indexOfQuestionMark)); + + return getOnlyElement(queryParams.get("claim_id"), null); + } + else { + return null; + } } protected static final Function<MessageWithHref, Message> TO_MESSAGE = new Function<MessageWithHref, Message>() { @Override public Message apply(MessageWithHref messageWithHref) { - return messageWithHref.toBuilder().id(getMessageId(messageWithHref.getId())).build(); + return messageWithHref.toBuilder() + .id(getIdFromHref(messageWithHref.getId())) + .claimId(getClaimIdFromHref(messageWithHref.getId())) + .build(); } }; - protected static final Function<String, String> TO_MESSAGE_ID = new Function<String, String>() { + protected static final Function<String, String> TO_ID_FROM_HREF = new Function<String, String>() { @Override public String apply(String messageIdWithHref) { - return getMessageId(messageIdWithHref); + return getIdFromHref(messageIdWithHref); } }; @@ -95,7 +124,7 @@ public class ParseMessagesToStream implements Function<HttpResponse, MessageStre @ConstructorProperties({ "href", "ttl", "body", "age" }) protected MessageWithHref(String href, int ttl, String body, int age) { - super(href, ttl, body, age); + super(href, ttl, body, age, null); } } } http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseQueueStats.java ---------------------------------------------------------------------- diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseQueueStats.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseQueueStats.java index e6f0ee2..264dc38 100644 --- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseQueueStats.java +++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseQueueStats.java @@ -25,7 +25,7 @@ import org.jclouds.openstack.marconi.v1.domain.MessagesStats; import org.jclouds.openstack.marconi.v1.domain.QueueStats; import static com.google.common.base.Preconditions.checkNotNull; -import static org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream.TO_MESSAGE_ID; +import static org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream.TO_ID_FROM_HREF; /** * This parses the stats of a queue. @@ -51,11 +51,11 @@ public class ParseQueueStats implements Function<HttpResponse, QueueStats> { // change the hrefs to ids Aged oldestWithHref = rawQueueStats.getMessagesStats().getOldest().get(); Aged oldestWithId = oldestWithHref.toBuilder() - .id(TO_MESSAGE_ID.apply(oldestWithHref.getId())) + .id(TO_ID_FROM_HREF.apply(oldestWithHref.getId())) .build(); Aged newestWithHref = rawQueueStats.getMessagesStats().getNewest().get(); Aged newestWithId = newestWithHref.toBuilder() - .id(TO_MESSAGE_ID.apply(newestWithHref.getId())) + .id(TO_ID_FROM_HREF.apply(newestWithHref.getId())) .build(); MessagesStats messagesStatsWithIds = rawQueueStats.getMessagesStats().toBuilder() http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamMessagesOptions.java ---------------------------------------------------------------------- diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamMessagesOptions.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamMessagesOptions.java index b0ff396..2b8ca40 100644 --- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamMessagesOptions.java +++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamMessagesOptions.java @@ -67,6 +67,14 @@ public class StreamMessagesOptions extends PaginationOptions { } /** + * @see Builder#includeClaimed(boolean) + */ + public StreamMessagesOptions includeClaimed(boolean includeClaimed) { + queryParameters.put("include_claimed", Boolean.toString(includeClaimed)); + return this; + } + + /** * @return The String representation of the marker for these StreamMessagesOptions. */ public String getMarker() { @@ -115,5 +123,13 @@ public class StreamMessagesOptions extends PaginationOptions { StreamMessagesOptions options = new StreamMessagesOptions(); return options.echo(echo); } + + /** + * The includeClaimed parameter determines whether the API returns claimed messages. + */ + public static StreamMessagesOptions includeClaimed(boolean includeClaimed) { + StreamMessagesOptions options = new StreamMessagesOptions(); + return options.echo(includeClaimed); + } } } http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/ClaimApiLiveTest.java ---------------------------------------------------------------------- diff --git a/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/ClaimApiLiveTest.java b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/ClaimApiLiveTest.java new file mode 100644 index 0000000..b2d5d80 --- /dev/null +++ b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/ClaimApiLiveTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jclouds.openstack.marconi.v1.features; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Maps; +import org.jclouds.openstack.marconi.v1.domain.Claim; +import org.jclouds.openstack.marconi.v1.domain.CreateMessage; +import org.jclouds.openstack.marconi.v1.domain.Message; +import org.jclouds.openstack.marconi.v1.domain.MessagesCreated; +import org.jclouds.openstack.marconi.v1.internal.BaseMarconiApiLiveTest; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static com.google.common.collect.Iterables.getLast; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + +@Test(groups = "live", testName = "ClaimApiLiveTest", singleThreaded = true) +public class ClaimApiLiveTest extends BaseMarconiApiLiveTest { + + private static final UUID CLIENT_ID = UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c"); + private final Map<String, List<String>> claimIds = Maps.newHashMap(); + + public void createQueues() throws Exception { + for (String zoneId : zones) { + QueueApi queueApi = api.getQueueApiForZone(zoneId); + boolean success = queueApi.create("jclouds-test"); + + assertTrue(success); + } + } + + @Test(dependsOnMethods = { "createQueues" }) + public void createMessages() throws Exception { + for (String zoneId : zones) { + MessageApi messageApi = api.getMessageApiForZoneAndQueue(zoneId, "jclouds-test"); + + UUID clientId = UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c"); + String json1 = "{\"event\":{\"name\":\"Austin Java User Group\",\"attendees\":[\"bob\",\"jim\",\"sally\"]}}"; + CreateMessage message1 = CreateMessage.builder().ttl(86400).body(json1).build(); + String json2 = "{\"event\":{\"name\":\"SF Java User Group\",\"attendees\":[\"bob\",\"jim\",\"sally\"]}}"; + CreateMessage message2 = CreateMessage.builder().ttl(86400).body(json2).build(); + String json3 = "{\"event\":{\"name\":\"HK Java User Group\",\"attendees\":[\"bob\",\"jim\",\"sally\"]}}"; + CreateMessage message3 = CreateMessage.builder().ttl(86400).body(json3).build(); + List<CreateMessage> messages = ImmutableList.of(message1, message2, message3); + + MessagesCreated messagesCreated = messageApi.create(clientId, messages); + + assertNotNull(messagesCreated); + assertEquals(messagesCreated.getMessageIds().size(), 3); + } + } + + @Test(dependsOnMethods = { "createMessages" }) + public void claimMessages() throws Exception { + for (String zoneId : zones) { + ClaimApi claimApi = api.getClaimApiForZoneAndClientAndQueue(zoneId, CLIENT_ID, "jclouds-test"); + + List<Message> messages = claimApi.claim(300, 200, 2); + assertEquals(messages.size(), 2); + + claimIds.put(zoneId, new ArrayList<String>()); + + for (Message message: messages) { + claimIds.get(zoneId).add(message.getClaimId().get()); + + assertNotNull(message.getId()); + assertTrue(message.getClaimId().isPresent()); + assertEquals(message.getTTL(), 86400); + } + } + } + + @Test(dependsOnMethods = { "claimMessages" }) + public void getClaim() throws Exception { + for (String zoneId : zones) { + ClaimApi claimApi = api.getClaimApiForZoneAndClientAndQueue(zoneId, CLIENT_ID, "jclouds-test"); + + Claim claim = claimApi.get(claimIds.get(zoneId).get(0)); + + assertNotNull(claim.getId()); + assertEquals(claim.getMessages().size(), 2); + assertEquals(claim.getTTL(), 300); + + for (Message message: claim.getMessages()) { + assertNotNull(message.getId()); + assertTrue(message.getClaimId().isPresent()); + assertEquals(message.getTTL(), 86400); + } + } + } + + @Test(dependsOnMethods = { "getClaim" }) + public void releaseClaim() throws Exception { + for (String zoneId : zones) { + ClaimApi claimApi = api.getClaimApiForZoneAndClientAndQueue(zoneId, CLIENT_ID, "jclouds-test"); + + boolean success = claimApi.release(claimIds.get(zoneId).get(0)); + + assertTrue(success); + } + } + + @Test(dependsOnMethods = { "getClaim" }) + public void delete() throws Exception { + for (String zoneId : zones) { + QueueApi queueApi = api.getQueueApiForZone(zoneId); + boolean success = queueApi.delete("jclouds-test"); + + assertTrue(success); + } + } +} http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/ClaimApiMockTest.java ---------------------------------------------------------------------- diff --git a/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/ClaimApiMockTest.java b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/ClaimApiMockTest.java new file mode 100644 index 0000000..04eee13 --- /dev/null +++ b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/ClaimApiMockTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jclouds.openstack.marconi.v1.features; + +import com.squareup.okhttp.mockwebserver.MockResponse; +import com.squareup.okhttp.mockwebserver.MockWebServer; +import org.jclouds.openstack.marconi.v1.MarconiApi; +import org.jclouds.openstack.marconi.v1.domain.Claim; +import org.jclouds.openstack.marconi.v1.domain.Message; +import org.jclouds.openstack.v2_0.internal.BaseOpenStackMockTest; +import org.testng.annotations.Test; + +import java.util.List; +import java.util.UUID; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + +/** + * @author Everett Toews + */ +@Test +public class ClaimApiMockTest extends BaseOpenStackMockTest<MarconiApi> { + private static final UUID CLIENT_ID = UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c"); + + public void claimMessages() throws Exception { + MockWebServer server = mockOpenStackServer(); + server.enqueue(new MockResponse().setBody(accessRackspace)); + server.enqueue(new MockResponse().setResponseCode(201).setBody("[{\"body\": \"{\\\"event\\\":{\\\"name\\\":\\\"HK Java User Group\\\",\\\"attendees\\\":[\\\"bob\\\",\\\"jim\\\",\\\"sally\\\"]}}\", \"age\": 1997, \"href\": \"/v1/queues/jclouds-test/messages/52a645633ac24e6f0be88d44?claim_id=52a64d30ef913e6d05e7f786\", \"ttl\": 86400}, {\"body\": \"{\\\"event\\\":{\\\"name\\\":\\\"SF Java User Group\\\",\\\"attendees\\\":[\\\"bob\\\",\\\"jim\\\",\\\"sally\\\"]}}\", \"age\": 981, \"href\": \"/v1/queues/jclouds-test/messages/52a6495bef913e6d195dcffe?claim_id=52a64d30ef913e6d05e7f786\", \"ttl\": 86400}]")); + + try { + MarconiApi api = api(server.getUrl("/").toString(), "openstack-marconi"); + ClaimApi claimApi = api.getClaimApiForZoneAndClientAndQueue("DFW", CLIENT_ID, "jclouds-test"); + + List<Message> messages = claimApi.claim(300, 200, 2); + + assertEquals(messages.size(), 2); + assertEquals(messages.get(0).getId(), "52a645633ac24e6f0be88d44"); + assertEquals(messages.get(0).getClaimId().get(), "52a64d30ef913e6d05e7f786"); + assertEquals(messages.get(0).getTTL(), 86400); + assertEquals(messages.get(1).getId(), "52a6495bef913e6d195dcffe"); + assertEquals(messages.get(1).getClaimId().get(), "52a64d30ef913e6d05e7f786"); + assertEquals(messages.get(1).getTTL(), 86400); + + assertEquals(server.getRequestCount(), 2); + assertEquals(server.takeRequest().getRequestLine(), "POST /tokens HTTP/1.1"); + assertEquals(server.takeRequest().getRequestLine(), "POST /v1/123123/queues/jclouds-test/claims?limit=2 HTTP/1.1"); + } + finally { + server.shutdown(); + } + } + + public void getClaim() throws Exception { + MockWebServer server = mockOpenStackServer(); + server.enqueue(new MockResponse().setBody(accessRackspace)); + server.enqueue(new MockResponse().setResponseCode(201).setBody("{\"age\": 209, \"href\": \"/v1/queues/jclouds-test/claims/52a8d23eb04a584f1bbd4f47\", \"messages\": [{\"body\": \"{\\\"event\\\":{\\\"name\\\":\\\"SF Java User Group\\\",\\\"attendees\\\":[\\\"bob\\\",\\\"jim\\\",\\\"sally\\\"]}}\", \"age\": 12182, \"href\": \"/v1/queues/jclouds-test/messages/52a8a379b04a584f2ec2bc3e?claim_id=52a8d23eb04a584f1bbd4f47\", \"ttl\": 86400}, {\"body\": \"{\\\"event\\\":{\\\"name\\\":\\\"Austin Java User Group\\\",\\\"attendees\\\":[\\\"bob\\\",\\\"jim\\\",\\\"sally\\\"]}}\", \"age\": 12182, \"href\": \"/v1/queues/jclouds-test/messages/52a8a379b04a584f2ec2bc3f?claim_id=52a8d23eb04a584f1bbd4f47\", \"ttl\": 86400}], \"ttl\": 300}")); + + try { + MarconiApi api = api(server.getUrl("/").toString(), "openstack-marconi"); + ClaimApi claimApi = api.getClaimApiForZoneAndClientAndQueue("DFW", CLIENT_ID, "jclouds-test"); + + Claim claim = claimApi.get("52a8d23eb04a584f1bbd4f47"); + + assertEquals(claim.getMessages().size(), 2); + assertEquals(claim.getId(), "52a8d23eb04a584f1bbd4f47"); + assertEquals(claim.getAge(), 209); + assertEquals(claim.getTTL(), 300); + + assertEquals(claim.getMessages().get(0).getId(), "52a8a379b04a584f2ec2bc3e"); + assertEquals(claim.getMessages().get(0).getClaimId().get(), "52a8d23eb04a584f1bbd4f47"); + assertEquals(claim.getMessages().get(0).getAge(), 12182); + assertEquals(claim.getMessages().get(0).getTTL(), 86400); + + assertEquals(claim.getMessages().get(1).getId(), "52a8a379b04a584f2ec2bc3f"); + assertEquals(claim.getMessages().get(1).getClaimId().get(), "52a8d23eb04a584f1bbd4f47"); + assertEquals(claim.getMessages().get(1).getAge(), 12182); + assertEquals(claim.getMessages().get(1).getTTL(), 86400); + + assertEquals(server.getRequestCount(), 2); + assertEquals(server.takeRequest().getRequestLine(), "POST /tokens HTTP/1.1"); + assertEquals(server.takeRequest().getRequestLine(), "GET /v1/123123/queues/jclouds-test/claims/52a8d23eb04a584f1bbd4f47 HTTP/1.1"); + } + finally { + server.shutdown(); + } + } + + public void releaseClaim() throws Exception { + MockWebServer server = mockOpenStackServer(); + server.enqueue(new MockResponse().setBody(accessRackspace)); + server.enqueue(new MockResponse().setResponseCode(204)); + + try { + MarconiApi api = api(server.getUrl("/").toString(), "openstack-marconi"); + ClaimApi claimApi = api.getClaimApiForZoneAndClientAndQueue("DFW", CLIENT_ID, "jclouds-test"); + + boolean success = claimApi.release("52a8d23eb04a584f1bbd4f47"); + + assertTrue(success); + + assertEquals(server.getRequestCount(), 2); + assertEquals(server.takeRequest().getRequestLine(), "POST /tokens HTTP/1.1"); + assertEquals(server.takeRequest().getRequestLine(), "DELETE /v1/123123/queues/jclouds-test/claims/52a8d23eb04a584f1bbd4f47 HTTP/1.1"); + } + finally { + server.shutdown(); + } + } +} http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiLiveTest.java ---------------------------------------------------------------------- diff --git a/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiLiveTest.java b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiLiveTest.java index 47cd6e0..3d7d745 100644 --- a/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiLiveTest.java +++ b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiLiveTest.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.UUID; import static com.google.common.collect.Iterables.getLast; +import static com.google.common.collect.Iterables.getOnlyElement; import static org.jclouds.openstack.marconi.v1.options.StreamMessagesOptions.Builder.echo; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -175,6 +176,20 @@ public class MessageApiLiveTest extends BaseMarconiApiLiveTest { } @Test(dependsOnMethods = { "getMessage" }) + public void deleteMessagesByClaimId() throws Exception { + for (String zoneId : zones) { + UUID clientId = UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c"); + MessageApi messageApi = api.getMessageApiForZoneAndQueue(zoneId, "jclouds-test"); + ClaimApi claimApi = api.getClaimApiForZoneAndClientAndQueue(zoneId, clientId, "jclouds-test"); + Message message = getOnlyElement(claimApi.claim(300, 100, 1)); + + boolean success = messageApi.deleteByClaim(clientId, message.getId(), message.getClaimId().get()); + + assertTrue(success); + } + } + + @Test(dependsOnMethods = { "deleteMessagesByClaimId" }) public void deleteMessages() throws Exception { for (String zoneId : zones) { MessageApi messageApi = api.getMessageApiForZoneAndQueue(zoneId, "jclouds-test"); http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiMockTest.java ---------------------------------------------------------------------- diff --git a/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiMockTest.java b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiMockTest.java index dbf9799..8e5a5f4 100644 --- a/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiMockTest.java +++ b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiMockTest.java @@ -277,4 +277,27 @@ public class MessageApiMockTest extends BaseOpenStackMockTest<MarconiApi> { server.shutdown(); } } + + public void deleteMessageByClaimId() throws Exception { + MockWebServer server = mockOpenStackServer(); + server.enqueue(new MockResponse().setBody(accessRackspace)); + server.enqueue(new MockResponse().setResponseCode(204)); + + try { + MarconiApi api = api(server.getUrl("/").toString(), "openstack-marconi"); + MessageApi messageApi = api.getMessageApiForZoneAndQueue("DFW", "jclouds-test"); + UUID clientId = UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c"); + + boolean success = messageApi.deleteByClaim(clientId, "52936b8a3ac24e6ef4c067dd", "5292b30cef913e6d026f4dec"); + + assertTrue(success); + + assertEquals(server.getRequestCount(), 2); + assertEquals(server.takeRequest().getRequestLine(), "POST /tokens HTTP/1.1"); + assertEquals(server.takeRequest().getRequestLine(), "DELETE /v1/123123/queues/jclouds-test/messages/52936b8a3ac24e6ef4c067dd?claim_id=5292b30cef913e6d026f4dec HTTP/1.1"); + } + finally { + server.shutdown(); + } + } } http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/a9a2b5bc/rackspace-cloudqueues-us/src/test/java/org/jclouds/rackspace/cloudqueues/us/CloudQueuesUSClaimApiLiveTest.java ---------------------------------------------------------------------- diff --git a/rackspace-cloudqueues-us/src/test/java/org/jclouds/rackspace/cloudqueues/us/CloudQueuesUSClaimApiLiveTest.java b/rackspace-cloudqueues-us/src/test/java/org/jclouds/rackspace/cloudqueues/us/CloudQueuesUSClaimApiLiveTest.java new file mode 100644 index 0000000..9a210b4 --- /dev/null +++ b/rackspace-cloudqueues-us/src/test/java/org/jclouds/rackspace/cloudqueues/us/CloudQueuesUSClaimApiLiveTest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jclouds.rackspace.cloudqueues.us; + +import org.jclouds.openstack.marconi.v1.features.ClaimApiLiveTest; +import org.testng.annotations.Test; + +/** + * @author Everett Toews + */ +@Test(groups = "live", testName = "CloudQueuesUSClaimApiLiveTest") +public class CloudQueuesUSClaimApiLiveTest extends ClaimApiLiveTest { + public CloudQueuesUSClaimApiLiveTest() { + provider = "rackspace-cloudqueues-us"; + } +}
