dajac commented on code in PR #19251:
URL: https://github.com/apache/kafka/pull/19251#discussion_r2005969287


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1635,40 +1635,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       trace(s"End transaction marker append for producer id $producerId 
completed with status: $currentErrors")
       updateErrors(producerId, currentErrors)
 
-      def maybeSendResponse(): Unit = {
-        if (numAppends.decrementAndGet() == 0) {
-          requestHelper.sendResponseExemptThrottle(request, new 
WriteTxnMarkersResponse(errors))
-        }
-      }
-
-      // The new group coordinator uses GroupCoordinator#completeTransaction 
so we do
-      // not need to call GroupCoordinator#onTransactionCompleted here.
-      if (config.isNewGroupCoordinatorEnabled) {
-        maybeSendResponse()
-        return
-      }
-
-      val successfulOffsetsPartitions = currentErrors.asScala.filter { case 
(topicPartition, error) =>
-        topicPartition.topic == GROUP_METADATA_TOPIC_NAME && error == 
Errors.NONE
-      }.keys
-
-      // If no end transaction marker has been written to a __consumer_offsets 
partition, we do not
-      // need to call GroupCoordinator#onTransactionCompleted.
-      if (successfulOffsetsPartitions.isEmpty) {
-        maybeSendResponse()
-        return
-      }
-
-      // Otherwise, we call GroupCoordinator#onTransactionCompleted to 
materialize the offsets
-      // into the cache and we wait until the meterialization is completed.
-      groupCoordinator.onTransactionCompleted(producerId, 
successfulOffsetsPartitions.asJava, result).whenComplete { (_, exception) =>
-        if (exception != null) {
-          error(s"Received an exception while trying to update the offsets 
cache on transaction marker append", exception)
-          val updatedErrors = new ConcurrentHashMap[TopicPartition, Errors]()
-          successfulOffsetsPartitions.foreach(updatedErrors.put(_, 
Errors.UNKNOWN_SERVER_ERROR))
-          updateErrors(producerId, updatedErrors)
-        }
-        maybeSendResponse()
+      if (numAppends.decrementAndGet() == 0) {

Review Comment:
   This is the only place where there is a change impacting the core logic. I 
basically kept only the logic related to the new group coordinator.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to