[ https://issues.apache.org/jira/browse/AMQ-4611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Claus Ibsen updated AMQ-4611: ----------------------------- Priority: Major (was: Blocker) Fix Version/s: NEEDS_REVIEWED > Network Failure Issue in Embedded Broker using setStaticBridge=true > ------------------------------------------------------------------- > > Key: AMQ-4611 > URL: https://issues.apache.org/jira/browse/AMQ-4611 > Project: ActiveMQ > Issue Type: Bug > Components: Broker > Affects Versions: 5.8.0 > Environment: Production > Reporter: Murtaza > Labels: AMQ_EMBEDDED_BROKER_WITH_STATIC_BRIDGE > Fix For: NEEDS_REVIEWED > > > Hi, > I have an embedded broker connected to my remote server broker through a network > connector with setStaticBridge=true and a static list of destinations. When a > producer connected to my local embedded broker sends a message, the embedded broker > successfully forwards it to the remote broker, where it appears among the pending messages. > The issue occurs during a network failure, when the embedded broker is unable to > communicate with the remote broker while the producer keeps producing messages. > The producer's messages are stored in KahaDB because persistence is enabled. > When the network comes back up, I cannot see the messages produced during the outage > among the pending messages of the remote broker, and the embedded broker does not forward any new > messages from the producer to the remote broker thereafter. The only temporary > solution is to restart the embedded broker. Please help with this urgent issue. My > code is below - > 1. 
Embedded Broker > public final class EmbeddedBroker { > public static Destination destination; > public static String subject = "TOOL.DEFAULT"; > public static boolean topic; > public static boolean transacted; > private EmbeddedBroker() { > } > public static void main(String[] args) throws Exception { > BrokerService broker = new BrokerService(); > broker.setUseJmx(true); > broker.setBrokerName("storeforward"); > broker.addConnector("tcp://localhost:61616"); > broker.setPersistent(true); > NetworkConnector connector1 = > broker.addNetworkConnector("static:failover:"+"tcp://172.16.102.153:61616"); > connector1.setDuplex(true); > connector1.setStaticBridge(true); > ActiveMQConnectionFactory connectionFactory = new > ActiveMQConnectionFactory(null, null, "failover:tcp://172.16.102.153:61616"); > Connection connection = connectionFactory.createConnection(); > System.out.println("test"); > connection.start(); > Session session = connection.createSession(transacted, > Session.AUTO_ACKNOWLEDGE); > if (topic) { > destination = session.createTopic(subject); > } else { > destination = session.createQueue(subject); > } > > List lst = new ArrayList(); > lst.add(destination); > connector1.setBridgeTempDestinations(true); > connector1.setAdvisoryForFailedForward(true); > connector1.setStaticallyIncludedDestinations(lst); > > broker.start(); > // now lets wait forever to avoid the JVM terminating immediately > Object lock = new Object(); > synchronized (lock) { > lock.wait(); > } > } > } > 2. 
Producer > public class ProducerTool extends Thread { > private Destination destination; > private int messageCount = 10; > private long sleepTime; > private boolean verbose = true; > private int messageSize = 255; > private static int parallelThreads = 1; > private long timeToLive; > /* private String user = ActiveMQConnection.DEFAULT_USER; > private String password = ActiveMQConnection.DEFAULT_PASSWORD; > */ > private String user = null; > private String password = null; > > private String url = "failover:tcp://172.16.102.153:61616"; > private String subject = "TOOL.DEFAULT"; > private boolean topic; > private boolean transacted; > private boolean persistent = true; > private static Object lockResults = new Object(); > public static void main(String[] args) throws Exception { > > ArrayList<ProducerTool> threads = new ArrayList(); > ProducerTool producerTool = new ProducerTool(); > String[] unknown = CommandLineSupport.setOptions(producerTool, args); > if (unknown.length > 0) { > System.out.println("Unknown options: " + > Arrays.toString(unknown)); > System.exit(-1); > } > producerTool.showParameters(); > for (int threadCount = 1; threadCount <= parallelThreads; > threadCount++) { > producerTool = new ProducerTool(); > CommandLineSupport.setOptions(producerTool, args); > producerTool.start(); > threads.add(producerTool); > } > while (true) { > Iterator<ProducerTool> itr = threads.iterator(); > int running = 0; > while (itr.hasNext()) { > ProducerTool thread = itr.next(); > if (thread.isAlive()) { > running++; > } > } > if (running <= 0) { > System.out.println("All threads completed their work"); > break; > } > try { > Thread.sleep(1000); > } catch (Exception e) { > } > } > System.exit(0); > } > public void showParameters() { > System.out.println("Connecting to URL: " + url + " (" + user + ":" + > password + ")"); > System.out.println("Publishing a Message with size " + messageSize + > " to " + (topic ? 
"topic" : "queue") + ": " + subject); > System.out.println("Using " + (persistent ? "persistent" : > "non-persistent") + " messages"); > System.out.println("Sleeping between publish " + sleepTime + " ms"); > System.out.println("Running " + parallelThreads + " parallel > threads"); > if (timeToLive != 0) { > System.out.println("Messages time to live " + timeToLive + " ms"); > } > } > public void run() { > Connection connection = null; > try { > // Create the connection. > > url = "tcp://localhost:61616"; > > ActiveMQConnectionFactory connectionFactory = new > ActiveMQConnectionFactory(user, password, url); > connection = connectionFactory.createConnection(); > connection.start(); > > Session session = connection.createSession(transacted, > Session.AUTO_ACKNOWLEDGE); > if (topic) { > destination = session.createTopic(subject); > } else { > destination = session.createQueue(subject); > } > // Create the session > // Create the producer. > MessageProducer producer = session.createProducer(destination); > if (persistent) { > producer.setDeliveryMode(DeliveryMode.PERSISTENT); > } else { > producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); > } > if (timeToLive != 0) { > producer.setTimeToLive(timeToLive); > } > // Start sending messages > sendLoop(session, producer); > System.out.println("[" + this.getName() + "] Done."); > synchronized (lockResults) { > ActiveMQConnection c = (ActiveMQConnection) connection; > System.out.println("[" + this.getName() + "] Results:\n"); > c.getConnectionStats().dump(new IndentPrinter()); > } > } catch (Exception e) { > System.out.println("[" + this.getName() + "] Caught: " + e); > e.printStackTrace(); > } finally { > try { > connection.close(); > } catch (Throwable ignore) { > } > } > } > protected void sendLoop(Session session, MessageProducer producer) throws > Exception { > for (int i = 0; i < messageCount || messageCount == 0; i++) { > TextMessage message = > session.createTextMessage(createMessageText(i)); > if (verbose) { > 
String msg = message.getText(); > if (msg.length() > 50) { > msg = msg.substring(0, 50) + "..."; > } > System.out.println("[" + this.getName() + "] Sending message: > '" + msg + "'"); > } > producer.send(message); > if (transacted) { > System.out.println("[" + this.getName() + "] Committing " + > messageCount + " messages"); > session.commit(); > } > Thread.sleep(sleepTime); > } > } > private String createMessageText(int index) { > StringBuffer buffer = new StringBuffer(messageSize); > buffer.append("Message: " + index + " sent at: " + new Date()); > if (buffer.length() > messageSize) { > return buffer.substring(0, messageSize); > } > for (int i = buffer.length(); i < messageSize; i++) { > buffer.append(' '); > } > return buffer.toString(); > } > public void setPersistent(boolean durable) { > this.persistent = durable; > } > public void setMessageCount(int messageCount) { > this.messageCount = messageCount; > } > public void setMessageSize(int messageSize) { > this.messageSize = messageSize; > } > public void setPassword(String pwd) { > this.password = pwd; > } > public void setSleepTime(long sleepTime) { > this.sleepTime = sleepTime; > } > public void setSubject(String subject) { > this.subject = subject; > } > public void setTimeToLive(long timeToLive) { > this.timeToLive = timeToLive; > } > public void setParallelThreads(int parallelThreads) { > if (parallelThreads < 1) { > parallelThreads = 1; > } > this.parallelThreads = parallelThreads; > } > public void setTopic(boolean topic) { > this.topic = topic; > } > public void setQueue(boolean queue) { > this.topic = !queue; > } > public void setTransacted(boolean transacted) { > this.transacted = transacted; > } > public void setUrl(String url) { > this.url = url; > } > public void setUser(String user) { > this.user = user; > } > public void setVerbose(boolean verbose) { > this.verbose = verbose; > } > } -- This message was sent by Atlassian JIRA (v6.1#6144)