cmccabe commented on a change in pull request #11689:
URL: https://github.com/apache/kafka/pull/11689#discussion_r788922915



##########
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -1506,6 +1508,56 @@ public void testNullTopicName() {
             "key".getBytes(StandardCharsets.UTF_8), 
"value".getBytes(StandardCharsets.UTF_8)));
     }
 
+    @Test
+    public void testCallbackHandlesError() throws Exception {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000");
+
+        Time time = new MockTime();
+        ProducerMetadata producerMetadata = newMetadata(0, Long.MAX_VALUE);
+        MockClient client = new MockClient(time, producerMetadata);
+
+        String invalidTopicName = "topic abc"; // Invalid topic name due to 
space
+
+        try (Producer<String, String> producer = kafkaProducer(configs, new 
StringSerializer(), new StringSerializer(),
+                producerMetadata, client, null, time)) {
+            ProducerRecord<String, String> record = new 
ProducerRecord<>(invalidTopicName, "HelloKafka");
+
+            // Here's the important piece of the test. Let's make sure that 
the RecordMetadata we get
+            // is non-null and adheres to the onCompletion contract.
+            Callback callBack = (recordMetadata, exception) -> {
+                assertNotNull(exception);
+                assertNotNull(recordMetadata);
+
+                try {
+                    assertNotNull(recordMetadata.topic());
+                } catch (NullPointerException e) {
+                    fail("Topic name should be valid even on send failure", e);

Review comment:
       It's not necessary to do this. If you want to display a special error 
message when the assert fails, there is a three-argument form which lets you 
specify the error.

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -1506,6 +1508,56 @@ public void testNullTopicName() {
             "key".getBytes(StandardCharsets.UTF_8), 
"value".getBytes(StandardCharsets.UTF_8)));
     }
 
+    @Test
+    public void testCallbackHandlesError() throws Exception {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000");
+
+        Time time = new MockTime();
+        ProducerMetadata producerMetadata = newMetadata(0, Long.MAX_VALUE);
+        MockClient client = new MockClient(time, producerMetadata);
+
+        String invalidTopicName = "topic abc"; // Invalid topic name due to 
space
+
+        try (Producer<String, String> producer = kafkaProducer(configs, new 
StringSerializer(), new StringSerializer(),
+                producerMetadata, client, null, time)) {
+            ProducerRecord<String, String> record = new 
ProducerRecord<>(invalidTopicName, "HelloKafka");
+
+            // Here's the important piece of the test. Let's make sure that 
the RecordMetadata we get
+            // is non-null and adheres to the onCompletion contract.
+            Callback callBack = (recordMetadata, exception) -> {
+                assertNotNull(exception);
+                assertNotNull(recordMetadata);
+
+                try {
+                    assertNotNull(recordMetadata.topic());
+                } catch (NullPointerException e) {
+                    fail("Topic name should be valid even on send failure", e);
+                }
+
+                assertEquals(invalidTopicName, recordMetadata.topic());
+
+                try {
+                    assertEquals(RecordMetadata.UNKNOWN_PARTITION, 
recordMetadata.partition());

Review comment:
       As before, it's not necessary to do this. If you want to display a 
special error message when the assert fails, there is a three-argument form 
which lets you specify the error.

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -1506,6 +1508,56 @@ public void testNullTopicName() {
             "key".getBytes(StandardCharsets.UTF_8), 
"value".getBytes(StandardCharsets.UTF_8)));
     }
 
+    @Test
+    public void testCallbackHandlesError() throws Exception {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000");
+
+        Time time = new MockTime();
+        ProducerMetadata producerMetadata = newMetadata(0, Long.MAX_VALUE);
+        MockClient client = new MockClient(time, producerMetadata);
+
+        String invalidTopicName = "topic abc"; // Invalid topic name due to 
space
+
+        try (Producer<String, String> producer = kafkaProducer(configs, new 
StringSerializer(), new StringSerializer(),
+                producerMetadata, client, null, time)) {
+            ProducerRecord<String, String> record = new 
ProducerRecord<>(invalidTopicName, "HelloKafka");
+
+            // Here's the important piece of the test. Let's make sure that 
the RecordMetadata we get
+            // is non-null and adheres to the onCompletion contract.
+            Callback callBack = (recordMetadata, exception) -> {
+                assertNotNull(exception);
+                assertNotNull(recordMetadata);
+
+                try {
+                    assertNotNull(recordMetadata.topic());
+                } catch (NullPointerException e) {
+                    fail("Topic name should be valid even on send failure", e);
+                }
+
+                assertEquals(invalidTopicName, recordMetadata.topic());
+
+                try {
+                    assertEquals(RecordMetadata.UNKNOWN_PARTITION, 
recordMetadata.partition());

Review comment:
       As before, it's not necessary to do this. If you want to display a 
special error message when the assert fails, there is a three-argument form 
which lets you specify the error message.

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##########
@@ -1506,6 +1508,56 @@ public void testNullTopicName() {
             "key".getBytes(StandardCharsets.UTF_8), 
"value".getBytes(StandardCharsets.UTF_8)));
     }
 
+    @Test
+    public void testCallbackHandlesError() throws Exception {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000");
+
+        Time time = new MockTime();
+        ProducerMetadata producerMetadata = newMetadata(0, Long.MAX_VALUE);
+        MockClient client = new MockClient(time, producerMetadata);
+
+        String invalidTopicName = "topic abc"; // Invalid topic name due to 
space
+
+        try (Producer<String, String> producer = kafkaProducer(configs, new 
StringSerializer(), new StringSerializer(),
+                producerMetadata, client, null, time)) {
+            ProducerRecord<String, String> record = new 
ProducerRecord<>(invalidTopicName, "HelloKafka");
+
+            // Here's the important piece of the test. Let's make sure that 
the RecordMetadata we get
+            // is non-null and adheres to the onCompletion contract.
+            Callback callBack = (recordMetadata, exception) -> {
+                assertNotNull(exception);
+                assertNotNull(recordMetadata);
+
+                try {
+                    assertNotNull(recordMetadata.topic());
+                } catch (NullPointerException e) {
+                    fail("Topic name should be valid even on send failure", e);

Review comment:
       It's not necessary to do this. If you want to display a special error 
message when the assert fails, there is a three-argument form which lets you 
specify the error message.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to