Author: gsim Date: Mon Sep 7 18:09:00 2009 New Revision: 812243 URL: http://svn.apache.org/viewvc?rev=812243&view=rev Log: QPID-664: Added automatic message replay on reconnection.
Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/CodecsInternal.h qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/Sender.h qpid/trunk/qpid/cpp/src/CMakeLists.txt qpid/trunk/qpid/cpp/src/Makefile.am qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.cpp qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.h qpid/trunk/qpid/cpp/src/qpid/messaging/Sender.cpp qpid/trunk/qpid/cpp/src/qpid/messaging/SenderImpl.h qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp Modified: qpid/trunk/qpid/cpp/include/qpid/messaging/Sender.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/Sender.h?rev=812243&r1=812242&r2=812243&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/include/qpid/messaging/Sender.h (original) +++ qpid/trunk/qpid/cpp/include/qpid/messaging/Sender.h Mon Sep 7 18:09:00 2009 @@ -47,7 +47,7 @@ QPID_CLIENT_EXTERN ~Sender(); QPID_CLIENT_EXTERN Sender& operator=(const Sender&); - QPID_CLIENT_EXTERN void send(Message& message); + QPID_CLIENT_EXTERN void send(const Message& message); QPID_CLIENT_EXTERN void cancel(); private: friend class qpid::client::PrivateImplRef<Sender>; Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=812243&r1=812242&r2=812243&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original) +++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Mon Sep 7 18:09:00 2009 @@ -540,6 +540,7 @@ qpid/client/amqp0_10/AddressResolution.h qpid/client/amqp0_10/AddressResolution.cpp qpid/client/amqp0_10/Codecs.cpp + qpid/client/amqp0_10/CodecsInternal.h qpid/client/amqp0_10/CompletionTracker.h qpid/client/amqp0_10/CompletionTracker.cpp qpid/client/amqp0_10/ConnectionImpl.h @@ -548,6 +549,8 @@ qpid/client/amqp0_10/IncomingMessages.cpp qpid/client/amqp0_10/MessageSink.h qpid/client/amqp0_10/MessageSource.h + qpid/client/amqp0_10/OutgoingMessage.h + qpid/client/amqp0_10/OutgoingMessage.cpp qpid/client/amqp0_10/ReceiverImpl.h qpid/client/amqp0_10/ReceiverImpl.cpp qpid/client/amqp0_10/SessionImpl.h Modified: qpid/trunk/qpid/cpp/src/Makefile.am URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=812243&r1=812242&r2=812243&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/Makefile.am (original) +++ qpid/trunk/qpid/cpp/src/Makefile.am Mon Sep 7 18:09:00 2009 @@ -700,6 +700,7 @@ qpid/client/amqp0_10/AddressResolution.h \ qpid/client/amqp0_10/AddressResolution.cpp \ qpid/client/amqp0_10/Codecs.cpp \ + qpid/client/amqp0_10/CodecsInternal.h \ qpid/client/amqp0_10/ConnectionImpl.h \ qpid/client/amqp0_10/ConnectionImpl.cpp \ qpid/client/amqp0_10/CompletionTracker.h \ @@ -708,6 +709,8 @@ qpid/client/amqp0_10/IncomingMessages.cpp \ qpid/client/amqp0_10/MessageSink.h \ qpid/client/amqp0_10/MessageSource.h \ + qpid/client/amqp0_10/OutgoingMessage.h \ + qpid/client/amqp0_10/OutgoingMessage.cpp \ qpid/client/amqp0_10/ReceiverImpl.h \ qpid/client/amqp0_10/ReceiverImpl.cpp \ qpid/client/amqp0_10/SessionImpl.h \ Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp?rev=812243&r1=812242&r2=812243&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp Mon Sep 7 18:09:00 2009 @@ -22,6 +22,7 @@ #include "qpid/client/amqp0_10/Codecs.h" #include "qpid/client/amqp0_10/MessageSource.h" #include "qpid/client/amqp0_10/MessageSink.h" +#include "qpid/client/amqp0_10/OutgoingMessage.h" #include "qpid/messaging/Address.h" #include "qpid/messaging/Filter.h" #include "qpid/messaging/Message.h" @@ -122,7 +123,7 @@ bool passive = true, const std::string& type = EMPTY_STRING, bool durable = false, const FieldTable& options = EMPTY_FIELD_TABLE); void declare(qpid::client::AsyncSession& session, const std::string& name); - void send(qpid::client::AsyncSession& session, const std::string& name, qpid::messaging::Message& message); + void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message); void cancel(qpid::client::AsyncSession& session, const std::string& name); private: const std::string name; @@ -139,7 +140,7 @@ QueueSink(const std::string& name, bool passive=true, bool exclusive=false, bool autoDelete=false, bool durable=false, const FieldTable& options = EMPTY_FIELD_TABLE); void declare(qpid::client::AsyncSession& session, const std::string& name); - void send(qpid::client::AsyncSession& session, const std::string& name, qpid::messaging::Message& message); + void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message); void cancel(qpid::client::AsyncSession& session, const std::string& name); private: const std::string name; @@ -328,14 +329,12 @@ } } -void Exchange::send(qpid::client::AsyncSession& session, const std::string&, qpid::messaging::Message& m) +void Exchange::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m) { - qpid::client::Message message; - convert(m, message); - if (message.getDeliveryProperties().getRoutingKey().empty() && !defaultSubject.empty()) { - message.getDeliveryProperties().setRoutingKey(defaultSubject); + if (m.message.getDeliveryProperties().getRoutingKey().empty() && !defaultSubject.empty()) { + m.message.getDeliveryProperties().setRoutingKey(defaultSubject); } - session.messageTransfer(arg::destination=name, arg::content=message); + m.status = session.messageTransfer(arg::destination=name, arg::content=m.message); } void Exchange::cancel(qpid::client::AsyncSession&, const std::string&) {} @@ -355,12 +354,10 @@ arg::autoDelete=autoDelete, arg::arguments=options); } } -void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, qpid::messaging::Message& m) +void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, OutgoingMessage& m) { - qpid::client::Message message; - convert(m, message); - message.getDeliveryProperties().setRoutingKey(name); - session.messageTransfer(arg::content=message); + m.message.getDeliveryProperties().setRoutingKey(name); + m.status = session.messageTransfer(arg::content=m.message); } void QueueSink::cancel(qpid::client::AsyncSession&, const std::string&) {} Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/CodecsInternal.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/CodecsInternal.h?rev=812243&view=auto ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/CodecsInternal.h (added) +++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/CodecsInternal.h Mon Sep 7 18:09:00 2009 @@ -0,0 +1,41 @@ +#ifndef QPID_CLIENT_AMQP0_10_CODECSINTERNAL_H +#define QPID_CLIENT_AMQP0_10_CODECSINTERNAL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Variant.h" +#include "qpid/framing/FieldTable.h" + +namespace qpid { +namespace client { +namespace amqp0_10 { + +/** + * Declarations of a couple of conversion functions implemented in + * Codecs.cpp but not exposed through API + */ + +void translate(const qpid::messaging::Variant::Map& from, qpid::framing::FieldTable& to); +void translate(const qpid::framing::FieldTable& from, qpid::messaging::Variant::Variant::Map& to); + +}}} // namespace qpid::client::amqp0_10 + +#endif /*!QPID_CLIENT_AMQP0_10_CODECSINTERNAL_H*/ Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp?rev=812243&r1=812242&r2=812243&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp Mon Sep 7 18:09:00 2009 @@ -21,6 +21,7 @@ #include "qpid/client/amqp0_10/IncomingMessages.h" #include "qpid/client/amqp0_10/AddressResolution.h" #include "qpid/client/amqp0_10/Codecs.h" +#include "qpid/client/amqp0_10/CodecsInternal.h" #include "qpid/client/SessionImpl.h" #include "qpid/client/SessionBase_0_10Access.h" #include "qpid/log/Statement.h" @@ -195,8 +196,6 @@ parent.retrieve(content, message); } -void translate(const FieldTable& from, Variant::Map& to);//implemented in Codecs.cpp - void populateHeaders(qpid::messaging::Message& message, const DeliveryProperties* deliveryProperties, const MessageProperties* messageProperties) Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h?rev=812243&r1=812242&r2=812243&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/MessageSink.h Mon Sep 7 18:09:00 2009 @@ -33,6 +33,8 @@ namespace client { namespace amqp0_10 { +class OutgoingMessage; + /** * */ @@ -41,7 +43,7 @@ public: virtual ~MessageSink() {} virtual void declare(qpid::client::AsyncSession& session, const std::string& name) = 0; - virtual void send(qpid::client::AsyncSession& session, const std::string& name, qpid::messaging::Message& message) = 0; + virtual void send(qpid::client::AsyncSession& session, const std::string& name, OutgoingMessage& message) = 0; virtual void cancel(qpid::client::AsyncSession& session, const std::string& name) = 0; private: }; Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp?rev=812243&view=auto ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp (added) +++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp Mon Sep 7 18:09:00 2009 @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/client/amqp0_10/OutgoingMessage.h" +#include "qpid/client/amqp0_10/AddressResolution.h" +#include "qpid/client/amqp0_10/Codecs.h" +#include "qpid/client/amqp0_10/CodecsInternal.h" +#include "qpid/messaging/Address.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/MessageImpl.h" + +namespace qpid { +namespace client { +namespace amqp0_10 { + +using qpid::messaging::Address; +using qpid::messaging::MessageImplAccess; + +template <class T> void encode(const qpid::messaging::Message& from, qpid::client::Message& to) +{ + T codec; + MessageImplAccess::get(from).getEncodedContent(codec, to.getData()); + to.getMessageProperties().setContentType(T::contentType); +} + +void OutgoingMessage::convert(const qpid::messaging::Message& from) +{ + //TODO: need to avoid copying as much as possible + if (from.getContent().isList()) { + encode<ListCodec>(from, message); + } else if (from.getContent().isMap()) { + encode<MapCodec>(from, message); + } else { + message.setData(from.getBytes()); + message.getMessageProperties().setContentType(from.getContentType()); + } + const Address& address = from.getReplyTo(); + if (!address.value.empty()) { + message.getMessageProperties().setReplyTo(AddressResolution::convert(address)); + } + translate(from.getHeaders(), message.getMessageProperties().getApplicationHeaders()); + //TODO: set other message properties + message.getDeliveryProperties().setRoutingKey(from.getSubject()); + //TODO: set other delivery properties +} + +}}} // namespace qpid::client::amqp0_10 Added: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h?rev=812243&view=auto ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h (added) +++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.h Mon Sep 7 18:09:00 2009 @@ -0,0 +1,46 @@ +#ifndef QPID_CLIENT_AMQP0_10_OUTGOINGMESSAGE_H +#define QPID_CLIENT_AMQP0_10_OUTGOINGMESSAGE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/client/Completion.h" +#include "qpid/client/Message.h" + +namespace qpid { +namespace messaging { +class Message; +} +namespace client { +namespace amqp0_10 { + +struct OutgoingMessage +{ + qpid::client::Message message; + qpid::client::Completion status; + + void convert(const qpid::messaging::Message&); +}; + + + +}}} // namespace qpid::client::amqp0_10 + +#endif /*!QPID_CLIENT_AMQP0_10_OUTGOINGMESSAGE_H*/ Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp?rev=812243&r1=812242&r2=812243&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp Mon Sep 7 18:09:00 2009 @@ -22,6 +22,7 @@ #include "MessageSink.h" #include "SessionImpl.h" #include "AddressResolution.h" +#include "OutgoingMessage.h" namespace qpid { namespace client { @@ -30,9 +31,10 @@ SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name, const qpid::messaging::Address& _address, const qpid::messaging::Variant::Map& _options) : - parent(_parent), name(_name), address(_address), options(_options), state(UNRESOLVED) {} + parent(_parent), name(_name), address(_address), options(_options), state(UNRESOLVED), + capacity(50), window(0) {} -void SenderImpl::send(qpid::messaging::Message& m) +void SenderImpl::send(const qpid::messaging::Message& m) { execute1<Send>(&m); } @@ -54,14 +56,36 @@ parent.senderCancelled(name); } else { sink->declare(session, name); - //TODO: replay + replay(); } } -void SenderImpl::sendImpl(qpid::messaging::Message& m) +void SenderImpl::sendImpl(const qpid::messaging::Message& m) { - //TODO: record for replay if appropriate - sink->send(session, name, m); + //TODO: make recoding for replay optional + std::auto_ptr<OutgoingMessage> msg(new OutgoingMessage()); + msg->convert(m); + outgoing.push_back(msg.release()); + sink->send(session, name, outgoing.back()); + if (++window > (capacity / 2)) {//TODO: make this configurable? + session.flush(); + checkPendingSends(); + window = 0; + } +} + +void SenderImpl::replay() +{ + for (OutgoingMessages::iterator i = outgoing.begin(); i != outgoing.end(); ++i) { + sink->send(session, name, *i); + } +} + +void SenderImpl::checkPendingSends() +{ + while (!outgoing.empty() && outgoing.front().status.isComplete()) { + outgoing.pop_front(); + } } void SenderImpl::cancelImpl() Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h?rev=812243&r1=812242&r2=812243&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h Mon Sep 7 18:09:00 2009 @@ -28,6 +28,7 @@ #include "qpid/client/AsyncSession.h" #include "qpid/client/amqp0_10/SessionImpl.h" #include <memory> +#include <boost/ptr_container/ptr_deque.hpp> namespace qpid { namespace client { @@ -35,6 +36,7 @@ class AddressResolution; class MessageSink; +class OutgoingMessage; /** * @@ -47,7 +49,7 @@ SenderImpl(SessionImpl& parent, const std::string& name, const qpid::messaging::Address& address, const qpid::messaging::Variant::Map& options); - void send(qpid::messaging::Message&); + void send(const qpid::messaging::Message&); void cancel(); void init(qpid::client::AsyncSession, AddressResolution&); @@ -63,8 +65,16 @@ std::string destination; std::string routingKey; + typedef boost::ptr_deque<OutgoingMessage> OutgoingMessages; + OutgoingMessages outgoing; + uint32_t capacity; + uint32_t window; + + void checkPendingSends(); + void replay(); + //logic for application visible methods: - void sendImpl(qpid::messaging::Message&); + void sendImpl(const qpid::messaging::Message&); void cancelImpl(); //functors for application visible methods (allowing locking and @@ -78,9 +88,9 @@ struct Send : Command { - qpid::messaging::Message* message; + const qpid::messaging::Message* message; - Send(SenderImpl& i, qpid::messaging::Message* m) : Command(i), message(m) {} + Send(SenderImpl& i, const qpid::messaging::Message* m) : Command(i), message(m) {} void operator()() { impl.sendImpl(*message); } }; Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.cpp?rev=812243&r1=812242&r2=812243&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.cpp Mon Sep 7 18:09:00 2009 @@ -126,6 +126,15 @@ } } +void MessageImpl::getEncodedContent(Codec& codec, std::string& out) const +{ + if (content.getType() != VAR_VOID) { + codec.encode(content, out); + } else { + out = bytes; + } +} + void MessageImpl::decode(Codec& codec) { codec.decode(bytes, content); @@ -188,5 +197,9 @@ { return *msg.impl; } +const MessageImpl& MessageImplAccess::get(const Message& msg) +{ + return *msg.impl; +} }} // namespace qpid::messaging Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.h?rev=812243&r1=812242&r2=812243&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/messaging/MessageImpl.h Mon Sep 7 18:09:00 2009 @@ -84,6 +84,7 @@ void clear(); + void getEncodedContent(Codec& codec, std::string&) const; void encode(Codec& codec); void decode(Codec& codec); @@ -125,6 +126,7 @@ struct MessageImplAccess { static MessageImpl& get(Message&); + static const MessageImpl& get(const Message&); }; }} // namespace qpid::messaging Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/Sender.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/Sender.cpp?rev=812243&r1=812242&r2=812243&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/messaging/Sender.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/messaging/Sender.cpp Mon Sep 7 18:09:00 2009 @@ -38,7 +38,7 @@ Sender::Sender(const Sender& s) : qpid::client::Handle<SenderImpl>() { PI::copy(*this, s); } Sender::~Sender() { PI::dtor(*this); } Sender& Sender::operator=(const Sender& s) { return PI::assign(*this, s); } -void Sender::send(Message& message) { impl->send(message); } +void Sender::send(const Message& message) { impl->send(message); } void Sender::cancel() { impl->cancel(); } }} // namespace qpid::messaging Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/SenderImpl.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/SenderImpl.h?rev=812243&r1=812242&r2=812243&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/messaging/SenderImpl.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/messaging/SenderImpl.h Mon Sep 7 18:09:00 2009 @@ -35,7 +35,7 @@ { public: virtual ~SenderImpl() {} - virtual void send(Message& message) = 0; + virtual void send(const Message& message) = 0; virtual void cancel() = 0; private: }; Modified: qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=812243&r1=812242&r2=812243&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp Mon Sep 7 18:09:00 2009 @@ -310,7 +310,6 @@ sender.send(out); Receiver receiver = fix.session.createReceiver(fix.queue); Message in = receiver.fetch(5 * qpid::sys::TIME_SEC); - BOOST_CHECK_EQUAL(in.getBytes(), out.getBytes()); BOOST_CHECK_EQUAL(in.getContent().asMap()["abc"].asString(), "def"); BOOST_CHECK_EQUAL(in.getContent().asMap()["pi"].asFloat(), 3.14f); fix.session.acknowledge(); @@ -329,7 +328,6 @@ sender.send(out); Receiver receiver = fix.session.createReceiver(fix.queue); Message in = receiver.fetch(5 * qpid::sys::TIME_SEC); - BOOST_CHECK_EQUAL(in.getBytes(), out.getBytes()); Variant::List& list = in.getContent().asList(); BOOST_CHECK_EQUAL(list.size(), out.getContent().asList().size()); BOOST_CHECK_EQUAL(list.front().asString(), "abc"); --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org