[ 
https://issues.apache.org/jira/browse/FLINK-25455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-25455:
----------------------------------
    Labels: test-stability  (was: )

> FlinkKafkaInternalProducerITCase.testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator
>  fails on AZP
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-25455
>                 URL: https://issues.apache.org/jira/browse/FLINK-25455
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.15.0, 1.13.5, 1.14.2
>            Reporter: Till Rohrmann
>            Priority: Critical
>              Labels: test-stability
>             Fix For: 1.15.0, 1.13.6, 1.14.3
>
>
> The test 
> {{FlinkKafkaInternalProducerITCase.testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator}}
>  fails on AZP with
> {code}
> 2021-12-27T02:56:56.3618475Z java.util.concurrent.TimeoutException: The topic 
> metadata failed to propagate to Kafka broker.
> 2021-12-27T02:56:56.3619339Z  at 
> org.apache.flink.core.testutils.CommonTestUtils.waitUtil(CommonTestUtils.java:214)
> 2021-12-27T02:56:56.3619958Z  at 
> org.apache.flink.core.testutils.CommonTestUtils.waitUtil(CommonTestUtils.java:230)
> 2021-12-27T02:56:56.3620643Z  at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.createTestTopic(KafkaTestEnvironmentImpl.java:216)
> 2021-12-27T02:56:56.3621504Z  at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.createTestTopic(KafkaTestEnvironment.java:98)
> 2021-12-27T02:56:56.3625442Z  at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestBase.createTestTopic(KafkaTestBase.java:216)
> 2021-12-27T02:56:56.3626958Z  at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaInternalProducerITCase.testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator(FlinkKafkaInternalProducerITCase.java:206)
> 2021-12-27T02:56:56.3627771Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2021-12-27T02:56:56.3628571Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2021-12-27T02:56:56.3629167Z  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2021-12-27T02:56:56.3629689Z  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2021-12-27T02:56:56.3630183Z  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> 2021-12-27T02:56:56.3630764Z  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2021-12-27T02:56:56.3631484Z  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> 2021-12-27T02:56:56.3632056Z  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2021-12-27T02:56:56.3632655Z  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
> 2021-12-27T02:56:56.3633576Z  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
> 2021-12-27T02:56:56.3634123Z  at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 2021-12-27T02:56:56.3634579Z  at java.lang.Thread.run(Thread.java:748)
> 2021-12-27T02:56:56.3785761Z java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TopicExistsException: Topic 
> 'flink-kafka-producer-txn-coordinator-changed' already exists.
> 2021-12-27T02:56:56.3787263Z  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
> 2021-12-27T02:56:56.3787944Z  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
> 2021-12-27T02:56:56.3788739Z  at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
> 2021-12-27T02:56:56.3789355Z  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
> 2021-12-27T02:56:56.3790029Z  at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.createTestTopic(KafkaTestEnvironmentImpl.java:214)
> 2021-12-27T02:56:56.3791029Z  at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.createTestTopic(KafkaTestEnvironment.java:98)
> 2021-12-27T02:56:56.3791831Z  at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestBase.createTestTopic(KafkaTestBase.java:216)
> 2021-12-27T02:56:56.3792748Z  at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaInternalProducerITCase.testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator(FlinkKafkaInternalProducerITCase.java:206)
> 2021-12-27T02:56:56.3793447Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2021-12-27T02:56:56.3793931Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2021-12-27T02:56:56.3794501Z  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2021-12-27T02:56:56.3795020Z  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2021-12-27T02:56:56.3795538Z  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> 2021-12-27T02:56:56.3796121Z  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2021-12-27T02:56:56.3796692Z  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> 2021-12-27T02:56:56.3797256Z  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2021-12-27T02:56:56.3797835Z  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
> 2021-12-27T02:56:56.3798546Z  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
> 2021-12-27T02:56:56.3799087Z  at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 2021-12-27T02:56:56.3799519Z  at java.lang.Thread.run(Thread.java:748)
> 2021-12-27T02:56:56.3800385Z Caused by: 
> org.apache.kafka.common.errors.TopicExistsException: Topic 
> 'flink-kafka-producer-txn-coordinator-changed' already exists.
> 2021-12-27T02:56:56.3888956Z java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TopicExistsException: Topic 
> 'flink-kafka-producer-txn-coordinator-changed' already exists.
> 2021-12-27T02:56:56.3890048Z  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
> 2021-12-27T02:56:56.3890665Z  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
> 2021-12-27T02:56:56.3891363Z  at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
> 2021-12-27T02:56:56.3891964Z  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
> 2021-12-27T02:56:56.3892608Z  at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.createTestTopic(KafkaTestEnvironmentImpl.java:214)
> 2021-12-27T02:56:56.3893404Z  at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.createTestTopic(KafkaTestEnvironment.java:98)
> 2021-12-27T02:56:56.3894039Z  at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestBase.createTestTopic(KafkaTestBase.java:216)
> 2021-12-27T02:56:56.3895053Z  at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaInternalProducerITCase.testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator(FlinkKafkaInternalProducerITCase.java:206)
> 2021-12-27T02:56:56.3895717Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2021-12-27T02:56:56.3896201Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2021-12-27T02:56:56.3896756Z  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2021-12-27T02:56:56.3897243Z  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2021-12-27T02:56:56.3897734Z  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> 2021-12-27T02:56:56.3898374Z  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2021-12-27T02:56:56.3898924Z  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> 2021-12-27T02:56:56.3899587Z  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2021-12-27T02:56:56.3900165Z  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
> 2021-12-27T02:56:56.3900800Z  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
> 2021-12-27T02:56:56.3901502Z  at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 2021-12-27T02:56:56.3901932Z  at java.lang.Thread.run(Thread.java:748)
> 2021-12-27T02:56:56.3902788Z Caused by: 
> org.apache.kafka.common.errors.TopicExistsException: Topic 
> 'flink-kafka-producer-txn-coordinator-changed' already exists.
> 2021-12-27T02:56:56.3991984Z java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TopicExistsException: Topic 
> 'flink-kafka-producer-txn-coordinator-changed' already exists.
> 2021-12-27T02:56:56.3992877Z  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
> 2021-12-27T02:56:56.3993759Z  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
> 2021-12-27T02:56:56.3994385Z  at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
> 2021-12-27T02:56:56.3994975Z  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
> 2021-12-27T02:56:56.3995830Z  at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.createTestTopic(KafkaTestEnvironmentImpl.java:214)
> 2021-12-27T02:56:56.3996907Z  at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.createTestTopic(KafkaTestEnvironment.java:98)
> 2021-12-27T02:56:56.3997773Z  at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestBase.createTestTopic(KafkaTestBase.java:216)
> 2021-12-27T02:56:56.3998719Z  at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaInternalProducerITCase.testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator(FlinkKafkaInternalProducerITCase.java:206)
> 2021-12-27T02:56:56.3999400Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2021-12-27T02:56:56.3999895Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2021-12-27T02:56:56.4000621Z  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2021-12-27T02:56:56.4001473Z  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2021-12-27T02:56:56.4002287Z  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> 2021-12-27T02:56:56.4003140Z  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2021-12-27T02:56:56.4003988Z  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> 2021-12-27T02:56:56.4004839Z  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2021-12-27T02:56:56.4005767Z  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
> 2021-12-27T02:56:56.4007030Z  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
> 2021-12-27T02:56:56.4007864Z  at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 2021-12-27T02:56:56.4008687Z  at java.lang.Thread.run(Thread.java:748)
> 2021-12-27T02:56:56.4010025Z Caused by: 
> org.apache.kafka.common.errors.TopicExistsException: Topic 
> 'flink-kafka-producer-txn-coordinator-changed' already exists.
> 2021-12-27T02:57:34.2838223Z Dec 27 02:57:34 [ERROR] Tests run: 10, Failures: 
> 1, Errors: 0, Skipped: 0, Time elapsed: 71.271 s <<< FAILURE! - in 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaInternalProducerITCase
> 2021-12-27T02:57:34.2839174Z Dec 27 02:57:34 [ERROR] 
> testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator  Time elapsed: 
> 17.087 s  <<< FAILURE!
> 2021-12-27T02:57:34.2842604Z Dec 27 02:57:34 java.lang.AssertionError: Create 
> test topic : flink-kafka-producer-txn-coordinator-changed failed, 
> org.apache.kafka.common.errors.TopicExistsException: Topic 
> 'flink-kafka-producer-txn-coordinator-changed' already exists.
> 2021-12-27T02:57:34.2844263Z Dec 27 02:57:34  at 
> org.junit.Assert.fail(Assert.java:89)
> 2021-12-27T02:57:34.2844951Z Dec 27 02:57:34  at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.createTestTopic(KafkaTestEnvironmentImpl.java:223)
> 2021-12-27T02:57:34.2845760Z Dec 27 02:57:34  at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.createTestTopic(KafkaTestEnvironment.java:98)
> 2021-12-27T02:57:34.2846517Z Dec 27 02:57:34  at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestBase.createTestTopic(KafkaTestBase.java:216)
> 2021-12-27T02:57:34.2847411Z Dec 27 02:57:34  at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaInternalProducerITCase.testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator(FlinkKafkaInternalProducerITCase.java:206)
> 2021-12-27T02:57:34.2848348Z Dec 27 02:57:34  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2021-12-27T02:57:34.2849022Z Dec 27 02:57:34  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2021-12-27T02:57:34.2849894Z Dec 27 02:57:34  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2021-12-27T02:57:34.2850502Z Dec 27 02:57:34  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2021-12-27T02:57:34.2851187Z Dec 27 02:57:34  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> 2021-12-27T02:57:34.2851863Z Dec 27 02:57:34  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2021-12-27T02:57:34.2852519Z Dec 27 02:57:34  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> 2021-12-27T02:57:34.2853347Z Dec 27 02:57:34  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2021-12-27T02:57:34.2854045Z Dec 27 02:57:34  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
> 2021-12-27T02:57:34.2854754Z Dec 27 02:57:34  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
> 2021-12-27T02:57:34.2855401Z Dec 27 02:57:34  at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 2021-12-27T02:57:34.2855902Z Dec 27 02:57:34  at 
> java.lang.Thread.run(Thread.java:748)
> 2021-12-27T02:57:34.2856297Z Dec 27 02:57:34 
> 2021-12-27T02:57:35.3197736Z Dec 27 02:57:35 [INFO] Running 
> org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleExactlyOnceITCase
> 2021-12-27T02:58:56.9222554Z Dec 27 02:58:56 [INFO] Tests run: 6, Failures: 
> 0, Errors: 0, Skipped: 0, Time elapsed: 81.601 s - in 
> org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleExactlyOnceITCase
> 2021-12-27T02:58:58.1145773Z Dec 27 02:58:58 [INFO] Running 
> org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleITCase
> 2021-12-27T02:59:34.6576286Z Dec 27 02:59:34 [INFO] Tests run: 11, Failures: 
> 0, Errors: 0, Skipped: 0, Time elapsed: 36.541 s - in 
> org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleITCase
> 2021-12-27T02:59:35.8126189Z Dec 27 02:59:35 [INFO] Running 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase
> 2021-12-27T03:03:00.0705828Z Dec 27 03:03:00 [INFO] Tests run: 15, Failures: 
> 0, Errors: 0, Skipped: 0, Time elapsed: 204.256 s - in 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase
> 2021-12-27T03:03:00.5933283Z Dec 27 03:03:00 [INFO] 
> 2021-12-27T03:03:00.5934894Z Dec 27 03:03:00 [INFO] Results:
> 2021-12-27T03:03:00.5935369Z Dec 27 03:03:00 [INFO] 
> 2021-12-27T03:03:00.5935750Z Dec 27 03:03:00 [ERROR] Failures: 
> 2021-12-27T03:03:00.5939331Z Dec 27 03:03:00 [ERROR]   
> FlinkKafkaInternalProducerITCase.testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator:206->KafkaTestBase.createTestTopic:216
>  Create test topic : flink-kafka-producer-txn-coordinator-changed failed, 
> org.apache.kafka.common.errors.TopicExistsException: Topic 
> 'flink-kafka-producer-txn-coordinator-changed' already exists.
> {code}
> I suspect that this has something to do with the retries that we introduced 
> as part of FLINK-15493 (cc [~fpaul]).
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=28602&view=logs&j=1fc6e7bf-633c-5081-c32a-9dea24b05730&t=576aba0a-d787-51b6-6a92-cf233f360582&l=7576



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to