Repository: logging-log4j2 Updated Branches: refs/heads/master cc8eb1c46 -> 50f05baea
Refactor JeroMqAppender to use a manager. Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit/50f05bae Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/50f05bae Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/50f05bae Branch: refs/heads/master Commit: 50f05baea0aa488c01dc439efeb85e604435c38e Parents: cc8eb1c Author: Matt Sicker <boa...@gmail.com> Authored: Sun Mar 6 20:04:52 2016 -0600 Committer: Matt Sicker <boa...@gmail.com> Committed: Sun Mar 6 20:04:52 2016 -0600 ---------------------------------------------------------------------- .../appender/mom/jeromq/JeroMqAppender.java | 225 ++----------------- .../core/appender/mom/jeromq/JeroMqManager.java | 220 ++++++++++++++++++ .../appender/mom/jeromq/JeroMqAppenderTest.java | 12 +- 3 files changed, 243 insertions(+), 214 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/50f05bae/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java ---------------------------------------------------------------------- diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java index 2ce0bfd..9f2fa41 100644 --- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java +++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java @@ -19,11 +19,8 @@ package org.apache.logging.log4j.core.appender.mom.jeromq; import java.io.Serializable; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.core.Appender; import org.apache.logging.log4j.core.Filter; import org.apache.logging.log4j.core.Layout; @@ -37,12 +34,7 @@ import org.apache.logging.log4j.core.config.plugins.PluginElement; import org.apache.logging.log4j.core.config.plugins.PluginFactory; import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required; import org.apache.logging.log4j.core.layout.PatternLayout; -import org.apache.logging.log4j.core.util.ShutdownCallbackRegistry; -import org.apache.logging.log4j.status.StatusLogger; -import org.apache.logging.log4j.util.PropertiesUtil; import org.apache.logging.log4j.util.Strings; -import org.zeromq.ZMQ; -import org.zeromq.ZMQ.Socket; /** * Sends log events to one or more ZeroMQ (JeroMQ) endpoints. @@ -57,19 +49,6 @@ import org.zeromq.ZMQ.Socket; @Plugin(name = "JeroMQ", category = Node.CATEGORY, elementType = Appender.ELEMENT_TYPE, printObject = true) public final class JeroMqAppender extends AbstractAppender { - /** - * System property to enable shutdown hook. - */ - static final String SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK = "log4j.jeromq.enableShutdownHook"; - - /** - * System property to control JeroMQ I/O thread count. - */ - static final String SYS_PROPERTY_IO_THREADS = "log4j.jeromq.ioThreads"; - - // Per ZMQ docs, there should usually only be one ZMQ context per process. - private static volatile ZMQ.Context context; - private static final int DEFAULT_BACKLOG = 100; private static final int DEFAULT_IVL = 100; @@ -78,58 +57,10 @@ public final class JeroMqAppender extends AbstractAppender { private static final int DEFAULT_SND_HWM = 1000; - private static Logger logger; - - // ZMQ sockets are not thread safe. - private static ZMQ.Socket publisher; - - private static final String SIMPLE_NAME = JeroMqAppender.class.getSimpleName(); - - static { - logger = StatusLogger.getLogger(); - final PropertiesUtil managerProps = PropertiesUtil.getProperties(); - final int ioThreads = managerProps.getIntegerProperty(SYS_PROPERTY_IO_THREADS, 1); - final boolean enableShutdownHook = managerProps.getBooleanProperty(SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK, true); - final String simpleName = SIMPLE_NAME; - logger.trace("{} using ZMQ version {}", simpleName, ZMQ.getVersionString()); - logger.trace("{} creating ZMQ context with ioThreads={}", simpleName, ioThreads); - context = ZMQ.context(ioThreads); - logger.trace("{} created ZMQ context {}", simpleName, context); - if (enableShutdownHook) { - logger.trace("{} adding shutdown hook", simpleName); - ((ShutdownCallbackRegistry) LogManager.getFactory()).addShutdownCallback(new Runnable() { - @Override - public void run() { - shutdown(); - } - }); - } - } - - private final long affinity; - private final long backlog; - private final boolean delayAttachOnConnect; + private final JeroMqManager manager; private final List<String> endpoints; - private final byte[] identity; - private final int ioThreads = 1; - private final boolean ipv4Only; - private final long linger; - private final long maxMsgSize; - private final long rcvHwm; - private final long receiveBufferSize; - private final int receiveTimeOut; - private final long reconnectIVL; - private final long reconnectIVLMax; - private final long sendBufferSize; private int sendRcFalse; private int sendRcTrue; - private final int sendTimeOut; - private final long sndHwm; - private final int tcpKeepAlive; - private final long tcpKeepAliveCount; - private final long tcpKeepAliveIdle; - private final long tcpKeepAliveInterval; - private final boolean xpubVerbose; private JeroMqAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout, final boolean ignoreExceptions, final List<String> endpoints, final long affinity, final long backlog, @@ -139,27 +70,11 @@ public final class JeroMqAppender extends AbstractAppender { final long sndHWM, final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle, final long tcpKeepAliveInterval, final boolean xpubVerbose) { super(name, filter, layout, ignoreExceptions); + this.manager = JeroMqManager.getJeroMqManager(name, affinity, backlog, delayAttachOnConnect, identity, ipv4Only, + linger, maxMsgSize, rcvHwm, receiveBufferSize, receiveTimeOut, reconnectIVL, reconnectIVLMax, + sendBufferSize, sendTimeOut, sndHWM, tcpKeepAlive, tcpKeepAliveCount, tcpKeepAliveIdle, + tcpKeepAliveInterval, xpubVerbose, endpoints); this.endpoints = endpoints; - this.affinity = affinity; - this.backlog = backlog; - this.delayAttachOnConnect = delayAttachOnConnect; - this.identity = identity; - this.ipv4Only = ipv4Only; - this.linger = linger; - this.maxMsgSize = maxMsgSize; - this.rcvHwm = rcvHwm; - this.receiveBufferSize = receiveBufferSize; - this.receiveTimeOut = receiveTimeOut; - this.reconnectIVL = reconnectIVL; - this.reconnectIVLMax = reconnectIVLMax; - this.sendBufferSize = sendBufferSize; - this.sendTimeOut = sendTimeOut; - this.sndHwm = sndHWM; - this.tcpKeepAlive = tcpKeepAlive; - this.tcpKeepAliveCount = tcpKeepAliveCount; - this.tcpKeepAliveIdle = tcpKeepAliveIdle; - this.tcpKeepAliveInterval = tcpKeepAliveInterval; - this.xpubVerbose = xpubVerbose; } // The ZMQ.Socket class has other set methods that we do not cover because @@ -213,7 +128,7 @@ public final class JeroMqAppender extends AbstractAppender { } } } - logger.debug("Creating JeroMqAppender with name={}, filter={}, layout={}, ignoreExceptions={}, endpoints={}", + LOGGER.debug("Creating JeroMqAppender with name={}, filter={}, layout={}, ignoreExceptions={}, endpoints={}", name, filter, layout, ignoreExceptions, endpoints); return new JeroMqAppender(name, filter, layout, ignoreExceptions, endpoints, affinity, backlog, delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize, rcvHwm, receiveBufferSize, @@ -221,41 +136,24 @@ public final class JeroMqAppender extends AbstractAppender { tcpKeepAliveCount, tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose); } - static ZMQ.Context getContext() { - return context; - } - - private static ZMQ.Socket getPublisher() { - return publisher; - } - - private static ZMQ.Socket newPublisher() { - logger.trace("{} creating a new ZMQ PUB socket with context {}", SIMPLE_NAME, context); - final Socket socketPub = context.socket(ZMQ.PUB); - logger.trace("{} created new ZMQ PUB socket {}", SIMPLE_NAME, socketPub); - return socketPub; - } - - static void shutdown() { - if (context != null) { - logger.trace("{} terminating JeroMQ context {}", SIMPLE_NAME, context); - context.term(); - context = null; - } - } - @Override public synchronized void append(final LogEvent event) { final Layout<? extends Serializable> layout = getLayout(); final byte[] formattedMessage = layout.toByteArray(event); - if (getPublisher().send(getLayout().toByteArray(event))) { + if (manager.send(getLayout().toByteArray(event))) { sendRcTrue++; } else { sendRcFalse++; - logger.error("Appender {} could not send message {} to JeroMQ {}", getName(), sendRcFalse, formattedMessage); + LOGGER.error("Appender {} could not send message {} to JeroMQ {}", getName(), sendRcFalse, formattedMessage); } } + @Override + public void stop() { + manager.release(); + super.stop(); + } + // not public, handy for testing int getSendRcFalse() { return sendRcFalse; @@ -272,95 +170,12 @@ public final class JeroMqAppender extends AbstractAppender { } @Override - public synchronized void start() { - super.start(); - publisher = newPublisher(); - final String name = getName(); - final String prefix = "JeroMQ Appender"; - logger.debug("Starting {} {} using ZMQ version {}", prefix, name, ZMQ.getVersionString()); - logger.debug("{} {} context {} with ioThreads={}", prefix, name, context, ioThreads); - // - final ZMQ.Socket socketPub = getPublisher(); - logger.trace("{} {} setting {} publisher properties for instance {}", prefix, name, socketPub.getClass() - .getName(), socketPub); - logger.trace("{} {} publisher setAffinity({})", prefix, name, affinity); - socketPub.setAffinity(affinity); - logger.trace("{} {} publisher setBacklog({})", prefix, name, backlog); - socketPub.setBacklog(backlog); - logger.trace("{} {} publisher setDelayAttachOnConnect({})", prefix, name, delayAttachOnConnect); - socketPub.setDelayAttachOnConnect(delayAttachOnConnect); - if (identity != null) { - logger.trace("{} {} publisher setIdentity({})", prefix, name, Arrays.toString(identity)); - socketPub.setIdentity(identity); - } - logger.trace("{} {} publisher setIPv4Only({})", prefix, name, ipv4Only); - socketPub.setIPv4Only(ipv4Only); - logger.trace("{} {} publisher setLinger({})", prefix, name, linger); - socketPub.setLinger(linger); - logger.trace("{} {} publisher setMaxMsgSize({})", prefix, name, maxMsgSize); - socketPub.setMaxMsgSize(maxMsgSize); - logger.trace("{} {} publisher setRcvHWM({})", prefix, name, rcvHwm); - socketPub.setRcvHWM(rcvHwm); - logger.trace("{} {} publisher setReceiveBufferSize({})", prefix, name, receiveBufferSize); - socketPub.setReceiveBufferSize(receiveBufferSize); - logger.trace("{} {} publisher setReceiveTimeOut({})", prefix, name, receiveTimeOut); - socketPub.setReceiveTimeOut(receiveTimeOut); - logger.trace("{} {} publisher setReconnectIVL({})", prefix, name, reconnectIVL); - socketPub.setReconnectIVL(reconnectIVL); - logger.trace("{} {} publisher setReconnectIVLMax({})", prefix, name, reconnectIVLMax); - socketPub.setReconnectIVLMax(reconnectIVLMax); - logger.trace("{} {} publisher setSendBufferSize({})", prefix, name, sendBufferSize); - socketPub.setSendBufferSize(sendBufferSize); - logger.trace("{} {} publisher setSendTimeOut({})", prefix, name, sendTimeOut); - socketPub.setSendTimeOut(sendTimeOut); - logger.trace("{} {} publisher setSndHWM({})", prefix, name, sndHwm); - socketPub.setSndHWM(sndHwm); - logger.trace("{} {} publisher setTCPKeepAlive({})", prefix, name, tcpKeepAlive); - socketPub.setTCPKeepAlive(tcpKeepAlive); - logger.trace("{} {} publisher setTCPKeepAliveCount({})", prefix, name, tcpKeepAliveCount); - socketPub.setTCPKeepAliveCount(tcpKeepAliveCount); - logger.trace("{} {} publisher setTCPKeepAliveIdle({})", prefix, name, tcpKeepAliveIdle); - socketPub.setTCPKeepAliveIdle(tcpKeepAliveIdle); - logger.trace("{} {} publisher setTCPKeepAliveInterval({})", prefix, name, tcpKeepAliveInterval); - socketPub.setTCPKeepAliveInterval(tcpKeepAliveInterval); - logger.trace("{} {} publisher setXpubVerbose({})", prefix, name, xpubVerbose); - socketPub.setXpubVerbose(xpubVerbose); - // - if (logger.isDebugEnabled()) { - logger.debug( - "Created JeroMQ {} publisher {} type {}, affinity={}, backlog={}, delayAttachOnConnect={}, events={}, IPv4Only={}, linger={}, maxMsgSize={}, multicastHops={}, " - + "rate={}, rcvHWM={}, receiveBufferSize={}, receiveTimeOut={}, reconnectIVL={}, reconnectIVLMax={}, recoveryInterval={}, sendBufferSize={}, " - + "sendTimeOut={}, sndHWM={}, TCPKeepAlive={}, TCPKeepAliveCount={}, TCPKeepAliveIdle={}, TCPKeepAliveInterval={}, TCPKeepAliveSetting={}", - name, socketPub, socketPub.getType(), socketPub.getAffinity(), socketPub.getBacklog(), - socketPub.getDelayAttachOnConnect(), socketPub.getEvents(), socketPub.getIPv4Only(), - socketPub.getLinger(), socketPub.getMaxMsgSize(), socketPub.getMulticastHops(), - socketPub.getRate(), socketPub.getRcvHWM(), socketPub.getReceiveBufferSize(), - socketPub.getReceiveTimeOut(), socketPub.getReconnectIVL(), socketPub.getReconnectIVLMax(), - socketPub.getRecoveryInterval(), socketPub.getSendBufferSize(), socketPub.getSendTimeOut(), - socketPub.getSndHWM(), socketPub.getTCPKeepAlive(), socketPub.getTCPKeepAliveCount(), - socketPub.getTCPKeepAliveIdle(), socketPub.getTCPKeepAliveInterval(), - socketPub.getTCPKeepAliveSetting()); - } - for (final String endpoint : endpoints) { - logger.debug("Binding {} appender {} to endpoint {}", SIMPLE_NAME, name, endpoint); - socketPub.bind(endpoint); - } - } - - @Override - public synchronized void stop() { - super.stop(); - final Socket socketPub = getPublisher(); - if (socketPub != null) { - logger.debug("Closing {} appender {} publisher {}", SIMPLE_NAME, getName(), socketPub); - socketPub.close(); - publisher = null; - } - } - - @Override public String toString() { - return "JeroMqAppender [context=" + context + ", publisher=" + publisher + ", endpoints=" + endpoints + "]"; + return "JeroMqAppender{" + + "name=" + getName() + + ", state=" + getState() + + ", manager=" + manager + + ", endpoints=" + endpoints + + '}'; } - } http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/50f05bae/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java ---------------------------------------------------------------------- diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java new file mode 100644 index 0000000..e84ab6d --- /dev/null +++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ + +package org.apache.logging.log4j.core.appender.mom.jeromq; + +import java.util.Arrays; +import java.util.List; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.core.appender.AbstractManager; +import org.apache.logging.log4j.core.appender.ManagerFactory; +import org.apache.logging.log4j.core.util.ShutdownCallbackRegistry; +import org.apache.logging.log4j.util.PropertiesUtil; +import org.zeromq.ZMQ; + +/** + * Manager for publishing messages via JeroMq. + * + * @since 2.6 + */ +public class JeroMqManager extends AbstractManager { + + /** + * System property to enable shutdown hook. + */ + public static final String SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK = "log4j.jeromq.enableShutdownHook"; + + /** + * System property to control JeroMQ I/O thread count. + */ + public static final String SYS_PROPERTY_IO_THREADS = "log4j.jeromq.ioThreads"; + + private static final JeroMqManagerFactory FACTORY = new JeroMqManagerFactory(); + private static final ZMQ.Context CONTEXT; + + static { + LOGGER.trace("JeroMqManager using ZMQ version {}", ZMQ.getVersionString()); + + final int ioThreads = PropertiesUtil.getProperties().getIntegerProperty(SYS_PROPERTY_IO_THREADS, 1); + LOGGER.trace("JeroMqManager creating ZMQ context with ioThreads = {}", ioThreads); + CONTEXT = ZMQ.context(ioThreads); + + final boolean enableShutdownHook = PropertiesUtil.getProperties().getBooleanProperty( + SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK, true); + if (enableShutdownHook) { + ((ShutdownCallbackRegistry) LogManager.getFactory()).addShutdownCallback(new Runnable() { + @Override + public void run() { + CONTEXT.close(); + } + }); + } + } + + private final ZMQ.Socket publisher; + + private JeroMqManager(final String name, final JeroMqConfiguration config) { + super(name); + publisher = CONTEXT.socket(ZMQ.PUB); + publisher.setAffinity(config.affinity); + publisher.setBacklog(config.backlog); + publisher.setDelayAttachOnConnect(config.delayAttachOnConnect); + if (config.identity != null) { + publisher.setIdentity(config.identity); + } + publisher.setIPv4Only(config.ipv4Only); + publisher.setLinger(config.linger); + publisher.setMaxMsgSize(config.maxMsgSize); + publisher.setRcvHWM(config.rcvHwm); + publisher.setReceiveBufferSize(config.receiveBufferSize); + publisher.setReceiveTimeOut(config.receiveTimeOut); + publisher.setReconnectIVL(config.reconnectIVL); + publisher.setReconnectIVLMax(config.reconnectIVLMax); + publisher.setSendBufferSize(config.sendBufferSize); + publisher.setSendTimeOut(config.sendTimeOut); + publisher.setSndHWM(config.sndHwm); + publisher.setTCPKeepAlive(config.tcpKeepAlive); + publisher.setTCPKeepAliveCount(config.tcpKeepAliveCount); + publisher.setTCPKeepAliveIdle(config.tcpKeepAliveIdle); + publisher.setTCPKeepAliveInterval(config.tcpKeepAliveInterval); + publisher.setXpubVerbose(config.xpubVerbose); + for (final String endpoint : config.endpoints) { + publisher.bind(endpoint); + } + LOGGER.debug("Created JeroMqManager with {}", config); + } + + public boolean send(final byte[] data) { + return publisher.send(data); + } + + @Override + protected void releaseSub() { + publisher.close(); + } + + public static JeroMqManager getJeroMqManager(final String name, final long affinity, final long backlog, + final boolean delayAttachOnConnect, final byte[] identity, + final boolean ipv4Only, final long linger, final long maxMsgSize, + final long rcvHwm, final long receiveBufferSize, + final int receiveTimeOut, final long reconnectIVL, + final long reconnectIVLMax, final long sendBufferSize, + final int sendTimeOut, final long sndHwm, final int tcpKeepAlive, + final long tcpKeepAliveCount, final long tcpKeepAliveIdle, + final long tcpKeepAliveInterval, final boolean xpubVerbose, + final List<String> endpoints) { + return getManager(name, FACTORY, + new JeroMqConfiguration(affinity, backlog, delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize, + rcvHwm, receiveBufferSize, receiveTimeOut, reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut, + sndHwm, tcpKeepAlive, tcpKeepAliveCount, tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose, + endpoints)); + } + + public static ZMQ.Context getContext() { + return CONTEXT; + } + + private static class JeroMqConfiguration { + private final long affinity; + private final long backlog; + private final boolean delayAttachOnConnect; + private final byte[] identity; + private final boolean ipv4Only; + private final long linger; + private final long maxMsgSize; + private final long rcvHwm; + private final long receiveBufferSize; + private final int receiveTimeOut; + private final long reconnectIVL; + private final long reconnectIVLMax; + private final long sendBufferSize; + private final int sendTimeOut; + private final long sndHwm; + private final int tcpKeepAlive; + private final long tcpKeepAliveCount; + private final long tcpKeepAliveIdle; + private final long tcpKeepAliveInterval; + private final boolean xpubVerbose; + private final List<String> endpoints; + + private JeroMqConfiguration(final long affinity, final long backlog, final boolean delayAttachOnConnect, + final byte[] identity, final boolean ipv4Only, final long linger, + final long maxMsgSize, final long rcvHwm, final long receiveBufferSize, + final int receiveTimeOut, final long reconnectIVL, final long reconnectIVLMax, + final long sendBufferSize, final int sendTimeOut, final long sndHwm, + final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle, + final long tcpKeepAliveInterval, final boolean xpubVerbose, + final List<String> endpoints) { + this.affinity = affinity; + this.backlog = backlog; + this.delayAttachOnConnect = delayAttachOnConnect; + this.identity = identity; + this.ipv4Only = ipv4Only; + this.linger = linger; + this.maxMsgSize = maxMsgSize; + this.rcvHwm = rcvHwm; + this.receiveBufferSize = receiveBufferSize; + this.receiveTimeOut = receiveTimeOut; + this.reconnectIVL = reconnectIVL; + this.reconnectIVLMax = reconnectIVLMax; + this.sendBufferSize = sendBufferSize; + this.sendTimeOut = sendTimeOut; + this.sndHwm = sndHwm; + this.tcpKeepAlive = tcpKeepAlive; + this.tcpKeepAliveCount = tcpKeepAliveCount; + this.tcpKeepAliveIdle = tcpKeepAliveIdle; + this.tcpKeepAliveInterval = tcpKeepAliveInterval; + this.xpubVerbose = xpubVerbose; + this.endpoints = endpoints; + } + + @Override + public String toString() { + return "JeroMqConfiguration{" + + "affinity=" + affinity + + ", backlog=" + backlog + + ", delayAttachOnConnect=" + delayAttachOnConnect + + ", identity=" + Arrays.toString(identity) + + ", ipv4Only=" + ipv4Only + + ", linger=" + linger + + ", maxMsgSize=" + maxMsgSize + + ", rcvHwm=" + rcvHwm + + ", receiveBufferSize=" + receiveBufferSize + + ", receiveTimeOut=" + receiveTimeOut + + ", reconnectIVL=" + reconnectIVL + + ", reconnectIVLMax=" + reconnectIVLMax + + ", sendBufferSize=" + sendBufferSize + + ", sendTimeOut=" + sendTimeOut + + ", sndHwm=" + sndHwm + + ", tcpKeepAlive=" + tcpKeepAlive + + ", tcpKeepAliveCount=" + tcpKeepAliveCount + + ", tcpKeepAliveIdle=" + tcpKeepAliveIdle + + ", tcpKeepAliveInterval=" + tcpKeepAliveInterval + + ", xpubVerbose=" + xpubVerbose + + ", endpoints=" + endpoints + + '}'; + } + } + + private static class JeroMqManagerFactory implements ManagerFactory<JeroMqManager, JeroMqConfiguration> { + @Override + public JeroMqManager createManager(final String name, final JeroMqConfiguration data) { + return new JeroMqManager(name, data); + } + } +} http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/50f05bae/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppenderTest.java ---------------------------------------------------------------------- diff --git a/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppenderTest.java b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppenderTest.java index 385d63d..fc85aab 100644 --- a/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppenderTest.java +++ b/log4j-core/src/test/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppenderTest.java @@ -25,18 +25,12 @@ import java.util.concurrent.Future; import org.apache.logging.log4j.ThreadContext; import org.apache.logging.log4j.core.Logger; import org.apache.logging.log4j.junit.LoggerContextRule; -import org.junit.AfterClass; import org.junit.Assert; import org.junit.ClassRule; import org.junit.Test; public class JeroMqAppenderTest { - @AfterClass - public static void tearDownClass() { - // JeroMqAppender.shutdown(); - } - @ClassRule public static LoggerContextRule ctx = new LoggerContextRule("JeroMqAppenderTest.xml"); @@ -44,14 +38,14 @@ public class JeroMqAppenderTest { public void testAppenderLifeCycle() throws Exception { // do nothing to make sure the appender starts and stops without // locking up resources. - Assert.assertNotNull(JeroMqAppender.getContext()); + Assert.assertNotNull(JeroMqManager.getContext()); } @Test(timeout = 10000) public void testClientServer() throws Exception { final JeroMqAppender appender = ctx.getRequiredAppender("JeroMQAppender", JeroMqAppender.class); final int expectedReceiveCount = 3; - final JeroMqTestClient client = new JeroMqTestClient(JeroMqAppender.getContext(), "tcp://localhost:5556", expectedReceiveCount); + final JeroMqTestClient client = new JeroMqTestClient(JeroMqManager.getContext(), "tcp://localhost:5556", expectedReceiveCount); final ExecutorService executor = Executors.newSingleThreadExecutor(); try { final Future<List<String>> future = executor.submit(client); @@ -78,7 +72,7 @@ public class JeroMqAppenderTest { final int nThreads = 10; final JeroMqAppender appender = ctx.getRequiredAppender("JeroMQAppender", JeroMqAppender.class); final int expectedReceiveCount = 2 * nThreads; - final JeroMqTestClient client = new JeroMqTestClient(JeroMqAppender.getContext(), "tcp://localhost:5556", + final JeroMqTestClient client = new JeroMqTestClient(JeroMqManager.getContext(), "tcp://localhost:5556", expectedReceiveCount); final ExecutorService executor = Executors.newSingleThreadExecutor(); try {