[
https://issues.apache.org/jira/browse/KAFKA-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15048704#comment-15048704
]
ASF GitHub Bot commented on KAFKA-2837:
---------------------------------------
GitHub user ZoneMayor opened a pull request:
https://github.com/apache/kafka/pull/648
KAFKA-2837: fix transient failure of kafka.api.ProducerBounceTest >
testBrokerFailure
I can reproduced this transient failure, it seldom happen;
code is like below:
// rolling bounce brokers
for (i <- 0 until numServers) {
for (server <- servers) {
server.shutdown()
server.awaitShutdown()
server.startup()
Thread.sleep(2000)
}
// Make sure the producer do not see any exception
// in returned metadata due to broker failures
assertTrue(scheduler.failed == false)
// Make sure the leader still exists after bouncing brokers
(0 until numPartitions).foreach(partition =>
TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, partition))
Brokers keep rolling restart, and producer keep sending messages;
In every loop, it will wait for election of partition leader;
But if the election is slow, more messages will be buffered in
RecordAccumulator's BufferPool;
The limit for buffer is set to be 30000;
TimeoutException("Failed to allocate memory within the configured max
blocking time") will show up when out of memory;
Since for every restart of the broker, it will sleep for 2000 ms, so this
transient failure seldom happen;
But if I reduce the sleeping period, the bigger chance failure happens;
for example if the broker with role of controller suffered a restart, it
will take time to select controller first, then select leader, which will lead
to more messges blocked in KafkaProducer:RecordAccumulator:BufferPool;
In this fix, I just enlarge the producer's buffer size to be 1MB;
@guozhangwang , Could you give some comments?
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/ZoneMayor/kafka trunk-KAFKA-2837
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/kafka/pull/648.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #648
----
commit 95374147a28208d4850f6e73f714bf418935fc2d
Author: ZoneMayor <[email protected]>
Date: 2015-11-27T03:49:34Z
Merge pull request #1 from apache/trunk
merge
commit cec5b48b651a7efd3900cfa3c1fd0ab1eeeaa3ec
Author: ZoneMayor <[email protected]>
Date: 2015-12-01T10:44:02Z
Merge pull request #2 from apache/trunk
2015-12-1
commit a119d547bf1741625ce0627073c7909992a20f15
Author: ZoneMayor <[email protected]>
Date: 2015-12-04T13:42:27Z
Merge pull request #3 from apache/trunk
2015-12-04#KAFKA-2893
commit b767a8dff85fc71c75d4cf5178c3f6f03ff81bfc
Author: ZoneMayor <[email protected]>
Date: 2015-12-09T10:42:30Z
Merge pull request #5 from apache/trunk
2015-12-9
commit cd5e6f4700a4387f9383b84aca0ee9c4639b1033
Author: jinxing <[email protected]>
Date: 2015-12-09T13:49:07Z
KAFKA-2837: fix transient failure kafka.api.ProducerBounceTest >
testBrokerFailure
----
> FAILING TEST: kafka.api.ProducerBounceTest > testBrokerFailure
> ---------------------------------------------------------------
>
> Key: KAFKA-2837
> URL: https://issues.apache.org/jira/browse/KAFKA-2837
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Gwen Shapira
> Labels: newbie
>
> {code}
> java.lang.AssertionError
> at org.junit.Assert.fail(Assert.java:86)
> at org.junit.Assert.assertTrue(Assert.java:41)
> at org.junit.Assert.assertTrue(Assert.java:52)
> at
> kafka.api.ProducerBounceTest.testBrokerFailure(ProducerBounceTest.scala:117)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:105)
> at
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:56)
> at
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:64)
> at
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:50)
> at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
> at
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
> at
> org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
> at
> org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
> at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
> at
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:106)
> at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
> at
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
> at
> org.gradle.messaging.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:360)
> at
> org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:54)
> at
> org.gradle.internal.concurrent.StoppableExecutorImpl$1.run(StoppableExecutorImpl.java:40)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:744)
> {code}
> https://builds.apache.org/job/kafka-trunk-jdk7/815/console
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)