[ https://issues.apache.org/jira/browse/KAFKA-14266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yu Wang reassigned KAFKA-14266:
-------------------------------

    Assignee: Yu Wang

> MirrorSourceTask will stop mirroring when get corrupt record
> ------------------------------------------------------------
>
>                 Key: KAFKA-14266
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14266
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 2.5.1, 3.2.3
>            Reporter: Yu Wang
>            Assignee: Yu Wang
>            Priority: Critical
>
> The mirror task keeps throwing this error after it receives a corrupt record:
> {code:java}
> [2022-09-28 22:27:07,125] WARN Failure during poll. (org.apache.kafka.connect.mirror.MirrorSourceTask)
> org.apache.kafka.common.KafkaException: Received exception when fetching the next record from TOPIC-261. If needed, please seek past the record to continue consumption.
>     at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1536)
>     at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
>     at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
>     at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1289)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
>     at org.apache.kafka.connect.mirror.MirrorSourceTask.poll(MirrorSourceTask.java:137)
>     at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:272)
>     at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:239)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.KafkaException: Record batch for partition TOPIC-261 at offset 18665849419 is invalid, cause: Record is corrupt (stored crc = 4289549294, computed crc = 3792599753)
>     at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.maybeEnsureValid(Fetcher.java:1449)
>     at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.nextFetchedRecord(Fetcher.java:1493)
>     at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1550)
>     at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
>     at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
>     at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1314)
>     ... 12 more {code}
>
> In the poll function of {*}MirrorSourceTask{*}, when the task catches a
> {*}KafkaException{*}, it only logs a warning and returns null.
> {code:java}
> @Override
> public List<SourceRecord> poll() {
>     if (!consumerAccess.tryAcquire()) {
>         return null;
>     }
>     if (stopping) {
>         return null;
>     }
>     try {
>         ConsumerRecords<byte[], byte[]> records = consumer.poll(pollTimeout);
>         List<SourceRecord> sourceRecords = new ArrayList<>(records.count());
>         ...
>         if (sourceRecords.isEmpty()) {
>             // WorkerSourceTasks expects non-zero batch size
>             return null;
>         } else {
>             log.trace("Polled {} records from {}.", sourceRecords.size(), records.partitions());
>             return sourceRecords;
>         }
>     } catch (WakeupException e) {
>         return null;
>     } catch (KafkaException e) {
>         log.warn("Failure during poll.", e);
>         return null;
>     } catch (Throwable e) {
>         log.error("Failure during poll.", e);
>         // allow Connect to deal with the exception
>         throw e;
>     } finally {
>         consumerAccess.release();
>     }
> } {code}
> On the next poll, the consumer throws the same exception again because it
> has already received the corrupt record. As a result, *MirrorSourceTask*
> cannot fetch any further records and stays blocked at the same offset.
> {code:java}
> private List<ConsumerRecord<K, V>> fetchRecords(int maxRecords) {
>     // Error when fetching the next record before deserialization.
>     if (corruptLastRecord)
>         throw new KafkaException("Received exception when fetching the next record from " + partition
>                                      + ". If needed, please seek past the record to "
>                                      + "continue consumption.", cachedRecordException);
>     ...
> } {code}
> Because no metric exposes this condition, nothing alerts the operator. Once
> the retention time elapses, the records after the corrupt record in the
> source topic are deleted and can no longer be mirrored.
> It would be better for *MirrorSourceTask* to rethrow the exception or
> expose metrics so that users can alert on this kind of issue.
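The two remedies suggested above (rethrow the exception, or expose a metric) can be illustrated with a small standalone simulation. This is only a sketch under stated assumptions, not the MirrorSourceTask code and not an actual patch: StuckConsumer, Task, pollFailures and maxConsecutiveFailures are hypothetical names, IllegalStateException stands in for KafkaException, and plain offsets stand in for records. It uses no Kafka classes so that it runs on its own.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class CorruptRecordPollSketch {

    /** Hypothetical stand-in for a consumer; offsets play the role of records. */
    static class StuckConsumer {
        private final long corruptOffset;
        private long position;

        StuckConsumer(long startOffset, long corruptOffset) {
            this.position = startOffset;
            this.corruptOffset = corruptOffset;
        }

        List<Long> poll() {
            if (position == corruptOffset) {
                // The position never advances here, so every later poll fails the same way.
                throw new IllegalStateException("Record is corrupt at offset " + position);
            }
            return Collections.singletonList(position++);
        }
    }

    /** Hypothetical task that counts poll failures and gives up after a threshold. */
    static class Task {
        // Assumed metric: total failed polls, something an operator could alert on.
        final AtomicLong pollFailures = new AtomicLong();
        private final StuckConsumer consumer;
        private final int maxConsecutiveFailures;
        private int consecutiveFailures = 0;

        Task(StuckConsumer consumer, int maxConsecutiveFailures) {
            this.consumer = consumer;
            this.maxConsecutiveFailures = maxConsecutiveFailures;
        }

        List<Long> poll() {
            try {
                List<Long> records = consumer.poll();
                consecutiveFailures = 0; // a successful poll resets the streak
                return records;
            } catch (IllegalStateException e) {
                pollFailures.incrementAndGet();
                consecutiveFailures++;
                if (consecutiveFailures >= maxConsecutiveFailures) {
                    // Stop swallowing the error so the caller can fail the task visibly.
                    throw e;
                }
                return null; // transient failures are still tolerated
            }
        }
    }

    public static void main(String[] args) {
        Task task = new Task(new StuckConsumer(0L, 2L), 3);
        for (int i = 0; i < 5; i++) {
            try {
                System.out.println("poll " + i + " -> " + task.poll());
            } catch (IllegalStateException e) {
                System.out.println("poll " + i + " failed the task: " + e.getMessage());
            }
        }
        System.out.println("pollFailures = " + task.pollFailures.get());
    }
}
```

With the threshold set to 3, the first two failed polls still return null, as the current code does, and the third rethrows. The threshold is a design choice in this sketch: it tolerates transient errors while making sure a permanently stuck partition surfaces before retention deletes the unmirrored records.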