[
https://issues.apache.org/jira/browse/QPID-1146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12617608#action_12617608
]
John O'Hara commented on QPID-1146:
-----------------------------------
Very cool -- I've had people asking me for this feature this week.
> Excel RTD Server
> ----------------
>
> Key: QPID-1146
> URL: https://issues.apache.org/jira/browse/QPID-1146
> Project: Qpid
> Issue Type: New Feature
> Components: Dot Net Client
> Affects Versions: M2
> Environment: Windows .NET with Excel
> Reporter: Shahbaz Chaudhary
> Assignee: Aidan Skinner
> Fix For: M3
>
> Attachments: RTDTest.cs, RTDTest.cs
>
> Original Estimate: 48h
> Remaining Estimate: 48h
>
> --QUOTED FROM AN EMAIL SENT TO QPID'S MAILING LIST--
> Hi All,
> The following email contains an Excel RTD server which is able to subscribe
> to information from an M2 Qpid server.
> This is just a proof of concept, there are almost no optimizations, check for
> leaks, etc. I'm not really a .NET programmer and I have never done any COM
> programming (and no C++ since college).
> In any case, try it out. It works for me. I haven't figured out how to run
> it outside Visual Studio 2008 yet (no idea how to create install packages,
> register assemblies, etc.).
> QPID C# folks are welcome to use it as they wish. I probably won't maintain
> this, hopefully someone else will.
> In excel, you just have to type the following formula:
> =rtd("rtd.test",,"amqp://guest:[EMAIL
> PROTECTED]/test?brokerlist='tcp://<host>:567
> 2'","<topic>","<field>")
> This will subscribe to a topic, each time an update is received, it will
> retrieve a field and display it in Excel.
> Example:
> =rtd("rtd.test",,"amqp://guest:[EMAIL
> PROTECTED]/test?brokerlist='tcp://<host>:567
> 2'","md.bidsoffers","price")
> This will just display the price for all incoming bidsoffers (so
> MSFT/ORCL/EBAY will be mixed in)
> You can also add 'filters' to the RTD function. Just append two parameters
> to the end of the previous RTD function, the first parameter refers to the
> field you wish to use in comparison, the second parameter refers to the value
> the first parameter must have.
> Example:
> =rtd("rtd.test",,"amqp://guest:[EMAIL
> PROTECTED]/test?brokerlist='tcp://<host>:567
> 2'","md.bidsoffers","price",
> "symbol","MSFT")
> This will subscribe to the same information as the last RTD function, but
> will only display prices for MSFT.
> You can add as many filters as you like, just make sure you append the RTD
> function with a filter field and a filter value.
> --8<------------------------------RTDTest.cs----------------------------
> -----
> using System;
> using System.Collections.Generic;
> using System.Linq;
> using System.Text;
> using Apache.Qpid.Messaging;
> using Apache.Qpid.Client.Qms;
> using Apache.Qpid.Client;
> using System.Runtime.InteropServices;
> using Microsoft.Office.Interop.Excel;
> //Shahbaz Chaudhary
> namespace RTDTest
> {
> [ComVisible(true), ProgId("RTD.Test")]
> public class RTDTest : IRtdServer
> {
> //QPID CACHE
> Dictionary<string, IChannel> channelCache;//url, channel
> Dictionary<string, int> channelCacheCount;
> Dictionary<string, IMessageConsumer> topicCache;//url+topic, consumer
> Dictionary<string, int> topicCacheCount;
> Dictionary<int, string> topicIDCache;//url+topic+field, topicid
> Dictionary<string, IList<Tuple2<int, string>>> topicTopicIDFieldCache;
> Dictionary<int, Tuple2<string[], string[]>> filtersCache;
> //END QPID CACHE
> //IRTDServer Globals
> IRTDUpdateEvent updateEvent;
> Queue<Tuple2<int, object>> refreshQ;
> //END IRTDServer Globals
> public RTDTest()
> {
> channelCache = new Dictionary<string, IChannel>();
> topicCache = new Dictionary<string, IMessageConsumer>();
> channelCacheCount = new Dictionary<string, int>();
> topicCacheCount = new Dictionary<string, int>();
> topicIDCache = new Dictionary<int, string>();
> topicTopicIDFieldCache = new Dictionary<string, IList<Tuple2<int,
> string>>>();
> filtersCache = new Dictionary<int, Tuple2<string[], string[]>>();
> refreshQ = new Queue<Tuple2<int, object>>();
> }
> //QPID METHODS
> private IChannel getChannel(string url)
> {
> IChannel chan;
> if (channelCache.ContainsKey(url))
> {
> chan = channelCache[url];
> }
> else
> {
> IConnectionInfo connectionInfo =
> QpidConnectionInfo.FromUrl(url);
> Apache.Qpid.Messaging.IConnection connection = new
> AMQConnection(connectionInfo);
> IChannel channel = connection.CreateChannel(false,
> AcknowledgeMode.AutoAcknowledge, 1);
> connection.Start();
> chan = channel;
> channelCache[url] = chan;
> }
> return chan;
> }
> private IMessageConsumer getTopicConsumer(string url, string
> topic)
> {
> IMessageConsumer cons;
> string key = url + topic;
> if (topicCache.ContainsKey(key))
> {
> cons = topicCache[key];
> }
> else
> {
> IChannel channel = getChannel(url);
> string tempQ = channel.GenerateUniqueName();
> channel.DeclareQueue(tempQ, false, true, true);
> cons = channel.CreateConsumerBuilder(tempQ).Create();
> channel.Bind(tempQ, ExchangeNameDefaults.TOPIC, topic);
> topicCache[key] = cons;
> }
> return cons;
> }
> private IList<Tuple2<int, string>> getFields(string topic)
> {
> if (!topicTopicIDFieldCache.ContainsKey(topic))
> {
> topicTopicIDFieldCache[topic] = new List<Tuple2<int,
> string>>();
> }
> return topicTopicIDFieldCache[topic];
> }
> private void onMessage(IMessage msg, string url, string topic, string
> field, int topicid)
> {
> foreach (Tuple2<int, string> f in getFields(topic))//?
> {
> int id = f.a;
> object value = msg.Headers[f.b];
> //Dictionary<int, object> d = new Dictionary<int,
> object>();
> //d.Add(id, value);
> string[] filterFields = filtersCache[id].a;
> string[] filterVals = filtersCache[id].b;
> if(allFiltersTrue(filterFields,filterVals,msg)){
> refreshQ.Enqueue(new Tuple2<int,object>(id,value));
> }
> }
> try
> {
> updateEvent.UpdateNotify();
> }
> catch (COMException e)
> {
> }
> }
> void registerTopicID(string url, string topic, string field, int
> topicid)
> {
> string val = url + "|" + topic + "|" + field;
> topicIDCache.Add(topicid, val);
> Tuple2<int, string> dict = new Tuple2<int,
> string>(topicid,field);
> getFields(topic).Add(dict);
> if (!channelCacheCount.ContainsKey(url))
> channelCacheCount[url] = 0;
> channelCacheCount[url]++;
> if (!topicCacheCount.ContainsKey(url + "|" + topic))
> topicCacheCount[url + "|" + topic] = 0;
> topicCacheCount[url + "|" + topic]++;
> getTopicConsumer(url, topic).OnMessage += msg => {
> onMessage(msg,url, topic, field, topicid); };
> }
> private bool allFiltersTrue(string[] filterKeys, string[] filterVals,
> IMessage msg)
> {
> for (int i = 0; i < filterKeys.Length; i++)
> {
> if
> (!msg.Headers[filterKeys[i]].ToString().Equals(filterVals[i]))
> {
> return false;
> }
> }
> return true;
> }
> public void removeRegisteredTopic(int topicid)
> {
> string vals = topicIDCache[topicid];
> string[] keys = vals.Split(new char[] { '|' });
> string url = keys[0];
> string topic = keys[1];
> string field = keys[2];
> channelCacheCount[url]--;
> topicCacheCount[url + "|" + topic]--;
> if (channelCacheCount[url] <= 0)
> {
> channelCacheCount.Remove(url);
> channelCache[url].Dispose();
> channelCache.Remove(url);
> }
> if (topicCacheCount[url + "|" + topic] <= 0)
> {
> topicCacheCount.Remove(url + "|" + topic);
> topicCache[url + "|" + topic].Dispose();
> topicCache.Remove(url + "|" + topic);
> topicTopicIDFieldCache.Remove(topic);
> }
> filtersCache.Remove(topicid);
> }
> //END QPID METHODS
>
> //----------------------------------------------------------------------
> -----------------------------------
> //IRTDServer METHODS
> #region IRtdServer Members
> public int ServerStart(IRTDUpdateEvent CallbackObject)
> {
> updateEvent = CallbackObject;
> return 1;
> }
> public object ConnectData(int TopicID, ref Array Strings, ref bool
> GetNewValues)
> {
> int size = Strings.Length;
> int conditions = (int)Math.Floor((double)(size - 3) / 2);
> string url;
> string topic;
> string field;
> string[] filterKeys = new string[conditions];
> string[] filterVals = new string[conditions];
> url = (string)Strings.GetValue(0);
> topic = (string)Strings.GetValue(1);
> field = (string)Strings.GetValue(2);
> for (int i = 0; i < conditions; i = i + 2)
> {
> filterKeys[i] = (string)Strings.GetValue(i + 3);
> filterVals[i] = (string)Strings.GetValue(i + 1 + 3);
> }
> Tuple2<string[], string[]> filters = new Tuple2<string[],
> string[]>(filterKeys,filterVals);
> filtersCache.Add(TopicID, filters);
> registerTopicID(url, topic, field, TopicID);
> return "Getting data...";
> }
> public void DisconnectData(int TopicID)
> {
> removeRegisteredTopic(TopicID);
> }
> public int Heartbeat()
> {
> return 1;
> }
> public Array RefreshData(ref int TopicCount)
> {
> Tuple2<int, object> data;
> object[,] result = new object[2, refreshQ.Count];
> TopicCount = 0;
> for (int i = 0; i < refreshQ.Count; i++)
> {
> data = refreshQ.Dequeue();
> TopicCount++;
> result[0, i] = data.a;
> result[1, i] = data.b;
> }
> return result;
> }
> public void ServerTerminate()
> {
> foreach (IChannel c in channelCache.Values)
> {
> c.Dispose();
> }
> }
> #endregion
> //END IRTDServer METHODS
> }
> class Tuple2<T, U>
> {
> public Tuple2(T t, U u)
> {
> a = t;
> b = u;
> }
> public T a { get; set; }
> public U b { get; set; }
> }
> class Tuple3<T, U, V>
> {
> public Tuple3(T t, U u, V v)
> {
> a = t;
> b = u;
> c = v;
> }
> public T a { get; set; }
> public U b { get; set; }
> public V c { get; set; }
> }
> }
--
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.