This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 86b9546 Send Log statements to log Topic for Java Functions (#1447) 86b9546 is described below commit 86b95461602b1f813e41f382eac41a6b2a31ee63 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Tue Mar 27 11:58:41 2018 -0700 Send Log statements to log Topic for Java Functions (#1447) * Added LogTopic support to Java functions * UseLogAppender instead of PulsarAppender * Reverted changes to PulsarAppender * No need to take dep on pulsar-log4j plugin * Start the appender * Fixed comments * Reverted back logging function change --- .../functions/instance/JavaInstanceRunnable.java | 38 +++++++ .../pulsar/functions/instance/LogAppender.java | 121 +++++++++++++++++++++ 2 files changed, 159 insertions(+) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 3d1934c..510608f 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -45,6 +45,9 @@ import org.apache.bookkeeper.clients.exceptions.StreamNotFoundException; import org.apache.bookkeeper.clients.utils.NetUtils; import org.apache.bookkeeper.stream.proto.NamespaceConfiguration; import org.apache.logging.log4j.ThreadContext; +import org.apache.logging.log4j.core.LoggerContext; +import org.apache.logging.log4j.core.config.Configuration; +import org.apache.logging.log4j.core.config.LoggerConfig; import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException; import org.apache.pulsar.client.impl.MessageIdImpl; @@ -85,6 +88,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv @Getter(AccessLevel.PACKAGE) private final Map<String, Consumer> inputConsumers; private LinkedList<String> inputTopicsToResubscribe = null; + private LogAppender logAppender; // provide tables for storing states private final String stateStorageServiceUrl; @@ -188,6 +192,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv startOutputProducer(); // start the input consumer startInputConsumer(); + // start any log topic handler + setupLogHandler(); return new JavaInstance(instanceConfig, object, clsLoader, client, inputConsumers); } @@ -262,10 +268,12 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv } long processAt = System.currentTimeMillis(); stats.incrementProcessed(processAt); + addLogTopicHandler(); result = javaInstance.handleMessage( msg.getActualMessage().getMessageId(), msg.getTopicName(), input); + removeLogTopicHandler(); long doneProcessing = System.currentTimeMillis(); log.debug("Got result: {}", result.getResult()); @@ -755,4 +763,34 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv } } } + + private void setupLogHandler() { + if (instanceConfig.getFunctionConfig().getLogTopic() != null && + !instanceConfig.getFunctionConfig().getLogTopic().isEmpty()) { + logAppender = new LogAppender(client, instanceConfig.getFunctionConfig().getLogTopic(), + FunctionConfigUtils.getFullyQualifiedName(instanceConfig.getFunctionConfig())); + logAppender.start(); + } + } + + private void addLogTopicHandler() { + if (logAppender == null) return; + LoggerContext context = LoggerContext.getContext(false); + Configuration config = context.getConfiguration(); + config.addAppender(logAppender); + for (final LoggerConfig loggerConfig : config.getLoggers().values()) { + loggerConfig.addAppender(logAppender, null, null); + } + config.getRootLogger().addAppender(logAppender, null, null); + } + + private void removeLogTopicHandler() { + if (logAppender == null) return; + LoggerContext context = LoggerContext.getContext(false); + Configuration config = context.getConfiguration(); + for (final LoggerConfig loggerConfig : config.getLoggers().values()) { + loggerConfig.removeAppender(logAppender.getName()); + } + config.getRootLogger().removeAppender(logAppender.getName()); + } } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java new file mode 100644 index 0000000..3dd71a2 --- /dev/null +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.instance; + +import org.apache.logging.log4j.core.*; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.PulsarClient; + +import java.io.Serializable; +import java.util.concurrent.TimeUnit; + +/** + * LogAppender class that is used to send log statements from Pulsar Functions logger + * to a log topic. + */ +public class LogAppender implements Appender { + private PulsarClient pulsarClient; + private String logTopic; + private String fqn; + private State state; + private ErrorHandler errorHandler; + private Producer<byte[]> producer; + + public LogAppender(PulsarClient pulsarClient, String logTopic, String fqn) { + this.pulsarClient = pulsarClient; + this.logTopic = logTopic; + this.fqn = fqn; + } + + @Override + public void append(LogEvent logEvent) { + producer.sendAsync(logEvent.getMessage().getFormattedMessage().getBytes()); + } + + @Override + public String getName() { + return fqn; + } + + @Override + public Layout<? extends Serializable> getLayout() { + return null; + } + + @Override + public boolean ignoreExceptions() { + return false; + } + + @Override + public ErrorHandler getHandler() { + return errorHandler; + } + + @Override + public void setHandler(ErrorHandler errorHandler) { + this.errorHandler = errorHandler; + } + + @Override + public State getState() { + return state; + } + + @Override + public void initialize() { + this.state = State.INITIALIZED; + } + + @Override + public void start() { + this.state = State.STARTING; + try { + ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer() + .topic(logTopic) + .producerName(fqn) + .blockIfQueueFull(false) + .enableBatching(true) + .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS); + producer = producerBuilder.create(); + } catch (Exception e) { + throw new RuntimeException("Error starting LogTopic Producer", e); + } + this.state = State.STARTED; + } + + @Override + public void stop() { + this.state = State.STOPPING; + producer.closeAsync(); + producer = null; + this.state = State.STOPPED; + } + + @Override + public boolean isStarted() { + return state == State.STARTED; + } + + @Override + public boolean isStopped() { + return state == State.STOPPED; + } +} -- To stop receiving notification emails like this one, please contact si...@apache.org.