This is an automated email from the ASF dual-hosted git repository. bharathkk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push: new 77dec99be Fix deadlock in StreamAppender (#1652) 77dec99be is described below commit 77dec99be3d40a0d111dbf22d8b49a3f102d3ba3 Author: jia-gao <94939653+jia-...@users.noreply.github.com> AuthorDate: Wed Feb 1 13:33:05 2023 -0800 Fix deadlock in StreamAppender (#1652) Issue: In StreamAppender, there is a synchronized block around setupSystem(), which is meant to be called the first time any logger logs something (after the loggingContext config is set up). This can lead to a deadlock: if any other thread tries to call LOG.xxx() while setupSystem() is running, that thread will be blocked, and if the system setup depends on that thread, the result is a deadlock. Change: Replace the synchronized block with tryLock(long time, TimeUnit unit) so that other threads won't be blocked forever if setupSystem() gets stuck or times out. Usage Change: If any thread tries to log while another thread is setting up the stream, it will wait up to SET_UP_SYSTEM_TIMEOUT_MILLI_SECONDS. If the wait times out, StreamAppender will skip sending that log event. Note that before this change, the thread would block and wait until the stream setup was done. 
This behavior opens the chance of deadlock and needs to be changed --- .../samza/logging/log4j2/StreamAppender.java | 21 +++- .../samza/logging/log4j2/MockSystemAdmin.java | 8 ++ .../samza/logging/log4j2/TestStreamAppender.java | 133 +++++++++++++++++++++ 3 files changed, 157 insertions(+), 5 deletions(-) diff --git a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java index 42c62ca5b..817bde111 100644 --- a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java +++ b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java @@ -26,6 +26,8 @@ import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.logging.log4j.core.Filter; import org.apache.logging.log4j.core.Layout; import org.apache.logging.log4j.core.LogEvent; @@ -88,6 +90,8 @@ public class StreamAppender extends AbstractAppender { protected volatile boolean systemInitialized = false; protected StreamAppenderMetrics metrics; protected long queueTimeoutS = DEFAULT_QUEUE_TIMEOUT_S; + protected Lock setUpSystemLock = new ReentrantLock(); + protected static final long SET_UP_SYSTEM_TIMEOUT_MILLI_SECONDS = 10000; /** * Constructor is protected so that this class can be extended. 
@@ -186,13 +190,20 @@ public class StreamAppender extends AbstractAppender { if (!systemInitialized) { // configs are needed to set up producer system, so check that before actually initializing if (this.loggingContextHolder.getConfig() != null) { - synchronized (this) { - if (!systemInitialized) { - setupSystem(); - systemInitialized = true; + if (setUpSystemLock.tryLock(SET_UP_SYSTEM_TIMEOUT_MILLI_SECONDS, TimeUnit.MILLISECONDS)) { + try { + if (!systemInitialized) { + setupSystem(); + systemInitialized = true; + } + handleEvent(event); + } finally { + setUpSystemLock.unlock(); } + } else { + // skip sending the log to the stream if setupStream timeout in 10s + System.out.println("Waiting for another thread to setupStream before log can be handled"); } - handleEvent(event); } else { // skip sending the log to the stream if initialization can't happen yet System.out.println("Waiting for config to become available before log can be handled"); diff --git a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemAdmin.java b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemAdmin.java index be99ad3b7..3989a4cf0 100644 --- a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemAdmin.java +++ b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemAdmin.java @@ -19,6 +19,8 @@ package org.apache.samza.logging.log4j2; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Set; import org.apache.samza.system.StreamSpec; @@ -30,6 +32,7 @@ import org.apache.samza.system.SystemStreamPartition; public class MockSystemAdmin implements SystemAdmin { public static StreamSpec createdStreamSpec = null; + public static List<MockSystemAdmin.MockSystemAdminListener> listeners = new ArrayList<>(); @Override public void start() { @@ -59,6 +62,7 @@ public class MockSystemAdmin implements SystemAdmin { @Override public boolean createStream(StreamSpec streamSpec) { createdStreamSpec = 
streamSpec; + listeners.forEach(listener -> listener.onCreate(streamSpec)); return true; } @@ -71,4 +75,8 @@ public class MockSystemAdmin implements SystemAdmin { public boolean clearStream(StreamSpec streamSpec) { return false; } + + public interface MockSystemAdminListener { + void onCreate(StreamSpec streamSpec); + } } diff --git a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java index 495022f5e..eb1d10eb1 100644 --- a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java +++ b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java @@ -58,6 +58,8 @@ import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import static java.lang.Thread.sleep; +import static org.apache.samza.logging.log4j2.StreamAppender.SET_UP_SYSTEM_TIMEOUT_MILLI_SECONDS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -406,6 +408,66 @@ public class TestStreamAppender { streamAppender.stop(); } + @Test + public void testSetupStreamTimeout() throws InterruptedException { + System.setProperty("samza.container.name", "samza-container-1"); + MapConfig mapConfig = new MapConfig(new ImmutableMap.Builder<String, String>() + .put("task.log4j.create.stream.enabled", "true") // Enable explicit stream creation + .put("job.name", "log4jTest") + .put("job.id", "1") + .put("systems.mock.samza.factory", MockSystemFactory.class.getCanonicalName()) + .put("task.log4j.system", "mock") + .put("job.container.count", "4") + .build()); + when(this.loggingContextHolder.getConfig()).thenReturn(mapConfig); + PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build(); + StreamAppender streamAppender = + new StreamAppender("testName", null, layout, false, true, null, this.loggingContextHolder); + 
startAndAttachAppender(streamAppender); + setupSystemTimeoutAndVerifyMessages(SET_UP_SYSTEM_TIMEOUT_MILLI_SECONDS * 2); + streamAppender.stop(); + } + + @Test + public void testSetupStreamNoTimeout() throws InterruptedException { + System.setProperty("samza.container.name", "samza-container-2"); + MapConfig mapConfig = new MapConfig(new ImmutableMap.Builder<String, String>() + .put("task.log4j.create.stream.enabled", "true") // Enable explicit stream creation + .put("job.name", "log4jTest") + .put("job.id", "1") + .put("systems.mock.samza.factory", MockSystemFactory.class.getCanonicalName()) + .put("task.log4j.system", "mock") + .put("job.container.count", "4") + .build()); + when(this.loggingContextHolder.getConfig()).thenReturn(mapConfig); + PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build(); + StreamAppender streamAppender = + new StreamAppender("testName", null, layout, false, true, null, this.loggingContextHolder); + startAndAttachAppender(streamAppender); + setupSystemTimeoutAndVerifyMessages(SET_UP_SYSTEM_TIMEOUT_MILLI_SECONDS / 2); + streamAppender.stop(); + } + + @Test + public void testSetupStreamException() throws InterruptedException { + System.setProperty("samza.container.name", "samza-container-2"); + MapConfig mapConfig = new MapConfig(new ImmutableMap.Builder<String, String>() + .put("task.log4j.create.stream.enabled", "true") // Enable explicit stream creation + .put("job.name", "log4jTest") + .put("job.id", "1") + .put("systems.mock.samza.factory", MockSystemFactory.class.getCanonicalName()) + .put("task.log4j.system", "mock") + .put("job.container.count", "4") + .build()); + when(this.loggingContextHolder.getConfig()).thenReturn(mapConfig); + PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build(); + StreamAppender streamAppender = + new StreamAppender("testName", null, layout, false, true, null, this.loggingContextHolder); + startAndAttachAppender(streamAppender); + 
setupSystemExceptionAndVerifyMessages(); + streamAppender.stop(); + } + private void logConcurrentlyAndVerifyMessages(List<String> messages) throws InterruptedException { // Set up latch final CountDownLatch allMessagesSent = new CountDownLatch(messages.size()); @@ -444,6 +506,77 @@ public class TestStreamAppender { assertEquals(messages.size(), MockSystemProducer.messagesReceived.size()); } + private void setupSystemTimeoutAndVerifyMessages(long setUpSystemTime) throws InterruptedException { + MockSystemProducer.listeners.clear(); + MockSystemProducer.messagesReceived.clear(); + MockSystemAdmin.listeners.clear(); + List<String> messages = Lists.newArrayList("testing1", "testing2"); + // Set up latch + final CountDownLatch allMessagesSent = new CountDownLatch(messages.size()); + MockSystemProducer.listeners.add((source, envelope) -> allMessagesSent.countDown()); + MockSystemAdmin.listeners.add(streamSpec -> { + try { + // This log should not be sent to system producer as it is logged recursively + LOG.info("setting up stream"); + // mock setUpSystem time during createStream() call + sleep(setUpSystemTime); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + logMessagesConcurrentlyWithRandomOrder(messages); + // Wait for messages + allMessagesSent.await(setUpSystemTime * 4, TimeUnit.MILLISECONDS); + // If the setUpSystem time out, verify only one message sent. 
otherwise, verify two messages sent + if (setUpSystemTime >= SET_UP_SYSTEM_TIMEOUT_MILLI_SECONDS) { + assertEquals(messages.size() - 1, MockSystemProducer.messagesReceived.size()); + } else { + assertEquals(messages.size(), MockSystemProducer.messagesReceived.size()); + } + } + + private void setupSystemExceptionAndVerifyMessages() throws InterruptedException { + MockSystemProducer.listeners.clear(); + MockSystemProducer.messagesReceived.clear(); + MockSystemAdmin.listeners.clear(); + List<String> messages = Lists.newArrayList("testing1", "testing2"); + // Set up latch + final CountDownLatch allMessagesSent = new CountDownLatch(messages.size()); + MockSystemProducer.listeners.add((source, envelope) -> allMessagesSent.countDown()); + MockSystemAdmin.listeners.add(streamSpec -> { + // This log should not be sent to system producer as it is logged recursively + LOG.info("setting up stream"); + throw new RuntimeException("Exception during setting up stream"); + }); + logMessagesConcurrentlyWithRandomOrder(messages); + // Wait for messages + allMessagesSent.await(SET_UP_SYSTEM_TIMEOUT_MILLI_SECONDS * 2, TimeUnit.MILLISECONDS); + // verify no messages sent + assertEquals(0, MockSystemProducer.messagesReceived.size()); + } + + private void logMessagesConcurrentlyWithRandomOrder(List<String> messages) { + List<ExecutorService> executorServices = new ArrayList<>(); + for (int i = 0; i < messages.size(); i++) { + executorServices.add(Executors.newFixedThreadPool(1)); + } + for (int i = 0; i < messages.size(); i++) { + // Log the messages with multiple threads, ensure that each message will be handled by one thread. 
+ // the threads will sleep a random time so the logging order is random + // The sleep time should be Incomparable smaller than SET_UP_SYSTEM_TIMEOUT_MILLI_SECONDS to make sure that the threads + // do try to acquire the lock of setUpSystem concurrently + String message = messages.get(i); + executorServices.get(i).submit(() -> { + try { + sleep((long) (Math.random() * SET_UP_SYSTEM_TIMEOUT_MILLI_SECONDS / 20)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + LOG.info(message); + }); + } + } + private static Config baseConfig() { Map<String, String> map = new HashMap<>(); map.put("job.name", "log4jTest");