apoorvmittal10 commented on code in PR #14770:
URL: https://github.com/apache/kafka/pull/14770#discussion_r1403234553


##########
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##########
@@ -207,34 +207,35 @@ class BrokerLifecycleManagerTest {
   }
 
   @Test
-  def testOfflineDirsSentUntilHeartbeatSuccess(): Unit = {
+  def testAlwaysSendsAccumulatedOfflineDirs(): Unit = {
     val ctx = new RegistrationTestContext(configProperties)
     val manager = new BrokerLifecycleManager(ctx.config, ctx.time, 
"offline-dirs-sent-in-heartbeat-", isZkBroker = false)
     val controllerNode = new Node(3000, "localhost", 8021)
     ctx.controllerNodeProvider.node.set(controllerNode)
 
     val registration = prepareResponse(ctx, new BrokerRegistrationResponse(new 
BrokerRegistrationResponseData().setBrokerEpoch(1000)))
-    val hb1 = prepareResponse[BrokerHeartbeatRequest](ctx, new 
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()
-      .setErrorCode(Errors.NOT_CONTROLLER.code())))
-    val hb2 = prepareResponse[BrokerHeartbeatRequest](ctx, new 
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
-    val hb3 = prepareResponse[BrokerHeartbeatRequest](ctx, new 
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
+    val heartbeats = Seq.fill(6)(prepareResponse[BrokerHeartbeatRequest](ctx, 
new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData())))
 
-    val offlineDirs = Set(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA"), 
Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow"))
-    offlineDirs.foreach(manager.propagateDirectoryFailure)
-
-    // start the manager late to prevent a race, and force expectations on the 
first heartbeat
     manager.start(() => ctx.highestMetadataOffset.get(),
       ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
       Collections.emptyMap(), OptionalLong.empty())
-
     poll(ctx, manager, registration)
-    val dirs1 = poll(ctx, manager, hb1).data().offlineLogDirs()
-    val dirs2 = poll(ctx, manager, hb2).data().offlineLogDirs()
-    val dirs3 = poll(ctx, manager, hb3).data().offlineLogDirs()
 
-    assertEquals(offlineDirs, dirs1.asScala.toSet)
-    assertEquals(offlineDirs, dirs2.asScala.toSet)
-    assertEquals(Set.empty, dirs3.asScala.toSet)
+    
manager.propagateDirectoryFailure(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA"))
+    poll(ctx, manager, heartbeats(0)).data()
+    val dirs1 = poll(ctx, manager, heartbeats(1)).data().offlineLogDirs()
+
+    
manager.propagateDirectoryFailure(Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow"))
+    poll(ctx, manager, heartbeats(2)).data()
+    val dirs2 = poll(ctx, manager, heartbeats(3)).data().offlineLogDirs()
+
+    
manager.propagateDirectoryFailure(Uuid.fromString("1iF76HVNRPqC7Y4r6647eg"))
+    poll(ctx, manager, heartbeats(4)).data()
+    val dirs3 = poll(ctx, manager, heartbeats(5)).data().offlineLogDirs()

Review Comment:
   @soarez Can we please verify the flakiness in the test. I have added 
@RepeatedTest(100) locally for this test and can always get failures among 100 
runs. Listing out 2 failures below
   
   ```
   java.util.ConcurrentModificationException
        at java.base/java.util.ArrayDeque.nonNullElementAt(ArrayDeque.java:270)
        at java.base/java.util.ArrayDeque$DeqIterator.next(ArrayDeque.java:700)
        at 
kafka.server.MockNodeToControllerChannelManager.poll(MockNodeToControllerChannelManager.scala:73)
        at 
kafka.server.RegistrationTestContext.poll(RegistrationTestContext.scala:76)
        at 
kafka.server.BrokerLifecycleManagerTest.poll(BrokerLifecycleManagerTest.scala:202)
        at 
kafka.server.BrokerLifecycleManagerTest.testAlwaysSendsAccumulatedOfflineDirs(BrokerLifecycleManagerTest.scala:230)
        at java.base/java.lang.reflect.Method.invoke(Method.java:568)
        at 
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
        at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
        at 
java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
        at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
        at 
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
        at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
        at 
java.base/java.util.stream.IntPipeline$1$1.accept(IntPipeline.java:180)
        at 
java.base/java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:104)
        at 
java.base/java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:711)
        at 
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
        at 
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
        at 
java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
        at 
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
        at 
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at 
java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
        at 
java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
   ```
   
   ```
   rg.opentest4j.AssertionFailedError: 
   Expected :Set(h3sC4Yk-Q9-fd0ntJTocCA, ej8Q9_d2Ri6FXNiTxKFiow, 
1iF76HVNRPqC7Y4r6647eg)
   Actual   :Set(h3sC4Yk-Q9-fd0ntJTocCA, ej8Q9_d2Ri6FXNiTxKFiow)
   <Click to see difference>
   
   
        at 
org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
        at 
org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
        at 
org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
        at 
org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
        at 
org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177)
        at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1141)
        at 
kafka.server.BrokerLifecycleManagerTest.testAlwaysSendsAccumulatedOfflineDirs(BrokerLifecycleManagerTest.scala:238)
        at java.base/java.lang.reflect.Method.invoke(Method.java:568)
        at 
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
        at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
        at 
java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
        at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
        at 
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
        at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
        at 
java.base/java.util.stream.IntPipeline$1$1.accept(IntPipeline.java:180)
        at 
java.base/java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:104)
   ```
   
   
   cc: @cmccabe @junrao (Saw the flakiness in the run at PR build: 
https://github.com/apache/kafka/pull/14699, 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14699/17/tests/)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to