This is an automated email from the ASF dual-hosted git repository.

shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push: new d5d3d989b LISAMZA-27395 removing the current recursive call prevention logic (#1641) d5d3d989b is described below commit d5d3d989b0b97b55e573b1c895156238b4599d29 Author: jia-gao <94939653+jia-...@users.noreply.github.com> AuthorDate: Wed Nov 16 19:48:25 2022 -0800 LISAMZA-27395 removing the current recursive call prevention logic (#1641) * LISAMZA-27395 removing the current recursive call prevention logic since it doesn’t work as expected * Add unit tests for streamappender change --- .../samza/logging/log4j2/StreamAppender.java | 52 ++++++---------- .../logging/log4j2/StreamAppenderMetrics.java | 4 -- .../samza/logging/log4j2/TestStreamAppender.java | 69 ++++++++++++++++++++++ 3 files changed, 88 insertions(+), 37 deletions(-) diff --git a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java index c369282b8..42c62ca5b 100644 --- a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java +++ b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java @@ -26,7 +26,6 @@ import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.logging.log4j.core.Filter; import org.apache.logging.log4j.core.Layout; import org.apache.logging.log4j.core.LogEvent; @@ -85,12 +84,6 @@ public class StreamAppender extends AbstractAppender { private String streamName = null; private final boolean usingAsyncLogger; private final LoggingContextHolder loggingContextHolder; - - /** - * used to detect if this thread is called recursively - */ - private final AtomicBoolean recursiveCall = new AtomicBoolean(false); - protected static final int DEFAULT_QUEUE_SIZE = 100; protected volatile boolean systemInitialized = 
false; protected StreamAppenderMetrics metrics; @@ -189,37 +182,30 @@ public class StreamAppender extends AbstractAppender { @Override public void append(LogEvent event) { - if (!recursiveCall.get()) { - try { - recursiveCall.set(true); - if (!systemInitialized) { - // configs are needed to set up producer system, so check that before actually initializing - if (this.loggingContextHolder.getConfig() != null) { - synchronized (this) { - if (!systemInitialized) { - setupSystem(); - systemInitialized = true; - } + try { + if (!systemInitialized) { + // configs are needed to set up producer system, so check that before actually initializing + if (this.loggingContextHolder.getConfig() != null) { + synchronized (this) { + if (!systemInitialized) { + setupSystem(); + systemInitialized = true; } - handleEvent(event); - } else { - // skip sending the log to the stream if initialization can't happen yet - System.out.println("Waiting for config to become available before log can be handled"); } - } else { handleEvent(event); + } else { + // skip sending the log to the stream if initialization can't happen yet + System.out.println("Waiting for config to become available before log can be handled"); } - } catch (Exception e) { - if (metrics != null) { // setupSystem() may not have been invoked yet so metrics can be null here. - metrics.logMessagesErrors.inc(); - } - System.err.println(String.format("[%s] Error sending log message:", getName())); - e.printStackTrace(); - } finally { - recursiveCall.set(false); + } else { + handleEvent(event); + } + } catch (Exception e) { + if (metrics != null) { // setupSystem() may not have been invoked yet so metrics can be null here. + metrics.logMessagesErrors.inc(); } - } else if (metrics != null) { // setupSystem() may not have been invoked yet so metrics can be null here. 
- metrics.recursiveCalls.inc(); + System.err.println(String.format("[%s] Error sending log message:", getName())); + e.printStackTrace(); } } diff --git a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppenderMetrics.java b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppenderMetrics.java index 466a520bd..e8e784b08 100644 --- a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppenderMetrics.java +++ b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppenderMetrics.java @@ -28,9 +28,6 @@ public class StreamAppenderMetrics extends MetricsBase { /** The percentage of the log queue capacity that is currently filled with messages from 0 to 100. */ public final Gauge<Integer> bufferFillPct; - /** The number of recursive calls to the StreamAppender. These events will not be logged. */ - public final Counter recursiveCalls; - /** The number of log messages dropped e.g. because of buffer overflow. Does not include recursive calls. 
*/ public final Counter logMessagesDropped; @@ -46,7 +43,6 @@ public class StreamAppenderMetrics extends MetricsBase { public StreamAppenderMetrics(String prefix, MetricsRegistry registry) { super(prefix + "-", registry); bufferFillPct = newGauge("buffer-fill-percent", 0); - recursiveCalls = newCounter("recursive-calls"); logMessagesDropped = newCounter("log-messages-dropped"); logMessagesErrors = newCounter("log-messages-errors"); logMessagesBytesSent = newCounter("log-messages-bytes-sent"); diff --git a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java index 74437babf..495022f5e 100644 --- a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java +++ b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java @@ -25,6 +25,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import com.google.common.collect.ImmutableMap; @@ -375,6 +377,73 @@ public class TestStreamAppender { } } + @Test + public void testLogConcurrently() throws InterruptedException { + System.setProperty("samza.container.name", "samza-container-1"); + + PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build(); + StreamAppender streamAppender = + new StreamAppender("testName", null, layout, false, true, null, this.loggingContextHolder); + startAndAttachAppender(streamAppender); + List<String> messages = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + messages.add("testing" + i); + } + logConcurrentlyAndVerifyMessages(messages); + streamAppender.stop(); + } + + @Test + public void testLogRecursively() throws InterruptedException { + 
System.setProperty("samza.container.name", "samza-container-1"); + + PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build(); + StreamAppender streamAppender = + new StreamAppender("testName", null, layout, false, true, null, this.loggingContextHolder); + startAndAttachAppender(streamAppender); + List<String> messages = Lists.newArrayList("testing1", "testing2"); + logRecursivelyAndVerifyMessages(messages); + streamAppender.stop(); + } + + private void logConcurrentlyAndVerifyMessages(List<String> messages) throws InterruptedException { + // Set up latch + final CountDownLatch allMessagesSent = new CountDownLatch(messages.size()); + MockSystemProducer.listeners.add((source, envelope) -> allMessagesSent.countDown()); + ExecutorService service = Executors.newFixedThreadPool(10); + + // Log the messages concurrently + for (String message : messages) { + service.submit(() -> { + LOG.info(message); + }); + } + // Wait for messages + assertTrue("Timeout while waiting for StreamAppender to send all messages. Count: " + allMessagesSent.getCount(), + allMessagesSent.await(60, TimeUnit.SECONDS)); + + // MockSystemProducer.messagesReceived is not thread safe, verify allMessagesSent CountDownLatch instead + assertEquals(0, allMessagesSent.getCount()); + service.shutdown(); + } + + private void logRecursivelyAndVerifyMessages(List<String> messages) throws InterruptedException { + // Set up latch + final CountDownLatch allMessagesSent = new CountDownLatch(messages.size()); + MockSystemProducer.listeners.add((source, envelope) -> { + LOG.info("system producer invoked"); + allMessagesSent.countDown(); + }); + // Log the messages + messages.forEach(LOG::info); + // Wait for messages + assertTrue("Timeout while waiting for StreamAppender to send all messages. 
Count: " + allMessagesSent.getCount(), + allMessagesSent.await(60, TimeUnit.SECONDS)); + + // Verify + assertEquals(messages.size(), MockSystemProducer.messagesReceived.size()); + } + private static Config baseConfig() { Map<String, String> map = new HashMap<>(); map.put("job.name", "log4jTest");