Author: tross
Date: Tue Jun 16 14:57:06 2009
New Revision: 785243

URL: http://svn.apache.org/viewvc?rev=785243&view=rev
Log:
Added a new qmf-console example program.
This new program illustrates how individual nodes of a broker cluster may be 
monitored.
It connects to all nodes, tracks their connectivity, and queries only one of 
the connected
nodes.
This mechanism is used in lieu of cluster-failover which is not appropriate for 
broker management.

Added:
    qpid/trunk/qpid/cpp/examples/qmf-console/cluster-qmon.cpp
Modified:
    qpid/trunk/qpid/cpp/examples/qmf-console/Makefile.am

Modified: qpid/trunk/qpid/cpp/examples/qmf-console/Makefile.am
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/examples/qmf-console/Makefile.am?rev=785243&r1=785242&r2=785243&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/examples/qmf-console/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/examples/qmf-console/Makefile.am Tue Jun 16 14:57:06 
2009
@@ -22,7 +22,7 @@
 MAKELDFLAGS=$(CONSOLEFLAGS)
 include $(top_srcdir)/examples/makedist.mk
 
-noinst_PROGRAMS=console printevents ping queuestats
+noinst_PROGRAMS=console printevents ping queuestats cluster-qmon
 
 console_SOURCES=console.cpp
 console_LDADD=$(CONSOLE_LIB)
@@ -36,11 +36,15 @@
 queuestats_SOURCES=queuestats.cpp
 queuestats_LDADD=$(CONSOLE_LIB)
 
+cluster_qmon_SOURCES=cluster-qmon.cpp
+cluster_qmon_LDADD=$(CONSOLE_LIB)
+
 examples_DATA= \
        console.cpp \
        printevents.cpp \
        ping.cpp \
        queuestats.cpp \
+       cluster-qmon.cpp \
        $(MAKEDIST)
 
 EXTRA_DIST=                  \

Added: qpid/trunk/qpid/cpp/examples/qmf-console/cluster-qmon.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/examples/qmf-console/cluster-qmon.cpp?rev=785243&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/examples/qmf-console/cluster-qmon.cpp (added)
+++ qpid/trunk/qpid/cpp/examples/qmf-console/cluster-qmon.cpp Tue Jun 16 
14:57:06 2009
@@ -0,0 +1,170 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/console/ConsoleListener.h"
+#include "qpid/console/SessionManager.h"
+#include "qpid/sys/Time.h"
+#include "qpid/sys/Mutex.h"
+#include <signal.h>
+#include <map>
+
+using namespace std;
+using namespace qpid::console;
+using qpid::sys::Mutex;
+
+//
+//  This example maintains connections to a number of brokers (assumed
+//  to be running on localhost and at ports listed in the command line
+//  arguments).
+//
+//  The program then periodically polls queue information from a
+//  single operational broker.  This is a useful illustration of how
+//  one might monitor statistics on a cluster of brokers.
+//
+
+//==============================================================
+// Main program
+//==============================================================
+class Main : public ConsoleListener {
+    bool  stopping;  // Used to tell the program to exit
+    Mutex lock;      // Mutex to protect the broker-map
+    map<Broker*, bool> brokerMap; // Map of broker-pointers to boolean 
"operational" status
+
+public:
+    Main() : stopping(false) {}
+
+    /** Invoked when a connection is established to a broker
+     */
+    void brokerConnected(const Broker& broker)
+    {
+        Mutex::ScopedLock l(lock);
+        brokerMap[const_cast<Broker*>(&broker)] = true;
+    }
+
+    /** Invoked when the connection to a broker is lost
+     */
+    void brokerDisconnected(const Broker& broker)
+    {
+        Mutex::ScopedLock l(lock);
+        brokerMap[const_cast<Broker*>(&broker)] = false;
+    }
+
+    int run(int argc, char** argv)
+    {
+        //
+        // Tune the settings for this application:  We will operate 
synchronously only, we don't
+        // wish to use the bandwidth needed to aysnchronously receive objects 
or events.
+        //
+        SessionManager::Settings sessionSettings;
+        sessionSettings.rcvObjects = false;
+        sessionSettings.rcvEvents = false;
+        sessionSettings.rcvHeartbeats = false;
+
+        SessionManager sm(this, sessionSettings);
+
+        //
+        // Connect to the brokers.
+        //
+        for (int idx = 1; idx < argc; idx++) {
+            Mutex::ScopedLock l(lock);
+            qpid::client::ConnectionSettings connSettings;
+            connSettings.host = "localhost";
+            connSettings.port = atoi(argv[idx]);
+            Broker* broker = sm.addBroker(connSettings);
+            brokerMap[broker] = false; // initially assume broker is 
disconnected
+        }
+
+        //
+        // Periodically poll the first connected broker.
+        //
+        while (!stopping) {
+            //
+            // Find an operational broker
+            //
+            Broker* operationalBroker = 0;
+            for (map<Broker*, bool>::iterator iter = brokerMap.begin();
+                 iter != brokerMap.end(); iter++) {
+                if (iter->second) {
+                    operationalBroker = iter->first;
+                    break;
+                }
+            }
+
+            if (operationalBroker != 0) {
+                Object::Vector list;
+                sm.getObjects(list, "queue", operationalBroker);
+                for (Object::Vector::iterator i = list.begin(); i != 
list.end(); i++) {
+                    cout << "queue: " << i->attrString("name");
+                    cout << "  bindingCount=" << i->attrUint64("bindingCount") 
<< endl;
+                }
+            } else {
+                cout << "No operational brokers" << endl;
+            }
+
+            qpid::sys::sleep(10);
+            if (stopping)
+                break;
+        }
+
+        {
+            //
+            //  The following code structure uses the mutex to protect the 
broker map while
+            //  ensuring that sm.delBroker is called without the mutex held 
(which leads to
+            //  a deadlock).
+            //
+            Mutex::ScopedLock l(lock);
+            map<Broker*, bool>::iterator iter = brokerMap.begin();
+            while (iter != brokerMap.end()) {
+                Broker* broker = iter->first;
+                brokerMap.erase(iter);
+                {
+                    Mutex::ScopedUnlock ul(lock);
+                    sm.delBroker(broker);
+                }
+                iter = brokerMap.begin();
+            }
+        }
+
+        return 0;
+    }
+
+    void stop() {
+        stopping = true;
+    }
+};
+
+Main main_program;
+
+void signal_handler(int)
+{
+    main_program.stop();
+}
+
+int main(int argc, char** argv)
+{
+    signal(SIGINT, signal_handler);
+    try {
+        return main_program.run(argc, argv);
+    } catch(std::exception& e) {
+        cout << "Top Level Exception: " << e.what() << endl;
+    }
+}
+



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to