Author: aconway
Date: Fri Feb 17 14:05:56 2012
New Revision: 1245486

URL: http://svn.apache.org/viewvc?rev=1245486&view=rev
Log:
QPID-3603: Cleaned up HA log messages.

Modified:
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.cpp
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/HaBroker.cpp
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/HaPlugin.cpp
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1245486&r1=1245485&r2=1245486&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.cpp Fri Feb 17 
14:05:56 2012
@@ -43,23 +43,20 @@ using types::Variant;
 using std::string;
 
 Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) {
-    // FIXME aconway 2011-11-24: identifying the primary.
-    if (s.brokerUrl != "primary") { // FIXME aconway 2011-11-22: temporary 
hack to identify primary.
-        Url url(s.brokerUrl);
-        QPID_LOG(info, "HA: Acting as backup");
-        string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
+    Url url(s.brokerUrl);
+    string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
 
-        // FIXME aconway 2011-11-17: TBD: link management, discovery, 
fail-over.
-        // Declare the link
-        std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare(
-            url[0].host, url[0].port, protocol,
-            false,              // durable
-            s.mechanism, s.username, s.password);
-        assert(result.second);  // FIXME aconway 2011-11-23: error handling
-        link = result.first;
-        boost::shared_ptr<WiringReplicator> wr(new WiringReplicator(link));
-        broker.getExchanges().registerExchange(wr);
-    }
+    // FIXME aconway 2011-11-17: TBD: link management, discovery, fail-over.
+    // Declare the link
+    std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare(
+        url[0].host, url[0].port, protocol,
+        false,              // durable
+        s.mechanism, s.username, s.password);
+    assert(result.second);  // FIXME aconway 2011-11-23: error handling
+    link = result.first;
+    boost::shared_ptr<WiringReplicator> wr(new WiringReplicator(link));
+    broker.getExchanges().registerExchange(wr);
 }
 
+
 }} // namespace qpid::ha

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1245486&r1=1245485&r2=1245486&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/HaBroker.cpp Fri Feb 17 
14:05:56 2012
@@ -61,7 +61,9 @@ HaBroker::HaBroker(broker::Broker& b, co
     }
     QPID_LOG(notice, "HA: Initialized: client-url=" << clientUrl
              << " broker-url=" << brokerUrl);
-    backup.reset(new Backup(broker, s));
+    // FIXME aconway 2011-11-22: temporary hack to identify primary.
+    if (s.brokerUrl != "primary")
+        backup.reset(new Backup(broker, s));
     // Register a factory for replicating subscriptions.
     broker.getConsumerFactories().add(
         boost::shared_ptr<ReplicatingSubscription::Factory>(

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/HaPlugin.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/HaPlugin.cpp?rev=1245486&r1=1245485&r2=1245486&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/HaPlugin.cpp (original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/HaPlugin.cpp Fri Feb 17 
14:05:56 2012
@@ -58,7 +58,7 @@ struct HaPlugin : public Plugin {
         if (broker && settings.enabled) {
             haBroker.reset(new ha::HaBroker(*broker, settings));
         } else
-            QPID_LOG(info, "HA: Disabled");
+            QPID_LOG(notice, "HA: Disabled");
     }
 };
 

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1245486&r1=1245485&r2=1245486&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.cpp 
(original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Fri Feb 
17 14:05:56 2012
@@ -42,7 +42,6 @@ namespace qpid {
 namespace ha {
 using namespace broker;
 
-// FIXME aconway 2011-12-02: separate file for string constantS?
 const std::string QueueReplicator::DEQUEUE_EVENT_KEY("qpid.dequeue-event");
 
 QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, 
boost::shared_ptr<Link> l)
@@ -85,7 +84,7 @@ void QueueReplicator::initializeBridge(B
     peer.getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 
0, false, "", 0, settings);
     peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
     peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
-    QPID_LOG(debug, "HA: Activated route from queue " << args.i_src << " to " 
<< args.i_dest);
+    QPID_LOG(debug, "HA: Backup activated bridge from queue " << args.i_src << 
" to " << args.i_dest);
 }
 
 void QueueReplicator::route(Deliverable& msg, const std::string& key, const 
qpid::framing::FieldTable* /*args*/)
@@ -102,18 +101,15 @@ void QueueReplicator::route(Deliverable&
             if (current < *i) {
                 //haven't got that far yet, record the dequeue
                 dequeued.add(*i);
-                QPID_LOG(trace, "HA: Recording dequeue of message at " <<
-                         QueuePos(queue.get(), *i));
+                QPID_LOG(trace, "HA: Recording dequeue of " << 
QueuePos(queue.get(), *i));
             } else {
                 QueuedMessage message;
                 if (queue->acquireMessageAt(*i, message)) {
                     queue->dequeue(0, message);
-                    QPID_LOG(info, "HA: Dequeued message "<< 
QueuePos(message));
+                    QPID_LOG(trace, "HA: Backup dequeued: "<< 
QueuePos(message));
                 } else {
-                    // FIXME aconway 2011-11-29: error handling
-                    // Is this an error? Will happen if queue has initial 
dequeues.
-                    QPID_LOG(error, "HA: Unable to dequeue message at "
-                             << QueuePos(queue.get(), *i));
+                    // This can happen if we're replicating a queue that has 
initial dequeues.
+                    QPID_LOG(trace, "HA: Backup message already dequeued: "<< 
QueuePos(queue.get(), *i));
                 }
             }
         }
@@ -122,10 +118,10 @@ void QueueReplicator::route(Deliverable&
         //dequeued before our subscription reached them
         while (dequeued.contains(++current)) {
             dequeued.remove(current);
-            QPID_LOG(debug, "HA: Skipping dequeued message at " << current << 
" from " << queue->getName());
+            QPID_LOG(trace, "HA: Backup skipping dequeued message: " << 
QueuePos(queue.get(), current));
             queue->setPosition(current);
         }
-        QPID_LOG(info, "HA: Enqueued message on " << queue->getName() << "; 
currently at " << current);
+        QPID_LOG(trace, "HA: Backup enqueued message: " << 
QueuePos(queue.get(), current));
         msg.deliverTo(queue);
     }
 }

Modified: 
qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1245486&r1=1245485&r2=1245486&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp 
(original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp 
Fri Feb 17 14:05:56 2012
@@ -33,8 +33,6 @@ using namespace framing;
 using namespace broker;
 using namespace std;
 
-// FIXME aconway 2011-11-28: review all arugment names, prefixes etc.
-// Do we want a common HA prefix?
 const string 
ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.replicating-subscription");
 const string 
ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER("qpid.high-sequence-number");
 const string 
ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER("qpid.low-sequence-number");
@@ -211,7 +209,7 @@ void ReplicatingSubscription::dequeued(c
     {
         sys::Mutex::ScopedLock l(lock);
         range.add(m.position);
-        // FIXME aconway 2011-11-29: q[pos]
+        // FIXME aconway 2011-11-29: q[pos] logging
         QPID_LOG(trace, "HA: Updated dequeue event to include " << QueuePos(m) 
<< "; subscription is at " << position);
     }
     notify();

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp?rev=1245486&r1=1245485&r2=1245486&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp 
(original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp Fri Feb 
17 14:05:56 2012
@@ -115,7 +115,6 @@ const string S_WIRING="wiring";
 const string S_ALL="all";
 
 ReplicateLevel replicateLevel(const string& str) {
-    // FIXME aconway 2011-11-24: case insenstive comparison.
     ReplicateLevel rl = RL_NONE;
     if (str == S_WIRING) rl = RL_WIRING;
     else if (str == S_ALL) rl = RL_ALL;
@@ -176,7 +175,7 @@ WiringReplicator::~WiringReplicator() {}
 WiringReplicator::WiringReplicator(const boost::shared_ptr<Link>& l)
     : Exchange(QPID_WIRING_REPLICATOR), broker(*l->getBroker()), link(l)
 {
-    QPID_LOG(debug, "HA: Starting replication from " <<
+    QPID_LOG(info, "HA: Backup replicating from " <<
              link->getTransport() << ":" << link->getHost() << ":" << 
link->getPort());
     broker.getLinks().declare(
         link->getHost(), link->getPort(),
@@ -212,10 +211,10 @@ void WiringReplicator::initializeBridge(
     sendQuery(QUEUE, queueName, sessionHandler);
     sendQuery(EXCHANGE, queueName, sessionHandler);
     sendQuery(BINDING, queueName, sessionHandler);
-    QPID_LOG(debug, "HA: Activated wiring replicator")
+    QPID_LOG(debug, "HA: Backup activated wiring bridge: " << queueName);
 }
 
-// FIXME aconway 2011-12-02: error  handling in route. Be forging but log 
warnings?
+// FIXME aconway 2011-12-02: error handling in route.
 void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const 
framing::FieldTable* headers) {
     Variant::List list;
     try {
@@ -230,7 +229,8 @@ void WiringReplicator::route(Deliverable
                 Variant::Map& map = i->asMap();
                 Variant::Map& schema = map[SCHEMA_ID].asMap();
                 Variant::Map& values = map[VALUES].asMap();
-                QPID_LOG(trace, "HA: Configuration event: schema=" << schema 
<< " values=" << values);
+                QPID_LOG(debug, "HA: Backup received event: schema=" << schema
+                         << " values=" << values);
                 if      (match<EventQueueDeclare>(schema)) 
doEventQueueDeclare(values);
                 else if (match<EventQueueDelete>(schema)) 
doEventQueueDelete(values);
                 else if (match<EventExchangeDeclare>(schema)) 
doEventExchangeDeclare(values);
@@ -238,7 +238,9 @@ void WiringReplicator::route(Deliverable
                 else if (match<EventBind>(schema)) doEventBind(values);
                 // FIXME aconway 2011-11-21: handle unbind & all other events.
                 else if (match<EventSubscribe>(schema)) {} // Deliberately 
ignored.
-                else throw(Exception(QPID_MSG("WiringReplicator received 
unexpected event, schema=" << schema)));
+                // FIXME aconway 2011-12-02: error handling
+                else throw(Exception(QPID_MSG("Backup received unexpected 
event, schema="
+                                              << schema)));
             }
         } else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) {
             for (Variant::List::iterator i = list.begin(); i != list.end(); 
++i) {
@@ -246,18 +248,20 @@ void WiringReplicator::route(Deliverable
                 Variant::Map& values = i->asMap()[VALUES].asMap();
                 framing::FieldTable args;
                 amqp_0_10::translate(values[ARGUMENTS].asMap(), args);
-                QPID_LOG(trace, "HA: Configuration response type=" << type << 
" values=" << values);
+                QPID_LOG(trace, "HA: Backup received response type=" << type
+                         << " values=" << values);
                 if      (type == QUEUE) doResponseQueue(values);
                 else if (type == EXCHANGE) doResponseExchange(values);
                 else if (type == BINDING) doResponseBind(values);
                 else throw Exception(QPID_MSG("HA: Unexpected response type: " 
<< type));
             }
         } else {
-            QPID_LOG(warning, QPID_MSG("HA: Expecting remote configuration 
message, got: " << *headers));
+            QPID_LOG(error, QPID_MSG("HA: Backup received unexpected message: "
+                                       << *headers));
         }
     } catch (const std::exception& e) {
-        QPID_LOG(warning, "HA: Error replicating configuration: " << e.what());
-        QPID_LOG(debug, "HA: Error processing configuration message: " << 
list);
+        QPID_LOG(error, "HA: Backup replication error: " << e.what());
+        QPID_LOG(error, "HA: Backup replication error while processing: " << 
list);
     }
 }
 
@@ -267,9 +271,7 @@ void WiringReplicator::doEventQueueDecla
     if (values[DISP] == CREATED && replicateLevel(argsMap)) {
          framing::FieldTable args;
         amqp_0_10::translate(argsMap, args);
-
-        QPID_LOG(debug, "HA: Creating queue from event " << name);
-       std::pair<boost::shared_ptr<Queue>, bool> result =
+        std::pair<boost::shared_ptr<Queue>, bool> result =
             broker.createQueue(
                 name,
                 values[DURABLE].asBool(),
@@ -284,10 +286,11 @@ void WiringReplicator::doEventQueueDecla
             // re-create from event.
             // Events are always up to date, whereas responses may be
             // out of date.
-            QPID_LOG(debug, "HA: New queue replica " << name);
+            QPID_LOG(debug, "HA: Created backup queue from event: " << name);
             startQueueReplicator(result.first);
         } else {
-            QPID_LOG(warning, "HA: Replicated queue " << name << " already 
exists");
+            // FIXME aconway 2011-12-02: what's the right way to handle this?
+            QPID_LOG(warning, "HA: Queue already exists on backup: " << name);
         }
     }
 }
@@ -296,7 +299,7 @@ void WiringReplicator::doEventQueueDelet
     string name = values[QNAME].asString();
     boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
     if (queue && replicateLevel(queue->getSettings())) {
-        QPID_LOG(debug, "HA: Deleting queue from event: " << name);
+        QPID_LOG(debug, "HA: Deleting backup queue from event: " << name);
         broker.deleteQueue(
             name,
             values[USER].asString(),
@@ -310,18 +313,20 @@ void WiringReplicator::doEventExchangeDe
         string name = values[EXNAME].asString();
         framing::FieldTable args;
         amqp_0_10::translate(argsMap, args);
-        QPID_LOG(debug, "HA: New exchange replica " << name);
-        if (!broker.createExchange(
+        if (broker.createExchange(
                 name,
                 values[EXTYPE].asString(),
                 values[DURABLE].asBool(),
                 values[ALTEX].asString(),
                 args,
                 values[USER].asString(),
-                values[RHOST].asString()).second) {
+                values[RHOST].asString()).second)
+        {
+                    QPID_LOG(debug, "HA: created backup exchange from event: " 
<< name);
+        } else {
             // FIXME aconway 2011-11-22: should delete pre-exisitng exchange
             // and re-create from event. See comment in doEventQueueDeclare.
-            QPID_LOG(warning, "HA: Replicated exchange " << name << " already 
exists");
+            QPID_LOG(warning, "HA: Exchange already exists on backup: " << 
name);
         }
     }
 }
@@ -331,7 +336,7 @@ void WiringReplicator::doEventExchangeDe
     try {
         boost::shared_ptr<Exchange> exchange = 
broker.getExchanges().find(name);
         if (exchange && replicateLevel(exchange->getArgs())) {
-            QPID_LOG(debug, "HA: Deleting exchange:" << name);
+            QPID_LOG(debug, "HA: Deleting backup exchange:" << name);
             broker.deleteExchange(
                 name,
                 values[USER].asString(),
@@ -378,12 +383,12 @@ void WiringReplicator::doResponseQueue(V
             ""/*TODO: who is the user?*/,
             ""/*TODO: what should we use as connection id?*/);
     if (result.second) {
-        QPID_LOG(debug, "HA: New queue replica: " << values[NAME] << " (in 
catch-up)");
+        QPID_LOG(debug, "HA: Created backup queue from response: " << 
values[NAME]);
         startQueueReplicator(result.first);
     } else {
         // FIXME aconway 2011-11-22: Normal to find queue already
         // exists if we're failing over.
-        QPID_LOG(warning, "HA: Replicated queue " << values[NAME] << " already 
exists (in catch-up)");
+        QPID_LOG(warning, "HA: Queue already exists on backup: " << name);
     }
 }
 
@@ -392,16 +397,18 @@ void WiringReplicator::doResponseExchang
     if (!replicateLevel(argsMap)) return;
     framing::FieldTable args;
     amqp_0_10::translate(argsMap, args);
-    QPID_LOG(debug, "HA: New exchange replica " << values[NAME] << " (in 
catch-up)");
-    if (!broker.createExchange(
+    if (broker.createExchange(
             values[NAME].asString(),
             values[TYPE].asString(),
             values[DURABLE].asBool(),
             ""/*TODO: need to include alternate-exchange*/,
             args,
             ""/*TODO: who is the user?*/,
-            ""/*TODO: what should we use as connection id?*/).second) {
-        QPID_LOG(warning, "HA: Replicated exchange " << values[QNAME] << " 
already exists (in catch-up)");
+            ""/*TODO: what should we use as connection id?*/).second)
+    {
+        QPID_LOG(debug, "HA: Created backup exchange from response: " << 
values[NAME]);
+    } else {
+        QPID_LOG(warning, "HA: Exchange already exists on backup:  " << 
values[QNAME]);
     }
 }
 
@@ -440,10 +447,10 @@ void WiringReplicator::doResponseBind(Va
         framing::FieldTable args;
         amqp_0_10::translate(values[ARGUMENTS].asMap(), args);
         string key = values[KEY].asString();
-        QPID_LOG(debug, "HA: Replicated binding exchange=" << 
exchange->getName()
+        exchange->bind(queue, key, &args);
+        QPID_LOG(debug, "HA: Created backup binding from response: exchange=" 
<< exchange->getName()
                  << " queue=" << queue->getName()
                  << " key=" << key);
-        exchange->bind(queue, key, &args);
     }
 }
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to