[jira] [Commented] (KAFKA-7412) Bug prone response from producer.send(ProducerRecord, Callback) if Kafka broker is not running
[ https://issues.apache.org/jira/browse/KAFKA-7412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16680684#comment-16680684 ] ASF GitHub Bot commented on KAFKA-7412:
---

junrao closed pull request #5798: KAFKA-7412: onComplete should not reassign `metadata` variable
URL: https://github.com/apache/kafka/pull/5798

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java b/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java
index a70e4e9a685..f7d4bcdd468 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java
@@ -24,10 +24,11 @@
     /**
      * A callback method the user can implement to provide asynchronous handling of request completion. This method will
-     * be called when the record sent to the server has been acknowledged. Exactly one of the arguments will be
-     * non-null.
-     * @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error
-     *                 occurred.
+     * be called when the record sent to the server has been acknowledged. When exception is not null in the callback,
+     * metadata will contain the special -1 value for all fields except for topicPartition, which will be valid.
+     *
+     * @param metadata The metadata for the record that was sent (i.e. the partition and offset). An empty metadata
+     *                 with -1 value for all fields except for topicPartition will be returned if an error occurred.
      * @param exception The exception thrown during processing of this record. Null if no error occurred.
      *                  Possible thrown exceptions include:
      *
@@ -49,5 +50,5 @@
      *                  TimeoutException
      *                  UnknownTopicOrPartitionException
      */
-    public void onCompletion(RecordMetadata metadata, Exception exception);
+    void onCompletion(RecordMetadata metadata, Exception exception);
 }
diff --git a/examples/src/main/java/kafka/examples/Producer.java b/examples/src/main/java/kafka/examples/Producer.java
index 87212801246..b6998c58ac7 100644
--- a/examples/src/main/java/kafka/examples/Producer.java
+++ b/examples/src/main/java/kafka/examples/Producer.java
@@ -81,8 +81,8 @@ public DemoCallBack(long startTime, int key, String message) {
     /**
      * A callback method the user can implement to provide asynchronous handling of request completion. This method will
-     * be called when the record sent to the server has been acknowledged. Exactly one of the arguments will be
-     * non-null.
+     * be called when the record sent to the server has been acknowledged. When exception is not null in the callback,
+     * metadata will contain the special -1 value for all fields except for topicPartition, which will be valid.
      *
      * @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error
      *                 occurred.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Bug prone response from producer.send(ProducerRecord, Callback) if Kafka
> broker is not running
> --
>
> Key: KAFKA-7412
> URL: https://issues.apache.org/jira/browse/KAFKA-7412
> Project: Kafka
> Issue Type: Bug
> Components: producer
> Affects Versions: 2.0.0
> Reporter: Michal Turek
> Assignee: huxihx
> Priority: Major
> Attachments: both_metadata_and_exception.png,
> metadata_when_kafka_is_stopped.png
>
>
> Hi there, I have probably found a bug in the Java Kafka producer client.
> Scenario & current behavior:
> - Start Kafka broker, single instance.
> - Start an application that produces messages to Kafka.
> - Let the application load partitions for a topic to warm up the producer,
> e.g. send a message to Kafka. I'm not sure if this is a necessary step, but our
> code does it.
> - Gracefully stop the Kafka broker.
> - Application logs now contain "org.apache.kafka.clients.NetworkClient:
> [Producer clientId=...] Connection to node 0 could not be established. Broker
> may not be available." so the client is aware that Kafka is unavailable.
> -
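The merged Javadoc changes what a callback can rely on. On failure, `metadata` is no longer guaranteed to be null, but its topic partition stays valid and every other field is -1. The sketch below models that contract with plain JDK types so it runs without a broker. `MetadataStub`, `UpdatedCallbackContract`, `errorPlaceholder`, and `describe` are hypothetical names, not Kafka API. In real code the values would come from `RecordMetadata#topic()`, `partition()`, and `offset()` inside `Callback#onCompletion`. The sketch also assumes a client that follows the updated Javadoc, so `metadata` is never null.

```java
// Hypothetical stand-in for org.apache.kafka.clients.producer.RecordMetadata,
// reduced to the three fields this sketch needs.
final class MetadataStub {
    final String topic;
    final int partition;
    final long offset;

    MetadataStub(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

final class UpdatedCallbackContract {
    // Models the placeholder described in the new Javadoc:
    // only the topic partition is valid, the offset is -1.
    static MetadataStub errorPlaceholder(String topic, int partition) {
        return new MetadataStub(topic, partition, -1L);
    }

    // Decides on the exception first. Assumes metadata is never null (updated contract),
    // so its topic partition can be used when reporting a failure.
    static String describe(MetadataStub metadata, Exception exception) {
        String topicPartition = metadata.topic + "-" + metadata.partition;
        if (exception != null) {
            return "FAILED " + topicPartition + ": " + exception.getMessage();
        }
        return "OK " + topicPartition + "@" + metadata.offset;
    }
}
```

The design choice is to treat `exception` as the only success/failure signal and use `metadata` purely for context, such as identifying which partition a failed record was headed for.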
[jira] [Commented] (KAFKA-7412) Bug prone response from producer.send(ProducerRecord, Callback) if Kafka broker is not running
[ https://issues.apache.org/jira/browse/KAFKA-7412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16649959#comment-16649959 ] ASF GitHub Bot commented on KAFKA-7412:
---

huxihx opened a new pull request #5798: KAFKA-7412: onComplete should not reassign `metadata` variable
URL: https://github.com/apache/kafka/pull/5798

The Javadoc for `InterceptorCallback#onComplete` says that exactly one of the arguments (metadata and exception) will be non-null. However, this contract is broken when a TimeoutException is encountered, because the code reassigns a newly created RecordMetadata object to the variable `metadata`. The solution is to leave `metadata1` unchanged and pass a new RecordMetadata instance to `onAcknowledgement`.
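The PR description blames the broken contract on the wrapping callback reassigning its `metadata` parameter before forwarding it. Below is a simplified, hypothetical model of that pattern. It is not Kafka's actual `InterceptorCallback`, and `WrappingCallbackModel`, `Meta`, `buggy`, and `fixed` are illustrative names only.

- `buggy` reassigns the parameter, so the user callback sees a placeholder together with the exception.
- `fixed` follows the approach proposed in the description and hands the placeholder only to the interceptor.

Note that the merged diff at the top of this thread changes the Javadoc to document the placeholder instead.

```java
import java.util.function.BiConsumer;

// Hypothetical, simplified model of a callback wrapper; NOT Kafka's InterceptorCallback.
final class WrappingCallbackModel {
    // Minimal stand-in for RecordMetadata carrying only an offset.
    static final class Meta {
        final long offset;

        Meta(long offset) {
            this.offset = offset;
        }
    }

    // Pattern criticized in the PR: the parameter itself is reassigned, so the
    // user callback receives a non-null placeholder together with the exception.
    static void buggy(Meta metadata, Exception exception,
                      BiConsumer<Meta, Exception> interceptor,
                      BiConsumer<Meta, Exception> userCallback) {
        metadata = metadata != null ? metadata : new Meta(-1L);
        interceptor.accept(metadata, exception);
        userCallback.accept(metadata, exception);
    }

    // Approach proposed in the PR description: keep the parameter unchanged and
    // give the placeholder only to the interceptor.
    static void fixed(Meta metadata, Exception exception,
                      BiConsumer<Meta, Exception> interceptor,
                      BiConsumer<Meta, Exception> userCallback) {
        Meta forInterceptor = metadata != null ? metadata : new Meta(-1L);
        interceptor.accept(forInterceptor, exception);
        userCallback.accept(metadata, exception);
    }
}
```

Under `fixed`, the user callback again sees exactly one non-null argument, and interceptors still get a non-null object to inspect.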
[jira] [Commented] (KAFKA-7412) Bug prone response from producer.send(ProducerRecord, Callback) if Kafka broker is not running
[ https://issues.apache.org/jira/browse/KAFKA-7412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16626825#comment-16626825 ] Michal Turek commented on KAFKA-7412:
-

Hmm, I have just tried to reproduce it multiple times with a fresh Kafka installation and the default configuration (following the quick start guide). The callback now receives org.apache.kafka.common.errors.TimeoutException together with a non-null RecordMetadata with offset -1. This breaks the "Exactly one of the arguments will be non-null." contract from the Javadoc. I still believe something is broken inside the client.

{noformat}
/**
 * A callback method the user can implement to provide asynchronous handling of request completion. This method will
 * be called when the record sent to the server has been acknowledged. Exactly one of the arguments will be
 * non-null.
 * @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error
 *                 occurred.
 * @param exception The exception thrown during processing of this record. Null if no error occurred.
 *                  Possible thrown exceptions include:
 * ...
{noformat}

{noformat}
Metadata AA_INPUT_666-0@-1, exception org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for AA_INPUT_666-0: 5036 ms has passed since batch creation plus linger time
{noformat}

!both_metadata_and_exception.png!

I have no idea why it started to behave differently. It's the same code with the same configuration and versions. This debugging screenshot was taken two weeks ago, when I created the task.

!metadata_when_kafka_is_stopped.png!
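Michal Turek's log line reports 5036 ms "since batch creation plus linger time". His setup uses request.timeout.ms = 5000 and linger.ms = 20. One rough way to read that number is sketched below. It assumes, from the wording of the message rather than from the producer source, that a batch expires once the time since creation minus linger.ms exceeds request.timeout.ms. `BatchExpiryModel` and its methods are hypothetical.

```java
// Hypothetical helper modelling the expiry condition suggested by the log wording
// "N ms has passed since batch creation plus linger time"; not copied from Kafka's source.
final class BatchExpiryModel {
    // Time counted against the timeout: elapsed since batch creation, minus linger.ms.
    static long msPastCreationPlusLinger(long msSinceCreation, long lingerMs) {
        return msSinceCreation - lingerMs;
    }

    // Assumed condition: the batch expires once that time exceeds request.timeout.ms.
    static boolean isExpired(long msSinceCreation, long lingerMs, long requestTimeoutMs) {
        return msPastCreationPlusLinger(msSinceCreation, lingerMs) > requestTimeoutMs;
    }
}
```

Under this assumption, the logged 5036 ms corresponds to about 5056 ms after batch creation, just past the 5000 ms request timeout. That fits the report that the callback fires after request.timeout.ms.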
[jira] [Commented] (KAFKA-7412) Bug prone response from producer.send(ProducerRecord, Callback) if Kafka broker is not running
[ https://issues.apache.org/jira/browse/KAFKA-7412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16626414#comment-16626414 ] Suman B N commented on KAFKA-7412:
--

I have also used the same client, broker, and code, but every time I get an exception: org.apache.kafka.common.errors.TimeoutException. Please re-run it and see whether you can reproduce the issue.

> Bug prone response from producer.send(ProducerRecord, Callback) if Kafka
> broker is not running
> --
>
> Key: KAFKA-7412
> URL: https://issues.apache.org/jira/browse/KAFKA-7412
> Project: Kafka
> Issue Type: Bug
> Components: producer
> Affects Versions: 2.0.0
> Reporter: Michal Turek
> Priority: Major
> Attachments: metadata_when_kafka_is_stopped.png
>
>
> Hi there, I have probably found a bug in the Java Kafka producer client.
> Scenario & current behavior:
> - Start Kafka broker, single instance.
> - Start an application that produces messages to Kafka.
> - Let the application load partitions for a topic to warm up the producer,
> e.g. send a message to Kafka. I'm not sure if this is a necessary step, but our
> code does it.
> - Gracefully stop the Kafka broker.
> - Application logs now contain "org.apache.kafka.clients.NetworkClient:
> [Producer clientId=...] Connection to node 0 could not be established. Broker
> may not be available." so the client is aware that Kafka is unavailable.
> - Trigger the producer to send a message using the
> KafkaProducer.send(ProducerRecord, Callback) method.
> - The callback that notifies business code receives a non-null RecordMetadata
> and a null Exception after request.timeout.ms. The metadata contains offset -1,
> which is the value of ProduceResponse.INVALID_OFFSET.
> Expected behavior:
> - If Kafka is not running and the message is not appended to the log, the
> callback should receive a null RecordMetadata and a non-null Exception. At least,
> that is how I subjectively understand the Javadoc: "exception on production
> error", in simple words.
> - A developer who is not aware of this behavior and doesn't test for
> offset -1 may consider the message successfully sent and properly acked
> by the broker.
> Known workaround
> - Together with checking for a non-null exception in the callback, add another
> condition for ProduceResponse.INVALID_OFFSET.
> {noformat}
> try {
>     producer.send(record, (metadata, exception) -> {
>         if (metadata != null) {
>             if (metadata.offset() != ProduceResponse.INVALID_OFFSET) {
>                 // Success
>             } else {
>                 // Failure
>             }
>         } else {
>             // Failure
>         }
>     });
> } catch (Exception e) {
>     // Failure
> }
> {noformat}
> Used setup
> - Latest Kafka 2.0.0 for both broker and Java client.
> - Originally found with broker 0.11.0.1 and client 2.0.0.
> - Code is analogous to the example in the Javadoc of KafkaProducer.send().
> - Used producer configuration (others use defaults).
> {noformat}
> bootstrap.servers = "localhost:9092"
> client.id = "..."
> acks = "all"
> retries = 1
> linger.ms = "20"
> compression.type = "lz4"
> request.timeout.ms = 5000 # The same behavior occurs with the default; this is only to speed up the tests
> {noformat}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
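For reference, the reporter's producer settings can be expressed as a `java.util.Properties` object. One of `KafkaProducer`'s constructors accepts such an object, although key and value serializers must also be configured. The sketch below only builds the properties with JDK classes and does not create a producer. `ReporterProducerConfig` is a hypothetical name. `client.id` is left out because its value is elided in the report.

```java
import java.util.Properties;

// The reporter's producer settings from KAFKA-7412 as java.util.Properties.
// Only builds the configuration; creating a KafkaProducer would additionally need
// key/value serializers and the kafka-clients library.
final class ReporterProducerConfig {
    static Properties build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        // client.id omitted: its value is elided ("...") in the report.
        props.setProperty("acks", "all");
        props.setProperty("retries", "1");
        props.setProperty("linger.ms", "20");
        props.setProperty("compression.type", "lz4");
        // Lowered from the default only so the timeout shows up faster in tests.
        props.setProperty("request.timeout.ms", "5000");
        return props;
    }
}
```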
[jira] [Commented] (KAFKA-7412) Bug prone response from producer.send(ProducerRecord, Callback) if Kafka broker is not running
[ https://issues.apache.org/jira/browse/KAFKA-7412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618599#comment-16618599 ] huxihx commented on KAFKA-7412:
---

I would expect the exception to be non-null in your case. It is much safer to check whether the exception is null first.
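huxihx's advice (check the exception before looking at the metadata), combined with the reporter's offset check, can be sketched as a small decision function. `SendOutcome` and `classify` are hypothetical names. In a real `Callback#onCompletion(RecordMetadata metadata, Exception exception)`, the offset argument would be `metadata == null ? null : metadata.offset()`. The -1 constant mirrors `ProduceResponse.INVALID_OFFSET` as quoted in the report.

One caveat applies: with `acks=0` the producer gets no offset back from the broker and reports -1 even on success. The offset check therefore only makes sense with `acks=1` or `acks=all` (the reporter used `all`).

```java
// Hypothetical decision helper combining both checks discussed in KAFKA-7412.
final class SendOutcome {
    // Sentinel offset; the report refers to it as ProduceResponse.INVALID_OFFSET.
    static final long INVALID_OFFSET = -1L;

    enum Result { SUCCESS, FAILURE }

    // offset is null when the callback received null metadata.
    static Result classify(Long offset, Exception exception) {
        // 1. A non-null exception always means failure, even if metadata is also non-null.
        if (exception != null) {
            return Result.FAILURE;
        }
        // 2. Defensive check from the report's workaround: missing metadata or the
        //    sentinel offset is treated as failure too (valid only with acks=1 or acks=all).
        if (offset == null || offset == INVALID_OFFSET) {
            return Result.FAILURE;
        }
        return Result.SUCCESS;
    }
}
```

Checking the exception first handles both the "null metadata" behavior documented before this issue and the "-1 placeholder" behavior documented by the merged PR, without depending on which one a given client version exhibits.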