[ 
https://issues.apache.org/jira/browse/KAFKA-4692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15947773#comment-15947773
 ] 

ASF GitHub Bot commented on KAFKA-4692:
---------------------------------------

GitHub user original-brownbear opened a pull request:

    https://github.com/apache/kafka/pull/2761

    KAFKA-4692: Make testNo thread safe

    https://issues.apache.org/jira/browse/KAFKA-4692 should be resolved by this.
    
    The problem here (or at least one of the problems causing trouble) appears
    to be that the `testNo` field is not used in a thread-safe way to generate a
    unique test number (for creating non-conflicting topic names). `volatile`
    does not really help, since it only guarantees visibility and does not make
    `testNo++` atomic.
    If two instances of this class are set up inside the same fork and
    `testNo++;` runs in one just as the other runs, e.g.,
    
    ```java
        private void createTopics() throws InterruptedException {
            streamOneInput = "stream-one-" + testNo;
            outputTopic = "output-" + testNo;
            CLUSTER.createTopic(streamOneInput, 3, 1);
            CLUSTER.createTopic(outputTopic);
        }
    ```
    
    then both will end up with the same topic names while also sharing the
    same static `CLUSTER` field.
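
To make the interleaving concrete, here is a minimal single-threaded replay of it. This is only a sketch: the class `SharedCounterCollisionSketch` and its methods `bumpCounter()` and `nameTopics()` are hypothetical stand-ins, and the assumption that the original field is a shared `static volatile int` is inferred from the description rather than copied from the test class.

```java
// Deterministic replay of the problematic interleaving (hypothetical class,
// not the real integration test): both instances read the shared counter
// only after both increments have already happened.
class SharedCounterCollisionSketch {
    // Assumed shape of the original field: one counter shared by all instances.
    private static volatile int testNo = 0;

    private String streamOneInput;

    // Stand-in for the setup step that bumps the shared counter.
    void bumpCounter() {
        testNo++; // read-modify-write on shared state; volatile does not make it atomic
    }

    // Stand-in for createTopics(): the name uses the *current* shared value.
    void nameTopics() {
        streamOneInput = "stream-one-" + testNo;
    }

    String streamOneInput() {
        return streamOneInput;
    }

    public static void main(String[] args) {
        SharedCounterCollisionSketch a = new SharedCounterCollisionSketch();
        SharedCounterCollisionSketch b = new SharedCounterCollisionSketch();
        a.bumpCounter();   // instance A increments
        b.bumpCounter();   // instance B increments before A has named its topics
        a.nameTopics();
        b.nameTopics();
        // Both names carry the same suffix, so the topics collide.
        System.out.println(a.streamOneInput() + " vs " + b.streamOneInput());
    }
}
```

In a real run the two setups would interleave on different threads; the replay only fixes one possible ordering to show why the names can coincide.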
    I fixed this by making the `testNo` that is actually used an instance
    variable, derived atomically from an increasing static counter for each
    test. I think this should resolve any naming collisions here.
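
A minimal sketch of the idea, not the actual patch: the class name `TopicNamingSketch`, the counter name `TEST_COUNTER`, and the accessor methods are hypothetical and chosen only for illustration. Each instance takes its number once from a shared `AtomicInteger`, so later increments by other instances cannot change the names it derives.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the test class; illustrates the technique only.
class TopicNamingSketch {
    // Shared by every instance created in the same JVM fork.
    private static final AtomicInteger TEST_COUNTER = new AtomicInteger(0);

    // Fixed once per instance; incrementAndGet() is an atomic read-modify-write,
    // so no two instances can observe the same value.
    private final int testNo = TEST_COUNTER.incrementAndGet();

    String streamOneInput() {
        return "stream-one-" + testNo;
    }

    String outputTopic() {
        return "output-" + testNo;
    }

    public static void main(String[] args) throws InterruptedException {
        Set<String> names = ConcurrentHashMap.newKeySet();
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            // Each thread plays the role of one test instance being set up.
            threads[i] = new Thread(() -> names.add(new TopicNamingSketch().streamOneInput()));
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        // One distinct name per instance means no collisions occurred.
        System.out.println("distinct topic names: " + names.size());
    }
}
```

The design choice here is to read the shared counter exactly once, at construction time, instead of every time a topic name is built.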


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/original-brownbear/kafka KAFKA-4692

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/2761.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2761
    
----
commit 4d2468318897f37b00dab0ad63c36ae3004a9165
Author: Armin Braun <m...@obrown.io>
Date:   2017-03-29T19:27:59Z

    KAFKA-4692: Make testNo thread safe

----


> Transient test failure in org.apache.kafka.streams.integration.KStreamAggregationDedupIntegrationTest
> -----------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4692
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4692
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: unit tests
>            Reporter: Guozhang Wang
>            Assignee: Armin Braun
>
> Seen a couple of failures on at least the following two test cases:
> org.apache.kafka.streams.integration.KStreamAggregationDedupIntegrationTest.shouldReduce
> {code}
> Error Message
> java.lang.AssertionError: 
> Expected: is <[KeyValue(A, A:A), KeyValue(B, B:B), KeyValue(C, C:C), KeyValue(D, D:D), KeyValue(E, E:E)]>
>      but: was <[KeyValue(A, A), KeyValue(A, A:A), KeyValue(B, B:B), KeyValue(C, C:C), KeyValue(D, D:D), KeyValue(E, E:E)]>
> Stacktrace
> java.lang.AssertionError: 
> Expected: is <[KeyValue(A, A:A), KeyValue(B, B:B), KeyValue(C, C:C), KeyValue(D, D:D), KeyValue(E, E:E)]>
>      but: was <[KeyValue(A, A), KeyValue(A, A:A), KeyValue(B, B:B), KeyValue(C, C:C), KeyValue(D, D:D), KeyValue(E, E:E)]>
>       at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>       at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)
>       at org.apache.kafka.streams.integration.KStreamAggregationDedupIntegrationTest.shouldReduce(KStreamAggregationDedupIntegrationTest.java:138)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>       at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>       at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>       at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>       at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>       at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>       at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>       at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>       at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>       at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>       at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>       at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>       at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>       at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>       at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>       at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>       at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>       at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:114)
>       at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:57)
>       at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:66)
>       at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>       at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>       at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>       at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>       at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>       at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>       at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:109)
>       at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>       at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>       at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:377)
>       at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:54)
>       at org.gradle.internal.concurrent.StoppableExecutorImpl$1.run(StoppableExecutorImpl.java:40)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> Standard Output
> [2017-01-24 17:32:11,379] ERROR ZKShutdownHandler is not registered, so ZooKeeper server won't take any action on ERROR or SHUTDOWN server state changes (org.apache.zookeeper.server.ZooKeeperServer:472)
> [2017-01-24 17:32:11,408] WARN No meta.properties file under dir /tmp/junit8682341835780103762/junit5821763745241022056/meta.properties (kafka.server.BrokerMetadataCheckpoint:85)
> [2017-01-24 17:32:11,430] WARN No meta.properties file under dir /tmp/junit8682341835780103762/junit5821763745241022056/meta.properties (kafka.server.BrokerMetadataCheckpoint:85)
> [2017-01-24 17:32:11,432] WARN Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn:1162)
> java.net.ConnectException: Connection refused
>       at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>       at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>       at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
>       at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
> [2017-01-24 17:32:17,950] WARN Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn:1162)
> java.net.ConnectException: Connection refused
>       at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>       at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>       at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
>       at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
> [2017-01-24 17:32:29,099] ERROR ZKShutdownHandler is not registered, so ZooKeeper server won't take any action on ERROR or SHUTDOWN server state changes (org.apache.zookeeper.server.ZooKeeperServer:472)
> {code}
> org.apache.kafka.streams.integration.KStreamAggregationDedupIntegrationTest.shouldGroupByKey
> {code}
> Error Message
> java.lang.AssertionError:
> Expected: is <[KeyValue(1@1484776400000, 2), KeyValue(2@1484776400000, 2), KeyValue(3@1484776400000, 2), KeyValue(4@1484776400000, 2), KeyValue(5@1484776400000, 2)]>
>      but: was <[KeyValue(1@1484776400000, 1), KeyValue(1@1484776400000, 2), KeyValue(2@1484776400000, 1), KeyValue(3@1484776400000, 1), KeyValue(4@1484776400000, 1), KeyValue(5@1484776400000, 1)]>
> Stacktrace
> java.lang.AssertionError:
> Expected: is <[KeyValue(1@1484776400000, 2), KeyValue(2@1484776400000, 2), KeyValue(3@1484776400000, 2), KeyValue(4@1484776400000, 2), KeyValue(5@1484776400000, 2)]>
>      but: was <[KeyValue(1@1484776400000, 1), KeyValue(1@1484776400000, 2), KeyValue(2@1484776400000, 1), KeyValue(3@1484776400000, 1), KeyValue(4@1484776400000, 1), KeyValue(5@1484776400000, 1)]>
>     at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>     at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)
>     at org.apache.kafka.streams.integration.KStreamAggregationDedupIntegrationTest.shouldGroupByKey(KStreamAggregationDedupIntegrationTest.java:240)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>     at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>     at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>     at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>     at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:114)
>     at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:57)
>     at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:66)
>     at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>     at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>     at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>     at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>     at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>     at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>     at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:109)
>     at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>     at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>     at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:377)
>     at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:54)
>     at org.gradle.internal.concurrent.StoppableExecutorImpl$1.run(StoppableExecutorImpl.java:40)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
