Hi,

1. We use Flink 1.17.1.
2. There is no traffic and no data in the topics when it happens.
3. Log excerpt with the error:

2023-07-24 08:41:30,334 DEBUG org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer [] - commitTransaction telephony-decoder-ORCHESTRATOR-MULTIMEDIA-IN-8-10
2023-07-24 08:41:30,378 ERROR org.apache.flink.connector.kafka.sink.KafkaCommitter [] - Unable to commit transaction (org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl@24e0e43c) because it's in an invalid state. Most likely the transaction has been aborted for some reason. Please check the Kafka logs for more details.
org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
2023-07-24 08:41:30,378 ERROR org.apache.flink.connector.kafka.sink.KafkaCommitter [] - Unable to commit transaction (org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl@55c55f14) because it's in an invalid state. Most likely the transaction has been aborted for some reason. Please check the Kafka logs for more details.
org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
2023-07-24 08:41:30,380 DEBUG org.apache.flink.runtime.state.TaskStateManagerImpl [] - Operator b2a2e541ae5a67fa81a2d8b387225849 has remote state SubtaskState{operatorStateFromBackend=StateObjectCollection{[OperatorStateHandle{stateNameToPartitionOffsets={writer_raw_states=StateMetaInfo{offsets=[233], distributionMode=SPLIT_DISTRIBUTE}}, delegateStateHandle=ByteStreamStateHandle{handleName='file:/opt/flink/shared/savepoints/savepoint-000000-fcc29062a521/238f2a1e-aa7e-4218-843a-b469a5dcbad8', dataBytes=291}}]}, operatorStateFromStream=StateObjectCollection{[]}, keyedStateFromBackend=StateObjectCollection{[]}, keyedStateFromStream=StateObjectCollection{[]}, inputChannelState=StateObjectCollection{[]}, resultSubpartitionState=StateObjectCollection{[]}, stateSize=291, checkpointedSize=291} from job manager and local state alternatives [] from local state store org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl@42b00150.
2023-07-24 08:41:30,380 DEBUG org.apache.flink.runtime.state.TaskStateManagerImpl [] - Operator b2a2e541ae5a67fa81a2d8b387225849 has remote state SubtaskState{operatorStateFromBackend=StateObjectCollection{[OperatorStateHandle{stateNameToPartitionOffsets={writer_raw_states=StateMetaInfo{offsets=[233], distributionMode=SPLIT_DISTRIBUTE}}, delegateStateHandle=ByteStreamStateHandle{handleName='file:/opt/flink/shared/savepoints/savepoint-000000-fcc29062a521/e2d3916f-f9f9-40e4-89bc-bc193fbd5373', dataBytes=291}}]}, operatorStateFromStream=StateObjectCollection{[]}, keyedStateFromBackend=StateObjectCollection{[]}, keyedStateFromStream=StateObjectCollection{[]}, inputChannelState=StateObjectCollection{[]}, resultSubpartitionState=StateObjectCollection{[]}, stateSize=291, checkpointedSize=291} from job manager and local state alternatives [] from local state store org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl@32734da0.
2023-07-24 08:41:30,380 DEBUG org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - Creating operator state backend for SinkWriterOperator_b2a2e541ae5a67fa81a2d8b387225849_(12/20) and restoring with state from alternative (1/1).
2023-07-24 08:41:30,381 DEBUG org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - Creating operator state backend for SinkWriterOperator_b2a2e541ae5a67fa81a2d8b387225849_(14/20) and restoring with state from alternative (1/1).
2023-07-24 08:41:30,381 ERROR org.apache.flink.connector.kafka.sink.KafkaCommitter [] - Unable to commit transaction (org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl@75581079) because it's in an invalid state. Most likely the transaction has been aborted for some reason. Please check the Kafka logs for more details.
org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
2023-07-24 08:41:30,382 DEBUG org.apache.flink.runtime.state.TaskStateManagerImpl [] - Operator b2a2e541ae5a67fa81a2d8b387225849 has remote state SubtaskState{operatorStateFromBackend=StateObjectCollection{[OperatorStateHandle{stateNameToPartitionOffsets={writer_raw_states=StateMetaInfo{offsets=[233], distributionMode=SPLIT_DISTRIBUTE}}, delegateStateHandle=ByteStreamStateHandle{handleName='file:/opt/flink/shared/savepoints/savepoint-000000-fcc29062a521/66a3e58e-0f8c-4c09-9fc3-04be0d3388dc', dataBytes=291}}]}, operatorStateFromStream=StateObjectCollection{[]}, keyedStateFromBackend=StateObjectCollection{[]}, keyedStateFromStream=StateObjectCollection{[]}, inputChannelState=StateObjectCollection{[]}, resultSubpartitionState=StateObjectCollection{[]}, stateSize=291, checkpointedSize=291} from job manager and local state alternatives [] from local state store org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl@5c241b41.
2023-07-24 08:41:30,382 DEBUG org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - Creating operator state backend for SinkWriterOperator_b2a2e541ae5a67fa81a2d8b387225849_(5/20) and restoring with state from alternative (1/1).
2023-07-24 08:41:30,382 DEBUG org.apache.flink.runtime.metrics.MetricRegistryImpl [] - Registering metric taskmanager.job.task.operator.numRecordsOutErrors.
2023-07-24 08:41:30,382 DEBUG org.apache.flink.runtime.metrics.MetricRegistryImpl [] - Registering metric taskmanager.job.task.operator.numRecordsOutErrors.
2023-07-24 08:41:30,383 DEBUG org.apache.flink.runtime.metrics.MetricRegistryImpl [] - Registering metric taskmanager.job.task.operator.numRecordsSendErrors.
2023-07-24 08:41:30,383 DEBUG org.apache.flink.runtime.metrics.MetricRegistryImpl [] - Registering metric taskmanager.job.task.operator.numRecordsOutErrors.
2023-07-24 08:41:30,383 DEBUG org.apache.flink.runtime.metrics.MetricRegistryImpl [] - Registering metric taskmanager.job.task.operator.numRecordsSendErrors.
2023-07-24 08:41:30,383 DEBUG org.apache.flink.runtime.metrics.MetricRegistryImpl [] - Registering metric taskmanager.job.task.operator.numRecordsSend.
2023-07-24 08:41:30,383 DEBUG org.apache.flink.runtime.metrics.MetricRegistryImpl [] - Registering metric taskmanager.job.task.operator.numRecordsSend.
2023-07-24 08:41:30,384 DEBUG org.apache.flink.runtime.metrics.MetricRegistryImpl [] - Registering metric taskmanager.job.task.operator.numBytesSend.
2023-07-24 08:41:30,384 DEBUG org.apache.flink.runtime.metrics.MetricRegistryImpl [] - Registering metric taskmanager.job.task.operator.numRecordsSendErrors.
2023-07-24 08:41:30,384 DEBUG org.apache.flink.runtime.metrics.MetricRegistryImpl [] - Registering metric taskmanager.job.task.operator.numRecordsSend.
2023-07-24 08:41:30,384 DEBUG org.apache.flink.runtime.metrics.MetricRegistryImpl [] - Registering metric taskmanager.job.task.operator.numBytesSend.
2023-07-24 08:41:30,385 ERROR org.apache.flink.connector.kafka.sink.KafkaCommitter [] - Unable to commit transaction (org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl@47835f95) because it's in an invalid state. Most likely the transaction has been aborted for some reason. Please check the Kafka logs for more details.
org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
2023-07-24 08:41:30,385 DEBUG org.apache.flink.runtime.metrics.MetricRegistryImpl [] - Registering metric taskmanager.job.task.operator.numBytesSend.
2023-07-24 08:41:30,385 DEBUG org.apache.flink.runtime.state.TaskStateManagerImpl [] - Operator b2a2e541ae5a67fa81a2d8b387225849 has remote state SubtaskState{operatorStateFromBackend=StateObjectCollection{[OperatorStateHandle{stateNameToPartitionOffsets={writer_raw_states=StateMetaInfo{offsets=[233], distributionMode=SPLIT_DISTRIBUTE}}, delegateStateHandle=ByteStreamStateHandle{handleName='file:/opt/flink/shared/savepoints/savepoint-000000-fcc29062a521/e4aa5b34-3b1f-4f6b-b9a0-6dacb4d5408b', dataBytes=291}}]}, operatorStateFromStream=StateObjectCollection{[]}, keyedStateFromBackend=StateObjectCollection{[]}, keyedStateFromStream=StateObjectCollection{[]}, inputChannelState=StateObjectCollection{[]}, resultSubpartitionState=StateObjectCollection{[]}, stateSize=291, checkpointedSize=291} from job manager and local state alternatives [] from local state store org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl@157a2949.

On Mon, Jul 24, 2023 at 5:31, Shammon FY <zjur...@gmail.com> wrote:

> Hi nick,
>
> Is there any error log? That may help to analyze the root cause.
>
> On Sun, Jul 23, 2023 at 9:53 PM nick toker <nick.toker....@gmail.com>
> wrote:
>
>> Hello,
>>
>> We replaced the deprecated Kafka producer with the Kafka sink, and
>> from time to time when we submit a job it gets stuck for 5 minutes in
>> INITIALIZING (on the sink operators).
>> We verified that the transaction prefix is unique.
>>
>> This did not happen when we used the Kafka producer.
>>
>> What can be the reason?
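
In case it helps with reading pasted excerpts like the one in this thread: the mail client wrapped the log into long runs, so it is hard to see how many KafkaCommitter errors there are compared to the DEBUG noise. Below is a small Python sketch that splits such text back into individual records and counts them per level and logger. It assumes every record starts with a `yyyy-MM-dd HH:mm:ss,SSS LEVEL logger [] - ` prefix, as in the excerpt; the function names and the sample string (with the ERROR message truncated after its first sentence) are only for illustration and are not part of Flink.

```python
import re
from collections import Counter

# Start of one record, e.g. "2023-07-24 08:41:30,334 DEBUG some.Logger [] - ".
# The optional second token in the logger group tolerates a space that mail
# wrapping may have inserted inside the logger name ("sink. KafkaCommitter").
RECORD_START = re.compile(
    r"(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) "
    r"(?P<level>[A-Z]+)\s+(?P<logger>\S+(?: \S+)?) \[\] - "
)


def split_records(text):
    """Split flattened log text into (timestamp, level, logger, message) tuples."""
    matches = list(RECORD_START.finditer(text))
    records = []
    for i, m in enumerate(matches):
        # A record's message runs until the next record prefix (or end of text).
        end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
        logger = m["logger"].replace(" ", "")  # undo the wrap-induced space
        records.append((m["ts"], m["level"], logger, text[m.end():end].strip()))
    return records


def count_by_level_and_logger(records):
    """Count records per (level, simple logger class name)."""
    return Counter(
        (level, logger.rsplit(".", 1)[-1]) for _, level, logger, _ in records
    )


# Sample taken from the excerpt in this thread; ERROR messages are truncated.
sample = (
    "2023-07-24 08:41:30,334 DEBUG org.apache.flink.connector.kafka.sink. "
    "FlinkKafkaInternalProducer [] - commitTransaction "
    "telephony-decoder-ORCHESTRATOR-MULTIMEDIA-IN-8-10 "
    "2023-07-24 08:41:30,378 ERROR org.apache.flink.connector.kafka.sink. "
    "KafkaCommitter [] - Unable to commit transaction "
    "2023-07-24 08:41:30,381 ERROR org.apache.flink.connector.kafka.sink. "
    "KafkaCommitter [] - Unable to commit transaction"
)

records = split_records(sample)
counts = count_by_level_and_logger(records)
for (level, logger), n in counts.most_common():
    print(level, logger, n)
```

Run over the full TaskManager log, the same two functions would show whether the InvalidTxnStateException errors cluster around the restore from the savepoint or are spread over the whole 5 minutes.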