[GitHub] [kafka] chia7712 commented on a change in pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

2021-03-08 Thread GitBox


chia7712 commented on a change in pull request #10269:
URL: https://github.com/apache/kafka/pull/10269#discussion_r589633902



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -3368,8 +3396,16 @@ object KafkaApis {
   private[server] def sizeOfThrottledPartitions(versionId: Short,
                                                 unconvertedResponse: FetchResponse,
                                                 quota: ReplicationQuotaManager): Int = {
-    FetchResponse.sizeOf(versionId, unconvertedResponse.responseData.entrySet
-      .iterator.asScala.filter(element => quota.isThrottled(element.getKey)).asJava)
+    val topicResponses = new util.ArrayList[FetchResponseData.FetchableTopicResponse]()

Review comment:
   > The previous version did not.
   
   It created a copy before as well, since we have to add a collection to the 
auto-generated data in order to calculate the size. See 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java#L133
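
As a rough illustration of why a copy is involved either way: the throttled partitions have to be collected into a concrete collection before a size can be computed over them. The sketch below uses hypothetical stand-in types (`TopicResponse`, `PartitionData`, `sizeInBytes`) rather than the generated `FetchResponseData` classes, and it only sums payload bytes; it is not the actual `FetchResponse.sizeOf` logic.

```scala
object ThrottledSizeSketch {
  // Hypothetical stand-ins for the generated FetchResponseData classes.
  final case class PartitionData(partitionIndex: Int, sizeInBytes: Int)
  final case class TopicResponse(topic: String, partitions: List[PartitionData])

  // Copy only the throttled partitions into a new grouped list.
  // Topics left without any partition are dropped from the copy.
  def throttledOnly(responses: List[TopicResponse],
                    isThrottled: (String, Int) => Boolean): List[TopicResponse] =
    responses
      .map(t => t.copy(partitions = t.partitions.filter(p => isThrottled(t.topic, p.partitionIndex))))
      .filter(_.partitions.nonEmpty)

  // Size of the filtered copy (payload bytes only, for illustration).
  def sizeOf(responses: List[TopicResponse]): Int =
    responses.iterator.flatMap(_.partitions).map(_.sizeInBytes).sum
}
```

The filtered list is a new collection regardless of whether the input was a flat map or already grouped by topic, which is the point the comment makes.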





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

2021-03-05 Thread GitBox


chia7712 commented on a change in pull request #10269:
URL: https://github.com/apache/kafka/pull/10269#discussion_r588138822



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -811,31 +821,40 @@ class KafkaApis(val requestChannel: RequestChannel,
           .setRecords(data.records)
           .setPreferredReadReplica(data.preferredReadReplica.getOrElse(FetchResponse.INVALID_PREFERRED_REPLICA_ID))
         data.divergingEpoch.foreach(partitionData.setDivergingEpoch)
-        partitions.put(tp, partitionData)
+        addPartition(tp.topic, partitionData)
       }
-      erroneous.foreach { case (tp, data) => partitions.put(tp, data) }
-
-      var unconvertedFetchResponse: FetchResponse = null
+      erroneous.foreach { case (tp, data) => addPartition(tp.topic, data) }
 
-      def createResponse(throttleTimeMs: Int): FetchResponse = {
+      def createResponse(unconvertedFetchResponse: FetchResponse, throttleTimeMs: Int): FetchResponse = {
         // Down-convert messages for each partition if required
-        val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData]
-        unconvertedFetchResponse.responseData.forEach { (tp, unconvertedPartitionData) =>
-          val error = Errors.forCode(unconvertedPartitionData.errorCode)
-          if (error != Errors.NONE)
-            debug(s"Fetch request with correlation id ${request.header.correlationId} from client $clientId " +
-              s"on partition $tp failed due to ${error.exceptionName}")
-          convertedData.put(tp, maybeConvertFetchedData(tp, unconvertedPartitionData))
+        val convertedResponse = new FetchResponseData()

Review comment:
   Done.









[GitHub] [kafka] chia7712 commented on a change in pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

2021-03-04 Thread GitBox


chia7712 commented on a change in pull request #10269:
URL: https://github.com/apache/kafka/pull/10269#discussion_r588095220



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -811,31 +821,40 @@ class KafkaApis(val requestChannel: RequestChannel,
           .setRecords(data.records)
           .setPreferredReadReplica(data.preferredReadReplica.getOrElse(FetchResponse.INVALID_PREFERRED_REPLICA_ID))
         data.divergingEpoch.foreach(partitionData.setDivergingEpoch)
-        partitions.put(tp, partitionData)
+        addPartition(tp.topic, partitionData)
       }
-      erroneous.foreach { case (tp, data) => partitions.put(tp, data) }
-
-      var unconvertedFetchResponse: FetchResponse = null
+      erroneous.foreach { case (tp, data) => addPartition(tp.topic, data) }
 
-      def createResponse(throttleTimeMs: Int): FetchResponse = {
+      def createResponse(unconvertedFetchResponse: FetchResponse, throttleTimeMs: Int): FetchResponse = {
         // Down-convert messages for each partition if required
-        val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData]
-        unconvertedFetchResponse.responseData.forEach { (tp, unconvertedPartitionData) =>
-          val error = Errors.forCode(unconvertedPartitionData.errorCode)
-          if (error != Errors.NONE)
-            debug(s"Fetch request with correlation id ${request.header.correlationId} from client $clientId " +
-              s"on partition $tp failed due to ${error.exceptionName}")
-          convertedData.put(tp, maybeConvertFetchedData(tp, unconvertedPartitionData))
+        val convertedResponse = new FetchResponseData()
+          .setErrorCode(unconvertedFetchResponse.error.code)
+          .setThrottleTimeMs(throttleTimeMs)
+          .setSessionId(unconvertedFetchResponse.sessionId)
+          .setResponses(new util.ArrayList[FetchResponseData.FetchableTopicResponse](unconvertedFetchResponse.data.responses.size))
+        unconvertedFetchResponse.data.responses.forEach { unconvertedTopicData =>
+          val convertedTopicResponse = new FetchResponseData.FetchableTopicResponse()
+            .setTopic(unconvertedTopicData.topic)
+            .setPartitions(new util.ArrayList[FetchResponseData.PartitionData](unconvertedTopicData.partitions.size))
+          convertedResponse.responses.add(convertedTopicResponse)
+          unconvertedTopicData.partitions.forEach { unconvertedPartitionData =>
+            val tp = new TopicPartition(unconvertedTopicData.topic, unconvertedPartitionData.partitionIndex)
+            val error = Errors.forCode(unconvertedPartitionData.errorCode)
+            if (error != Errors.NONE)

Review comment:
   Makes sense. Will copy that.









[GitHub] [kafka] chia7712 commented on a change in pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

2021-03-04 Thread GitBox


chia7712 commented on a change in pull request #10269:
URL: https://github.com/apache/kafka/pull/10269#discussion_r588094923



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -811,31 +821,40 @@ class KafkaApis(val requestChannel: RequestChannel,
           .setRecords(data.records)
           .setPreferredReadReplica(data.preferredReadReplica.getOrElse(FetchResponse.INVALID_PREFERRED_REPLICA_ID))
         data.divergingEpoch.foreach(partitionData.setDivergingEpoch)
-        partitions.put(tp, partitionData)
+        addPartition(tp.topic, partitionData)
       }
-      erroneous.foreach { case (tp, data) => partitions.put(tp, data) }
-
-      var unconvertedFetchResponse: FetchResponse = null
+      erroneous.foreach { case (tp, data) => addPartition(tp.topic, data) }
 
-      def createResponse(throttleTimeMs: Int): FetchResponse = {
+      def createResponse(unconvertedFetchResponse: FetchResponse, throttleTimeMs: Int): FetchResponse = {
         // Down-convert messages for each partition if required
-        val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData]
-        unconvertedFetchResponse.responseData.forEach { (tp, unconvertedPartitionData) =>
-          val error = Errors.forCode(unconvertedPartitionData.errorCode)
-          if (error != Errors.NONE)
-            debug(s"Fetch request with correlation id ${request.header.correlationId} from client $clientId " +
-              s"on partition $tp failed due to ${error.exceptionName}")
-          convertedData.put(tp, maybeConvertFetchedData(tp, unconvertedPartitionData))
+        val convertedResponse = new FetchResponseData()
+          .setErrorCode(unconvertedFetchResponse.error.code)
+          .setThrottleTimeMs(throttleTimeMs)
+          .setSessionId(unconvertedFetchResponse.sessionId)
+          .setResponses(new util.ArrayList[FetchResponseData.FetchableTopicResponse](unconvertedFetchResponse.data.responses.size))
+        unconvertedFetchResponse.data.responses.forEach { unconvertedTopicData =>
+          val convertedTopicResponse = new FetchResponseData.FetchableTopicResponse()
+            .setTopic(unconvertedTopicData.topic)
+            .setPartitions(new util.ArrayList[FetchResponseData.PartitionData](unconvertedTopicData.partitions.size))
+          convertedResponse.responses.add(convertedTopicResponse)
+          unconvertedTopicData.partitions.forEach { unconvertedPartitionData =>
+            val tp = new TopicPartition(unconvertedTopicData.topic, unconvertedPartitionData.partitionIndex)

Review comment:
   Yep, we make many copies when processing a fetch request:
   
   1. merge responsePartitionData with errors
   2. down-convert fetch data
   3. regroup data
   
   This patch aims to fix item 3; the others are tracked by https://issues.apache.org/jira/browse/KAFKA-12387
   
   At any rate, I will take a look at the other cases if that does not make the 
patch too big.
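
For readers unfamiliar with the "regroup data" step, the following is a minimal sketch of what such a pass might look like: a flat, ordered sequence keyed by topic-partition is walked once and copied into per-topic groups. The `TopicPartition` case class and the `regroup` function are hypothetical stand-ins written for this illustration, and the sketch assumes that entries of the same topic are adjacent in the input.

```scala
object RegroupSketch {
  // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
  final case class TopicPartition(topic: String, partition: Int)

  // Walk the flat sequence once and group adjacent entries that share a topic,
  // preserving the original order. Every element is copied into a new structure.
  def regroup[A](flat: Seq[(TopicPartition, A)]): Vector[(String, Vector[A])] =
    flat.foldLeft(Vector.empty[(String, Vector[A])]) { case (acc, (tp, data)) =>
      acc.lastOption match {
        case Some((topic, ps)) if topic == tp.topic =>
          acc.init :+ (topic -> (ps :+ data))
        case _ =>
          acc :+ (tp.topic -> Vector(data))
      }
    }
}
```

Keeping the data grouped from the start makes this extra pass, and the copy it implies, unnecessary.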









[GitHub] [kafka] chia7712 commented on a change in pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

2021-03-04 Thread GitBox


chia7712 commented on a change in pull request #10269:
URL: https://github.com/apache/kafka/pull/10269#discussion_r588091547



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -811,31 +821,40 @@ class KafkaApis(val requestChannel: RequestChannel,
           .setRecords(data.records)
           .setPreferredReadReplica(data.preferredReadReplica.getOrElse(FetchResponse.INVALID_PREFERRED_REPLICA_ID))
         data.divergingEpoch.foreach(partitionData.setDivergingEpoch)
-        partitions.put(tp, partitionData)
+        addPartition(tp.topic, partitionData)
       }
-      erroneous.foreach { case (tp, data) => partitions.put(tp, data) }
-
-      var unconvertedFetchResponse: FetchResponse = null
+      erroneous.foreach { case (tp, data) => addPartition(tp.topic, data) }

Review comment:
   `erroneous` is not a map type, so we can't use `forKeyValue`.
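
To make the distinction concrete, here is a small sketch under the assumption that `forKeyValue` is an extension method defined for maps only. The `MapOps` class below is a hypothetical re-creation written for this example, not the helper used in the Kafka code base; a sequence of pairs has no such method, so it falls back to `foreach` with a pattern match.

```scala
object ForKeyValueSketch {
  // Hypothetical Map-only extension, loosely modelled on the helper named above.
  implicit class MapOps[K, V](self: scala.collection.Map[K, V]) {
    def forKeyValue[U](f: (K, V) => U): Unit =
      self.foreach { case (k, v) => f(k, v) }
  }

  // A Seq of pairs is not a Map, so the extension does not apply to it;
  // destructuring each tuple inside foreach is the usual alternative.
  def describe(erroneous: Seq[(String, Int)]): List[String] = {
    val out = List.newBuilder[String]
    erroneous.foreach { case (topic, code) => out += s"$topic:$code" }
    out.result()
  }
}
```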









[GitHub] [kafka] chia7712 commented on a change in pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

2021-03-04 Thread GitBox


chia7712 commented on a change in pull request #10269:
URL: https://github.com/apache/kafka/pull/10269#discussion_r588090869



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -811,31 +821,40 @@ class KafkaApis(val requestChannel: RequestChannel,
           .setRecords(data.records)
           .setPreferredReadReplica(data.preferredReadReplica.getOrElse(FetchResponse.INVALID_PREFERRED_REPLICA_ID))
         data.divergingEpoch.foreach(partitionData.setDivergingEpoch)
-        partitions.put(tp, partitionData)
+        addPartition(tp.topic, partitionData)
       }
-      erroneous.foreach { case (tp, data) => partitions.put(tp, data) }
-
-      var unconvertedFetchResponse: FetchResponse = null
+      erroneous.foreach { case (tp, data) => addPartition(tp.topic, data) }
 
-      def createResponse(throttleTimeMs: Int): FetchResponse = {
+      def createResponse(unconvertedFetchResponse: FetchResponse, throttleTimeMs: Int): FetchResponse = {
         // Down-convert messages for each partition if required
-        val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData]
-        unconvertedFetchResponse.responseData.forEach { (tp, unconvertedPartitionData) =>
-          val error = Errors.forCode(unconvertedPartitionData.errorCode)
-          if (error != Errors.NONE)
-            debug(s"Fetch request with correlation id ${request.header.correlationId} from client $clientId " +
-              s"on partition $tp failed due to ${error.exceptionName}")
-          convertedData.put(tp, maybeConvertFetchedData(tp, unconvertedPartitionData))
+        val convertedResponse = new FetchResponseData()
+          .setErrorCode(unconvertedFetchResponse.error.code)
+          .setThrottleTimeMs(throttleTimeMs)
+          .setSessionId(unconvertedFetchResponse.sessionId)
+          .setResponses(new util.ArrayList[FetchResponseData.FetchableTopicResponse](unconvertedFetchResponse.data.responses.size))
+        unconvertedFetchResponse.data.responses.forEach { unconvertedTopicData =>
+          val convertedTopicResponse = new FetchResponseData.FetchableTopicResponse()
+            .setTopic(unconvertedTopicData.topic)
+            .setPartitions(new util.ArrayList[FetchResponseData.PartitionData](unconvertedTopicData.partitions.size))
+          convertedResponse.responses.add(convertedTopicResponse)
+          unconvertedTopicData.partitions.forEach { unconvertedPartitionData =>
+            val tp = new TopicPartition(unconvertedTopicData.topic, unconvertedPartitionData.partitionIndex)
+            val error = Errors.forCode(unconvertedPartitionData.errorCode)
+            if (error != Errors.NONE)

Review comment:
   The `error` is also used for logging.









[GitHub] [kafka] chia7712 commented on a change in pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

2021-03-04 Thread GitBox


chia7712 commented on a change in pull request #10269:
URL: https://github.com/apache/kafka/pull/10269#discussion_r588050163



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -795,7 +795,17 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     // the callback for process a fetch response, invoked before throttling
     def processResponseCallback(responsePartitionData: Seq[(TopicPartition, FetchPartitionData)]): Unit = {
-      val partitions = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData]
+      val topicResponses = new util.ArrayList[FetchResponseData.FetchableTopicResponse]()

Review comment:
   This is the main purpose of this PR: `KafkaApis` now keeps the fetch data grouped by topic.
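
One way to picture "keeping grouped data" is a helper that appends each partition to the most recent topic entry instead of inserting it into a flat map. The helper name `addPartition` appears in the diff, but its body is not shown in this thread, so the version below is a guess built on hypothetical stand-in classes (`TopicResponse`, `Builder`), and it assumes partitions of the same topic arrive consecutively.

```scala
import java.util

object GroupedFetchSketch {
  // Hypothetical stand-in for FetchResponseData.FetchableTopicResponse;
  // it stores partition indexes only to keep the example small.
  final class TopicResponse(val topic: String) {
    val partitions = new util.ArrayList[Int]()
  }

  final class Builder {
    val topicResponses = new util.ArrayList[TopicResponse]()

    // Append to the last topic entry when the topic repeats,
    // otherwise open a new entry for the topic.
    def addPartition(topic: String, partitionIndex: Int): Unit = {
      val n = topicResponses.size
      if (n > 0 && topicResponses.get(n - 1).topic == topic) {
        topicResponses.get(n - 1).partitions.add(partitionIndex)
      } else {
        val fresh = new TopicResponse(topic)
        fresh.partitions.add(partitionIndex)
        topicResponses.add(fresh)
      }
    }
  }
}
```

With this shape the response can be handed on in its grouped form, so no separate regrouping pass is needed later.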









[GitHub] [kafka] chia7712 commented on a change in pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

2021-03-04 Thread GitBox


chia7712 commented on a change in pull request #10269:
URL: https://github.com/apache/kafka/pull/10269#discussion_r588049874



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -294,14 +294,24 @@ trait FetchContext extends Logging {
    */
   def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse
 
-  def partitionsToLogString(partitions: util.Collection[TopicPartition]): String =
-    FetchSession.partitionsToLogString(partitions, isTraceEnabled)
+  def partitionsToLogString(topics: FetchSession.RESP_MAP): String = {

Review comment:
   This method is used for logging (at DEBUG level), so it should be fine to 
iterate through the whole collection.









[GitHub] [kafka] chia7712 commented on a change in pull request #10269: KAFKA-12410 KafkaAPis ought to group fetch data before generating fet…

2021-03-04 Thread GitBox


chia7712 commented on a change in pull request #10269:
URL: https://github.com/apache/kafka/pull/10269#discussion_r588049196



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -411,54 +436,40 @@ class IncrementalFetchContext(private val time: Time,
     }
   }
 
-  // Iterator that goes over the given partition map and selects partitions that need to be included in the response.
-  // If updateFetchContextAndRemoveUnselected is set to true, the fetch context will be updated for the selected
-  // partitions and also remove unselected ones as they are encountered.
-  private class PartitionIterator(val iter: FetchSession.RESP_MAP_ITER,

Review comment:
   The `iterator` is unnecessary, since we have to generate a `list` 
collection anyway in order to calculate the message size.
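
One plausible reading of this point, sketched with plain Scala collections rather than the Kafka types: computing a size consumes a lazy iterator, so the selected elements would have to be materialized into a list anyway before they can be both sized and written. The function names below are hypothetical and exist only for this illustration.

```scala
object SinglePassSketch {
  // Summing the sizes drains a lazy iterator, so no element is left for a later write.
  def sizeThenHasNext(sizes: Iterator[Int]): (Int, Boolean) = {
    val total = sizes.sum
    (total, sizes.hasNext)
  }

  // A materialized list can be sized first and traversed again afterwards.
  def sizeThenElements(sizes: List[Int]): (Int, List[Int]) =
    (sizes.sum, sizes)
}
```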




