This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new caeca090b86 MINOR: Improve producer docs and add tests around timeout behaviour on missing topic/partition (#20533)
caeca090b86 is described below

commit caeca090b860a200d75e0b9c7492f67259ec650a
Author: Lianet Magrans <[email protected]>
AuthorDate: Mon Sep 15 13:28:27 2025 -0400

    MINOR: Improve producer docs and add tests around timeout behaviour on missing topic/partition (#20533)
    
    Clarify the timeout error received on send when the topic is not in
    metadata vs. when the partition is not in metadata. Add integration
    tests showcasing the difference. Follow-up to the 4.1 fix for the
    misleading timeout error message (https://issues.apache.org/jira/browse/KAFKA-8862).
    
    Reviewers: TengYao Chi <[email protected]>, Kuan-Po Tseng <[email protected]>
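
    For illustration, a minimal Java sketch of observing the two timeout
    messages this change distinguishes (the bootstrap address, topic
    names, and the 500 ms max.block.ms value are illustrative, not part
    of the change):

        import java.util.Properties;
        import java.util.concurrent.ExecutionException;
        import org.apache.kafka.clients.producer.KafkaProducer;
        import org.apache.kafka.clients.producer.ProducerConfig;
        import org.apache.kafka.clients.producer.ProducerRecord;
        import org.apache.kafka.common.errors.TimeoutException;
        import org.apache.kafka.common.serialization.StringSerializer;

        public class SendTimeoutSketch {
            public static void main(String[] args) {
                Properties props = new Properties();
                props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
                props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "500"); // bound the metadata wait
                props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                    // Topic absent from metadata within max.block.ms:
                    //   "Topic missing-topic not present in metadata after 500 ms."
                    // Partition absent on an existing topic, e.g. partition 10 of a
                    // 4-partition topic:
                    //   "Partition 10 of topic existing-topic with partition count 4
                    //   is not present in metadata after 500 ms."
                    producer.send(new ProducerRecord<>("missing-topic", "key", "value")).get();
                } catch (ExecutionException e) {
                    // The send future fails; the cause carries the clarified message
                    if (e.getCause() instanceof TimeoutException)
                        System.err.println(e.getCause().getMessage());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }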
---
 .../kafka/clients/producer/KafkaProducer.java      | 25 +++++-----
 .../kafka/api/BaseProducerSendTest.scala           | 12 +++--
 .../kafka/api/PlaintextProducerSendTest.scala      | 57 +++++++++++++++++++++-
 3 files changed, 78 insertions(+), 16 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 48be57f262b..6e656f590e4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -1027,15 +1027,15 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * expensive callbacks it is recommended to use your own {@link java.util.concurrent.Executor} in the callback body
      * to parallelize processing.
      *
-     * @param record The record to send
+     * @param record   The record to send
      * @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null
-     *        indicates no callback)
-     *
-     * @throws IllegalStateException if a transactional.id has been configured and no transaction has been started, or
-     *                               when send is invoked after producer has been closed.
-     * @throws InterruptException If the thread is interrupted while blocked
+     *                 indicates no callback)
+     * @throws IllegalStateException  if a transactional.id has been configured and no transaction has been started, or
+     *                                when send is invoked after producer has been closed.
+     * @throws TimeoutException       if the topic or the partition specified in the record cannot be found in metadata within {@code max.block.ms}
+     * @throws InterruptException     If the thread is interrupted while blocked
      * @throws SerializationException If the key or value are not valid objects given the configured serializers
-     * @throws KafkaException If a Kafka related error occurs that does not belong to the public API exceptions.
+     * @throws KafkaException         If a Kafka related error occurs that does not belong to the public API exceptions.
      */
     @Override
     public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
@@ -1335,11 +1335,14 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
 
     /**
      * Get the partition metadata for the given topic. This can be used for custom partitioning.
+     * <p/>
+     * This will attempt to refresh metadata until it finds the topic in it, or the configured {@link ProducerConfig#MAX_BLOCK_MS_CONFIG} expires.
+     *
      * @throws AuthenticationException if authentication fails. See the exception for more details
-     * @throws AuthorizationException if not authorized to the specified topic. See the exception for more details
-     * @throws InterruptException if the thread is interrupted while blocked
-     * @throws TimeoutException if metadata could not be refreshed within {@code max.block.ms}
-     * @throws KafkaException for all Kafka-related exceptions, including the case where this method is called after producer close
+     * @throws AuthorizationException  if not authorized to the specified topic. See the exception for more details
+     * @throws InterruptException      if the thread is interrupted while blocked
+     * @throws TimeoutException        if the topic cannot be found in metadata within {@code max.block.ms}
+     * @throws KafkaException          for all Kafka-related exceptions, including the case where this method is called after producer close
      */
     @Override
     public List<PartitionInfo> partitionsFor(String topic) {
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 98155438549..add18b260cd 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -48,16 +48,20 @@ import scala.jdk.javaapi.OptionConverters
 abstract class BaseProducerSendTest extends KafkaServerTestHarness {
 
   def generateConfigs: scala.collection.Seq[KafkaConfig] = {
-    val overridingProps = new Properties()
     val numServers = 2
-    overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, 2.toShort)
-    overridingProps.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG, 4.toString)
     TestUtils.createBrokerConfigs(
       numServers,
       interBrokerSecurityProtocol = Some(securityProtocol),
       trustStoreFile = trustStoreFile,
       saslProperties = serverSaslProperties
-    ).map(KafkaConfig.fromProps(_, overridingProps))
+    ).map(KafkaConfig.fromProps(_, brokerOverrides))
+  }
+
+  protected def brokerOverrides: Properties = {
+    val overridingProps = new Properties()
+    overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, 2.toShort)
+    overridingProps.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG, 4.toString)
+    overridingProps
   }
 
   private var consumer: Consumer[Array[Byte], Array[Byte]] = _
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
index 6e3bbf4aed7..dc8b9423304 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.common.record.{DefaultRecord, DefaultRecordBatch, Record
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.apache.kafka.server.config.ServerLogConfigs
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Timeout
+import org.junit.jupiter.api.{BeforeEach, TestInfo, Timeout}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{Arguments, MethodSource}
 
@@ -37,6 +37,22 @@ import java.nio.charset.StandardCharsets
 
 class PlaintextProducerSendTest extends BaseProducerSendTest {
 
+  // topic auto creation is enabled by default, only some tests disable it
+  var disableAutoTopicCreation = false
+
+  override def brokerOverrides: Properties = {
+    val props = super.brokerOverrides
+    if (disableAutoTopicCreation) {
+      props.put("auto.create.topics.enable", "false")
+    }
+    props
+  }
+  @BeforeEach
+  override def setUp(testInfo: TestInfo): Unit = {
+    disableAutoTopicCreation = testInfo.getDisplayName.contains("autoCreateTopicsEnabled=false")
+    super.setUp(testInfo)
+  }
+
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
   @MethodSource(Array("getTestGroupProtocolParametersAll"))
   def testWrongSerializer(groupProtocol: String): Unit = {
@@ -121,6 +137,39 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
     }
   }
 
+  /**
+   * Test error message received when send fails waiting on metadata for a topic that does not exist.
+   * No need to run this for both rebalance protocols.
+   */
+  @ParameterizedTest(name = "groupProtocol={0}.autoCreateTopicsEnabled={1}")
+  @MethodSource(Array("protocolAndAutoCreateTopicProviders"))
+  def testSendTimeoutErrorMessageWhenTopicDoesNotExist(groupProtocol: String, autoCreateTopicsEnabled: String): Unit = {
+    val producer = createProducer(maxBlockMs = 500)
+    val record = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes)
+    val exception = assertThrows(classOf[ExecutionException], () => producer.send(record).get)
+    assertInstanceOf(classOf[TimeoutException], exception.getCause)
+    assertEquals("Topic topic not present in metadata after 500 ms.", exception.getCause.getMessage)
+  }
+
+  /**
+   * Test error message received when send fails waiting on metadata for a partition that does not exist (topic exists).
+   * No need to run this for both rebalance protocols.
+   */
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+  @MethodSource(Array("getTestGroupProtocolParametersClassicGroupProtocolOnly"))
+  def testSendTimeoutErrorWhenPartitionDoesNotExist(groupProtocol: String): Unit = {
+    val producer = createProducer(maxBlockMs = 500)
+    // Send a message to auto-create the topic
+    var record = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes)
+    assertEquals(0L, producer.send(record).get.offset, "Should have offset 0")
+
+    // Send another message to the topic that exists but to a partition that does not
+    record = new ProducerRecord(topic, 10, "key".getBytes, "value".getBytes)
+    val exception = assertThrows(classOf[ExecutionException], () => producer.send(record).get)
+    assertInstanceOf(classOf[TimeoutException], exception.getCause)
+    assertEquals("Partition 10 of topic topic with partition count 4 is not present in metadata after 500 ms.", exception.getCause.getMessage)
+  }
+
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
   @MethodSource(Array("timestampConfigProvider"))
   def testSendWithInvalidBeforeAndAfterTimestamp(groupProtocol: String, messageTimeStampConfig: String, recordTimestamp: Long): Unit = {
@@ -285,4 +334,10 @@ object PlaintextProducerSendTest {
     }
     data.stream()
   }
+
+  def protocolAndAutoCreateTopicProviders: java.util.stream.Stream[Arguments] = {
+    val data = new java.util.ArrayList[Arguments]()
+    data.add(Arguments.of("classic", "false"))
+    data.stream()
+  }
 }
\ No newline at end of file
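
For completeness, the partitionsFor contract clarified above can be coded
against in the same way; because the call is synchronous, the
TimeoutException is thrown directly rather than wrapped in an
ExecutionException. A minimal sketch (the helper class and method names
are hypothetical, and the producer is assumed configured with a bounded
max.block.ms as in the sketch above):

    import java.util.List;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.errors.TimeoutException;

    final class PartitionLookup {
        // Returns the topic's partition metadata, or null if the topic did not
        // appear in metadata within max.block.ms.
        static List<PartitionInfo> partitionsOrNull(Producer<byte[], byte[]> producer, String topic) {
            try {
                return producer.partitionsFor(topic); // blocks up to max.block.ms
            } catch (TimeoutException e) {
                return null; // topic never showed up in metadata in time
            }
        }
    }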
