[ https://issues.apache.org/jira/browse/KAFKA-14266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yu Wang reassigned KAFKA-14266:
-------------------------------

    Assignee: Yu Wang

> MirrorSourceTask will stop mirroring when get corrupt record
> ------------------------------------------------------------
>
>                 Key: KAFKA-14266
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14266
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 2.5.1, 3.2.3
>            Reporter: Yu Wang
>            Assignee: Yu Wang
>            Priority: Critical
>
> The mirror task keeps throwing this error once it encounters a corrupt record:
> {code:java}
> [2022-09-28 22:27:07,125] WARN Failure during poll. (org.apache.kafka.connect.mirror.MirrorSourceTask)
> org.apache.kafka.common.KafkaException: Received exception when fetching the next record from TOPIC-261. If needed, please seek past the record to continue consumption.
>         at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1536)
>         at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
>         at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
>         at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1289)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
>         at org.apache.kafka.connect.mirror.MirrorSourceTask.poll(MirrorSourceTask.java:137)
>         at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:272)
>         at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:239)
>         at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
>         at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.KafkaException: Record batch for partition TOPIC-261 at offset 18665849419 is invalid, cause: Record is corrupt (stored crc = 4289549294, computed crc = 3792599753)
>         at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.maybeEnsureValid(Fetcher.java:1449)
>         at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.nextFetchedRecord(Fetcher.java:1493)
>         at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1550)
>         at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
>         at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
>         at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1314)
>         ... 12 more {code}
>  
> In the poll function of {*}MirrorSourceTask{*}, when the task catches a {*}KafkaException{*}, it only logs a WARN-level message and returns null.
> {code:java}
> @Override
> public List<SourceRecord> poll() {
>     if (!consumerAccess.tryAcquire()) {
>         return null;
>     }
>     if (stopping) {
>         return null;
>     }
>     try {
>         ConsumerRecords<byte[], byte[]> records = consumer.poll(pollTimeout);
>         List<SourceRecord> sourceRecords = new ArrayList<>(records.count());
>         ...
>         if (sourceRecords.isEmpty()) {
>             // WorkerSourceTasks expects non-zero batch size
>             return null;
>         } else {
>             log.trace("Polled {} records from {}.", sourceRecords.size(), records.partitions());
>             return sourceRecords;
>         }
>     } catch (WakeupException e) {
>         return null;
>     } catch (KafkaException e) {
>         log.warn("Failure during poll.", e);
>         return null;
>     } catch (Throwable e)  {
>         log.error("Failure during poll.", e);
>         // allow Connect to deal with the exception
>         throw e;
>     } finally {
>         consumerAccess.release();
>     }
> } {code}
> In the next poll round, the consumer throws the same exception again, because it remembers that the last record was corrupt ({*}corruptLastRecord{*}) and rethrows the cached exception. As a result, the *MirrorSourceTask* cannot fetch any further records and stays blocked at the same offset.
> {code:java}
> private List<ConsumerRecord<K, V>> fetchRecords(int maxRecords) {
>     // Error when fetching the next record before deserialization.
>     if (corruptLastRecord)
>         throw new KafkaException("Received exception when fetching the next record from " + partition
>                                      + ". If needed, please seek past the record to "
>                                      + "continue consumption.", cachedRecordException);
> ...
> } {code}
> Because this failure is not reflected in any metric that users could alert on, once the retention period expires, the records after the corrupt record are deleted from the source topic and can never be mirrored.
> It would therefore be better if the *MirrorSourceTask* threw the exception, or exposed a metric, so that users can be alerted to this kind of issue.
>  
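The stuck loop described above can be reproduced without a broker using a toy model. This is a hypothetical sketch: ToyPartition and StuckPollDemo are illustrative names, not Kafka classes, and each offset holds exactly one record (in Kafka a corrupt batch may span several offsets). It contrasts swallowing the error, as the quoted poll() does, with seeking past the bad offset as the exception message suggests; in the real client the corresponding call would be KafkaConsumer#seek(TopicPartition, long).

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not Kafka code): one partition whose record at corruptOffset fails
// validation. Like the consumer's cached corruptLastRecord state, every poll that
// reaches that offset throws until the caller seeks past it.
class ToyPartition {
    private final List<String> records;
    private final long corruptOffset;
    private long position = 0;

    ToyPartition(List<String> records, long corruptOffset) {
        this.records = records;
        this.corruptOffset = corruptOffset;
    }

    List<String> poll(int maxRecords) {
        if (position == corruptOffset) {
            throw new RuntimeException("Record at offset " + position + " is corrupt");
        }
        List<String> batch = new ArrayList<>();
        // Hand out records up to, but not including, the corrupt offset.
        while (batch.size() < maxRecords && position < records.size() && position != corruptOffset) {
            batch.add(records.get((int) position));
            position++;
        }
        return batch;
    }

    void seek(long offset) { position = offset; }

    long position() { return position; }
}

class StuckPollDemo {
    // Same shape as catch (KafkaException e) { log.warn(...); return null; }:
    // the failure is swallowed, the position never moves, so every later round fails too.
    static List<String> swallowingLoop(ToyPartition partition, int rounds) {
        List<String> mirrored = new ArrayList<>();
        for (int i = 0; i < rounds; i++) {
            try {
                mirrored.addAll(partition.poll(2));
            } catch (RuntimeException e) {
                // warn-and-return-null: nothing changes, the task is stuck
            }
        }
        return mirrored;
    }

    // Alternative: skip the bad offset and remember it, trading data loss for progress.
    static List<String> seekPastLoop(ToyPartition partition, int rounds, List<Long> skipped) {
        List<String> mirrored = new ArrayList<>();
        for (int i = 0; i < rounds; i++) {
            try {
                mirrored.addAll(partition.poll(2));
            } catch (RuntimeException e) {
                long bad = partition.position();
                skipped.add(bad);        // record which offset was dropped
                partition.seek(bad + 1); // move past the corrupt record
            }
        }
        return mirrored;
    }
}
```

With records a to e and offset 2 corrupt, the swallowing loop mirrors only a and b however many rounds it runs, while the seek-past loop also mirrors d and e at the cost of dropping c. Skipping silently loses data, which is why surfacing the failure matters as much as recovering from it.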

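The report's other suggestion, surfacing the failure instead of only logging it, could take the shape of a small guard around each poll attempt. This is a hypothetical sketch: PollFailureGuard and its threshold are illustrative and not part of MirrorMaker 2 or Kafka Connect. It counts consecutive failures (a value a metric could expose for alerting) and throws once a limit is reached, so the Connect runtime would see the error the same way it does for the existing catch (Throwable e) branch.

```java
import java.util.function.Supplier;

// Hypothetical guard, not MirrorMaker 2 code: tolerate a few failed polls, but
// escalate when they keep failing instead of logging the same warning forever.
class PollFailureGuard {
    private final int maxConsecutiveFailures;
    private int consecutiveFailures = 0;
    private long totalFailures = 0; // candidate value for an alerting metric

    PollFailureGuard(int maxConsecutiveFailures) {
        if (maxConsecutiveFailures < 1) {
            throw new IllegalArgumentException("maxConsecutiveFailures must be >= 1");
        }
        this.maxConsecutiveFailures = maxConsecutiveFailures;
    }

    // Runs one poll attempt. Returns its result, or null after a tolerated failure
    // (matching today's behaviour), or throws once the failure streak hits the limit.
    <T> T poll(Supplier<T> pollOnce) {
        try {
            T result = pollOnce.get();
            consecutiveFailures = 0; // any success resets the streak
            return result;
        } catch (RuntimeException e) {
            consecutiveFailures++;
            totalFailures++;
            if (consecutiveFailures >= maxConsecutiveFailures) {
                throw new IllegalStateException(
                        "Poll failed " + consecutiveFailures + " times in a row", e);
            }
            return null;
        }
    }

    int consecutiveFailures() { return consecutiveFailures; }

    long totalFailures() { return totalFailures; }
}
```

A threshold above 1 keeps transient errors from failing the task, while a persistent error such as a corrupt record escalates after a bounded number of rounds. A real change would still need to let WakeupException through untouched, as the current poll() does, since it is a KafkaException subclass used for shutdown.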


--
This message was sent by Atlassian Jira
(v8.20.10#820010)
