artemlivshits commented on code in PR #20146: URL: https://github.com/apache/kafka/pull/20146#discussion_r2240912984
########## clients/src/main/java/org/apache/kafka/clients/producer/internals/Kafka19012Instrumentation.java: ########## @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.producer.internals; + +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Meter; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; + +import static org.apache.kafka.clients.producer.internals.SenderMetricsRegistry.TOPIC_METRIC_GROUP_NAME; + +/** + * This class is a temporary utility class to help us track down the cause/origin of KAFKA-19012. The main + * idea is that the record's topic is added to the record in a header (in {@link Producer#send(ProducerRecord)}) + * which is later checked at different points in its way to the broker. + * + * <p/> + * + * After the user "sends" a {@link ProducerRecord}, internally to the Kafka client it is decomposed into its + * raw elements: + * + * <ul> + * <li>Topic</li> + * <li>Partition</li> + * <li>Timestamp</li> + * <li>Headers</li> + * <li>Key bytes</li> + * <li>Value bytes</li> + * </ul> + * + * The timestamp, headers, key, and value are stored together in data structures (e.g. {@link ProducerBatch}) as the + * data makes its way from the client internals to the broker. These data structures group the raw record data based on + * the topic and partition. It appears to be the case that somewhere along this path, on very rare occasions, raw + * record data originally sent to topic <code>A</code> mistakenly ends up being grouped along with data for + * topic <code>B</code>. The big question is <em>where</em> does this mistake happen and under what circumstances? + */ +public class Kafka19012Instrumentation { + + /** + * Well-known header that is added to each record that holds the name of the topic at the time of sending. + */ + private static final String TOPIC_HEADER_NAME = "KAFKA-19012-topic"; + + private final Logger log; + + private final Kafka19012MetricsRegistry metrics; + + public Kafka19012Instrumentation(LogContext logContext, Metrics metrics) { + this.log = logContext.logger(Kafka19012Instrumentation.class); + this.metrics = new Kafka19012MetricsRegistry(metrics); + } + + /** + * Adds the well-known {@link #TOPIC_HEADER_NAME} header to the record with the topic from the record. This is + * used to verify internal consistency later in the processing path. + */ + public void addTopicHeader(ProducerRecord<?, ?> record) { + if (record == null) + return; + + RecordHeaders headers = (RecordHeaders) record.headers(); + + try { + // This is the happy path: the header can be added to the record. + headers.add(TOPIC_HEADER_NAME, Utils.utf8(record.topic())); + } catch (IllegalStateException e) { + log.warn("An error occurred adding the {} header to the record, likely because the headers are now readonly", TOPIC_HEADER_NAME, e); + return; + } + + // The following code defensively checks the following... + Header header = headers.lastHeader(TOPIC_HEADER_NAME); + + // ...the topic header is present... + if (header == null) { + log.warn("The {} header was added to the record, but it was not returned from lastHeader()", TOPIC_HEADER_NAME); + return; + } + + byte[] existingTopicBytes = header.value(); + + // ...the topic header has a non-null value... + if (existingTopicBytes == null) { + log.warn("The {} header was was returned from lastHeader(), but its value was null", TOPIC_HEADER_NAME); + return; + } + + String existingTopic = Utils.utf8(existingTopicBytes); + + // ...the topic header value matches the topic from the record. + if (!record.topic().equals(existingTopic)) { + log.warn( + "The {} header had a value of {}, which differs from the record's topic: {}", + TOPIC_HEADER_NAME, + existingTopic, + record.topic() + ); + } + } + + /** + * This is called from the {@link ProducerBatch#tryAppend(long, byte[], byte[], Header[], Callback, long)} to + * check the topic consistency. This is a separate method from + * {@link #checkHeadersForTryAppendForSplit(String, Header[])} to disambiguate which path was taken. + * + * @param expectedTopic The topic stored in the {@link ProducerBatch} + * @param headers Headers that were originally part of the {@link ProducerRecord} which <em>should</em> + * contain the {@link #TOPIC_HEADER_NAME} header + */ + public void checkHeadersForTryAppend(String expectedTopic, Header[] headers) { + checkHeaders( + expectedTopic, + headers, + metrics::tryAppendInconsistencySensor, + "ProducerBatch.tryAppend()" + ); + } + + /** + * This is called from the {@code ProducerBatch#tryAppendForSplit} to check the topic consistency. This is a + * separate method from {@link #checkHeadersForTryAppend(String, Header[])} to disambiguate which path was taken. + * + * @param expectedTopic The topic stored in the {@link ProducerBatch} + * @param headers Headers that were originally part of the {@link ProducerRecord} which <em>should</em> + * contain the {@link #TOPIC_HEADER_NAME} header + */ + public void checkHeadersForTryAppendForSplit(String expectedTopic, Header[] headers) { + checkHeaders( + expectedTopic, + headers, + metrics::tryAppendForSplitInconsistencySensor, + "ProducerBatch.tryAppendForSplit()" + ); + } + + private void checkHeaders(String expectedTopic, + Header[] headers, + Function<String, Sensor> sensorFn, + String location) { + if (headers == null) { + // This _shouldn't_ happen, but let's be careful to check first. + return; + } + + for (Header header : headers) { + // It's possible that a null Header ended up in the record. Also check to make sure the header matches + // the internal topic name header before continuing. + if (header == null || !header.key().equals(TOPIC_HEADER_NAME)) + continue; + + String headerTopic = Utils.utf8(header.value()); + + // Good--the header and the ProducerBatch are consistent, at least for this particular header. + if (headerTopic.equals(expectedTopic)) + continue; + + log.warn( + "A topic mismatch was detected in {}! Expected topic: {}, topic from record header {}: {}.", + location, Review Comment: Maybe print stack trace here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org