[ https://issues.apache.org/jira/browse/FLINK-39723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18085261#comment-18085261 ]

Martijn Visser commented on FLINK-39723:
----------------------------------------

I don't think this is a transactional-ID reuse issue; it looks like a topic
metadata propagation race. In the weekly run
https://github.com/apache/flink-connector-kafka/actions/runs/26755629723, the
failure reproduced 3 times across Surefire retries for the same topic. Inline
MiniCluster stack trace:

{code:java}
java.lang.RuntimeException: Failed to get open transactions for topics [topics_csv_<uuid>].
  at AdminUtils.getOpenTransactionsForTopics(AdminUtils.java:115)
  at TransactionAbortStrategyImpl$2.abortTransactions(...)                             # LISTING strategy
  at ExactlyOnceKafkaWriter.abortLingeringTransactions(ExactlyOnceKafkaWriter.java:331)
  at ExactlyOnceKafkaWriter.initialize(ExactlyOnceKafkaWriter.java:194)                 # sink writer init
...
Caused by: ExecutionException: UnknownTopicOrPartitionException:
           This server does not host this topic-partition.
{code}

The root cause appears to be the following:

1. The test creates its topic via KafkaTableTestBase.createTestTopic(), which 
calls AdminClient.createTopics(...).all().get() and returns as soon as the 
controller acknowledges creation. It does *not* wait for partition 
leadership/metadata to propagate.
2. The test then immediately starts an EXACTLY_ONCE sink job. During writer 
initialization, ExactlyOnceKafkaWriter.abortLingeringTransactions() (LISTING 
strategy) calls AdminUtils.getTopicMetadata(), which races ahead of metadata 
propagation and receives UnknownTopicOrPartitionException.
3. The job runs with NoRestartBackoffTimeStrategy, so the transient error kills 
the job immediately (observed ~0.2s), surfacing to the test as "TableException: 
Failed to wait job finish".

This is the same class of readiness race that FLINK-39234 addressed
(createNewTopicAndWaitForPartitionAssignment).
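A minimal sketch of the kind of readiness check that would close this race, in the same spirit as createNewTopicAndWaitForPartitionAssignment from FLINK-39234 (whose actual implementation is not reproduced here). The names TopicReadiness, allPartitionsHaveLeader and waitUntil are hypothetical. To keep the sketch runnable without a broker, the leader snapshot is modelled as a plain partition-to-broker-id map; in a real test it would presumably be built from AdminClient.describeTopics(...), where TopicPartitionInfo.leader() returns null while a partition has no leader.

```java
import java.time.Duration;
import java.util.Map;
import java.util.function.BooleanSupplier;

// Hypothetical helper; not part of flink-connector-kafka.
final class TopicReadiness {

    private TopicReadiness() {}

    /**
     * Returns true once the snapshot covers all expected partitions and each
     * one has a leader. A null value models a partition without a leader yet.
     * Assumption: in a real test the map is derived from
     * admin.describeTopics(...).allTopicNames().get(), and an
     * UnknownTopicOrPartitionException is treated as "not ready yet".
     */
    static boolean allPartitionsHaveLeader(Map<Integer, Integer> leaderByPartition,
                                           int expectedPartitions) {
        if (leaderByPartition.size() != expectedPartitions) {
            return false;
        }
        for (Integer leaderId : leaderByPartition.values()) {
            if (leaderId == null) {
                return false;
            }
        }
        return true;
    }

    /**
     * Polls the condition until it holds or the timeout elapses.
     * Returns false on timeout or if the waiting thread is interrupted.
     */
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
        return true;
    }
}
```

The test would call waitUntil right after createTopics(...).all().get() and fail fast if it returns false, so the EXACTLY_ONCE job is only submitted once every partition has a leader and the writer's LISTING abort no longer races metadata propagation.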

> KafkaTableITCase.testExactlyOnceSink fails due to transactional ID reuse across test runs
> -----------------------------------------------------------------------------------------
>
>                 Key: FLINK-39723
>                 URL: https://issues.apache.org/jira/browse/FLINK-39723
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>            Reporter: Aleksandr Savonin
>            Assignee: Aleksandr Savonin
>            Priority: Major
>
> KafkaTableITCase.testExactlyOnceSink intermittently fails in CI with 
> ProducerFencedException caused by transactional producer state leaking 
> between test runs on the same Kafka broker instance.
> Observed in: CI run 
> https://github.com/apache/flink-connector-kafka/actions/runs/26227821749
>  
> Ideas/potential fixes to think about:
>  - Use randomized/unique transactional ID prefixes per test run (similar to
>    how topic names already use random UUIDs)
>  - Or explicitly abort any open transactions with the same ID before the
>    test starts
>  - Or ensure the Kafka container is fully restarted between parameterized
>    variants that share a transactional ID space
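For reference, the first idea in the quoted description (a unique transactional-ID prefix per run) could look roughly like the sketch below. The class and method names are hypothetical, and it assumes the test declares its sink table with the Kafka SQL connector options 'sink.delivery-guarantee' and 'sink.transactional-id-prefix'.

```java
import java.util.UUID;

// Hypothetical helper giving each test run its own transactional-ID space.
final class TransactionalIdPrefixes {

    private TransactionalIdPrefixes() {}

    /** Appends a random UUID so prefixes do not repeat across runs on the same broker. */
    static String uniquePrefix(String base) {
        return base + "-" + UUID.randomUUID();
    }

    /**
     * Renders the sink part of a Kafka table WITH clause for an exactly-once
     * sink that uses the given transactional-ID prefix.
     */
    static String exactlyOnceSinkOptions(String transactionalIdPrefix) {
        return "'sink.delivery-guarantee' = 'exactly-once',\n"
                + "'sink.transactional-id-prefix' = '" + transactionalIdPrefix + "'";
    }
}
```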



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
