cmccabe commented on a change in pull request #11689: URL: https://github.com/apache/kafka/pull/11689#discussion_r788922915
########## File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java ########## @@ -1506,6 +1508,56 @@ public void testNullTopicName() { "key".getBytes(StandardCharsets.UTF_8), "value".getBytes(StandardCharsets.UTF_8))); } + @Test + public void testCallbackHandlesError() throws Exception { + Map<String, Object> configs = new HashMap<>(); + configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); + configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000"); + + Time time = new MockTime(); + ProducerMetadata producerMetadata = newMetadata(0, Long.MAX_VALUE); + MockClient client = new MockClient(time, producerMetadata); + + String invalidTopicName = "topic abc"; // Invalid topic name due to space + + try (Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(), new StringSerializer(), + producerMetadata, client, null, time)) { + ProducerRecord<String, String> record = new ProducerRecord<>(invalidTopicName, "HelloKafka"); + + // Here's the important piece of the test. Let's make sure that the RecordMetadata we get + // is non-null and adheres to the onCompletion contract. + Callback callBack = (recordMetadata, exception) -> { + assertNotNull(exception); + assertNotNull(recordMetadata); + + try { + assertNotNull(recordMetadata.topic()); + } catch (NullPointerException e) { + fail("Topic name should be valid even on send failure", e); Review comment: It's not necessary to do this. If you want to display a special error message when the assert fails, there is a three-argument form which lets you specify the error. ########## File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java ########## @@ -1506,6 +1508,56 @@ public void testNullTopicName() { "key".getBytes(StandardCharsets.UTF_8), "value".getBytes(StandardCharsets.UTF_8))); } + @Test + public void testCallbackHandlesError() throws Exception { + Map<String, Object> configs = new HashMap<>(); + configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); + configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000"); + + Time time = new MockTime(); + ProducerMetadata producerMetadata = newMetadata(0, Long.MAX_VALUE); + MockClient client = new MockClient(time, producerMetadata); + + String invalidTopicName = "topic abc"; // Invalid topic name due to space + + try (Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(), new StringSerializer(), + producerMetadata, client, null, time)) { + ProducerRecord<String, String> record = new ProducerRecord<>(invalidTopicName, "HelloKafka"); + + // Here's the important piece of the test. Let's make sure that the RecordMetadata we get + // is non-null and adheres to the onCompletion contract. + Callback callBack = (recordMetadata, exception) -> { + assertNotNull(exception); + assertNotNull(recordMetadata); + + try { + assertNotNull(recordMetadata.topic()); + } catch (NullPointerException e) { + fail("Topic name should be valid even on send failure", e); + } + + assertEquals(invalidTopicName, recordMetadata.topic()); + + try { + assertEquals(RecordMetadata.UNKNOWN_PARTITION, recordMetadata.partition()); Review comment: As before, it's not necessary to do this. If you want to display a special error message when the assert fails, there is a three-argument form which lets you specify the error. ########## File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java ########## @@ -1506,6 +1508,56 @@ public void testNullTopicName() { "key".getBytes(StandardCharsets.UTF_8), "value".getBytes(StandardCharsets.UTF_8))); } + @Test + public void testCallbackHandlesError() throws Exception { + Map<String, Object> configs = new HashMap<>(); + configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); + configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000"); + + Time time = new MockTime(); + ProducerMetadata producerMetadata = newMetadata(0, Long.MAX_VALUE); + MockClient client = new MockClient(time, producerMetadata); + + String invalidTopicName = "topic abc"; // Invalid topic name due to space + + try (Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(), new StringSerializer(), + producerMetadata, client, null, time)) { + ProducerRecord<String, String> record = new ProducerRecord<>(invalidTopicName, "HelloKafka"); + + // Here's the important piece of the test. Let's make sure that the RecordMetadata we get + // is non-null and adheres to the onCompletion contract. + Callback callBack = (recordMetadata, exception) -> { + assertNotNull(exception); + assertNotNull(recordMetadata); + + try { + assertNotNull(recordMetadata.topic()); + } catch (NullPointerException e) { + fail("Topic name should be valid even on send failure", e); + } + + assertEquals(invalidTopicName, recordMetadata.topic()); + + try { + assertEquals(RecordMetadata.UNKNOWN_PARTITION, recordMetadata.partition()); Review comment: As before, it's not necessary to do this. If you want to display a special error message when the assert fails, there is a three-argument form which lets you specify the error message. ########## File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java ########## @@ -1506,6 +1508,56 @@ public void testNullTopicName() { "key".getBytes(StandardCharsets.UTF_8), "value".getBytes(StandardCharsets.UTF_8))); } + @Test + public void testCallbackHandlesError() throws Exception { + Map<String, Object> configs = new HashMap<>(); + configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); + configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000"); + + Time time = new MockTime(); + ProducerMetadata producerMetadata = newMetadata(0, Long.MAX_VALUE); + MockClient client = new MockClient(time, producerMetadata); + + String invalidTopicName = "topic abc"; // Invalid topic name due to space + + try (Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(), new StringSerializer(), + producerMetadata, client, null, time)) { + ProducerRecord<String, String> record = new ProducerRecord<>(invalidTopicName, "HelloKafka"); + + // Here's the important piece of the test. Let's make sure that the RecordMetadata we get + // is non-null and adheres to the onCompletion contract. + Callback callBack = (recordMetadata, exception) -> { + assertNotNull(exception); + assertNotNull(recordMetadata); + + try { + assertNotNull(recordMetadata.topic()); + } catch (NullPointerException e) { + fail("Topic name should be valid even on send failure", e); Review comment: It's not necessary to do this. If you want to display a special error message when the assert fails, there is a three-argument form which lets you specify the error message. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org