This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 973fa3a Introduce a pulsar log4j2 appender (#1316) 973fa3a is described below commit 973fa3ac35394e73ca7a950afac4f908633ff23c Author: Sijie Guo <guosi...@gmail.com> AuthorDate: Fri Mar 2 13:07:21 2018 -0800 Introduce a pulsar log4j2 appender (#1316) ### Motivation when developing `pulsar-functions`, it would be good to log the log messages generated by functions to pulsar, so the user can easily tail the log messages topic to get all the messages per functions. ### Modifications This PR is to add a pulsar based appender. ### Result We are able to use pulsar log4j2 appender to log messages. --- pom.xml | 25 +++ pulsar-log4j2-appender/pom.xml | 74 +++++++ .../pulsar/log4j2/appender/PulsarAppender.java | 211 ++++++++++++++++++++ .../pulsar/log4j2/appender/PulsarManager.java | 145 ++++++++++++++ .../pulsar/log4j2/appender/PulsarAppenderTest.java | 218 +++++++++++++++++++++ .../builder/ConfigurationAssemblerTest.java | 115 +++++++++++ .../appender/builder/ConfigurationBuilderTest.java | 120 ++++++++++++ .../builder/CustomConfigurationFactory.java | 88 +++++++++ .../src/test/resources/PulsarAppenderTest.xml | 49 +++++ 9 files changed, 1045 insertions(+) diff --git a/pom.xml b/pom.xml index ada1a71..c8131e6 100644 --- a/pom.xml +++ b/pom.xml @@ -99,6 +99,7 @@ flexible messaging model and an intuitive client API.</description> <module>all</module> <module>docker</module> <module>tests</module> + <module>pulsar-log4j2-appender</module> </modules> <issueManagement> @@ -126,6 +127,9 @@ flexible messaging model and an intuitive client API.</description> <jackson.version>2.8.4</jackson.version> <puppycrawl.checkstyle.version>6.19</puppycrawl.checkstyle.version> <dockerfile-maven.version>1.3.7</dockerfile-maven.version> + + <!-- test dependencies --> + <disruptor.version>3.4.0</disruptor.version> </properties> <dependencyManagement> @@ -363,6 +367,19 @@ flexible messaging model and an intuitive client API.</description> </dependency> <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> + <type>test-jar</type> + <version>${log4j2.version}</version> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + <type>test-jar</type> + <version>${log4j2.version}</version> + </dependency> + + <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.5</version> @@ -599,6 +616,14 @@ flexible messaging model and an intuitive client API.</description> <artifactId>bcpkix-jdk15on</artifactId> <version>${bouncycastle.version}</version> </dependency> + + <!-- test dependencies --> + <dependency> + <groupId>com.lmax</groupId> + <artifactId>disruptor</artifactId> + <version>${disruptor.version}</version> + </dependency> + </dependencies> </dependencyManagement> diff --git a/pulsar-log4j2-appender/pom.xml b/pulsar-log4j2-appender/pom.xml new file mode 100644 index 0000000..9352457 --- /dev/null +++ b/pulsar-log4j2-appender/pom.xml @@ -0,0 +1,74 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar</artifactId> + <version>2.0.0-incubating-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>pulsar-log4j2-appender</artifactId> + <name>Pulsar Log4j2 Appender</name> + <description>Pulsar Log4j2 Appender</description> + + <dependencies> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>pulsar-client</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j-impl</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <!-- Required for AsyncLoggers --> + <dependency> + <groupId>com.lmax</groupId> + <artifactId>disruptor</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + +</project> diff --git a/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarAppender.java b/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarAppender.java new file mode 100644 index 0000000..68bfbd5 --- /dev/null +++ b/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarAppender.java @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.log4j2.appender; + +import java.io.Serializable; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import org.apache.logging.log4j.core.AbstractLifeCycle; +import org.apache.logging.log4j.core.Appender; +import org.apache.logging.log4j.core.Filter; +import org.apache.logging.log4j.core.Layout; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.appender.AbstractAppender; +import org.apache.logging.log4j.core.config.Node; +import org.apache.logging.log4j.core.config.Property; +import org.apache.logging.log4j.core.config.plugins.Plugin; +import org.apache.logging.log4j.core.config.plugins.PluginAttribute; +import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory; +import org.apache.logging.log4j.core.config.plugins.PluginElement; +import org.apache.logging.log4j.core.layout.SerializedLayout; + +/** + * The PulsarAppender logs events to an Apache Pulsar topic. + * + * <p>Each log event is sent as a Pulsar record. + */ +@Plugin( + name = "Pulsar", + category = Node.CATEGORY, + elementType = Appender.ELEMENT_TYPE, + printObject = true) +public final class PulsarAppender extends AbstractAppender { + + /** + * Builds PulsarAppender instances. + * @param <B> The type to build + */ + public static class Builder<B extends Builder<B>> extends AbstractAppender.Builder<B> + implements org.apache.logging.log4j.core.util.Builder<PulsarAppender> { + + @PluginAttribute("key") + private String key; + + @PluginAttribute("topic") + private String topic; + + @PluginAttribute("serviceUrl") + private String serviceUrl; + + @PluginAttribute(value = "avoidRecursive", defaultBoolean = true) + private boolean avoidRecursive; + + @PluginAttribute(value = "syncSend", defaultBoolean = false) + private boolean syncSend; + + @PluginElement("Properties") + private Property[] properties; + + @SuppressWarnings("resource") + @Override + public PulsarAppender build() { + final Layout<? extends Serializable> layout = getLayout(); + if (layout == null) { + AbstractLifeCycle.LOGGER.error("No layout provided for PulsarAppender"); + return null; + } + PulsarManager manager = new PulsarManager( + getConfiguration().getLoggerContext(), + getName(), + serviceUrl, + topic, + syncSend, + properties, + key); + return new PulsarAppender( + getName(), + layout, + getFilter(), + isIgnoreExceptions(), + avoidRecursive, + manager); + } + + public String getTopic() { + return topic; + } + + public boolean isSyncSend() { + return syncSend; + } + + public Property[] getProperties() { + return properties; + } + + public B setTopic(final String topic) { + this.topic = topic; + return asBuilder(); + } + + public B setSyncSend(final boolean syncSend) { + this.syncSend = syncSend; + return asBuilder(); + } + + public B setProperties(final Property[] properties) { + this.properties = properties; + return asBuilder(); + } + } + + /** + * Creates a builder for a PulsarAppender. + * @return a builder for a PulsarAppender. + */ + @PluginBuilderFactory + public static <B extends Builder<B>> B newBuilder() { + return new Builder<B>().asBuilder(); + } + + private final boolean avoidRecursive; + private final PulsarManager manager; + + private PulsarAppender( + final String name, + final Layout<? extends Serializable> layout, + final Filter filter, + final boolean ignoreExceptions, + final boolean avoidRecursive, + final PulsarManager manager) { + super(name, filter, layout, ignoreExceptions); + this.avoidRecursive = avoidRecursive; + this.manager = Objects.requireNonNull(manager, "manager"); + } + + @Override + public void append(final LogEvent event) { + if (avoidRecursive + && event.getLoggerName() != null + && event.getLoggerName().startsWith("org.apache.pulsar")) { + LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName()); + } else { + try { + tryAppend(event); + } catch (final Exception e) { + error("Unable to write to Pulsar in appender [" + getName() + "]", event, e); + } + } + } + + private void tryAppend(final LogEvent event) { + final Layout<? extends Serializable> layout = getLayout(); + byte[] data; + if (layout instanceof SerializedLayout) { + final byte[] header = layout.getHeader(); + final byte[] body = layout.toByteArray(event); + data = new byte[header.length + body.length]; + System.arraycopy(header, 0, data, 0, header.length); + System.arraycopy(body, 0, data, header.length, body.length); + } else { + data = layout.toByteArray(event); + } + manager.send(data); + } + + @Override + public void start() { + super.start(); + try { + manager.startup(); + } catch (Exception e) { + // fail to start the manager + } + } + + @Override + public boolean stop(final long timeout, final TimeUnit timeUnit) { + setStopping(); + boolean stopped = super.stop(timeout, timeUnit, false); + stopped &= manager.stop(timeout, timeUnit); + setStopped(); + return stopped; + } + + @Override + public String toString() { + return "PulsarAppender{" + + "name=" + getName() + + ", state=" + getState() + + ", serviceUrl=" + manager.getServiceUrl() + + ", topic=" + manager.getTopic() + + '}'; + } + +} diff --git a/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarManager.java b/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarManager.java new file mode 100644 index 0000000..c52faa6 --- /dev/null +++ b/pulsar-log4j2-appender/src/main/java/org/apache/pulsar/log4j2/appender/PulsarManager.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.log4j2.appender; + +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; +import java.util.function.Supplier; +import org.apache.logging.log4j.core.LoggerContext; +import org.apache.logging.log4j.core.appender.AbstractManager; +import org.apache.logging.log4j.core.config.Property; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageBuilder; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; + +public class PulsarManager extends AbstractManager { + + static Supplier<ClientBuilder> PULSAR_CLIENT_BUILDER = () -> PulsarClient.builder(); + + static BiFunction<String, byte[], Message<byte[]>> MESSAGE_BUILDER = (key, data) -> { + MessageBuilder<byte[]> messageBuilder = MessageBuilder.create() + .setContent(data); + if (null != key) { + messageBuilder = messageBuilder.setKey(key); + } + return messageBuilder.build(); + }; + + private PulsarClient client; + private Producer<byte[]> producer; + + private final String serviceUrl; + private final String topic; + private final String key; + private final boolean syncSend; + + public PulsarManager(final LoggerContext loggerContext, + final String name, + final String serviceUrl, + final String topic, + final boolean syncSend, + final Property[] properties, + final String key) { + super(loggerContext, name); + this.serviceUrl = Objects.requireNonNull(serviceUrl, "serviceUrl"); + this.topic = Objects.requireNonNull(topic, "topic"); + this.syncSend = syncSend; + this.key = key; + } + + @Override + public boolean releaseSub(final long timeout, final TimeUnit timeUnit) { + if (producer != null) { + try { + producer.closeAsync().get(timeout, timeUnit); + } catch (Exception e) { + // exceptions on closing + LOGGER.warn("Failed to close producer within {} milliseconds", + timeUnit.toMillis(timeout), e); + } + } + return true; + } + + public void send(final byte[] msg) { + if (producer != null) { + String newKey = null; + + if(key != null && key.contains("${")) { + newKey = getLoggerContext().getConfiguration().getStrSubstitutor().replace(key); + } else if (key != null) { + newKey = key; + } + + Message<byte[]> message = MESSAGE_BUILDER.apply(newKey, msg); + if (syncSend) { + try { + producer.send(message); + } catch (PulsarClientException e) { + LOGGER.error("Unable to write to Pulsar in appender [" + getName() + "]", e); + } + } else { + producer.sendAsync(message) + .exceptionally(cause -> { + LOGGER.error("Unable to write to Pulsar in appender [" + getName() + "]", cause); + return null; + }); + } + } + } + + public void startup() throws Exception { + try { + client = PULSAR_CLIENT_BUILDER.get() + .serviceUrl(serviceUrl) + .build(); + ProducerBuilder<byte[]> producerBuilder = client.newProducer() + .topic(topic) + .producerName("pulsar-log4j2-appender-" + topic) + .blockIfQueueFull(false); + if (syncSend) { + // disable batching for sync send + producerBuilder = producerBuilder.enableBatching(false); + } else { + // enable batching in 10 ms for async send + producerBuilder = producerBuilder + .enableBatching(true) + .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS); + } + producer = producerBuilder.create(); + } catch (Exception t) { + LOGGER.error("Failed to start pulsar manager {}", t); + throw t; + } + } + + public String getServiceUrl() { + return serviceUrl; + } + + public String getTopic() { + return topic; + } + +} diff --git a/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/PulsarAppenderTest.java b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/PulsarAppenderTest.java new file mode 100644 index 0000000..4431243 --- /dev/null +++ b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/PulsarAppenderTest.java @@ -0,0 +1,218 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.log4j2.appender; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertNotNull; +import static org.testng.AssertJUnit.assertNull; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectInputStream; +import java.nio.charset.StandardCharsets; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.core.Appender; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.LoggerContext; +import org.apache.logging.log4j.core.config.Configurator; +import org.apache.logging.log4j.core.impl.Log4jLogEvent; +import org.apache.logging.log4j.message.SimpleMessage; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.impl.ClientBuilderImpl; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class PulsarAppenderTest { + + private static final String LOG_MESSAGE = "Hello, world!"; + private static final String TOPIC_NAME = "pulsar-topic"; + + private static Log4jLogEvent createLogEvent() { + return Log4jLogEvent.newBuilder() + .setLoggerName(PulsarAppenderTest.class.getName()) + .setLoggerFqcn(PulsarAppenderTest.class.getName()) + .setLevel(Level.INFO) + .setMessage(new SimpleMessage(LOG_MESSAGE)) + .build(); + } + + private ClientBuilderImpl clientBuilder; + private PulsarClient client; + private Producer<byte[]> producer; + private List<Message<byte[]>> history; + + private LoggerContext ctx; + + @BeforeMethod + public void setUp() throws Exception { + history = new LinkedList<>(); + + client = mock(PulsarClient.class); + producer = mock(Producer.class); + clientBuilder = mock(ClientBuilderImpl.class); + + doReturn(client).when(clientBuilder).build(); + doReturn(clientBuilder).when(clientBuilder).serviceUrl(anyString()); + + ProducerBuilder<byte[]> producerBuilder = mock(ProducerBuilder.class); + when(client.newProducer()).thenReturn(producerBuilder); + doReturn(producerBuilder).when(producerBuilder).topic(anyString()); + doReturn(producerBuilder).when(producerBuilder).producerName(anyString()); + doReturn(producerBuilder).when(producerBuilder).enableBatching(anyBoolean()); + doReturn(producerBuilder).when(producerBuilder).batchingMaxPublishDelay(anyInt(), any(TimeUnit.class)); + doReturn(producerBuilder).when(producerBuilder).blockIfQueueFull(anyBoolean()); + doReturn(producer).when(producerBuilder).create(); + + when(producer.send(any(Message.class))) + .thenAnswer(invocationOnMock -> { + Message<byte[]> msg = invocationOnMock.getArgumentAt(0, Message.class); + synchronized (history) { + history.add(msg); + } + return null; + }); + + when(producer.sendAsync(any(Message.class))) + .thenAnswer(invocationOnMock -> { + Message<byte[]> msg = invocationOnMock.getArgumentAt(0, Message.class); + synchronized (history) { + history.add(msg); + } + CompletableFuture<MessageId> future = new CompletableFuture<>(); + future.complete(mock(MessageId.class)); + return future; + }); + + PulsarManager.PULSAR_CLIENT_BUILDER = () -> clientBuilder; + PulsarManager.MESSAGE_BUILDER = (key, data) -> { + Message<byte[]> msg = mock(Message.class); + when(msg.getKey()).thenReturn(key); + when(msg.getData()).thenReturn(data); + return msg; + }; + + ctx = Configurator.initialize( + "PulsarAppenderTest", + getClass().getClassLoader(), + getClass().getClassLoader().getResource("PulsarAppenderTest.xml").toURI()); + } + + @Test + public void testAppendWithLayout() throws Exception { + final Appender appender = ctx.getConfiguration().getAppender("PulsarAppenderWithLayout"); + appender.append(createLogEvent()); + final Message<byte[]> item; + synchronized (history) { + assertEquals(1, history.size()); + item = history.get(0); + } + assertNotNull(item); + assertNull(item.getKey()); + assertEquals("[" + LOG_MESSAGE + "]", new String(item.getData(), StandardCharsets.UTF_8)); + } + + @Test + public void testAppendWithSerializedLayout() throws Exception { + final Appender appender = ctx.getConfiguration().getAppender("PulsarAppenderWithSerializedLayout"); + final LogEvent logEvent = createLogEvent(); + appender.append(logEvent); + final Message<byte[]> item; + synchronized (history) { + assertEquals(1, history.size()); + item = history.get(0); + } + assertNotNull(item); + assertNull(item.getKey()); + assertEquals(LOG_MESSAGE, deserializeLogEvent(item.getData()).getMessage().getFormattedMessage()); + } + + @Test + public void testAsyncAppend() throws Exception { + final Appender appender = ctx.getConfiguration().getAppender("AsyncPulsarAppender"); + appender.append(createLogEvent()); + final Message<byte[]> item; + synchronized (history) { + assertEquals(1, history.size()); + item = history.get(0); + } + assertNotNull(item); + assertNull(item.getKey()); + assertEquals(LOG_MESSAGE, new String(item.getData(), StandardCharsets.UTF_8)); + } + + @Test + public void testAppendWithKey() throws Exception { + final Appender appender = ctx.getConfiguration().getAppender("PulsarAppenderWithKey"); + final LogEvent logEvent = createLogEvent(); + appender.append(logEvent); + Message<byte[]> item; + synchronized (history) { + assertEquals(1, history.size()); + item = history.get(0); + } + assertNotNull(item); + String msgKey = item.getKey(); + assertEquals(msgKey, "key"); + assertEquals(LOG_MESSAGE, new String(item.getData(), StandardCharsets.UTF_8)); + } + + @Test + public void testAppendWithKeyLookup() throws Exception { + final Appender appender = ctx.getConfiguration().getAppender("PulsarAppenderWithKeyLookup"); + final LogEvent logEvent = createLogEvent(); + Date date = new Date(); + SimpleDateFormat format = new SimpleDateFormat("dd-MM-yyyy"); + appender.append(logEvent); + Message<byte[]> item; + synchronized (history) { + assertEquals(1, history.size()); + item = history.get(0); + } + assertNotNull(item); + String keyValue = format.format(date); + assertEquals(item.getKey(), keyValue); + assertEquals(LOG_MESSAGE, new String(item.getData(), StandardCharsets.UTF_8)); + } + + private LogEvent deserializeLogEvent(final byte[] data) throws IOException, ClassNotFoundException { + final ByteArrayInputStream bis = new ByteArrayInputStream(data); + try (ObjectInput ois = new ObjectInputStream(bis)) { + return (LogEvent) ois.readObject(); + } + } + +} \ No newline at end of file diff --git a/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/ConfigurationAssemblerTest.java b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/ConfigurationAssemblerTest.java new file mode 100644 index 0000000..a234741 --- /dev/null +++ b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/ConfigurationAssemblerTest.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.log4j2.appender.builder; + +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertTrue; + +import java.util.List; +import java.util.Map; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.Appender; +import org.apache.logging.log4j.core.Filter; +import org.apache.logging.log4j.core.LifeCycle; +import org.apache.logging.log4j.core.LoggerContext; +import org.apache.logging.log4j.core.config.Configuration; +import org.apache.logging.log4j.core.config.ConfigurationFactory; +import org.apache.logging.log4j.core.config.Configurator; +import org.apache.logging.log4j.core.config.CustomLevelConfig; +import org.apache.logging.log4j.core.config.LoggerConfig; +import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilder; +import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilderFactory; +import org.apache.logging.log4j.core.config.builder.impl.BuiltConfiguration; +import org.apache.logging.log4j.core.filter.ThresholdFilter; +import org.apache.logging.log4j.core.layout.GelfLayout; +import org.apache.logging.log4j.core.util.Constants; +import org.apache.pulsar.log4j2.appender.PulsarAppender; +import org.testng.annotations.Test; + +public class ConfigurationAssemblerTest { + + @Test + public void testBuildConfiguration() throws Exception { + try { + System.setProperty(Constants.LOG4J_CONTEXT_SELECTOR, + "org.apache.logging.log4j.core.async.AsyncLoggerContextSelector"); + final ConfigurationBuilder<BuiltConfiguration> builder = ConfigurationBuilderFactory + .newConfigurationBuilder(); + CustomConfigurationFactory.addTestFixtures("config name", builder); + final Configuration configuration = builder.build(); + try (LoggerContext ctx = Configurator.initialize(configuration)) { + validate(configuration); + } + } finally { + System.getProperties().remove(Constants.LOG4J_CONTEXT_SELECTOR); + } + } + + @Test + public void testCustomConfigurationFactory() throws Exception { + try { + System.setProperty(ConfigurationFactory.CONFIGURATION_FACTORY_PROPERTY, + "org.apache.pulsar.log4j2.appender.builder.CustomConfigurationFactory"); + System.setProperty(Constants.LOG4J_CONTEXT_SELECTOR, + "org.apache.logging.log4j.core.async.AsyncLoggerContextSelector"); + final Configuration config = ((LoggerContext) LogManager.getContext(false)).getConfiguration(); + validate(config); + } finally { + System.getProperties().remove(Constants.LOG4J_CONTEXT_SELECTOR); + System.getProperties().remove(ConfigurationFactory.CONFIGURATION_FACTORY_PROPERTY); + } + } + + private void validate(final Configuration config) { + assertNotNull(config); + assertNotNull(config.getName()); + assertFalse(config.getName().isEmpty()); + assertNotNull(config, "No configuration created"); + assertEquals("Incorrect State: " + config.getState(), config.getState(), LifeCycle.State.STARTED); + final Map<String, Appender> appenders = config.getAppenders(); + assertNotNull(appenders); + assertTrue("Incorrect number of Appenders: " + appenders.size(), appenders.size() == 2); + final PulsarAppender pulsarAppender = (PulsarAppender) appenders.get("Pulsar"); + final GelfLayout gelfLayout = (GelfLayout) pulsarAppender.getLayout(); + final Map<String, LoggerConfig> loggers = config.getLoggers(); + assertNotNull(loggers); + assertTrue("Incorrect number of LoggerConfigs: " + loggers.size(), loggers.size() == 2); + final LoggerConfig rootLoggerConfig = loggers.get(""); + assertEquals(Level.ERROR, rootLoggerConfig.getLevel()); + assertFalse(rootLoggerConfig.isIncludeLocation()); + final LoggerConfig loggerConfig = loggers.get("org.apache.logging.log4j"); + assertEquals(Level.DEBUG, loggerConfig.getLevel()); + assertTrue(loggerConfig.isIncludeLocation()); + final Filter filter = config.getFilter(); + assertNotNull(filter, "No Filter"); + assertTrue("Not a Threshold Filter", filter instanceof ThresholdFilter); + final List<CustomLevelConfig> customLevels = config.getCustomLevels(); + assertNotNull(filter, "No CustomLevels"); + assertEquals(1, customLevels.size()); + final CustomLevelConfig customLevel = customLevels.get(0); + assertEquals("Panic", customLevel.getLevelName()); + assertEquals(17, customLevel.getIntLevel()); + final Logger logger = LogManager.getLogger(getClass()); + logger.info("Welcome to Log4j!"); + } +} diff --git a/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/ConfigurationBuilderTest.java b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/ConfigurationBuilderTest.java new file mode 100644 index 0000000..7de8a3f --- /dev/null +++ b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/ConfigurationBuilderTest.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.log4j2.appender.builder; + +import static org.testng.Assert.assertEquals; + +import java.util.concurrent.TimeUnit; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.core.Filter; +import org.apache.logging.log4j.core.appender.ConsoleAppender; +import org.apache.logging.log4j.core.config.builder.api.AppenderComponentBuilder; +import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilder; +import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilderFactory; +import org.apache.logging.log4j.core.config.builder.impl.BuiltConfiguration; +import org.testng.annotations.Test; + +public class ConfigurationBuilderTest { + + private static final String INDENT = " "; + private static final String EOL = System.lineSeparator(); + + private void addTestFixtures(final String name, final ConfigurationBuilder<BuiltConfiguration> builder) { + builder.setConfigurationName(name); + builder.setStatusLevel(Level.ERROR); + builder.setShutdownTimeout(5000, TimeUnit.MILLISECONDS); + builder.add(builder.newScriptFile("target/test-classes/scripts/filter.groovy").addIsWatched(true)); + builder.add(builder.newFilter("ThresholdFilter", Filter.Result.ACCEPT, Filter.Result.NEUTRAL) + .addAttribute("level", Level.DEBUG)); + + final AppenderComponentBuilder appenderBuilder = builder.newAppender("Stdout", "CONSOLE").addAttribute("target", ConsoleAppender.Target.SYSTEM_OUT); + appenderBuilder.add(builder.newLayout("PatternLayout"). + addAttribute("pattern", "%d [%t] %-5level: %msg%n%throwable")); + appenderBuilder.add(builder.newFilter("MarkerFilter", Filter.Result.DENY, + Filter.Result.NEUTRAL).addAttribute("marker", "FLOW")); + builder.add(appenderBuilder); + + final AppenderComponentBuilder appenderBuilder2 = builder.newAppender("Pulsar", "Pulsar") + .addAttribute("serviceUrl", "pulsar://localhost:6650") + .addAttribute("topic", "my-topic"); + appenderBuilder2.add(builder.newLayout("GelfLayout"). + addAttribute("host", "my-host"). + addComponent(builder.newKeyValuePair("extraField", "extraValue"))); + builder.add(appenderBuilder2); + + builder.add(builder.newLogger("org.apache.logging.log4j", Level.DEBUG, true). + add(builder.newAppenderRef("Stdout")). + addAttribute("additivity", false)); + builder.add(builder.newLogger("org.apache.logging.log4j.core", Level.DEBUG). + add(builder.newAppenderRef("Stdout"))); + builder.add(builder.newRootLogger(Level.ERROR).add(builder.newAppenderRef("Stdout"))); + + builder.addProperty("MyKey", "MyValue"); + builder.add(builder.newCustomLevel("Panic", 17)); + builder.setPackages("foo,bar"); + } + + private final static String expectedXml = + "<?xml version=\"1.0\" ?>" + EOL + + "<Configuration name=\"config name\" status=\"ERROR\" packages=\"foo,bar\" shutdownTimeout=\"5000\">" + EOL + + INDENT + "<Properties>" + EOL + + INDENT + INDENT + "<Property name=\"MyKey\">MyValue</Property>" + EOL + + INDENT + "</Properties>" + EOL + + INDENT + "<Scripts>" + EOL + + INDENT + INDENT + "<ScriptFile name=\"target/test-classes/scripts/filter.groovy\" path=\"target/test-classes/scripts/filter.groovy\" isWatched=\"true\"/>" + EOL + + INDENT + "</Scripts>" + EOL + + INDENT + "<CustomLevels>" + EOL + + INDENT + INDENT + "<CustomLevel name=\"Panic\" intLevel=\"17\"/>" + EOL + + INDENT + "</CustomLevels>" + EOL + + INDENT + "<ThresholdFilter onMatch=\"ACCEPT\" onMisMatch=\"NEUTRAL\" level=\"DEBUG\"/>" + EOL + + INDENT + "<Appenders>" + EOL + + INDENT + INDENT + "<CONSOLE name=\"Stdout\" target=\"SYSTEM_OUT\">" + EOL + + INDENT + INDENT + INDENT + "<PatternLayout pattern=\"%d [%t] %-5level: %msg%n%throwable\"/>" + EOL + + INDENT + INDENT + INDENT + "<MarkerFilter onMatch=\"DENY\" onMisMatch=\"NEUTRAL\" marker=\"FLOW\"/>" + EOL + + INDENT + INDENT + "</CONSOLE>" + EOL + + INDENT + INDENT + "<Pulsar name=\"Pulsar\" serviceUrl=\"pulsar://localhost:6650\" topic=\"my-topic\">" + EOL + + INDENT + INDENT + INDENT + "<GelfLayout host=\"my-host\">" + EOL + + INDENT + INDENT + INDENT + INDENT + "<KeyValuePair key=\"extraField\" value=\"extraValue\"/>" + EOL + + INDENT + INDENT + INDENT + "</GelfLayout>" + EOL + + INDENT + INDENT + "</Pulsar>" + EOL + + INDENT + "</Appenders>" + EOL + + INDENT + "<Loggers>" + EOL + + INDENT + INDENT + "<Logger name=\"org.apache.logging.log4j\" level=\"DEBUG\" includeLocation=\"true\" additivity=\"false\">" + EOL + + INDENT + INDENT + INDENT + "<AppenderRef ref=\"Stdout\"/>" + EOL + + INDENT + INDENT + "</Logger>" + EOL + + INDENT + INDENT + "<Logger name=\"org.apache.logging.log4j.core\" level=\"DEBUG\">" + EOL + + INDENT + INDENT + INDENT + "<AppenderRef ref=\"Stdout\"/>" + EOL + + INDENT + INDENT + "</Logger>" + EOL + + INDENT + INDENT + "<Root level=\"ERROR\">" + EOL + + INDENT + INDENT + INDENT + "<AppenderRef ref=\"Stdout\"/>" + EOL + + INDENT + INDENT + "</Root>" + EOL + + INDENT + "</Loggers>" + EOL + + "</Configuration>" + EOL; + + // TODO make test run properly on Windows + @Test + public void testXmlConstructing() throws Exception { + //assumeTrue(System.lineSeparator().length() == 1); // Only run test on platforms with single character line endings (such as Linux), not on Windows + final ConfigurationBuilder<BuiltConfiguration> builder = ConfigurationBuilderFactory.newConfigurationBuilder(); + addTestFixtures("config name", builder); + final String xmlConfiguration = builder.toXmlConfiguration(); + assertEquals(expectedXml, xmlConfiguration); + } + +} diff --git a/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/CustomConfigurationFactory.java b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/CustomConfigurationFactory.java new file mode 100644 index 0000000..98b26d0 --- /dev/null +++ b/pulsar-log4j2-appender/src/test/java/org/apache/pulsar/log4j2/appender/builder/CustomConfigurationFactory.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.log4j2.appender.builder; + +import java.net.URI; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.core.Filter; +import org.apache.logging.log4j.core.LoggerContext; +import org.apache.logging.log4j.core.appender.ConsoleAppender; +import org.apache.logging.log4j.core.config.Configuration; +import org.apache.logging.log4j.core.config.ConfigurationFactory; +import org.apache.logging.log4j.core.config.ConfigurationSource; +import org.apache.logging.log4j.core.config.builder.api.AppenderComponentBuilder; +import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilder; +import org.apache.logging.log4j.core.config.builder.impl.BuiltConfiguration; + +/** + * Normally this would be a plugin. However, we don't want it used for everything so it will be defined + * via a system property. + */ +//@Plugin(name = "CustomConfigurationFactory", category = ConfigurationFactory.CATEGORY) +//@Order(50) +public class CustomConfigurationFactory extends ConfigurationFactory { + + static Configuration addTestFixtures(final String name, final ConfigurationBuilder<BuiltConfiguration> builder) { + builder.setConfigurationName(name); + builder.setStatusLevel(Level.ERROR); + builder.add(builder.newScriptFile("target/test-classes/scripts/filter.groovy").addIsWatched(true)); + builder.add(builder.newFilter("ThresholdFilter", Filter.Result.ACCEPT, Filter.Result.NEUTRAL) + .addAttribute("level", Level.DEBUG)); + + final AppenderComponentBuilder appenderBuilder = builder.newAppender("Stdout", "CONSOLE").addAttribute("target", ConsoleAppender.Target.SYSTEM_OUT); + appenderBuilder.add(builder.newLayout("PatternLayout"). + addAttribute("pattern", "%d [%t] %-5level: %msg%n%throwable")); + appenderBuilder.add(builder.newFilter("MarkerFilter", Filter.Result.DENY, + Filter.Result.NEUTRAL).addAttribute("marker", "FLOW")); + builder.add(appenderBuilder); + + final AppenderComponentBuilder appenderBuilder2 = builder.newAppender("Pulsar", "Pulsar") + .addAttribute("serviceUrl", "pulsar://localhost:6650") + .addAttribute("topic", "my-topic"); + appenderBuilder2.add(builder.newLayout("GelfLayout"). + addAttribute("host", "my-host"). + addComponent(builder.newKeyValuePair("extraField", "extraValue"))); + builder.add(appenderBuilder2); + + builder.add(builder.newLogger("org.apache.logging.log4j", Level.DEBUG, true). + add(builder.newAppenderRef("Stdout")). + addAttribute("additivity", false)); + builder.add(builder.newRootLogger(Level.ERROR).add(builder.newAppenderRef("Stdout"))); + + builder.add(builder.newCustomLevel("Panic", 17)); + + return builder.build(); + } + + @Override + public Configuration getConfiguration(final LoggerContext loggerContext, final ConfigurationSource source) { + return getConfiguration(loggerContext, source.toString(), null); + } + + @Override + public Configuration getConfiguration(final LoggerContext loggerContext, final String name, final URI configLocation) { + final ConfigurationBuilder<BuiltConfiguration> builder = newConfigurationBuilder(); + return addTestFixtures(name, builder); + } + + @Override + protected String[] getSupportedTypes() { + return new String[] {"*"}; + } +} diff --git a/pulsar-log4j2-appender/src/test/resources/PulsarAppenderTest.xml b/pulsar-log4j2-appender/src/test/resources/PulsarAppenderTest.xml new file mode 100644 index 0000000..a5743fb --- /dev/null +++ b/pulsar-log4j2-appender/src/test/resources/PulsarAppenderTest.xml @@ -0,0 +1,49 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<Configuration name="PulsarAppenderTest" status="OFF"> + <Appenders> + <Pulsar name="PulsarAppenderWithLayout" serviceUrl="pulsar://localhost:6650" topic="persistent://t/c/n/pulsar-topic" avoidRecursive="false"> + <PatternLayout pattern="[%m]"/> + </Pulsar> + <Pulsar name="PulsarAppenderWithSerializedLayout" serviceUrl="pulsar://localhost:6650" topic="persistent://t/c/n/pulsar-topic" avoidRecursive="false"> + <SerializedLayout/> + </Pulsar> + <Pulsar name="AsyncPulsarAppender" serviceUrl="pulsar://localhost:6650" topic="persistent://t/c/n/pulsar-topic" avoidRecursive="false"> + <PatternLayout pattern="%m"/> + <Property name="syncSend">false</Property> + </Pulsar> + <Pulsar name="PulsarAppenderWithKey" serviceUrl="pulsar://localhost:6650" topic="persistent://t/c/n/pulsar-topic" key="key" avoidRecursive="false"> + <PatternLayout pattern="%m"/> + </Pulsar> + <Pulsar name="PulsarAppenderWithKeyLookup" serviceUrl="pulsar://localhost:6650" topic="persistent://t/c/n/pulsar-topic" key="$${date:dd-MM-yyyy}" avoidRecursive="false"> + <PatternLayout pattern="%m"/> + </Pulsar> + </Appenders> + <Loggers> + <Root level="info"> + <AppenderRef ref="PulsarAppenderWithLayout"/> + <AppenderRef ref="PulsarAppenderWithSerializedLayout"/> + <AppenderRef ref="AsyncPulsarAppender"/> + <AppenderRef ref="PulsarAppenderWithKey"/> + </Root> + </Loggers> +</Configuration> \ No newline at end of file -- To stop receiving notification emails like this one, please contact si...@apache.org.