Hi

1. We use Flink 1.17.1.
2. There is no traffic and no data in the topics when it happens.
3. The part of the log with the error:

2023-07-24 08:41:30,334 DEBUG org.apache.flink.connector.kafka.sink.
FlinkKafkaInternalProducer [] - commitTransaction telephony-decoder-
ORCHESTRATOR-MULTIMEDIA-IN-8-10
2023-07-24 08:41:30,378 ERROR org.apache.flink.connector.kafka.sink.
KafkaCommitter [] - Unable to commit transaction
(org.apache.flink.streaming.runtime.operators.sink.committables.
CommitRequestImpl@24e0e43c) because it's in an invalid state. Most likely
the transaction has been aborted for some reason. Please check the Kafka
logs for more details.
org.apache.kafka.common.errors.InvalidTxnStateException: The producer
attempted a transactional operation in an invalid state.
2023-07-24 08:41:30,378 ERROR org.apache.flink.connector.kafka.sink.
KafkaCommitter [] - Unable to commit transaction
(org.apache.flink.streaming.runtime.operators.sink.committables.
CommitRequestImpl@55c55f14) because it's in an invalid state. Most likely
the transaction has been aborted for some reason. Please check the Kafka
logs for more details.
org.apache.kafka.common.errors.InvalidTxnStateException: The producer
attempted a transactional operation in an invalid state.
2023-07-24 08:41:30,380 DEBUG org.apache.flink.runtime.state.
TaskStateManagerImpl [] - Operator b2a2e541ae5a67fa81a2d8b387225849 has
remote state SubtaskState{operatorStateFromBackend=StateObjectCollection{[
OperatorStateHandle{stateNameToPartitionOffsets={writer_raw_states=
StateMetaInfo{offsets=[233], distributionMode=SPLIT_DISTRIBUTE}},
delegateStateHandle=ByteStreamStateHandle{handleName=
'file:/opt/flink/shared/savepoints/savepoint-000000-fcc29062a521/238f2a1e-aa7e-4218-843a-b469a5dcbad8',
dataBytes=291}}]}, operatorStateFromStream=StateObjectCollection{[]},
keyedStateFromBackend=StateObjectCollection{[]}, keyedStateFromStream=
StateObjectCollection{[]}, inputChannelState=StateObjectCollection{[]},
resultSubpartitionState=StateObjectCollection{[]}, stateSize=291,
checkpointedSize=291} from job manager and local state alternatives [] from
local state store org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl
@42b00150.
2023-07-24 08:41:30,380 DEBUG org.apache.flink.runtime.state.
TaskStateManagerImpl [] - Operator b2a2e541ae5a67fa81a2d8b387225849 has
remote state SubtaskState{operatorStateFromBackend=StateObjectCollection{[
OperatorStateHandle{stateNameToPartitionOffsets={writer_raw_states=
StateMetaInfo{offsets=[233], distributionMode=SPLIT_DISTRIBUTE}},
delegateStateHandle=ByteStreamStateHandle{handleName=
'file:/opt/flink/shared/savepoints/savepoint-000000-fcc29062a521/e2d3916f-f9f9-40e4-89bc-bc193fbd5373',
dataBytes=291}}]}, operatorStateFromStream=StateObjectCollection{[]},
keyedStateFromBackend=StateObjectCollection{[]}, keyedStateFromStream=
StateObjectCollection{[]}, inputChannelState=StateObjectCollection{[]},
resultSubpartitionState=StateObjectCollection{[]}, stateSize=291,
checkpointedSize=291} from job manager and local state alternatives [] from
local state store org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl
@32734da0.
2023-07-24 08:41:30,380 DEBUG org.apache.flink.streaming.api.operators.
BackendRestorerProcedure [] - Creating operator state backend for
SinkWriterOperator_b2a2e541ae5a67fa81a2d8b387225849_(12/20) and restoring
with state from alternative (1/1).
2023-07-24 08:41:30,381 DEBUG org.apache.flink.streaming.api.operators.
BackendRestorerProcedure [] - Creating operator state backend for
SinkWriterOperator_b2a2e541ae5a67fa81a2d8b387225849_(14/20) and restoring
with state from alternative (1/1).
2023-07-24 08:41:30,381 ERROR org.apache.flink.connector.kafka.sink.
KafkaCommitter [] - Unable to commit transaction
(org.apache.flink.streaming.runtime.operators.sink.committables.
CommitRequestImpl@75581079) because it's in an invalid state. Most likely
the transaction has been aborted for some reason. Please check the Kafka
logs for more details.
org.apache.kafka.common.errors.InvalidTxnStateException: The producer
attempted a transactional operation in an invalid state.
2023-07-24 08:41:30,382 DEBUG org.apache.flink.runtime.state.
TaskStateManagerImpl [] - Operator b2a2e541ae5a67fa81a2d8b387225849 has
remote state SubtaskState{operatorStateFromBackend=StateObjectCollection{[
OperatorStateHandle{stateNameToPartitionOffsets={writer_raw_states=
StateMetaInfo{offsets=[233], distributionMode=SPLIT_DISTRIBUTE}},
delegateStateHandle=ByteStreamStateHandle{handleName=
'file:/opt/flink/shared/savepoints/savepoint-000000-fcc29062a521/66a3e58e-0f8c-4c09-9fc3-04be0d3388dc',
dataBytes=291}}]}, operatorStateFromStream=StateObjectCollection{[]},
keyedStateFromBackend=StateObjectCollection{[]}, keyedStateFromStream=
StateObjectCollection{[]}, inputChannelState=StateObjectCollection{[]},
resultSubpartitionState=StateObjectCollection{[]}, stateSize=291,
checkpointedSize=291} from job manager and local state alternatives [] from
local state store org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl
@5c241b41.
2023-07-24 08:41:30,382 DEBUG org.apache.flink.streaming.api.operators.
BackendRestorerProcedure [] - Creating operator state backend for
SinkWriterOperator_b2a2e541ae5a67fa81a2d8b387225849_(5/20) and restoring
with state from alternative (1/1).
2023-07-24 08:41:30,382 DEBUG org.apache.flink.runtime.metrics.
MetricRegistryImpl [] - Registering metric
taskmanager.job.task.operator.numRecordsOutErrors.
2023-07-24 08:41:30,382 DEBUG org.apache.flink.runtime.metrics.
MetricRegistryImpl [] - Registering metric
taskmanager.job.task.operator.numRecordsOutErrors.
2023-07-24 08:41:30,383 DEBUG org.apache.flink.runtime.metrics.
MetricRegistryImpl [] - Registering metric
taskmanager.job.task.operator.numRecordsSendErrors.
2023-07-24 08:41:30,383 DEBUG org.apache.flink.runtime.metrics.
MetricRegistryImpl [] - Registering metric
taskmanager.job.task.operator.numRecordsOutErrors.
2023-07-24 08:41:30,383 DEBUG org.apache.flink.runtime.metrics.
MetricRegistryImpl [] - Registering metric
taskmanager.job.task.operator.numRecordsSendErrors.
2023-07-24 08:41:30,383 DEBUG org.apache.flink.runtime.metrics.
MetricRegistryImpl [] - Registering metric
taskmanager.job.task.operator.numRecordsSend.
2023-07-24 08:41:30,383 DEBUG org.apache.flink.runtime.metrics.
MetricRegistryImpl [] - Registering metric
taskmanager.job.task.operator.numRecordsSend.
2023-07-24 08:41:30,384 DEBUG org.apache.flink.runtime.metrics.
MetricRegistryImpl [] - Registering metric
taskmanager.job.task.operator.numBytesSend.
2023-07-24 08:41:30,384 DEBUG org.apache.flink.runtime.metrics.
MetricRegistryImpl [] - Registering metric
taskmanager.job.task.operator.numRecordsSendErrors.
2023-07-24 08:41:30,384 DEBUG org.apache.flink.runtime.metrics.
MetricRegistryImpl [] - Registering metric
taskmanager.job.task.operator.numRecordsSend.
2023-07-24 08:41:30,384 DEBUG org.apache.flink.runtime.metrics.
MetricRegistryImpl [] - Registering metric
taskmanager.job.task.operator.numBytesSend.
2023-07-24 08:41:30,385 ERROR org.apache.flink.connector.kafka.sink.
KafkaCommitter [] - Unable to commit transaction
(org.apache.flink.streaming.runtime.operators.sink.committables.
CommitRequestImpl@47835f95) because it's in an invalid state. Most likely
the transaction has been aborted for some reason. Please check the Kafka
logs for more details.
org.apache.kafka.common.errors.InvalidTxnStateException: The producer
attempted a transactional operation in an invalid state.
2023-07-24 08:41:30,385 DEBUG org.apache.flink.runtime.metrics.
MetricRegistryImpl [] - Registering metric
taskmanager.job.task.operator.numBytesSend.
2023-07-24 08:41:30,385 DEBUG org.apache.flink.runtime.state.
TaskStateManagerImpl [] - Operator b2a2e541ae5a67fa81a2d8b387225849 has
remote state SubtaskState{operatorStateFromBackend=StateObjectCollection{[
OperatorStateHandle{stateNameToPartitionOffsets={writer_raw_states=
StateMetaInfo{offsets=[233], distributionMode=SPLIT_DISTRIBUTE}},
delegateStateHandle=ByteStreamStateHandle{handleName=
'file:/opt/flink/shared/savepoints/savepoint-000000-fcc29062a521/e4aa5b34-3b1f-4f6b-b9a0-6dacb4d5408b',
dataBytes=291}}]}, operatorStateFromStream=StateObjectCollection{[]},
keyedStateFromBackend=StateObjectCollection{[]}, keyedStateFromStream=
StateObjectCollection{[]}, inputChannelState=StateObjectCollection{[]},
resultSubpartitionState=StateObjectCollection{[]}, stateSize=291,
checkpointedSize=291} from job manager and local state alternatives [] from
local state store org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl
@157a2949.


On Mon, Jul 24, 2023 at 5:31, Shammon FY <zjur...@gmail.com> wrote:

> Hi nick,
>
> Is there any error log? That may help to analyze the root cause.
>
> On Sun, Jul 23, 2023 at 9:53 PM nick toker <nick.toker....@gmail.com>
> wrote:
>
>> Hello,
>>
>> We replaced the deprecated Kafka producer with the Kafka sink, and from
>> time to time when we submit a job it gets stuck for 5 minutes in
>> initializing (on the sink operators).
>> We verified that the transaction prefix is unique.
>>
>> This did not happen when we used the Kafka producer.
>>
>> What could be the reason?
>>
>>
