This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5166890f3e7 KAFKA-14242: use mock managers to avoid duplicated 
resource allocation (#12639)
5166890f3e7 is described below

commit 5166890f3e7b7ffa90d0bf3a4dcd9fca46f6b1c6
Author: Luke Chen <[email protected]>
AuthorDate: Mon Nov 28 10:37:40 2022 +0800

    KAFKA-14242: use mock managers to avoid duplicated resource allocation 
(#12639)
    
    Recently, many builds have been failing (and terminating) with a core:unitTest 
failure. The failure messages look like this:
    
    FAILURE: Build failed with an exception.
    [2022-09-14T09:51:52.190Z]
    [2022-09-14T09:51:52.190Z] * What went wrong:
    [2022-09-14T09:51:52.190Z] Execution failed for task ':core:unitTest'.
    [2022-09-14T09:51:52.190Z] > Process 'Gradle Test Executor 128' finished 
with non-zero exit value 1
    
    After investigation, I found one reason for it (there may be other 
reasons). In the 
BrokerMetadataPublisherTest#testReloadUpdatedFilesWithoutConfigChange test, we 
created a logManager twice, but during cleanup we only close one of them. So 
a log cleaner keeps running. Meanwhile, the temp log 
dirs are deleted, so it calls Exit.halt(1), producing the error we saw in Gradle — 
the same code path taken when we encounter an IOException in all our log dirs:
    
    fatal(s"Shutdown broker because all log dirs in ${logDirs.mkString(", ")} 
have failed")
    Exit.halt(1)
    
    And why does it sometimes pass and sometimes fail? Because during test 
cluster close, we shut down the broker first, and then the other components, and the log 
cleaner is triggered at an interval. So, if the cluster can close fast enough 
to finish this test, it passes. Otherwise, it exits with 1.
    
    Fixed it by mocking the log manager and other managers in the mock publisher to avoid 
duplicate resource allocation. This change does not alter the original test goal, 
since we only want to make sure the publisher invokes 
reloadUpdatedFilesWithoutConfigChange when necessary.
    
    Reviewers: dengziming <[email protected]>
---
 .../metadata/BrokerMetadataPublisherTest.scala     | 33 ++++++++++++++++------
 1 file changed, 24 insertions(+), 9 deletions(-)

diff --git 
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
 
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index 472d7ef550b..b0936d12f3e 100644
--- 
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -17,11 +17,14 @@
 
 package kafka.server.metadata
 
+import kafka.coordinator.group.GroupCoordinator
+import kafka.coordinator.transaction.TransactionCoordinator
+
 import java.util.Collections.{singleton, singletonList, singletonMap}
 import java.util.Properties
 import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
-import kafka.log.UnifiedLog
-import kafka.server.{BrokerServer, KafkaConfig}
+import kafka.log.{LogManager, UnifiedLog}
+import kafka.server.{BrokerServer, KafkaConfig, ReplicaManager}
 import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET
@@ -35,7 +38,7 @@ import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.metadata.PartitionRegistration
 import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, 
assertTrue}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito
 import org.mockito.Mockito.doThrow
@@ -178,15 +181,21 @@ class BrokerMetadataPublisherTest {
 
   private def newMockPublisher(
     broker: BrokerServer,
+    logManager: LogManager,
+    replicaManager: ReplicaManager,
+    groupCoordinator: GroupCoordinator,
+    txnCoordinator: TransactionCoordinator,
     errorHandler: FaultHandler = new MockFaultHandler("publisher")
   ): BrokerMetadataPublisher = {
+    val mockLogManager = Mockito.mock(classOf[LogManager])
+    Mockito.when(mockLogManager.allLogs).thenReturn(Iterable.empty[UnifiedLog])
     Mockito.spy(new BrokerMetadataPublisher(
       conf = broker.config,
       metadataCache = broker.metadataCache,
-      logManager = broker.logManager,
-      replicaManager = broker.replicaManager,
-      groupCoordinator = broker.groupCoordinator,
-      txnCoordinator = broker.transactionCoordinator,
+      logManager,
+      replicaManager,
+      groupCoordinator,
+      txnCoordinator,
       clientQuotaMetadataManager = broker.clientQuotaMetadataManager,
       dynamicConfigHandlers = broker.dynamicConfigHandlers.toMap,
       _authorizer = Option.empty,
@@ -195,7 +204,6 @@ class BrokerMetadataPublisherTest {
     ))
   }
 
-  @Disabled
   @Test
   def testReloadUpdatedFilesWithoutConfigChange(): Unit = {
     val cluster = new KafkaClusterTestKit.Builder(
@@ -207,7 +215,14 @@ class BrokerMetadataPublisherTest {
       cluster.startup()
       cluster.waitForReadyBrokers()
       val broker = cluster.brokers().values().iterator().next()
-      val publisher = newMockPublisher(broker)
+      val mockLogManager = Mockito.mock(classOf[LogManager])
+      
Mockito.when(mockLogManager.allLogs).thenReturn(Iterable.empty[UnifiedLog])
+      val mockReplicaManager = Mockito.mock(classOf[ReplicaManager])
+      val mockGroupCoordinator = Mockito.mock(classOf[GroupCoordinator])
+      val mockTxnCoordinator = Mockito.mock(classOf[TransactionCoordinator])
+
+      val publisher = newMockPublisher(broker, mockLogManager, 
mockReplicaManager, mockGroupCoordinator, mockTxnCoordinator)
+
       val numTimesReloadCalled = new AtomicInteger(0)
       
Mockito.when(publisher.reloadUpdatedFilesWithoutConfigChange(any[Properties]())).
         thenAnswer(new Answer[Unit]() {

Reply via email to