This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new caeca090b86 MINOR: Improve producer docs and add tests around timeout
behaviour on missing topic/partition (#20533)
caeca090b86 is described below
commit caeca090b860a200d75e0b9c7492f67259ec650a
Author: Lianet Magrans <[email protected]>
AuthorDate: Mon Sep 15 13:28:27 2025 -0400
MINOR: Improve producer docs and add tests around timeout behaviour on
missing topic/partition (#20533)
Clarify the timeout errors received on send when the topic is not in
metadata vs. when the partition is not in metadata. Add integration tests
that showcase the difference. Follow-up to the 4.1 fix for the misleading
timeout error message (https://issues.apache.org/jira/browse/KAFKA-8862)
Reviewers: TengYao Chi <[email protected]>, Kuan-Po Tseng
<[email protected]>
---
.../kafka/clients/producer/KafkaProducer.java | 25 +++++-----
.../kafka/api/BaseProducerSendTest.scala | 12 +++--
.../kafka/api/PlaintextProducerSendTest.scala | 57 +++++++++++++++++++++-
3 files changed, 78 insertions(+), 16 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 48be57f262b..6e656f590e4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -1027,15 +1027,15 @@ public class KafkaProducer<K, V> implements Producer<K,
V> {
* expensive callbacks it is recommended to use your own {@link
java.util.concurrent.Executor} in the callback body
* to parallelize processing.
*
- * @param record The record to send
+ * @param record The record to send
* @param callback A user-supplied callback to execute when the record has
been acknowledged by the server (null
- * indicates no callback)
- *
- * @throws IllegalStateException if a transactional.id has been configured
and no transaction has been started, or
- * when send is invoked after producer has
been closed.
- * @throws InterruptException If the thread is interrupted while blocked
+ * indicates no callback)
+ * @throws IllegalStateException if a transactional.id has been
configured and no transaction has been started, or
+ * when send is invoked after producer has
been closed.
+ * @throws TimeoutException if the topic or the partition specified
in the record cannot be found in metadata within {@code max.block.ms}
+ * @throws InterruptException If the thread is interrupted while
blocked
* @throws SerializationException If the key or value are not valid
objects given the configured serializers
- * @throws KafkaException If a Kafka related error occurs that does not
belong to the public API exceptions.
+ * @throws KafkaException If a Kafka related error occurs that
does not belong to the public API exceptions.
*/
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback
callback) {
@@ -1335,11 +1335,14 @@ public class KafkaProducer<K, V> implements Producer<K,
V> {
/**
* Get the partition metadata for the given topic. This can be used for
custom partitioning.
+ * <p/>
+ * This will attempt to refresh metadata until it finds the topic in it,
or the configured {@link ProducerConfig#MAX_BLOCK_MS_CONFIG} expires.
+ *
* @throws AuthenticationException if authentication fails. See the
exception for more details
- * @throws AuthorizationException if not authorized to the specified
topic. See the exception for more details
- * @throws InterruptException if the thread is interrupted while blocked
- * @throws TimeoutException if metadata could not be refreshed within
{@code max.block.ms}
- * @throws KafkaException for all Kafka-related exceptions, including the
case where this method is called after producer close
+ * @throws AuthorizationException if not authorized to the specified
topic. See the exception for more details
+ * @throws InterruptException if the thread is interrupted while
blocked
+ * @throws TimeoutException if the topic cannot be found in
metadata within {@code max.block.ms}
+ * @throws KafkaException for all Kafka-related exceptions,
including the case where this method is called after producer close
*/
@Override
public List<PartitionInfo> partitionsFor(String topic) {
diff --git
a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 98155438549..add18b260cd 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -48,16 +48,20 @@ import scala.jdk.javaapi.OptionConverters
abstract class BaseProducerSendTest extends KafkaServerTestHarness {
def generateConfigs: scala.collection.Seq[KafkaConfig] = {
- val overridingProps = new Properties()
val numServers = 2
-
overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG,
2.toShort)
- overridingProps.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG, 4.toString)
TestUtils.createBrokerConfigs(
numServers,
interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile,
saslProperties = serverSaslProperties
- ).map(KafkaConfig.fromProps(_, overridingProps))
+ ).map(KafkaConfig.fromProps(_, brokerOverrides))
+ }
+
+ protected def brokerOverrides: Properties = {
+ val overridingProps = new Properties()
+
overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG,
2.toShort)
+ overridingProps.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG, 4.toString)
+ overridingProps
}
private var consumer: Consumer[Array[Byte], Array[Byte]] = _
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
index 6e3bbf4aed7..dc8b9423304 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.common.record.{DefaultRecord,
DefaultRecordBatch, Record
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.server.config.ServerLogConfigs
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Timeout
+import org.junit.jupiter.api.{BeforeEach, TestInfo, Timeout}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, MethodSource}
@@ -37,6 +37,22 @@ import java.nio.charset.StandardCharsets
class PlaintextProducerSendTest extends BaseProducerSendTest {
+ // topic auto creation is enabled by default, only some tests disable it
+ var disableAutoTopicCreation = false
+
+ override def brokerOverrides: Properties = {
+ val props = super.brokerOverrides
+ if (disableAutoTopicCreation) {
+ props.put("auto.create.topics.enable", "false")
+ }
+ props
+ }
+ @BeforeEach
+ override def setUp(testInfo: TestInfo): Unit = {
+ disableAutoTopicCreation =
testInfo.getDisplayName.contains("autoCreateTopicsEnabled=false")
+ super.setUp(testInfo)
+ }
+
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("getTestGroupProtocolParametersAll"))
def testWrongSerializer(groupProtocol: String): Unit = {
@@ -121,6 +137,39 @@ class PlaintextProducerSendTest extends
BaseProducerSendTest {
}
}
+ /**
+ * Test error message received when send fails waiting on metadata for a
topic that does not exist.
+ * No need to run this for both rebalance protocols.
+ */
+ @ParameterizedTest(name = "groupProtocol={0}.autoCreateTopicsEnabled={1}")
+ @MethodSource(Array("protocolAndAutoCreateTopicProviders"))
+ def testSendTimeoutErrorMessageWhenTopicDoesNotExist(groupProtocol: String,
autoCreateTopicsEnabled: String): Unit = {
+ val producer = createProducer(maxBlockMs = 500)
+ val record = new ProducerRecord(topic, null, "key".getBytes,
"value".getBytes)
+ val exception = assertThrows(classOf[ExecutionException], () =>
producer.send(record).get)
+ assertInstanceOf(classOf[TimeoutException], exception.getCause)
+ assertEquals("Topic topic not present in metadata after 500 ms.",
exception.getCause.getMessage)
+ }
+
+ /**
+ * Test error message received when send fails waiting on metadata for a
partition that does not exist (topic exists).
+ * No need to run this for both rebalance protocols.
+ */
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+
@MethodSource(Array("getTestGroupProtocolParametersClassicGroupProtocolOnly"))
+ def testSendTimeoutErrorWhenPartitionDoesNotExist(groupProtocol: String):
Unit = {
+ val producer = createProducer(maxBlockMs = 500)
+ // Send a message to auto-create the topic
+ var record = new ProducerRecord(topic, null, "key".getBytes,
"value".getBytes)
+ assertEquals(0L, producer.send(record).get.offset, "Should have offset 0")
+
+ // Send another message to the topic that exists but to a partition that
does not
+ record = new ProducerRecord(topic, 10, "key".getBytes, "value".getBytes)
+ val exception = assertThrows(classOf[ExecutionException], () =>
producer.send(record).get)
+ assertInstanceOf(classOf[TimeoutException], exception.getCause)
+ assertEquals("Partition 10 of topic topic with partition count 4 is not
present in metadata after 500 ms.", exception.getCause.getMessage)
+ }
+
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("timestampConfigProvider"))
def testSendWithInvalidBeforeAndAfterTimestamp(groupProtocol: String,
messageTimeStampConfig: String, recordTimestamp: Long): Unit = {
@@ -285,4 +334,10 @@ object PlaintextProducerSendTest {
}
data.stream()
}
+
+ def protocolAndAutoCreateTopicProviders: java.util.stream.Stream[Arguments]
= {
+ val data = new java.util.ArrayList[Arguments]()
+ data.add(Arguments.of("classic", "false"))
+ data.stream()
+ }
}
\ No newline at end of file