Re: [PR] KAFKA-15526: Simplify the LogAppendInfo class [kafka]
splett2 commented on PR #14470:
URL: https://github.com/apache/kafka/pull/14470#issuecomment-1746898555

Thanks for the feedback, @ijuma. I opened https://github.com/apache/kafka/pull/14485 to address the comments.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
Re: [PR] KAFKA-15526: Simplify the LogAppendInfo class [kafka]
ijuma commented on code in PR #14470:
URL: https://github.com/apache/kafka/pull/14470#discussion_r1345296313

## core/src/main/scala/kafka/raft/KafkaMetadataLog.scala: ##
@@ -96,10 +96,10 @@ final class KafkaMetadataLog private (
   }
   private def handleAndConvertLogAppendInfo(appendInfo: internals.log.LogAppendInfo): LogAppendInfo = {
-    if (appendInfo.firstOffset.isPresent())
-      new LogAppendInfo(appendInfo.firstOffset.get().messageOffset, appendInfo.lastOffset)
+    if (appendInfo.firstOffset != UnifiedLog.UnknownOffset)

Review Comment:
We are making things less type safe here by using sentinels vs types.
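The trade-off raised in this comment can be illustrated with a small standalone sketch. It is not code from the PR: `SentinelAppendInfo`, `TypedAppendInfo`, and the local `UnknownOffset` value are hypothetical stand-ins, and the sentinel is assumed to be `-1L` purely for illustration.

```scala
object SentinelVsOption {
  // Hypothetical sentinel, standing in for UnifiedLog.UnknownOffset (value assumed).
  val UnknownOffset: Long = -1L

  // Sentinel style: nothing forces the caller to check before using the value.
  final case class SentinelAppendInfo(firstOffset: Long, lastOffset: Long)

  // Typed style: absence is part of the type, so the caller has to handle it.
  final case class TypedAppendInfo(firstOffset: Option[Long], lastOffset: Long)

  // Compiles and runs even when firstOffset is the sentinel, yielding a bogus count.
  def batchSizeSentinel(info: SentinelAppendInfo): Long =
    info.lastOffset - info.firstOffset + 1

  // The count only exists when a first offset exists.
  def batchSizeTyped(info: TypedAppendInfo): Option[Long] =
    info.firstOffset.map(first => info.lastOffset - first + 1)
}
```

With the sentinel version, a forgotten check still produces a number; with the `Option` version, the missing offset propagates as `None` and cannot be mistaken for a real result.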
Re: [PR] KAFKA-15526: Simplify the LogAppendInfo class [kafka]
ijuma commented on code in PR #14470:
URL: https://github.com/apache/kafka/pull/14470#discussion_r1345292439

## core/src/main/scala/kafka/log/UnifiedLog.scala: ##
@@ -768,10 +768,10 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       // This will ensure that any log data can be recovered with the correct topic ID in the case of failure.
       maybeFlushMetadataFile()
-      val appendInfo = analyzeAndValidateRecords(records, origin, ignoreRecordSize, leaderEpoch)
+      val appendInfo = analyzeAndValidateRecords(records, origin, ignoreRecordSize, !validateAndAssignOffsets, leaderEpoch)
       // return if we have no valid messages or if this is a duplicate of the last appended entry
-      if (appendInfo.shallowCount == 0) appendInfo
+      if (appendInfo.validBytes <= 0) appendInfo

Review Comment:
I think we have to be careful about changes like this. The previous version had fewer assumptions than this one. I think it's fine not to include the shallowCount as a field, but we should have a method that represents the intent vs having the implicit assumption regarding how `validBytes` is populated (including the behavior when there are duplicates of the last appended entry).
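One way to read the suggestion above is to give the check a name, so that call sites state the intent and only one place knows how `validBytes` is populated. The sketch below is an illustration under that reading; `AppendInfo` and `hasNothingToAppend` are hypothetical names, not part of Kafka's `LogAppendInfo`.

```scala
object AppendIntent {
  // Hypothetical, trimmed-down stand-in for LogAppendInfo.
  final case class AppendInfo(validBytes: Int, lastOffset: Long) {
    // Names the intent once; callers no longer depend on how validBytes is populated.
    def hasNothingToAppend: Boolean = validBytes <= 0
  }

  // The call site reads as the comment in the diff does: return early if nothing is valid.
  def append(info: AppendInfo): String =
    if (info.hasNothingToAppend) "skipped"
    else s"appended up to ${info.lastOffset}"
}
```

If the way `validBytes` is computed ever changes (for example around duplicates of the last appended entry), only the body of the named method would need to be revisited.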
Re: [PR] KAFKA-15526: Simplify the LogAppendInfo class [kafka]
jolshan merged PR #14470:
URL: https://github.com/apache/kafka/pull/14470
Re: [PR] KAFKA-15526: Simplify the LogAppendInfo class [kafka]
splett2 commented on PR #14470:
URL: https://github.com/apache/kafka/pull/14470#issuecomment-1745872570

thanks for the prompt reviews!
Re: [PR] KAFKA-15526: Simplify the LogAppendInfo class [kafka]
jolshan commented on code in PR #14470:
URL: https://github.com/apache/kafka/pull/14470#discussion_r1344697594

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -74,11 +74,20 @@ import scala.jdk.CollectionConverters._
 /*
  * Result metadata of a log append operation on the log
  */
-case class LogAppendResult(info: LogAppendInfo, exception: Option[Throwable] = None) {
+case class LogAppendResult(info: LogAppendInfo,
+                           exception: Option[Throwable],
+                           hasCustomErrorMessage: Boolean) {

Review Comment:
That's fair.
Re: [PR] KAFKA-15526: Simplify the LogAppendInfo class [kafka]
jolshan commented on code in PR #14470:
URL: https://github.com/apache/kafka/pull/14470#discussion_r1344697202

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -763,20 +763,20 @@ class ReplicaManager(val config: KafkaConfig,
       val errorResults = (unverifiedEntries ++ errorsPerPartition).map {
         case (topicPartition, error) =>
           // translate transaction coordinator errors to known producer response errors
-          val finalException =
+          val customException =
             error match {
-              case Errors.INVALID_TXN_STATE => error.exception("Partition was not added to the transaction")
+              case Errors.INVALID_TXN_STATE => Some(error.exception("Partition was not added to the transaction"))
               case Errors.CONCURRENT_TRANSACTIONS |
                    Errors.COORDINATOR_LOAD_IN_PROGRESS |
                    Errors.COORDINATOR_NOT_AVAILABLE |
-                   Errors.NOT_COORDINATOR => new NotEnoughReplicasException(
-                s"Unable to verify the partition has been added to the transaction. Underlying error: ${error.toString}")
-              case _ => error.exception()
+                   Errors.NOT_COORDINATOR => Some(new NotEnoughReplicasException(
+                s"Unable to verify the partition has been added to the transaction. Underlying error: ${error.toString}"))
+              case _ => None
             }
           topicPartition -> LogAppendResult(
             LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-            Some(finalException),
-            hasCustomErrorMessage = true
+            Some(customException.getOrElse(error.exception)),

Review Comment:
This is pretty elegant -- thanks for fixing.
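The pattern praised here can be reproduced in isolation. The sketch below is a simplified illustration, not the PR's code: `Err` is a hypothetical stand-in for Kafka's `Errors` enum, the default messages are invented, and deriving the flag with `customException.isDefined` is an assumption, since the quoted hunk ends before the `hasCustomErrorMessage` argument.

```scala
object CustomErrorSketch {
  // Hypothetical stand-in for org.apache.kafka.common.protocol.Errors.
  sealed abstract class Err(val defaultMessage: String) {
    def exception(): RuntimeException = new RuntimeException(defaultMessage)
    def exception(message: String): RuntimeException = new RuntimeException(message)
  }
  case object InvalidTxnState extends Err("default: invalid txn state")
  case object NotCoordinator extends Err("default: not coordinator")
  case object UnknownServerError extends Err("default: unknown server error")

  final case class Result(exception: Option[Throwable], hasCustomErrorMessage: Boolean)

  def toResult(error: Err): Result = {
    // Only errors that are translated carry a custom exception; everything else is None.
    val customException: Option[Throwable] = error match {
      case InvalidTxnState =>
        Some(error.exception("Partition was not added to the transaction"))
      case NotCoordinator =>
        Some(new IllegalStateException(s"Unable to verify the partition. Underlying error: $error"))
      case _ => None
    }
    // Fall back to the default exception, and record whether a custom one was used.
    // Assumption: the flag is derived from the Option (not visible in the quoted hunk).
    Result(
      Some(customException.getOrElse(error.exception())),
      hasCustomErrorMessage = customException.isDefined
    )
  }
}
```

Keeping the custom exception as an `Option` means the flag and the exception cannot drift apart, which is what a hard-coded `hasCustomErrorMessage = true` allowed in the default case.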
Re: [PR] KAFKA-15526: Simplify the LogAppendInfo class [kafka]
splett2 commented on code in PR #14470:
URL: https://github.com/apache/kafka/pull/14470#discussion_r1344456909

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -74,11 +74,20 @@ import scala.jdk.CollectionConverters._
 /*
  * Result metadata of a log append operation on the log
  */
-case class LogAppendResult(info: LogAppendInfo, exception: Option[Throwable] = None) {
+case class LogAppendResult(info: LogAppendInfo,
+                           exception: Option[Throwable],
+                           hasCustomErrorMessage: Boolean) {

Review Comment:
honestly, I'm just not a fan of having default args. I think it makes things less explicit, hence the change.

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -74,11 +74,20 @@ import scala.jdk.CollectionConverters._
 /*
  * Result metadata of a log append operation on the log
  */
-case class LogAppendResult(info: LogAppendInfo, exception: Option[Throwable] = None) {
+case class LogAppendResult(info: LogAppendInfo,
+                           exception: Option[Throwable],
+                           hasCustomErrorMessage: Boolean) {

Review Comment:
honestly, I'm just not a fan of having default args. I think it makes things tricky to reason about, hence the change.
Re: [PR] KAFKA-15526: Simplify the LogAppendInfo class [kafka]
splett2 commented on code in PR #14470:
URL: https://github.com/apache/kafka/pull/14470#discussion_r131966

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -766,7 +775,8 @@ class ReplicaManager(val config: KafkaConfig,
             }
           topicPartition -> LogAppendResult(
             LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-            Some(finalException)
+            Some(finalException),
+            hasCustomErrorMessage = true

Review Comment:
hmm, I see what you are saying. I will try to fix it in this PR I guess.
Re: [PR] KAFKA-15526: Simplify the LogAppendInfo class [kafka]
jolshan commented on code in PR #14470:
URL: https://github.com/apache/kafka/pull/14470#discussion_r133473

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -766,7 +775,8 @@ class ReplicaManager(val config: KafkaConfig,
             }
           topicPartition -> LogAppendResult(
             LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-            Some(finalException)
+            Some(finalException),
+            hasCustomErrorMessage = true

Review Comment:
Ok -- no worries if it is too annoying to do here. This code is very convoluted.
Re: [PR] KAFKA-15526: Simplify the LogAppendInfo class [kafka]
splett2 commented on code in PR #14470:
URL: https://github.com/apache/kafka/pull/14470#discussion_r131966

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -766,7 +775,8 @@ class ReplicaManager(val config: KafkaConfig,
             }
           topicPartition -> LogAppendResult(
             LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-            Some(finalException)
+            Some(finalException),
+            hasCustomErrorMessage = true

Review Comment:
hmm, I see what you are saying. I will fix it.

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -766,7 +775,8 @@ class ReplicaManager(val config: KafkaConfig,
             }
           topicPartition -> LogAppendResult(
             LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-            Some(finalException)
+            Some(finalException),
+            hasCustomErrorMessage = true

Review Comment:
hmm, I see what you are saying. I will fix it in this PR I guess.
Re: [PR] KAFKA-15526: Simplify the LogAppendInfo class [kafka]
jolshan commented on code in PR #14470:
URL: https://github.com/apache/kafka/pull/14470#discussion_r1344423738

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -766,7 +775,8 @@ class ReplicaManager(val config: KafkaConfig,
             }
           topicPartition -> LogAppendResult(
             LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-            Some(finalException)
+            Some(finalException),
+            hasCustomErrorMessage = true

Review Comment:
I sort of want to refactor this code so maybe we can do that in a followup.
Re: [PR] KAFKA-15526: Simplify the LogAppendInfo class [kafka]
jolshan commented on code in PR #14470:
URL: https://github.com/apache/kafka/pull/14470#discussion_r1344419654

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -766,7 +775,8 @@ class ReplicaManager(val config: KafkaConfig,
             }
           topicPartition -> LogAppendResult(
             LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-            Some(finalException)
+            Some(finalException),
+            hasCustomErrorMessage = true

Review Comment:
Maybe this is ok -- but I think we are still returning custom error messages for non-custom messages in the final case of the match here. Error.exception I think uses the default message.
Re: [PR] KAFKA-15526: Simplify the LogAppendInfo class [kafka]
jolshan commented on code in PR #14470:
URL: https://github.com/apache/kafka/pull/14470#discussion_r1344414660

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -74,11 +74,20 @@ import scala.jdk.CollectionConverters._
 /*
  * Result metadata of a log append operation on the log
  */
-case class LogAppendResult(info: LogAppendInfo, exception: Option[Throwable] = None) {
+case class LogAppendResult(info: LogAppendInfo,
+                           exception: Option[Throwable],
+                           hasCustomErrorMessage: Boolean) {

Review Comment:
We could make the hasCustomErrorMessage param false by default, but there are pros and cons to it. I'm ok with keeping as is.
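The pros and cons mentioned here can be made concrete with a short sketch. `WithDefault` and `Explicit` are hypothetical case classes modelled loosely on `LogAppendResult`; neither is the class from the PR.

```scala
object DefaultArgsSketch {
  // Variant with a default: shorter call sites, but the flag can be forgotten silently.
  final case class WithDefault(exception: Option[Throwable],
                               hasCustomErrorMessage: Boolean = false)

  // Variant without a default: every call site has to state the flag.
  final case class Explicit(exception: Option[Throwable],
                            hasCustomErrorMessage: Boolean)

  val failure = new RuntimeException("boom")

  // Compiles; the flag is false whether or not the author thought about it.
  val defaulted = WithDefault(Some(failure))

  // The equivalent call on Explicit, `Explicit(Some(failure))`, would not compile,
  // so the decision is visible at the call site.
  val stated = Explicit(Some(failure), hasCustomErrorMessage = false)
}
```

The default keeps unrelated call sites short; the explicit form trades that brevity for a compile error whenever a new call site forgets to make the choice.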
Re: [PR] KAFKA-15526: Simplify the LogAppendInfo class [kafka]
jolshan commented on code in PR #14470:
URL: https://github.com/apache/kafka/pull/14470#discussion_r1344411599

## core/src/main/scala/kafka/log/UnifiedLog.scala: ##
@@ -892,16 +889,11 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           maybeDuplicate match {
             case Some(duplicate) =>
-              appendInfo.setFirstOffset(Optional.of(new LogOffsetMetadata(duplicate.firstOffset)))
+              appendInfo.setFirstOffset(duplicate.firstOffset)
               appendInfo.setLastOffset(duplicate.lastOffset)
               appendInfo.setLogAppendTime(duplicate.timestamp)
               appendInfo.setLogStartOffset(logStartOffset)
             case None =>
-              // Before appending update the first offset metadata to include segment information
-              appendInfo.setFirstOffset(appendInfo.firstOffset.map { offsetMetadata =>

Review Comment:
Right -- was just clarifying we use the default.
Re: [PR] KAFKA-15526: Simplify the LogAppendInfo class [kafka]
splett2 commented on code in PR #14470:
URL: https://github.com/apache/kafka/pull/14470#discussion_r1343331209

## core/src/main/scala/kafka/log/UnifiedLog.scala: ##
@@ -892,16 +889,11 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           maybeDuplicate match {
             case Some(duplicate) =>
-              appendInfo.setFirstOffset(Optional.of(new LogOffsetMetadata(duplicate.firstOffset)))
+              appendInfo.setFirstOffset(duplicate.firstOffset)
               appendInfo.setLastOffset(duplicate.lastOffset)
               appendInfo.setLogAppendTime(duplicate.timestamp)
               appendInfo.setLogStartOffset(logStartOffset)
             case None =>
-              // Before appending update the first offset metadata to include segment information
-              appendInfo.setFirstOffset(appendInfo.firstOffset.map { offsetMetadata =>

Review Comment:
We removed this because none of the readers use anything besides the `messageOffset` (which is the `appendInfo.firstOffset` now).
Re: [PR] KAFKA-15526: Simplify the LogAppendInfo class [kafka]
splett2 commented on code in PR #14470:
URL: https://github.com/apache/kafka/pull/14470#discussion_r1343327345

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -787,19 +770,22 @@ class ReplicaManager(val config: KafkaConfig,
           )
       }
-      val errorResults = errorsPerPartition.map {
-        case (topicPartition, error) =>
-          topicPartition -> LogAppendResult(
-            LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-            Some(error.exception())
+      val allResults = localProduceResults ++ errorResults
+      val produceStatus = allResults.map { case (topicPartition, result) =>
+        topicPartition -> ProducePartitionStatus(
+          result.info.lastOffset + 1, // required offset
+          new PartitionResponse(
+            result.error,
+            result.info.firstOffset,
+            result.info.lastOffset,
+            result.info.logAppendTime,
+            result.info.logStartOffset,
+            result.info.recordErrors,
+            result.exception.map(_.getMessage).orNull

Review Comment:
Hmm, good catch. somehow I didn't think about that. I refactored some of the LogAppendResult stuff to try to make it more explicit when we use a custom error message.
Re: [PR] KAFKA-15526: Simplify the LogAppendInfo class [kafka]
jolshan commented on code in PR #14470:
URL: https://github.com/apache/kafka/pull/14470#discussion_r1342935092

## core/src/main/scala/kafka/log/UnifiedLog.scala: ##
@@ -1141,13 +1134,13 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       // Also indicate whether we have the accurate first offset or not
       if (!readFirstMessage) {
         if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
-          firstOffset = Optional.of(new LogOffsetMetadata(batch.baseOffset))
+          firstOffset = batch.baseOffset
         lastOffsetOfFirstBatch = batch.lastOffset
         readFirstMessage = true
       }
       // check that offsets are monotonically increasing
-      if (lastOffset >= batch.lastOffset)
+      if (requireOffsetsMonotonic && lastOffset >= batch.lastOffset)

Review Comment:
We technically don't need this since the check also has `if (requireOffsetsMonotonic && !monotonic)`

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -787,19 +770,22 @@ class ReplicaManager(val config: KafkaConfig,
           )
       }
-      val errorResults = errorsPerPartition.map {
-        case (topicPartition, error) =>
-          topicPartition -> LogAppendResult(
-            LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-            Some(error.exception())
+      val allResults = localProduceResults ++ errorResults
+      val produceStatus = allResults.map { case (topicPartition, result) =>
+        topicPartition -> ProducePartitionStatus(
+          result.info.lastOffset + 1, // required offset
+          new PartitionResponse(
+            result.error,
+            result.info.firstOffset,
+            result.info.lastOffset,
+            result.info.logAppendTime,
+            result.info.logStartOffset,
+            result.info.recordErrors,
+            result.exception.map(_.getMessage).orNull

Review Comment:
Are we going to start returning the error message for other exceptions now too? I think we will add the default exception message when previously we were not.
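The concern about `result.exception.map(_.getMessage).orNull` can be shown with a minimal sketch. `Result`, `messageAlways`, and `messageIfCustom` are hypothetical names; the gated variant is only one possible way to keep default messages out of the response and is not claimed to be what the PR ended up doing.

```scala
object ErrorMessageSketch {
  // Hypothetical, trimmed-down stand-in for LogAppendResult.
  final case class Result(exception: Option[Throwable], hasCustomErrorMessage: Boolean)

  // Behaviour questioned in the review: every exception's message is forwarded,
  // including default messages that were previously not sent.
  def messageAlways(r: Result): String =
    r.exception.map(_.getMessage).orNull

  // Gated variant: forward the message only when it was explicitly customised.
  def messageIfCustom(r: Result): String =
    r.exception.filter(_ => r.hasCustomErrorMessage).map(_.getMessage).orNull
}
```

For a result holding a default exception, `messageAlways` returns its text while `messageIfCustom` returns `null`, matching the earlier behaviour the reviewer describes.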
Re: [PR] KAFKA-15526: Simplify the LogAppendInfo class [kafka]
jolshan commented on code in PR #14470:
URL: https://github.com/apache/kafka/pull/14470#discussion_r1342933163

## core/src/main/scala/kafka/log/UnifiedLog.scala: ##
@@ -892,16 +889,11 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           maybeDuplicate match {
             case Some(duplicate) =>
-              appendInfo.setFirstOffset(Optional.of(new LogOffsetMetadata(duplicate.firstOffset)))
+              appendInfo.setFirstOffset(duplicate.firstOffset)
               appendInfo.setLastOffset(duplicate.lastOffset)
               appendInfo.setLogAppendTime(duplicate.timestamp)
               appendInfo.setLogStartOffset(logStartOffset)
             case None =>
-              // Before appending update the first offset metadata to include segment information
-              appendInfo.setFirstOffset(appendInfo.firstOffset.map { offsetMetadata =>

Review Comment:
We removed this since the default is zero, and in that case we take the segment base offset anyway?
Re: [PR] KAFKA-15526: Simplify the LogAppendInfo class [kafka]
splett2 commented on code in PR #14470:
URL: https://github.com/apache/kafka/pull/14470#discussion_r1342909611

## core/src/main/scala/kafka/raft/KafkaMetadataLog.scala: ##
@@ -96,10 +96,10 @@ final class KafkaMetadataLog private (
   }
   private def handleAndConvertLogAppendInfo(appendInfo: internals.log.LogAppendInfo): LogAppendInfo = {
-    if (appendInfo.firstOffset.isPresent())
-      new LogAppendInfo(appendInfo.firstOffset.get().messageOffset, appendInfo.lastOffset)
+    if (appendInfo.firstOffset != UnifiedLog.UnknownOffset)
+      new LogAppendInfo(appendInfo.firstOffset, appendInfo.lastOffset)
     else
-      throw new KafkaException(s"Append failed unexpectedly: ${appendInfo.errorMessage}")

Review Comment:
`errorMessage` would have never been populated in the calling context of `handleAndConvertLogAppendInfo`, so it is just dropped from the log in the PR.