[ https://issues.apache.org/jira/browse/KAFKA-14266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yu Wang resolved KAFKA-14266.
-----------------------------
Resolution: Works for Me
> MirrorSourceTask will stop mirroring when get corrupt record
> ------------------------------------------------------------
>
> Key: KAFKA-14266
> URL: https://issues.apache.org/jira/browse/KAFKA-14266
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 2.5.1, 3.2.3
> Reporter: Yu Wang
> Assignee: Yu Wang
> Priority: Critical
>
> The mirror task keeps throwing this error when it receives a corrupt record:
> {code:java}
> [2022-09-28 22:27:07,125] WARN Failure during poll. (org.apache.kafka.connect.mirror.MirrorSourceTask)
> org.apache.kafka.common.KafkaException: Received exception when fetching the next record from TOPIC-261. If needed, please seek past the record to continue consumption.
>     at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1536)
>     at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
>     at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
>     at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1289)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
>     at org.apache.kafka.connect.mirror.MirrorSourceTask.poll(MirrorSourceTask.java:137)
>     at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:272)
>     at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:239)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.KafkaException: Record batch for partition TOPIC-261 at offset 18665849419 is invalid, cause: Record is corrupt (stored crc = 4289549294, computed crc = 3792599753)
>     at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.maybeEnsureValid(Fetcher.java:1449)
>     at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.nextFetchedRecord(Fetcher.java:1493)
>     at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1550)
>     at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
>     at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
>     at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1314)
>     ... 12 more {code}
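The root cause in the trace is a checksum mismatch: the CRC stored with the record batch does not match the CRC the consumer recomputes when validating it (the consumer performs this check when `check.crcs` is enabled, which is the default). For the v2 record batch format Kafka uses CRC-32C. The sketch below only illustrates the stored-versus-computed comparison on an arbitrary byte array; the byte layout is an assumption and is not Kafka's batch format. It uses `java.util.zip.CRC32C`, available since Java 9.

```java
import java.util.zip.CRC32C;

// Illustrative only: models the "stored crc vs computed crc" comparison from
// the stack trace. The byte layout here is hypothetical, not Kafka's batch format.
final class CrcCheckSketch {

    // Compute a CRC-32C (Castagnoli) checksum over the given bytes.
    static long crc32c(byte[] data) {
        CRC32C crc = new CRC32C();
        crc.update(data, 0, data.length);
        return crc.getValue();
    }

    // Data is considered valid only if the checksum recomputed by the reader
    // matches the checksum the writer stored alongside it.
    static boolean isValid(byte[] data, long storedCrc) {
        return crc32c(data) == storedCrc;
    }

    public static void main(String[] args) {
        byte[] payload = {10, 20, 30, 40, 50};
        long stored = crc32c(payload);       // checksum recorded at write time

        byte[] corrupted = payload.clone();
        corrupted[2] ^= 0x01;                // simulate a single flipped bit on disk

        System.out.println("intact valid:    " + isValid(payload, stored));
        System.out.println("corrupted valid: " + isValid(corrupted, stored));
        System.out.println("stored crc = " + stored + ", computed crc = " + crc32c(corrupted));
    }
}
```

Running `main` reports the intact array as valid and the copy with one flipped bit as invalid, since CRC-32C detects every single-bit error.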
>
> In the {*}poll{*} method of {*}MirrorSourceTask{*}, when the task catches a
> {*}KafkaException{*}, it only logs a WARN-level message and returns null:
> {code:java}
> @Override
> public List<SourceRecord> poll() {
>     if (!consumerAccess.tryAcquire()) {
>         return null;
>     }
>     if (stopping) {
>         return null;
>     }
>     try {
>         ConsumerRecords<byte[], byte[]> records = consumer.poll(pollTimeout);
>         List<SourceRecord> sourceRecords = new ArrayList<>(records.count());
>         ...
>         if (sourceRecords.isEmpty()) {
>             // WorkerSourceTasks expects non-zero batch size
>             return null;
>         } else {
>             log.trace("Polled {} records from {}.", sourceRecords.size(), records.partitions());
>             return sourceRecords;
>         }
>     } catch (WakeupException e) {
>         return null;
>     } catch (KafkaException e) {
>         log.warn("Failure during poll.", e);
>         return null;
>     } catch (Throwable e) {
>         log.error("Failure during poll.", e);
>         // allow Connect to deal with the exception
>         throw e;
>     } finally {
>         consumerAccess.release();
>     }
> } {code}
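To make the failure mode concrete, the following self-contained sketch models a consumer whose position never moves past a corrupt offset, and a task-level poll that, like the `catch (KafkaException e)` branch above, logs a warning and returns null. `ToyConsumer`, `CorruptRecordError`, and `taskPoll` are hypothetical stand-ins, not Kafka classes; offsets stand in for records.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, self-contained model of the behaviour described in this issue.
// None of these classes are Kafka APIs; they only mimic the control flow.
final class StuckPollSketch {

    // Stand-in for the KafkaException thrown by the consumer on a corrupt record.
    static final class CorruptRecordError extends RuntimeException {
        CorruptRecordError(long offset) {
            super("Record batch at offset " + offset + " is invalid");
        }
    }

    // A toy "consumer" over offsets [0, logEnd) containing one corrupt offset.
    static final class ToyConsumer {
        private final long corruptOffset;
        private final long logEnd;
        private long position = 0;

        ToyConsumer(long corruptOffset, long logEnd) {
            this.corruptOffset = corruptOffset;
            this.logEnd = logEnd;
        }

        // Returns up to maxRecords offsets and throws once the next record is corrupt.
        // The position is never advanced past the corrupt record.
        List<Long> poll(int maxRecords) {
            if (position == corruptOffset) {
                throw new CorruptRecordError(position);
            }
            List<Long> out = new ArrayList<>();
            while (out.size() < maxRecords && position < logEnd && position != corruptOffset) {
                out.add(position++);
            }
            return out;
        }

        long position() {
            return position;
        }
    }

    // Mirrors the catch (KafkaException e) branch: log a warning and return null.
    static List<Long> taskPoll(ToyConsumer consumer) {
        try {
            List<Long> records = consumer.poll(3);
            return records.isEmpty() ? null : records; // empty batches become null
        } catch (CorruptRecordError e) {
            System.out.println("WARN Failure during poll. " + e.getMessage());
            return null;
        }
    }

    public static void main(String[] args) {
        ToyConsumer consumer = new ToyConsumer(4, 10);
        for (int round = 0; round < 6; round++) {
            List<Long> batch = taskPoll(consumer);
            System.out.println("round " + round + ": " + batch + ", position=" + consumer.position());
        }
    }
}
```

Running `main` returns offsets 0 to 2 in the first round and offset 3 in the second; every later round only logs the warning while the position stays at 4, which is the "blocked on the same offset" behaviour described here.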
> In the next poll round, the consumer throws the same exception again, because it
> has already hit the corrupt record and its position is never advanced past it. As
> a result, *MirrorSourceTask* cannot fetch any subsequent records and stays blocked
> at the same offset:
> {code:java}
> private List<ConsumerRecord<K, V>> fetchRecords(int maxRecords) {
>     // Error when fetching the next record before deserialization.
>     if (corruptLastRecord)
>         throw new KafkaException("Received exception when fetching the next record from " + partition
>                 + ". If needed, please seek past the record to "
>                 + "continue consumption.", cachedRecordException);
>     ...
> } {code}
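The guard in this fragment can be modeled as below. Only `corruptLastRecord`, `cachedRecordException`, and the message text come from the fragment; the class, the offsets-as-records simplification, and a `seek` that clears the cached error state are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the corruptLastRecord / cachedRecordException logic in
// the fetchRecords fragment. Offsets stand in for records; this is not the
// Kafka client implementation.
final class CachedFetchSketch {
    private static final String SEEK_HINT = "Received exception when fetching the next record. "
            + "If needed, please seek past the record to continue consumption.";

    private final long corruptOffset;
    private long nextOffset;
    private boolean corruptLastRecord = false;
    private RuntimeException cachedRecordException = null;

    CachedFetchSketch(long startOffset, long corruptOffset) {
        this.nextOffset = startOffset;
        this.corruptOffset = corruptOffset;
    }

    List<Long> fetchRecords(int maxRecords) {
        // Once the corrupt record has been seen, every later call fails fast
        // with the cached cause, mirroring the guard in the fragment.
        if (corruptLastRecord) {
            throw new RuntimeException(SEEK_HINT, cachedRecordException);
        }
        List<Long> records = new ArrayList<>();
        while (records.size() < maxRecords) {
            if (nextOffset == corruptOffset) {
                corruptLastRecord = true;
                cachedRecordException = new RuntimeException("Record at offset " + nextOffset + " is corrupt");
                if (records.isEmpty()) {
                    throw new RuntimeException(SEEK_HINT, cachedRecordException);
                }
                break; // hand back the good records first; the next call will throw
            }
            records.add(nextOffset++);
        }
        return records;
    }

    // Models the remedy named in the message: seeking discards the cached
    // error state, so fetching resumes from the new offset.
    void seek(long offset) {
        nextOffset = offset;
        corruptLastRecord = false;
        cachedRecordException = null;
    }

    long nextOffset() {
        return nextOffset;
    }

    public static void main(String[] args) {
        CachedFetchSketch fetch = new CachedFetchSketch(0, 2);
        System.out.println(fetch.fetchRecords(5));
        for (int i = 0; i < 3; i++) {
            try {
                fetch.fetchRecords(5);
            } catch (RuntimeException e) {
                System.out.println("poll " + i + " failed: " + e.getCause().getMessage());
            }
        }
        fetch.seek(fetch.nextOffset() + 1); // skip the corrupt offset
        System.out.println(fetch.fetchRecords(3));
    }
}
```

Here the first call returns `[0, 1]`, the next three calls fail with the cached cause, and after seeking to `nextOffset() + 1` fetching resumes with `[3, 4, 5]`. With the real consumer the equivalent step would be `KafkaConsumer.seek(partition, offset + 1)`, which skips (and therefore drops) the corrupt record.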
> Because this condition is not reported through any metric that users could alert
> on, once the retention period elapses the records after the corrupt record are
> deleted from the source topic and can no longer be mirrored.
> It would therefore be better if *MirrorSourceTask* rethrew the exception or
> exposed a metric that users can alert on for this kind of issue.
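Both suggestions can be combined in one sketch, assuming a counter that a monitoring system could read and a threshold of consecutive failures after which the error is rethrown so that Connect deals with it, as the existing `Throwable` branch already does. `FailFastPollSketch`, `maxConsecutiveFailures`, and `pollFailures` are hypothetical names, not Kafka APIs, and the `Supplier` stands in for `consumer.poll(pollTimeout)`.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical sketch of the two remedies proposed in this issue: count poll
// failures in a metric-like counter, and rethrow once failures persist so the
// task fails visibly instead of silently retrying the same offset forever.
final class FailFastPollSketch {
    private final Supplier<List<String>> source;          // stand-in for consumer.poll(pollTimeout)
    private final int maxConsecutiveFailures;             // hypothetical threshold
    private int consecutiveFailures = 0;
    private final AtomicLong pollFailures = new AtomicLong(); // what an alerting metric could expose

    FailFastPollSketch(Supplier<List<String>> source, int maxConsecutiveFailures) {
        this.source = source;
        this.maxConsecutiveFailures = maxConsecutiveFailures;
    }

    List<String> poll() {
        try {
            List<String> records = source.get();
            consecutiveFailures = 0; // a successful poll resets the streak
            return records.isEmpty() ? null : records;
        } catch (RuntimeException e) {
            pollFailures.incrementAndGet();
            consecutiveFailures++;
            if (consecutiveFailures >= maxConsecutiveFailures) {
                throw e; // surface the error so the framework can fail the task
            }
            return null; // tolerate a few transient failures, as the current code does
        }
    }

    long pollFailures() {
        return pollFailures.get();
    }

    public static void main(String[] args) {
        FailFastPollSketch task = new FailFastPollSketch(() -> {
            throw new IllegalStateException("Record is corrupt");
        }, 3);
        for (int i = 0; i < 3; i++) {
            try {
                System.out.println("poll " + i + " -> " + task.poll());
            } catch (IllegalStateException e) {
                System.out.println("poll " + i + " rethrew: " + e.getMessage());
            }
        }
        System.out.println("failures recorded: " + task.pollFailures());
    }
}
```

With a source that always fails and a threshold of 3, the first two polls return null, the third rethrows, and the counter reads 3. A threshold keeps tolerance for transient errors while still guaranteeing that a permanently corrupt offset eventually becomes visible.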
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)