Re: [PR] KAFKA-16477: Detect thread leaked client-metrics-reaper in tests [kafka]
chia7712 merged PR #15668:
URL: https://github.com/apache/kafka/pull/15668

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
Re: [PR] KAFKA-16477: Detect thread leaked client-metrics-reaper in tests [kafka]
brandboat commented on code in PR #15668:
URL: https://github.com/apache/kafka/pull/15668#discussion_r1556044180

## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ##
@@ -112,7 +113,7 @@ public ClientMetricsManager(ClientMetricsReceiverPlugin receiverPlugin, int clie
         this.subscriptionMap = new ConcurrentHashMap<>();
         this.subscriptionUpdateVersion = new AtomicInteger(0);
         this.clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CACHE_MAX_SIZE));
-        this.expirationTimer = new SystemTimerReaper("client-metrics-reaper", new SystemTimer("client-metrics"));
+        this.expirationTimer = new SystemTimerReaper(CLIENT_METRICS_REAPER_THREAD_NAME, new SystemTimer("client-metrics"));

Review Comment:
Sure, addressed in https://github.com/apache/kafka/pull/15668/commits/2da40c26541e727f2cd38bb9fb64ee41e5ca42c2
Re: [PR] KAFKA-16477: Detect thread leaked client-metrics-reaper in tests [kafka]
chia7712 commented on code in PR #15668:
URL: https://github.com/apache/kafka/pull/15668#discussion_r1555109656

## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ##
@@ -112,7 +113,7 @@ public ClientMetricsManager(ClientMetricsReceiverPlugin receiverPlugin, int clie
         this.subscriptionMap = new ConcurrentHashMap<>();
         this.subscriptionUpdateVersion = new AtomicInteger(0);
         this.clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CACHE_MAX_SIZE));
-        this.expirationTimer = new SystemTimerReaper("client-metrics-reaper", new SystemTimer("client-metrics"));
+        this.expirationTimer = new SystemTimerReaper(CLIENT_METRICS_REAPER_THREAD_NAME, new SystemTimer("client-metrics"));

Review Comment:
I just noticed this response. Could you add `executor-` to `unexpectedThreadNames` to trigger QA again?
Re: [PR] KAFKA-16477: Detect thread leaked client-metrics-reaper in tests [kafka]
brandboat commented on code in PR #15668:
URL: https://github.com/apache/kafka/pull/15668#discussion_r1554997766

## server-common/src/test/java/org/apache/kafka/server/util/timer/TimerTest.java: ##
@@ -77,7 +77,7 @@ public void setup() {
     @AfterEach
     public void teardown() throws Exception {
         timer.close();
-        TestUtils.waitForCondition(() -> timer.isExecutorTerminated(), "timer excutor not terminated");
+        TestUtils.waitForCondition(() -> timer.isTerminated(), "timer executor not terminated");

Review Comment:
God... I definitely need to enable the IDE check for this. Many thanks.
Re: [PR] KAFKA-16477: Detect thread leaked client-metrics-reaper in tests [kafka]
chia7712 commented on code in PR #15668:
URL: https://github.com/apache/kafka/pull/15668#discussion_r1554997443

## server-common/src/test/java/org/apache/kafka/server/util/timer/TimerTest.java: ##
@@ -77,7 +77,7 @@ public void setup() {
     @AfterEach
     public void teardown() throws Exception {
         timer.close();
-        TestUtils.waitForCondition(() -> timer.isExecutorTerminated(), "timer excutor not terminated");
+        TestUtils.waitForCondition(() -> timer.isTerminated(), "timer executor not terminated");

Review Comment:
`timer::isTerminated`

## server-common/src/test/java/org/apache/kafka/server/util/timer/SystemTimerReaperTest.java: ##
@@ -59,4 +60,13 @@ public void testReaper() throws Exception {
             timer.close();
         }
     }
+
+    @Test
+    public void testReaperClose() throws Exception {
+        Timer timer = Mockito.mock(Timer.class);
+        SystemTimerReaper timerReaper = new SystemTimerReaper("reaper", timer);
+        timerReaper.close();
+        Mockito.verify(timer, Mockito.times(1)).close();
+        TestUtils.waitForCondition(() -> timerReaper.isShutdown(), "reaper not shutdown");

Review Comment:
`timerReaper::isShutdown`
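The two suggestions above replace a lambda that only forwards to one method with a method reference. A minimal sketch of the equivalence follows. `WaitDemo` and `waitFor` are hypothetical names invented for this illustration and are not Kafka's `TestUtils.waitForCondition`; the sketch only assumes the condition is a no-argument boolean check.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical helper for illustration only; it is not Kafka's TestUtils.
final class WaitDemo {
    private WaitDemo() {}

    /** Polls the condition until it holds or the timeout elapses. */
    static boolean waitFor(BooleanSupplier condition, long timeoutMs) {
        long start = System.nanoTime();
        long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - start >= timeoutNanos) {
                return false;
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.shutdown();
        // Lambda form: a new function body that only forwards the call.
        boolean viaLambda = waitFor(() -> executor.isTerminated(), 5_000);
        // Method reference form: same behavior with less noise.
        boolean viaMethodRef = waitFor(executor::isTerminated, 5_000);
        System.out.println(viaLambda + " " + viaMethodRef);
    }
}
```

Both forms produce the same functional-interface instance semantics here; the method reference is purely a readability choice.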
Re: [PR] KAFKA-16477: Detect thread leaked client-metrics-reaper in tests [kafka]
chia7712 commented on code in PR #15668:
URL: https://github.com/apache/kafka/pull/15668#discussion_r1554964050

## server-common/src/test/java/org/apache/kafka/server/util/timer/TimerTest.java: ##
@@ -76,6 +77,7 @@ public void setup() {
     @AfterEach
     public void teardown() throws Exception {
         timer.close();
+        TestUtils.waitForCondition(() -> timer.isExecutorTerminated(), "timer excutor not terminated");

Review Comment:
typo: excutor -> executor

## server-common/src/test/java/org/apache/kafka/server/util/timer/SystemTimerReaperTest.java: ##
@@ -59,4 +60,16 @@ public void testReaper() throws Exception {
             timer.close();
         }
     }
+
+    @Test
+    public void testReaperClose() throws Exception {
+        Timer timer = Mockito.mock(Timer.class);
+        SystemTimerReaper timerReaper2 = new SystemTimerReaper("reaper", timer);
+        timerReaper2.close();
+        Mockito.verify(timer, Mockito.times(1)).close();
+
+        SystemTimerReaper timerReaper = new SystemTimerReaper("reaper", new SystemTimer("timer"));
+        timerReaper.close();
+        TestUtils.waitForCondition(() -> timerReaper.isShutdown(), "reaper not shutdown");

Review Comment:
We can verify `timerReaper2` instead of `timerReaper`.

## server-common/src/main/java/org/apache/kafka/server/util/timer/SystemTimer.java: ##
@@ -110,4 +110,9 @@ public int size() {
     public void close() {
         taskExecutor.shutdown();
     }
+
+    // visible for testing
+    boolean isExecutorTerminated() {

Review Comment:
`isTerminated`. We don't need to expose the name of the inner variable.
Re: [PR] KAFKA-16477: Detect thread leaked client-metrics-reaper in tests [kafka]
brandboat commented on code in PR #15668:
URL: https://github.com/apache/kafka/pull/15668#discussion_r1554955481

## server-common/src/test/java/org/apache/kafka/server/util/timer/SystemTimerReaperTest.java: ##
@@ -59,4 +59,11 @@ public void testReaper() throws Exception {
             timer.close();
         }
     }
+
+    @Test
+    public void testReaperClose() throws Exception {

Review Comment:
I've addressed the comments in the latest commit, many thanks!
Re: [PR] KAFKA-16477: Detect thread leaked client-metrics-reaper in tests [kafka]
chia7712 commented on code in PR #15668:
URL: https://github.com/apache/kafka/pull/15668#discussion_r1554647591

## server-common/src/main/java/org/apache/kafka/server/util/timer/SystemTimerReaper.java: ##
@@ -76,4 +76,17 @@ public void run() {}
         reaper.awaitShutdown();
         timer.close();
     }
+
+    // visible for testing
+    boolean isReaperShutdown() {
+        return reaper.isShutdownComplete();
+    }
+
+    // visible for testing
+    boolean isTimerShutdown() {

Review Comment:
We should add UT for `SystemTest#close` instead of casing class here.

## server-common/src/main/java/org/apache/kafka/server/util/timer/SystemTimerReaper.java: ##
@@ -76,4 +76,17 @@ public void run() {}
         reaper.awaitShutdown();
         timer.close();
     }
+
+    // visible for testing
+    boolean isReaperShutdown() {

Review Comment:
`isShutdown`

## server-common/src/test/java/org/apache/kafka/server/util/timer/SystemTimerReaperTest.java: ##
@@ -59,4 +59,11 @@ public void testReaper() throws Exception {
             timer.close();
         }
     }
+
+    @Test
+    public void testReaperClose() throws Exception {

Review Comment:
It seems to me the test should check two conditions after calling `close`:
1. `SystemTimerReaper` is not running
2. `SystemTimer#close` happens once
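The two conditions listed above (the reaper thread is no longer running, and the wrapped timer's `close` was called exactly once) can be checked without a mocking library by using a hand-written counting fake. The sketch below is an assumption-laden illustration: `DemoTimer`, `CountingTimer`, and `DemoReaper` are hypothetical stand-ins invented here, not Kafka's `Timer`, `SystemTimer`, or `SystemTimerReaper`.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a timer that owns resources; not Kafka's Timer.
interface DemoTimer extends AutoCloseable {
    @Override
    void close();
}

// Fake that records how many times close() was called.
final class CountingTimer implements DemoTimer {
    private final AtomicInteger closeCalls = new AtomicInteger();

    @Override
    public void close() {
        closeCalls.incrementAndGet();
    }

    int closeCalls() {
        return closeCalls.get();
    }
}

// Hypothetical wrapper that runs a background thread and owns a timer.
final class DemoReaper implements AutoCloseable {
    private final DemoTimer timer;
    private final Thread thread;
    private volatile boolean running = true;

    DemoReaper(String threadName, DemoTimer timer) {
        this.timer = timer;
        this.thread = new Thread(() -> {
            while (running) {
                try {
                    Thread.sleep(10); // placeholder for periodic work
                } catch (InterruptedException e) {
                    return; // interrupted by close(): exit the loop
                }
            }
        }, threadName);
        this.thread.setDaemon(true);
        this.thread.start();
    }

    @Override
    public void close() {
        running = false;
        thread.interrupt();
        try {
            thread.join(); // wait until the background thread has exited
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        timer.close(); // release the wrapped timer exactly once
    }

    boolean isShutdown() {
        return !thread.isAlive();
    }

    public static void main(String[] args) {
        CountingTimer timer = new CountingTimer();
        DemoReaper reaper = new DemoReaper("demo-reaper", timer);
        reaper.close();
        System.out.println("shutdown=" + reaper.isShutdown() + ", closeCalls=" + timer.closeCalls());
    }
}
```

In the PR itself the same two checks are expressed with `Mockito.verify(timer, Mockito.times(1)).close()` and a wait on `isShutdown`; the fake here only keeps the sketch free of external dependencies.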
Re: [PR] KAFKA-16477: Detect thread leaked client-metrics-reaper in tests [kafka]
brandboat commented on code in PR #15668:
URL: https://github.com/apache/kafka/pull/15668#discussion_r1554644159

## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ##
@@ -112,7 +113,7 @@ public ClientMetricsManager(ClientMetricsReceiverPlugin receiverPlugin, int clie
         this.subscriptionMap = new ConcurrentHashMap<>();
         this.subscriptionUpdateVersion = new AtomicInteger(0);
         this.clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CACHE_MAX_SIZE));
-        this.expirationTimer = new SystemTimerReaper("client-metrics-reaper", new SystemTimer("client-metrics"));
+        this.expirationTimer = new SystemTimerReaper(CLIENT_METRICS_REAPER_THREAD_NAME, new SystemTimer("client-metrics"));

Review Comment:
I already added a test in 5f220e297990311d631544582ac27d11473ebea4, but the test needs some additional methods to check whether the executor service/thread has shut down. Alternatively, we could simply add `executor-` (the thread prefix used by SystemTimer) to `unexpectedThreadNames` and let that check verify whether the threads have been cleaned up.
https://github.com/apache/kafka/blob/dc189a39cc79d45c59fa1762ca9379bdbd85ee34/server-common/src/main/java/org/apache/kafka/server/util/timer/SystemTimer.java#L52
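The idea of registering a thread-name prefix in `unexpectedThreadNames` is to let a shared test check flag any thread with that name that is still alive after a test finishes. A minimal sketch of such a check follows, assuming prefix matching on the thread name; `ThreadLeakCheck` and `leakedThreadNames` are hypothetical names for illustration, and the actual matching rule in Kafka's `TestUtils.scala` may differ.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;

// Hypothetical helper for illustration only; not Kafka's TestUtils.
final class ThreadLeakCheck {
    private ThreadLeakCheck() {}

    /** Returns the names of live threads whose name starts with any given prefix. */
    static List<String> leakedThreadNames(Collection<String> unexpectedPrefixes) {
        return Thread.getAllStackTraces().keySet().stream()
            .filter(Thread::isAlive)
            .map(Thread::getName)
            .filter(name -> unexpectedPrefixes.stream().anyMatch(name::startsWith))
            .sorted()
            .collect(Collectors.toList());
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        // Simulate a component that forgot to stop its background thread.
        Thread leaked = new Thread(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "demo-reaper-0");
        leaked.start();

        // While the thread is alive, the check reports it.
        System.out.println("before cleanup: " + leakedThreadNames(Arrays.asList("demo-reaper")));

        release.countDown();
        leaked.join();

        // After the thread exits, nothing matches the prefix.
        System.out.println("after cleanup: " + leakedThreadNames(Arrays.asList("demo-reaper")));
    }
}
```

A check like this catches leaks from any test that touches the component, which is why it complements, rather than replaces, a focused unit test of `close`.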
Re: [PR] KAFKA-16477: Detect thread leaked client-metrics-reaper in tests [kafka]
chia7712 commented on code in PR #15668:
URL: https://github.com/apache/kafka/pull/15668#discussion_r1554626964

## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ##
@@ -112,7 +113,7 @@ public ClientMetricsManager(ClientMetricsReceiverPlugin receiverPlugin, int clie
         this.subscriptionMap = new ConcurrentHashMap<>();
         this.subscriptionUpdateVersion = new AtomicInteger(0);
         this.clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CACHE_MAX_SIZE));
-        this.expirationTimer = new SystemTimerReaper("client-metrics-reaper", new SystemTimer("client-metrics"));
+        this.expirationTimer = new SystemTimerReaper(CLIENT_METRICS_REAPER_THREAD_NAME, new SystemTimer("client-metrics"));

Review Comment:
Could you add a test to check that `SystemTimerReaper#close` does close the system timer? The system timer also has an inner thread.
Re: [PR] KAFKA-16477: Detect thread leaked client-metrics-reaper in tests [kafka]
brandboat commented on PR #15668:
URL: https://github.com/apache/kafka/pull/15668#issuecomment-2040977883

> @brandboat nice find! Should we add the thread prefix to https://github.com/apache/kafka/blob/a2ee0855ee5e73f3a74555d52294bb4acfd28945/core/src/test/scala/unit/kafka/utils/TestUtils.scala#L2487 to avoid similar issues in the future?

Oh wow! I didn't notice that we had this check before. Sure! I've already added the thread name to this check in commit https://github.com/apache/kafka/pull/15668/commits/8daf268f2c99b7a88dde150967c22301c88cc1d6. Many thanks!
Re: [PR] KAFKA-16477: Detect thread leaked client-metrics-reaper in tests [kafka]
chia7712 commented on PR #15668:
URL: https://github.com/apache/kafka/pull/15668#issuecomment-2039540752

@brandboat nice find! Should we add the thread prefix to https://github.com/apache/kafka/blob/a2ee0855ee5e73f3a74555d52294bb4acfd28945/core/src/test/scala/unit/kafka/utils/TestUtils.scala#L2487 to avoid similar issues in the future?