[ https://issues.apache.org/jira/browse/FLINK-24119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17625574#comment-17625574 ]
Qingsheng Ren commented on FLINK-24119:
---------------------------------------

Thanks for the input, [~gaborgsomogyi]! {{KafkaShuffleExactlyOnceITCase}} is configured to re-run on failure, and its topic name is not randomized, so the retry always hits this "topic already exists" exception, whatever the root cause of the first failure was. We have to dig into the log to find the actual failure reason in the first attempt (it's a nightmare to look through the CI log...).

I checked the latest two failure cases above. Both of them failed because of an InitProducerId timeout, which is caused by a timeout on the broker when requesting new producer ID blocks in ProducerIdManager:
{code:java}
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for next producer ID block
{code}
So I assume this is an internal issue in the Kafka broker and not related to the Flink Kafka shuffle. [~gaborgsomogyi]'s solution of randomizing topic names on retry makes sense to me in this case, considering how hard it is to debug Kafka broker issues on our side. Please ping me when your PR is ready for review.

> KafkaITCase.testTimestamps fails due to "Topic xxx already exist"
> -----------------------------------------------------------------
>
>                 Key: FLINK-24119
>                 URL: https://issues.apache.org/jira/browse/FLINK-24119
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.14.0, 1.15.0, 1.16.0
>            Reporter: Xintong Song
>            Assignee: Qingsheng Ren
>            Priority: Blocker
>              Labels: auto-deprioritized-critical, test-stability
>             Fix For: 1.16.1
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=23328&view=logs&j=c5f0071e-1851-543e-9a45-9ac140befc32&t=15a22db7-8faa-5b34-3920-d33c9f0ca23c&l=7419
> {code}
> Sep 01 15:53:20 [ERROR] Tests run: 23, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 162.65 s <<< FAILURE! - in org.apache.flink.streaming.connectors.kafka.KafkaITCase
> Sep 01 15:53:20 [ERROR] testTimestamps  Time elapsed: 23.237 s  <<< FAILURE!
> Sep 01 15:53:20 java.lang.AssertionError: Create test topic : tstopic failed, org.apache.kafka.common.errors.TopicExistsException: Topic 'tstopic' already exists.
> Sep 01 15:53:20     at org.junit.Assert.fail(Assert.java:89)
> Sep 01 15:53:20     at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.createTestTopic(KafkaTestEnvironmentImpl.java:226)
> Sep 01 15:53:20     at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.createTestTopic(KafkaTestEnvironment.java:112)
> Sep 01 15:53:20     at org.apache.flink.streaming.connectors.kafka.KafkaTestBase.createTestTopic(KafkaTestBase.java:212)
> Sep 01 15:53:20     at org.apache.flink.streaming.connectors.kafka.KafkaITCase.testTimestamps(KafkaITCase.java:191)
> Sep 01 15:53:20     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> Sep 01 15:53:20     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Sep 01 15:53:20     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Sep 01 15:53:20     at java.lang.reflect.Method.invoke(Method.java:498)
> Sep 01 15:53:20     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> Sep 01 15:53:20     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Sep 01 15:53:20     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> Sep 01 15:53:20     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Sep 01 15:53:20     at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
> Sep 01 15:53:20     at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
> Sep 01 15:53:20     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> Sep 01 15:53:20     at java.lang.Thread.run(Thread.java:748)
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
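For illustration, the idea endorsed in the comment (randomizing the topic name on each retry so a re-run cannot collide with the topic left behind by the failed first attempt) can be sketched in Java. This is a minimal sketch under stated assumptions, not the code from the PR mentioned in the comment: the class `UniqueTopicNames` and its method `forAttempt` are hypothetical names, and it assumes the test would pass the returned name to its topic-creation call instead of a fixed name such as `tstopic`. The only library call used is `java.util.UUID.randomUUID()`.

```java
import java.util.UUID;

/**
 * Hypothetical helper (not part of Flink's test utilities): derives a fresh
 * Kafka topic name for each test attempt, so a retried run never reuses the
 * topic created by a previous, failed attempt.
 */
public final class UniqueTopicNames {

    // Kafka rejects topic names longer than 249 characters.
    private static final int MAX_TOPIC_NAME_LENGTH = 249;

    private UniqueTopicNames() {}

    /** Appends a random UUID suffix to the base name, e.g. "tstopic-<uuid>". */
    public static String forAttempt(String baseName) {
        // A UUID string contains only hex digits and '-', both legal in topic
        // names, so the suffix never adds an illegal character; only the total
        // length is checked here.
        String name = baseName + "-" + UUID.randomUUID();
        if (name.length() > MAX_TOPIC_NAME_LENGTH) {
            throw new IllegalArgumentException("Topic name too long: " + name);
        }
        return name;
    }

    public static void main(String[] args) {
        // Two attempts of the same test get two different topics, so creating
        // the second one would not fail with TopicExistsException.
        String first = forAttempt("tstopic");
        String retry = forAttempt("tstopic");
        System.out.println(first);
        System.out.println(retry);
    }
}
```

Because every attempt asks for a new name, a re-run after a broker-side failure (such as the InitProducerId timeout described above) creates a fresh topic instead of tripping over the old one. The trade-off is that topics from failed attempts are not cleaned up by this helper, so a real fix might still want to delete them after the test.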