[ 
https://issues.apache.org/jira/browse/KAFKA-5140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16212812#comment-16212812
 ] 

Guozhang Wang commented on KAFKA-5140:
--------------------------------------

I discovered that the root cause of this flaky test is the following issue: 
https://issues.apache.org/jira/browse/KAFKA-6098

More specifically, here are the (augmented) log trails that expose this issue:

1. After the reset tool is called and 
{{assertInternalTopicsGotDeleted(INTERMEDIATE_USER_TOPIC)}} has passed (although 
the ZK path has not yet been deleted, due to the above issue), the streams app 
resumes executing, and in the first rebalance the StreamPartitionAssignor 
tries to create the deleted topic again:

{code}
org.apache.kafka.streams.integration.ResetIntegrationTest > 
testReprocessingFromScratchAfterResetWithIntermediateUserTopic STANDARD_OUT
    [2017-10-19 18:28:12,027] INFO [GroupCoordinator 0]: Preparing to rebalance 
group cleanup-integration-test1 with old generation 4 (__consumer_offsets-1) 
(kafka.coordinator.group.GroupCoordinator:72)
    [2017-10-19 18:28:12,028] INFO [GroupCoordinator 0]: Stabilized group 
cleanup-integration-test1 generation 5 (__consumer_offsets-1) 
(kafka.coordinator.group.GroupCoordinator:72)
    [2017-10-19 18:28:12,029] WARN stream-thread 
[cleanup-integration-test1-e86cdf4e-781a-408a-8414-1115d9558914-StreamThread-10]
 Constructed client metadata 
{e86cdf4e-781a-408a-8414-1115d9558914=ClientMetadata{hostInfo=null, 
consumers=[cleanup-integration-test1-e86cdf4e-781a-408a-8414-1115d9558914-StreamThread-10-consumer-bd575ab5-c159-4c8a-9130-1ac896c23595],
 state=[activeTasks: ([]) standbyTasks: ([]) assignedTasks: ([]) 
prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1]}} from the member 
subscriptions. 
(org.apache.kafka.streams.processor.internals.StreamPartitionAssignor:241)
    [2017-10-19 18:28:12,029] WARN stream-thread 
[cleanup-integration-test1-e86cdf4e-781a-408a-8414-1115d9558914-StreamThread-10]
 Starting to validate internal topics in partition assignor. 
(org.apache.kafka.streams.processor.internals.StreamPartitionAssignor:236)
{code}

2. The following entry can then be found on the broker side:

{code}
    [2017-10-19 18:28:12,280] INFO [Admin Manager on Broker 0]: Error 
processing create topic request for topic 
cleanup-integration-test1-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition 
with arguments (numPartitions=1, replicationFactor=1, replicasAssignments={}, 
configs={cleanup.policy=delete}) (kafka.server.AdminManager:80)
    org.apache.kafka.common.errors.TopicExistsException: Topic 
'cleanup-integration-test1-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition'
 already exists.
{code}

3. {{StreamsKafkaClient}} treats the error code as OK and moves on to the 
validation phase, which blocks forever because the loop below has no timeout:

{code}
            // wait until each one of the topic metadata has been propagated to at least one broker
            while (!allTopicsCreated(topicNamesToMakeReady, topicsToMakeReady)) {
                try {
                    Thread.sleep(50L);
                } catch (InterruptedException e) {
                    // ignore
                }
            }
{code}
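
For illustration only, here is a minimal sketch of how a polling loop like the one above could be bounded by a deadline, so that a condition which never becomes true yields a failure result instead of blocking indefinitely. The names {{BoundedWait}} and {{waitUntil}} and the timeout values are hypothetical and are not part of Kafka; the actual fix is tracked in KAFKA-6098 and may look quite different.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper, not part of Kafka: polls a condition with an upper time bound.
public final class BoundedWait {
    private BoundedWait() {}

    /**
     * Polls the condition every pollMs milliseconds until it holds or timeoutMs elapses.
     * Returns true if the condition became true, false on timeout or interruption.
     */
    public static boolean waitUntil(BooleanSupplier condition, long timeoutMs, long pollMs) {
        final long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (!condition.getAsBoolean()) {
            // Overflow-safe comparison of nanoTime values
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing it, then give up
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // A condition that never holds stands in for topic metadata that never propagates
        boolean ready = waitUntil(() -> false, 200L, 50L);
        System.out.println("ready = " + ready);
    }
}
```

With a bound like this, the caller could surface an error once the deadline passes rather than staying stuck until the consumer is removed from the group.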

4. After 2 seconds (the session timeout value), the stream consumer is kicked 
out of the group because it is still blocked in the above phase:

{code}
org.apache.kafka.streams.integration.ResetIntegrationTest > 
testReprocessingFromScratchAfterResetWithIntermediateUserTopic STANDARD_OUT
    [2017-10-19 18:28:14,030] INFO [GroupCoordinator 0]: Member 
cleanup-integration-test1-e86cdf4e-781a-408a-8414-1115d9558914-StreamThread-10-consumer-bd575ab5-c159-4c8a-9130-1ac896c23595
 in group cleanup-integration-test1 has failed, removing it from the group 
(kafka.coordinator.group.GroupCoordinator:72)
    [2017-10-19 18:28:14,032] INFO [GroupCoordinator 0]: Preparing to rebalance 
group cleanup-integration-test1 with old generation 5 (__consumer_offsets-1) 
(kafka.coordinator.group.GroupCoordinator:72)
    [2017-10-19 18:28:14,032] INFO [GroupCoordinator 0]: Group 
cleanup-integration-test1 with generation 6 is now empty (__consumer_offsets-1) 
(kafka.coordinator.group.GroupCoordinator:72)
{code}

After 30 seconds, the test fails.

> Flaky ResetIntegrationTest
> --------------------------
>
>                 Key: KAFKA-5140
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5140
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams, unit tests
>    Affects Versions: 0.10.2.0
>            Reporter: Matthias J. Sax
>            Assignee: Guozhang Wang
>             Fix For: 0.11.0.0
>
>
> {noformat}
> org.apache.kafka.streams.integration.ResetIntegrationTest > 
> testReprocessingFromScratchAfterResetWithIntermediateUserTopic FAILED
>     java.lang.AssertionError: 
>     Expected: <[KeyValue(2986681642095, 1), KeyValue(2986681642055, 1), 
> KeyValue(2986681642075, 1), KeyValue(2986681642035, 1), 
> KeyValue(2986681642095, 1), KeyValue(2986681642055, 1), 
> KeyValue(2986681642115, 1), KeyValue(2986681642075, 1), 
> KeyValue(2986681642075, 2), KeyValue(2986681642095, 2), 
> KeyValue(2986681642115, 1), KeyValue(2986681642135, 1), 
> KeyValue(2986681642095, 2), KeyValue(2986681642115, 2), 
> KeyValue(2986681642155, 1), KeyValue(2986681642135, 1), 
> KeyValue(2986681642115, 2), KeyValue(2986681642135, 2), 
> KeyValue(2986681642155, 1), KeyValue(2986681642175, 1), 
> KeyValue(2986681642135, 2), KeyValue(2986681642155, 2), 
> KeyValue(2986681642175, 1), KeyValue(2986681642195, 1), 
> KeyValue(2986681642135, 3), KeyValue(2986681642155, 2), 
> KeyValue(2986681642175, 2), KeyValue(2986681642195, 1), 
> KeyValue(2986681642155, 3), KeyValue(2986681642175, 2), 
> KeyValue(2986681642195, 2), KeyValue(2986681642155, 3), 
> KeyValue(2986681642175, 3), KeyValue(2986681642195, 2), 
> KeyValue(2986681642155, 4), KeyValue(2986681642175, 3), 
> KeyValue(2986681642195, 3)]>
>          but: was <[KeyValue(2986681642095, 1), KeyValue(2986681642055, 1), 
> KeyValue(2986681642075, 1), KeyValue(2986681642035, 1), 
> KeyValue(2986681642095, 1), KeyValue(2986681642055, 1), 
> KeyValue(2986681642115, 1), KeyValue(2986681642075, 1), 
> KeyValue(2986681642075, 2), KeyValue(2986681642095, 2), 
> KeyValue(2986681642115, 1), KeyValue(2986681642135, 1), 
> KeyValue(2986681642095, 2), KeyValue(2986681642115, 2), 
> KeyValue(2986681642155, 1), KeyValue(2986681642135, 1), 
> KeyValue(2986681642115, 2), KeyValue(2986681642135, 2), 
> KeyValue(2986681642155, 1), KeyValue(2986681642175, 1), 
> KeyValue(2986681642135, 2), KeyValue(2986681642155, 2), 
> KeyValue(2986681642175, 1), KeyValue(2986681642195, 1), 
> KeyValue(2986681642135, 3), KeyValue(2986681642155, 2), 
> KeyValue(2986681642175, 2), KeyValue(2986681642195, 1), 
> KeyValue(2986681642155, 3), KeyValue(2986681642175, 2), 
> KeyValue(2986681642195, 2), KeyValue(2986681642155, 3)]>
>         at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>         at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)
>         at 
> org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:190)
> {noformat}



