[
https://issues.apache.org/jira/browse/KAFKA-4692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15947773#comment-15947773
]
ASF GitHub Bot commented on KAFKA-4692:
---------------------------------------
GitHub user original-brownbear opened a pull request:
https://github.com/apache/kafka/pull/2761
KAFKA-4692: Make testNo thread safe
https://issues.apache.org/jira/browse/KAFKA-4692 should be resolved by this.
The problem here (or at least one of the problems) appears to be that the
`testNo` field is not used in a thread-safe way when generating a unique test
number for creating non-conflicting topic names. Declaring it `volatile` does
not really help, since that only guarantees visibility, not atomicity.
If two instances of this class are set up inside the same fork, and one runs
`testNo++;` just as the other runs, e.g.,
```java
private void createTopics() throws InterruptedException {
    streamOneInput = "stream-one-" + testNo;
    outputTopic = "output-" + testNo;
    CLUSTER.createTopic(streamOneInput, 3, 1);
    CLUSTER.createTopic(outputTopic);
}
```
then both instances end up with the same topic names while also sharing the
same static `CLUSTER` field.
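The interleaving described here can be replayed deterministically. The sketch
below is only an illustration: `SharedCounterDemo`, `bumpCounter()` and the
trimmed-down `createTopics()` are hypothetical stand-ins for the real test
class, and the two instances are stepped by hand instead of running on real
threads.

```java
// Hypothetical stand-in for the test class; not the actual Kafka code.
class SharedCounterDemo {
    // Shared by every instance, like the original static testNo field.
    static volatile int testNo = 0;

    String streamOneInput;

    // Setup step 1: bump the shared counter.
    void bumpCounter() {
        testNo++;
    }

    // Setup step 2: read the shared counter to build a topic name.
    void createTopics() {
        streamOneInput = "stream-one-" + testNo;
    }

    public static void main(String[] args) {
        SharedCounterDemo a = new SharedCounterDemo();
        SharedCounterDemo b = new SharedCounterDemo();
        a.bumpCounter();   // testNo is now 1
        b.bumpCounter();   // testNo is now 2, before a has read it
        a.createTopics();  // a reads 2
        b.createTopics();  // b reads 2 as well
        // prints "stream-one-2 / stream-one-2"
        System.out.println(a.streamOneInput + " / " + b.streamOneInput);
    }
}
```

Both instances read the counter only after both increments have happened, so
they build identical topic names.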
I fixed this by making the `testNo` that is actually used an instance
variable, derived atomically for each test from an increasing static counter.
I think this should resolve any naming collisions here.
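A minimal sketch of this kind of fix, assuming an `AtomicInteger` as the
static counter. The class name, `TEST_NO_COUNTER`, and the helper methods are
hypothetical and chosen for illustration; the actual change is in the pull
request linked above.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; names other than testNo are assumptions.
class UniqueTestNumberDemo {
    // Static counter shared by all instances; incrementAndGet() is atomic.
    private static final AtomicInteger TEST_NO_COUNTER = new AtomicInteger(0);

    // Per-instance number, fixed at construction and never re-read from shared state.
    final int testNo = TEST_NO_COUNTER.incrementAndGet();

    String streamOneInput() {
        return "stream-one-" + testNo;
    }

    String outputTopic() {
        return "output-" + testNo;
    }

    // Creates `count` instances on separate threads and returns the distinct topic names.
    static Set<String> namesFromConcurrentInstances(int count) throws InterruptedException {
        Set<String> names = ConcurrentHashMap.newKeySet();
        Thread[] threads = new Thread[count];
        for (int i = 0; i < count; i++) {
            threads[i] = new Thread(() -> names.add(new UniqueTestNumberDemo().streamOneInput()));
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return names;
    }

    public static void main(String[] args) throws InterruptedException {
        // The number of distinct names equals the number of instances created.
        System.out.println(namesFromConcurrentInstances(8).size());
    }
}
```

Because each instance takes its number exactly once and keeps it in a final
field, a later increment by another instance cannot change the topic names it
builds.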
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/original-brownbear/kafka KAFKA-4692
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/kafka/pull/2761.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2761
----
commit 4d2468318897f37b00dab0ad63c36ae3004a9165
Author: Armin Braun <[email protected]>
Date: 2017-03-29T19:27:59Z
KAFKA-4692: Make testNo thread safe
----
> Transient test failure in
> org.apache.kafka.streams.integration.KStreamAggregationDedupIntegrationTest
> -----------------------------------------------------------------------------------------------------
>
> Key: KAFKA-4692
> URL: https://issues.apache.org/jira/browse/KAFKA-4692
> Project: Kafka
> Issue Type: Sub-task
> Components: unit tests
> Reporter: Guozhang Wang
> Assignee: Armin Braun
>
> Seen a couple of failures on at least the following two test cases:
> org.apache.kafka.streams.integration.KStreamAggregationDedupIntegrationTest.shouldReduce
> {code}
> Error Message
> java.lang.AssertionError:
> Expected: is <[KeyValue(A, A:A), KeyValue(B, B:B), KeyValue(C, C:C),
> KeyValue(D, D:D), KeyValue(E, E:E)]>
> but: was <[KeyValue(A, A), KeyValue(A, A:A), KeyValue(B, B:B),
> KeyValue(C, C:C), KeyValue(D, D:D), KeyValue(E, E:E)]>
> Stacktrace
> java.lang.AssertionError:
> Expected: is <[KeyValue(A, A:A), KeyValue(B, B:B), KeyValue(C, C:C),
> KeyValue(D, D:D), KeyValue(E, E:E)]>
> but: was <[KeyValue(A, A), KeyValue(A, A:A), KeyValue(B, B:B),
> KeyValue(C, C:C), KeyValue(D, D:D), KeyValue(E, E:E)]>
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)
> at
> org.apache.kafka.streams.integration.KStreamAggregationDedupIntegrationTest.shouldReduce(KStreamAggregationDedupIntegrationTest.java:138)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:114)
> at
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:57)
> at
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:66)
> at
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
> at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
> at
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
> at
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
> at
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
> at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
> at
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:109)
> at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
> at
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
> at
> org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:377)
> at
> org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:54)
> at
> org.gradle.internal.concurrent.StoppableExecutorImpl$1.run(StoppableExecutorImpl.java:40)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Standard Output
> [2017-01-24 17:32:11,379] ERROR ZKShutdownHandler is not registered, so
> ZooKeeper server won't take any action on ERROR or SHUTDOWN server state
> changes (org.apache.zookeeper.server.ZooKeeperServer:472)
> [2017-01-24 17:32:11,408] WARN No meta.properties file under dir
> /tmp/junit8682341835780103762/junit5821763745241022056/meta.properties
> (kafka.server.BrokerMetadataCheckpoint:85)
> [2017-01-24 17:32:11,430] WARN No meta.properties file under dir
> /tmp/junit8682341835780103762/junit5821763745241022056/meta.properties
> (kafka.server.BrokerMetadataCheckpoint:85)
> [2017-01-24 17:32:11,432] WARN Session 0x0 for server null, unexpected error,
> closing socket connection and attempting reconnect
> (org.apache.zookeeper.ClientCnxn:1162)
> java.net.ConnectException: Connection refused
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> at
> org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
> at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
> [2017-01-24 17:32:17,950] WARN Session 0x0 for server null, unexpected error,
> closing socket connection and attempting reconnect
> (org.apache.zookeeper.ClientCnxn:1162)
> java.net.ConnectException: Connection refused
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> at
> org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
> at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
> [2017-01-24 17:32:29,099] ERROR ZKShutdownHandler is not registered, so
> ZooKeeper server won't take any action on ERROR or SHUTDOWN server state
> changes (org.apache.zookeeper.server.ZooKeeperServer:472)
> {code}
> org.apache.kafka.streams.integration.KStreamAggregationDedupIntegrationTest.shouldGroupByKey
> {code}
> Error Message
>
>
> java.lang.AssertionError:
> Expected: is <[KeyValue(1@1484776400000, 2), KeyValue(2@1484776400000, 2),
> KeyValue(3@1484776400000, 2), KeyValue(4@1484776400000, 2),
> KeyValue(5@1484776400000, 2)]>
> but: was <[KeyValue(1@1484776400000, 1), KeyValue(1@1484776400000, 2),
> KeyValue(2@1484776400000, 1), KeyValue(3@1484776400000, 1),
> KeyValue(4@1484776400000, 1), KeyValue(5@1484776400000, 1)]>
>
>
> Stacktrace
>
>
> java.lang.AssertionError:
> Expected: is <[KeyValue(1@1484776400000, 2), KeyValue(2@1484776400000, 2),
> KeyValue(3@1484776400000, 2), KeyValue(4@1484776400000, 2),
> KeyValue(5@1484776400000, 2)]>
> but: was <[KeyValue(1@1484776400000, 1), KeyValue(1@1484776400000, 2),
> KeyValue(2@1484776400000, 1), KeyValue(3@1484776400000, 1),
> KeyValue(4@1484776400000, 1), KeyValue(5@1484776400000, 1)]>
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)
> at
> org.apache.kafka.streams.integration.KStreamAggregationDedupIntegrationTest.shouldGroupByKey(KStreamAggregationDedupIntegrationTest.java:240)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:114)
> at
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:57)
> at
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:66)
> at
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
> at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
> at
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
> at
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
> at
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
> at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
> at
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:109)
> at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
> at
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
> at
> org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:377)
> at
> org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:54)
> at
> org.gradle.internal.concurrent.StoppableExecutorImpl$1.run(StoppableExecutorImpl.java:40)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> {code}