This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 77dec99be Fix deadlock in StreamAppender (#1652)
77dec99be is described below

commit 77dec99be3d40a0d111dbf22d8b49a3f102d3ba3
Author: jia-gao <94939653+jia-...@users.noreply.github.com>
AuthorDate: Wed Feb 1 13:33:05 2023 -0800

    Fix deadlock in StreamAppender (#1652)
    
    Issue:
    In StreamAppender, there is a synchronized block around setupSystem(),
    which is called the first time any logger logs something (after the
    loggingContext config is set up).
    This can lead to a deadlock: if any other thread calls LOG.xxx() while
    setupSystem() is running, that thread is blocked, and if the system setup
    depends on that thread, neither can make progress.
    
    Change:
    Replace the synchronized block with Lock.tryLock(long time, TimeUnit unit)
    so that other threads are not blocked forever if setupSystem() gets stuck
    or takes too long.
    
    Usage Change:
    If a thread tries to log while another thread is setting up the stream, it
    waits up to SET_UP_SYSTEM_TIMEOUT_MILLI_SECONDS (10 seconds). If the wait
    times out, StreamAppender skips sending that log event to the stream.
    
    Note that before this change, the thread would block until the stream
    setup finished. That behavior opened the possibility of deadlock and
    needed to be changed.
---
 .../samza/logging/log4j2/StreamAppender.java       |  21 +++-
 .../samza/logging/log4j2/MockSystemAdmin.java      |   8 ++
 .../samza/logging/log4j2/TestStreamAppender.java   | 133 +++++++++++++++++++++
 3 files changed, 157 insertions(+), 5 deletions(-)

diff --git 
a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
 
b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
index 42c62ca5b..817bde111 100644
--- 
a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
+++ 
b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
@@ -26,6 +26,8 @@ import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import org.apache.logging.log4j.core.Filter;
 import org.apache.logging.log4j.core.Layout;
 import org.apache.logging.log4j.core.LogEvent;
@@ -88,6 +90,8 @@ public class StreamAppender extends AbstractAppender {
   protected volatile boolean systemInitialized = false;
   protected StreamAppenderMetrics metrics;
   protected long queueTimeoutS = DEFAULT_QUEUE_TIMEOUT_S;
+  protected Lock setUpSystemLock = new ReentrantLock();
+  protected static final long SET_UP_SYSTEM_TIMEOUT_MILLI_SECONDS = 10000;
 
   /**
    * Constructor is protected so that this class can be extended.
@@ -186,13 +190,20 @@ public class StreamAppender extends AbstractAppender {
       if (!systemInitialized) {
         // configs are needed to set up producer system, so check that before 
actually initializing
         if (this.loggingContextHolder.getConfig() != null) {
-          synchronized (this) {
-            if (!systemInitialized) {
-              setupSystem();
-              systemInitialized = true;
+          if (setUpSystemLock.tryLock(SET_UP_SYSTEM_TIMEOUT_MILLI_SECONDS, 
TimeUnit.MILLISECONDS)) {
+            try {
+              if (!systemInitialized) {
+                setupSystem();
+                systemInitialized = true;
+              }
+              handleEvent(event);
+            } finally {
+              setUpSystemLock.unlock();
             }
+          } else {
+            // skip sending the log to the stream if setupStream timeout in 10s
+            System.out.println("Waiting for another thread to setupStream 
before log can be handled");
           }
-          handleEvent(event);
         } else {
           // skip sending the log to the stream if initialization can't happen 
yet
           System.out.println("Waiting for config to become available before 
log can be handled");
diff --git 
a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemAdmin.java
 
b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemAdmin.java
index be99ad3b7..3989a4cf0 100644
--- 
a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemAdmin.java
+++ 
b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemAdmin.java
@@ -19,6 +19,8 @@
 
 package org.apache.samza.logging.log4j2;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.samza.system.StreamSpec;
@@ -30,6 +32,7 @@ import org.apache.samza.system.SystemStreamPartition;
 
 public class MockSystemAdmin implements SystemAdmin {
   public static StreamSpec createdStreamSpec = null;
+  public static List<MockSystemAdmin.MockSystemAdminListener> listeners = new 
ArrayList<>();
 
   @Override
   public void start() {
@@ -59,6 +62,7 @@ public class MockSystemAdmin implements SystemAdmin {
   @Override
   public boolean createStream(StreamSpec streamSpec) {
     createdStreamSpec = streamSpec;
+    listeners.forEach(listener -> listener.onCreate(streamSpec));
     return true;
   }
 
@@ -71,4 +75,8 @@ public class MockSystemAdmin implements SystemAdmin {
   public boolean clearStream(StreamSpec streamSpec) {
     return false;
   }
+
+  public interface MockSystemAdminListener {
+    void onCreate(StreamSpec streamSpec);
+  }
 }
diff --git 
a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java
 
b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java
index 495022f5e..eb1d10eb1 100644
--- 
a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java
+++ 
b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java
@@ -58,6 +58,8 @@ import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 
+import static java.lang.Thread.sleep;
+import static 
org.apache.samza.logging.log4j2.StreamAppender.SET_UP_SYSTEM_TIMEOUT_MILLI_SECONDS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -406,6 +408,66 @@ public class TestStreamAppender {
     streamAppender.stop();
   }
 
+  @Test
+  public void testSetupStreamTimeout() throws InterruptedException {
+    System.setProperty("samza.container.name", "samza-container-1");
+    MapConfig mapConfig = new MapConfig(new ImmutableMap.Builder<String, 
String>()
+        .put("task.log4j.create.stream.enabled", "true") // Enable explicit 
stream creation
+        .put("job.name", "log4jTest")
+        .put("job.id", "1")
+        .put("systems.mock.samza.factory", 
MockSystemFactory.class.getCanonicalName())
+        .put("task.log4j.system", "mock")
+        .put("job.container.count", "4")
+        .build());
+    when(this.loggingContextHolder.getConfig()).thenReturn(mapConfig);
+    PatternLayout layout = 
PatternLayout.newBuilder().withPattern("%m").build();
+    StreamAppender streamAppender =
+        new StreamAppender("testName", null, layout, false, true, null, 
this.loggingContextHolder);
+    startAndAttachAppender(streamAppender);
+    setupSystemTimeoutAndVerifyMessages(SET_UP_SYSTEM_TIMEOUT_MILLI_SECONDS * 
2);
+    streamAppender.stop();
+  }
+
+  @Test
+  public void testSetupStreamNoTimeout() throws InterruptedException {
+    System.setProperty("samza.container.name", "samza-container-2");
+    MapConfig mapConfig = new MapConfig(new ImmutableMap.Builder<String, 
String>()
+        .put("task.log4j.create.stream.enabled", "true") // Enable explicit 
stream creation
+        .put("job.name", "log4jTest")
+        .put("job.id", "1")
+        .put("systems.mock.samza.factory", 
MockSystemFactory.class.getCanonicalName())
+        .put("task.log4j.system", "mock")
+        .put("job.container.count", "4")
+        .build());
+    when(this.loggingContextHolder.getConfig()).thenReturn(mapConfig);
+    PatternLayout layout = 
PatternLayout.newBuilder().withPattern("%m").build();
+    StreamAppender streamAppender =
+        new StreamAppender("testName", null, layout, false, true, null, 
this.loggingContextHolder);
+    startAndAttachAppender(streamAppender);
+    setupSystemTimeoutAndVerifyMessages(SET_UP_SYSTEM_TIMEOUT_MILLI_SECONDS / 
2);
+    streamAppender.stop();
+  }
+
+  @Test
+  public void testSetupStreamException() throws InterruptedException {
+    System.setProperty("samza.container.name", "samza-container-2");
+    MapConfig mapConfig = new MapConfig(new ImmutableMap.Builder<String, 
String>()
+        .put("task.log4j.create.stream.enabled", "true") // Enable explicit 
stream creation
+        .put("job.name", "log4jTest")
+        .put("job.id", "1")
+        .put("systems.mock.samza.factory", 
MockSystemFactory.class.getCanonicalName())
+        .put("task.log4j.system", "mock")
+        .put("job.container.count", "4")
+        .build());
+    when(this.loggingContextHolder.getConfig()).thenReturn(mapConfig);
+    PatternLayout layout = 
PatternLayout.newBuilder().withPattern("%m").build();
+    StreamAppender streamAppender =
+        new StreamAppender("testName", null, layout, false, true, null, 
this.loggingContextHolder);
+    startAndAttachAppender(streamAppender);
+    setupSystemExceptionAndVerifyMessages();
+    streamAppender.stop();
+  }
+
   private void logConcurrentlyAndVerifyMessages(List<String> messages) throws 
InterruptedException {
     // Set up latch
     final CountDownLatch allMessagesSent = new CountDownLatch(messages.size());
@@ -444,6 +506,77 @@ public class TestStreamAppender {
     assertEquals(messages.size(), MockSystemProducer.messagesReceived.size());
   }
 
+  private void setupSystemTimeoutAndVerifyMessages(long setUpSystemTime) 
throws InterruptedException {
+    MockSystemProducer.listeners.clear();
+    MockSystemProducer.messagesReceived.clear();
+    MockSystemAdmin.listeners.clear();
+    List<String> messages = Lists.newArrayList("testing1", "testing2");
+    // Set up latch
+    final CountDownLatch allMessagesSent = new CountDownLatch(messages.size());
+    MockSystemProducer.listeners.add((source, envelope) -> 
allMessagesSent.countDown());
+    MockSystemAdmin.listeners.add(streamSpec -> {
+      try {
+        // This log should not be sent to system producer as it is logged 
recursively
+        LOG.info("setting up stream");
+        // mock setUpSystem time during createStream() call
+        sleep(setUpSystemTime);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    });
+    logMessagesConcurrentlyWithRandomOrder(messages);
+    // Wait for messages
+    allMessagesSent.await(setUpSystemTime * 4, TimeUnit.MILLISECONDS);
+    // If the setUpSystem time out, verify only one message sent. otherwise, 
verify two messages sent
+    if (setUpSystemTime >= SET_UP_SYSTEM_TIMEOUT_MILLI_SECONDS) {
+      assertEquals(messages.size() - 1, 
MockSystemProducer.messagesReceived.size());
+    } else {
+      assertEquals(messages.size(), 
MockSystemProducer.messagesReceived.size());
+    }
+  }
+
+  private void setupSystemExceptionAndVerifyMessages() throws 
InterruptedException {
+    MockSystemProducer.listeners.clear();
+    MockSystemProducer.messagesReceived.clear();
+    MockSystemAdmin.listeners.clear();
+    List<String> messages = Lists.newArrayList("testing1", "testing2");
+    // Set up latch
+    final CountDownLatch allMessagesSent = new CountDownLatch(messages.size());
+    MockSystemProducer.listeners.add((source, envelope) -> 
allMessagesSent.countDown());
+    MockSystemAdmin.listeners.add(streamSpec -> {
+      // This log should not be sent to system producer as it is logged 
recursively
+      LOG.info("setting up stream");
+      throw new RuntimeException("Exception during setting up stream");
+    });
+    logMessagesConcurrentlyWithRandomOrder(messages);
+    // Wait for messages
+    allMessagesSent.await(SET_UP_SYSTEM_TIMEOUT_MILLI_SECONDS * 2, 
TimeUnit.MILLISECONDS);
+    // verify no messages sent
+    assertEquals(0, MockSystemProducer.messagesReceived.size());
+  }
+
+  private void logMessagesConcurrentlyWithRandomOrder(List<String> messages) {
+    List<ExecutorService> executorServices = new ArrayList<>();
+    for (int i = 0; i < messages.size(); i++) {
+      executorServices.add(Executors.newFixedThreadPool(1));
+    }
+    for (int i = 0; i < messages.size(); i++) {
+      // Log the messages with multiple threads, ensure that each message will 
be handled by one thread.
+      // the threads will sleep a random time so the logging order is random
+      // The sleep time should be much smaller than SET_UP_SYSTEM_TIMEOUT_MILLI_SECONDS to make sure that the threads
+      // do try to acquire the lock of setUpSystem concurrently
+      String message = messages.get(i);
+      executorServices.get(i).submit(() -> {
+        try {
+          sleep((long) (Math.random() * SET_UP_SYSTEM_TIMEOUT_MILLI_SECONDS / 
20));
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+        LOG.info(message);
+      });
+    }
+  }
+
   private static Config baseConfig() {
     Map<String, String> map = new HashMap<>();
     map.put("job.name", "log4jTest");

Reply via email to