Yu Wang created KAFKA-14266:
-------------------------------
Summary: MirrorSourceTask will stop mirroring when get the corrupt
record
Key: KAFKA-14266
URL: https://issues.apache.org/jira/browse/KAFKA-14266
Project: Kafka
Issue Type: Improvement
Components: KafkaConnect
Affects Versions: 3.2.3, 2.5.1
Reporter: Yu Wang
The mirror task will keeping throwing this error when got a corrupt record
{code:java}
[2022-09-28 22:27:07,125] WARN Failure during poll.
(org.apache.kafka.connect.mirror.MirrorSourceTask)
org.apache.kafka.common.KafkaException: Received exception when fetching the
next record from TOPIC-261. If needed, please seek past the record to continue
consumption.
at
org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1536)
at
org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
at
org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
at
org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
at
org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1289)
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
at
org.apache.kafka.connect.mirror.MirrorSourceTask.poll(MirrorSourceTask.java:137)
at
org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:272)
at
org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:239)
at
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: Record batch for partition
TOPIC-261 at offset 18665849419 is invalid, cause: Record is corrupt (stored
crc = 4289549294, computed crc = 3792599753)
at
org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.maybeEnsureValid(Fetcher.java:1449)
at
org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.nextFetchedRecord(Fetcher.java:1493)
at
org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1550)
at
org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
at
org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
at
org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
at
org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1314)
... 12 more {code}
In the poll function of {*}MirrorSourceTask{*}, when the task got
*KafkaException* it only print a warn log and return null.
{code:java}
@Override
public List<SourceRecord> poll() {
if (!consumerAccess.tryAcquire()) {
return null;
}
if (stopping) {
return null;
}
try {
ConsumerRecords<byte[], byte[]> records = consumer.poll(pollTimeout);
List<SourceRecord> sourceRecords = new ArrayList<>(records.count());
...
if (sourceRecords.isEmpty()) {
// WorkerSourceTasks expects non-zero batch size
return null;
} else {
log.trace("Polled {} records from {}.", sourceRecords.size(),
records.partitions());
return sourceRecords;
}
} catch (WakeupException e) {
return null;
} catch (KafkaException e) {
log.warn("Failure during poll.", e);
return null;
} catch (Throwable e) {
log.error("Failure during poll.", e);
// allow Connect to deal with the exception
throw e;
} finally {
consumerAccess.release();
}
} {code}
As the consumer will keep throwing exception when it receive a corrupt record.
This makes the *MirrorSourceTask* cannot get next records and blocked on the
same offset.
{code:java}
private List<ConsumerRecord<K, V>> fetchRecords(int maxRecords) {
// Error when fetching the next record before deserialization.
if (corruptLastRecord)
throw new KafkaException("Received exception when fetching the next
record from " + partition
+ ". If needed, please seek past the
record to "
+ "continue consumption.",
cachedRecordException);
...
} {code}
As this issue will not have any metrics to alert, after the retention time
reaches, the records after the corrupt record in the source topic will lost and
cannot be mirrored again.
So it would be better that the mirror source task can throw the exception or
expose some metrics for users to alert this kind of issue.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)