junrao commented on code in PR #20138:
URL: https://github.com/apache/kafka/pull/20138#discussion_r2198336561


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -884,6 +897,8 @@ private void sendProduceRequest(long now, int destination, 
short acks, int timeo
                     .setIndex(tp.partition())
                     .setRecords(records));
             recordsByPartition.put(tp, batch);
+
+            sensors.updateInconsistentTopics(batch, bufferSupplier);

Review Comment:
   Could we pass in `tp` and `records` instead of batch to avoid any potential 
visibility issue on batch?



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -1054,6 +1077,54 @@ public void updateProduceRequestMetrics(Map<Integer, 
List<ProducerBatch>> batche
             }
         }
 
+        /**
+         * Iterate over all the {@link Record}s in the batch and checks the 
{@link Header}s for
+         * {@link #TOPIC_NAME_HEADER_NAME}. If the header is present, check 
its topic name against the
+         * {@link ProducerBatch#topicPartition}'s topic name. If it doesn't 
match, increment the metrics to
+         * track the inconsistency.
+         */
+        public void updateInconsistentTopics(ProducerBatch producerBatch, 
BufferSupplier bufferSupplier) {
+            for (RecordBatch recordBatch : producerBatch.records().batches()) {
+                if (recordBatch.isControlBatch())
+                    continue;
+
+                try (CloseableIterator<Record> recordIterator = 
recordBatch.streamingIterator(bufferSupplier)) {
+                    while (recordIterator.hasNext()) {
+                        Record record = recordIterator.next();
+                        Header[] headers = record.headers();
+
+                        if (headers == null)
+                            continue;
+
+                        for (Header header : headers) {
+                            if (!header.key().equals(TOPIC_NAME_HEADER_NAME))
+                                continue;
+
+                            String headerTopicName = new 
String(header.value(), StandardCharsets.UTF_8);
+
+                            if 
(headerTopicName.equals(producerBatch.topicPartition.topic()))
+                                continue;
+
+                            log.warn(
+                                "A topic mismatch was detected! ProducerBatch 
topic: {}, Record header {} topic: {}",

Review Comment:
   Record header {} topic: {}  => Record header {}: {} since the header already 
contains topic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to