Repository: kafka
Updated Branches:
  refs/heads/trunk da0b5b859 -> 7258a5fdd


KAFKA-5160; KIP-98 Broker side support for TxnOffsetCommitRequest

This patch adds support for the `TxnOffsetCommitRequest` added in KIP-98. The 
desired handling for this request is [described 
here](https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#bookmark=id.55yzhvkppi6m).

The functionality includes handling the steady state of receiving 
`TxnOffsetCommitRequests` and materializing the committed offsets only when the 
commit marker for the transaction is received. It also handles partition 
emigration and immigration, rebuilding the required data structures on those 
events.
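
For orientation, the following is a minimal, self-contained sketch of the 
buffering pattern introduced here, not the actual coordinator code: 
transactional offset commits are parked per producerId and are merged into the 
materialized offsets only once the commit marker for that producer is written, 
while an abort simply discards them. The `TopicPartition` and 
`OffsetAndMetadata` classes below are simplified stand-ins for the real Kafka 
types.

```scala
import scala.collection.mutable

// Simplified stand-ins for the real Kafka types (illustration only).
case class TopicPartition(topic: String, partition: Int)
case class OffsetAndMetadata(offset: Long, metadata: String = "")

class PendingTxnOffsets {
  // Offsets visible to offset fetch requests.
  private val offsets = mutable.Map[TopicPartition, OffsetAndMetadata]()
  // Offsets buffered per producerId until the transaction marker is written.
  private val pendingTxnOffsets =
    mutable.Map[Long, mutable.Map[TopicPartition, OffsetAndMetadata]]()

  def prepareTxnOffsetCommit(producerId: Long,
                             txnOffsets: Map[TopicPartition, OffsetAndMetadata]): Unit = {
    val producerOffsets = pendingTxnOffsets.getOrElseUpdate(
      producerId, mutable.Map.empty[TopicPartition, OffsetAndMetadata])
    producerOffsets ++= txnOffsets
  }

  // Called after the COMMIT or ABORT marker for the transaction is fully written.
  def completePendingTxnOffsetCommit(producerId: Long, isCommit: Boolean): Unit = {
    if (isCommit)
      offsets ++= pendingTxnOffsets.getOrElse(producerId, Map.empty[TopicPartition, OffsetAndMetadata])
    pendingTxnOffsets.remove(producerId)
  }

  def fetchOffset(tp: TopicPartition): Option[Long] = offsets.get(tp).map(_.offset)
}
```

Until `completePendingTxnOffsetCommit` runs with `isCommit = true`, an offset 
fetch returns nothing for the affected partitions, which is what the new 
`GroupCoordinatorResponseTest` cases assert.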

Tests are included for all the functionality.
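
The rebuild-on-immigration path can be illustrated with a similarly rough, 
hypothetical sketch (not the code in `GroupMetadataManager`): while replaying 
the __consumer_offsets partition, offsets from transactional batches are 
collected per producerId and are promoted to the loaded offsets only when the 
corresponding COMMIT control record is encountered; an ABORT drops them.

```scala
import scala.collection.mutable

// Hypothetical, simplified record model for illustration; the real code walks
// MemoryRecords batches read from the __consumer_offsets log.
sealed trait Entry
case class OffsetRecord(producerId: Option[Long], tp: (String, Int), offset: Long) extends Entry
case class TxnMarker(producerId: Long, isCommit: Boolean) extends Entry

def replay(entries: Seq[Entry]): Map[(String, Int), Long] = {
  val loaded  = mutable.Map[(String, Int), Long]()
  val pending = mutable.Map[Long, mutable.Map[(String, Int), Long]]()

  entries.foreach {
    case OffsetRecord(Some(pid), tp, offset) =>
      // Transactional commit: park it until this producer's marker is seen.
      pending.getOrElseUpdate(pid, mutable.Map.empty[(String, Int), Long]).put(tp, offset)
    case OffsetRecord(None, tp, offset) =>
      // Regular consumer commit: materialize immediately.
      loaded.put(tp, offset)
    case TxnMarker(pid, isCommit) =>
      // COMMIT promotes the producer's pending offsets; ABORT discards them.
      if (isCommit) loaded ++= pending.getOrElse(pid, Map.empty[(String, Int), Long])
      pending.remove(pid)
  }
  loaded.toMap
}
```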

Author: Apurva Mehta <apu...@confluent.io>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #2970 from apurvam/KAFKA-5160-broker-side-support-for-txnoffsetcommitrequest


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7258a5fd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7258a5fd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7258a5fd

Branch: refs/heads/trunk
Commit: 7258a5fddf3fb77480cd414819cdbfbd96b709e5
Parents: da0b5b8
Author: Apurva Mehta <apu...@confluent.io>
Authored: Fri May 12 16:27:11 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri May 12 16:27:11 2017 -0700

----------------------------------------------------------------------
 .../kafka/common/record/MemoryRecords.java      |  18 +-
 .../coordinator/group/GroupCoordinator.scala    |  28 +-
 .../kafka/coordinator/group/GroupMetadata.scala |  63 ++++-
 .../group/GroupMetadataManager.scala            | 209 +++++++++++---
 core/src/main/scala/kafka/log/Log.scala         |   3 +-
 .../scala/kafka/log/ProducerStateManager.scala  |  43 +--
 .../src/main/scala/kafka/server/KafkaApis.scala |  94 ++++++-
 .../group/GroupCoordinatorResponseTest.scala    | 249 ++++++++++++++++-
 .../group/GroupMetadataManagerTest.scala        | 276 +++++++++++++++++++
 .../kafka/log/ProducerStateManagerTest.scala    |  14 +-
 10 files changed, 906 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7258a5fd/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 
b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index a222cc3..cec309e 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -398,8 +398,24 @@ public class MemoryRecords extends AbstractRecords {
                                                int baseSequence,
                                                boolean isTransactional,
                                                int partitionLeaderEpoch) {
+        return builder(buffer, magic, compressionType, timestampType, 
baseOffset,
+                logAppendTime, producerId, producerEpoch, baseSequence, 
isTransactional, false, partitionLeaderEpoch);
+    }
+
+    public static MemoryRecordsBuilder builder(ByteBuffer buffer,
+                                               byte magic,
+                                               CompressionType compressionType,
+                                               TimestampType timestampType,
+                                               long baseOffset,
+                                               long logAppendTime,
+                                               long producerId,
+                                               short producerEpoch,
+                                               int baseSequence,
+                                               boolean isTransactional,
+                                               boolean isControlBatch,
+                                               int partitionLeaderEpoch) {
         return new MemoryRecordsBuilder(buffer, magic, compressionType, 
timestampType, baseOffset,
-                logAppendTime, producerId, producerEpoch, baseSequence, 
isTransactional, false, partitionLeaderEpoch,
+                logAppendTime, producerId, producerEpoch, baseSequence, 
isTransactional, isControlBatch, partitionLeaderEpoch,
                 buffer.remaining());
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7258a5fd/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index f5b1a29..a57b6be 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -19,14 +19,15 @@ package kafka.coordinator.group
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicBoolean
 
-import kafka.common.OffsetAndMetadata
+import kafka.common.{OffsetAndMetadata, Topic}
 import kafka.log.LogConfig
 import kafka.message.ProducerCompressionCodec
 import kafka.server._
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetFetchResponse}
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests.{JoinGroupRequest, 
OffsetFetchResponse, TransactionResult}
 import org.apache.kafka.common.utils.Time
 
 import scala.collection.{Map, Seq, immutable}
@@ -397,7 +398,9 @@ class GroupCoordinator(val brokerId: Int,
                           memberId: String,
                           generationId: Int,
                           offsetMetadata: immutable.Map[TopicPartition, 
OffsetAndMetadata],
-                          responseCallback: immutable.Map[TopicPartition, 
Errors] => Unit) {
+                          responseCallback: immutable.Map[TopicPartition, 
Errors] => Unit,
+                          producerId: Long = RecordBatch.NO_PRODUCER_ID,
+                          producerEpoch: Short = 
RecordBatch.NO_PRODUCER_EPOCH) {
     if (!isActive.get) {
       responseCallback(offsetMetadata.mapValues(_ => 
Errors.COORDINATOR_NOT_AVAILABLE))
     } else if (!isCoordinatorForGroup(groupId)) {
@@ -410,32 +413,41 @@ class GroupCoordinator(val brokerId: Int,
           if (generationId < 0) {
             // the group is not relying on Kafka for group management, so 
allow the commit
             val group = groupManager.addGroup(new GroupMetadata(groupId))
-            doCommitOffsets(group, memberId, generationId, offsetMetadata, 
responseCallback)
+            doCommitOffsets(group, memberId, generationId, producerId, 
producerEpoch, offsetMetadata, responseCallback)
           } else {
             // or this is a request coming from an older generation. either 
way, reject the commit
             responseCallback(offsetMetadata.mapValues(_ => 
Errors.ILLEGAL_GENERATION))
           }
 
         case Some(group) =>
-          doCommitOffsets(group, memberId, generationId, offsetMetadata, 
responseCallback)
+          doCommitOffsets(group, memberId, generationId, producerId, 
producerEpoch, offsetMetadata, responseCallback)
       }
     }
   }
 
+  def handleTxnCompletion(producerId: Long,
+                          topicPartitions: Seq[TopicPartition],
+                          transactionResult: TransactionResult) {
+    val offsetPartitions = topicPartitions.filter(_.topic() == 
Topic.GroupMetadataTopicName).map(_.partition).toSet
+    groupManager.handleTxnCompletion(producerId, offsetPartitions, 
transactionResult == TransactionResult.COMMIT)
+  }
+
   private def doCommitOffsets(group: GroupMetadata,
                               memberId: String,
                               generationId: Int,
+                              producerId: Long,
+                              producerEpoch: Short,
                               offsetMetadata: immutable.Map[TopicPartition, 
OffsetAndMetadata],
                               responseCallback: immutable.Map[TopicPartition, 
Errors] => Unit) {
     var delayedOffsetStore: Option[DelayedStore] = None
-
     group synchronized {
       if (group.is(Dead)) {
         responseCallback(offsetMetadata.mapValues(_ => 
Errors.UNKNOWN_MEMBER_ID))
-      } else if (generationId < 0 && group.is(Empty)) {
+      } else if ((generationId < 0 && group.is(Empty)) || (producerId != 
RecordBatch.NO_PRODUCER_ID)) {
         // the group is only using Kafka to store offsets
+        // Also, for transactional offset commits we don't need to validate 
group membership and the generation.
         delayedOffsetStore = groupManager.prepareStoreOffsets(group, memberId, 
generationId,
-          offsetMetadata, responseCallback)
+          offsetMetadata, responseCallback, producerId, producerEpoch)
       } else if (group.is(AwaitingSync)) {
         responseCallback(offsetMetadata.mapValues(_ => 
Errors.REBALANCE_IN_PROGRESS))
       } else if (!group.has(memberId)) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7258a5fd/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index 44f3f2b..8122694 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -19,7 +19,7 @@ package kafka.coordinator.group
 import java.util.UUID
 
 import kafka.common.OffsetAndMetadata
-import kafka.utils.nonthreadsafe
+import kafka.utils.{Logging, nonthreadsafe}
 import org.apache.kafka.common.TopicPartition
 
 import scala.collection.{Seq, immutable, mutable}
@@ -140,12 +140,16 @@ case class GroupSummary(state: String,
  *  3. leader id
  */
 @nonthreadsafe
-private[group] class GroupMetadata(val groupId: String, initialState: 
GroupState = Empty) {
+private[group] class GroupMetadata(val groupId: String, initialState: 
GroupState = Empty) extends Logging {
 
   private var state: GroupState = initialState
   private val members = new mutable.HashMap[String, MemberMetadata]
   private val offsets = new mutable.HashMap[TopicPartition, OffsetAndMetadata]
   private val pendingOffsetCommits = new mutable.HashMap[TopicPartition, 
OffsetAndMetadata]
+  // A map from a producer id (PID) to the open offset commits for that producer.
+  private val pendingTransactionalOffsetCommits = new mutable.HashMap[Long, 
mutable.Map[TopicPartition, OffsetAndMetadata]]()
+  private var receivedTransactionalOffsetCommits = false
+  private var receivedConsumerOffsetCommits = false
 
   var protocolType: Option[String] = None
   var generationId = 0
@@ -243,6 +247,8 @@ private[group] class GroupMetadata(val groupId: String, 
initialState: GroupState
       protocol = null
       transitionTo(Empty)
     }
+    receivedConsumerOffsetCommits = false
+    receivedTransactionalOffsetCommits = false
   }
 
   def currentMemberMetadata: Map[String, Array[Byte]] = {
@@ -265,8 +271,10 @@ private[group] class GroupMetadata(val groupId: String, 
initialState: GroupState
     GroupOverview(groupId, protocolType.getOrElse(""))
   }
 
-  def initializeOffsets(offsets: collection.Map[TopicPartition, 
OffsetAndMetadata]) {
-     this.offsets ++= offsets
+  def initializeOffsets(offsets: collection.Map[TopicPartition, 
OffsetAndMetadata],
+                        pendingTxnOffsets: Map[Long, 
mutable.Map[TopicPartition, OffsetAndMetadata]]) {
+    this.offsets ++= offsets
+    this.pendingTransactionalOffsetCommits ++= pendingTxnOffsets
   }
 
   def completePendingOffsetWrite(topicPartition: TopicPartition, offset: 
OffsetAndMetadata) {
@@ -287,12 +295,57 @@ private[group] class GroupMetadata(val groupId: String, 
initialState: GroupState
   }
 
   def prepareOffsetCommit(offsets: Map[TopicPartition, OffsetAndMetadata]) {
+    receivedConsumerOffsetCommits = true
     pendingOffsetCommits ++= offsets
   }
 
+  def prepareTxnOffsetCommit(producerId: Long, offsets: Map[TopicPartition, 
OffsetAndMetadata]) {
+    receivedTransactionalOffsetCommits = true
+    val producerOffsets = 
pendingTransactionalOffsetCommits.getOrElseUpdate(producerId, 
mutable.Map.empty[TopicPartition, OffsetAndMetadata])
+    producerOffsets ++= offsets
+  }
+
+  def hasReceivedConsistentOffsetCommits: Boolean = {
+    !receivedConsumerOffsetCommits || !receivedTransactionalOffsetCommits
+  }
+
+  /* Remove a pending transactional offset commit if the actual offset commit 
record was not written to the log.
+   * We will return an error and the client will retry the request, 
potentially to a different coordinator.
+   */
+  def failPendingTxnOffsetCommit(producerId: Long, topicPartition: 
TopicPartition, offsetAndMetadata: OffsetAndMetadata): Unit = {
+    pendingTransactionalOffsetCommits.get(producerId) match {
+      case Some(pendingOffsets) =>
+        pendingOffsets.remove(topicPartition)
+        if (pendingOffsets.isEmpty)
+          pendingTransactionalOffsetCommits.remove(producerId)
+      case _ =>
+        // We may hit this case if the partition in question has emigrated 
already.
+    }
+  }
+
+  /* Complete a pending transactional offset commit. This is called after a 
commit or abort marker is fully written
+   * to the log.
+   */
+  def completePendingTxnOffsetCommit(producerId: Long, isCommit: Boolean): 
Unit = {
+    trace(s"Completing transactional offset commit for producer $producerId 
and group $groupId. isCommit: $isCommit")
+    if (isCommit) {
+      val producerOffsets = 
pendingTransactionalOffsetCommits.getOrElse(producerId, 
Map.empty[TopicPartition, OffsetAndMetadata])
+      offsets ++= producerOffsets
+    }
+    pendingTransactionalOffsetCommits.remove(producerId)
+  }
+
+  def activeProducers = pendingTransactionalOffsetCommits.keySet
+
+  def hasPendingOffsetCommitsFromProducer(producerId: Long) =
+    pendingTransactionalOffsetCommits.contains(producerId)
+
   def removeOffsets(topicPartitions: Seq[TopicPartition]): 
immutable.Map[TopicPartition, OffsetAndMetadata] = {
     topicPartitions.flatMap { topicPartition =>
       pendingOffsetCommits.remove(topicPartition)
+      pendingTransactionalOffsetCommits.foreach { case (_, pendingOffsets) =>
+        pendingOffsets.remove(topicPartition)
+      }
       val removedOffset = offsets.remove(topicPartition)
       removedOffset.map(topicPartition -> _)
     }.toMap
@@ -312,7 +365,7 @@ private[group] class GroupMetadata(val groupId: String, 
initialState: GroupState
 
   def numOffsets = offsets.size
 
-  def hasOffsets = offsets.nonEmpty || pendingOffsetCommits.nonEmpty
+  def hasOffsets = offsets.nonEmpty || pendingOffsetCommits.nonEmpty || 
pendingTransactionalOffsetCommits.nonEmpty
 
   private def assertValidTransition(targetState: GroupState) {
     if (!GroupMetadata.validPreviousStates(targetState).contains(state))

http://git-wip-us.apache.org/repos/asf/kafka/blob/7258a5fd/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 3eafdb7..222bcdc 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -75,6 +75,11 @@ class GroupMetadataManager(brokerId: Int,
   /* single-thread scheduler to handle offset/group metadata cache loading and 
unloading */
   private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = 
"group-metadata-manager-")
 
+  /* The groups with open transactional offset commits per producer. We need 
this because when the commit or abort
+   * marker comes in for a transaction, it is for a particular partition on 
the offsets topic and a particular producerId.
+   * We use this structure to quickly find the groups which need to be updated 
by the commit/abort marker. */
+  private val openGroupsForProducer = mutable.HashMap[Long, 
mutable.Set[String]]()
+
   this.logIdent = "[Group Metadata Manager on Broker " + brokerId + "]: "
 
   newGauge("NumOffsets",
@@ -114,6 +119,13 @@ class GroupMetadataManager(brokerId: Int,
 
   def isLoading(): Boolean = inLock(partitionLock) { 
loadingPartitions.nonEmpty }
 
+  // visible for testing
+  private[group] def isGroupOpenForProducer(producerId: Long, groupId: String) 
= openGroupsForProducer.get(producerId) match {
+    case Some(groups) =>
+      groups.contains(groupId)
+    case None =>
+      false
+  }
   /**
    * Get the group associated with the given groupId, or null if not found
    */
@@ -238,12 +250,22 @@ class GroupMetadataManager(brokerId: Int,
                           consumerId: String,
                           generationId: Int,
                           offsetMetadata: immutable.Map[TopicPartition, 
OffsetAndMetadata],
-                          responseCallback: immutable.Map[TopicPartition, 
Errors] => Unit): Option[DelayedStore] = {
+                          responseCallback: immutable.Map[TopicPartition, 
Errors] => Unit,
+                          producerId: Long = RecordBatch.NO_PRODUCER_ID,
+                          producerEpoch: Short = 
RecordBatch.NO_PRODUCER_EPOCH): Option[DelayedStore] = {
     // first filter out partitions with offset metadata size exceeding limit
     val filteredOffsetMetadata = offsetMetadata.filter { case (_, 
offsetAndMetadata) =>
       validateOffsetMetadataLength(offsetAndMetadata.metadata)
     }
 
+    group synchronized {
+      if (!group.hasReceivedConsistentOffsetCommits)
+        warn(s"group: ${group.groupId} with leader: ${group.leaderId} has 
received offset commits from consumers as well " +
+          s"as transactional producers. Mixing both types of offset commits 
will generally result in surprises and " +
+          s"should be avoided.")
+    }
+
+    val isTxnOffsetCommit = producerId != RecordBatch.NO_PRODUCER_ID
     // construct the message set to append
     if (filteredOffsetMetadata.isEmpty) {
       // compute the final error codes for the commit response
@@ -264,7 +286,13 @@ class GroupMetadataManager(brokerId: Int,
           }
           val offsetTopicPartition = new 
TopicPartition(Topic.GroupMetadataTopicName, partitionFor(group.groupId))
           val buffer = 
ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, 
compressionType, records.asJava))
-          val builder = MemoryRecords.builder(buffer, magicValue, 
compressionType, timestampType, 0L)
+
+          if (isTxnOffsetCommit && magicValue < RecordBatch.MAGIC_VALUE_V2)
+            throw Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT.exception("Attempting 
to make a transaction offset commit with an invalid magic: " + magicValue)
+
+          val builder = MemoryRecords.builder(buffer, magicValue, 
compressionType, timestampType, 0L, time.milliseconds(),
+            producerId, producerEpoch, 0, isTxnOffsetCommit, 
RecordBatch.NO_PARTITION_LEADER_EPOCH)
+
           records.foreach(builder.append)
           val entries = Map(offsetTopicPartition -> builder.build())
 
@@ -281,7 +309,7 @@ class GroupMetadataManager(brokerId: Int,
 
             val responseError = group synchronized {
               if (status.error == Errors.NONE) {
-                if (!group.is(Dead)) {
+                if (!group.is(Dead) && !isTxnOffsetCommit) {
                   filteredOffsetMetadata.foreach { case (topicPartition, 
offsetAndMetadata) =>
                     group.completePendingOffsetWrite(topicPartition, 
offsetAndMetadata)
                   }
@@ -289,8 +317,13 @@ class GroupMetadataManager(brokerId: Int,
                 Errors.NONE
               } else {
                 if (!group.is(Dead)) {
+                  if (!group.hasPendingOffsetCommitsFromProducer(producerId))
+                    removeProducerGroup(producerId, group.groupId)
                   filteredOffsetMetadata.foreach { case (topicPartition, 
offsetAndMetadata) =>
-                    group.failPendingOffsetWrite(topicPartition, 
offsetAndMetadata)
+                    if (isTxnOffsetCommit)
+                      group.failPendingTxnOffsetCommit(producerId, 
topicPartition, offsetAndMetadata)
+                    else
+                      group.failPendingOffsetWrite(topicPartition, 
offsetAndMetadata)
                   }
                 }
 
@@ -329,8 +362,15 @@ class GroupMetadataManager(brokerId: Int,
             responseCallback(commitStatus)
           }
 
-          group synchronized {
-            group.prepareOffsetCommit(offsetMetadata)
+          if (isTxnOffsetCommit) {
+            group synchronized {
+              addProducerGroup(producerId, group.groupId)
+              group.prepareTxnOffsetCommit(producerId, offsetMetadata)
+            }
+          } else {
+            group synchronized {
+              group.prepareOffsetCommit(offsetMetadata)
+            }
           }
 
           Some(DelayedStore(entries, putCacheCallback))
@@ -434,7 +474,7 @@ class GroupMetadataManager(brokerId: Int,
 
         // loop breaks if leader changes at any time during the load, since 
getHighWatermark is -1
         val loadedOffsets = mutable.Map[GroupTopicPartition, 
OffsetAndMetadata]()
-        val removedOffsets = mutable.Set[GroupTopicPartition]()
+        val pendingOffsets = mutable.Map[Long, 
mutable.Map[GroupTopicPartition, OffsetAndMetadata]]()
         val loadedGroups = mutable.Map[String, GroupMetadata]()
         val removedGroups = mutable.Set[String]()
 
@@ -451,59 +491,99 @@ class GroupMetadataManager(brokerId: Int,
           }
 
           memRecords.batches.asScala.foreach { batch =>
-            for (record <- batch.asScala) {
-              require(record.hasKey, "Group metadata/offset entry key should 
not be null")
-              GroupMetadataManager.readMessageKey(record.key) match {
-
-                case offsetKey: OffsetKey =>
-                  // load offset
-                  val key = offsetKey.key
-                  if (!record.hasValue) {
-                    loadedOffsets.remove(key)
-                    removedOffsets.add(key)
-                  } else {
-                    val value = 
GroupMetadataManager.readOffsetMessageValue(record.value)
-                    loadedOffsets.put(key, value)
-                    removedOffsets.remove(key)
-                  }
-
-                case groupMetadataKey: GroupMetadataKey =>
-                  // load group metadata
-                  val groupId = groupMetadataKey.key
-                  val groupMetadata = 
GroupMetadataManager.readGroupMessageValue(groupId, record.value)
-                  if (groupMetadata != null) {
-                    trace(s"Loaded group metadata for group $groupId with 
generation ${groupMetadata.generationId}")
-                    removedGroups.remove(groupId)
-                    loadedGroups.put(groupId, groupMetadata)
-                  } else {
-                    loadedGroups.remove(groupId)
-                    removedGroups.add(groupId)
+            val isTxnOffsetCommit = batch.isTransactional
+            if (batch.isControlBatch) {
+              val record = batch.iterator.next()
+              val controlRecord = ControlRecordType.parse(record.key)
+              if (controlRecord == ControlRecordType.COMMIT) {
+                pendingOffsets.getOrElse(batch.producerId, 
mutable.Map[GroupTopicPartition, OffsetAndMetadata]())
+                  .foreach {
+                    case (groupTopicPartition, offsetAndMetadata) =>
+                      loadedOffsets.put(groupTopicPartition, offsetAndMetadata)
                   }
-
-                case unknownKey =>
-                  throw new IllegalStateException(s"Unexpected message key 
$unknownKey while loading offsets and group metadata")
               }
-
-              currOffset = batch.nextOffset
+              pendingOffsets.remove(batch.producerId)
+            } else {
+              for (record <- batch.asScala) {
+                require(record.hasKey, "Group metadata/offset entry key should 
not be null")
+                GroupMetadataManager.readMessageKey(record.key) match {
+
+                  case offsetKey: OffsetKey =>
+                    if (isTxnOffsetCommit && 
!pendingOffsets.contains(batch.producerId))
+                      pendingOffsets.put(batch.producerId, 
mutable.Map[GroupTopicPartition, OffsetAndMetadata]())
+
+                    // load offset
+                    val groupTopicPartition = offsetKey.key
+                    if (!record.hasValue) {
+                      if (isTxnOffsetCommit)
+                        
pendingOffsets(batch.producerId).remove(groupTopicPartition)
+                      else
+                        loadedOffsets.remove(groupTopicPartition)
+                    } else {
+                      val value = 
GroupMetadataManager.readOffsetMessageValue(record.value)
+                      if (isTxnOffsetCommit)
+                        
pendingOffsets(batch.producerId).put(groupTopicPartition, value)
+                      else
+                        loadedOffsets.put(groupTopicPartition, value)
+                    }
+
+                  case groupMetadataKey: GroupMetadataKey =>
+                    // load group metadata
+                    val groupId = groupMetadataKey.key
+                    val groupMetadata = 
GroupMetadataManager.readGroupMessageValue(groupId, record.value)
+                    if (groupMetadata != null) {
+                      trace(s"Loaded group metadata for group $groupId with 
generation ${groupMetadata.generationId}")
+                      removedGroups.remove(groupId)
+                      loadedGroups.put(groupId, groupMetadata)
+                    } else {
+                      loadedGroups.remove(groupId)
+                      removedGroups.add(groupId)
+                    }
+
+                  case unknownKey =>
+                    throw new IllegalStateException(s"Unexpected message key 
$unknownKey while loading offsets and group metadata")
+                }
+              }
             }
+            currOffset = batch.nextOffset
           }
 
+
           val (groupOffsets, emptyGroupOffsets) = loadedOffsets
             .groupBy(_._1.group)
             .mapValues(_.map { case (groupTopicPartition, offset) => 
(groupTopicPartition.topicPartition, offset) })
             .partition { case (group, _) => loadedGroups.contains(group) }
 
+          val pendingOffsetsByGroup = mutable.Map[String, mutable.Map[Long, 
mutable.Map[TopicPartition, OffsetAndMetadata]]]()
+          pendingOffsets.foreach { case (producerId, producerOffsets) =>
+            
producerOffsets.keySet.map(_.group).foreach(addProducerGroup(producerId, _))
+            producerOffsets
+              .groupBy(_._1.group)
+              .mapValues(_.map { case (groupTopicPartition, offset) => 
(groupTopicPartition.topicPartition, offset)})
+              .foreach { case (group, offsets) =>
+                val groupPendingOffsets = 
pendingOffsetsByGroup.getOrElseUpdate(group, mutable.Map.empty[Long, 
mutable.Map[TopicPartition, OffsetAndMetadata]])
+                val groupProducerOffsets = 
groupPendingOffsets.getOrElseUpdate(producerId, 
mutable.Map.empty[TopicPartition, OffsetAndMetadata])
+                groupProducerOffsets ++= offsets
+              }
+          }
+
+          val (pendingGroupOffsets, pendingEmptyGroupOffsets) = 
pendingOffsetsByGroup
+            .partition { case (group, _) => loadedGroups.contains(group)}
+
           loadedGroups.values.foreach { group =>
             val offsets = groupOffsets.getOrElse(group.groupId, 
Map.empty[TopicPartition, OffsetAndMetadata])
-            loadGroup(group, offsets)
+            val pendingOffsets = pendingGroupOffsets.getOrElse(group.groupId, 
Map.empty[Long, mutable.Map[TopicPartition, OffsetAndMetadata]])
+            loadGroup(group, offsets, pendingOffsets)
             onGroupLoaded(group)
           }
 
           // load groups which store offsets in kafka, but which have no 
active members and thus no group
           // metadata stored in the log
-          emptyGroupOffsets.foreach { case (groupId, offsets) =>
+          (emptyGroupOffsets.keySet ++ 
pendingEmptyGroupOffsets.keySet).foreach { case(groupId) =>
             val group = new GroupMetadata(groupId)
-            loadGroup(group, offsets)
+            val offsets = emptyGroupOffsets.getOrElse(groupId, 
Map.empty[TopicPartition, OffsetAndMetadata])
+            val pendingOffsets = pendingEmptyGroupOffsets.getOrElse(groupId, 
Map.empty[Long, mutable.Map[TopicPartition, OffsetAndMetadata]])
+            loadGroup(group, offsets, pendingOffsets)
             onGroupLoaded(group)
           }
 
@@ -523,7 +603,8 @@ class GroupMetadataManager(brokerId: Int,
     }
   }
 
-  private def loadGroup(group: GroupMetadata, offsets: Map[TopicPartition, 
OffsetAndMetadata]): Unit = {
+  private def loadGroup(group: GroupMetadata, offsets: Map[TopicPartition, 
OffsetAndMetadata],
+                        pendingTransactionalOffsets: Map[Long, 
mutable.Map[TopicPartition, OffsetAndMetadata]]): Unit = {
     // offsets are initialized prior to loading the group into the cache to 
ensure that clients see a consistent
     // view of the group's offsets
     val loadedOffsets = offsets.mapValues { offsetAndMetadata =>
@@ -535,7 +616,7 @@ class GroupMetadataManager(brokerId: Int,
         offsetAndMetadata
     }
     trace(s"Initialized offsets $loadedOffsets for group ${group.groupId}")
-    group.initializeOffsets(loadedOffsets)
+    group.initializeOffsets(loadedOffsets, pendingTransactionalOffsets.toMap)
 
     val currentGroup = addGroup(group)
     if (group != currentGroup)
@@ -567,6 +648,7 @@ class GroupMetadataManager(brokerId: Int,
           if (partitionFor(group.groupId) == offsetsPartition) {
             onGroupUnloaded(group)
             groupMetadataCache.remove(group.groupId, group)
+            removeGroupFromAllProducers(group.groupId)
             numGroupsRemoved += 1
             numOffsetsRemoved += group.numOffsets
           }
@@ -660,6 +742,45 @@ class GroupMetadataManager(brokerId: Int,
     info(s"Removed $offsetsRemoved expired offsets in ${time.milliseconds() - 
startMs} milliseconds.")
   }
 
+  def handleTxnCompletion(producerId: Long, completedPartitions: Set[Int], 
isCommit: Boolean) {
+    val pendingGroups = groupsBelongingToPartitions(producerId, 
completedPartitions)
+    pendingGroups.foreach { case (groupId) =>
+      getGroup(groupId) match {
+        case Some(group) => group synchronized {
+          if (!group.is(Dead)) {
+            group.completePendingTxnOffsetCommit(producerId, isCommit)
+            removeProducerGroup(producerId, groupId)
+          }
+        }
+        case _ =>
+          info(s"Group $groupId has moved away from $brokerId after 
transaction marker was written but before the " +
+            s"cache was updated. The cache on the new group owner will be 
updated instead.")
+      }
+    }
+  }
+
+  private def addProducerGroup(producerId: Long, groupId: String) = 
openGroupsForProducer synchronized {
+    openGroupsForProducer.getOrElseUpdate(producerId, 
mutable.Set.empty[String]).add(groupId)
+  }
+
+  private def removeProducerGroup(producerId: Long, groupId: String) = 
openGroupsForProducer synchronized {
+    openGroupsForProducer.getOrElseUpdate(producerId, 
mutable.Set.empty[String]).remove(groupId)
+    if (openGroupsForProducer(producerId).isEmpty)
+      openGroupsForProducer.remove(producerId)
+  }
+
+  private def groupsBelongingToPartitions(producerId: Long, partitions: 
Set[Int]) = openGroupsForProducer synchronized {
+    val (ownedGroups, _) = openGroupsForProducer.getOrElse(producerId, 
mutable.Set.empty[String])
+      .partition { case (group) => partitions.contains(partitionFor(group)) }
+    ownedGroups
+  }
+
+  private def removeGroupFromAllProducers(groupId: String) = 
openGroupsForProducer synchronized {
+    openGroupsForProducer.foreach { case (_, groups) =>
+      groups.remove(groupId)
+    }
+  }
+
   /*
    * Check if the offset metadata length is valid
    */

http://git-wip-us.apache.org/repos/asf/kafka/blob/7258a5fd/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala 
b/core/src/main/scala/kafka/log/Log.scala
index 77ab25d..ed4eff2 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -778,7 +778,8 @@ class Log(@volatile var dir: File,
                               loadingFromLog: Boolean): Unit = {
     val pid = batch.producerId
     val appendInfo = producers.getOrElseUpdate(pid, new 
ProducerAppendInfo(pid, lastEntry, loadingFromLog))
-    val maybeCompletedTxn = appendInfo.append(batch)
+    val shouldValidateSequenceNumbers = topicPartition.topic() != 
Topic.GroupMetadataTopicName
+    val maybeCompletedTxn = appendInfo.append(batch, 
shouldValidateSequenceNumbers)
     maybeCompletedTxn.foreach(completedTxns += _)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7258a5fd/core/src/main/scala/kafka/log/ProducerStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala 
b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 03c60e4..4b2cedb 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -76,29 +76,31 @@ private[log] class ProducerAppendInfo(val producerId: Long, 
initialEntry: Produc
   def this(pid: Long, initialEntry: Option[ProducerIdEntry], loadingFromLog: 
Boolean) =
     this(pid, initialEntry.getOrElse(ProducerIdEntry.Empty), loadingFromLog)
 
-  private def validateAppend(epoch: Short, firstSeq: Int, lastSeq: Int) = {
+  private def validateAppend(epoch: Short, firstSeq: Int, lastSeq: Int, 
shouldValidateSequenceNumbers: Boolean) = {
     if (this.producerEpoch > epoch) {
       throw new ProducerFencedException(s"Producer's epoch is no longer valid. 
There is probably another producer " +
         s"with a newer epoch. $epoch (request epoch), ${this.producerEpoch} 
(server epoch)")
-    } else if (this.producerEpoch == RecordBatch.NO_PRODUCER_EPOCH || 
this.producerEpoch < epoch) {
-      if (firstSeq != 0)
-        throw new OutOfOrderSequenceException(s"Invalid sequence number for 
new epoch: $epoch " +
-          s"(request epoch), $firstSeq (seq. number)")
-    } else if (this.firstSeq == RecordBatch.NO_SEQUENCE && firstSeq != 0) {
-      // the epoch was bumped by a control record, so we expect the sequence 
number to be reset
-      throw new OutOfOrderSequenceException(s"Out of order sequence number: 
$producerId (pid), found $firstSeq " +
-        s"(incoming seq. number), but expected 0")
-    } else if (firstSeq == this.firstSeq && lastSeq == this.lastSeq) {
-      throw new DuplicateSequenceNumberException(s"Duplicate sequence number: 
pid: $producerId, (incomingBatch.firstSeq, " +
-        s"incomingBatch.lastSeq): ($firstSeq, $lastSeq), (lastEntry.firstSeq, 
lastEntry.lastSeq): " +
-        s"(${this.firstSeq}, ${this.lastSeq}).")
-    } else if (firstSeq != this.lastSeq + 1L) {
-      throw new OutOfOrderSequenceException(s"Out of order sequence number: 
$producerId (pid), $firstSeq " +
-        s"(incoming seq. number), ${this.lastSeq} (current end sequence 
number)")
+    } else if (shouldValidateSequenceNumbers) {
+      if (this.producerEpoch == RecordBatch.NO_PRODUCER_EPOCH || 
this.producerEpoch < epoch) {
+        if (firstSeq != 0)
+          throw new OutOfOrderSequenceException(s"Invalid sequence number for 
new epoch: $epoch " +
+            s"(request epoch), $firstSeq (seq. number)")
+      } else if (this.firstSeq == RecordBatch.NO_SEQUENCE && firstSeq != 0) {
+        // the epoch was bumped by a control record, so we expect the sequence 
number to be reset
+        throw new OutOfOrderSequenceException(s"Out of order sequence number: 
$producerId (pid), found $firstSeq " +
+          s"(incoming seq. number), but expected 0")
+      } else if (firstSeq == this.firstSeq && lastSeq == this.lastSeq) {
+        throw new DuplicateSequenceNumberException(s"Duplicate sequence 
number: pid: $producerId, (incomingBatch.firstSeq, " +
+          s"incomingBatch.lastSeq): ($firstSeq, $lastSeq), 
(lastEntry.firstSeq, lastEntry.lastSeq): " +
+          s"(${this.firstSeq}, ${this.lastSeq}).")
+      } else if (firstSeq != this.lastSeq + 1L) {
+        throw new OutOfOrderSequenceException(s"Out of order sequence number: 
$producerId (pid), $firstSeq " +
+          s"(incoming seq. number), ${this.lastSeq} (current end sequence 
number)")
+      }
     }
   }
 
-  def append(batch: RecordBatch): Option[CompletedTxn] = {
+  def append(batch: RecordBatch, shouldValidateSequenceNumbers: Boolean = 
true): Option[CompletedTxn] = {
     if (batch.isControlBatch) {
       val record = batch.iterator.next()
       val endTxnMarker = EndTransactionMarker.deserialize(record)
@@ -106,7 +108,7 @@ private[log] class ProducerAppendInfo(val producerId: Long, 
initialEntry: Produc
       Some(completedTxn)
     } else {
       append(batch.producerEpoch, batch.baseSequence, batch.lastSequence, 
batch.maxTimestamp, batch.lastOffset,
-        batch.isTransactional)
+        batch.isTransactional, shouldValidateSequenceNumbers)
       None
     }
   }
@@ -116,11 +118,12 @@ private[log] class ProducerAppendInfo(val producerId: 
Long, initialEntry: Produc
              lastSeq: Int,
              lastTimestamp: Long,
              lastOffset: Long,
-             isTransactional: Boolean): Unit = {
+             isTransactional: Boolean,
+             shouldValidateSequenceNumbers: Boolean): Unit = {
     if (epoch != RecordBatch.NO_PRODUCER_EPOCH && !loadingFromLog)
       // skip validation if this is the first entry when loading from the log. 
Log retention
       // will generally have removed the beginning entries from each PID
-      validateAppend(epoch, firstSeq, lastSeq)
+      validateAppend(epoch, firstSeq, lastSeq, shouldValidateSequenceNumbers)
 
     this.producerEpoch = epoch
     this.firstSeq = firstSeq

http://git-wip-us.apache.org/repos/asf/kafka/blob/7258a5fd/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 150d16d..aaa2458 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1433,8 +1433,23 @@ class KafkaApis(val requestChannel: RequestChannel,
       return
     }
 
-    def sendResponseCallback(pid: Long)(responseStatus: Map[TopicPartition, 
PartitionResponse]): Unit = {
+    def sendResponseCallback(pid: Long, result: 
TransactionResult)(responseStatus: Map[TopicPartition, PartitionResponse]): 
Unit = {
       errors.put(pid, responseStatus.mapValues(_.error).asJava)
+
+      val successfulPartitions = responseStatus.filter { case (_, 
partitionResponse) =>
+        partitionResponse.error == Errors.NONE
+      }.keys.toSeq
+
+      try {
+        groupCoordinator.handleTxnCompletion(producerId = pid, topicPartitions 
= successfulPartitions, transactionResult = result)
+      } catch {
+        case e: Exception =>
+          error(s"Received an exception while trying to update the offsets 
cache on transaction completion: $e")
+          val producerIdErrors = errors.get(pid)
+          successfulPartitions.foreach(producerIdErrors.put(_, Errors.UNKNOWN))
+      }
+
+
       if (numAppends.decrementAndGet() == 0)
         sendResponseExemptThrottle(request, new 
RequestChannel.Response(request, new WriteTxnMarkersResponse(errors)))
     }
@@ -1460,7 +1475,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         internalTopicsAllowed = true,
         isFromClient = false,
         entriesPerPartition = controlRecords,
-        sendResponseCallback(producerId))
+        sendResponseCallback(producerId, marker.transactionResult))
     }
   }
 
@@ -1510,9 +1525,78 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleTxnOffsetCommitRequest(request: RequestChannel.Request): Unit = {
-    val emptyResponse = new java.util.HashMap[TopicPartition, Errors]()
-    def createResponse(throttleTimeMs: Int): AbstractResponse = new 
TxnOffsetCommitResponse(throttleTimeMs, emptyResponse)
-    sendResponseMaybeThrottle(request, createResponse)
+    val header = request.header
+    val txnOffsetCommitRequest = request.body[TxnOffsetCommitRequest]
+    // reject the request if not authorized to the group
+    if (!authorize(request.session, Read, new Resource(Group, 
txnOffsetCommitRequest.consumerGroupId))) {
+      val error = Errors.GROUP_AUTHORIZATION_FAILED
+      val results = txnOffsetCommitRequest.offsets.keySet.asScala.map { 
topicPartition =>
+        (topicPartition, error)
+      }.toMap
+      def createResponse(throttleTimeMs: Int): AbstractResponse = new 
TxnOffsetCommitResponse(throttleTimeMs, results.asJava)
+      sendResponseMaybeThrottle(request, createResponse)
+    } else {
+      val (existingAndAuthorizedForDescribeTopics, 
nonExistingOrUnauthorizedForDescribeTopics) = 
txnOffsetCommitRequest.offsets.asScala.toMap.partition {
+        case (topicPartition, _) =>
+          val authorizedForDescribe = authorize(request.session, Describe, new 
Resource(Topic, topicPartition.topic))
+          val exists = metadataCache.contains(topicPartition.topic)
+          if (!authorizedForDescribe && exists)
+              debug(s"Transaction Offset commit request with correlation id 
${header.correlationId} from client ${header.clientId} " +
+                s"on partition $topicPartition failing due to user not having 
DESCRIBE authorization, but returning UNKNOWN_TOPIC_OR_PARTITION")
+          authorizedForDescribe && exists
+      }
+
+      val (authorizedTopics, unauthorizedForReadTopics) = 
existingAndAuthorizedForDescribeTopics.partition {
+        case (topicPartition, _) => authorize(request.session, Read, new 
Resource(Topic, topicPartition.topic))
+      }
+
+      // the callback for sending an offset commit response
+      def sendResponseCallback(commitStatus: immutable.Map[TopicPartition, 
Errors]) {
+        val combinedCommitStatus = commitStatus ++
+          unauthorizedForReadTopics.mapValues(_ => 
Errors.TOPIC_AUTHORIZATION_FAILED) ++
+          nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+
+        if (isDebugEnabled)
+          combinedCommitStatus.foreach { case (topicPartition, error) =>
+            if (error != Errors.NONE) {
+              debug(s"TxnOffsetCommit request with correlation id 
${header.correlationId} from client ${header.clientId} " +
+                s"on partition $topicPartition failed due to 
${error.exceptionName}")
+            }
+          }
+        def createResponse(throttleTimeMs: Int): AbstractResponse = new 
TxnOffsetCommitResponse(throttleTimeMs, combinedCommitStatus.asJava)
+        sendResponseMaybeThrottle(request, createResponse)
+      }
+
+      if (authorizedTopics.isEmpty)
+        sendResponseCallback(Map.empty)
+      else {
+        val offsetRetention = groupCoordinator.offsetConfig.offsetsRetentionMs
+
+        // commit timestamp is always set to now.
+        // "default" expiration timestamp is now + retention (and retention 
may be overridden if v2)
+        val currentTimestamp = time.milliseconds
+        val defaultExpireTimestamp = offsetRetention + currentTimestamp
+        val partitionData = authorizedTopics.mapValues { partitionData =>
+          val metadata = if (partitionData.metadata == null) 
OffsetMetadata.NoMetadata else partitionData.metadata
+          new OffsetAndMetadata(
+            offsetMetadata = OffsetMetadata(partitionData.offset, metadata),
+            commitTimestamp = currentTimestamp,
+            expireTimestamp = defaultExpireTimestamp
+          )
+        }
+
+        // call coordinator to handle commit offset
+        groupCoordinator.handleCommitOffsets(
+          txnOffsetCommitRequest.consumerGroupId,
+          null,
+          -1,
+          partitionData,
+          sendResponseCallback,
+          txnOffsetCommitRequest.producerId,
+          txnOffsetCommitRequest.producerEpoch)
+      }
+    }
+
   }
 
   def handleOffsetForLeaderEpochRequest(request: RequestChannel.Request): Unit 
= {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7258a5fd/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala
index 2dfbf48..bfc9be2 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala
@@ -24,9 +24,9 @@ import kafka.utils._
 import kafka.utils.timer.MockTimer
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.{RecordBatch, MemoryRecords}
+import org.apache.kafka.common.record.{RecordBatch, MemoryRecords, 
TimestampType}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
-import org.apache.kafka.common.requests.{JoinGroupRequest, 
OffsetCommitRequest, OffsetFetchResponse}
+import org.apache.kafka.common.requests.{JoinGroupRequest, 
OffsetCommitRequest, OffsetFetchResponse, TransactionResult}
 import org.easymock.{Capture, EasyMock, IAnswer}
 import java.util.concurrent.TimeUnit
 
@@ -768,6 +768,223 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
   }
 
   @Test
+  def testBasicFetchTxnOffsets() {
+    val tp = new TopicPartition("topic", 0)
+    val offset = OffsetAndMetadata(0)
+    val producerId = 1000L
+    val producerEpoch : Short = 2
+
+    val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, 
producerEpoch, immutable.Map(tp -> offset))
+    assertEquals(Errors.NONE, commitOffsetResult(tp))
+
+    val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, 
Some(Seq(tp)))
+
+    // Validate that the offset isn't materialized yet.
+    assertEquals(Errors.NONE, error)
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
partitionData.get(tp).map(_.offset))
+
+    val offsetsTopic = new TopicPartition(Topic.GroupMetadataTopicName, 
groupPartitionId)
+
+    // Send commit marker.
+    groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopic), 
TransactionResult.COMMIT)
+
+    // Validate that committed offset is materialized.
+    val (secondReqError, secondReqPartitionData) = 
groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
+    assertEquals(Errors.NONE, secondReqError)
+    assertEquals(Some(0), secondReqPartitionData.get(tp).map(_.offset))
+  }
+
+  @Test
+  def testFetchTxnOffsetsWithAbort() {
+    val tp = new TopicPartition("topic", 0)
+    val offset = OffsetAndMetadata(0)
+    val producerId = 1000L
+    val producerEpoch : Short = 2
+
+    val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, 
producerEpoch, immutable.Map(tp -> offset))
+    assertEquals(Errors.NONE, commitOffsetResult(tp))
+
+    val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, 
Some(Seq(tp)))
+    assertEquals(Errors.NONE, error)
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
partitionData.get(tp).map(_.offset))
+
+    val offsetsTopic = new TopicPartition(Topic.GroupMetadataTopicName, 
groupPartitionId)
+
+    // Validate that the pending commit is discarded.
+    groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopic), 
TransactionResult.ABORT)
+
+    val (secondReqError, secondReqPartitionData) = 
groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
+    assertEquals(Errors.NONE, secondReqError)
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
secondReqPartitionData.get(tp).map(_.offset))
+  }
+
+  @Test
+  def testFetchTxnOffsetsIgnoreSpuriousCommit() {
+    val tp = new TopicPartition("topic", 0)
+    val offset = OffsetAndMetadata(0)
+    val producerId = 1000L
+    val producerEpoch : Short = 2
+
+    val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, 
producerEpoch, immutable.Map(tp -> offset))
+    assertEquals(Errors.NONE, commitOffsetResult(tp))
+
+    val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, 
Some(Seq(tp)))
+    assertEquals(Errors.NONE, error)
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
partitionData.get(tp).map(_.offset))
+
+    val offsetsTopic = new TopicPartition(Topic.GroupMetadataTopicName, 
groupPartitionId)
+    groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopic), 
TransactionResult.ABORT)
+
+    val (secondReqError, secondReqPartitionData) = 
groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
+    assertEquals(Errors.NONE, secondReqError)
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
secondReqPartitionData.get(tp).map(_.offset))
+
+    // Ignore spurious commit.
+    groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopic), 
TransactionResult.COMMIT)
+
+    val (thirdReqError, thirdReqPartitionData) = 
groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
+    assertEquals(Errors.NONE, thirdReqError)
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
thirdReqPartitionData.get(tp).map(_.offset))
+  }
+
+  @Test
+  def testFetchTxnOffsetsOneProducerMultipleGroups() {
+    // One producer, two groups located on separate offsets topic partitions.
+    // Both groups have pending offset commits.
+    // Marker for only one partition is received. That commit should be 
materialized while the other should not.
+
+    val partitions = List(new TopicPartition("topic1", 0), new 
TopicPartition("topic2", 0))
+    val offsets = List(OffsetAndMetadata(10), OffsetAndMetadata(15))
+    val producerId = 1000L
+    val producerEpoch: Short = 3
+
+    val groupIds = List(groupId, otherGroupId)
+    val offsetTopicPartitions = List(new 
TopicPartition(Topic.GroupMetadataTopicName, 
groupCoordinator.partitionFor(groupId)),
+      new TopicPartition(Topic.GroupMetadataTopicName, 
groupCoordinator.partitionFor(otherGroupId)))
+
+    
groupCoordinator.groupManager.addPartitionOwnership(offsetTopicPartitions(1).partition)
+    val errors = mutable.ArrayBuffer[Errors]()
+    val partitionData = mutable.ArrayBuffer[Map[TopicPartition, 
OffsetFetchResponse.PartitionData]]()
+
+    val commitOffsetResults = mutable.ArrayBuffer[CommitOffsetCallbackParams]()
+
+    // Ensure that the two groups map to different partitions.
+    assertNotEquals(offsetTopicPartitions(0), offsetTopicPartitions(1))
+
+    commitOffsetResults.append(commitTransactionalOffsets(groupId, producerId, 
producerEpoch, immutable.Map(partitions(0) -> offsets(0))))
+    assertEquals(Errors.NONE, commitOffsetResults(0)(partitions(0)))
+    commitOffsetResults.append(commitTransactionalOffsets(otherGroupId, 
producerId, producerEpoch, immutable.Map(partitions(1) -> offsets(1))))
+    assertEquals(Errors.NONE, commitOffsetResults(1)(partitions(1)))
+
+    // We got a commit for only one __consumer_offsets partition. We should 
only materialize its group offsets.
+    groupCoordinator.handleTxnCompletion(producerId, 
List(offsetTopicPartitions(0)), TransactionResult.COMMIT)
+    groupCoordinator.handleFetchOffsets(groupIds(0), Some(partitions)) match {
+      case (error, partData) =>
+        errors.append(error)
+        partitionData.append(partData)
+      case _ =>
+    }
+
+     groupCoordinator.handleFetchOffsets(groupIds(1), Some(partitions)) match {
+      case (error, partData) =>
+        errors.append(error)
+        partitionData.append(partData)
+      case _ =>
+    }
+
+    assertEquals(2, errors.size)
+    assertEquals(Errors.NONE, errors(0))
+    assertEquals(Errors.NONE, errors(1))
+
+    // Exactly one offset commit should have been materialized.
+    assertEquals(Some(offsets(0).offset), 
partitionData(0).get(partitions(0)).map(_.offset))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
partitionData(0).get(partitions(1)).map(_.offset))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
partitionData(1).get(partitions(0)).map(_.offset))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
partitionData(1).get(partitions(1)).map(_.offset))
+
+    // Now we receive the other marker.
+    groupCoordinator.handleTxnCompletion(producerId, 
List(offsetTopicPartitions(1)), TransactionResult.COMMIT)
+    errors.clear()
+    partitionData.clear()
+    groupCoordinator.handleFetchOffsets(groupIds(0), Some(partitions)) match {
+      case (error, partData) =>
+        errors.append(error)
+        partitionData.append(partData)
+      case _ =>
+    }
+
+     groupCoordinator.handleFetchOffsets(groupIds(1), Some(partitions)) match {
+      case (error, partData) =>
+        errors.append(error)
+        partitionData.append(partData)
+      case _ =>
+    }
+    // Two offsets should have been materialized
+    assertEquals(Some(offsets(0).offset), 
partitionData(0).get(partitions(0)).map(_.offset))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
partitionData(0).get(partitions(1)).map(_.offset))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
partitionData(1).get(partitions(0)).map(_.offset))
+    assertEquals(Some(offsets(1).offset), 
partitionData(1).get(partitions(1)).map(_.offset))
+  }
+
+  @Test
+  def testFetchTxnOffsetsMultipleProducersOneGroup() {
+    // One group, two producers
+    // Different producers will commit offsets for different partitions.
+    // Each partition's offsets should be materialized when the corresponding 
producer's marker is received.
+
+    val partitions = List(new TopicPartition("topic1", 0), new 
TopicPartition("topic2", 0))
+    val offsets = List(OffsetAndMetadata(10), OffsetAndMetadata(15))
+    val producerIds = List(1000L, 1005L)
+    val producerEpochs: Seq[Short] = List(3, 4)
+
+    val offsetTopicPartition = new 
TopicPartition(Topic.GroupMetadataTopicName, 
groupCoordinator.partitionFor(groupId))
+
+    val errors = mutable.ArrayBuffer[Errors]()
+    val partitionData = mutable.ArrayBuffer[Map[TopicPartition, 
OffsetFetchResponse.PartitionData]]()
+
+    val commitOffsetResults = mutable.ArrayBuffer[CommitOffsetCallbackParams]()
+
+    // producer0 commits the offsets for partition0
+    commitOffsetResults.append(commitTransactionalOffsets(groupId, 
producerIds(0), producerEpochs(0), immutable.Map(partitions(0) -> offsets(0))))
+    assertEquals(Errors.NONE, commitOffsetResults(0)(partitions(0)))
+
+    // producer1 commits the offsets for partition1
+    commitOffsetResults.append(commitTransactionalOffsets(groupId, 
producerIds(1), producerEpochs(1), immutable.Map(partitions(1) -> offsets(1))))
+    assertEquals(Errors.NONE, commitOffsetResults(1)(partitions(1)))
+
+    // producer0 commits its transaction.
+    groupCoordinator.handleTxnCompletion(producerIds(0), 
List(offsetTopicPartition), TransactionResult.COMMIT)
+    groupCoordinator.handleFetchOffsets(groupId, Some(partitions)) match {
+      case (error, partData) =>
+        errors.append(error)
+        partitionData.append(partData)
+      case _ =>
+    }
+
+    assertEquals(Errors.NONE, errors(0))
+
+    // We should only see the offset commit for producer0
+    assertEquals(Some(offsets(0).offset), 
partitionData(0).get(partitions(0)).map(_.offset))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
partitionData(0).get(partitions(1)).map(_.offset))
+
+    // producer1 now commits its transaction.
+    groupCoordinator.handleTxnCompletion(producerIds(1), 
List(offsetTopicPartition), TransactionResult.COMMIT)
+
+    groupCoordinator.handleFetchOffsets(groupId, Some(partitions)) match {
+      case (error, partData) =>
+        errors.append(error)
+        partitionData.append(partData)
+      case _ =>
+    }
+
+    assertEquals(Errors.NONE, errors(1))
+
+    // We should now see the offset commits for both producers.
+    assertEquals(Some(offsets(0).offset), 
partitionData(1).get(partitions(0)).map(_.offset))
+    assertEquals(Some(offsets(1).offset), 
partitionData(1).get(partitions(1)).map(_.offset))
+  }
+
+  @Test
   def testFetchOffsetForUnknownPartition(): Unit = {
     val tp = new TopicPartition("topic", 0)
     val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, 
Some(Seq(tp)))
@@ -1222,6 +1439,34 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
   }
 
+  private def commitTransactionalOffsets(groupId: String,
+                                         producerId: Long,
+                                         producerEpoch: Short,
+                                         offsets: 
immutable.Map[TopicPartition, OffsetAndMetadata]): CommitOffsetCallbackParams = 
{
+    val (responseFuture, responseCallback) = setupCommitOffsetsCallback
+
+    val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => 
Unit] = EasyMock.newCapture()
+
+    EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
+      EasyMock.anyShort(),
+      internalTopicsAllowed = EasyMock.eq(true),
+      isFromClient = EasyMock.eq(false),
+      EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
+      EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
+      override def answer = capturedArgument.getValue.apply(
+        Map(new TopicPartition(Topic.GroupMetadataTopicName, 
groupCoordinator.partitionFor(groupId)) ->
+          new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP)
+        )
+      )})
+    
EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V2)).anyTimes()
+    EasyMock.replay(replicaManager)
+
+    groupCoordinator.handleCommitOffsets(groupId, null, -1, offsets, 
responseCallback, producerId, producerEpoch)
+    val result = Await.result(responseFuture, Duration(40, 
TimeUnit.MILLISECONDS))
+    EasyMock.reset(replicaManager)
+    result
+  }
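+
+  // Illustrative usage of the helper above (hypothetical group name and values): a transactional
+  // offset commit is only staged by handleCommitOffsets and is not visible to handleFetchOffsets
+  // until handleTxnCompletion delivers the COMMIT marker for the group's offsets topic partition.
+  //
+  //   val tp = new TopicPartition("topic1", 0)
+  //   val result = commitTransactionalOffsets("someGroup", 1000L, 3, immutable.Map(tp -> OffsetAndMetadata(10)))
+  //   assertEquals(Errors.NONE, result(tp))
+  //   groupCoordinator.handleTxnCompletion(1000L,
+  //     List(new TopicPartition(Topic.GroupMetadataTopicName, groupCoordinator.partitionFor("someGroup"))),
+  //     TransactionResult.COMMIT)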
+
   private def leaveGroup(groupId: String, consumerId: String): 
LeaveGroupCallbackParams = {
     val (responseFuture, responseCallback) = setupHeartbeatCallback
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7258a5fd/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 9053e0a..ce3b997 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -109,6 +109,282 @@ class GroupMetadataManagerTest {
   }
 
   @Test
+  def testLoadTransactionalOffsetsWithoutGroup() {
+    val groupMetadataTopicPartition = new 
TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId)
+    val producerId = 1000L
+    val producerEpoch: Short = 2
+
+    val committedOffsets = Map(
+      new TopicPartition("foo", 0) -> 23L,
+      new TopicPartition("foo", 1) -> 455L,
+      new TopicPartition("bar", 0) -> 8992L
+    )
+
+    val buffer = ByteBuffer.allocate(1024)
+    var nextOffset = 0
+    nextOffset += appendTransactionalOffsetCommits(buffer, producerId, 
producerEpoch, nextOffset, committedOffsets)
+    nextOffset += completeTransactionalOffsetCommit(buffer, producerId, 
producerEpoch, nextOffset, isCommit = true)
+    buffer.flip()
+
+    val records = MemoryRecords.readableRecords(buffer)
+    expectGroupMetadataLoad(groupMetadataTopicPartition, 0, records)
+
+    EasyMock.replay(replicaManager)
+
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ 
=> ())
+
+    val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group 
was not loaded into the cache"))
+    assertEquals(groupId, group.groupId)
+    assertEquals(Empty, group.currentState)
+    assertEquals(committedOffsets.size, group.allOffsets.size)
+    committedOffsets.foreach { case (topicPartition, offset) =>
+      assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
+    }
+  }
+
+  @Test
+  def testDoNotLoadAbortedTransactionalOffsetCommits() {
+    val groupMetadataTopicPartition = new 
TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId)
+    val producerId = 1000L
+    val producerEpoch: Short = 2
+
+    val abortedOffsets = Map(
+      new TopicPartition("foo", 0) -> 23L,
+      new TopicPartition("foo", 1) -> 455L,
+      new TopicPartition("bar", 0) -> 8992L
+    )
+
+    val buffer = ByteBuffer.allocate(1024)
+    var nextOffset = 0
+    nextOffset += appendTransactionalOffsetCommits(buffer, producerId, 
producerEpoch, nextOffset, abortedOffsets)
+    nextOffset += completeTransactionalOffsetCommit(buffer, producerId, 
producerEpoch, nextOffset, isCommit = false)
+    buffer.flip()
+
+    val records = MemoryRecords.readableRecords(buffer)
+    expectGroupMetadataLoad(groupMetadataTopicPartition, 0, records)
+
+    EasyMock.replay(replicaManager)
+
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ 
=> ())
+
+    // Since there are no committed offsets for the group, and there is no 
other group metadata, we don't expect the
+    // group to be loaded.
+    assertEquals(None, groupMetadataManager.getGroup(groupId))
+  }
+
+  @Test
+  def testGroupLoadedWithPendingCommits(): Unit = {
+    val groupMetadataTopicPartition = new 
TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId)
+    val producerId = 1000L
+    val producerEpoch: Short = 2
+
+    val pendingOffsets = Map(
+      new TopicPartition("foo", 0) -> 23L,
+      new TopicPartition("foo", 1) -> 455L,
+      new TopicPartition("bar", 0) -> 8992L
+    )
+
+    val buffer = ByteBuffer.allocate(1024)
+    var nextOffset = 0
+    nextOffset += appendTransactionalOffsetCommits(buffer, producerId, 
producerEpoch, nextOffset, pendingOffsets)
+    buffer.flip()
+
+    val records = MemoryRecords.readableRecords(buffer)
+    expectGroupMetadataLoad(groupMetadataTopicPartition, 0, records)
+
+    EasyMock.replay(replicaManager)
+
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ 
=> ())
+
+    // The group should be loaded with pending offsets.
+    val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group 
was not loaded into the cache"))
+    assertEquals(groupId, group.groupId)
+    assertEquals(Empty, group.currentState)
+    // Ensure that no offsets are materialized, but that we have offsets 
pending.
+    assertEquals(0, group.allOffsets.size)
+    assertTrue(group.hasOffsets)
+    assertTrue(group.hasPendingOffsetCommitsFromProducer(producerId))
+  }
+
+  @Test
+  def testLoadWithCommittedAndAbortedTransactionalOffsetCommits() {
+    // A test which loads a log with a mix of committed and aborted transactional offset commit messages.
+    val groupMetadataTopicPartition = new 
TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId)
+    val producerId = 1000L
+    val producerEpoch: Short = 2
+
+    val committedOffsets = Map(
+      new TopicPartition("foo", 0) -> 23L,
+      new TopicPartition("foo", 1) -> 455L,
+      new TopicPartition("bar", 0) -> 8992L
+    )
+
+    val abortedOffsets = Map(
+      new TopicPartition("foo", 2) -> 231L,
+      new TopicPartition("foo", 3) -> 4551L,
+      new TopicPartition("bar", 1) -> 89921L
+    )
+
+    val buffer = ByteBuffer.allocate(1024)
+    var nextOffset = 0
+    nextOffset += appendTransactionalOffsetCommits(buffer, producerId, 
producerEpoch, nextOffset, abortedOffsets)
+    nextOffset += completeTransactionalOffsetCommit(buffer, producerId, 
producerEpoch, nextOffset, isCommit = false)
+    nextOffset += appendTransactionalOffsetCommits(buffer, producerId, 
producerEpoch, nextOffset, committedOffsets)
+    nextOffset += completeTransactionalOffsetCommit(buffer, producerId, 
producerEpoch, nextOffset, isCommit = true)
+    buffer.flip()
+
+    val records = MemoryRecords.readableRecords(buffer)
+    expectGroupMetadataLoad(groupMetadataTopicPartition, 0, records)
+
+    EasyMock.replay(replicaManager)
+
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ 
=> ())
+
+    val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group 
was not loaded into the cache"))
+    assertEquals(groupId, group.groupId)
+    assertEquals(Empty, group.currentState)
+    // Ensure that only the committed offsets are materialized, and that there 
are no pending commits for the producer.
+    // This allows us to be certain that the aborted offset commits are truly 
discarded.
+    assertEquals(committedOffsets.size, group.allOffsets.size)
+    committedOffsets.foreach { case (topicPartition, offset) =>
+      assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
+    }
+    assertFalse(group.hasPendingOffsetCommitsFromProducer(producerId))
+  }
+
+  @Test
+  def testLoadWithCommittedAndAbortedAndPendingTransactionalOffsetCommits() {
+    val groupMetadataTopicPartition = new 
TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId)
+    val producerId = 1000L
+    val producerEpoch: Short = 2
+
+    val committedOffsets = Map(
+      new TopicPartition("foo", 0) -> 23L,
+      new TopicPartition("foo", 1) -> 455L,
+      new TopicPartition("bar", 0) -> 8992L
+    )
+
+    val abortedOffsets = Map(
+      new TopicPartition("foo", 2) -> 231L,
+      new TopicPartition("foo", 3) -> 4551L,
+      new TopicPartition("bar", 1) -> 89921L
+    )
+
+    val pendingOffsets = Map(
+      new TopicPartition("foo", 3) -> 2312L,
+      new TopicPartition("foo", 4) -> 45512L,
+      new TopicPartition("bar", 2) -> 899212L
+    )
+
+    val buffer = ByteBuffer.allocate(1024)
+    var nextOffset = 0
+    nextOffset += appendTransactionalOffsetCommits(buffer, producerId, 
producerEpoch, nextOffset, committedOffsets)
+    nextOffset += completeTransactionalOffsetCommit(buffer, producerId, 
producerEpoch, nextOffset, isCommit = true)
+    nextOffset += appendTransactionalOffsetCommits(buffer, producerId, 
producerEpoch, nextOffset, abortedOffsets)
+    nextOffset += completeTransactionalOffsetCommit(buffer, producerId, 
producerEpoch, nextOffset, isCommit = false)
+    nextOffset += appendTransactionalOffsetCommits(buffer, producerId, 
producerEpoch, nextOffset, pendingOffsets)
+    buffer.flip()
+
+    val records = MemoryRecords.readableRecords(buffer)
+    expectGroupMetadataLoad(groupMetadataTopicPartition, 0, records)
+
+    EasyMock.replay(replicaManager)
+
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ 
=> ())
+
+    val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group 
was not loaded into the cache"))
+    assertEquals(groupId, group.groupId)
+    assertEquals(Empty, group.currentState)
+
+    // Ensure that only the committed offsets are materialized, and that there 
are no pending commits for the producer.
+    // This allows us to be certain that the aborted offset commits are truly 
discarded.
+    assertEquals(committedOffsets.size, group.allOffsets.size)
+    committedOffsets.foreach { case (topicPartition, offset) =>
+      assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
+    }
+
+    // We should have pending commits.
+    assertTrue(group.hasPendingOffsetCommitsFromProducer(producerId))
+
+    // The loaded pending commits should materialize after a commit marker 
comes in.
+    groupMetadataManager.handleTxnCompletion(producerId, 
List(groupMetadataTopicPartition.partition).toSet, isCommit = true)
+    assertFalse(group.hasPendingOffsetCommitsFromProducer(producerId))
+    pendingOffsets.foreach { case (topicPartition, offset) =>
+      assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
+    }
+  }
+
+  @Test
+  def testLoadTransactionalOffsetCommitsFromMultipleProducers(): Unit = {
+    val groupMetadataTopicPartition = new 
TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId)
+    val firstProducerId = 1000L
+    val firstProducerEpoch: Short = 2
+    val secondProducerId = 1001L
+    val secondProducerEpoch: Short = 3
+
+    val committedOffsetsFirstProducer = Map(
+      new TopicPartition("foo", 0) -> 23L,
+      new TopicPartition("foo", 1) -> 455L,
+      new TopicPartition("bar", 0) -> 8992L
+    )
+
+    val committedOffsetsSecondProducer = Map(
+      new TopicPartition("foo", 2) -> 231L,
+      new TopicPartition("foo", 3) -> 4551L,
+      new TopicPartition("bar", 1) -> 89921L
+    )
+
+    val buffer = ByteBuffer.allocate(1024)
+    var nextOffset = 0
+    nextOffset += appendTransactionalOffsetCommits(buffer, firstProducerId, 
firstProducerEpoch, nextOffset, committedOffsetsFirstProducer)
+    nextOffset += completeTransactionalOffsetCommit(buffer, firstProducerId, 
firstProducerEpoch, nextOffset, isCommit = true)
+    nextOffset += appendTransactionalOffsetCommits(buffer, secondProducerId, 
secondProducerEpoch, nextOffset, committedOffsetsSecondProducer)
+    nextOffset += completeTransactionalOffsetCommit(buffer, secondProducerId, 
secondProducerEpoch, nextOffset, isCommit = true)
+    buffer.flip()
+
+    val records = MemoryRecords.readableRecords(buffer)
+    expectGroupMetadataLoad(groupMetadataTopicPartition, 0, records)
+
+    EasyMock.replay(replicaManager)
+
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ 
=> ())
+
+    val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group 
was not loaded into the cache"))
+    assertEquals(groupId, group.groupId)
+    assertEquals(Empty, group.currentState)
+
+    // Since both producers committed their transactions, the offsets from both producers should be
+    // materialized.
+    assertEquals(committedOffsetsFirstProducer.size + 
committedOffsetsSecondProducer.size, group.allOffsets.size)
+    committedOffsetsFirstProducer.foreach { case (topicPartition, offset) =>
+      assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
+    }
+    committedOffsetsSecondProducer.foreach { case (topicPartition, offset) =>
+      assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
+    }
+  }
+
+  private def appendTransactionalOffsetCommits(buffer: ByteBuffer, producerId: 
Long, producerEpoch: Short,
+                                               baseOffset: Long, offsets: 
Map[TopicPartition, Long]): Int = {
+    val builder = MemoryRecords.builder(buffer, CompressionType.NONE, 
baseOffset, producerId, producerEpoch, 0, true)
+    val commitRecords = createCommittedOffsetRecords(offsets)
+    commitRecords.foreach(builder.append)
+    builder.build()
+    offsets.size
+  }
+
+  private def completeTransactionalOffsetCommit(buffer: ByteBuffer, 
producerId: Long, producerEpoch: Short, baseOffset: Long,
+                                                isCommit: Boolean): Int = {
+    val builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, 
CompressionType.NONE,
+      TimestampType.LOG_APPEND_TIME, baseOffset, time.milliseconds(), 
producerId, producerEpoch, 0, true, true,
+      RecordBatch.NO_PARTITION_LEADER_EPOCH)
+    val controlRecordType = if (isCommit) ControlRecordType.COMMIT else 
ControlRecordType.ABORT
+    builder.appendEndTxnMarker(time.milliseconds(), new 
EndTransactionMarker(controlRecordType, 0))
+    builder.build()
+    1
+  }
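+
+  // Sketch of the log layout the two helpers above simulate (this mirrors the tests in this class):
+  // a transactional batch of offset commit records is written first, followed by a separate control
+  // batch carrying the end-transaction marker; pending offsets only materialize once the COMMIT
+  // marker is processed.
+  //
+  //   var nextOffset = 0
+  //   nextOffset += appendTransactionalOffsetCommits(buffer, producerId, producerEpoch, nextOffset, offsets)
+  //   nextOffset += completeTransactionalOffsetCommit(buffer, producerId, producerEpoch, nextOffset, isCommit = true)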
+
+  @Test
   def testLoadOffsetsWithTombstones() {
     val groupMetadataTopicPartition = new 
TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId)
     val startOffset = 15L

http://git-wip-us.apache.org/repos/asf/kafka/blob/7258a5fd/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala 
b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index e8c918d..353642b 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -128,7 +128,8 @@ class ProducerStateManagerTest extends JUnitSuite {
     val offset = 992342L
     val seq = 0
     val producerAppendInfo = new ProducerAppendInfo(pid, None, false)
-    producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), 
offset, isTransactional = true)
+    producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), 
offset, isTransactional = true,
+      shouldValidateSequenceNumbers = true)
 
     val logOffsetMetadata = new LogOffsetMetadata(messageOffset = offset, 
segmentBaseOffset = 990000L,
       relativePositionInSegment = 234224)
@@ -144,7 +145,8 @@ class ProducerStateManagerTest extends JUnitSuite {
     val offset = 992342L
     val seq = 0
     val producerAppendInfo = new ProducerAppendInfo(pid, None, false)
-    producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), 
offset, isTransactional = true)
+    producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), 
offset, isTransactional = true,
+      shouldValidateSequenceNumbers = true)
 
     // use some other offset to simulate a follower append where the log 
offset metadata won't typically
     // match any of the transaction first offsets
@@ -164,7 +166,8 @@ class ProducerStateManagerTest extends JUnitSuite {
     append(idMapping, pid, 0, producerEpoch, offset)
 
     val appendInfo = new ProducerAppendInfo(pid, idMapping.lastEntry(pid), 
loadingFromLog = false)
-    appendInfo.append(producerEpoch, 1, 5, time.milliseconds(), 20L, 
isTransactional = true)
+    appendInfo.append(producerEpoch, 1, 5, time.milliseconds(), 20L, 
isTransactional = true,
+      shouldValidateSequenceNumbers = true)
     var lastEntry = appendInfo.lastEntry
     assertEquals(producerEpoch, lastEntry.producerEpoch)
     assertEquals(1, lastEntry.firstSeq)
@@ -174,7 +177,8 @@ class ProducerStateManagerTest extends JUnitSuite {
     assertEquals(Some(16L), lastEntry.currentTxnFirstOffset)
     assertEquals(List(new TxnMetadata(pid, 16L)), 
appendInfo.startedTransactions)
 
-    appendInfo.append(producerEpoch, 6, 10, time.milliseconds(), 30L, 
isTransactional = true)
+    appendInfo.append(producerEpoch, 6, 10, time.milliseconds(), 30L, 
isTransactional = true,
+      shouldValidateSequenceNumbers = true)
     lastEntry = appendInfo.lastEntry
     assertEquals(producerEpoch, lastEntry.producerEpoch)
     assertEquals(6, lastEntry.firstSeq)
@@ -551,7 +555,7 @@ class ProducerStateManagerTest extends JUnitSuite {
                      isTransactional: Boolean = false,
                      isLoadingFromLog: Boolean = false): Unit = {
     val producerAppendInfo = new ProducerAppendInfo(pid, 
mapping.lastEntry(pid), isLoadingFromLog)
-    producerAppendInfo.append(epoch, seq, seq, timestamp, offset, 
isTransactional)
+    producerAppendInfo.append(epoch, seq, seq, timestamp, offset, 
isTransactional, shouldValidateSequenceNumbers = true)
     mapping.update(producerAppendInfo)
     mapping.updateMapEndOffset(offset + 1)
   }
