This is the follow-up code I wrote to help me understand JMS-Topic
(javax.jms.Topic).
Regards,
John
(ns jms.jms-topic
(:use [jms.jms-test :only (get-initial-context
get-message-text)])
(:import (javax.jms Session MessageListener)))
(defn publish-term-message-to-topic [tPublisher tSession]
(.publish tPublisher (.createMessage tSession)))
(defn publish-message-to-topic [tPublisher tSession]
(let [message (.createTextMessage tSession)]
(.setText message (get-message-text))
(println (format "Publishing message - %s ..." (.getText
message)))
(.publish tPublisher message)))
(defn publish-n-messages-to-topic [tPublisher tSession num-messages
qReceiver qConnection]
;; Wait for subscribers
(println "Waiting for subscribers...")
(let [subscriber-available (ref false)]
(.setMessageListener qReceiver
(proxy [MessageListener][]
(onMessage [message]
(dosync (ref-set subscriber-
available true)))))
(.start qConnection)
(while (false? @subscriber-available)
(Thread/sleep 1000))
(.close qConnection)
(println "Hoo-rah!! We have a subscriber"))
(loop [n num-messages]
(if (zero? n)
(do
(publish-term-message-to-topic tPublisher tSession)
(println "*Done publishing messages to topic!*"))
(recur (do
(publish-message-to-topic tPublisher tSession)
(dec n))))))
(defn process-topic-messages [tSubscriber tConnection qSender
qSession]
;; Snooze a bit before we are ready to process topic messages
(Thread/sleep (* 10 1000))
(println "-- Subscriber is now online --")
(.send qSender (.createMessage qSession))
(let [done-processing (ref false)]
(.setMessageListener tSubscriber
(proxy [MessageListener][]
(onMessage [message]
(if (instance?
javax.jms.TextMessage message)
(println (format "Read
message: %s" (.getText message)))
(dosync (ref-set done-
processing true))))))
(.start tConnection)
(while (false? @done-processing)
(Thread/sleep 1000)))
(.close tConnection)
(println "==Read all messages off the topic!!=="))
(defn main []
(let [ctx (get-initial-context)
tConFactory (.lookup ctx "TopicConnectionFactory")
tConnection (.createTopicConnection tConFactory)
tSession (.createTopicSession tConnection false Session/
AUTO_ACKNOWLEDGE)
topic (.createTopic tSession "TestTopic")
tPublisher (.createPublisher tSession topic)
tSubscriber (.createSubscriber tSession topic)
qConFactory (.lookup ctx "QueueConnectionFactory")
qConnection (.createQueueConnection qConFactory)
qSession (.createQueueSession qConnection false Session/
AUTO_ACKNOWLEDGE)
queue (.createQueue qSession "controlQueue")
qSender (.createSender qSession queue)
qReceiver (.createReceiver qSession queue)]
(.start (Thread. (fn[] (publish-n-messages-to-topic tPublisher
tSession 10 qReceiver qConnection))))
(.start (Thread. (fn[] (process-topic-messages tSubscriber
tConnection qSender qSession))))))
--
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to [email protected]
Note that posts from new members are moderated - please be patient with your
first post.
To unsubscribe from this group, send email to
[email protected]
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
To unsubscribe from this group, send email to
clojure+unsubscribegooglegroups.com or reply to this email with the words
"REMOVE ME" as the subject.