[ https://issues.apache.org/jira/browse/KAFKA-14266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yu Wang updated KAFKA-14266:
----------------------------
    Description:
The mirror task keeps throwing this error once it receives a corrupt record:
{code:java}
[2022-09-28 22:27:07,125] WARN Failure during poll. (org.apache.kafka.connect.mirror.MirrorSourceTask)
org.apache.kafka.common.KafkaException: Received exception when fetching the next record from TOPIC-261. If needed, please seek past the record to continue consumption.
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1536)
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1289)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
    at org.apache.kafka.connect.mirror.MirrorSourceTask.poll(MirrorSourceTask.java:137)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:272)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:239)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: Record batch for partition TOPIC-261 at offset 18665849419 is invalid, cause: Record is corrupt (stored crc = 4289549294, computed crc = 3792599753)
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.maybeEnsureValid(Fetcher.java:1449)
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.nextFetchedRecord(Fetcher.java:1493)
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1550)
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1314)
    ... 12 more
{code}

In the poll function of *MirrorSourceTask*, when the task catches a *KafkaException*, it only logs at WARN level and returns null.
{code:java}
@Override
public List<SourceRecord> poll() {
    if (!consumerAccess.tryAcquire()) {
        return null;
    }
    if (stopping) {
        return null;
    }
    try {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(pollTimeout);
        List<SourceRecord> sourceRecords = new ArrayList<>(records.count());
        ...
        if (sourceRecords.isEmpty()) {
            // WorkerSourceTasks expects non-zero batch size
            return null;
        } else {
            log.trace("Polled {} records from {}.", sourceRecords.size(), records.partitions());
            return sourceRecords;
        }
    } catch (WakeupException e) {
        return null;
    } catch (KafkaException e) {
        log.warn("Failure during poll.", e);
        return null;
    } catch (Throwable e) {
        log.error("Failure during poll.", e);
        // allow Connect to deal with the exception
        throw e;
    } finally {
        consumerAccess.release();
    }
}
{code}
On every subsequent poll, the consumer keeps throwing the exception because it has received a corrupt record. As a result, the *MirrorSourceTask* cannot fetch the following records and stays blocked at the same offset.
{code:java}
private List<ConsumerRecord<K, V>> fetchRecords(int maxRecords) {
    // Error when fetching the next record before deserialization.
    if (corruptLastRecord)
        throw new KafkaException("Received exception when fetching the next record from " + partition
                                     + ". If needed, please seek past the record to "
                                     + "continue consumption.", cachedRecordException);
    ...
}
{code}
Because no metric is available to alert on this condition, once the retention time is reached the records after the corrupt record in the source topic are lost and can no longer be mirrored.

It would therefore be better for the mirror source task to throw the exception or expose metrics so that users can alert on this kind of issue.

> MirrorSourceTask will stop mirroring when get the corrupt record
> ----------------------------------------------------------------
>
>                 Key: KAFKA-14266
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14266
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>    Affects Versions: 2.5.1, 3.2.3
>            Reporter: Yu Wang
>            Priority: Critical
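
One way the proposal in the description (throw the exception or expose a metric) might look is sketched below. This is only an illustration, not code from Kafka: the class {{PollFailureTracker}}, its methods, and the failure limit are all hypothetical, and a plain {{RuntimeException}} stands in for {{KafkaException}} so the example compiles without Kafka on the classpath. The idea is to count consecutive poll failures, keep a running total that could be exported as a metric, and rethrow once the limit is reached so that Connect can fail the task instead of silently retrying the same offset.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical helper, not part of Kafka: counts consecutive poll failures
 * and rethrows once a limit is reached so the failure becomes visible.
 */
final class PollFailureTracker {
    private final int maxConsecutiveFailures;
    private int consecutiveFailures = 0;
    // Total failures seen so far; a real task could export this as a metric.
    private final AtomicLong totalFailures = new AtomicLong();

    PollFailureTracker(int maxConsecutiveFailures) {
        if (maxConsecutiveFailures < 1) {
            throw new IllegalArgumentException("maxConsecutiveFailures must be >= 1");
        }
        this.maxConsecutiveFailures = maxConsecutiveFailures;
    }

    /** Call after a poll that returned normally; resets the streak. */
    void recordSuccess() {
        consecutiveFailures = 0;
    }

    /** Call from the catch block; rethrows once the limit is reached. */
    void recordFailure(RuntimeException e) {
        totalFailures.incrementAndGet();
        consecutiveFailures++;
        if (consecutiveFailures >= maxConsecutiveFailures) {
            throw e;
        }
    }

    long totalFailures() {
        return totalFailures.get();
    }

    int consecutiveFailures() {
        return consecutiveFailures;
    }

    public static void main(String[] args) {
        PollFailureTracker tracker = new PollFailureTracker(3);
        // Stand-in for the KafkaException raised on every poll after a corrupt record.
        RuntimeException corrupt = new RuntimeException("Record is corrupt");
        for (int attempt = 1; attempt <= 3; attempt++) {
            try {
                tracker.recordFailure(corrupt);
                System.out.println("attempt " + attempt + ": swallowed");
            } catch (RuntimeException e) {
                System.out.println("attempt " + attempt + ": rethrown");
            }
        }
    }
}
```

In a task shaped like the {{poll()}} shown in the description, {{recordFailure(e)}} would presumably be called in the {{catch (KafkaException e)}} branch after the warning is logged, and {{recordSuccess()}} after a poll that returns normally. Whether a limit, an immediate rethrow, or only a metric is the right behaviour is a design decision for the maintainers.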