Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]

2024-01-26 Thread via GitHub


jolshan merged PR #15087:
URL: https://github.com/apache/kafka/pull/15087





Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]

2024-01-18 Thread via GitHub


jolshan commented on code in PR #15087:
URL: https://github.com/apache/kafka/pull/15087#discussion_r1458144631


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -2560,41 +2562,6 @@ class ReplicaManagerTest {
 }
   }
 
-  @ParameterizedTest
-  @EnumSource(value = classOf[Errors], names = Array("NOT_COORDINATOR", "CONCURRENT_TRANSACTIONS", "COORDINATOR_LOAD_IN_PROGRESS", "COORDINATOR_NOT_AVAILABLE"))
-  def testMaybeVerificationErrorConversions(error: Errors): Unit = {

Review Comment:
   We also have one for the GroupCoordinator.






Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]

2024-01-18 Thread via GitHub


jolshan commented on code in PR #15087:
URL: https://github.com/apache/kafka/pull/15087#discussion_r1458144452


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -2560,41 +2562,6 @@ class ReplicaManagerTest {
 }
   }
 
-  @ParameterizedTest
-  @EnumSource(value = classOf[Errors], names = Array("NOT_COORDINATOR", "CONCURRENT_TRANSACTIONS", "COORDINATOR_LOAD_IN_PROGRESS", "COORDINATOR_NOT_AVAILABLE"))
-  def testMaybeVerificationErrorConversions(error: Errors): Unit = {

Review Comment:
   We have the test above in this file. 






Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]

2024-01-18 Thread via GitHub


jolshan commented on code in PR #15087:
URL: https://github.com/apache/kafka/pull/15087#discussion_r1458143981


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1188,18 +1073,7 @@ class ReplicaManager(val config: KafkaConfig,
   requestLocal: RequestLocal,
   verificationErrors: Map[TopicPartition, Errors]
 ): Unit = {
-  // Map transaction coordinator errors to known errors for the response
-  val convertedErrors = verificationErrors.map { case (tp, error) =>
-error match {
-  case Errors.CONCURRENT_TRANSACTIONS |
-Errors.COORDINATOR_LOAD_IN_PROGRESS |
-Errors.COORDINATOR_NOT_AVAILABLE |
-Errors.NOT_COORDINATOR => tp -> Errors.NOT_ENOUGH_REPLICAS
-  case _ => tp -> error
-}
-
-  }

Review Comment:
   But yes, we simply pass through concurrent txns, which will be fatal to the client.






Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]

2024-01-18 Thread via GitHub


jolshan commented on code in PR #15087:
URL: https://github.com/apache/kafka/pull/15087#discussion_r1458143630


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1188,18 +1073,7 @@ class ReplicaManager(val config: KafkaConfig,
   requestLocal: RequestLocal,
   verificationErrors: Map[TopicPartition, Errors]
 ): Unit = {
-  // Map transaction coordinator errors to known errors for the response
-  val convertedErrors = verificationErrors.map { case (tp, error) =>
-error match {
-  case Errors.CONCURRENT_TRANSACTIONS |
-Errors.COORDINATOR_LOAD_IN_PROGRESS |
-Errors.COORDINATOR_NOT_AVAILABLE |
-Errors.NOT_COORDINATOR => tp -> Errors.NOT_ENOUGH_REPLICAS
-  case _ => tp -> error
-}
-
-  }

Review Comment:
   We have separate handling for produce requests and txn offset commit 
requests.
   
   for produce:
   
   ```
   case Errors.INVALID_TXN_STATE => Some(error.exception("Partition was not added to the transaction"))
   case Errors.CONCURRENT_TRANSACTIONS |
        Errors.COORDINATOR_LOAD_IN_PROGRESS |
        Errors.COORDINATOR_NOT_AVAILABLE |
        Errors.NOT_COORDINATOR => Some(new NotEnoughReplicasException(
     s"Unable to verify the partition has been added to the transaction. Underlying error: ${error.toString}"))
   case _ => None
   ```
   
   
   for txn offset commit: 
   
   ```
   error match {
     case Errors.UNKNOWN_TOPIC_OR_PARTITION
       | Errors.NOT_ENOUGH_REPLICAS
       | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
       Errors.COORDINATOR_NOT_AVAILABLE

     case Errors.NOT_LEADER_OR_FOLLOWER
       | Errors.KAFKA_STORAGE_ERROR =>
       Errors.NOT_COORDINATOR

     case Errors.MESSAGE_TOO_LARGE
       | Errors.RECORD_LIST_TOO_LARGE
       | Errors.INVALID_FETCH_SIZE =>
       Errors.INVALID_COMMIT_OFFSET_SIZE

     // We may see INVALID_TXN_STATE or INVALID_PID_MAPPING here due to transaction verification.
     // They can be returned without mapping to a new error.
     case other => other
   }
   ```






Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]

2024-01-18 Thread via GitHub


jolshan commented on code in PR #15087:
URL: https://github.com/apache/kafka/pull/15087#discussion_r1458066834


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -717,17 +717,16 @@ class KafkaApis(val requestChannel: RequestChannel,
   val internalTopicsAllowed = request.header.clientId == AdminUtils.ADMIN_CLIENT_ID
 
   // call the replica manager to append messages to the replicas
-  replicaManager.appendRecords(
+  replicaManager.handleProduceAppend(
 timeout = produceRequest.timeout.toLong,
 requiredAcks = produceRequest.acks,
 internalTopicsAllowed = internalTopicsAllowed,
 origin = AppendOrigin.CLIENT,

Review Comment:
   that's fair. I can do that.






Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]

2024-01-18 Thread via GitHub


dajac commented on code in PR #15087:
URL: https://github.com/apache/kafka/pull/15087#discussion_r1457569421


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -717,17 +717,16 @@ class KafkaApis(val requestChannel: RequestChannel,
   val internalTopicsAllowed = request.header.clientId == AdminUtils.ADMIN_CLIENT_ID
 
   // call the replica manager to append messages to the replicas
-  replicaManager.appendRecords(
+  replicaManager.handleProduceAppend(
 timeout = produceRequest.timeout.toLong,
 requiredAcks = produceRequest.acks,
 internalTopicsAllowed = internalTopicsAllowed,
 origin = AppendOrigin.CLIENT,

Review Comment:
   nit: I wonder if we should remove the `origin` parameter from 
`handleProduceAppend` as it should always come from a client by definition.
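
   A minimal, self-contained sketch of what that suggestion could look like. The types and method bodies below are simplified stand-ins (not the actual ReplicaManager code); the point is only that the produce-path wrapper stops taking an `origin` argument and always forwards the client origin:

   ```
   // Illustrative sketch only -- simplified stand-ins, not Kafka's real types.
   object OriginSketch {
     sealed trait AppendOrigin
     case object Client extends AppendOrigin
     case object Coordinator extends AppendOrigin

     // The lower-level append still accepts any origin, since coordinator paths use it too.
     def appendRecords(origin: AppendOrigin, records: Seq[String]): Unit =
       println(s"appending ${records.size} record(s) with origin $origin")

     // The produce-request wrapper no longer takes an origin parameter:
     // produce requests can only come from clients, so the origin is hard-coded.
     def handleProduceAppend(records: Seq[String]): Unit =
       appendRecords(origin = Client, records = records)

     def main(args: Array[String]): Unit =
       handleProduceAppend(Seq("r1", "r2"))
   }
   ```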



##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -3041,31 +3007,57 @@ class ReplicaManagerTest {
       origin = origin,
       entriesPerPartition = Map(partition -> records),
       responseCallback = appendCallback,
-      transactionalId = transactionalId,
     )

     result
   }

-  private def appendRecordsToMultipleTopics(replicaManager: ReplicaManager,
-                                            entriesToAppend: Map[TopicPartition, MemoryRecords],
-                                            transactionalId: String,
-                                            origin: AppendOrigin = AppendOrigin.CLIENT,
-                                            requiredAcks: Short = -1): CallbackResult[Map[TopicPartition, PartitionResponse]] = {
+  private def handleProduceAppendToMultipleTopics(replicaManager: ReplicaManager,
+                                                  entriesToAppend: Map[TopicPartition, MemoryRecords],
+                                                  transactionalId: String,
+                                                  origin: AppendOrigin = AppendOrigin.CLIENT,
+                                                  requiredAcks: Short = -1): CallbackResult[Map[TopicPartition, PartitionResponse]] = {
     val result = new CallbackResult[Map[TopicPartition, PartitionResponse]]()
     def appendCallback(responses: Map[TopicPartition, PartitionResponse]): Unit = {
       responses.foreach( response => assertTrue(responses.get(response._1).isDefined))
       result.fire(responses)
     }
+    replicaManager.handleProduceAppend(

Review Comment:
   nit: Indentation seems off here.



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1188,18 +1073,7 @@ class ReplicaManager(val config: KafkaConfig,
   requestLocal: RequestLocal,
   verificationErrors: Map[TopicPartition, Errors]
 ): Unit = {
-  // Map transaction coordinator errors to known errors for the response
-  val convertedErrors = verificationErrors.map { case (tp, error) =>
-error match {
-  case Errors.CONCURRENT_TRANSACTIONS |
-Errors.COORDINATOR_LOAD_IN_PROGRESS |
-Errors.COORDINATOR_NOT_AVAILABLE |
-Errors.NOT_COORDINATOR => tp -> Errors.NOT_ENOUGH_REPLICAS
-  case _ => tp -> error
-}
-
-  }

Review Comment:
   For my understanding, we remove this here, add it back in `handleProduceAppend`, and rely on the conversion in the group coordinator. Did I get that right? In the group coordinator, we don't handle `CONCURRENT_TRANSACTIONS`, I think. I need to double check.



##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -2560,41 +2562,6 @@ class ReplicaManagerTest {
 }
   }
 
-  @ParameterizedTest
-  @EnumSource(value = classOf[Errors], names = Array("NOT_COORDINATOR", "CONCURRENT_TRANSACTIONS", "COORDINATOR_LOAD_IN_PROGRESS", "COORDINATOR_NOT_AVAILABLE"))
-  def testMaybeVerificationErrorConversions(error: Errors): Unit = {

Review Comment:
   Don't we need to keep this one, since we still have those conversions, just in a different place now?



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -762,167 +763,124 @@ class ReplicaManager(val config: KafkaConfig,
                    delayedProduceLock: Option[Lock] = None,
                    recordValidationStatsCallback: Map[TopicPartition, RecordValidationStats] => Unit = _ => (),
                    requestLocal: RequestLocal = RequestLocal.NoCaching,
-                    transactionalId: String = null,
-                    actionQueue: ActionQueue = this.defaultActionQueue): Unit = {
-    if (isValidRequiredAcks(requiredAcks)) {
-
-      val verificationGuards: mutable.Map[TopicPartition, VerificationGuard] = mutable.Map[TopicPartition, VerificationGuard]()
-      val (verifiedEntriesPerPartition, notYetVerifiedEntriesPerPartition, errorsPerPartition) =
-        if (transactionalId == null || !config.transactionPartitionVerificationEnable)
-          (entriesPerPartition, Map.empty

Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]

2024-01-05 Thread via GitHub


jolshan commented on code in PR #15087:
URL: https://github.com/apache/kafka/pull/15087#discussion_r1443537258


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -749,9 +749,11 @@ class ReplicaManager(val config: KafkaConfig,
    * @param responseCallback              callback for sending the response
    * @param delayedProduceLock            lock for the delayed actions
    * @param recordValidationStatsCallback callback for updating stats on record conversions
-   * @param requestLocal                  container for the stateful instances scoped to this request
-   * @param transactionalId               transactional ID if the request is from a producer and the producer is transactional
+   * @param requestLocal                  container for the stateful instances scoped to this request -- this must correspond to the
+   *                                      thread calling this method
    * @param actionQueue                   the action queue to use. ReplicaManager#defaultActionQueue is used by default.
+   * @param verificationGuards            the mapping from topic partition to verification guards if transaction verification is used
+   * @param preAppendErrors               the mapping from topic partition to LogAppendResult for errors that occurred before appending

Review Comment:
   This is updated.






Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]

2024-01-03 Thread via GitHub


jolshan commented on code in PR #15087:
URL: https://github.com/apache/kafka/pull/15087#discussion_r1441068240


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -749,9 +749,11 @@ class ReplicaManager(val config: KafkaConfig,
    * @param responseCallback              callback for sending the response
    * @param delayedProduceLock            lock for the delayed actions
    * @param recordValidationStatsCallback callback for updating stats on record conversions
-   * @param requestLocal                  container for the stateful instances scoped to this request
-   * @param transactionalId               transactional ID if the request is from a producer and the producer is transactional
+   * @param requestLocal                  container for the stateful instances scoped to this request -- this must correspond to the
+   *                                      thread calling this method
    * @param actionQueue                   the action queue to use. ReplicaManager#defaultActionQueue is used by default.
+   * @param verificationGuards            the mapping from topic partition to verification guards if transaction verification is used
+   * @param preAppendErrors               the mapping from topic partition to LogAppendResult for errors that occurred before appending

Review Comment:
   Oh, I see what you are saying -- basically the appendCallback should be defined so that the verification errors are joined in at the point where the callback is defined (ideally after we get the verification errors back).
   
   So the callback for verification would include the definition of the append callback. I think this could work.
   
   Time to change all the mock code 😂 
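
   A rough, self-contained sketch of that shape, with toy types standing in for the real ones (none of these names are the PR's actual code): the verification continuation is what defines the append callback, so the append callback can close over the verification errors and join them into the final response.

   ```
   // Toy model only -- illustrative names and types, not the actual ReplicaManager API.
   object CallbackCompositionSketch {
     type Partition = String
     type Errors = Map[Partition, String]
     type Responses = Map[Partition, String]

     // Low-level append: reports success for whatever it is given.
     def appendRecords(entries: Set[Partition], responseCallback: Responses => Unit): Unit =
       responseCallback(entries.map(p => p -> "OK").toMap)

     // Verification completes asynchronously and hands its errors to a continuation.
     def verifyTransaction(entries: Set[Partition], onVerified: Errors => Unit): Unit =
       onVerified(Map("topic-1" -> "INVALID_TXN_STATE")) // pretend one partition failed

     def handleProduceAppend(entries: Set[Partition], responseCallback: Responses => Unit): Unit =
       verifyTransaction(entries, onVerified = verificationErrors => {
         // The append callback is defined *inside* the verification continuation, so it
         // closes over the verification errors and joins them with the append results.
         val appendCallback: Responses => Unit =
           appendResults => responseCallback(appendResults ++ verificationErrors)
         appendRecords(entries -- verificationErrors.keySet, appendCallback)
       })

     def main(args: Array[String]): Unit =
       handleProduceAppend(Set("topic-0", "topic-1"), responses => println(responses))
   }
   ```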






Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]

2024-01-03 Thread via GitHub


jolshan commented on code in PR #15087:
URL: https://github.com/apache/kafka/pull/15087#discussion_r1441066328


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -749,9 +749,11 @@ class ReplicaManager(val config: KafkaConfig,
    * @param responseCallback              callback for sending the response
    * @param delayedProduceLock            lock for the delayed actions
    * @param recordValidationStatsCallback callback for updating stats on record conversions
-   * @param requestLocal                  container for the stateful instances scoped to this request
-   * @param transactionalId               transactional ID if the request is from a producer and the producer is transactional
+   * @param requestLocal                  container for the stateful instances scoped to this request -- this must correspond to the
+   *                                      thread calling this method
    * @param actionQueue                   the action queue to use. ReplicaManager#defaultActionQueue is used by default.
+   * @param verificationGuards            the mapping from topic partition to verification guards if transaction verification is used
+   * @param preAppendErrors               the mapping from topic partition to LogAppendResult for errors that occurred before appending

Review Comment:
   I was considering a write where some partitions were verified and some were not. We would still allow the partitions that passed verification to be written, but not the ones that failed verification.
   
   Not sure if we support multi-partition writes in the produce API, but it appears we do.






Re: [PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]

2024-01-03 Thread via GitHub


hachikuji commented on code in PR #15087:
URL: https://github.com/apache/kafka/pull/15087#discussion_r1441051073


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -749,9 +749,11 @@ class ReplicaManager(val config: KafkaConfig,
    * @param responseCallback              callback for sending the response
    * @param delayedProduceLock            lock for the delayed actions
    * @param recordValidationStatsCallback callback for updating stats on record conversions
-   * @param requestLocal                  container for the stateful instances scoped to this request
-   * @param transactionalId               transactional ID if the request is from a producer and the producer is transactional
+   * @param requestLocal                  container for the stateful instances scoped to this request -- this must correspond to the
+   *                                      thread calling this method
    * @param actionQueue                   the action queue to use. ReplicaManager#defaultActionQueue is used by default.
+   * @param verificationGuards            the mapping from topic partition to verification guards if transaction verification is used
+   * @param preAppendErrors               the mapping from topic partition to LogAppendResult for errors that occurred before appending

Review Comment:
   This seems a little strange. If the partitions have already failed, then why 
do we need to pass them through `appendRecords`? I would expect instead that 
the caller would just join the pre-append failures with the append failures.
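
   A small, self-contained sketch of that alternative (toy types and names, not the real API): the caller splits out the partitions that already failed, appends only the rest, and merges the two result maps itself, so the low-level append never has to know about pre-append failures.

   ```
   // Toy model only -- illustrative names, not the actual ReplicaManager API.
   object JoinAtCallerSketch {
     type Partition = String

     // Low-level append returns a per-partition result and stays unaware of pre-append failures.
     def appendRecords(entries: Map[Partition, Array[Byte]]): Map[Partition, String] =
       entries.map { case (tp, _) => tp -> "OK" }

     def handleProduceAppend(
         entries: Map[Partition, Array[Byte]],
         preAppendFailures: Map[Partition, String]): Map[Partition, String] = {
       // Append only the partitions that did not already fail...
       val appendResults = appendRecords(entries -- preAppendFailures.keySet)
       // ...and join the pre-append failures back in at the caller.
       appendResults ++ preAppendFailures
     }

     def main(args: Array[String]): Unit =
       println(handleProduceAppend(
         entries = Map("topic-0" -> Array[Byte](1), "topic-1" -> Array[Byte](2)),
         preAppendFailures = Map("topic-1" -> "INVALID_TXN_STATE")))
   }
   ```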






[PR] KAFKA-15987: Refactor ReplicaManager code for transaction verification [kafka]

2023-12-28 Thread via GitHub


jolshan opened a new pull request, #15087:
URL: https://github.com/apache/kafka/pull/15087

   I originally did some refactors in https://github.com/apache/kafka/pull/14774, but we decided to keep the changes minimal since that ticket was a blocker. Here are those refactors:
   
   * Removed the separate append paths so that produce, group coordinator, and other append paths all call appendRecords
   * Simplified appendRecords
   * Removed unneeded error conversions in the verification code, since the group coordinator and produce paths convert errors differently, and removed the test for them
   * Fixed an incorrectly capitalized parameter name in KafkaRequestHandler
   * Updated the ReplicaManager test to handle transaction verification appends separately (I may revise this)
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

