[ 
https://issues.apache.org/jira/browse/FLINK-24119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17625574#comment-17625574
 ] 

Qingsheng Ren commented on FLINK-24119:
---------------------------------------

Thanks for the input, [~gaborgsomogyi]! {{KafkaShuffleExactlyOnceITCase}} is 
configured to re-run on failure, and the topic name is not randomized, so every 
retry hits this "topic already exists" exception regardless of the root cause. 
We have to dig into the log to find the actual failure reason in the first 
attempt (it's a nightmare to look through the CI log...).

I checked the latest two failure cases above, and both of them failed because 
of an InitProducerId timeout, which is caused by the broker timing out while 
requesting a new producer ID block in ProducerIdManager:

 
{code:java}
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for next producer ID block
{code}
So I assume this is an internal issue in the Kafka broker and not related to the 
Flink Kafka shuffler. [~gaborgsomogyi]'s solution to randomize topic names on 
retry makes sense to me in this case, considering how hard it is to debug 
Kafka broker issues on our side. Please ping me when your PR is ready for review.
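
For reference, a minimal sketch of what randomizing the topic name per attempt could look like. The class and method names here are hypothetical and not part of the existing Flink Kafka test utilities; the actual PR may well do this differently.

```java
import java.util.UUID;

/** Hypothetical helper (an assumption for illustration, not an existing Flink class). */
final class RandomizedTopicNames {

    private RandomizedTopicNames() {}

    /**
     * Appends a random UUID to the base name so that a re-run of the same test
     * creates a fresh topic instead of colliding with the one left behind by
     * the failed first attempt. A UUID string contains only hex digits and '-',
     * which are legal characters in Kafka topic names.
     */
    static String uniqueTopic(String baseName) {
        return baseName + "-" + UUID.randomUUID();
    }
}
```

A test would then create its topic with something like {{RandomizedTopicNames.uniqueTopic("tstopic")}}, so a retry no longer fails with TopicExistsException and the CI log shows the real error of each attempt.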

 

> KafkaITCase.testTimestamps fails due to "Topic xxx already exist"
> -----------------------------------------------------------------
>
>                 Key: FLINK-24119
>                 URL: https://issues.apache.org/jira/browse/FLINK-24119
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.14.0, 1.15.0, 1.16.0
>            Reporter: Xintong Song
>            Assignee: Qingsheng Ren
>            Priority: Blocker
>              Labels: auto-deprioritized-critical, test-stability
>             Fix For: 1.16.1
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=23328&view=logs&j=c5f0071e-1851-543e-9a45-9ac140befc32&t=15a22db7-8faa-5b34-3920-d33c9f0ca23c&l=7419
> {code}
> Sep 01 15:53:20 [ERROR] Tests run: 23, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 162.65 s <<< FAILURE! - in org.apache.flink.streaming.connectors.kafka.KafkaITCase
> Sep 01 15:53:20 [ERROR] testTimestamps  Time elapsed: 23.237 s  <<< FAILURE!
> Sep 01 15:53:20 java.lang.AssertionError: Create test topic : tstopic failed, org.apache.kafka.common.errors.TopicExistsException: Topic 'tstopic' already exists.
> Sep 01 15:53:20       at org.junit.Assert.fail(Assert.java:89)
> Sep 01 15:53:20       at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.createTestTopic(KafkaTestEnvironmentImpl.java:226)
> Sep 01 15:53:20       at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.createTestTopic(KafkaTestEnvironment.java:112)
> Sep 01 15:53:20       at org.apache.flink.streaming.connectors.kafka.KafkaTestBase.createTestTopic(KafkaTestBase.java:212)
> Sep 01 15:53:20       at org.apache.flink.streaming.connectors.kafka.KafkaITCase.testTimestamps(KafkaITCase.java:191)
> Sep 01 15:53:20       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> Sep 01 15:53:20       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Sep 01 15:53:20       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Sep 01 15:53:20       at java.lang.reflect.Method.invoke(Method.java:498)
> Sep 01 15:53:20       at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> Sep 01 15:53:20       at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Sep 01 15:53:20       at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> Sep 01 15:53:20       at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Sep 01 15:53:20       at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
> Sep 01 15:53:20       at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
> Sep 01 15:53:20       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> Sep 01 15:53:20       at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
