[ 
https://issues.apache.org/jira/browse/KAFKA-4198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15873134#comment-15873134
 ] 

ASF GitHub Bot commented on KAFKA-4198:
---------------------------------------

GitHub user original-brownbear opened a pull request:

    https://github.com/apache/kafka/pull/2568

    Kafka 4198: Fix Race Condition in KafkaServer Shutdown

    Fixes the initially reported issue in 
https://issues.apache.org/jira/browse/KAFKA-4198.
    
    The relevant part in fixing the initial issue here is the change to 
`kafka.server.KafkaServer#shutdown`.
    
    It contained this step:
    
    ```scala
          val canShutdown = isShuttingDown.compareAndSet(false, true)
          if (canShutdown && shutdownLatch.getCount > 0) {
    ```
    
    without any fallback for the case of `shutdownLatch.getCount == 0`. In 
that case (when a previous call to the shutdown method was just about to 
finish), the `compareAndSet` still sets `isShuttingDown` to true, but nothing 
resets it, so the server can never be started again (`startup` checks 
`isShuttingDown` before setting up a new latch with count 1).
    
    Long story short: concurrent calls to shutdown can get the server locked in 
a broken state.
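    
    A minimal sketch of the stuck flag, under stated assumptions: `ToyServer`,
    `ShutdownSketch` and `checkLatchFirst` are hypothetical names, not part of
    `kafka.server.KafkaServer`, and the latch-first guard is only one possible
    fallback, not necessarily the change made in this pull request. Two
    back-to-back `shutdown` calls stand in for the concurrent interleaving.
    
    ```scala
    import java.util.concurrent.CountDownLatch
    import java.util.concurrent.atomic.AtomicBoolean

    // Hypothetical model of the two lifecycle fields discussed above;
    // this is not the real kafka.server.KafkaServer.
    class ToyServer(checkLatchFirst: Boolean) {
      private val isShuttingDown = new AtomicBoolean(false)
      private var shutdownLatch = new CountDownLatch(1)

      def startup(): Unit = {
        if (isShuttingDown.get) {
          throw new IllegalStateException(
            "Kafka server is still shutting down, cannot re-start!")
        }
        shutdownLatch = new CountDownLatch(1)
      }

      def shutdown(): Unit = {
        val proceed = if (checkLatchFirst) {
          // Assumed guard: only flip the flag when there is work left to do.
          shutdownLatch.getCount > 0 && isShuttingDown.compareAndSet(false, true)
        } else {
          // Order quoted in the description: flag first, latch second.
          val canShutdown = isShuttingDown.compareAndSet(false, true)
          canShutdown && shutdownLatch.getCount > 0
        }
        if (proceed) {
          // A real server would stop its components here.
          shutdownLatch.countDown()
          isShuttingDown.set(false)
        }
      }
    }

    object ShutdownSketch {
      // Returns true if the server can be started again after two shutdown calls.
      def restartSucceeds(checkLatchFirst: Boolean): Boolean = {
        val server = new ToyServer(checkLatchFirst)
        server.startup()
        server.shutdown() // completes: latch at 0, flag reset to false
        server.shutdown() // late caller: finds the latch already at 0
        try {
          server.startup()
          true
        } catch {
          case _: IllegalStateException => false
        }
      }
    }
    ```
    
    In this model, `ShutdownSketch.restartSucceeds(checkLatchFirst = false)`
    returns false because the second `shutdown` leaves `isShuttingDown` set,
    while the latch-first variant returns true.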
    
    This fixes the reported error:
    
    ```text
    java.lang.IllegalStateException: Kafka server is still shutting down, 
cannot re-start!
        at kafka.server.KafkaServer.startup(KafkaServer.scala:184)
        at 
kafka.integration.KafkaServerTestHarness$$anonfun$restartDeadBrokers$2.apply$mcVI$sp(KafkaServerTestHarness.scala:117)
        at 
kafka.integration.KafkaServerTestHarness$$anonfun$restartDeadBrokers$2.apply(KafkaServerTestHarness.scala:116)
        at 
kafka.integration.KafkaServerTestHarness$$anonfun$restartDeadBrokers$2.apply(KafkaServerTestHarness.scala:116)
        at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
        at scala.collection.immutable.Range.foreach(Range.scala:160)
        at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
        at 
kafka.integration.KafkaServerTestHarness$class.restartDeadBrokers(KafkaServerTestHarness.scala:116)
        at 
kafka.api.ConsumerBounceTest.restartDeadBrokers(ConsumerBounceTest.scala:34)
        at 
kafka.api.ConsumerBounceTest$BounceBrokerScheduler.doWork(ConsumerBounceTest.scala:158)
    ```
    
    That said, this error (reported in a comment on the JIRA) still occurs 
even with this fix:
    
    ```text
    kafka.api.ConsumerBounceTest > testConsumptionWithBrokerFailures FAILED
        java.lang.IllegalArgumentException: You can only check the position for 
partitions assigned to this consumer.
            at 
org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1271)
            at 
kafka.api.ConsumerBounceTest.consumeWithBrokerFailures(ConsumerBounceTest.scala:96)
            at 
kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(ConsumerBounceTest.scala:69)
    ```
    
    ... I think this one should get a separate JIRA though. It seems to me that 
the behaviour of the call to `position` when a broker has just died is a 
separate issue from the one initially reported.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/original-brownbear/kafka KAFKA-4198

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/2568.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2568
    
----
commit 08460a669b4c737c20793129b01c7a3676452dac
Author: Armin Braun <m...@obrown.io>
Date:   2017-02-18T08:11:10Z

    KAFKA-4198: Cleaner ExecutorService Handling

commit 128db5ea5afeb940f108e95c9bcaad84c9889b10
Author: Armin Braun <m...@obrown.io>
Date:   2017-02-18T09:12:15Z

    KAFKA-4198: Ensure Fresh MetaData

commit 847d001e17c2baaba18936cc5c497756154c931a
Author: Armin Braun <m...@obrown.io>
Date:   2017-02-18T10:27:37Z

    KAFKA-4198: Revert Test Change

commit 005ff8f4a180a7c2c45313accff6627e90e9983a
Author: Armin Braun <m...@obrown.io>
Date:   2017-02-18T11:39:47Z

    KAFKA-4198: Fix RunCondition in KafkaServer#shutdown

commit 9559ad387bba6d24a0ba5f244aae5f6d32a897f1
Author: Armin Braun <m...@obrown.io>
Date:   2017-02-18T11:41:00Z

    KAFKA-4198: Revert Experimental Change to KafkaConsumer

commit d2f138c9f01800219fcd02a625a9f89b9315fd73
Author: Armin Braun <m...@obrown.io>
Date:   2017-02-18T12:04:14Z

    KAFKA-4198: Revert Experimental Change to KafkaServerTestHarness

commit 8cfb45240eda64cc358303c2533aef6c50f69225
Author: Armin Braun <m...@obrown.io>
Date:   2017-02-18T12:06:28Z

    KAFKA-4198: Revert Experimental Change to ConsumerBounceTest

----


> Transient test failure: ConsumerBounceTest.testConsumptionWithBrokerFailures
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-4198
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4198
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Ismael Juma
>              Labels: transient-unit-test-failure
>
> The issue seems to be that we call `startup` while `shutdown` is still taking 
> place.
> {code}
> java.lang.AssertionError: expected:<107> but was:<0>
>       at org.junit.Assert.fail(Assert.java:88)
>       at org.junit.Assert.failNotEquals(Assert.java:834)
>       at org.junit.Assert.assertEquals(Assert.java:645)
>       at org.junit.Assert.assertEquals(Assert.java:631)
>       at 
> kafka.api.ConsumerBounceTest$$anonfun$consumeWithBrokerFailures$2.apply(ConsumerBounceTest.scala:91)
>       at 
> kafka.api.ConsumerBounceTest$$anonfun$consumeWithBrokerFailures$2.apply(ConsumerBounceTest.scala:90)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>       at 
> kafka.api.ConsumerBounceTest.consumeWithBrokerFailures(ConsumerBounceTest.scala:90)
>       at 
> kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(ConsumerBounceTest.scala:70)
> {code}
> {code}
> java.lang.IllegalStateException: Kafka server is still shutting down, cannot 
> re-start!
>       at kafka.server.KafkaServer.startup(KafkaServer.scala:184)
>       at 
> kafka.integration.KafkaServerTestHarness$$anonfun$restartDeadBrokers$2.apply$mcVI$sp(KafkaServerTestHarness.scala:117)
>       at 
> kafka.integration.KafkaServerTestHarness$$anonfun$restartDeadBrokers$2.apply(KafkaServerTestHarness.scala:116)
>       at 
> kafka.integration.KafkaServerTestHarness$$anonfun$restartDeadBrokers$2.apply(KafkaServerTestHarness.scala:116)
>       at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>       at scala.collection.immutable.Range.foreach(Range.scala:160)
>       at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>       at 
> kafka.integration.KafkaServerTestHarness$class.restartDeadBrokers(KafkaServerTestHarness.scala:116)
>       at 
> kafka.api.ConsumerBounceTest.restartDeadBrokers(ConsumerBounceTest.scala:34)
>       at 
> kafka.api.ConsumerBounceTest$BounceBrokerScheduler.doWork(ConsumerBounceTest.scala:158)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
