This is an automated email from the ASF dual-hosted git repository.

shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new d5d3d989b LISAMZA-27395 removing the current recursive call prevention 
logic (#1641)
d5d3d989b is described below

commit d5d3d989b0b97b55e573b1c895156238b4599d29
Author: jia-gao <94939653+jia-...@users.noreply.github.com>
AuthorDate: Wed Nov 16 19:48:25 2022 -0800

    LISAMZA-27395 removing the current recursive call prevention logic (#1641)
    
    * LISAMZA-27395 removing the current recursive call prevention logic since 
it doesn’t work as expected
    
    * Add unit tests for streamappender change
---
 .../samza/logging/log4j2/StreamAppender.java       | 52 ++++++----------
 .../logging/log4j2/StreamAppenderMetrics.java      |  4 --
 .../samza/logging/log4j2/TestStreamAppender.java   | 69 ++++++++++++++++++++++
 3 files changed, 88 insertions(+), 37 deletions(-)

diff --git 
a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
 
b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
index c369282b8..42c62ca5b 100644
--- 
a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
+++ 
b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
@@ -26,7 +26,6 @@ import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.logging.log4j.core.Filter;
 import org.apache.logging.log4j.core.Layout;
 import org.apache.logging.log4j.core.LogEvent;
@@ -85,12 +84,6 @@ public class StreamAppender extends AbstractAppender {
   private String streamName = null;
   private final boolean usingAsyncLogger;
   private final LoggingContextHolder loggingContextHolder;
-
-  /**
-   * used to detect if this thread is called recursively
-   */
-  private final AtomicBoolean recursiveCall = new AtomicBoolean(false);
-
   protected static final int DEFAULT_QUEUE_SIZE = 100;
   protected volatile boolean systemInitialized = false;
   protected StreamAppenderMetrics metrics;
@@ -189,37 +182,30 @@ public class StreamAppender extends AbstractAppender {
 
   @Override
   public void append(LogEvent event) {
-    if (!recursiveCall.get()) {
-      try {
-        recursiveCall.set(true);
-        if (!systemInitialized) {
-          // configs are needed to set up producer system, so check that 
before actually initializing
-          if (this.loggingContextHolder.getConfig() != null) {
-            synchronized (this) {
-              if (!systemInitialized) {
-                setupSystem();
-                systemInitialized = true;
-              }
+    try {
+      if (!systemInitialized) {
+        // configs are needed to set up producer system, so check that before 
actually initializing
+        if (this.loggingContextHolder.getConfig() != null) {
+          synchronized (this) {
+            if (!systemInitialized) {
+              setupSystem();
+              systemInitialized = true;
             }
-            handleEvent(event);
-          } else {
-            // skip sending the log to the stream if initialization can't 
happen yet
-            System.out.println("Waiting for config to become available before 
log can be handled");
           }
-        } else {
           handleEvent(event);
+        } else {
+          // skip sending the log to the stream if initialization can't happen 
yet
+          System.out.println("Waiting for config to become available before 
log can be handled");
         }
-      } catch (Exception e) {
-        if (metrics != null) { // setupSystem() may not have been invoked yet 
so metrics can be null here.
-          metrics.logMessagesErrors.inc();
-        }
-        System.err.println(String.format("[%s] Error sending log message:", 
getName()));
-        e.printStackTrace();
-      } finally {
-        recursiveCall.set(false);
+      } else {
+        handleEvent(event);
+      }
+    } catch (Exception e) {
+      if (metrics != null) { // setupSystem() may not have been invoked yet so 
metrics can be null here.
+        metrics.logMessagesErrors.inc();
       }
-    } else if (metrics != null) { // setupSystem() may not have been invoked 
yet so metrics can be null here.
-      metrics.recursiveCalls.inc();
+      System.err.println(String.format("[%s] Error sending log message:", 
getName()));
+      e.printStackTrace();
     }
   }
 
diff --git 
a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppenderMetrics.java
 
b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppenderMetrics.java
index 466a520bd..e8e784b08 100644
--- 
a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppenderMetrics.java
+++ 
b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppenderMetrics.java
@@ -28,9 +28,6 @@ public class StreamAppenderMetrics extends MetricsBase {
   /** The percentage of the log queue capacity that is currently filled with 
messages from 0 to 100. */
   public final Gauge<Integer> bufferFillPct;
 
-  /** The number of recursive calls to the StreamAppender. These events will 
not be logged. */
-  public final Counter recursiveCalls;
-
   /** The number of log messages dropped e.g. because of buffer overflow. Does 
not include recursive calls. */
   public final Counter logMessagesDropped;
 
@@ -46,7 +43,6 @@ public class StreamAppenderMetrics extends MetricsBase {
   public StreamAppenderMetrics(String prefix, MetricsRegistry registry) {
     super(prefix + "-", registry);
     bufferFillPct = newGauge("buffer-fill-percent", 0);
-    recursiveCalls = newCounter("recursive-calls");
     logMessagesDropped = newCounter("log-messages-dropped");
     logMessagesErrors = newCounter("log-messages-errors");
     logMessagesBytesSent = newCounter("log-messages-bytes-sent");
diff --git 
a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java
 
b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java
index 74437babf..495022f5e 100644
--- 
a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java
+++ 
b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java
@@ -25,6 +25,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import com.google.common.collect.ImmutableMap;
@@ -375,6 +377,73 @@ public class TestStreamAppender {
     }
   }
 
+  @Test
+  public void testLogConcurrently() throws InterruptedException {
+    System.setProperty("samza.container.name", "samza-container-1");
+
+    PatternLayout layout = 
PatternLayout.newBuilder().withPattern("%m").build();
+    StreamAppender streamAppender =
+        new StreamAppender("testName", null, layout, false, true, null, 
this.loggingContextHolder);
+    startAndAttachAppender(streamAppender);
+    List<String> messages = new ArrayList<>();
+    for (int i = 0; i < 100; i++) {
+      messages.add("testing" + i);
+    }
+    logConcurrentlyAndVerifyMessages(messages);
+    streamAppender.stop();
+  }
+
+  @Test
+  public void testLogRecursively() throws InterruptedException {
+    System.setProperty("samza.container.name", "samza-container-1");
+
+    PatternLayout layout = 
PatternLayout.newBuilder().withPattern("%m").build();
+    StreamAppender streamAppender =
+        new StreamAppender("testName", null, layout, false, true, null, 
this.loggingContextHolder);
+    startAndAttachAppender(streamAppender);
+    List<String> messages = Lists.newArrayList("testing1", "testing2");
+    logRecursivelyAndVerifyMessages(messages);
+    streamAppender.stop();
+  }
+
+  private void logConcurrentlyAndVerifyMessages(List<String> messages) throws 
InterruptedException {
+    // Set up latch
+    final CountDownLatch allMessagesSent = new CountDownLatch(messages.size());
+    MockSystemProducer.listeners.add((source, envelope) -> 
allMessagesSent.countDown());
+    ExecutorService service = Executors.newFixedThreadPool(10);
+
+    // Log the messages concurrently
+    for (String message : messages) {
+      service.submit(() -> {
+        LOG.info(message);
+      });
+    }
+    // Wait for messages
+    assertTrue("Timeout while waiting for StreamAppender to send all messages. 
Count: " + allMessagesSent.getCount(),
+        allMessagesSent.await(60, TimeUnit.SECONDS));
+
+    // MockSystemProducer.messagesReceived is not thread safe, verify 
allMessagesSent CountDownLatch instead
+    assertEquals(0, allMessagesSent.getCount());
+    service.shutdown();
+  }
+
+  private void logRecursivelyAndVerifyMessages(List<String> messages) throws 
InterruptedException {
+    // Set up latch
+    final CountDownLatch allMessagesSent = new CountDownLatch(messages.size());
+    MockSystemProducer.listeners.add((source, envelope) -> {
+      LOG.info("system producer invoked");
+      allMessagesSent.countDown();
+    });
+    // Log the messages
+    messages.forEach(LOG::info);
+    // Wait for messages
+    assertTrue("Timeout while waiting for StreamAppender to send all messages. 
Count: " + allMessagesSent.getCount(),
+        allMessagesSent.await(60, TimeUnit.SECONDS));
+
+    // Verify
+    assertEquals(messages.size(), MockSystemProducer.messagesReceived.size());
+  }
+
   private static Config baseConfig() {
     Map<String, String> map = new HashMap<>();
     map.put("job.name", "log4jTest");

Reply via email to