[ 
https://issues.apache.org/jira/browse/KAFKA-8011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17118222#comment-17118222
 ] 

Sophie Blee-Goldman edited comment on KAFKA-8011 at 6/4/20, 12:58 AM:
----------------------------------------------------------------------

[~mjsax] should we create a separate ticket for that test? AFAICT the test this 
ticket is for is only failing because that test is failing and the cleanup 
isn't done properly. But the test you pointed out seems to be a real bug:
{code:java}
[2020-05-27 17:56:00,413] ERROR Exception caught during Deserialization, taskId: 0_0, topic: TEST-TOPIC-1, partition: 0, offset: 0 (org.apache.kafka.streams.errors.LogAndFailExceptionHandler:39)
java.lang.NullPointerException
        at org.apache.kafka.streams.processor.internals.SourceNode.deserializeKey(SourceNode.java:58)
        at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:65)
        at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:176)
        at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
        at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:185)
        at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:844)
        at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:836)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:646)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:550)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:509)

{code}


was (Author: ableegoldman):
[~mjsax] should we create a separate ticket for that test? AFAICT the test this 
ticket is for is only failing because that test is failing and the cleanup 
isn't done properly. But the test you pointed out seems to be a real bug:
{code:java}
[2020-05-27 17:56:00,413] ERROR Exception caught during Deserialization, taskId: 0_0, topic: TEST-TOPIC-1, partition: 0, offset: 0 (org.apache.kafka.streams.errors.LogAndFailExceptionHandler:39)
java.lang.NullPointerException
        at org.apache.kafka.streams.processor.internals.SourceNode.deserializeKey(SourceNode.java:58)
        at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:65)
        at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:176)
        at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
        at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:185)
        at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:844)
        at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:836)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:646)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:550)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:509)
{code}
I also later see 
{code:java}
[2020-05-27 17:57:23,145] ERROR stream-thread [regex-source-integration-test-9e3e325f-3fe3-4741-9862-eff246a8fb2a-StreamThread-1] Encountered the following exception during processing and the thread is going to shut down: (org.apache.kafka.streams.processor.internals.StreamThread:529)
org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topic foo is already matched for another regex pattern foo.* and hence cannot be matched to this regex pattern f.* any more.
        at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder$SourceNodeFactory.getTopics(InternalTopologyBuilder.java:261)
        at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.setRegexMatchedTopicsToSourceNodes(InternalTopologyBuilder.java:1122)
        at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.updateSubscribedTopics(InternalTopologyBuilder.java:1983)
        at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.addSubscribedTopicsFromMetadata(InternalTopologyBuilder.java:1973)
        at org.apache.kafka.streams.processor.internals.TaskManager.handleRebalanceStart(TaskManager.java:136)
        at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.handleRebalanceStart(StreamsPartitionAssignor.java:1585)
        at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.subscriptionUserData(StreamsPartitionAssignor.java:232)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.metadata(ConsumerCoordinator.java:222)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest(AbstractCoordinator.java:561)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.initiateJoinGroup(AbstractCoordinator.java:496)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:418)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:506)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1265)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
        at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:770)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:630)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:550)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:509)
{code}
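
The TopologyException message above says topic foo is matched by both foo.* and f.*. A minimal sketch of that overlap using plain java.util.regex follows. The class and method names are hypothetical, and the use of a full match (matches()) is an assumption about how the topology builder compares topic names, not something confirmed by the log.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of Kafka: checks whether one topic name
// is fully matched by two different source patterns.
final class RegexOverlapSketch {
    static boolean bothMatch(String topic, String first, String second) {
        // Assumption: a topic counts as "matched" when the whole name matches.
        return Pattern.compile(first).matcher(topic).matches()
            && Pattern.compile(second).matcher(topic).matches();
    }
}
```

Under this assumption, bothMatch("foo", "foo.*", "f.*") is true, which is the kind of overlap the exception reports, while a topic such as "fa" matches only f.*.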

> Flaky Test RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenCreated
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-8011
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8011
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Bill Bejeck
>            Assignee: Bill Bejeck
>            Priority: Blocker
>              Labels: flaky-test, newbie
>             Fix For: 1.0.3, 1.1.2, 2.2.0, 2.0.2, 2.1.2, 2.6.0
>
>         Attachments: 
> org.apache.kafka.streams.integration.RegexSourceIntegrationTest.html, 
> streams_1_0_test_results.png, streams_1_1_tests.png
>
>
> The RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenCreated
> and RegexSourceIntegrationTest#testRegexMatchesTopicsAWhenDeleted tests use 
> an ArrayList to assert the topics assigned to the Streams application. 
> The ConsumerRebalanceListener used in the test operates on this list as does 
> the TestUtils.waitForCondition() to verify the expected topic assignments.
> Using the same list in both places can cause a ConcurrentModificationException 
> if the rebalance listener modifies the assignment at the same time 
> TestUtils.waitForCondition() is using the list to verify the expected topics. 
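
One common way to avoid the race described above is to back the shared list with a thread-safe collection. The sketch below uses CopyOnWriteArrayList. It is an illustration under stated assumptions, not necessarily the fix applied for this ticket, and the class and method names (AssignedTopicsSketch, onAssigned, onRevoked, matches) are hypothetical stand-ins for the test's listener callbacks and condition check.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for the state shared between the rebalance
// listener thread and the thread polling the wait condition.
final class AssignedTopicsSketch {
    // Iterators and toArray() on CopyOnWriteArrayList work on a snapshot,
    // so a concurrent add/remove cannot trigger ConcurrentModificationException.
    private final List<String> assignedTopics = new CopyOnWriteArrayList<>();

    // Would be invoked from the rebalance listener when topics are assigned.
    void onAssigned(Collection<String> topics) {
        assignedTopics.addAll(topics);
    }

    // Would be invoked from the rebalance listener when topics are revoked.
    void onRevoked(Collection<String> topics) {
        assignedTopics.removeAll(topics);
    }

    // Would be invoked repeatedly by the condition check on another thread.
    boolean matches(List<String> expected) {
        // Copy the current snapshot before comparing.
        return new ArrayList<>(assignedTopics).equals(expected);
    }
}
```

The trade-off is that every write copies the backing array, which is acceptable for a small list of topic names that changes only on rebalance.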



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
