chia7712 commented on a change in pull request #9758:
URL: https://github.com/apache/kafka/pull/9758#discussion_r584224855
##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -761,76 +754,80 @@ class KafkaApis(val requestChannel: RequestChannel,
             // For fetch requests from clients, check if down-conversion is disabled for the particular partition
             if (!fetchRequest.isFromFollower && !logConfig.forall(_.messageDownConversionEnable)) {
               trace(s"Conversion to message format ${downConvertMagic.get} is disabled for partition $tp. Sending unsupported version response to $clientId.")
-              errorResponse(Errors.UNSUPPORTED_VERSION)
+              FetchResponse.partitionResponse(tp.partition, Errors.UNSUPPORTED_VERSION)
             } else {
               try {
                 trace(s"Down converting records from partition $tp to message format version $magic for fetch request from $clientId")
                 // Because down-conversion is extremely memory intensive, we want to try and delay the down-conversion as much
                 // as possible. With KIP-283, we have the ability to lazily down-convert in a chunked manner. The lazy, chunked
                 // down-conversion always guarantees that at least one batch of messages is down-converted and sent out to the
                 // client.
-                val error = maybeDownConvertStorageError(partitionData.error)
-                new FetchResponse.PartitionData[BaseRecords](error, partitionData.highWatermark,
-                  partitionData.lastStableOffset, partitionData.logStartOffset,
-                  partitionData.preferredReadReplica, partitionData.abortedTransactions,
-                  new LazyDownConversionRecords(tp, unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time))
+                new FetchResponseData.PartitionData()
+                  .setPartitionIndex(tp.partition)
+                  .setErrorCode(maybeDownConvertStorageError(Errors.forCode(partitionData.errorCode)).code)
+                  .setHighWatermark(partitionData.highWatermark)
+                  .setLastStableOffset(partitionData.lastStableOffset)
+                  .setLogStartOffset(partitionData.logStartOffset)
+                  .setAbortedTransactions(partitionData.abortedTransactions)
+                  .setRecords(new LazyDownConversionRecords(tp, unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time))
+                  .setPreferredReadReplica(partitionData.preferredReadReplica())
               } catch {
                 case e: UnsupportedCompressionTypeException =>
                   trace("Received unsupported compression type error during down-conversion", e)
-                  errorResponse(Errors.UNSUPPORTED_COMPRESSION_TYPE)
+                  FetchResponse.partitionResponse(tp.partition, Errors.UNSUPPORTED_COMPRESSION_TYPE)
               }
             }
           case None =>
-            val error = maybeDownConvertStorageError(partitionData.error)
-            new FetchResponse.PartitionData[BaseRecords](error,
-              partitionData.highWatermark,
-              partitionData.lastStableOffset,
-              partitionData.logStartOffset,
-              partitionData.preferredReadReplica,
-              partitionData.abortedTransactions,
-              partitionData.divergingEpoch,
-              unconvertedRecords)
+            new FetchResponseData.PartitionData()
+              .setPartitionIndex(tp.partition)
+              .setErrorCode(maybeDownConvertStorageError(Errors.forCode(partitionData.errorCode)).code)
+              .setHighWatermark(partitionData.highWatermark)
+              .setLastStableOffset(partitionData.lastStableOffset)
+              .setLogStartOffset(partitionData.logStartOffset)
+              .setAbortedTransactions(partitionData.abortedTransactions)
+              .setRecords(unconvertedRecords)
+              .setPreferredReadReplica(partitionData.preferredReadReplica)
+              .setDivergingEpoch(partitionData.divergingEpoch)
         }
       }
     }

     // the callback for process a fetch response, invoked before throttling
     def processResponseCallback(responsePartitionData: Seq[(TopicPartition, FetchPartitionData)]): Unit = {
-      val partitions = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
+      val partitions = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData]
       val reassigningPartitions = mutable.Set[TopicPartition]()
       responsePartitionData.foreach { case (tp, data) =>
         val abortedTransactions = data.abortedTransactions.map(_.asJava).orNull
         val lastStableOffset = data.lastStableOffset.getOrElse(FetchResponse.INVALID_LAST_STABLE_OFFSET)
-        if (data.isReassignmentFetch)
-          reassigningPartitions.add(tp)
-        val error = maybeDownConvertStorageError(data.error)
-        partitions.put(tp, new FetchResponse.PartitionData(
-          error,
-          data.highWatermark,
-          lastStableOffset,
-          data.logStartOffset,
-          data.preferredReadReplica.map(int2Integer).asJava,
-          abortedTransactions,
-          data.divergingEpoch.asJava,
-          data.records))
+        if (data.isReassignmentFetch) reassigningPartitions.add(tp)
+        partitions.put(tp, new FetchResponseData.PartitionData()
+          .setPartitionIndex(tp.partition)
+          .setErrorCode(maybeDownConvertStorageError(data.error).code)
+          .setHighWatermark(data.highWatermark)
+          .setLastStableOffset(lastStableOffset)
+          .setLogStartOffset(data.logStartOffset)
+          .setAbortedTransactions(abortedTransactions)
+          .setRecords(data.records)
+          .setPreferredReadReplica(data.preferredReadReplica.getOrElse(FetchResponse.INVALID_PREFERRED_REPLICA_ID))
+          .setDivergingEpoch(data.divergingEpoch.getOrElse(new FetchResponseData.EpochEndOffset)))
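
For context on the KIP-283 comment in the hunk above, here is a minimal, self-contained sketch of the lazy down-conversion wrapping: the records are wrapped in LazyDownConversionRecords so conversion to an older magic value happens chunk by chunk while the response is written, instead of all at once in memory. The constructor arguments and the PartitionData setters match the diff; the helper name lazilyConvertedPartition and the hard-coded Errors.NONE / MAGIC_VALUE_V1 choices are illustrative assumptions only (the real code derives the target magic from the fetch request version and the topic's message format).

    // Illustrative sketch, not part of this PR.
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.message.FetchResponseData
    import org.apache.kafka.common.protocol.Errors
    import org.apache.kafka.common.record.{LazyDownConversionRecords, MemoryRecords, RecordBatch}
    import org.apache.kafka.common.utils.Time

    // Hypothetical helper; parameters mirror the arguments used in the diff.
    def lazilyConvertedPartition(tp: TopicPartition,
                                 records: MemoryRecords,
                                 fetchOffset: Long): FetchResponseData.PartitionData = {
      // Target magic v1 purely for illustration.
      val magic = RecordBatch.MAGIC_VALUE_V1
      new FetchResponseData.PartitionData()
        .setPartitionIndex(tp.partition)
        .setErrorCode(Errors.NONE.code)
        // Conversion is deferred: batches are converted one chunk at a time as
        // the response is sent, bounding the broker's memory use (KIP-283).
        .setRecords(new LazyDownConversionRecords(tp, records, magic, fetchOffset, Time.SYSTEM))
    }

Note that the UNSUPPORTED_VERSION error path in the diff skips this wrapping entirely when down-conversion is disabled for the topic.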
Review comment:
   Same as #9758 (comment)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]