http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/78fe32c0/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java ---------------------------------------------------------------------- diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java index 5a60653..96b2f57 100644 --- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java +++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqAppender.java @@ -1,185 +1,185 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache license, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the license for the specific language governing permissions and - * limitations under the license. - */ - -package org.apache.logging.log4j.core.appender.mom.jeromq; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import org.apache.logging.log4j.core.Appender; -import org.apache.logging.log4j.core.Filter; -import org.apache.logging.log4j.core.Layout; -import org.apache.logging.log4j.core.LogEvent; -import org.apache.logging.log4j.core.appender.AbstractAppender; -import org.apache.logging.log4j.core.config.Node; -import org.apache.logging.log4j.core.config.Property; -import org.apache.logging.log4j.core.config.plugins.Plugin; -import org.apache.logging.log4j.core.config.plugins.PluginAttribute; -import org.apache.logging.log4j.core.config.plugins.PluginElement; -import org.apache.logging.log4j.core.config.plugins.PluginFactory; -import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required; -import org.apache.logging.log4j.core.layout.PatternLayout; -import org.apache.logging.log4j.util.Strings; - -/** - * Sends log events to one or more ZeroMQ (JeroMQ) endpoints. - * <p> - * Requires the JeroMQ jar (LGPL as of 0.3.5) - * </p> - */ -// TODO -// Some methods are synchronized because a ZMQ.Socket is not thread-safe -// Using a ThreadLocal for the publisher hangs tests on shutdown. There must be -// some issue on threads owning certain resources as opposed to others. -@Plugin(name = "JeroMQ", category = Node.CATEGORY, elementType = Appender.ELEMENT_TYPE, printObject = true) -public final class JeroMqAppender extends AbstractAppender { - - private static final int DEFAULT_BACKLOG = 100; - - private static final int DEFAULT_IVL = 100; - - private static final int DEFAULT_RCV_HWM = 1000; - - private static final int DEFAULT_SND_HWM = 1000; - - private final JeroMqManager manager; - private final List<String> endpoints; - private int sendRcFalse; - private int sendRcTrue; - - private JeroMqAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout, - final boolean ignoreExceptions, final List<String> endpoints, final long affinity, final long backlog, - final boolean delayAttachOnConnect, final byte[] identity, final boolean ipv4Only, final long linger, - final long maxMsgSize, final long rcvHwm, final long receiveBufferSize, final int receiveTimeOut, - final long reconnectIVL, final long reconnectIVLMax, final long sendBufferSize, final int sendTimeOut, - final long sndHWM, final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle, - final long tcpKeepAliveInterval, final boolean xpubVerbose) { - super(name, filter, layout, ignoreExceptions); - this.manager = JeroMqManager.getJeroMqManager(name, affinity, backlog, delayAttachOnConnect, identity, ipv4Only, - linger, maxMsgSize, rcvHwm, receiveBufferSize, receiveTimeOut, reconnectIVL, reconnectIVLMax, - sendBufferSize, sendTimeOut, sndHWM, tcpKeepAlive, tcpKeepAliveCount, tcpKeepAliveIdle, - tcpKeepAliveInterval, xpubVerbose, endpoints); - this.endpoints = endpoints; - } - - // The ZMQ.Socket class has other set methods that we do not cover because - // they throw unsupported operation exceptions. - @PluginFactory - public static JeroMqAppender createAppender( - // @formatter:off - @Required(message = "No name provided for JeroMqAppender") @PluginAttribute("name") final String name, - @PluginElement("Layout") Layout<?> layout, - @PluginElement("Filter") final Filter filter, - @PluginElement("Properties") final Property[] properties, - // Super attributes - @PluginAttribute("ignoreExceptions") final boolean ignoreExceptions, - // ZMQ attributes; defaults picked from zmq.Options. - @PluginAttribute(value = "affinity", defaultLong = 0) final long affinity, - @PluginAttribute(value = "backlog", defaultLong = DEFAULT_BACKLOG) final long backlog, - @PluginAttribute(value = "delayAttachOnConnect", defaultBoolean = false) final boolean delayAttachOnConnect, - @PluginAttribute(value = "identity") final byte[] identity, - @PluginAttribute(value = "ipv4Only", defaultBoolean = true) final boolean ipv4Only, - @PluginAttribute(value = "linger", defaultLong = -1) final long linger, - @PluginAttribute(value = "maxMsgSize", defaultLong = -1) final long maxMsgSize, - @PluginAttribute(value = "rcvHwm", defaultLong = DEFAULT_RCV_HWM) final long rcvHwm, - @PluginAttribute(value = "receiveBufferSize", defaultLong = 0) final long receiveBufferSize, - @PluginAttribute(value = "receiveTimeOut", defaultLong = -1) final int receiveTimeOut, - @PluginAttribute(value = "reconnectIVL", defaultLong = DEFAULT_IVL) final long reconnectIVL, - @PluginAttribute(value = "reconnectIVLMax", defaultLong = 0) final long reconnectIVLMax, - @PluginAttribute(value = "sendBufferSize", defaultLong = 0) final long sendBufferSize, - @PluginAttribute(value = "sendTimeOut", defaultLong = -1) final int sendTimeOut, - @PluginAttribute(value = "sndHwm", defaultLong = DEFAULT_SND_HWM) final long sndHwm, - @PluginAttribute(value = "tcpKeepAlive", defaultInt = -1) final int tcpKeepAlive, - @PluginAttribute(value = "tcpKeepAliveCount", defaultLong = -1) final long tcpKeepAliveCount, - @PluginAttribute(value = "tcpKeepAliveIdle", defaultLong = -1) final long tcpKeepAliveIdle, - @PluginAttribute(value = "tcpKeepAliveInterval", defaultLong = -1) final long tcpKeepAliveInterval, - @PluginAttribute(value = "xpubVerbose", defaultBoolean = false) final boolean xpubVerbose - // @formatter:on - ) { - if (layout == null) { - layout = PatternLayout.createDefaultLayout(); - } - List<String> endpoints; - if (properties == null) { - endpoints = new ArrayList<>(0); - } else { - endpoints = new ArrayList<>(properties.length); - for (final Property property : properties) { - if ("endpoint".equalsIgnoreCase(property.getName())) { - final String value = property.getValue(); - if (Strings.isNotEmpty(value)) { - endpoints.add(value); - } - } - } - } - LOGGER.debug("Creating JeroMqAppender with name={}, filter={}, layout={}, ignoreExceptions={}, endpoints={}", - name, filter, layout, ignoreExceptions, endpoints); - return new JeroMqAppender(name, filter, layout, ignoreExceptions, endpoints, affinity, backlog, - delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize, rcvHwm, receiveBufferSize, - receiveTimeOut, reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut, sndHwm, tcpKeepAlive, - tcpKeepAliveCount, tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose); - } - - @Override - public synchronized void append(final LogEvent event) { - final Layout<? extends Serializable> layout = getLayout(); - final byte[] formattedMessage = layout.toByteArray(event); - if (manager.send(getLayout().toByteArray(event))) { - sendRcTrue++; - } else { - sendRcFalse++; - LOGGER.error("Appender {} could not send message {} to JeroMQ {}", getName(), sendRcFalse, formattedMessage); - } - } - - @Override - public boolean stop(final long timeout, final TimeUnit timeUnit) { - setStopping(); - super.stop(timeout, timeUnit, false); - manager.stop(timeout, timeUnit); - setStopped(); - return true; - } - - // not public, handy for testing - int getSendRcFalse() { - return sendRcFalse; - } - - // not public, handy for testing - int getSendRcTrue() { - return sendRcTrue; - } - - // not public, handy for testing - void resetSendRcs() { - sendRcTrue = sendRcFalse = 0; - } - - @Override - public String toString() { - return "JeroMqAppender{" + - "name=" + getName() + - ", state=" + getState() + - ", manager=" + manager + - ", endpoints=" + endpoints + - '}'; - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ + +package org.apache.logging.log4j.core.appender.mom.jeromq; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.logging.log4j.core.Appender; +import org.apache.logging.log4j.core.Filter; +import org.apache.logging.log4j.core.Layout; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.appender.AbstractAppender; +import org.apache.logging.log4j.core.config.Node; +import org.apache.logging.log4j.core.config.Property; +import org.apache.logging.log4j.core.config.plugins.Plugin; +import org.apache.logging.log4j.core.config.plugins.PluginAttribute; +import org.apache.logging.log4j.core.config.plugins.PluginElement; +import org.apache.logging.log4j.core.config.plugins.PluginFactory; +import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required; +import org.apache.logging.log4j.core.layout.PatternLayout; +import org.apache.logging.log4j.util.Strings; + +/** + * Sends log events to one or more ZeroMQ (JeroMQ) endpoints. + * <p> + * Requires the JeroMQ jar (LGPL as of 0.3.5) + * </p> + */ +// TODO +// Some methods are synchronized because a ZMQ.Socket is not thread-safe +// Using a ThreadLocal for the publisher hangs tests on shutdown. There must be +// some issue on threads owning certain resources as opposed to others. +@Plugin(name = "JeroMQ", category = Node.CATEGORY, elementType = Appender.ELEMENT_TYPE, printObject = true) +public final class JeroMqAppender extends AbstractAppender { + + private static final int DEFAULT_BACKLOG = 100; + + private static final int DEFAULT_IVL = 100; + + private static final int DEFAULT_RCV_HWM = 1000; + + private static final int DEFAULT_SND_HWM = 1000; + + private final JeroMqManager manager; + private final List<String> endpoints; + private int sendRcFalse; + private int sendRcTrue; + + private JeroMqAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout, + final boolean ignoreExceptions, final List<String> endpoints, final long affinity, final long backlog, + final boolean delayAttachOnConnect, final byte[] identity, final boolean ipv4Only, final long linger, + final long maxMsgSize, final long rcvHwm, final long receiveBufferSize, final int receiveTimeOut, + final long reconnectIVL, final long reconnectIVLMax, final long sendBufferSize, final int sendTimeOut, + final long sndHWM, final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle, + final long tcpKeepAliveInterval, final boolean xpubVerbose) { + super(name, filter, layout, ignoreExceptions); + this.manager = JeroMqManager.getJeroMqManager(name, affinity, backlog, delayAttachOnConnect, identity, ipv4Only, + linger, maxMsgSize, rcvHwm, receiveBufferSize, receiveTimeOut, reconnectIVL, reconnectIVLMax, + sendBufferSize, sendTimeOut, sndHWM, tcpKeepAlive, tcpKeepAliveCount, tcpKeepAliveIdle, + tcpKeepAliveInterval, xpubVerbose, endpoints); + this.endpoints = endpoints; + } + + // The ZMQ.Socket class has other set methods that we do not cover because + // they throw unsupported operation exceptions. + @PluginFactory + public static JeroMqAppender createAppender( + // @formatter:off + @Required(message = "No name provided for JeroMqAppender") @PluginAttribute("name") final String name, + @PluginElement("Layout") Layout<?> layout, + @PluginElement("Filter") final Filter filter, + @PluginElement("Properties") final Property[] properties, + // Super attributes + @PluginAttribute("ignoreExceptions") final boolean ignoreExceptions, + // ZMQ attributes; defaults picked from zmq.Options. + @PluginAttribute(value = "affinity", defaultLong = 0) final long affinity, + @PluginAttribute(value = "backlog", defaultLong = DEFAULT_BACKLOG) final long backlog, + @PluginAttribute(value = "delayAttachOnConnect", defaultBoolean = false) final boolean delayAttachOnConnect, + @PluginAttribute(value = "identity") final byte[] identity, + @PluginAttribute(value = "ipv4Only", defaultBoolean = true) final boolean ipv4Only, + @PluginAttribute(value = "linger", defaultLong = -1) final long linger, + @PluginAttribute(value = "maxMsgSize", defaultLong = -1) final long maxMsgSize, + @PluginAttribute(value = "rcvHwm", defaultLong = DEFAULT_RCV_HWM) final long rcvHwm, + @PluginAttribute(value = "receiveBufferSize", defaultLong = 0) final long receiveBufferSize, + @PluginAttribute(value = "receiveTimeOut", defaultLong = -1) final int receiveTimeOut, + @PluginAttribute(value = "reconnectIVL", defaultLong = DEFAULT_IVL) final long reconnectIVL, + @PluginAttribute(value = "reconnectIVLMax", defaultLong = 0) final long reconnectIVLMax, + @PluginAttribute(value = "sendBufferSize", defaultLong = 0) final long sendBufferSize, + @PluginAttribute(value = "sendTimeOut", defaultLong = -1) final int sendTimeOut, + @PluginAttribute(value = "sndHwm", defaultLong = DEFAULT_SND_HWM) final long sndHwm, + @PluginAttribute(value = "tcpKeepAlive", defaultInt = -1) final int tcpKeepAlive, + @PluginAttribute(value = "tcpKeepAliveCount", defaultLong = -1) final long tcpKeepAliveCount, + @PluginAttribute(value = "tcpKeepAliveIdle", defaultLong = -1) final long tcpKeepAliveIdle, + @PluginAttribute(value = "tcpKeepAliveInterval", defaultLong = -1) final long tcpKeepAliveInterval, + @PluginAttribute(value = "xpubVerbose", defaultBoolean = false) final boolean xpubVerbose + // @formatter:on + ) { + if (layout == null) { + layout = PatternLayout.createDefaultLayout(); + } + List<String> endpoints; + if (properties == null) { + endpoints = new ArrayList<>(0); + } else { + endpoints = new ArrayList<>(properties.length); + for (final Property property : properties) { + if ("endpoint".equalsIgnoreCase(property.getName())) { + final String value = property.getValue(); + if (Strings.isNotEmpty(value)) { + endpoints.add(value); + } + } + } + } + LOGGER.debug("Creating JeroMqAppender with name={}, filter={}, layout={}, ignoreExceptions={}, endpoints={}", + name, filter, layout, ignoreExceptions, endpoints); + return new JeroMqAppender(name, filter, layout, ignoreExceptions, endpoints, affinity, backlog, + delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize, rcvHwm, receiveBufferSize, + receiveTimeOut, reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut, sndHwm, tcpKeepAlive, + tcpKeepAliveCount, tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose); + } + + @Override + public synchronized void append(final LogEvent event) { + final Layout<? extends Serializable> layout = getLayout(); + final byte[] formattedMessage = layout.toByteArray(event); + if (manager.send(getLayout().toByteArray(event))) { + sendRcTrue++; + } else { + sendRcFalse++; + LOGGER.error("Appender {} could not send message {} to JeroMQ {}", getName(), sendRcFalse, formattedMessage); + } + } + + @Override + public boolean stop(final long timeout, final TimeUnit timeUnit) { + setStopping(); + boolean stopped = super.stop(timeout, timeUnit, false); + stopped &= manager.stop(timeout, timeUnit); + setStopped(); + return stopped; + } + + // not public, handy for testing + int getSendRcFalse() { + return sendRcFalse; + } + + // not public, handy for testing + int getSendRcTrue() { + return sendRcTrue; + } + + // not public, handy for testing + void resetSendRcs() { + sendRcTrue = sendRcFalse = 0; + } + + @Override + public String toString() { + return "JeroMqAppender{" + + "name=" + getName() + + ", state=" + getState() + + ", manager=" + manager + + ", endpoints=" + endpoints + + '}'; + } +}
http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/78fe32c0/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java ---------------------------------------------------------------------- diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java index f731d61..a438faf 100644 --- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java +++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/jeromq/JeroMqManager.java @@ -1,221 +1,222 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache license, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the license for the specific language governing permissions and - * limitations under the license. - */ - -package org.apache.logging.log4j.core.appender.mom.jeromq; - -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.core.appender.AbstractManager; -import org.apache.logging.log4j.core.appender.ManagerFactory; -import org.apache.logging.log4j.core.util.ShutdownCallbackRegistry; -import org.apache.logging.log4j.util.PropertiesUtil; -import org.zeromq.ZMQ; - -/** - * Manager for publishing messages via JeroMq. - * - * @since 2.6 - */ -public class JeroMqManager extends AbstractManager { - - /** - * System property to enable shutdown hook. - */ - public static final String SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK = "log4j.jeromq.enableShutdownHook"; - - /** - * System property to control JeroMQ I/O thread count. - */ - public static final String SYS_PROPERTY_IO_THREADS = "log4j.jeromq.ioThreads"; - - private static final JeroMqManagerFactory FACTORY = new JeroMqManagerFactory(); - private static final ZMQ.Context CONTEXT; - - static { - LOGGER.trace("JeroMqManager using ZMQ version {}", ZMQ.getVersionString()); - - final int ioThreads = PropertiesUtil.getProperties().getIntegerProperty(SYS_PROPERTY_IO_THREADS, 1); - LOGGER.trace("JeroMqManager creating ZMQ context with ioThreads = {}", ioThreads); - CONTEXT = ZMQ.context(ioThreads); - - final boolean enableShutdownHook = PropertiesUtil.getProperties().getBooleanProperty( - SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK, true); - if (enableShutdownHook) { - ((ShutdownCallbackRegistry) LogManager.getFactory()).addShutdownCallback(new Runnable() { - @Override - public void run() { - CONTEXT.close(); - } - }); - } - } - - private final ZMQ.Socket publisher; - - private JeroMqManager(final String name, final JeroMqConfiguration config) { - super(null, name); - publisher = CONTEXT.socket(ZMQ.PUB); - publisher.setAffinity(config.affinity); - publisher.setBacklog(config.backlog); - publisher.setDelayAttachOnConnect(config.delayAttachOnConnect); - if (config.identity != null) { - publisher.setIdentity(config.identity); - } - publisher.setIPv4Only(config.ipv4Only); - publisher.setLinger(config.linger); - publisher.setMaxMsgSize(config.maxMsgSize); - publisher.setRcvHWM(config.rcvHwm); - publisher.setReceiveBufferSize(config.receiveBufferSize); - publisher.setReceiveTimeOut(config.receiveTimeOut); - publisher.setReconnectIVL(config.reconnectIVL); - publisher.setReconnectIVLMax(config.reconnectIVLMax); - publisher.setSendBufferSize(config.sendBufferSize); - publisher.setSendTimeOut(config.sendTimeOut); - publisher.setSndHWM(config.sndHwm); - publisher.setTCPKeepAlive(config.tcpKeepAlive); - publisher.setTCPKeepAliveCount(config.tcpKeepAliveCount); - publisher.setTCPKeepAliveIdle(config.tcpKeepAliveIdle); - publisher.setTCPKeepAliveInterval(config.tcpKeepAliveInterval); - publisher.setXpubVerbose(config.xpubVerbose); - for (final String endpoint : config.endpoints) { - publisher.bind(endpoint); - } - LOGGER.debug("Created JeroMqManager with {}", config); - } - - public boolean send(final byte[] data) { - return publisher.send(data); - } - - @Override - protected void releaseSub(final long timeout, final TimeUnit timeUnit) { - publisher.close(); - } - - public static JeroMqManager getJeroMqManager(final String name, final long affinity, final long backlog, - final boolean delayAttachOnConnect, final byte[] identity, - final boolean ipv4Only, final long linger, final long maxMsgSize, - final long rcvHwm, final long receiveBufferSize, - final int receiveTimeOut, final long reconnectIVL, - final long reconnectIVLMax, final long sendBufferSize, - final int sendTimeOut, final long sndHwm, final int tcpKeepAlive, - final long tcpKeepAliveCount, final long tcpKeepAliveIdle, - final long tcpKeepAliveInterval, final boolean xpubVerbose, - final List<String> endpoints) { - return getManager(name, FACTORY, - new JeroMqConfiguration(affinity, backlog, delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize, - rcvHwm, receiveBufferSize, receiveTimeOut, reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut, - sndHwm, tcpKeepAlive, tcpKeepAliveCount, tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose, - endpoints)); - } - - public static ZMQ.Context getContext() { - return CONTEXT; - } - - private static class JeroMqConfiguration { - private final long affinity; - private final long backlog; - private final boolean delayAttachOnConnect; - private final byte[] identity; - private final boolean ipv4Only; - private final long linger; - private final long maxMsgSize; - private final long rcvHwm; - private final long receiveBufferSize; - private final int receiveTimeOut; - private final long reconnectIVL; - private final long reconnectIVLMax; - private final long sendBufferSize; - private final int sendTimeOut; - private final long sndHwm; - private final int tcpKeepAlive; - private final long tcpKeepAliveCount; - private final long tcpKeepAliveIdle; - private final long tcpKeepAliveInterval; - private final boolean xpubVerbose; - private final List<String> endpoints; - - private JeroMqConfiguration(final long affinity, final long backlog, final boolean delayAttachOnConnect, - final byte[] identity, final boolean ipv4Only, final long linger, - final long maxMsgSize, final long rcvHwm, final long receiveBufferSize, - final int receiveTimeOut, final long reconnectIVL, final long reconnectIVLMax, - final long sendBufferSize, final int sendTimeOut, final long sndHwm, - final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle, - final long tcpKeepAliveInterval, final boolean xpubVerbose, - final List<String> endpoints) { - this.affinity = affinity; - this.backlog = backlog; - this.delayAttachOnConnect = delayAttachOnConnect; - this.identity = identity; - this.ipv4Only = ipv4Only; - this.linger = linger; - this.maxMsgSize = maxMsgSize; - this.rcvHwm = rcvHwm; - this.receiveBufferSize = receiveBufferSize; - this.receiveTimeOut = receiveTimeOut; - this.reconnectIVL = reconnectIVL; - this.reconnectIVLMax = reconnectIVLMax; - this.sendBufferSize = sendBufferSize; - this.sendTimeOut = sendTimeOut; - this.sndHwm = sndHwm; - this.tcpKeepAlive = tcpKeepAlive; - this.tcpKeepAliveCount = tcpKeepAliveCount; - this.tcpKeepAliveIdle = tcpKeepAliveIdle; - this.tcpKeepAliveInterval = tcpKeepAliveInterval; - this.xpubVerbose = xpubVerbose; - this.endpoints = endpoints; - } - - @Override - public String toString() { - return "JeroMqConfiguration{" + - "affinity=" + affinity + - ", backlog=" + backlog + - ", delayAttachOnConnect=" + delayAttachOnConnect + - ", identity=" + Arrays.toString(identity) + - ", ipv4Only=" + ipv4Only + - ", linger=" + linger + - ", maxMsgSize=" + maxMsgSize + - ", rcvHwm=" + rcvHwm + - ", receiveBufferSize=" + receiveBufferSize + - ", receiveTimeOut=" + receiveTimeOut + - ", reconnectIVL=" + reconnectIVL + - ", reconnectIVLMax=" + reconnectIVLMax + - ", sendBufferSize=" + sendBufferSize + - ", sendTimeOut=" + sendTimeOut + - ", sndHwm=" + sndHwm + - ", tcpKeepAlive=" + tcpKeepAlive + - ", tcpKeepAliveCount=" + tcpKeepAliveCount + - ", tcpKeepAliveIdle=" + tcpKeepAliveIdle + - ", tcpKeepAliveInterval=" + tcpKeepAliveInterval + - ", xpubVerbose=" + xpubVerbose + - ", endpoints=" + endpoints + - '}'; - } - } - - private static class JeroMqManagerFactory implements ManagerFactory<JeroMqManager, JeroMqConfiguration> { - @Override - public JeroMqManager createManager(final String name, final JeroMqConfiguration data) { - return new JeroMqManager(name, data); - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ + +package org.apache.logging.log4j.core.appender.mom.jeromq; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.core.appender.AbstractManager; +import org.apache.logging.log4j.core.appender.ManagerFactory; +import org.apache.logging.log4j.core.util.ShutdownCallbackRegistry; +import org.apache.logging.log4j.util.PropertiesUtil; +import org.zeromq.ZMQ; + +/** + * Manager for publishing messages via JeroMq. + * + * @since 2.6 + */ +public class JeroMqManager extends AbstractManager { + + /** + * System property to enable shutdown hook. + */ + public static final String SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK = "log4j.jeromq.enableShutdownHook"; + + /** + * System property to control JeroMQ I/O thread count. + */ + public static final String SYS_PROPERTY_IO_THREADS = "log4j.jeromq.ioThreads"; + + private static final JeroMqManagerFactory FACTORY = new JeroMqManagerFactory(); + private static final ZMQ.Context CONTEXT; + + static { + LOGGER.trace("JeroMqManager using ZMQ version {}", ZMQ.getVersionString()); + + final int ioThreads = PropertiesUtil.getProperties().getIntegerProperty(SYS_PROPERTY_IO_THREADS, 1); + LOGGER.trace("JeroMqManager creating ZMQ context with ioThreads = {}", ioThreads); + CONTEXT = ZMQ.context(ioThreads); + + final boolean enableShutdownHook = PropertiesUtil.getProperties().getBooleanProperty( + SYS_PROPERTY_ENABLE_SHUTDOWN_HOOK, true); + if (enableShutdownHook) { + ((ShutdownCallbackRegistry) LogManager.getFactory()).addShutdownCallback(new Runnable() { + @Override + public void run() { + CONTEXT.close(); + } + }); + } + } + + private final ZMQ.Socket publisher; + + private JeroMqManager(final String name, final JeroMqConfiguration config) { + super(null, name); + publisher = CONTEXT.socket(ZMQ.PUB); + publisher.setAffinity(config.affinity); + publisher.setBacklog(config.backlog); + publisher.setDelayAttachOnConnect(config.delayAttachOnConnect); + if (config.identity != null) { + publisher.setIdentity(config.identity); + } + publisher.setIPv4Only(config.ipv4Only); + publisher.setLinger(config.linger); + publisher.setMaxMsgSize(config.maxMsgSize); + publisher.setRcvHWM(config.rcvHwm); + publisher.setReceiveBufferSize(config.receiveBufferSize); + publisher.setReceiveTimeOut(config.receiveTimeOut); + publisher.setReconnectIVL(config.reconnectIVL); + publisher.setReconnectIVLMax(config.reconnectIVLMax); + publisher.setSendBufferSize(config.sendBufferSize); + publisher.setSendTimeOut(config.sendTimeOut); + publisher.setSndHWM(config.sndHwm); + publisher.setTCPKeepAlive(config.tcpKeepAlive); + publisher.setTCPKeepAliveCount(config.tcpKeepAliveCount); + publisher.setTCPKeepAliveIdle(config.tcpKeepAliveIdle); + publisher.setTCPKeepAliveInterval(config.tcpKeepAliveInterval); + publisher.setXpubVerbose(config.xpubVerbose); + for (final String endpoint : config.endpoints) { + publisher.bind(endpoint); + } + LOGGER.debug("Created JeroMqManager with {}", config); + } + + public boolean send(final byte[] data) { + return publisher.send(data); + } + + @Override + protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) { + publisher.close(); + return true; + } + + public static JeroMqManager getJeroMqManager(final String name, final long affinity, final long backlog, + final boolean delayAttachOnConnect, final byte[] identity, + final boolean ipv4Only, final long linger, final long maxMsgSize, + final long rcvHwm, final long receiveBufferSize, + final int receiveTimeOut, final long reconnectIVL, + final long reconnectIVLMax, final long sendBufferSize, + final int sendTimeOut, final long sndHwm, final int tcpKeepAlive, + final long tcpKeepAliveCount, final long tcpKeepAliveIdle, + final long tcpKeepAliveInterval, final boolean xpubVerbose, + final List<String> endpoints) { + return getManager(name, FACTORY, + new JeroMqConfiguration(affinity, backlog, delayAttachOnConnect, identity, ipv4Only, linger, maxMsgSize, + rcvHwm, receiveBufferSize, receiveTimeOut, reconnectIVL, reconnectIVLMax, sendBufferSize, sendTimeOut, + sndHwm, tcpKeepAlive, tcpKeepAliveCount, tcpKeepAliveIdle, tcpKeepAliveInterval, xpubVerbose, + endpoints)); + } + + public static ZMQ.Context getContext() { + return CONTEXT; + } + + private static class JeroMqConfiguration { + private final long affinity; + private final long backlog; + private final boolean delayAttachOnConnect; + private final byte[] identity; + private final boolean ipv4Only; + private final long linger; + private final long maxMsgSize; + private final long rcvHwm; + private final long receiveBufferSize; + private final int receiveTimeOut; + private final long reconnectIVL; + private final long reconnectIVLMax; + private final long sendBufferSize; + private final int sendTimeOut; + private final long sndHwm; + private final int tcpKeepAlive; + private final long tcpKeepAliveCount; + private final long tcpKeepAliveIdle; + private final long tcpKeepAliveInterval; + private final boolean xpubVerbose; + private final List<String> endpoints; + + private JeroMqConfiguration(final long affinity, final long backlog, final boolean delayAttachOnConnect, + final byte[] identity, final boolean ipv4Only, final long linger, + final long maxMsgSize, final long rcvHwm, final long receiveBufferSize, + final int receiveTimeOut, final long reconnectIVL, final long reconnectIVLMax, + final long sendBufferSize, final int sendTimeOut, final long sndHwm, + final int tcpKeepAlive, final long tcpKeepAliveCount, final long tcpKeepAliveIdle, + final long tcpKeepAliveInterval, final boolean xpubVerbose, + final List<String> endpoints) { + this.affinity = affinity; + this.backlog = backlog; + this.delayAttachOnConnect = delayAttachOnConnect; + this.identity = identity; + this.ipv4Only = ipv4Only; + this.linger = linger; + this.maxMsgSize = maxMsgSize; + this.rcvHwm = rcvHwm; + this.receiveBufferSize = receiveBufferSize; + this.receiveTimeOut = receiveTimeOut; + this.reconnectIVL = reconnectIVL; + this.reconnectIVLMax = reconnectIVLMax; + this.sendBufferSize = sendBufferSize; + this.sendTimeOut = sendTimeOut; + this.sndHwm = sndHwm; + this.tcpKeepAlive = tcpKeepAlive; + this.tcpKeepAliveCount = tcpKeepAliveCount; + this.tcpKeepAliveIdle = tcpKeepAliveIdle; + this.tcpKeepAliveInterval = tcpKeepAliveInterval; + this.xpubVerbose = xpubVerbose; + this.endpoints = endpoints; + } + + @Override + public String toString() { + return "JeroMqConfiguration{" + + "affinity=" + affinity + + ", backlog=" + backlog + + ", delayAttachOnConnect=" + delayAttachOnConnect + + ", identity=" + Arrays.toString(identity) + + ", ipv4Only=" + ipv4Only + + ", linger=" + linger + + ", maxMsgSize=" + maxMsgSize + + ", rcvHwm=" + rcvHwm + + ", receiveBufferSize=" + receiveBufferSize + + ", receiveTimeOut=" + receiveTimeOut + + ", reconnectIVL=" + reconnectIVL + + ", reconnectIVLMax=" + reconnectIVLMax + + ", sendBufferSize=" + sendBufferSize + + ", sendTimeOut=" + sendTimeOut + + ", sndHwm=" + sndHwm + + ", tcpKeepAlive=" + tcpKeepAlive + + ", tcpKeepAliveCount=" + tcpKeepAliveCount + + ", tcpKeepAliveIdle=" + tcpKeepAliveIdle + + ", tcpKeepAliveInterval=" + tcpKeepAliveInterval + + ", xpubVerbose=" + xpubVerbose + + ", endpoints=" + endpoints + + '}'; + } + } + + private static class JeroMqManagerFactory implements ManagerFactory<JeroMqManager, JeroMqConfiguration> { + @Override + public JeroMqManager createManager(final String name, final JeroMqConfiguration data) { + return new JeroMqManager(name, data); + } + } +} http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/78fe32c0/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java ---------------------------------------------------------------------- diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java index bee9437..787f655 100644 --- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java +++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java @@ -1,120 +1,120 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache license, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the license for the specific language governing permissions and - * limitations under the license. - */ - -package org.apache.logging.log4j.core.appender.mom.kafka; - -import java.io.Serializable; -import java.nio.charset.StandardCharsets; -import java.util.concurrent.TimeUnit; - -import org.apache.logging.log4j.core.Appender; -import org.apache.logging.log4j.core.Filter; -import org.apache.logging.log4j.core.Layout; -import org.apache.logging.log4j.core.LogEvent; -import org.apache.logging.log4j.core.appender.AbstractAppender; -import org.apache.logging.log4j.core.appender.AppenderLoggingException; -import org.apache.logging.log4j.core.config.Configuration; -import org.apache.logging.log4j.core.config.Node; -import org.apache.logging.log4j.core.config.Property; -import org.apache.logging.log4j.core.config.plugins.Plugin; -import org.apache.logging.log4j.core.config.plugins.PluginAttribute; -import org.apache.logging.log4j.core.config.plugins.PluginConfiguration; -import org.apache.logging.log4j.core.config.plugins.PluginElement; -import org.apache.logging.log4j.core.config.plugins.PluginFactory; -import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required; -import org.apache.logging.log4j.core.layout.SerializedLayout; -import org.apache.logging.log4j.core.util.StringEncoder; - -/** - * Sends log events to an Apache Kafka topic. - */ -@Plugin(name = "Kafka", category = Node.CATEGORY, elementType = Appender.ELEMENT_TYPE, printObject = true) -public final class KafkaAppender extends AbstractAppender { - - @PluginFactory - public static KafkaAppender createAppender( - @PluginElement("Layout") final Layout<? extends Serializable> layout, - @PluginElement("Filter") final Filter filter, - @Required(message = "No name provided for KafkaAppender") @PluginAttribute("name") final String name, - @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions, - @Required(message = "No topic provided for KafkaAppender") @PluginAttribute("topic") final String topic, - @PluginElement("Properties") final Property[] properties, - @PluginConfiguration final Configuration configuration) { - final KafkaManager kafkaManager = new KafkaManager(configuration.getLoggerContext(), name, topic, properties); - return new KafkaAppender(name, layout, filter, ignoreExceptions, kafkaManager); - } - - private final KafkaManager manager; - - private KafkaAppender(final String name, final Layout<? extends Serializable> layout, final Filter filter, final boolean ignoreExceptions, final KafkaManager manager) { - super(name, filter, layout, ignoreExceptions); - this.manager = manager; - } - - @Override - public void append(final LogEvent event) { - if (event.getLoggerName().startsWith("org.apache.kafka")) { - LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName()); - } else { - try { - final Layout<? extends Serializable> layout = getLayout(); - byte[] data; - if (layout != null) { - if (layout instanceof SerializedLayout) { - final byte[] header = layout.getHeader(); - final byte[] body = layout.toByteArray(event); - data = new byte[header.length + body.length]; - System.arraycopy(header, 0, data, 0, header.length); - System.arraycopy(body, 0, data, header.length, body.length); - } else { - data = layout.toByteArray(event); - } - } else { - data = StringEncoder.toBytes(event.getMessage().getFormattedMessage(), StandardCharsets.UTF_8); - } - manager.send(data); - } catch (final Exception e) { - LOGGER.error("Unable to write to Kafka [{}] for appender [{}].", manager.getName(), getName(), e); - throw new AppenderLoggingException("Unable to write to Kafka in appender: " + e.getMessage(), e); - } - } - } - - @Override - public void start() { - super.start(); - manager.startup(); - } - - @Override - public boolean stop(final long timeout, final TimeUnit timeUnit) { - setStopping(); - super.stop(timeout, timeUnit, false); - manager.stop(timeout, timeUnit); - setStopped(); - return true; - } - - @Override - public String toString() { - return "KafkaAppender{" + - "name=" + getName() + - ", state=" + getState() + - ", topic=" + manager.getTopic() + - '}'; - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ + +package org.apache.logging.log4j.core.appender.mom.kafka; + +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeUnit; + +import org.apache.logging.log4j.core.Appender; +import org.apache.logging.log4j.core.Filter; +import org.apache.logging.log4j.core.Layout; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.appender.AbstractAppender; +import org.apache.logging.log4j.core.appender.AppenderLoggingException; +import org.apache.logging.log4j.core.config.Configuration; +import org.apache.logging.log4j.core.config.Node; +import org.apache.logging.log4j.core.config.Property; +import org.apache.logging.log4j.core.config.plugins.Plugin; +import org.apache.logging.log4j.core.config.plugins.PluginAttribute; +import org.apache.logging.log4j.core.config.plugins.PluginConfiguration; +import org.apache.logging.log4j.core.config.plugins.PluginElement; +import org.apache.logging.log4j.core.config.plugins.PluginFactory; +import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required; +import org.apache.logging.log4j.core.layout.SerializedLayout; +import org.apache.logging.log4j.core.util.StringEncoder; + +/** + * Sends log events to an Apache Kafka topic. + */ +@Plugin(name = "Kafka", category = Node.CATEGORY, elementType = Appender.ELEMENT_TYPE, printObject = true) +public final class KafkaAppender extends AbstractAppender { + + @PluginFactory + public static KafkaAppender createAppender( + @PluginElement("Layout") final Layout<? extends Serializable> layout, + @PluginElement("Filter") final Filter filter, + @Required(message = "No name provided for KafkaAppender") @PluginAttribute("name") final String name, + @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions, + @Required(message = "No topic provided for KafkaAppender") @PluginAttribute("topic") final String topic, + @PluginElement("Properties") final Property[] properties, + @PluginConfiguration final Configuration configuration) { + final KafkaManager kafkaManager = new KafkaManager(configuration.getLoggerContext(), name, topic, properties); + return new KafkaAppender(name, layout, filter, ignoreExceptions, kafkaManager); + } + + private final KafkaManager manager; + + private KafkaAppender(final String name, final Layout<? extends Serializable> layout, final Filter filter, final boolean ignoreExceptions, final KafkaManager manager) { + super(name, filter, layout, ignoreExceptions); + this.manager = manager; + } + + @Override + public void append(final LogEvent event) { + if (event.getLoggerName().startsWith("org.apache.kafka")) { + LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName()); + } else { + try { + final Layout<? extends Serializable> layout = getLayout(); + byte[] data; + if (layout != null) { + if (layout instanceof SerializedLayout) { + final byte[] header = layout.getHeader(); + final byte[] body = layout.toByteArray(event); + data = new byte[header.length + body.length]; + System.arraycopy(header, 0, data, 0, header.length); + System.arraycopy(body, 0, data, header.length, body.length); + } else { + data = layout.toByteArray(event); + } + } else { + data = StringEncoder.toBytes(event.getMessage().getFormattedMessage(), StandardCharsets.UTF_8); + } + manager.send(data); + } catch (final Exception e) { + LOGGER.error("Unable to write to Kafka [{}] for appender [{}].", manager.getName(), getName(), e); + throw new AppenderLoggingException("Unable to write to Kafka in appender: " + e.getMessage(), e); + } + } + } + + @Override + public void start() { + super.start(); + manager.startup(); + } + + @Override + public boolean stop(final long timeout, final TimeUnit timeUnit) { + setStopping(); + boolean stopped = super.stop(timeout, timeUnit, false); + stopped &= manager.stop(timeout, timeUnit); + setStopped(); + return stopped; + } + + @Override + public String toString() { + return "KafkaAppender{" + + "name=" + getName() + + ", state=" + getState() + + ", topic=" + manager.getTopic() + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/78fe32c0/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java ---------------------------------------------------------------------- diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java index d991d7e..c5347a2 100644 --- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java +++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java @@ -1,79 +1,80 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache license, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the license for the specific language governing permissions and - * limitations under the license. - */ - -package org.apache.logging.log4j.core.appender.mom.kafka; - -import java.util.Properties; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.logging.log4j.core.LoggerContext; -import org.apache.logging.log4j.core.appender.AbstractManager; -import org.apache.logging.log4j.core.config.Property; - -public class KafkaManager extends AbstractManager { - - public static final String DEFAULT_TIMEOUT_MILLIS = "30000"; - - /** - * package-private access for testing. - */ - static KafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory(); - - private final Properties config = new Properties(); - private Producer<byte[], byte[]> producer; - private final int timeoutMillis; - - private final String topic; - - public KafkaManager(final LoggerContext loggerContext, final String name, final String topic, final Property[] properties) { - super(loggerContext, name); - this.topic = topic; - config.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); - config.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); - config.setProperty("batch.size", "0"); - for (final Property property : properties) { - config.setProperty(property.getName(), property.getValue()); - } - this.timeoutMillis = Integer.parseInt(config.getProperty("timeout.ms", DEFAULT_TIMEOUT_MILLIS)); - } - - @Override - public void releaseSub(final long timeout, final TimeUnit timeUnit) { - if (producer != null) { - producer.close(timeout, timeUnit); - } - } - - public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException { - if (producer != null) { - producer.send(new ProducerRecord<byte[], byte[]>(topic, msg)).get(timeoutMillis, TimeUnit.MILLISECONDS); - } - } - - public void startup() { - producer = producerFactory.newKafkaProducer(config); - } - - public String getTopic() { - return topic; - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ + +package org.apache.logging.log4j.core.appender.mom.kafka; + +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.logging.log4j.core.LoggerContext; +import org.apache.logging.log4j.core.appender.AbstractManager; +import org.apache.logging.log4j.core.config.Property; + +public class KafkaManager extends AbstractManager { + + public static final String DEFAULT_TIMEOUT_MILLIS = "30000"; + + /** + * package-private access for testing. + */ + static KafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory(); + + private final Properties config = new Properties(); + private Producer<byte[], byte[]> producer; + private final int timeoutMillis; + + private final String topic; + + public KafkaManager(final LoggerContext loggerContext, final String name, final String topic, final Property[] properties) { + super(loggerContext, name); + this.topic = topic; + config.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + config.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + config.setProperty("batch.size", "0"); + for (final Property property : properties) { + config.setProperty(property.getName(), property.getValue()); + } + this.timeoutMillis = Integer.parseInt(config.getProperty("timeout.ms", DEFAULT_TIMEOUT_MILLIS)); + } + + @Override + public boolean releaseSub(final long timeout, final TimeUnit timeUnit) { + if (producer != null) { + producer.close(timeout, timeUnit); + } + return true; + } + + public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException { + if (producer != null) { + producer.send(new ProducerRecord<byte[], byte[]>(topic, msg)).get(timeoutMillis, TimeUnit.MILLISECONDS); + } + } + + public void startup() { + producer = producerFactory.newKafkaProducer(config); + } + + public String getTopic() { + return topic; + } + +}