[GitHub] [kafka] dajac commented on a diff in pull request #14378: KAFKA-15459: Convert coordinator retriable errors to a known producer response error

2023-09-13 Thread via GitHub


dajac commented on code in PR #14378:
URL: https://github.com/apache/kafka/pull/14378#discussion_r1324871934


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -2569,6 +2571,70 @@ class ReplicaManagerTest {
 
   assertEquals((Errors.NONE, node0), 
replicaManager.getTransactionCoordinator(txnCoordinatorPartition0))
   assertEquals((Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode), 
replicaManager.getTransactionCoordinator(txnCoordinatorPartition1))
+
+  // Test we convert the error correctly when trying to append and 
coordinator is not available
+  val tp0 = new TopicPartition(topic, 0)
+  val producerId = 24L
+  val producerEpoch = 0.toShort
+  val sequence = 0
+  replicaManager.becomeLeaderOrFollower(1,
+makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), 
LeaderAndIsr(1, List(0, 1))),
+(_, _) => ())
+  val transactionalRecords = 
MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, 
producerEpoch, sequence,
+new SimpleRecord("message".getBytes))
+  val result = appendRecords(replicaManager, tp0, transactionalRecords, 
transactionalId = transactionalId, transactionStatePartition = 
Some(txnCoordinatorPartition1))
+  val expectedError = s"Unable to verify the partition has been added to 
the transaction. Underlying error:${Errors.COORDINATOR_NOT_AVAILABLE.toString}"
+  assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+  assertEquals(expectedError, result.assertFired.errorMessage)
+} finally {
+  replicaManager.shutdown(checkpointHW = false)
+}
+  }
+
+  @Test
+  def testVerificationErrorConversions(): Unit = {
+val tp0 = new TopicPartition(topic, 0)
+val producerId = 24L
+val producerEpoch = 0.toShort
+val sequence = 0
+val node = new Node(0, "host1", 0)
+val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
+
+val replicaManager = 
setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager,
 List(tp0), node)
+try {
+  replicaManager.becomeLeaderOrFollower(1,
+makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), 
LeaderAndIsr(1, List(0, 1))),
+(_, _) => ())
+
+  val transactionalRecords = 
MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, 
producerEpoch, sequence,
+new SimpleRecord("message".getBytes))
+
+  val transactionToAdd = new AddPartitionsToTxnTransaction()
+.setTransactionalId(transactionalId)
+.setProducerId(producerId)
+.setProducerEpoch(producerEpoch)
+.setVerifyOnly(true)
+.setTopics(new AddPartitionsToTxnTopicCollection(
+  Seq(new 
AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava
+))
+
+  // Start verification and return the coordinator related errors.
+  var invocations = 1
+  def verifyError(error: Errors): Unit = {
+val expectedMessage = s"Unable to verify the partition has been added 
to the transaction. Underlying error:${error.toString}"
+val result = appendRecords(replicaManager, tp0, transactionalRecords, 
transactionalId = transactionalId, transactionStatePartition = Some(0))
+val appendCallback = 
ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
+verify(addPartitionsToTxnManager, 
times(invocations)).addTxnData(ArgumentMatchers.eq(node), 
ArgumentMatchers.eq(transactionToAdd), appendCallback.capture())
+
+// Confirm we did not write to the log and instead returned the 
converted error with the correct error message.
+val callback: AddPartitionsToTxnManager.AppendCallback = 
appendCallback.getValue()
+callback(Map(tp0 -> error).toMap)
+assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+assertEquals(expectedMessage, result.assertFired.errorMessage)
+invocations = invocations + 1
+  }
+
+  Set(Errors.NOT_COORDINATOR, Errors.CONCURRENT_TRANSACTIONS, 
Errors.COORDINATOR_LOAD_IN_PROGRESS).foreach(verifyError(_))

Review Comment:
   That's fine.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14378: KAFKA-15459: Convert coordinator retriable errors to a known producer response error

2023-09-13 Thread via GitHub


dajac commented on code in PR #14378:
URL: https://github.com/apache/kafka/pull/14378#discussion_r1324868208


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -753,39 +753,63 @@ class ReplicaManager(val config: KafkaConfig,
 val localProduceResults = appendToLocalLog(internalTopicsAllowed = 
internalTopicsAllowed,
   origin, verifiedEntries, requiredAcks, requestLocal, 
verificationGuards.toMap)
 debug("Produce to local log in %d ms".format(time.milliseconds - 
sTime))
-
-val unverifiedResults = unverifiedEntries.map { case (topicPartition, 
error) =>
-  val message = if (error == Errors.INVALID_TXN_STATE) "Partition was 
not added to the transaction" else error.message()
-  topicPartition -> LogAppendResult(
-LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-Some(error.exception(message))
-  )
-}
 
-val errorResults = errorsPerPartition.map { case (topicPartition, 
error) =>
-  topicPartition -> LogAppendResult(
-LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-Some(error.exception())
-  )
+def produceStatusResult(appendResult: Map[TopicPartition, 
LogAppendResult],
+useCustomMessage: Boolean): 
Map[TopicPartition, ProducePartitionStatus] = {
+  appendResult.map { case (topicPartition, result) =>
+topicPartition -> ProducePartitionStatus(
+  result.info.lastOffset + 1, // required offset
+  new PartitionResponse(
+result.error,
+result.info.firstOffset.map[Long](_.messageOffset).orElse(-1L),
+result.info.lastOffset,
+result.info.logAppendTime,
+result.info.logStartOffset,
+result.info.recordErrors,
+if (useCustomMessage) result.exception.get.getMessage else 
result.info.errorMessage

Review Comment:
   I suppose that you are saying that `result.info.errorMessage` is always null 
when `useCustomMessage` is true because we know that 
`LogAppendInfo.UnknownLogAppendInfo` does not set it. Am I right?
   
   What I was wondering is why we don't set it all the time... I suppose that 
what you did here is fine.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14378: KAFKA-15459: Convert coordinator retriable errors to a known producer response error

2023-09-13 Thread via GitHub


dajac commented on code in PR #14378:
URL: https://github.com/apache/kafka/pull/14378#discussion_r1324869043


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -2569,6 +2571,70 @@ class ReplicaManagerTest {
 
   assertEquals((Errors.NONE, node0), 
replicaManager.getTransactionCoordinator(txnCoordinatorPartition0))
   assertEquals((Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode), 
replicaManager.getTransactionCoordinator(txnCoordinatorPartition1))
+
+  // Test we convert the error correctly when trying to append and 
coordinator is not available
+  val tp0 = new TopicPartition(topic, 0)
+  val producerId = 24L
+  val producerEpoch = 0.toShort
+  val sequence = 0
+  replicaManager.becomeLeaderOrFollower(1,
+makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), 
LeaderAndIsr(1, List(0, 1))),
+(_, _) => ())
+  val transactionalRecords = 
MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, 
producerEpoch, sequence,
+new SimpleRecord("message".getBytes))
+  val result = appendRecords(replicaManager, tp0, transactionalRecords, 
transactionalId = transactionalId, transactionStatePartition = 
Some(txnCoordinatorPartition1))
+  val expectedError = s"Unable to verify the partition has been added to 
the transaction. Underlying error:${Errors.COORDINATOR_NOT_AVAILABLE.toString}"
+  assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+  assertEquals(expectedError, result.assertFired.errorMessage)
+} finally {
+  replicaManager.shutdown(checkpointHW = false)
+}
+  }
+
+  @Test
+  def testVerificationErrorConversions(): Unit = {
+val tp0 = new TopicPartition(topic, 0)
+val producerId = 24L
+val producerEpoch = 0.toShort
+val sequence = 0
+val node = new Node(0, "host1", 0)
+val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
+
+val replicaManager = 
setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager,
 List(tp0), node)
+try {
+  replicaManager.becomeLeaderOrFollower(1,
+makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), 
LeaderAndIsr(1, List(0, 1))),
+(_, _) => ())
+
+  val transactionalRecords = 
MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, 
producerEpoch, sequence,
+new SimpleRecord("message".getBytes))
+
+  val transactionToAdd = new AddPartitionsToTxnTransaction()
+.setTransactionalId(transactionalId)
+.setProducerId(producerId)
+.setProducerEpoch(producerEpoch)
+.setVerifyOnly(true)
+.setTopics(new AddPartitionsToTxnTopicCollection(
+  Seq(new 
AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava
+))
+
+  // Start verification and return the coordinator related errors.
+  var invocations = 1
+  def verifyError(error: Errors): Unit = {
+val expectedMessage = s"Unable to verify the partition has been added 
to the transaction. Underlying error:${error.toString}"
+val result = appendRecords(replicaManager, tp0, transactionalRecords, 
transactionalId = transactionalId, transactionStatePartition = Some(0))
+val appendCallback = 
ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
+verify(addPartitionsToTxnManager, 
times(invocations)).addTxnData(ArgumentMatchers.eq(node), 
ArgumentMatchers.eq(transactionToAdd), appendCallback.capture())
+
+// Confirm we did not write to the log and instead returned the 
converted error with the correct error message.
+val callback: AddPartitionsToTxnManager.AppendCallback = 
appendCallback.getValue()
+callback(Map(tp0 -> error).toMap)
+assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+assertEquals(expectedMessage, result.assertFired.errorMessage)
+invocations = invocations + 1

Review Comment:
   Personally, I would use a `@ParameterizedTest`, but I leave it up to you. The 
`invocations` counter feels a bit hacky.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14378: KAFKA-15459: Convert coordinator retriable errors to a known producer response error

2023-09-13 Thread via GitHub


dajac commented on code in PR #14378:
URL: https://github.com/apache/kafka/pull/14378#discussion_r1324868507


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -2569,6 +2571,70 @@ class ReplicaManagerTest {
 
   assertEquals((Errors.NONE, node0), 
replicaManager.getTransactionCoordinator(txnCoordinatorPartition0))
   assertEquals((Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode), 
replicaManager.getTransactionCoordinator(txnCoordinatorPartition1))
+
+  // Test we convert the error correctly when trying to append and 
coordinator is not available
+  val tp0 = new TopicPartition(topic, 0)
+  val producerId = 24L
+  val producerEpoch = 0.toShort
+  val sequence = 0
+  replicaManager.becomeLeaderOrFollower(1,
+makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), 
LeaderAndIsr(1, List(0, 1))),
+(_, _) => ())
+  val transactionalRecords = 
MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, 
producerEpoch, sequence,
+new SimpleRecord("message".getBytes))
+  val result = appendRecords(replicaManager, tp0, transactionalRecords, 
transactionalId = transactionalId, transactionStatePartition = 
Some(txnCoordinatorPartition1))
+  val expectedError = s"Unable to verify the partition has been added to 
the transaction. Underlying error:${Errors.COORDINATOR_NOT_AVAILABLE.toString}"
+  assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+  assertEquals(expectedError, result.assertFired.errorMessage)
+} finally {
+  replicaManager.shutdown(checkpointHW = false)
+}
+  }
+
+  @Test
+  def testVerificationErrorConversions(): Unit = {
+val tp0 = new TopicPartition(topic, 0)
+val producerId = 24L
+val producerEpoch = 0.toShort
+val sequence = 0
+val node = new Node(0, "host1", 0)
+val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
+
+val replicaManager = 
setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager,
 List(tp0), node)
+try {
+  replicaManager.becomeLeaderOrFollower(1,
+makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), 
LeaderAndIsr(1, List(0, 1))),
+(_, _) => ())
+
+  val transactionalRecords = 
MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, 
producerEpoch, sequence,
+new SimpleRecord("message".getBytes))
+
+  val transactionToAdd = new AddPartitionsToTxnTransaction()
+.setTransactionalId(transactionalId)
+.setProducerId(producerId)
+.setProducerEpoch(producerEpoch)
+.setVerifyOnly(true)
+.setTopics(new AddPartitionsToTxnTopicCollection(
+  Seq(new 
AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava
+))
+
+  // Start verification and return the coordinator related errors.
+  var invocations = 1
+  def verifyError(error: Errors): Unit = {
+val expectedMessage = s"Unable to verify the partition has been added 
to the transaction. Underlying error:${error.toString}"
+val result = appendRecords(replicaManager, tp0, transactionalRecords, 
transactionalId = transactionalId, transactionStatePartition = Some(0))
+val appendCallback = 
ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
+verify(addPartitionsToTxnManager, 
times(invocations)).addTxnData(ArgumentMatchers.eq(node), 
ArgumentMatchers.eq(transactionToAdd), appendCallback.capture())
+
+// Confirm we did not write to the log and instead returned the 
converted error with the correct error message.
+val callback: AddPartitionsToTxnManager.AppendCallback = 
appendCallback.getValue()
+callback(Map(tp0 -> error).toMap)
+assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+assertEquals(expectedMessage, result.assertFired.errorMessage)
+invocations = invocations + 1
+  }
+
+  Set(Errors.NOT_COORDINATOR, Errors.CONCURRENT_TRANSACTIONS, 
Errors.COORDINATOR_LOAD_IN_PROGRESS).foreach(verifyError(_))

Review Comment:
   Understood. I think that it is fine as it is then.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14378: KAFKA-15459: Convert coordinator retriable errors to a known producer response error

2023-09-13 Thread via GitHub


dajac commented on code in PR #14378:
URL: https://github.com/apache/kafka/pull/14378#discussion_r1324868208


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -753,39 +753,63 @@ class ReplicaManager(val config: KafkaConfig,
 val localProduceResults = appendToLocalLog(internalTopicsAllowed = 
internalTopicsAllowed,
   origin, verifiedEntries, requiredAcks, requestLocal, 
verificationGuards.toMap)
 debug("Produce to local log in %d ms".format(time.milliseconds - 
sTime))
-
-val unverifiedResults = unverifiedEntries.map { case (topicPartition, 
error) =>
-  val message = if (error == Errors.INVALID_TXN_STATE) "Partition was 
not added to the transaction" else error.message()
-  topicPartition -> LogAppendResult(
-LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-Some(error.exception(message))
-  )
-}
 
-val errorResults = errorsPerPartition.map { case (topicPartition, 
error) =>
-  topicPartition -> LogAppendResult(
-LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-Some(error.exception())
-  )
+def produceStatusResult(appendResult: Map[TopicPartition, 
LogAppendResult],
+useCustomMessage: Boolean): 
Map[TopicPartition, ProducePartitionStatus] = {
+  appendResult.map { case (topicPartition, result) =>
+topicPartition -> ProducePartitionStatus(
+  result.info.lastOffset + 1, // required offset
+  new PartitionResponse(
+result.error,
+result.info.firstOffset.map[Long](_.messageOffset).orElse(-1L),
+result.info.lastOffset,
+result.info.logAppendTime,
+result.info.logStartOffset,
+result.info.recordErrors,
+if (useCustomMessage) result.exception.get.getMessage else 
result.info.errorMessage

Review Comment:
   I suppose that you are saying that `result.info.errorMessage` is always null 
when `useCustomMessage` is true because we know that 
`LogAppendInfo.UnknownLogAppendInfo` does not set it. Am I right?
   
   What I was wondering is why we don't set it all the time. I suppose that 
what you did here is fine.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14378: KAFKA-15459: Convert coordinator retriable errors to a known producer response error

2023-09-13 Thread via GitHub


dajac commented on code in PR #14378:
URL: https://github.com/apache/kafka/pull/14378#discussion_r1324869297


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -753,39 +753,56 @@ class ReplicaManager(val config: KafkaConfig,
 val localProduceResults = appendToLocalLog(internalTopicsAllowed = 
internalTopicsAllowed,
   origin, verifiedEntries, requiredAcks, requestLocal, 
verificationGuards.toMap)
 debug("Produce to local log in %d ms".format(time.milliseconds - 
sTime))
-
-val unverifiedResults = unverifiedEntries.map { case (topicPartition, 
error) =>
-  val message = if (error == Errors.INVALID_TXN_STATE) "Partition was 
not added to the transaction" else error.message()
-  topicPartition -> LogAppendResult(
-LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-Some(error.exception(message))
-  )
-}
 
-val errorResults = errorsPerPartition.map { case (topicPartition, 
error) =>
-  topicPartition -> LogAppendResult(
-LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-Some(error.exception())
-  )
+def produceStatusResult(appendResult: Map[TopicPartition, 
LogAppendResult],
+useCustomMessage: Boolean): 
Map[TopicPartition, ProducePartitionStatus] = {
+  appendResult.map { case (topicPartition, result) =>
+topicPartition -> ProducePartitionStatus(
+  result.info.lastOffset + 1, // required offset
+  new PartitionResponse(
+result.error,
+result.info.firstOffset.map[Long](_.messageOffset).orElse(-1L),
+result.info.lastOffset,
+result.info.logAppendTime,
+result.info.logStartOffset,
+result.info.recordErrors,
+if (useCustomMessage) result.exception.get.getMessage else 
result.info.errorMessage
+  )
+) // response status
+  }
 }
 
-val allResults = localProduceResults ++ unverifiedResults ++ 
errorResults
+val unverifiedResults = unverifiedEntries.map {
+  case (topicPartition, error) =>
+val finalException =
+  error match {
+case Errors.INVALID_TXN_STATE => error.exception("Partition 
was not added to the transaction")
+case Errors.CONCURRENT_TRANSACTIONS |
+ Errors.COORDINATOR_LOAD_IN_PROGRESS |
+ Errors.COORDINATOR_NOT_AVAILABLE |
+ Errors.NOT_COORDINATOR => new NotEnoughReplicasException(
+  s"Unable to verify the partition has been added to the 
transaction. Underlying error: ${error.toString}")

Review Comment:
   nit: Indentation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14378: KAFKA-15459: Convert coordinator retriable errors to a known producer response error

2023-09-13 Thread via GitHub


dajac commented on code in PR #14378:
URL: https://github.com/apache/kafka/pull/14378#discussion_r1324437789


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -753,39 +753,63 @@ class ReplicaManager(val config: KafkaConfig,
 val localProduceResults = appendToLocalLog(internalTopicsAllowed = 
internalTopicsAllowed,
   origin, verifiedEntries, requiredAcks, requestLocal, 
verificationGuards.toMap)
 debug("Produce to local log in %d ms".format(time.milliseconds - 
sTime))
-
-val unverifiedResults = unverifiedEntries.map { case (topicPartition, 
error) =>
-  val message = if (error == Errors.INVALID_TXN_STATE) "Partition was 
not added to the transaction" else error.message()
-  topicPartition -> LogAppendResult(
-LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-Some(error.exception(message))
-  )
-}
 
-val errorResults = errorsPerPartition.map { case (topicPartition, 
error) =>
-  topicPartition -> LogAppendResult(
-LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-Some(error.exception())
-  )
+def produceStatusResult(appendResult: Map[TopicPartition, 
LogAppendResult],
+useCustomMessage: Boolean): 
Map[TopicPartition, ProducePartitionStatus] = {
+  appendResult.map { case (topicPartition, result) =>
+topicPartition -> ProducePartitionStatus(
+  result.info.lastOffset + 1, // required offset
+  new PartitionResponse(
+result.error,
+result.info.firstOffset.map[Long](_.messageOffset).orElse(-1L),
+result.info.lastOffset,
+result.info.logAppendTime,
+result.info.logStartOffset,
+result.info.recordErrors,
+if (useCustomMessage) result.exception.get.getMessage else 
result.info.errorMessage

Review Comment:
   It is interesting to note that `result.info.errorMessage` was only set in a 
few cases, which means that the error field in the response was not always 
populated. With this change, it will always be populated, so I wonder if the 
previous handling was done on purpose, perhaps to avoid returning the default 
error message. Thoughts?
   
   Long term, we should remove `result.info.errorMessage` and rely on the 
message provided in the exception. If we want to, we could also compare it with 
the default message to avoid sending the default.



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -753,39 +753,63 @@ class ReplicaManager(val config: KafkaConfig,
 val localProduceResults = appendToLocalLog(internalTopicsAllowed = 
internalTopicsAllowed,
   origin, verifiedEntries, requiredAcks, requestLocal, 
verificationGuards.toMap)
 debug("Produce to local log in %d ms".format(time.milliseconds - 
sTime))
-
-val unverifiedResults = unverifiedEntries.map { case (topicPartition, 
error) =>
-  val message = if (error == Errors.INVALID_TXN_STATE) "Partition was 
not added to the transaction" else error.message()
-  topicPartition -> LogAppendResult(
-LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-Some(error.exception(message))
-  )
-}
 
-val errorResults = errorsPerPartition.map { case (topicPartition, 
error) =>
-  topicPartition -> LogAppendResult(
-LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-Some(error.exception())
-  )
+def produceStatusResult(appendResult: Map[TopicPartition, 
LogAppendResult],
+useCustomMessage: Boolean): 
Map[TopicPartition, ProducePartitionStatus] = {
+  appendResult.map { case (topicPartition, result) =>
+topicPartition -> ProducePartitionStatus(
+  result.info.lastOffset + 1, // required offset
+  new PartitionResponse(
+result.error,
+result.info.firstOffset.map[Long](_.messageOffset).orElse(-1L),
+result.info.lastOffset,
+result.info.logAppendTime,
+result.info.logStartOffset,
+result.info.recordErrors,
+if (useCustomMessage) result.exception.get.getMessage else 
result.info.errorMessage
+  )
+) // response status
+  }
 }
 
-val allResults = localProduceResults ++ unverifiedResults ++ 
errorResults
+val unverifiedResults = unverifiedEntries.map {
+  case (topicPartition, error) =>
+val finalError =
+  error match {
+case Errors.CONCURRENT_TRANSACTIONS |
+ Errors.COORDINATOR_LOAD_IN_PROGRESS |
+ Errors.COORDINATOR_NOT_AVAILABLE |
+ Errors.NOT_COORDINATOR => Errors.NOT_ENOUGH_REPLICAS
+case _ => error
+}
+