Make sure to not use zero timeout for KafkaManager shutdown
Project: http://git-wip-us.apache.org/repos/asf/logging-log4j2/repo Commit: http://git-wip-us.apache.org/repos/asf/logging-log4j2/commit/78cbb44d Tree: http://git-wip-us.apache.org/repos/asf/logging-log4j2/tree/78cbb44d Diff: http://git-wip-us.apache.org/repos/asf/logging-log4j2/diff/78cbb44d Branch: refs/heads/LOG4J2-1390 Commit: 78cbb44ddbbf9f7a80156b555041e41414fa8b1b Parents: 166d3fa Author: Mikael Ståldal <[email protected]> Authored: Thu Sep 29 14:04:36 2016 +0200 Committer: Mikael Ståldal <[email protected]> Committed: Thu Sep 29 14:04:36 2016 +0200 ---------------------------------------------------------------------- .../log4j/core/appender/mom/kafka/KafkaManager.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/logging-log4j2/blob/78cbb44d/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java ---------------------------------------------------------------------- diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java index 9302beb..ded641c 100644 --- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java +++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaManager.java @@ -57,6 +57,15 @@ public class KafkaManager extends AbstractManager { @Override public boolean releaseSub(final long timeout, final TimeUnit timeUnit) { + if (timeout > 0) { + closeProducer(timeout, timeUnit); + } else { + closeProducer(timeoutMillis, TimeUnit.MILLISECONDS); + } + return true; + } + + private void closeProducer(final long timeout, final TimeUnit timeUnit) { if (producer != null) { // This thread is a workaround for this Kafka issue: https://issues.apache.org/jira/browse/KAFKA-1660 final Runnable task = new Runnable() { @@ -73,7 +82,6 @@ public class KafkaManager extends AbstractManager { // ignore } } - return true; } public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException {
