Hi Paul,
that is the correct behaviour. To get all messages you must already be
subscribed when they are published. If the messages were published before
your subscription, you get only the last published message. Persistent
means that the message is stored persistently until it is delivered; it
does not mean you can get it if you subscribe after it has been
published. I think the behaviour you are looking for is a query to the
history queue. Read more on how to query the history queue:
http://www.xmlblaster.org/xmlBlaster/doc/requirements/engine.qos.queryspec.QueueQuery.html
Also remember you have to configure the history queue to be
sufficiently big to keep the messages you need.
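
To illustrate why the size of the history queue matters, here is a small
toy model in plain Java. It is not xmlBlaster code and uses none of its
API; the class HistoryQueueSketch, the nested Topic class and the
maxHistoryEntries parameter are names made up for this sketch. It only
assumes that a topic keeps a bounded history and replays it to a late
subscriber. With a history of one entry it reproduces what you observed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

public class HistoryQueueSketch {

    /** Toy topic: a bounded history plus live subscribers. Hypothetical, not xmlBlaster. */
    static final class Topic {
        private final int maxHistoryEntries;
        private final Deque<String> history = new ArrayDeque<>();
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        Topic(int maxHistoryEntries) {
            if (maxHistoryEntries < 1) {
                throw new IllegalArgumentException("maxHistoryEntries must be >= 1");
            }
            this.maxHistoryEntries = maxHistoryEntries;
        }

        /** Store the message, drop the oldest entries on overflow, notify live subscribers. */
        void publish(String message) {
            history.addLast(message);
            while (history.size() > maxHistoryEntries) {
                history.removeFirst();
            }
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(message);
            }
        }

        /** A late subscriber first gets whatever the history still holds. */
        void subscribe(Consumer<String> subscriber) {
            for (String old : history) {
                subscriber.accept(old);
            }
            subscribers.add(subscriber);
        }
    }

    /** Publish "Hi 0" .. "Hi 9" first, subscribe afterwards, return what arrived. */
    static List<String> publishThenSubscribe(int maxHistoryEntries) {
        Topic topic = new Topic(maxHistoryEntries);
        for (int i = 0; i < 10; i++) {
            topic.publish("Hi " + i);
        }
        List<String> received = new ArrayList<>();
        topic.subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(publishThenSubscribe(1));         // prints [Hi 9]
        System.out.println(publishThenSubscribe(10).size()); // prints 10
    }
}
```

In this model a history of one entry hands a late subscriber only "Hi 9",
while a history of ten entries hands over all ten messages. How the real
history queue is sized and queried is described in the requirement linked
above.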
Regards
Michele
Paul Babyak wrote:
Hello.
I'm trying to use persistent messages, but I think I missed something
in the examples.
When I publish messages to a topic, only the last one persists. Moreover,
this message is delivered every time I subscribe to the topic.
Here is the sample code (formerly HelloWorld).
================Publish===================
I_XmlBlasterAccess con = this.glob.getXmlBlasterAccess();
try {
String name = glob.getProperty().get("session.name", "XBTestsPublish");
String passwd = glob.getProperty().get("passwd", "secret");
ConnectQos qos = new ConnectQos(glob, name, passwd);
qos.setPersistent(true);
con.connect(qos, this);
PublishKey pk = new PublishKey(glob, "HelloWorld",
"text/xml", "1.0");
PublishQos pq = new PublishQos(glob);
pq.setPersistent(true);
MsgUnit msgUnit;
PublishReturnQos prq;
for (int i = 0; i < 10; i++) {
msgUnit = new MsgUnit(pk, "Hi " + i, pq);
prq = con.publish(msgUnit);
log.info("Got status='" +
prq.getState() + "' for published message '" + prq.getKeyOid());
System.out.println("Got status='" + prq.getState() +
"' for published message '" + prq.getKeyOid());
}
try {
Thread.sleep(1000);
}
catch (InterruptedException i) {
} // wait a second to receive update()
DisconnectQos dq = new DisconnectQos(glob);
con.disconnect(dq);
glob.shutdown(); // free resources
}catch (Throwable e) {
log.severe(e.toString());
e.printStackTrace();
}
==============================
============Subscribe============
I_XmlBlasterAccess con = this.glob.getXmlBlasterAccess();
try {
String name = glob.getProperty().get("session.name", "XBTestSubscribe/5");
String passwd = glob.getProperty().get("passwd", "secret");
ConnectQos qos = new ConnectQos(glob, name, passwd);
ConnectReturnQos crq = con.connect(qos, this);
String subId = Constants.SUBSCRIPTIONID_PREFIX +
crq.getSessionName().getRelativeName(true) + "-" + "4";
SubscribeKey sk = new SubscribeKey(glob, "HelloWorld");
SubscribeQos sq = new SubscribeQos(glob);
sq.setSubscriptionId(subId);
// sq.setWantUpdateOneway();
SubscribeReturnQos subRet = con.subscribe(sk, sq, new
I_Callback() {
public String update(String cbSessionId, UpdateKey
updateKey, byte[] bytes, UpdateQos updateQos) throws XmlBlasterException {
System.out.println("message = " + (new
String(bytes)));
return null;
}
});
try {
Global.waitOnKeyboardHit("Success, hit a key to exit");
}
catch (Exception i) {
} // keep running until a key is hit so update() can be received
UnSubscribeKey uk = new UnSubscribeKey(glob,
subRet.getSubscriptionId());
UnSubscribeQos uq = new UnSubscribeQos(glob);
UnSubscribeReturnQos[] urq = con.unSubscribe(uk, uq);
if (urq.length > 0) log.info("Unsubscribed from topic");
DisconnectQos dq = new DisconnectQos(glob);
con.disconnect(dq);
glob.shutdown(); // free resources
}catch (Throwable e) {
log.severe(e.toString());
e.printStackTrace();
}
===============================