[ 
https://issues.apache.org/jira/browse/AMQ-4611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Claus Ibsen updated AMQ-4611:
-----------------------------

         Priority: Major  (was: Blocker)
    Fix Version/s: NEEDS_REVIEWED

> Network Failure Issue in Embedded Broker using setStaticBridge=true
> -------------------------------------------------------------------
>
>                 Key: AMQ-4611
>                 URL: https://issues.apache.org/jira/browse/AMQ-4611
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.8.0
>         Environment: Production
>            Reporter: Murtaza
>              Labels: AMQ_EMBEDDED_BROKER_WITH_STATIC_BRIDGE
>             Fix For: NEEDS_REVIEWED
>
>
> Hi,
> I have an embedded broker connected to my remote server broker through a 
> network connector with setStaticBridge=true and a static list of 
> destinations. When a producer connected to my local embedded broker sends a 
> message, the embedded broker successfully forwards it to the remote broker, 
> where it appears among the pending messages. 
> The issue occurs during a network failure, when the embedded broker is unable 
> to communicate with the remote broker while the producer keeps producing 
> messages. The producer's messages are stored in KahaDB because persistence is 
> enabled. When the network comes back up, I cannot see the old messages 
> produced by the producer in the pending messages of the remote broker, and 
> the embedded broker does not forward any new messages from the producer to 
> the remote broker thereafter. The only temporary solution is to restart the 
> embedded broker. Please help with this urgent issue. My code is below -
> 1. Embedded Broker
>     public final class EmbeddedBroker {
>     public static Destination destination;
>     public static String subject = "TOOL.DEFAULT";
>     public static boolean topic;
>     public static boolean transacted;
>     private EmbeddedBroker() {
>     }
>     public static void main(String[] args) throws Exception {
>         BrokerService broker = new BrokerService();
>         broker.setUseJmx(true);
>         broker.setBrokerName("storeforward");
>         broker.addConnector("tcp://localhost:61616");
>         broker.setPersistent(true);
>         NetworkConnector connector1 = 
> broker.addNetworkConnector("static:failover:"+"tcp://172.16.102.153:61616");
>         connector1.setDuplex(true);
>         connector1.setStaticBridge(true);
>         ActiveMQConnectionFactory connectionFactory = new 
> ActiveMQConnectionFactory(null, null, "failover:tcp://172.16.102.153:61616");
>         Connection connection = connectionFactory.createConnection();
>         System.out.println("test");
>         connection.start();
>         Session session = connection.createSession(transacted, 
> Session.AUTO_ACKNOWLEDGE);
>         if (topic) {
>             destination = session.createTopic(subject);
>         } else {
>             destination = session.createQueue(subject);
>         }
>         
>         List lst = new ArrayList();
>         lst.add(destination);
>         connector1.setBridgeTempDestinations(true);
>         connector1.setAdvisoryForFailedForward(true);
>         connector1.setStaticallyIncludedDestinations(lst);
>         
>         broker.start();
>         // now lets wait forever to avoid the JVM terminating immediately
>         Object lock = new Object();
>         synchronized (lock) {
>             lock.wait();
>         }
>     }
> }
> 2. Producer
> public class ProducerTool extends Thread {
>     private Destination destination;
>     private int messageCount = 10;
>     private long sleepTime;
>     private boolean verbose = true;
>     private int messageSize = 255;
>     private static int parallelThreads = 1;
>     private long timeToLive;
> /*    private String user = ActiveMQConnection.DEFAULT_USER;
>     private String password = ActiveMQConnection.DEFAULT_PASSWORD;
> */    
>     private String user = null;
>     private String password = null;
>     
>     private String url = "failover:tcp://172.16.102.153:61616";
>     private String subject = "TOOL.DEFAULT";
>     private boolean topic;
>     private boolean transacted;
>     private boolean persistent = true;
>     private static Object lockResults = new Object();
>     public static void main(String[] args) throws Exception {
>     
>         ArrayList<ProducerTool> threads = new ArrayList();
>         ProducerTool producerTool = new ProducerTool();
>         String[] unknown = CommandLineSupport.setOptions(producerTool, args);
>         if (unknown.length > 0) {
>             System.out.println("Unknown options: " + 
> Arrays.toString(unknown));
>             System.exit(-1);
>         }
>         producerTool.showParameters();
>         for (int threadCount = 1; threadCount <= parallelThreads; 
> threadCount++) {
>             producerTool = new ProducerTool();
>             CommandLineSupport.setOptions(producerTool, args);
>             producerTool.start();
>             threads.add(producerTool);
>         }
>         while (true) {
>             Iterator<ProducerTool> itr = threads.iterator();
>             int running = 0;
>             while (itr.hasNext()) {
>                 ProducerTool thread = itr.next();
>                 if (thread.isAlive()) {
>                     running++;
>                 }
>             }
>             if (running <= 0) {
>                 System.out.println("All threads completed their work");
>                 break;
>             }
>             try {
>                 Thread.sleep(1000);
>             } catch (Exception e) {
>             }
>         }
>         System.exit(0);
>     }
>     public void showParameters() {
>         System.out.println("Connecting to URL: " + url + " (" + user + ":" + 
> password + ")");
>         System.out.println("Publishing a Message with size " + messageSize + 
> " to " + (topic ? "topic" : "queue") + ": " + subject);
>         System.out.println("Using " + (persistent ? "persistent" : 
> "non-persistent") + " messages");
>         System.out.println("Sleeping between publish " + sleepTime + " ms");
>         System.out.println("Running " + parallelThreads + " parallel 
> threads");
>         if (timeToLive != 0) {
>             System.out.println("Messages time to live " + timeToLive + " ms");
>         }
>     }
>     public void run() {
>         Connection connection = null;
>         try {
>             // Create the connection.
>             
>             url = "tcp://localhost:61616";
>             
>             ActiveMQConnectionFactory connectionFactory = new 
> ActiveMQConnectionFactory(user, password, url);
>             connection = connectionFactory.createConnection();
>             connection.start();
>             
>             Session session = connection.createSession(transacted, 
> Session.AUTO_ACKNOWLEDGE);
>             if (topic) {
>                 destination = session.createTopic(subject);
>             } else {
>                 destination = session.createQueue(subject);
>             }
>             // Create the session
>             // Create the producer.
>             MessageProducer producer = session.createProducer(destination);
>             if (persistent) {
>                 producer.setDeliveryMode(DeliveryMode.PERSISTENT);
>             } else {
>                 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
>             }
>             if (timeToLive != 0) {
>                 producer.setTimeToLive(timeToLive);
>             }
>             // Start sending messages
>             sendLoop(session, producer);
>             System.out.println("[" + this.getName() + "] Done.");
>             synchronized (lockResults) {
>                 ActiveMQConnection c = (ActiveMQConnection) connection;
>                 System.out.println("[" + this.getName() + "] Results:\n");
>                 c.getConnectionStats().dump(new IndentPrinter());
>             }
>         } catch (Exception e) {
>             System.out.println("[" + this.getName() + "] Caught: " + e);
>             e.printStackTrace();
>         } finally {
>             try {
>                 connection.close();
>             } catch (Throwable ignore) {
>             }
>         }
>     }
>     protected void sendLoop(Session session, MessageProducer producer) throws 
> Exception {
>         for (int i = 0; i < messageCount || messageCount == 0; i++) {
>             TextMessage message = 
> session.createTextMessage(createMessageText(i));
>             if (verbose) {
>                 String msg = message.getText();
>                 if (msg.length() > 50) {
>                     msg = msg.substring(0, 50) + "...";
>                 }
>                 System.out.println("[" + this.getName() + "] Sending message: 
> '" + msg + "'");
>             }
>             producer.send(message);
>             if (transacted) {
>                 System.out.println("[" + this.getName() + "] Committing " + 
> messageCount + " messages");
>                 session.commit();
>             }
>             Thread.sleep(sleepTime);
>         }
>     }
>     private String createMessageText(int index) {
>         StringBuffer buffer = new StringBuffer(messageSize);
>         buffer.append("Message: " + index + " sent at: " + new Date());
>         if (buffer.length() > messageSize) {
>             return buffer.substring(0, messageSize);
>         }
>         for (int i = buffer.length(); i < messageSize; i++) {
>             buffer.append(' ');
>         }
>         return buffer.toString();
>     }
>     public void setPersistent(boolean durable) {
>         this.persistent = durable;
>     }
>     public void setMessageCount(int messageCount) {
>         this.messageCount = messageCount;
>     }
>     public void setMessageSize(int messageSize) {
>         this.messageSize = messageSize;
>     }
>     public void setPassword(String pwd) {
>         this.password = pwd;
>     }
>     public void setSleepTime(long sleepTime) {
>         this.sleepTime = sleepTime;
>     }
>     public void setSubject(String subject) {
>         this.subject = subject;
>     }
>     public void setTimeToLive(long timeToLive) {
>         this.timeToLive = timeToLive;
>     }
>     public void setParallelThreads(int parallelThreads) {
>         if (parallelThreads < 1) {
>             parallelThreads = 1;
>         }
>         this.parallelThreads = parallelThreads;
>     }
>     public void setTopic(boolean topic) {
>         this.topic = topic;
>     }
>     public void setQueue(boolean queue) {
>         this.topic = !queue;
>     }
>     public void setTransacted(boolean transacted) {
>         this.transacted = transacted;
>     }
>     public void setUrl(String url) {
>         this.url = url;
>     }
>     public void setUser(String user) {
>         this.user = user;
>     }
>     public void setVerbose(boolean verbose) {
>         this.verbose = verbose;
>     }
> }



--
This message was sent by Atlassian JIRA
(v6.1#6144)

Reply via email to