Hi JM,

> why having "transactional.id.expiration.ms" < "transaction.timeout.ms" helps
When recovering a job from a checkpoint/savepoint that contains Kafka
transactions, Flink will try to re-commit those transactions, looked up by
transactional ID, upon recovery. If those transactions have timed out or the
transactional IDs have expired, the re-commit may fail because the
transactional ID no longer matches. IIUC, setting
"transactional.id.expiration.ms" < "transaction.timeout.ms" allows the
transactional ID to be reset, but it can cause data loss. [1] and [2] are
helpful for understanding what happened.

[1] https://issues.apache.org/jira/browse/FLINK-16419?focusedCommentId=17624315&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17624315
[2] https://ververica.zendesk.com/hc/en-us/articles/360013269680-Best-Practices-for-Using-Kafka-Sources-Sinks-in-Flink-Jobs

On Tue, Apr 23, 2024 at 16:45 Jean-Marc Paulin <j...@uk.ibm.com> wrote:
>
> Thanks for your insight.
>
> I am still trying to understand exactly what happens here. We currently have
> the default settings in Kafka, and we set "transaction.timeout.ms" to 15
> minutes (which also happens to be the default "transaction.max.timeout.ms").
> My expectation was that if our savepoint were more than 15 minutes old it
> would fail to restore, but that is not the case.
>
> I still think we need to extend "transaction.max.timeout.ms" to something
> like 7 days, as a savepoint older than 7 days is effectively worthless
> anyway, and probably adjust "transaction.timeout.ms" to be close to this.
>
> But can you explain how "transactional.id.expiration.ms" influences the
> InvalidPidMappingException, or why having "transactional.id.expiration.ms" <
> "transaction.timeout.ms" helps?
>
> Kind regards
>
> Jean-Marc
>
> ________________________________
> From: Yanfei Lei <fredia...@gmail.com>
> Sent: Monday, April 22, 2024 03:28
> To: Jean-Marc Paulin <j...@uk.ibm.com>
> Cc: user@flink.apache.org <user@flink.apache.org>
> Subject: [EXTERNAL] Re: Flink 1.18: Unable to resume from a savepoint with error InvalidPidMappingException
>
> Hi JM,
>
> Yes, in most cases `InvalidPidMappingException` occurs because the
> transaction is lost.
>
> As a short-term fix, setting "transaction.timeout.ms" >
> "transactional.id.expiration.ms" makes it possible to ignore the
> `InvalidPidMappingException` [1].
> For the long term, FLIP-319 [2] provides a solution.
>
> [1] https://speakerdeck.com/rmetzger/3-flink-mistakes-we-made-so-you-wont-have-to?slide=13
> [2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710
>
> On Sat, Apr 20, 2024 at 02:30 Jean-Marc Paulin <j...@uk.ibm.com> wrote:
> >
> > Hi,
> >
> > We use Flink 1.18 with the Kafka sink, and we enabled `EXACTLY_ONCE` on
> > one of our Kafka sinks. We set the transaction timeout to 15 minutes.
> > When we try to restore from a savepoint, well after that 15-minute
> > window, Flink enters a RESTARTING loop.
> > We see the error:
> >
> > ```
> > {
> >   "exception": {
> >     "exception_class": "org.apache.kafka.common.errors.InvalidPidMappingException",
> >     "exception_message": "The producer attempted to use a producer id which is not currently assigned to its transactional id.",
> >     "stacktrace": "org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.\n"
> >   },
> >   "@version": 1,
> >   "source_host": "aiops-ir-lifecycle-eventprocessor-ep-jobmanager-0",
> >   "message": "policy-exec::schedule-policy-execution -> (policy-exec::select-kafka-async-policy-stages, policy-exec::select-async-policy-stages -> policy-exec::execute-async-policy-stages, policy-exec::select-non-async-policy-stages, Sink: stories-input, Sink: policy-completion-results, Sink: stories-changes, Sink: alerts-input, Sink: story-notifications-output, Sink: alerts-output, Sink: alerts-changes, Sink: connector-alerts, Sink: updated-events-output, Sink: stories-output, Sink: runbook-execution-requests) (6/6) (3f8cb042c1aa628891c444466a8b52d1_593c33b9decafa4ad6ae85c185860bef_5_0) switched from INITIALIZING to FAILED on aiops-ir-lifecycle-eventprocessor-ep-taskmanager-1.aiops-ir-lifecycle-eventprocessor-ep-taskmanager.cp4aiops.svc:6122-d2828c @ aiops-ir-lifecycle-eventprocessor-ep-taskmanager-1.aiops-ir-lifecycle-eventprocessor-ep-taskmanager.cp4aiops.svc.cluster.local (dataPort=6121).",
> >   "thread_name": "flink-pekko.actor.default-dispatcher-18",
> >   "@timestamp": "2024-04-19T11:11:05.169+0000",
> >   "level": "INFO",
> >   "logger_name": "org.apache.flink.runtime.executiongraph.ExecutionGraph"
> > }
> > ```
> >
> > As far as I understand, the transaction is lost. Would it be possible to
> > ignore this particular error and resume the job anyway?
> >
> > Thanks for any suggestions
> >
> > JM
> >
> > Unless otherwise stated above:
> >
> > IBM United Kingdom Limited
> > Registered in England and Wales with number 741598
> > Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU
>
> --
> Best,
> Yanfei

--
Best,
Yanfei
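For readers landing on this thread later, the relationship between the three settings discussed above can be sketched in plain Java. This is a minimal illustration, not a definitive recipe: the class and helper method names are hypothetical, and the 15-minute / 7-day values are the illustrative ones from the thread; only the property names come from Kafka itself.

```java
import java.util.Properties;

// Sketch of the Kafka transaction-timeout relationships discussed in this
// thread (hypothetical helper; illustrative values).
public class KafkaTxnTimeoutCheck {

    /**
     * Returns true when the settings are mutually consistent:
     *  - the broker rejects producers whose transaction.timeout.ms exceeds
     *    the broker-side transaction.max.timeout.ms, and
     *  - the workaround above wants transactional.id.expiration.ms below
     *    transaction.timeout.ms, so an idle transactional id is expired and
     *    reset rather than surfacing as InvalidPidMappingException on
     *    recovery from an old savepoint.
     */
    static boolean relationshipsHold(long transactionTimeoutMs,
                                     long transactionMaxTimeoutMs,
                                     long transactionalIdExpirationMs) {
        return transactionTimeoutMs <= transactionMaxTimeoutMs
                && transactionalIdExpirationMs < transactionTimeoutMs;
    }

    public static void main(String[] args) {
        long sevenDaysMs = 7L * 24 * 60 * 60 * 1000;   // 604800000
        long fifteenMinutesMs = 15L * 60 * 1000;       // 900000

        // Producer-side property, as it would be handed to the Flink Kafka
        // sink's producer configuration.
        Properties producer = new Properties();
        producer.setProperty("transaction.timeout.ms", String.valueOf(sevenDaysMs));
        long transactionTimeoutMs =
                Long.parseLong(producer.getProperty("transaction.timeout.ms"));

        // Broker-side settings: JM's proposal raises transaction.max.timeout.ms
        // (broker default: 15 minutes) to 7 days; transactional.id.expiration.ms
        // (broker default: 7 days) is kept below the transaction timeout per
        // the workaround.
        long transactionMaxTimeoutMs = sevenDaysMs;
        long transactionalIdExpirationMs = fifteenMinutesMs;

        System.out.println(relationshipsHold(
                transactionTimeoutMs, transactionMaxTimeoutMs,
                transactionalIdExpirationMs));
        // prints: true
    }
}
```

With Flink's KafkaSink, the producer-side property would typically be set on the sink builder (e.g. `setProperty("transaction.timeout.ms", ...)`), while the two broker settings live in the cluster's server.properties. Note that with Kafka's defaults (both broker limits at their out-of-the-box values and a 15-minute producer timeout, as in JM's original setup), the check above returns false, which matches the failure described in the thread.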