Hi all,

I am trying to use the failover transport
(failover:(tcp://localhost:61618)), and I have some questions:

I ran a test with a simple consumer written in Scala (see the code and
the log at the end of this mail).

Scenario:
The broker is stopped.
Start the consumer; it waits.
Start the broker.
The consumer connects and consumes messages.
Stop the broker, then start it again.
The consumer tries 6 times to reconnect and then stops working.
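
To make the reconnect behaviour explicit, options can be passed as query
parameters on the failover URI. A minimal sketch, assuming the option
names initialReconnectDelay and maxReconnectDelay from the failover
transport documentation; the failoverUri helper is hypothetical, and I
have not confirmed that setting these changes the behaviour above:

```scala
object FailoverUri {
  // Hypothetical helper: joins broker URLs and transport options
  // into a single failover URI string.
  def failoverUri(brokers: Seq[String], options: Seq[(String, String)]): String = {
    val base = brokers.mkString("failover:(", ",", ")")
    if (options.isEmpty) base
    else base + options.map { case (k, v) => k + "=" + v }.mkString("?", "&", "")
  }
}
```

For example, FailoverUri.failoverUri(Seq("tcp://localhost:61618"),
Seq("initialReconnectDelay" -> "100", "maxReconnectDelay" -> "5000"))
yields a URI that can be handed to ActiveMQConnectionFactory. There is
also a maxReconnectAttempts option, but as far as I understand its
"retry forever" value differs between ActiveMQ versions, so it should be
checked against the documentation for the version in use.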

Any ideas? Thanks for your help.

Regards,
Jean-Yves


Here is the log:

R |16 févr. 2010 16:34:57,730 - Waiting 320 ms before attempting connection.
R |16 févr. 2010 16:34:58,042 - urlList connectionList:[tcp://localhost:61618]
R |16 févr. 2010 16:34:58,042 - Attempting connect to: tcp://localhost:61618
R |16 févr. 2010 16:34:59,245 - Connect fail to: tcp://localhost:61618, reason: java.net.ConnectException: Connection refused: connect
R |16 févr. 2010 16:34:59,245 - Stopping transport tcp://null:0
R |16 févr. 2010 16:34:59,245 - Waiting 640 ms before attempting connection.
R |16 févr. 2010 16:34:59,886 - urlList connectionList:[tcp://localhost:61618]
R |16 févr. 2010 16:34:59,886 - Attempting connect to: tcp://localhost:61618
R |16 févr. 2010 16:35:00,964 - Connect fail to: tcp://localhost:61618, reason: java.net.ConnectException: Connection refused: connect
R |16 févr. 2010 16:35:00,964 - Stopping transport tcp://null:0
R |16 févr. 2010 16:35:00,964 - Waiting 1280 ms before attempting connection.
R |16 févr. 2010 16:35:02,245 - urlList connectionList:[tcp://localhost:61618]
R |16 févr. 2010 16:35:02,245 - Attempting connect to: tcp://localhost:61618
R |16 févr. 2010 16:35:02,245 - Sending: WireFormatInfo { version=5, properties={CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
R |16 févr. 2010 16:35:02,448 - Received WireFormat: WireFormatInfo { version=3, properties={CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
R |16 févr. 2010 16:35:02,448 - tcp://localhost/127.0.0.1:61618 before negotiation: OpenWireFormat{version=5, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false}
R |16 févr. 2010 16:35:02,495 - tcp://localhost/127.0.0.1:61618 after negotiation: OpenWireFormat{version=3, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false}
R |16 févr. 2010 16:35:02,511 - Connection established
R |16 févr. 2010 16:35:02,511 - Successfully connected to tcp://localhost:61618
Message recieved:
{'RealTimeData':{'reference':'ZInternal_SystemTimeMillis','value':'1266334506073'}}
Message recieved:
{'RealTimeData':{'reference':'ZInternal_SystemTimeMillis','value':'1266334512073'}}
R |16 févr. 2010 16:35:14,339 - Stopping transport tcp://localhost/127.0.0.1:61618
R |16 févr. 2010 16:35:14,339 - Transport failed to tcp://localhost:61618 , attempting to automatically reconnect due to: java.io.EOFException
R |16 févr. 2010 16:35:14,339 - Transport failed with the following exception:
java.io.EOFException
        at java.io.DataInputStream.readInt(DataInputStream.java:375)
        at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:272)
        at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:210)
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:202)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
        at java.lang.Thread.run(Thread.java:619)
R |16 févr. 2010 16:35:14,355 - urlList connectionList:[tcp://localhost:61618]
R |16 févr. 2010 16:35:14,355 - Attempting connect to: tcp://localhost:61618
R |16 févr. 2010 16:35:15,339 - Connect fail to: tcp://localhost:61618, reason: java.net.ConnectException: Connection refused: connect
R |16 févr. 2010 16:35:15,339 - Stopping transport tcp://null:0
R |16 févr. 2010 16:35:15,339 - Waiting 10 ms before attempting connection.
R |16 févr. 2010 16:35:15,355 - urlList connectionList:[tcp://localhost:61618]
R |16 févr. 2010 16:35:15,355 - Attempting connect to: tcp://localhost:61618
R |16 févr. 2010 16:35:16,355 - Connect fail to: tcp://localhost:61618, reason: java.net.ConnectException: Connection refused: connect
... the consumer tries 6 times and then stops
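
The waits in the log (320, 640 and 1280 ms at startup, then 10 ms again
after the connection was lost) look like a delay that doubles after each
failed attempt and resets after a successful connect. A small sketch of
that arithmetic, assuming a 10 ms initial delay, a multiplier of 2 and a
30000 ms cap (which I believe are the failover defaults, but I have not
checked); Backoff.delays is a hypothetical helper, not an ActiveMQ API:

```scala
object Backoff {
  // Waits (in ms) before successive reconnect attempts under exponential
  // back-off. The default initial delay, multiplier and cap are assumptions,
  // not values read from the ActiveMQ client.
  def delays(attempts: Int,
             initial: Long = 10L,
             multiplier: Long = 2L,
             cap: Long = 30000L): List[Long] =
    Iterator.iterate(initial)(d => math.min(d * multiplier, cap))
      .take(attempts)
      .toList
}
```

Under these assumptions, Backoff.delays(8) gives
List(10, 20, 40, 80, 160, 320, 640, 1280), which includes the three
startup waits shown at the top of the log.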


-------------- The code ------------------------

import javax.jms._
import org.apache.activemq.ActiveMQConnectionFactory

// Subscribes to a topic and prints every text message it receives.
class Consumer(brokerUrl: String) extends MessageListener {
    val factory = new ActiveMQConnectionFactory(brokerUrl)
    val connection = factory.createConnection()
    connection.start()
    // Non-transacted session with automatic acknowledgement.
    val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)

    val destination = session.createTopic("wbm.supervision")
    val consumer = session.createConsumer(destination)
    consumer.setMessageListener(this)

    // Called by the JMS provider for each delivered message.
    def onMessage(message: Message): Unit = message match {
        case textMessage: TextMessage =>
            println("Message recieved: " + textMessage.getText())
        case _ =>
            println("Oops, not a text message")
    }
}

object ConsumerApp extends Application {
  val consumer = new Consumer("failover:(tcp://localhost:61618)")
}
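
One thing I have not ruled out: the main thread of ConsumerApp returns
right after constructing the Consumer. If the client's reconnect work
runs on daemon threads (an assumption on my part, not something the log
shows), the JVM could exit once the TCP transport thread dies. A minimal
sketch of a way to keep a non-daemon thread blocked while testing;
KeepAlive is a hypothetical helper, not part of ActiveMQ:

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

object KeepAlive {
  private val done = new CountDownLatch(1)

  // Blocks the calling thread until shutdown() is called or the timeout
  // elapses. Returns true if shutdown() was called before the timeout.
  def awaitShutdown(timeoutMs: Long): Boolean =
    done.await(timeoutMs, TimeUnit.MILLISECONDS)

  // Releases any thread blocked in awaitShutdown.
  def shutdown(): Unit = done.countDown()
}
```

Calling KeepAlive.awaitShutdown with a large timeout from the main
thread, after the Consumer has been created, would show whether the
reconnect attempts continue when the process is kept alive.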
