Repository: activemq Updated Branches: refs/heads/master c60d71696 -> 5385fd1bb
https://issues.apache.org/jira/browse/AMQ-6446 - use shared logger and pepend statements with connection counter. Old behaviour or per connection logger can be obtained with trace=true&jmxPort=0 Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/5385fd1b Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/5385fd1b Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/5385fd1b Branch: refs/heads/master Commit: 5385fd1bb36abe5a123db039b63d30cefd594ba4 Parents: c60d716 Author: gtully <[email protected]> Authored: Fri Sep 30 14:05:42 2016 +0100 Committer: gtully <[email protected]> Committed: Fri Sep 30 14:05:42 2016 +0100 ---------------------------------------------------------------------- .../transport/TransportLoggerFactory.java | 17 ++- .../transport/logwriters/CustomLogWriter.java | 7 +- .../transport/logwriters/DefaultLogWriter.java | 18 ++- .../apache/activemq/TransportLoggerSupport.java | 6 + .../apache/activemq/transport/LogWriter.java | 7 + .../transport/tcp/TcpTransportServer.java | 10 ++ .../activemq/store/kahadb/KahaDBTest.java | 2 +- .../apache/activemq/usecases/AMQ6446Test.java | 152 +++++++++++++++++++ 8 files changed, 205 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/5385fd1b/activemq-broker/src/main/java/org/apache/activemq/transport/TransportLoggerFactory.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/TransportLoggerFactory.java b/activemq-broker/src/main/java/org/apache/activemq/transport/TransportLoggerFactory.java index 37ee004..599ea46 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/transport/TransportLoggerFactory.java +++ b/activemq-broker/src/main/java/org/apache/activemq/transport/TransportLoggerFactory.java @@ -27,6 +27,8 @@ import org.apache.activemq.util.LogWriterFinder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.activemq.TransportLoggerSupport.defaultJmxPort; + /** * Singleton class to create TransportLogger objects. * When the method getInstance() is called for the first time, @@ -61,10 +63,6 @@ public class TransportLoggerFactory { * This setting only has a meaning if */ private static boolean defaultInitialBehavior = true; - /** - * Default port to control the transport loggers through JMX - */ - private static int defaultJmxPort = 1099; private boolean transportLoggerControlCreated = false; private ManagementContext managementContext; @@ -137,7 +135,11 @@ public class TransportLoggerFactory { */ public TransportLogger createTransportLogger(Transport next, String logWriterName, boolean useJmx, boolean startLogging, int jmxport) throws IOException { - int id = getNextId(); + int id = -1; // new default to single logger + // allow old behaviour with incantation + if (!useJmx && jmxport != defaultJmxPort) { + id = getNextId(); + } return createTransportLogger(next, id, createLog(id), logWriterName, useJmx, startLogging, jmxport); } @@ -162,6 +164,9 @@ public class TransportLoggerFactory { String logWriterName, boolean dynamicManagement, boolean startLogging, int jmxport) throws IOException { try { LogWriter logWriter = logWriterFinder.newInstance(logWriterName); + if (id == -1) { + logWriter.setPrefix(String.format("%08X: ", getNextId())); + } TransportLogger tl = new TransportLogger (next, log, startLogging, logWriter); if (dynamicManagement) { synchronized (this) { @@ -183,7 +188,7 @@ public class TransportLoggerFactory { } private static Logger createLog(int id) { - return LoggerFactory.getLogger(TransportLogger.class.getName()+".Connection:" + id); + return LoggerFactory.getLogger(TransportLogger.class.getName()+".Connection" + (id > 0 ? ":"+id : "" )); } /** http://git-wip-us.apache.org/repos/asf/activemq/blob/5385fd1b/activemq-broker/src/main/java/org/apache/activemq/transport/logwriters/CustomLogWriter.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/logwriters/CustomLogWriter.java b/activemq-broker/src/main/java/org/apache/activemq/transport/logwriters/CustomLogWriter.java index 65b9162..4387cdc 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/transport/logwriters/CustomLogWriter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/transport/logwriters/CustomLogWriter.java @@ -36,7 +36,12 @@ import org.slf4j.Logger; * */ public class CustomLogWriter implements LogWriter { - + + @Override + public void setPrefix(String prefix) { + // for the custom case, revert to the logger per connection + } + // doc comment inherited from LogWriter public void initialMessage(Logger log) { http://git-wip-us.apache.org/repos/asf/activemq/blob/5385fd1b/activemq-broker/src/main/java/org/apache/activemq/transport/logwriters/DefaultLogWriter.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/logwriters/DefaultLogWriter.java b/activemq-broker/src/main/java/org/apache/activemq/transport/logwriters/DefaultLogWriter.java index b8261d9..4de3706 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/transport/logwriters/DefaultLogWriter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/transport/logwriters/DefaultLogWriter.java @@ -30,6 +30,12 @@ import org.slf4j.Logger; */ public class DefaultLogWriter implements LogWriter { + String prefix = ""; + @Override + public void setPrefix(String prefix) { + this.prefix = prefix; + } + // doc comment inherited from LogWriter public void initialMessage(Logger log) { // Default log writer does nothing here @@ -37,32 +43,32 @@ public class DefaultLogWriter implements LogWriter { // doc comment inherited from LogWriter public void logRequest (Logger log, Object command) { - log.debug("SENDING REQUEST: "+command); + log.debug(prefix + "SENDING REQUEST: "+command); } // doc comment inherited from LogWriter public void logResponse (Logger log, Object response) { - log.debug("GOT RESPONSE: "+response); + log.debug(prefix + "GOT RESPONSE: "+response); } // doc comment inherited from LogWriter public void logAsyncRequest (Logger log, Object command) { - log.debug("SENDING ASNYC REQUEST: "+command); + log.debug(prefix + "SENDING ASNYC REQUEST: "+command); } // doc comment inherited from LogWriter public void logOneWay (Logger log, Object command) { - log.debug("SENDING: "+command); + log.debug(prefix + "SENDING: "+command); } // doc comment inherited from LogWriter public void logReceivedCommand (Logger log, Object command) { - log.debug("RECEIVED: " + command); + log.debug(prefix + "RECEIVED: " + command); } // doc comment inherited from LogWriter public void logReceivedException (Logger log, IOException error) { - log.debug("RECEIVED Exception: "+error, error); + log.debug(prefix + "RECEIVED Exception: "+error, error); } http://git-wip-us.apache.org/repos/asf/activemq/blob/5385fd1b/activemq-client/src/main/java/org/apache/activemq/TransportLoggerSupport.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/TransportLoggerSupport.java b/activemq-client/src/main/java/org/apache/activemq/TransportLoggerSupport.java index 2a4e600..80ad176 100644 --- a/activemq-client/src/main/java/org/apache/activemq/TransportLoggerSupport.java +++ b/activemq-client/src/main/java/org/apache/activemq/TransportLoggerSupport.java @@ -27,6 +27,12 @@ public class TransportLoggerSupport { public static String defaultLogWriterName = "default"; + /** + * Default port to control the transport loggers through JMX + */ + public static int defaultJmxPort = 1099; + + public static interface SPI { public Transport createTransportLogger(Transport transport) throws IOException; public Transport createTransportLogger(Transport transport, String logWriterName, boolean dynamicManagement, boolean startLogging, int jmxPort) throws IOException; http://git-wip-us.apache.org/repos/asf/activemq/blob/5385fd1b/activemq-client/src/main/java/org/apache/activemq/transport/LogWriter.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/LogWriter.java b/activemq-client/src/main/java/org/apache/activemq/transport/LogWriter.java index afc2ca2..d92ccc6 100644 --- a/activemq-client/src/main/java/org/apache/activemq/transport/LogWriter.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/LogWriter.java @@ -35,6 +35,13 @@ import org.slf4j.Logger; public interface LogWriter { /** + * prefix each statement with this value. allows connections to be correlated + * when logger is shared + * @param prefix + */ + public void setPrefix(String prefix); + + /** * Writes a header message to the log. * @param log The log to be written to. */ http://git-wip-us.apache.org/repos/asf/activemq/blob/5385fd1b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java index 5d623b6..f3e225f 100755 --- a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java @@ -112,6 +112,7 @@ public class TcpTransportServer extends TransportServerThreadSupport implements * TransportConnector URIs. */ protected boolean startLogging = true; + protected int jmxPort = TransportLoggerSupport.defaultJmxPort; protected final ServerSocketFactory serverSocketFactory; protected final BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>(); protected Thread socketHandlerThread; @@ -258,6 +259,14 @@ public class TcpTransportServer extends TransportServerThreadSupport implements this.dynamicManagement = useJmx; } + public void setJmxPort(int jmxPort) { + this.jmxPort = jmxPort; + } + + public int getJmxPort() { + return jmxPort; + } + public boolean isStartLogging() { return startLogging; } @@ -577,6 +586,7 @@ public class TcpTransportServer extends TransportServerThreadSupport implements options.put("logWriterName", logWriterName); options.put("dynamicManagement", Boolean.valueOf(dynamicManagement)); options.put("startLogging", Boolean.valueOf(startLogging)); + options.put("jmxPort", Integer.valueOf(jmxPort)); options.putAll(transportOptions); TransportInfo transportInfo = configureTransport(this, socket); http://git-wip-us.apache.org/repos/asf/activemq/blob/5385fd1b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java index bd81524..74d2bb8 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java @@ -226,7 +226,7 @@ public class KahaDBTest extends TestCase { assertEquals("Expected to received all messages.", count, 100); broker.stop(); - Logger.getRootLogger().addAppender(appender); + Logger.getRootLogger().removeAppender(appender); assertFalse("Did not replay any records from the journal", didSomeRecovery.get()); } http://git-wip-us.apache.org/repos/asf/activemq/blob/5385fd1b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6446Test.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6446Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6446Test.java new file mode 100644 index 0000000..ecc570f --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6446Test.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.util.DefaultTestAppender; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.spi.LoggingEvent; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import javax.jms.Connection; +import java.util.Collection; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class AMQ6446Test { + + private BrokerService brokerService; + LinkedList<Connection> connections = new LinkedList<>(); + + @Test + public void test2Connections() throws Exception { + final String urlTraceParam = "?trace=true"; + startBroker(urlTraceParam); + final HashSet<String> loggers = new HashSet<String>(); + final HashSet<String> messages = new HashSet<String>(); + + DefaultTestAppender appender = new DefaultTestAppender() { + @Override + public void doAppend(LoggingEvent event) { + loggers.add(event.getLoggerName()); + messages.add(event.getRenderedMessage()); + } + }; + + Logger.getRootLogger().addAppender(appender); + Logger.getRootLogger().setLevel(Level.DEBUG); + + String brokerUrlWithTrace = brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString() + + urlTraceParam; + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrlWithTrace); + + for (int i=0; i<2; i++) { + Connection c = factory.createConnection(); + c.start(); + connections.add(c); + } + + Logger.getRootLogger().removeAppender(appender); + + // no logger ends with :2 + assertFalse(foundMatch(loggers, ".*:2$")); + + // starts with 000000x: + assertTrue(foundMatch(messages, "^0+\\d:.*")); + } + + public boolean foundMatch(Collection<String> values, String regex) { + boolean found = false; + Pattern p = Pattern.compile(regex); + + for (String input: values) { + Matcher m = p.matcher(input); + found = m.matches(); + if (found) { + break; + } + } + return found; + } + + @Test + public void test2ConnectionsLegacy() throws Exception { + final String legacySupportParam = "?trace=true&jmxPort=22"; + startBroker(legacySupportParam); + + final HashSet<String> loggers = new HashSet<String>(); + final HashSet<String> messages = new HashSet<String>(); + + DefaultTestAppender appender = new DefaultTestAppender() { + @Override + public void doAppend(LoggingEvent event) { + loggers.add(event.getLoggerName()); + messages.add(event.getRenderedMessage()); + } + }; + + Logger.getRootLogger().addAppender(appender); + Logger.getRootLogger().setLevel(Level.TRACE); + + String brokerUrlWithTrace = brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString() + + legacySupportParam; + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrlWithTrace); + + for (int i=0; i<2; i++) { + Connection c = factory.createConnection(); + c.start(); + connections.add(c); + } + + Logger.getRootLogger().removeAppender(appender); + + // logger ends with :2 + assertTrue(foundMatch(loggers, ".*:2$")); + + // starts with 000000x: + assertFalse(foundMatch(messages, "^0+\\d:.*")); + + } + + @After + public void tearDown() throws Exception { + for (Connection connection : connections) { + try { + connection.close(); + } catch (Exception ignored) {} + } + brokerService.stop(); + brokerService.waitUntilStopped(); + } + + public void startBroker(String urlParam) throws Exception { + brokerService = BrokerFactory.createBroker("broker:(tcp://0.0.0.0:0" + urlParam + ")/localhost?useJmx=false&persistent=false"); + brokerService.start(); + brokerService.waitUntilStarted(); + } + +}
