Excel RTD Server
----------------

                 Key: QPID-1146
                 URL: https://issues.apache.org/jira/browse/QPID-1146
             Project: Qpid
          Issue Type: New Feature
          Components: Dot Net Client
    Affects Versions: M2
         Environment: Windows .NET with Excel
            Reporter: Shahbaz Chaudhary
             Fix For: M2


--QUOTED FROM AN EMAIL SENT TO QPID'S MAILING LIST--
Hi All,

The following email contains an Excel RTD server which is able to subscribe to 
information from an M2 Qpid server.
This is just a proof of concept; there are almost no optimizations, checks for 
leaks, etc.  I'm not really a .NET programmer and I have never done any COM 
programming (and no C++ since college).

In any case, try it out.  It works for me.  I haven't figured out how to run it 
outside Visual Studio 2008 yet (no idea how to create install packages, 
register assemblies, etc.).

QPID C# folks are welcome to use it as they wish.  I probably won't maintain 
this, hopefully someone else will.

In Excel, you just have to type the following formula:
=rtd("rtd.test",,"amqp://guest:[EMAIL PROTECTED]/test?brokerlist='tcp://<host>:5672'","<topic>","<field>")
This will subscribe to a topic; each time an update is received, it will 
retrieve a field and display it in Excel.
Example:

=rtd("rtd.test",,"amqp://guest:[EMAIL PROTECTED]/test?brokerlist='tcp://<host>:5672'","md.bidsoffers","price")

This will just display the price for all incoming bidsoffers (so MSFT/ORCL/EBAY 
will be mixed in)

You can also add 'filters' to the RTD function.  Just append two parameters to 
the end of the previous RTD function: the first parameter names the field you 
wish to compare, and the second parameter is the value that field must have 
for the update to be displayed.

Example:

=rtd("rtd.test",,"amqp://guest:[EMAIL PROTECTED]/test?brokerlist='tcp://<host>:5672'","md.bidsoffers","price","symbol","MSFT")

This will subscribe to the same information as the last RTD function, but will 
only display prices for MSFT.

You can add as many filters as you like; just make sure that each one is 
appended to the RTD function as a filter field followed by a filter value.
--8<------------------------------RTDTest.cs---------------------------------
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Apache.Qpid.Messaging;
using Apache.Qpid.Client.Qms;
using Apache.Qpid.Client;
using System.Runtime.InteropServices;
using Microsoft.Office.Interop.Excel;

//Shahbaz Chaudhary
namespace RTDTest
{
    /// <summary>
    /// Excel real-time data (RTD) server that subscribes to topics on a Qpid
    /// broker and publishes one header field of every matching message to Excel.
    /// </summary>
    /// <remarks>
    /// Worksheet usage:
    /// <c>=rtd("rtd.test",,url,topic,field[,filterField,filterValue]...)</c>.
    /// One channel is shared per broker url and one consumer per url/topic pair;
    /// both are reference counted and disposed when the last cell using them is
    /// disconnected.
    /// NOTE(review): the Qpid message callback presumably runs on a client
    /// dispatcher thread while Excel calls the IRtdServer methods on its own
    /// thread. Only the refresh queue is synchronized here; the lookup caches
    /// are still unsynchronized - confirm the threading model before relying on
    /// this under load.
    /// NOTE(review): only the channel is disposed; the AMQConnection created in
    /// <c>getChannel</c> is never closed explicitly - confirm whether disposing
    /// the channel is sufficient.
    /// </remarks>
    [ComVisible(true), ProgId("RTD.Test")]
    public class RTDTest : IRtdServer
    {
        //QPID CACHE
        // url -> channel shared by every subscription on that broker
        Dictionary<string, IChannel> channelCache;
        // url -> number of connected topic ids using that channel
        Dictionary<string, int> channelCacheCount;
        // url|topic -> consumer
        Dictionary<string, IMessageConsumer> topicCache;
        // url|topic -> number of connected topic ids using that consumer
        Dictionary<string, int> topicCacheCount;
        // topicid -> (url, topic, field)
        Dictionary<int, Tuple3<string, string, string>> topicIDCache;
        // url|topic -> list of (topicid, field) to publish for each message
        Dictionary<string, IList<Tuple2<int, string>>> topicTopicIDFieldCache;
        // topicid -> (filter fields, filter values)
        Dictionary<int, Tuple2<string[], string[]>> filtersCache;
        //END QPID CACHE

        //IRTDServer Globals
        IRTDUpdateEvent updateEvent;
        // (topicid, value) pairs waiting to be handed to Excel by RefreshData
        Queue<Tuple2<int, object>> refreshQ;
        // guards refreshQ, which is filled by onMessage and drained by RefreshData
        readonly object refreshLock = new object();
        //END IRTDServer Globals

        /// <summary>Creates the server with empty caches and an empty refresh queue.</summary>
        public RTDTest()
        {
            channelCache = new Dictionary<string, IChannel>();
            topicCache = new Dictionary<string, IMessageConsumer>();
            channelCacheCount = new Dictionary<string, int>();
            topicCacheCount = new Dictionary<string, int>();
            topicIDCache = new Dictionary<int, Tuple3<string, string, string>>();
            topicTopicIDFieldCache = new Dictionary<string, IList<Tuple2<int, string>>>();
            filtersCache = new Dictionary<int, Tuple2<string[], string[]>>();

            refreshQ = new Queue<Tuple2<int, object>>();
        }

        //QPID METHODS

        /// <summary>Builds the single cache key used for every url/topic keyed dictionary.</summary>
        private static string topicKey(string url, string topic)
        {
            return url + "|" + topic;
        }

        /// <summary>Adds one to the counter stored under <paramref name="key"/>, starting from zero.</summary>
        private static void increment(Dictionary<string, int> counts, string key)
        {
            int current;
            counts.TryGetValue(key, out current); // leaves 0 when the key is absent
            counts[key] = current + 1;
        }

        /// <summary>
        /// Subtracts one from the counter stored under <paramref name="key"/> and
        /// returns the new value; returns 0 when the key is not present.
        /// </summary>
        private static int decrement(Dictionary<string, int> counts, string key)
        {
            int current;
            if (!counts.TryGetValue(key, out current))
            {
                return 0;
            }
            current--;
            counts[key] = current;
            return current;
        }

        /// <summary>
        /// Returns the cached channel for <paramref name="url"/>, opening and
        /// starting a new connection the first time the url is seen.
        /// </summary>
        private IChannel getChannel(string url)
        {
            IChannel chan;
            if (!channelCache.TryGetValue(url, out chan))
            {
                IConnectionInfo connectionInfo = QpidConnectionInfo.FromUrl(url);
                Apache.Qpid.Messaging.IConnection connection = new AMQConnection(connectionInfo);
                chan = connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge, 1);
                connection.Start();
                channelCache[url] = chan;
            }
            return chan;
        }

        /// <summary>
        /// Returns the cached consumer for the url/topic pair. On first use it
        /// declares a uniquely named queue, binds it to the topic exchange and
        /// attaches the message handler exactly once.
        /// </summary>
        private IMessageConsumer getTopicConsumer(string url, string topic)
        {
            IMessageConsumer cons;
            string key = topicKey(url, topic);
            if (!topicCache.TryGetValue(key, out cons))
            {
                IChannel channel = getChannel(url);
                string tempQ = channel.GenerateUniqueName();
                channel.DeclareQueue(tempQ, false, true, true);
                cons = channel.CreateConsumerBuilder(tempQ).Create();
                channel.Bind(tempQ, ExchangeNameDefaults.TOPIC, topic);
                // One handler per consumer: the handler itself fans out to every
                // registered topic id, so adding one per cell would duplicate updates.
                cons.OnMessage += msg => { onMessage(msg, url, topic); };
                topicCache[key] = cons;
            }
            return cons;
        }

        /// <summary>
        /// Returns the (topicid, field) list for the url/topic pair, creating an
        /// empty list when none exists yet.
        /// </summary>
        private IList<Tuple2<int, string>> getFields(string url, string topic)
        {
            string key = topicKey(url, topic);
            IList<Tuple2<int, string>> fields;
            if (!topicTopicIDFieldCache.TryGetValue(key, out fields))
            {
                fields = new List<Tuple2<int, string>>();
                topicTopicIDFieldCache[key] = fields;
            }
            return fields;
        }

        /// <summary>
        /// Handles one incoming message: for every topic id registered on the
        /// url/topic pair whose filters all match, queues the requested header
        /// value and then asks Excel to call RefreshData.
        /// </summary>
        private void onMessage(IMessage msg, string url, string topic)
        {
            IList<Tuple2<int, string>> fields;
            if (!topicTopicIDFieldCache.TryGetValue(topicKey(url, topic), out fields))
            {
                return;
            }

            bool queued = false;
            foreach (Tuple2<int, string> f in fields)
            {
                int id = f.a;
                Tuple2<string[], string[]> filters;
                if (!filtersCache.TryGetValue(id, out filters))
                {
                    continue; // topic id is being disconnected
                }
                if (allFiltersTrue(filters.a, filters.b, msg))
                {
                    object value = msg.Headers[f.b];
                    lock (refreshLock)
                    {
                        refreshQ.Enqueue(new Tuple2<int, object>(id, value));
                    }
                    queued = true;
                }
            }

            IRTDUpdateEvent notify = updateEvent;
            if (!queued || notify == null)
            {
                return;
            }
            try
            {
                notify.UpdateNotify();
            }
            catch (COMException e)
            {
                // Best effort: the values stay queued and are delivered with the
                // next successful notification, so record the failure and carry on.
                System.Diagnostics.Trace.TraceWarning("RTD UpdateNotify failed: " + e.Message);
            }
        }

        /// <summary>
        /// Records that <paramref name="topicid"/> wants <paramref name="field"/>
        /// from messages on the url/topic pair and bumps the reference counts of
        /// the shared channel and consumer.
        /// </summary>
        void registerTopicID(string url, string topic, string field, int topicid)
        {
            // Create (or look up) the consumer first so that a connection failure
            // leaves no bookkeeping behind for this topic id.
            getTopicConsumer(url, topic);

            topicIDCache.Add(topicid, new Tuple3<string, string, string>(url, topic, field));
            getFields(url, topic).Add(new Tuple2<int, string>(topicid, field));

            increment(channelCacheCount, url);
            increment(topicCacheCount, topicKey(url, topic));
        }

        /// <summary>
        /// Returns true when every filter field of the message has the expected
        /// value (compared as strings). A header that yields null never matches.
        /// </summary>
        private bool allFiltersTrue(string[] filterKeys, string[] filterVals, IMessage msg)
        {
            for (int i = 0; i < filterKeys.Length; i++)
            {
                // presumably a missing header is reported as null - verify against IHeaders
                object actual = msg.Headers[filterKeys[i]];
                if (actual == null || !actual.ToString().Equals(filterVals[i]))
                {
                    return false;
                }
            }
            return true;
        }

        /// <summary>
        /// Forgets everything registered for <paramref name="topicid"/> and
        /// disposes the consumer and channel once no other topic id uses them.
        /// Unknown topic ids are ignored.
        /// </summary>
        public void removeRegisteredTopic(int topicid)
        {
            filtersCache.Remove(topicid);

            Tuple3<string, string, string> registration;
            if (!topicIDCache.TryGetValue(topicid, out registration))
            {
                return;
            }
            topicIDCache.Remove(topicid);

            string url = registration.a;
            string topic = registration.b;
            string key = topicKey(url, topic);

            // Drop this id's field entry so later messages no longer publish to it.
            IList<Tuple2<int, string>> fields;
            if (topicTopicIDFieldCache.TryGetValue(key, out fields))
            {
                for (int i = fields.Count - 1; i >= 0; i--)
                {
                    if (fields[i].a == topicid)
                    {
                        fields.RemoveAt(i);
                    }
                }
            }

            // Consumer first, then the channel it was created on.
            if (decrement(topicCacheCount, key) <= 0)
            {
                topicCacheCount.Remove(key);
                topicTopicIDFieldCache.Remove(key);

                IMessageConsumer cons;
                if (topicCache.TryGetValue(key, out cons))
                {
                    topicCache.Remove(key);
                    cons.Dispose();
                }
            }
            if (decrement(channelCacheCount, url) <= 0)
            {
                channelCacheCount.Remove(url);

                IChannel chan;
                if (channelCache.TryGetValue(url, out chan))
                {
                    channelCache.Remove(url);
                    chan.Dispose();
                }
            }
        }

        //END QPID METHODS

        //----------------------------------------------------------------------
        //IRTDServer METHODS
        #region IRtdServer Members

        /// <summary>Stores Excel's callback object and returns 1.</summary>
        public int ServerStart(IRTDUpdateEvent CallbackObject)
        {
            updateEvent = CallbackObject;
            return 1;
        }

        /// <summary>
        /// Subscribes a worksheet cell. <paramref name="Strings"/> holds url,
        /// topic and field, optionally followed by filterField/filterValue pairs;
        /// a dangling unpaired trailing argument is ignored.
        /// </summary>
        /// <returns>
        /// A placeholder text shown until the first update arrives, or a usage
        /// message when fewer than three arguments were supplied.
        /// </returns>
        public object ConnectData(int TopicID, ref Array Strings, ref bool GetNewValues)
        {
            if (Strings == null || Strings.Length < 3)
            {
                return "Usage: url, topic, field[, filterField, filterValue]...";
            }

            int conditions = (Strings.Length - 3) / 2;

            string url = (string)Strings.GetValue(0);
            string topic = (string)Strings.GetValue(1);
            string field = (string)Strings.GetValue(2);
            string[] filterKeys = new string[conditions];
            string[] filterVals = new string[conditions];

            // Pair i lives at argument positions 3 + 2i (field) and 4 + 2i (value).
            for (int i = 0; i < conditions; i++)
            {
                filterKeys[i] = (string)Strings.GetValue(3 + 2 * i);
                filterVals[i] = (string)Strings.GetValue(4 + 2 * i);
            }

            filtersCache.Add(TopicID, new Tuple2<string[], string[]>(filterKeys, filterVals));
            try
            {
                registerTopicID(url, topic, field, TopicID);
            }
            catch
            {
                // Do not leave filters behind for a subscription that never started.
                filtersCache.Remove(TopicID);
                throw;
            }
            return "Getting data...";
        }


        /// <summary>Unsubscribes the cell identified by <paramref name="TopicID"/>.</summary>
        public void DisconnectData(int TopicID)
        {
            removeRegisteredTopic(TopicID);
        }

        /// <summary>Always returns 1.</summary>
        public int Heartbeat()
        {
            return 1;
        }

        /// <summary>
        /// Drains the refresh queue into a 2 x N array (row 0 = topic id,
        /// row 1 = value) and reports N through <paramref name="TopicCount"/>.
        /// </summary>
        public Array RefreshData(ref int TopicCount)
        {
            lock (refreshLock)
            {
                // Snapshot the count: Dequeue shrinks the queue while we loop.
                int count = refreshQ.Count;
                object[,] result = new object[2, count];
                for (int i = 0; i < count; i++)
                {
                    Tuple2<int, object> data = refreshQ.Dequeue();
                    result[0, i] = data.a;
                    result[1, i] = data.b;
                }
                TopicCount = count;
                return result;
            }
        }

        /// <summary>Disposes every cached channel and resets all bookkeeping.</summary>
        public void ServerTerminate()
        {
            updateEvent = null;

            foreach (IChannel c in channelCache.Values)
            {
                c.Dispose();
            }

            channelCache.Clear();
            channelCacheCount.Clear();
            topicCache.Clear();
            topicCacheCount.Clear();
            topicIDCache.Clear();
            topicTopicIDFieldCache.Clear();
            filtersCache.Clear();
            lock (refreshLock)
            {
                refreshQ.Clear();
            }
        }

        #endregion
        //END IRTDServer METHODS
    }

    /// <summary>
    /// Minimal mutable pair. The first component is exposed as <c>a</c> and the
    /// second as <c>b</c>; both can be read and reassigned.
    /// </summary>
    class Tuple2<T, U>
    {
        private T first;
        private U second;

        /// <summary>Creates a pair holding <paramref name="t"/> and <paramref name="u"/>.</summary>
        public Tuple2(T t, U u)
        {
            this.first = t;
            this.second = u;
        }

        /// <summary>First component.</summary>
        public T a
        {
            get { return this.first; }
            set { this.first = value; }
        }

        /// <summary>Second component.</summary>
        public U b
        {
            get { return this.second; }
            set { this.second = value; }
        }
    }

    /// <summary>
    /// Minimal mutable triple. The components are exposed, in constructor
    /// order, as <c>a</c>, <c>b</c> and <c>c</c>; each can be read and reassigned.
    /// </summary>
    class Tuple3<T, U, V>
    {
        private T first;
        private U second;
        private V third;

        /// <summary>
        /// Creates a triple holding <paramref name="t"/>, <paramref name="u"/>
        /// and <paramref name="v"/>.
        /// </summary>
        public Tuple3(T t, U u, V v)
        {
            this.first = t;
            this.second = u;
            this.third = v;
        }

        /// <summary>First component.</summary>
        public T a
        {
            get { return this.first; }
            set { this.first = value; }
        }

        /// <summary>Second component.</summary>
        public U b
        {
            get { return this.second; }
            set { this.second = value; }
        }

        /// <summary>Third component.</summary>
        public V c
        {
            get { return this.third; }
            set { this.third = value; }
        }
    }
}


-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

Reply via email to