[PR] KAFKA-15259: Processing must continue with flush + commitTnx [kafka]

2024-06-13 Thread via GitHub
aliehsaeedii opened a new pull request, #16332: URL: https://github.com/apache/kafka/pull/16332 This PR aims at returning the transaction from error state to enable the user to commit the transaction even if the latest `send(ProducerRecord)` failed with an error. This change will fix the bu

Re: [PR] KAFKA-15259: Processing must continue with flush + commitTnx [kafka]

2024-06-13 Thread via GitHub
mjsax commented on code in PR #16332: URL: https://github.com/apache/kafka/pull/16332#discussion_r1638861356 ## clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java: ## @@ -792,6 +792,9 @@ public void sendOffsetsToTransaction(Map offs * * Furth

Re: [PR] KAFKA-15259: Processing must continue with flush + commitTnx [kafka]

2024-06-13 Thread via GitHub
aliehsaeedii commented on code in PR #16332: URL: https://github.com/apache/kafka/pull/16332#discussion_r1639280799 ## clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java: ## @@ -792,6 +792,9 @@ public void sendOffsetsToTransaction(Map offs *

Re: [PR] KAFKA-15259: Processing must continue with flush + commitTnx [kafka]

2024-06-14 Thread via GitHub
aliehsaeedii commented on code in PR #16332: URL: https://github.com/apache/kafka/pull/16332#discussion_r1639578562 ## clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java: ## @@ -1220,6 +1223,7 @@ private void ensureValidRecordSize(int size) { * of f

Re: [PR] KAFKA-15259: Processing must continue with flush + commitTnx [kafka]

2024-06-14 Thread via GitHub
aliehsaeedii commented on PR #16332: URL: https://github.com/apache/kafka/pull/16332#issuecomment-2167688944 @C0urante It would be nice if you took a look. Instead of implementing a custom handler for Producer (as suggested in KIP-1038), we came up with the idea of allowing the user to st

Re: [PR] KAFKA-15259: Processing must continue with flush + commitTnx [kafka]

2024-06-14 Thread via GitHub
aliehsaeedii commented on code in PR #16332: URL: https://github.com/apache/kafka/pull/16332#discussion_r1639578562 ## clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java: ## @@ -1220,6 +1223,7 @@ private void ensureValidRecordSize(int size) { * of f

Re: [PR] KAFKA-15259: Processing must continue with flush + commitTnx [kafka]

2024-06-14 Thread via GitHub
artemlivshits commented on code in PR #16332: URL: https://github.com/apache/kafka/pull/16332#discussion_r1640809221 ## clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java: ## @@ -1257,6 +1261,9 @@ public void flush() { this.sender.wakeup();

Re: [PR] KAFKA-15259: Processing must continue with flush + commitTnx [kafka]

2024-06-17 Thread via GitHub
C0urante commented on PR #16332: URL: https://github.com/apache/kafka/pull/16332#issuecomment-2173591962 @aliehsaeedii I like this approach a lot more than the pluggable error handler interface! But it seems like there's risk to this change. Allowing transactions to commit where they would

Re: [PR] KAFKA-15259: Processing must continue with flush + commitTnx [kafka]

2024-06-17 Thread via GitHub
mjsax commented on code in PR #16332: URL: https://github.com/apache/kafka/pull/16332#discussion_r1643028085 ## clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java: ## @@ -1257,6 +1261,9 @@ public void flush() { this.sender.wakeup(); try

Re: [PR] KAFKA-15259: Processing must continue with flush + commitTnx [kafka]

2024-06-17 Thread via GitHub
aliehsaeedii commented on PR #16332: URL: https://github.com/apache/kafka/pull/16332#issuecomment-2173820465 > @aliehsaeedii I like this approach a lot more than the pluggable error handler interface! But it seems like there's risk to this change. Allowing transactions to commit where they

Re: [PR] KAFKA-15259: Processing must continue with flush + commitTnx [kafka]

2024-06-21 Thread via GitHub
artemlivshits commented on code in PR #16332: URL: https://github.com/apache/kafka/pull/16332#discussion_r1645285270 ## clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java: ## @@ -1257,6 +1261,9 @@ public void flush() { this.sender.wakeup();