[
https://issues.apache.org/jira/browse/KAFKA-4198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15873134#comment-15873134
]
ASF GitHub Bot commented on KAFKA-4198:
---------------------------------------
GitHub user original-brownbear opened a pull request:
https://github.com/apache/kafka/pull/2568
Kafka 4198: Fix Race Condition in KafkaServer Shutdown
Fixes the initially reported issue in
https://issues.apache.org/jira/browse/KAFKA-4198.
The relevant part of the fix is the change to
`kafka.server.KafkaServer#shutdown`, which contained this step:
```scala
val canShutdown = isShuttingDown.compareAndSet(false, true)
if (canShutdown && shutdownLatch.getCount > 0) {
```
There was no fallback for the case of `shutdownLatch.getCount == 0`. In that
case (when a previous call to `shutdown` is just about to finish), the
`compareAndSet` has already set `isShuttingDown` to true, the guarded block is
skipped, and nothing resets the flag. The server can then never be started
again, since `startup` checks `isShuttingDown` before setting up a new latch
with count 1.
Long story short: concurrent calls to `shutdown` can leave the server locked in
a broken state.
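To illustrate the stuck state, here is a minimal sketch of the two flags in
isolation. `ToyServer`, `shutdownRacy`, `shutdownGuarded` and `isStuck` are
hypothetical names, not the real `KafkaServer`. The body of the guarded block
(count down the latch, reset the flag) and the reordered check in
`shutdownGuarded` are assumptions made for illustration, not necessarily the
exact change in this pull request.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical, stripped-down model of the lifecycle flags (not KafkaServer itself).
class ToyServer {
  private val isShuttingDown = new AtomicBoolean(false)
  @volatile private var shutdownLatch = new CountDownLatch(1)

  def isStuck: Boolean = isShuttingDown.get()

  def startup(): Unit = {
    if (isShuttingDown.get())
      throw new IllegalStateException("still shutting down, cannot re-start!")
    shutdownLatch = new CountDownLatch(1)
  }

  // Shape of the check quoted above: the flag is flipped even when the latch
  // is already at zero, and in that case nothing flips it back.
  def shutdownRacy(): Unit = {
    val canShutdown = isShuttingDown.compareAndSet(false, true)
    if (canShutdown && shutdownLatch.getCount > 0) {
      // ... stop components (assumed) ...
      shutdownLatch.countDown()
      isShuttingDown.set(false)
    }
  }

  // One possible repair (an assumption): only attempt the compareAndSet
  // when there is actually something left to shut down.
  def shutdownGuarded(): Unit = {
    if (shutdownLatch.getCount > 0 && isShuttingDown.compareAndSet(false, true)) {
      // ... stop components (assumed) ...
      shutdownLatch.countDown()
      isShuttingDown.set(false)
    }
  }
}
```

In this model a second `shutdownRacy()` call made once the latch has reached
zero leaves `isStuck` true, so the next `startup()` throws. The same sequence
with `shutdownGuarded()` leaves the flag untouched and the server can be
started again.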
This fixes the reported error:
```sh
java.lang.IllegalStateException: Kafka server is still shutting down, cannot re-start!
    at kafka.server.KafkaServer.startup(KafkaServer.scala:184)
    at kafka.integration.KafkaServerTestHarness$$anonfun$restartDeadBrokers$2.apply$mcVI$sp(KafkaServerTestHarness.scala:117)
    at kafka.integration.KafkaServerTestHarness$$anonfun$restartDeadBrokers$2.apply(KafkaServerTestHarness.scala:116)
    at kafka.integration.KafkaServerTestHarness$$anonfun$restartDeadBrokers$2.apply(KafkaServerTestHarness.scala:116)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.immutable.Range.foreach(Range.scala:160)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at kafka.integration.KafkaServerTestHarness$class.restartDeadBrokers(KafkaServerTestHarness.scala:116)
    at kafka.api.ConsumerBounceTest.restartDeadBrokers(ConsumerBounceTest.scala:34)
    at kafka.api.ConsumerBounceTest$BounceBrokerScheduler.doWork(ConsumerBounceTest.scala:158)
```
That said, this error (reported in a comment on the JIRA) still occurs
even with this fix:
```sh
kafka.api.ConsumerBounceTest > testConsumptionWithBrokerFailures FAILED
    java.lang.IllegalArgumentException: You can only check the position for partitions assigned to this consumer.
        at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1271)
        at kafka.api.ConsumerBounceTest.consumeWithBrokerFailures(ConsumerBounceTest.scala:96)
        at kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(ConsumerBounceTest.scala:69)
```
... I think this one should get a separate JIRA though. It seems to me that
the behaviour of the call to `position` when a broker has just died is a
separate issue from the one initially reported.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/original-brownbear/kafka KAFKA-4198
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/kafka/pull/2568.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2568
----
commit 08460a669b4c737c20793129b01c7a3676452dac
Author: Armin Braun <[email protected]>
Date: 2017-02-18T08:11:10Z
KAFKA-4198: Cleaner ExecutorService Handling
commit 128db5ea5afeb940f108e95c9bcaad84c9889b10
Author: Armin Braun <[email protected]>
Date: 2017-02-18T09:12:15Z
KAFKA-4198: Ensure Fresh MetaData
commit 847d001e17c2baaba18936cc5c497756154c931a
Author: Armin Braun <[email protected]>
Date: 2017-02-18T10:27:37Z
KAFKA-4198: Revert Test Change
commit 005ff8f4a180a7c2c45313accff6627e90e9983a
Author: Armin Braun <[email protected]>
Date: 2017-02-18T11:39:47Z
KAFKA-4198: Fix RunCondition in KafkaServer#shutdown
commit 9559ad387bba6d24a0ba5f244aae5f6d32a897f1
Author: Armin Braun <[email protected]>
Date: 2017-02-18T11:41:00Z
KAFKA-4198: Revert Experimental Change to KafkaConsumer
commit d2f138c9f01800219fcd02a625a9f89b9315fd73
Author: Armin Braun <[email protected]>
Date: 2017-02-18T12:04:14Z
KAFKA-4198: Revert Experimental Change to KafkaServerTestHarness
commit 8cfb45240eda64cc358303c2533aef6c50f69225
Author: Armin Braun <[email protected]>
Date: 2017-02-18T12:06:28Z
KAFKA-4198: Revert Experimental Change to ConsumerBounceTest
----
> Transient test failure: ConsumerBounceTest.testConsumptionWithBrokerFailures
> ----------------------------------------------------------------------------
>
> Key: KAFKA-4198
> URL: https://issues.apache.org/jira/browse/KAFKA-4198
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Ismael Juma
> Labels: transient-unit-test-failure
>
> The issue seems to be that we call `startup` while `shutdown` is still taking
> place.
> {code}
> java.lang.AssertionError: expected:<107> but was:<0>
>     at org.junit.Assert.fail(Assert.java:88)
>     at org.junit.Assert.failNotEquals(Assert.java:834)
>     at org.junit.Assert.assertEquals(Assert.java:645)
>     at org.junit.Assert.assertEquals(Assert.java:631)
>     at kafka.api.ConsumerBounceTest$$anonfun$consumeWithBrokerFailures$2.apply(ConsumerBounceTest.scala:91)
>     at kafka.api.ConsumerBounceTest$$anonfun$consumeWithBrokerFailures$2.apply(ConsumerBounceTest.scala:90)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at kafka.api.ConsumerBounceTest.consumeWithBrokerFailures(ConsumerBounceTest.scala:90)
>     at kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(ConsumerBounceTest.scala:70)
> {code}
> {code}
> java.lang.IllegalStateException: Kafka server is still shutting down, cannot re-start!
>     at kafka.server.KafkaServer.startup(KafkaServer.scala:184)
>     at kafka.integration.KafkaServerTestHarness$$anonfun$restartDeadBrokers$2.apply$mcVI$sp(KafkaServerTestHarness.scala:117)
>     at kafka.integration.KafkaServerTestHarness$$anonfun$restartDeadBrokers$2.apply(KafkaServerTestHarness.scala:116)
>     at kafka.integration.KafkaServerTestHarness$$anonfun$restartDeadBrokers$2.apply(KafkaServerTestHarness.scala:116)
>     at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>     at scala.collection.immutable.Range.foreach(Range.scala:160)
>     at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>     at kafka.integration.KafkaServerTestHarness$class.restartDeadBrokers(KafkaServerTestHarness.scala:116)
>     at kafka.api.ConsumerBounceTest.restartDeadBrokers(ConsumerBounceTest.scala:34)
>     at kafka.api.ConsumerBounceTest$BounceBrokerScheduler.doWork(ConsumerBounceTest.scala:158)
> {code}