[ https://issues.apache.org/jira/browse/KAFKA-4198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15873134#comment-15873134 ]
ASF GitHub Bot commented on KAFKA-4198:
---------------------------------------

GitHub user original-brownbear opened a pull request:

    https://github.com/apache/kafka/pull/2568

Kafka 4198: Fix Race Condition in KafkaServer Shutdown

Fixes the initially reported issue in https://issues.apache.org/jira/browse/KAFKA-4198.

The relevant part of the fix for the initial issue is the change to `kafka.server.KafkaServer#shutdown`. It contained this step:

```scala
val canShutdown = isShuttingDown.compareAndSet(false, true)
if (canShutdown && shutdownLatch.getCount > 0) {
```

without any fallback for the case `shutdownLatch.getCount == 0`. So when `shutdownLatch.getCount == 0` (i.e. a previous call to `shutdown` was just about to finish), `isShuttingDown` is set to `true` again and never reset, and the server can never be started again: `startup` checks `isShuttingDown` before it sets up a new latch with count 1.

Long story short: concurrent calls to `shutdown` can leave the server locked in a broken state. This fixes the reported error:

```sh
java.lang.IllegalStateException: Kafka server is still shutting down, cannot re-start!
    at kafka.server.KafkaServer.startup(KafkaServer.scala:184)
    at kafka.integration.KafkaServerTestHarness$$anonfun$restartDeadBrokers$2.apply$mcVI$sp(KafkaServerTestHarness.scala:117)
    at kafka.integration.KafkaServerTestHarness$$anonfun$restartDeadBrokers$2.apply(KafkaServerTestHarness.scala:116)
    at kafka.integration.KafkaServerTestHarness$$anonfun$restartDeadBrokers$2.apply(KafkaServerTestHarness.scala:116)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.immutable.Range.foreach(Range.scala:160)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at kafka.integration.KafkaServerTestHarness$class.restartDeadBrokers(KafkaServerTestHarness.scala:116)
    at kafka.api.ConsumerBounceTest.restartDeadBrokers(ConsumerBounceTest.scala:34)
    at kafka.api.ConsumerBounceTest$BounceBrokerScheduler.doWork(ConsumerBounceTest.scala:158)
```

That said, this error (reported in a comment on the JIRA) remains even with this fix:

```sh
kafka.api.ConsumerBounceTest > testConsumptionWithBrokerFailures FAILED
    java.lang.IllegalArgumentException: You can only check the position for partitions assigned to this consumer.
        at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1271)
        at kafka.api.ConsumerBounceTest.consumeWithBrokerFailures(ConsumerBounceTest.scala:96)
        at kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(ConsumerBounceTest.scala:69)
```

... I think this one should get a separate JIRA, though: the behaviour of the call to `position` when a broker has just died seems to me a separate issue from the one initially reported.
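To make the ordering problem concrete, here is a stripped-down, hypothetical model of the two lifecycle flags. `LifecycleModel` and its `checkLatchFirst` switch are illustrative names, not Kafka code; stopping the broker's components is elided, and the model assumes the shutdown body resets `isShuttingDown` before calling `shutdownLatch.countDown()`. A second `shutdown()` that arrives once the latch has reached zero stands in, deterministically, for the racing call. With the original check order the compare-and-set flips `isShuttingDown` to `true` and nothing resets it, so `startup()` throws; checking `shutdownLatch.getCount > 0` before the compare-and-set short-circuits and leaves the flag alone. This is one way to close this particular window, not necessarily the exact change made in the pull request.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical model of KafkaServer's shutdown bookkeeping; not the real class.
// checkLatchFirst = false mirrors the original condition quoted above;
// checkLatchFirst = true checks the latch before the compare-and-set.
class LifecycleModel(checkLatchFirst: Boolean) {
  val isShuttingDown = new AtomicBoolean(false)
  @volatile var shutdownLatch = new CountDownLatch(1)

  def startup(): Unit = {
    // Mirrors the check that produces the IllegalStateException reported above.
    if (isShuttingDown.get)
      throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")
    shutdownLatch = new CountDownLatch(1)
  }

  def shutdown(): Unit = {
    val proceed =
      if (checkLatchFirst)
        // A finished shutdown (count == 0) short-circuits before touching the flag.
        shutdownLatch.getCount > 0 && isShuttingDown.compareAndSet(false, true)
      else {
        // Original order: the flag is set even when the latch is already 0,
        // and nothing below resets it in that case.
        val canShutdown = isShuttingDown.compareAndSet(false, true)
        canShutdown && shutdownLatch.getCount > 0
      }
    if (proceed) {
      // ... stopping the broker's components is elided in this model ...
      isShuttingDown.set(false) // assumed to happen before the latch is released
      shutdownLatch.countDown()
    }
  }
}
```

The model only exercises the single interleaving described above; it is not meant to show that reordering the checks makes `shutdown` safe under every concurrent schedule.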
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/original-brownbear/kafka KAFKA-4198

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/2568.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2568

----
commit 08460a669b4c737c20793129b01c7a3676452dac
Author: Armin Braun <m...@obrown.io>
Date:   2017-02-18T08:11:10Z

    KAFKA-4198: Cleaner ExecutorService Handling

commit 128db5ea5afeb940f108e95c9bcaad84c9889b10
Author: Armin Braun <m...@obrown.io>
Date:   2017-02-18T09:12:15Z

    KAFKA-4198: Ensure Fresh MetaData

commit 847d001e17c2baaba18936cc5c497756154c931a
Author: Armin Braun <m...@obrown.io>
Date:   2017-02-18T10:27:37Z

    KAFKA-4198: Revert Test Change

commit 005ff8f4a180a7c2c45313accff6627e90e9983a
Author: Armin Braun <m...@obrown.io>
Date:   2017-02-18T11:39:47Z

    KAFKA-4198: Fix RunCondition in KafkaServer#shutdown

commit 9559ad387bba6d24a0ba5f244aae5f6d32a897f1
Author: Armin Braun <m...@obrown.io>
Date:   2017-02-18T11:41:00Z

    KAFKA-4198: Revert Experimental Change to KafkaConsumer

commit d2f138c9f01800219fcd02a625a9f89b9315fd73
Author: Armin Braun <m...@obrown.io>
Date:   2017-02-18T12:04:14Z

    KAFKA-4198: Revert Experimental Change to KafkaServerTestHarness

commit 8cfb45240eda64cc358303c2533aef6c50f69225
Author: Armin Braun <m...@obrown.io>
Date:   2017-02-18T12:06:28Z

    KAFKA-4198: Revert Experimental Change to ConsumerBounceTest

----

> Transient test failure: ConsumerBounceTest.testConsumptionWithBrokerFailures
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-4198
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4198
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Ismael Juma
>              Labels: transient-unit-test-failure
>
> The issue seems to be that we call `startup` while `shutdown` is still taking place.
> {code}
> java.lang.AssertionError: expected:<107> but was:<0>
>     at org.junit.Assert.fail(Assert.java:88)
>     at org.junit.Assert.failNotEquals(Assert.java:834)
>     at org.junit.Assert.assertEquals(Assert.java:645)
>     at org.junit.Assert.assertEquals(Assert.java:631)
>     at kafka.api.ConsumerBounceTest$$anonfun$consumeWithBrokerFailures$2.apply(ConsumerBounceTest.scala:91)
>     at kafka.api.ConsumerBounceTest$$anonfun$consumeWithBrokerFailures$2.apply(ConsumerBounceTest.scala:90)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at kafka.api.ConsumerBounceTest.consumeWithBrokerFailures(ConsumerBounceTest.scala:90)
>     at kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(ConsumerBounceTest.scala:70)
> {code}
> {code}
> java.lang.IllegalStateException: Kafka server is still shutting down, cannot re-start!
>     at kafka.server.KafkaServer.startup(KafkaServer.scala:184)
>     at kafka.integration.KafkaServerTestHarness$$anonfun$restartDeadBrokers$2.apply$mcVI$sp(KafkaServerTestHarness.scala:117)
>     at kafka.integration.KafkaServerTestHarness$$anonfun$restartDeadBrokers$2.apply(KafkaServerTestHarness.scala:116)
>     at kafka.integration.KafkaServerTestHarness$$anonfun$restartDeadBrokers$2.apply(KafkaServerTestHarness.scala:116)
>     at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>     at scala.collection.immutable.Range.foreach(Range.scala:160)
>     at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>     at kafka.integration.KafkaServerTestHarness$class.restartDeadBrokers(KafkaServerTestHarness.scala:116)
>     at kafka.api.ConsumerBounceTest.restartDeadBrokers(ConsumerBounceTest.scala:34)
>     at kafka.api.ConsumerBounceTest$BounceBrokerScheduler.doWork(ConsumerBounceTest.scala:158)
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)