[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics
abbccdda commented on a change in pull request #9103: URL: https://github.com/apache/kafka/pull/9103#discussion_r493209974 ## File path: core/src/main/scala/kafka/api/ApiVersion.scala ## @@ -103,6 +103,9 @@ object ApiVersion { KAFKA_2_7_IV0, // Bup Fetch protocol for Raft protocol (KIP-595) KAFKA_2_7_IV1, +// Enable redirection (KIP-590) +// TODO: remove this IBP in the 2.7 release if redirection work could not be done before the freeze Review comment: But in case we release AK 2.7, wouldn't this flag give user the confidence to upgrade to, which we don't want to happen? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] michael-carter-instaclustr commented on pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure
michael-carter-instaclustr commented on pull request #8844: URL: https://github.com/apache/kafka/pull/8844#issuecomment-697142928 Hey @C0urante, I'm not sure how long it usually takes for a committer to get around to looking at things like this (maybe several months is normal), but I thought I'd check with you: do you have any tips on how to get a committer's attention?
[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics
abbccdda commented on a change in pull request #9103: URL: https://github.com/apache/kafka/pull/9103#discussion_r493206068 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsUtil.java ## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData; + +import java.util.Collection; +import java.util.Map; + +public class AlterConfigsUtil { + +public static IncrementalAlterConfigsRequestData generateIncrementalRequestData(final Map> configs, + final boolean validateOnly) { +return generateIncrementalRequestData(configs.keySet(), configs, validateOnly); +} + +public static IncrementalAlterConfigsRequestData generateIncrementalRequestData(final Collection resources, Review comment: The primary reason is that we would trigger the disallowed import if we do it in the request builder: ``` [ant:checkstyle] [ERROR] /Users/boyang.chen/code/kafka/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java:20:1: Disallowed import - org.apache.kafka.clients.admin.AlterConfigOp. 
[ImportControl] ``` Let me check whether we can make an exception here.
[GitHub] [kafka] huxihx commented on pull request #9218: MINOR: Fix shouldNotResetEpochHistoryHeadIfUndefinedPassed
huxihx commented on pull request #9218: URL: https://github.com/apache/kafka/pull/9218#issuecomment-697095836 retest this please.
[GitHub] [kafka] hachikuji commented on a change in pull request #9103: KAFKA-10181: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics
hachikuji commented on a change in pull request #9103: URL: https://github.com/apache/kafka/pull/9103#discussion_r493160927 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsUtil.java ## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData; + +import java.util.Collection; +import java.util.Map; + +public class AlterConfigsUtil { + +public static IncrementalAlterConfigsRequestData generateIncrementalRequestData(final Map> configs, + final boolean validateOnly) { +return generateIncrementalRequestData(configs.keySet(), configs, validateOnly); +} + +public static IncrementalAlterConfigsRequestData generateIncrementalRequestData(final Collection resources, Review comment: nit: might be useful to document the expectation that `resources` is a subset of the key set of `configs`. The signature surprised me a little bit. As an aside, this kind of convenience conversion seems more appropriate for `IncrementalAlterConfigsRequest.Builder` rather than a static class. 
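The expectation raised in the review comment above, that `resources` is a subset of the key set of `configs`, could be made explicit roughly as follows. This is only a minimal sketch in plain Java, not the PR's code: `ConfigSubsetSketch` and `selectConfigs` are hypothetical names, and `String` stands in for Kafka's `ConfigResource` and `AlterConfigOp` types.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a helper such as AlterConfigsUtil; String replaces Kafka's types.
final class ConfigSubsetSketch {

    private ConfigSubsetSketch() { }

    /**
     * Returns the entries of {@code configs} for the requested {@code resources}.
     * Precondition: every element of {@code resources} is a key of {@code configs}.
     *
     * @throws IllegalArgumentException if a resource has no entry in {@code configs}
     */
    static Map<String, List<String>> selectConfigs(final Collection<String> resources,
                                                   final Map<String, List<String>> configs) {
        final Map<String, List<String>> selected = new LinkedHashMap<>();
        for (final String resource : resources) {
            final List<String> ops = configs.get(resource);
            if (ops == null) {
                // Fail fast instead of silently dropping the resource.
                throw new IllegalArgumentException("No config entry for resource " + resource);
            }
            selected.put(resource, ops);
        }
        return selected;
    }
}
```

Documenting the precondition in the Javadoc and enforcing it at the top of the method keeps the two-argument signature from surprising callers.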
## File path: core/src/main/scala/kafka/api/ApiVersion.scala ## @@ -103,6 +103,9 @@ object ApiVersion { KAFKA_2_7_IV0, // Bup Fetch protocol for Raft protocol (KIP-595) KAFKA_2_7_IV1, +// Enable redirection (KIP-590) +// TODO: remove this IBP in the 2.7 release if redirection work could not be done before the freeze Review comment: Get rid of this TODO. We do not need to remove IBP internal versions. ## File path: core/src/main/scala/kafka/common/InterBrokerSendThread.scala ## @@ -147,7 +147,9 @@ abstract class InterBrokerSendThread(name: String, case class RequestAndCompletionHandler(destination: Node, request: AbstractRequest.Builder[_ <: AbstractRequest], - handler: RequestCompletionHandler) + handler: RequestCompletionHandler, + initialPrincipalName: String = null, Review comment: nit: why don't we add a case class and make this optional. for example: ```scala case class InitialPrincipal(name: String, clientId: String) ``` In addition to reducing parameters, that makes the expectation that both are provided explicit. ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -117,6 +117,89 @@ class KafkaApis(val requestChannel: RequestChannel, val adminZkClient = new AdminZkClient(zkClient) private val alterAclsPurgatory = new DelayedFuturePurgatory(purgatoryName = "AlterAcls", brokerId = config.brokerId) + /** + * The template to create a forward request handler. + * + * @tparam T request type + * @tparam R response type + * @tparam RK resource key + * @tparam RV resource value + */ + private[server] abstract class ForwardRequestHandler[T <: AbstractRequest, +R <: AbstractResponse, RK, RV](request: RequestChannel.Request) extends Logging { + +/** + * Split the given resource into authorized and unauthorized sets. + * + * @return authorized resources and unauthorized resources + */ +def resourceSplitByAuthorization(request: T): (Map[RK, RV], Map[RK, ApiError]) + +/** + * Controller handling logic of the request. 
+ */ +def process(authorizedResources: Map[RK, RV], +unauthorizedResult: Map[RK, ApiError], +request: T): Unit + +/** + * Build a forward request to the controller. + * + * @param authorizedResources authorized resources by the forwarding broker + * @param request the original request + * @return forward request builder + */ +def createRequestBuilder(authorizedResources: Map[RK, RV], + request: T): AbstractRequest.Builder[T] + +/** + * Merge the forward response with the previously unauthorized
[GitHub] [kafka] ijuma commented on a change in pull request #4090: [KAFKA-6084] Propagate JSON parsing errors in ReassignPartitionsCommand
ijuma commented on a change in pull request #4090: URL: https://github.com/apache/kafka/pull/4090#discussion_r492811318 ## File path: core/src/main/scala/kafka/utils/Json.scala ## @@ -69,8 +70,11 @@ object Json { * @return An `Either` which in case of `Left` means an exception and `Right` is the actual return value. */ def tryParseFull(input: String): Either[JsonProcessingException, JsonValue] = -try Right(mapper.readTree(input)).map(JsonValue(_)) -catch { case e: JsonProcessingException => Left(e) } +if (input != null && input.isEmpty) Review comment: I think we should say `if (input == null || input.isEmpty)` instead. It's unexpected to get an exception instead of `Left` in a method that returns `Either`. ## File path: core/src/main/scala/kafka/utils/Json.scala ## @@ -69,8 +70,11 @@ object Json { * @return An `Either` which in case of `Left` means an exception and `Right` is the actual return value. */ def tryParseFull(input: String): Either[JsonProcessingException, JsonValue] = -try Right(mapper.readTree(input)).map(JsonValue(_)) -catch { case e: JsonProcessingException => Left(e) } +if (input != null && input.isEmpty) + Left(new JsonParseException(MissingNode.getInstance().traverse(), "The input string shouldn't be empty")) Review comment: This seems fine.
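The point of the first review comment above is that a method returning `Either` should report bad input as `Left` rather than throw. A rough Java illustration, under stated assumptions: the tiny `Either` class and the stand-in `parse` method below are hypothetical and only mimic the shape of the Scala code; they are not Jackson or Kafka APIs.

```java
// Hypothetical, minimal Either type; the PR itself uses Scala's built-in Either.
final class Either<L, R> {
    private final L left;
    private final R right;
    private final boolean isRight;

    private Either(final L left, final R right, final boolean isRight) {
        this.left = left;
        this.right = right;
        this.isRight = isRight;
    }

    static <L, R> Either<L, R> left(final L value) { return new Either<>(value, null, false); }
    static <L, R> Either<L, R> right(final R value) { return new Either<>(null, value, true); }

    boolean isRight() { return isRight; }
    L getLeft() { return left; }
    R getRight() { return right; }
}

final class TryParseSketch {
    private TryParseSketch() { }

    // Null and empty input both become Left, so callers never see an exception.
    static Either<Exception, String> tryParseFull(final String input) {
        if (input == null || input.isEmpty()) {
            return Either.left(new IllegalArgumentException("The input string shouldn't be null or empty"));
        }
        try {
            return Either.right(parse(input));
        } catch (final IllegalStateException e) {
            return Either.left(e);
        }
    }

    // Stand-in "parser" (an assumption for this sketch): accepts only text wrapped in braces.
    private static String parse(final String input) {
        final String trimmed = input.trim();
        if (!trimmed.startsWith("{") || !trimmed.endsWith("}")) {
            throw new IllegalStateException("Not an object: " + input);
        }
        return trimmed;
    }
}
```

With the `== null ||` check, every failure path goes through the return value, which is what a caller of an `Either`-returning method would expect.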
[GitHub] [kafka] ijuma commented on a change in pull request #9299: MINOR: Use `Map.foreachKv` to avoid tuple allocation in Scala 2.13
ijuma commented on a change in pull request #9299: URL: https://github.com/apache/kafka/pull/9299#discussion_r492361262 ## File path: core/src/main/scala/kafka/utils/Implicits.scala ## @@ -46,4 +47,21 @@ object Implicits { } + /** + * Exposes `foreachKv` which maps to `foreachEntry` in Scala 2.13 and `foreach` in Scala 2.12 + * (with the help of scala.collection.compat). `foreachEntry` avoids the tuple allocation and + * is more efficient. + * + * This was not named `foreachEntry` to avoid `unused import` warnings in Scala 2.13 (the implicit + * would not be triggered in Scala 2.13 since `Map.foreachEntry` would have precedence). + */ + @nowarn("cat=unused-imports") + implicit class MapExtensionMethods[K, V](private val self: scala.collection.Map[K, V]) extends AnyVal { +import scala.collection.compat._ +def foreachKv[U](f: (K, V) => U): Unit = { Review comment: Checked with @hachikuji and he's fine with this change.
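For readers coming from the Java side, the callback shape of `foreachKv` (key and value as two separate arguments rather than one tuple) resembles `java.util.Map.forEach`, which takes a `BiConsumer`. The sketch below is only a comparison under that assumption; `ForEachKvSketch` and `sumValues` are hypothetical names and nothing here is taken from the PR.

```java
import java.util.Map;

final class ForEachKvSketch {
    private ForEachKvSketch() { }

    // Sums all values; the lambda receives (key, value) directly, not a pair object.
    static long sumValues(final Map<String, Long> offsets) {
        final long[] total = {0L};
        offsets.forEach((partition, offset) -> total[0] += offset);
        return total[0];
    }
}
```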
[GitHub] [kafka] ijuma commented on pull request #4090: [KAFKA-6084] Propagate JSON parsing errors in ReassignPartitionsCommand
ijuma commented on pull request #4090: URL: https://github.com/apache/kafka/pull/4090#issuecomment-696784662 Can you also rebase, please, so that the PR builder runs?
[GitHub] [kafka] viktorsomogyi commented on a change in pull request #4090: [KAFKA-6084] Propagate JSON parsing errors in ReassignPartitionsCommand
viktorsomogyi commented on a change in pull request #4090: URL: https://github.com/apache/kafka/pull/4090#discussion_r492704280 ## File path: core/src/test/scala/unit/kafka/utils/JsonTest.scala ## @@ -40,25 +40,34 @@ class JsonTest { def testJsonParse(): Unit = { val jnf = JsonNodeFactory.instance -assertEquals(Json.parseFull("{}"), Some(JsonValue(new ObjectNode(jnf +assertEquals(Some(JsonValue(new ObjectNode(jnf))), Json.parseFull("{}")) +assertEquals(Right(JsonValue(new ObjectNode(jnf))), Json.tryParseFull("{}")) +assertThrows(classOf[IllegalArgumentException], () => Json.tryParseFull(null)) +assertThrows(classOf[IllegalArgumentException], () => Json.tryParseBytes(null)) -assertEquals(Json.parseFull("""{"foo":"bar"s}"""), None) +assertEquals(Option(MissingNode.getInstance()).map(JsonValue(_)), Json.parseFull("")) +assertEquals(Right(MissingNode.getInstance()).map(JsonValue(_)), Json.tryParseFull("")) Review comment: You're right, it makes sense to return `None` and `Left` in those cases. Will change it and upload it to this PR. ## File path: core/src/main/scala/kafka/utils/Json.scala ## @@ -69,8 +70,11 @@ object Json { * @return An `Either` which in case of `Left` means an exception and `Right` is the actual return value. */ def tryParseFull(input: String): Either[JsonProcessingException, JsonValue] = -try Right(mapper.readTree(input)).map(JsonValue(_)) -catch { case e: JsonProcessingException => Left(e) } +if (input != null && input.isEmpty) Review comment: Wasn't sure what to do here but `null` is checked in `readTree` and an IllegalArgumentException is thrown so I decided to go skip here and leave the original behavior. ## File path: core/src/main/scala/kafka/utils/Json.scala ## @@ -69,8 +70,11 @@ object Json { * @return An `Either` which in case of `Left` means an exception and `Right` is the actual return value. 
 */ def tryParseFull(input: String): Either[JsonProcessingException, JsonValue] = -try Right(mapper.readTree(input)).map(JsonValue(_)) -catch { case e: JsonProcessingException => Left(e) } +if (input != null && input.isEmpty) + Left(new JsonParseException(MissingNode.getInstance().traverse(), "The input string shouldn't be empty")) Review comment: It seems that `JsonParseException` is the most adequate `JsonProcessingException` that could apply here. Another possibility is to change the exception in the return type to something like `IOException` and throw that here, but that seems more vague than `JsonParseException`. Another option is to simply throw `IllegalArgumentException` here instead of returning `Left`. That would correspond to `readTree(InputStream)` as well. What do you think?
[GitHub] [kafka] ijuma commented on pull request #9266: KAFKA-10469: Resolve logger levels hierarchically
ijuma commented on pull request #9266: URL: https://github.com/apache/kafka/pull/9266#issuecomment-696784359 @tombentley `testDescribeConfigsForLog4jLogLevels` is failing.
[GitHub] [kafka] mjsax commented on pull request #9251: KAFKA-10459: Document IQ APIs where order does not hold between stores
mjsax commented on pull request #9251: URL: https://github.com/apache/kafka/pull/9251#issuecomment-696467547 Thanks for the PR @showuon!
[GitHub] [kafka] ableegoldman commented on pull request #9312: KAFKA-10505: Fix parsing of generation log string.
ableegoldman commented on pull request #9312: URL: https://github.com/apache/kafka/pull/9312#issuecomment-696925826 Original system test run was aborted for some reason. Kicked off a new set: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4182/
[GitHub] [kafka] mjsax commented on pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones
mjsax commented on pull request #9156: URL: https://github.com/apache/kafka/pull/9156#issuecomment-696823406 Retest this please.
[GitHub] [kafka] mjsax merged pull request #9251: KAFKA-10459: Document IQ APIs where order does not hold between stores
mjsax merged pull request #9251: URL: https://github.com/apache/kafka/pull/9251
[GitHub] [kafka] gmunozfe commented on pull request #9309: KAFKA-10503: MockProducer doesn't throw ClassCastException when no
gmunozfe commented on pull request #9309: URL: https://github.com/apache/kafka/pull/9309#issuecomment-696995620 @mjsax @guozhangwang @leonardge could you review? Thanks!
[GitHub] [kafka] vvcephei commented on pull request #8892: KAFKA-10068: verify assignment performance with large cluster
vvcephei commented on pull request #8892: URL: https://github.com/apache/kafka/pull/8892#issuecomment-696752069 Thanks, @ableegoldman! There was only one unrelated test failure: `Build / JDK 11 / kafka.network.DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota`
[GitHub] [kafka] abbccdda commented on a change in pull request #9270: [WIP] KAFKA-10284: Group membership update due to static member rejoin should be persisted
abbccdda commented on a change in pull request #9270: URL: https://github.com/apache/kafka/pull/9270#discussion_r492896997 ## File path: core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala ## @@ -3882,6 +3942,21 @@ class GroupCoordinatorTest { Await.result(responseFuture, Duration(rebalanceTimeout + 100, TimeUnit.MILLISECONDS)) } + private def staticJoinGroupWithPersistence(groupId: String, + memberId: String, Review comment: nit: alignment ## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ## @@ -1040,21 +1040,36 @@ class GroupCoordinator(val brokerId: Int, group.currentState match { case Stable => -info(s"Static member joins during Stable stage will not trigger rebalance.") -group.maybeInvokeJoinCallback(member, JoinGroupResult( - members = List.empty, - memberId = newMemberId, - generationId = group.generationId, - protocolType = group.protocolType, - protocolName = group.protocolName, - // We want to avoid current leader performing trivial assignment while the group - // is in stable stage, because the new assignment in leader's next sync call - // won't be broadcast by a stable group. This could be guaranteed by - // always returning the old leader id so that the current leader won't assume itself - // as a leader based on the returned message, since the new member.id won't match - // returned leader id, therefore no assignment will be performed. 
- leaderId = currentLeader, - error = Errors.NONE)) +// check if group's selectedProtocol of next generation will change, if not, simply store group to persist the +// updated static member, if yes, rebalance should be triggered to let the group's assignment and selectProtocol consistent +val selectedProtocolOfNextGeneration = group.selectProtocol +if (group.protocolName.contains(selectedProtocolOfNextGeneration)) { + info(s"Static member which joins during Stable stage and doesn't affect selectProtocol will not trigger rebalance.") + val groupAssignment: Map[String, Array[Byte]] = group.allMemberMetadata.map(member => member.memberId -> member.assignment).toMap + groupManager.storeGroup(group, groupAssignment, error => { +group.inLock { + if (error != Errors.NONE) { +warn(s"Failed to persist metadata for group ${group.groupId}: ${error.message}") + } +} + }) + group.maybeInvokeJoinCallback(member, JoinGroupResult( +members = List.empty, +memberId = newMemberId, +generationId = group.generationId, +protocolType = group.protocolType, +protocolName = group.protocolName, +// We want to avoid current leader performing trivial assignment while the group +// is in stable stage, because the new assignment in leader's next sync call +// won't be broadcast by a stable group. This could be guaranteed by +// always returning the old leader id so that the current leader won't assume itself +// as a leader based on the returned message, since the new member.id won't match +// returned leader id, therefore no assignment will be performed. +leaderId = currentLeader, +error = Errors.NONE)) +} else { + maybePrepareRebalance(group, s"Group's selectedProtocol will change because static member ${member.memberId} with instance id $groupInstanceId joined with change of protocol") Review comment: Could you elaborate why this case is possible? 
We do have checks for `!group.supportsProtocols(protocolType, MemberMetadata.plainProtocolSet(protocols)` in the caller, so if the group protocol is incompatible, won't we just reject the rejoin? ## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ## @@ -1040,21 +1040,36 @@ class GroupCoordinator(val brokerId: Int, group.currentState match { case Stable => -info(s"Static member joins during Stable stage will not trigger rebalance.") -group.maybeInvokeJoinCallback(member, JoinGroupResult( - members = List.empty, - memberId = newMemberId, - generationId = group.generationId, - protocolType = group.protocolType, - protocolName = group.protocolName, - // We want to avoid current leader performing trivial assignment while the group - // is in stable stage, because the new assignment in leader's next sync call - // won't be broadcast by a stable group. This could be guaranteed by - // always returning the old leader id so that the current leader won't assume itself
[GitHub] [kafka] chia7712 commented on pull request #9162: MINOR: refactor Log to get rid of "return" in nested anonymous function
chia7712 commented on pull request #9162: URL: https://github.com/apache/kafka/pull/9162#issuecomment-696523232 ``` Build / JDK 15 / org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true] Build / JDK 8 / kafka.api.ConsumerBounceTest.testClose ``` Unrelated failures. Rebasing to trigger QA.
[GitHub] [kafka] guozhangwang commented on a change in pull request #9083: KAFKA-9450: Follow-up; Forbid process after closed
guozhangwang commented on a change in pull request #9083: URL: https://github.com/apache/kafka/pull/9083#discussion_r492944504 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java ## @@ -217,12 +217,12 @@ public StateStore getStateStore(final String name) { forward((ProcessorNode) child, key, value); } } else { -final ProcessorNode child = currentNode().getChild(sendTo); Review comment: I made the change in ProcessorNode to add back the template types: https://github.com/apache/kafka/pull/9083/files/82b6f6f5d238401097e0906c8135c5c189524666#diff-705bfd0ed3f214048b76d775708cc7d2L96 But since `currentNode()`'s template is `` its templated `getChild` and that's why I need to weaken it here --- as you can see from the above `if` branch, it now aligns consistently on the typing. ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java ## @@ -44,19 +45,18 @@ public KStreamFlatTransformValues(final ValueTransformerWithKeySupplier implements Processor { +public static class KStreamFlatTransformValuesProcessor extends AbstractProcessor { Review comment: As described at the top, `Let all built-in processors to extend from AbstractProcessor.` The main reason is that AbstractProcessor provides some basic functionality, so it's better to base our own implementations on it.
[GitHub] [kafka] hachikuji merged pull request #9130: KAFKA-10492; Core Kafka Raft Implementation (KIP-595)
hachikuji merged pull request #9130: URL: https://github.com/apache/kafka/pull/9130
[GitHub] [kafka] nizhikov commented on pull request #9312: KAFKA-10505: Fix parsing of generation log string.
nizhikov commented on pull request #9312: URL: https://github.com/apache/kafka/pull/9312#issuecomment-696559508 @mjsax These tests were broken by commit 7e7bb184d2abe34280a7f0eb0f0d9fc0e32389f2. Old line, logging only generationId - https://github.com/apache/kafka/commit/7e7bb184d2abe34280a7f0eb0f0d9fc0e32389f2#diff-30048900127c988a69027b70b0ce4eacL504 New line, logging the whole Generation object - https://github.com/apache/kafka/commit/7e7bb184d2abe34280a7f0eb0f0d9fc0e32389f2#diff-30048900127c988a69027b70b0ce4eacR590
[GitHub] [kafka] tombentley edited a comment on pull request #9266: KAFKA-10469: Resolve logger levels hierarchically
tombentley edited a comment on pull request #9266: URL: https://github.com/apache/kafka/pull/9266#issuecomment-696622982 @ijuma I've fixed an infinite loop, could you trigger the build again please?
[GitHub] [kafka] abbccdda commented on a change in pull request #9083: KAFKA-9450: Follow-up; Forbid process after closed
abbccdda commented on a change in pull request #9083: URL: https://github.com/apache/kafka/pull/9083#discussion_r492941448 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ## @@ -1566,6 +1566,7 @@ public void shouldCheckpointForSuspendedTask() { EasyMock.verify(stateManager); } + Review comment: nit: not necessary ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java ## @@ -217,12 +217,12 @@ public StateStore getStateStore(final String name) { forward((ProcessorNode) child, key, value); } } else { -final ProcessorNode child = currentNode().getChild(sendTo); Review comment: +1 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java ## @@ -44,19 +45,18 @@ public KStreamFlatTransformValues(final ValueTransformerWithKeySupplier implements Processor { +public static class KStreamFlatTransformValuesProcessor extends AbstractProcessor { Review comment: Could you elaborate why this is better than Processor?
[GitHub] [kafka] vvcephei commented on a change in pull request #8892: KAFKA-10068: verify assignment performance with large cluster
vvcephei commented on a change in pull request #8892: URL: https://github.com/apache/kafka/pull/8892#discussion_r492436680 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java ## @@ -77,13 +86,8 @@ public static final TaskId TASK_2_3 = new TaskId(2, 3); public static final Set EMPTY_TASKS = emptySet(); -public static final List EMPTY_TASK_LIST = emptyList(); -public static final Map EMPTY_TASK_OFFSET_SUMS = emptyMap(); public static final Map EMPTY_CHANGELOG_END_OFFSETS = new HashMap<>(); -private AssignmentTestUtils() {} Review comment: Ah, note the `private`. The purpose of this constructor is to make it uninstantiable. I.e., it's "self-documenting" that it should only be a container for static members.
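The idiom described in the review comment above, a private no-argument constructor that marks a class as a pure container of static members, can be sketched as follows. `TestConstantsSketch` and its members are hypothetical and only illustrate the pattern; they are not part of `AssignmentTestUtils`.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;

// Hypothetical utility class: static members only, never meant to be instantiated.
final class TestConstantsSketch {
    static final int DEFAULT_PARTITIONS = 3;

    // Private constructor: `new TestConstantsSketch()` outside this class does not compile.
    private TestConstantsSketch() { }

    // Reports whether the no-arg constructor is non-private, i.e. callable by other classes.
    static boolean isInstantiableFromOutside() {
        try {
            final Constructor<TestConstantsSketch> ctor =
                TestConstantsSketch.class.getDeclaredConstructor();
            return !Modifier.isPrivate(ctor.getModifiers());
        } catch (final NoSuchMethodException e) {
            throw new IllegalStateException("Expected a declared no-arg constructor", e);
        }
    }
}
```

Removing the constructor would let the compiler generate a default one with the class's own access level, which is why deleting it changes more than it appears to.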
[GitHub] [kafka] tombentley commented on pull request #9266: KAFKA-10469: Resolve logger levels hierarchically
tombentley commented on pull request #9266: URL: https://github.com/apache/kafka/pull/9266#issuecomment-696622982 @ijuma I've fixed an infinite loop, could you trigger the build again please?
[GitHub] [kafka] chia7712 commented on a change in pull request #9206: MINOR: rewrite zipWithIndex by normal foreach to refrain unnecessary …
chia7712 commented on a change in pull request #9206: URL: https://github.com/apache/kafka/pull/9206#discussion_r492834735 ## File path: core/src/main/scala/kafka/log/LogValidator.scala ## @@ -234,16 +234,17 @@ private[log] object LogValidator extends Logging { val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, NoCompressionCodec) -for (batch <- records.batches.asScala) { +records.batches.forEach { batch => validateBatch(topicPartition, firstBatch, batch, origin, toMagicValue, brokerTopicStats) val recordErrors = new ArrayBuffer[ApiRecordError](0) - for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) { + var batchIndex = 0 + batch.forEach { record => validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats).foreach(recordError => recordErrors += recordError) // we fail the batch if any record fails, so we stop appending if any record fails -if (recordErrors.isEmpty) - builder.appendWithOffset(offsetCounter.getAndIncrement(), record) +if (recordErrors.isEmpty) builder.appendWithOffset(offsetCounter.getAndIncrement(), record) +batchIndex += 1 Review comment: > Have we benchmarked this path? I didn't benchmark this path, and you are right that the optimization is small since we have to convert data in this path. I will revert it to keep the patch small.
## File path: core/src/main/scala/kafka/log/LogValidator.scala ## @@ -279,14 +280,15 @@ private[log] object LogValidator extends Logging { val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, NoCompressionCodec) -for (batch <- records.batches.asScala) { +records.batches.forEach { batch => validateBatch(topicPartition, firstBatch, batch, origin, magic, brokerTopicStats) var maxBatchTimestamp = RecordBatch.NO_TIMESTAMP var offsetOfMaxBatchTimestamp = -1L val recordErrors = new ArrayBuffer[ApiRecordError](0) - for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) { + var batchIndex = 0 Review comment: copy that ## File path: core/src/main/scala/kafka/log/LogValidator.scala ## @@ -234,17 +234,16 @@ private[log] object LogValidator extends Logging { val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, NoCompressionCodec) -records.batches.forEach { batch => +for (batch <- records.batches.asScala) { Review comment: ok~ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei merged pull request #8892: KAFKA-10068: verify assignment performance with large cluster
vvcephei merged pull request #8892: URL: https://github.com/apache/kafka/pull/8892
[GitHub] [kafka] mjsax commented on pull request #9312: KAFKA-10505: Fix parsing of generation log string.
mjsax commented on pull request #9312: URL: https://github.com/apache/kafka/pull/9312#issuecomment-696464404 @nizhikov @ableegoldman Do we know when this broke? What was the old log line and what is the new one? Triggered system test run: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4181/
[GitHub] [kafka] chia7712 commented on pull request #9206: MINOR: rewrite zipWithIndex by normal foreach to refrain unnecessary …
chia7712 commented on pull request #9206: URL: https://github.com/apache/kafka/pull/9206#issuecomment-696522007 rebase to run JMH again. This patch is still a benefit to validation :)
[GitHub] [kafka] abbccdda commented on a change in pull request #9311: KAFKA-9910: Implement new transaction timed out error
abbccdda commented on a change in pull request #9311: URL: https://github.com/apache/kafka/pull/9311#discussion_r491655717 ## File path: clients/src/main/java/org/apache/kafka/common/errors/TransactionTimeoutException.java ## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +/** + * This exception indicates that the last ongoing transaction timed out on the coordinator. + * When encountering this exception, the producer should retry initialization with current epoch. + */ +public class TransactionTimeoutException extends ApiException { Review comment: `TransactionTimedOut` ## File path: clients/src/main/java/org/apache/kafka/common/errors/TransactionTimeoutException.java ## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +/** + * This exception indicates that the last ongoing transaction timed out on the coordinator. + * When encountering this exception, the producer should retry initialization with current epoch. + */ +public class TransactionTimeoutException extends ApiException { + +private static final long serialVersionUID = 1L; + +public TransactionTimeoutException() { +super(); +} + +public TransactionTimeoutException(String message, Throwable cause) { +super(message, cause); +} + +public TransactionTimeoutException(String message) { Review comment: Do we need all other constructors? 
## File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java ## @@ -1577,6 +1578,59 @@ public void testInvalidProducerEpochConvertToProducerFencedInAddPartitionToTxn() verifyProducerFencedForAddPartitionsToTxn(Errors.INVALID_PRODUCER_EPOCH); } +@Test +public void testTxnTimeoutForAddPartitionsToTxn() throws InterruptedException { +doInitTransactions(); + +transactionManager.beginTransaction(); +transactionManager.failIfNotReadyForSend(); +Future responseFuture = appendToAccumulator(tp0); +transactionManager.maybeAddPartitionToTransaction(tp0); + +assertFalse(responseFuture.isDone()); +prepareAddPartitionsToTxnResponse(Errors.TRANSACTION_TIMED_OUT, tp0, epoch, producerId); + +verifyTxnTimeout(responseFuture); +} + +@Test +public void testTxnTimeoutForAddOffsetsToTxn() throws InterruptedException { +doInitTransactions(); + +transactionManager.beginTransaction(); +transactionManager.failIfNotReadyForSend(); +transactionManager.sendOffsetsToTransaction(Collections.emptyMap(), new ConsumerGroupMetadata(consumerGroupId)); + +Future responseFuture = appendToAccumulator(tp0); + +assertFalse(responseFuture.isDone()); +prepareAddOffsetsToTxnResponse(Errors.TRANSACTION_TIMED_OUT, consumerGroupId, producerId, epoch); + +verifyTxnTimeout(responseFuture); +} + +@Test +public void testTxnTimeoutInEndTxn() throws InterruptedException { + +doInitTransactions(); + +transactionManager.beginTransaction(); +transactionManager.failIfNotReadyForSend(); +transactionManager.maybeAddPartitionToTransaction(tp0); +TransactionalRequestResult commitResult = transactionManager.beginCommit(); + +Future responseFuture =
[GitHub] [kafka] ijuma commented on a change in pull request #9206: MINOR: rewrite zipWithIndex by normal foreach to refrain unnecessary …
ijuma commented on a change in pull request #9206: URL: https://github.com/apache/kafka/pull/9206#discussion_r492821097 ## File path: core/src/main/scala/kafka/log/LogValidator.scala ## @@ -234,16 +234,17 @@ private[log] object LogValidator extends Logging { val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, NoCompressionCodec) -for (batch <- records.batches.asScala) { +records.batches.forEach { batch => validateBatch(topicPartition, firstBatch, batch, origin, toMagicValue, brokerTopicStats) val recordErrors = new ArrayBuffer[ApiRecordError](0) - for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) { + var batchIndex = 0 + batch.forEach { record => validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats).foreach(recordError => recordErrors += recordError) // we fail the batch if any record fails, so we stop appending if any record fails -if (recordErrors.isEmpty) - builder.appendWithOffset(offsetCounter.getAndIncrement(), record) +if (recordErrors.isEmpty) builder.appendWithOffset(offsetCounter.getAndIncrement(), record) +batchIndex += 1 Review comment: Have we benchmarked this path? It seems doubtful that these micro optimizations help given that we are `converting`. 
## File path: core/src/main/scala/kafka/log/LogValidator.scala ## @@ -279,14 +280,15 @@ private[log] object LogValidator extends Logging { val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, NoCompressionCodec) -for (batch <- records.batches.asScala) { +records.batches.forEach { batch => validateBatch(topicPartition, firstBatch, batch, origin, magic, brokerTopicStats) var maxBatchTimestamp = RecordBatch.NO_TIMESTAMP var offsetOfMaxBatchTimestamp = -1L val recordErrors = new ArrayBuffer[ApiRecordError](0) - for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) { + var batchIndex = 0 Review comment: Worth adding a comment here that this is a hot path and we want to avoid any unnecessary allocations. ## File path: core/src/main/scala/kafka/log/LogValidator.scala ## @@ -234,17 +234,16 @@ private[log] object LogValidator extends Logging { val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, NoCompressionCodec) -records.batches.forEach { batch => +for (batch <- records.batches.asScala) { Review comment: I liked your changes to make the code more concise, I'd keep them. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
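The counter pattern under discussion can be sketched as follows. This is a Java illustration of the idea only, not the Scala code in `LogValidator`: the method name and the toy validation rule (an empty string counts as invalid) are assumptions, and the Scala-specific cost being avoided (pairs produced by `zipWithIndex`) has no direct Java equivalent.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: tracks the record index with a local counter instead of
// materialising (record, index) pairs before iterating.
final class IndexedLoopSketch {
    // Returns the positions of records that fail a toy validation rule.
    static List<Integer> invalidIndices(Iterable<String> batch) {
        List<Integer> errors = new ArrayList<>();
        int batchIndex = 0;
        for (String record : batch) {
            if (record.isEmpty()) {
                errors.add(batchIndex);
            }
            batchIndex += 1; // advance once per record, valid or not
        }
        return errors;
    }
}
```

Whether this matters in practice is exactly what the benchmark question in the review asks; the sketch makes no performance claim.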
[GitHub] [kafka] ableegoldman commented on a change in pull request #8892: KAFKA-10068: verify assignment performance with large cluster
ableegoldman commented on a change in pull request #8892: URL: https://github.com/apache/kafka/pull/8892#discussion_r492382332 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java ## @@ -77,13 +86,8 @@ public static final TaskId TASK_2_3 = new TaskId(2, 3); public static final Set EMPTY_TASKS = emptySet(); -public static final List EMPTY_TASK_LIST = emptyList(); -public static final Map EMPTY_TASK_OFFSET_SUMS = emptyMap(); public static final Map EMPTY_CHANGELOG_END_OFFSETS = new HashMap<>(); -private AssignmentTestUtils() {} Review comment: > Do we need to instantiate this class now No...isn't that exactly the reason we don't need this? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java ## @@ -288,17 +288,17 @@ public void computeTaskLags(final UUID uuid, final Map allTaskEndO final Long endOffsetSum = taskEntry.getValue(); final Long offsetSum = taskOffsetSums.getOrDefault(task, 0L); -if (endOffsetSum < offsetSum) { +if (offsetSum == Task.LATEST_OFFSET) { +taskLagTotals.put(task, Task.LATEST_OFFSET); +} else if (offsetSum == UNKNOWN_OFFSET_SUM) { +taskLagTotals.put(task, UNKNOWN_OFFSET_SUM); +} else if (endOffsetSum < offsetSum) { Review comment: I don't know about fixing this before, but I found this while debugging these tests and fixed it on the side in this PR. To be fair, it wasn't hurting anything since we happen to put the right thing in the `taskLagTotals` map, but the warning logged was definitely incorrect. 
## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java ## @@ -77,13 +86,8 @@ public static final TaskId TASK_2_3 = new TaskId(2, 3); public static final Set EMPTY_TASKS = emptySet(); -public static final List EMPTY_TASK_LIST = emptyList(); -public static final Map EMPTY_TASK_OFFSET_SUMS = emptyMap(); public static final Map EMPTY_CHANGELOG_END_OFFSETS = new HashMap<>(); -private AssignmentTestUtils() {} Review comment: Ah yeah ok. I'll put it back
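The branch order added to `computeTaskLags` in the `ClientState.java` hunk above can be sketched roughly as below. The sentinel values are placeholders chosen for illustration, and the last two branches are assumptions: the diff shows only the conditions, not what the real code does once `endOffsetSum < offsetSum` is detected or in the normal case.

```java
// A minimal sketch of checking sentinel offset sums before comparing offsets.
final class TaskLagSketch {
    // Placeholder sentinel values; the real constants are defined in Kafka Streams.
    static final long LATEST_OFFSET = -2L;
    static final long UNKNOWN_OFFSET_SUM = -3L;

    static long computeLag(long endOffsetSum, long offsetSum) {
        if (offsetSum == LATEST_OFFSET) {
            return LATEST_OFFSET;        // sentinel is passed through unchanged
        } else if (offsetSum == UNKNOWN_OFFSET_SUM) {
            return UNKNOWN_OFFSET_SUM;   // sentinel is passed through unchanged
        } else if (endOffsetSum < offsetSum) {
            return endOffsetSum;         // assumption: treat inconsistent input as fully lagging
        } else {
            return endOffsetSum - offsetSum; // assumption: ordinary lag
        }
    }
}
```

Handling the sentinels first means they are never treated as real offset sums by the later comparison, which is where the comment says the misleading warning came from.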
[GitHub] [kafka] piotrrzysko commented on pull request #9315: KAFKA-10496: Removed relying on external DNS servers in tests
piotrrzysko commented on pull request #9315: URL: https://github.com/apache/kafka/pull/9315#issuecomment-696860022 @jolshan @mumrah @dajac Could you please take a look at this PR?
[GitHub] [kafka] showuon commented on pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint
showuon commented on pull request #9121: URL: https://github.com/apache/kafka/pull/9121#issuecomment-696467921 @mjsax, sorry, I see you're online now. Could you take another look at this long-pending PR? It's the second review, so it should be easier. Thanks.
[GitHub] [kafka] ijuma merged pull request #9299: MINOR: Use `Map.forKeyValue` to avoid tuple allocation in Scala 2.13
ijuma merged pull request #9299: URL: https://github.com/apache/kafka/pull/9299
[GitHub] [kafka] chia7712 commented on a change in pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…
chia7712 commented on a change in pull request #9318: URL: https://github.com/apache/kafka/pull/9318#discussion_r492696066 ## File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala ## @@ -997,189 +997,52 @@ object GroupMetadataManager { val MetricsGroup: String = "group-coordinator-metrics" val LoadTimeSensor: String = "GroupPartitionLoadTime" - private val CURRENT_OFFSET_KEY_SCHEMA_VERSION = 1.toShort - private val CURRENT_GROUP_KEY_SCHEMA_VERSION = 2.toShort - - private val OFFSET_COMMIT_KEY_SCHEMA = new Schema(new Field("group", STRING), -new Field("topic", STRING), -new Field("partition", INT32)) - private val OFFSET_KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("group") - private val OFFSET_KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("topic") - private val OFFSET_KEY_PARTITION_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("partition") - - private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", INT64), -new Field("metadata", STRING, "Associated metadata.", ""), -new Field("timestamp", INT64)) - private val OFFSET_VALUE_OFFSET_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset") - private val OFFSET_VALUE_METADATA_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata") - private val OFFSET_VALUE_TIMESTAMP_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp") - - private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", INT64), -new Field("metadata", STRING, "Associated metadata.", ""), -new Field("commit_timestamp", INT64), -new Field("expire_timestamp", INT64)) - private val OFFSET_VALUE_OFFSET_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset") - private val OFFSET_VALUE_METADATA_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata") - private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp") - private val OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp") - - private val 
OFFSET_COMMIT_VALUE_SCHEMA_V2 = new Schema(new Field("offset", INT64), -new Field("metadata", STRING, "Associated metadata.", ""), -new Field("commit_timestamp", INT64)) - private val OFFSET_VALUE_OFFSET_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset") - private val OFFSET_VALUE_METADATA_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata") - private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp") - - private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema( -new Field("offset", INT64), -new Field("leader_epoch", INT32), -new Field("metadata", STRING, "Associated metadata.", ""), -new Field("commit_timestamp", INT64)) - private val OFFSET_VALUE_OFFSET_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset") - private val OFFSET_VALUE_LEADER_EPOCH_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch") - private val OFFSET_VALUE_METADATA_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata") - private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp") - - private val GROUP_METADATA_KEY_SCHEMA = new Schema(new Field("group", STRING)) - private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group") - - private val MEMBER_ID_KEY = "member_id" - private val GROUP_INSTANCE_ID_KEY = "group_instance_id" - private val CLIENT_ID_KEY = "client_id" - private val CLIENT_HOST_KEY = "client_host" - private val REBALANCE_TIMEOUT_KEY = "rebalance_timeout" - private val SESSION_TIMEOUT_KEY = "session_timeout" - private val SUBSCRIPTION_KEY = "subscription" - private val ASSIGNMENT_KEY = "assignment" - - private val MEMBER_METADATA_V0 = new Schema( -new Field(MEMBER_ID_KEY, STRING), -new Field(CLIENT_ID_KEY, STRING), -new Field(CLIENT_HOST_KEY, STRING), -new Field(SESSION_TIMEOUT_KEY, INT32), -new Field(SUBSCRIPTION_KEY, BYTES), -new Field(ASSIGNMENT_KEY, BYTES)) - - private val MEMBER_METADATA_V1 = new Schema( -new Field(MEMBER_ID_KEY, STRING), -new 
Field(CLIENT_ID_KEY, STRING), -new Field(CLIENT_HOST_KEY, STRING), -new Field(REBALANCE_TIMEOUT_KEY, INT32), -new Field(SESSION_TIMEOUT_KEY, INT32), -new Field(SUBSCRIPTION_KEY, BYTES), -new Field(ASSIGNMENT_KEY, BYTES)) - - private val MEMBER_METADATA_V2 = MEMBER_METADATA_V1 - - private val MEMBER_METADATA_V3 = new Schema( -new Field(MEMBER_ID_KEY, STRING), -new Field(GROUP_INSTANCE_ID_KEY, NULLABLE_STRING), -new Field(CLIENT_ID_KEY, STRING), -new Field(CLIENT_HOST_KEY, STRING), -new Field(REBALANCE_TIMEOUT_KEY, INT32), -new Field(SESSION_TIMEOUT_KEY, INT32), -new Field(SUBSCRIPTION_KEY, BYTES), -new Field(ASSIGNMENT_KEY, BYTES)) - - private val PROTOCOL_TYPE_KEY = "protocol_type" - private val GENERATION_KEY = "generation" - private val PROTOCOL_KEY = "protocol" - private val LEADER_KEY = "leader" -
[GitHub] [kafka] vvcephei merged pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually
vvcephei merged pull request #9262: URL: https://github.com/apache/kafka/pull/9262
[GitHub] [kafka] chia7712 commented on pull request #9284: KAFKA-10479 Throw exception if users try to update configs of existen…
chia7712 commented on pull request #9284: URL: https://github.com/apache/kafka/pull/9284#issuecomment-696528358 > So the previous behavior would have caused the updated configuration to be persisted, but it wouldn't take effect until the broker was restarted. Is that right? You do give a better explanation :)
[GitHub] [kafka] dajac commented on a change in pull request #9317: KAFKA-10509: Added throttle connection accept rate metric (KIP-612)
dajac commented on a change in pull request #9317: URL: https://github.com/apache/kafka/pull/9317#discussion_r492570653 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -1414,7 +1420,8 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend class ListenerConnectionQuota(lock: Object, listener: ListenerName) extends ListenerReconfigurable { @volatile private var _maxConnections = Int.MaxValue -val connectionRateSensor = createConnectionRateQuotaSensor(Int.MaxValue, Some(listener.value)) +val connectionRateSensor: Sensor = createConnectionRateQuotaSensor(Int.MaxValue, Some(listener.value)) +val connectionRateThrottleSensor: Sensor = createConnectionRateThrottleSensor() Review comment: nit: Do we really need to provide the type here? It seems that we usually don't provide it in that file. ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -1292,6 +1292,12 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend counts.synchronized { val startThrottleTimeMs = time.milliseconds val throttleTimeMs = math.max(recordConnectionAndGetThrottleTimeMs(listenerName, startThrottleTimeMs), 0) + if (throttleTimeMs > 0) { +// record throttle time due to hitting connection rate limit +// connection could be throttled longer if the limit of the number of active connections is reached as well +maxConnectionsPerListener.get(listenerName) + .foreach(_.connectionRateThrottleSensor.record(throttleTimeMs.toDouble, startThrottleTimeMs)) Review comment: It is a bit annoying that we have to lookup the `ListenerConnectionQuota` twice. Once in `recordConnectionAndGetThrottleTimeMs` and once here. I wonder if we could look it up once to record the rate and to record the throttle time. I am thinking about the following to be more concrete. We could add two methods in `ListenerConnectionQuota`: 1) `record` and 2) `recordThrottleTime`. 
They would encapsulate the logic to respectively record the rate (for the listener and the broker like we do now) and the throttle time. So here we could look up the `ListenerConnectionQuota`, record and get the throttle time, and record the throttle time if it is > 0. That could improve the readability a bit. I am not sure that it would make a difference from a performance point of view. WDYT? ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -1447,13 +1454,33 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend } } +def removeSensors(): Unit = { Review comment: nit: What about naming this `close`? It feels a bit more natural to call `close` when we don't need the object anymore and it would be more aligned with the `close` methods that we already have in this file. For instance, `ConnectionQuotas#close`, which also cleans up the metrics. BTW, not related to this PR but it seems that we don't close the `ListenerConnectionQuota` when the `ConnectionQuotas` is closed. I suppose that we leave metrics around. Is it the case?
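The single-lookup idea proposed above (`record` plus `recordThrottleTime` on the per-listener quota object) might look roughly like the sketch below. It is written in Java rather than the file's Scala, every class and method name is hypothetical, and the throttle formula (a fixed 100 ms per connection over the limit) is a toy stand-in for the real sensor-based rate accounting.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-listener quota that owns both pieces of bookkeeping.
final class ListenerQuotaSketch {
    private final int maxConnections;
    private int accepted = 0;
    private long totalThrottleMs = 0L;

    ListenerQuotaSketch(int maxConnections) {
        this.maxConnections = maxConnections;
    }

    // Records one connection and returns the throttle time in ms (0 if under the limit).
    long record() {
        accepted += 1;
        int over = accepted - maxConnections;
        return over > 0 ? over * 100L : 0L; // toy formula, not the real quota logic
    }

    void recordThrottleTime(long throttleMs) {
        totalThrottleMs += throttleMs;
    }

    long totalThrottleMs() {
        return totalThrottleMs;
    }
}

// Hypothetical owner that looks the listener quota up once per accepted connection.
final class ConnectionQuotasSketch {
    private final Map<String, ListenerQuotaSketch> perListener = new HashMap<>();

    void addListener(String name, int maxConnections) {
        perListener.put(name, new ListenerQuotaSketch(maxConnections));
    }

    long accept(String listenerName) {
        ListenerQuotaSketch quota = perListener.get(listenerName); // the only lookup
        if (quota == null) {
            return 0L;
        }
        long throttleMs = quota.record();
        if (throttleMs > 0) {
            quota.recordThrottleTime(throttleMs);
        }
        return throttleMs;
    }

    long totalThrottleMs(String listenerName) {
        ListenerQuotaSketch quota = perListener.get(listenerName);
        return quota == null ? 0L : quota.totalThrottleMs();
    }
}
```

The benefit shown here is mainly readability: the caller touches the map once and the quota object decides how both values are recorded, which matches the hedged expectation in the review that performance would probably not change.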
[GitHub] [kafka] showuon commented on pull request #9301: KAFKA-10482: Fix flaky testDynamicListenerConnectionCreationRateQuota
showuon commented on pull request #9301: URL: https://github.com/apache/kafka/pull/9301#issuecomment-696577916 @dajac , thanks for the comments. I've updated in this commit: https://github.com/apache/kafka/pull/9301/commits/8e2714c6b4d0b73c36893c365998380cfaf724f6. Thanks.
[GitHub] [kafka] mjsax commented on a change in pull request #9251: KAFKA-10459: Document IQ APIs where order does not hold between stores
mjsax commented on a change in pull request #9251: URL: https://github.com/apache/kafka/pull/9251#discussion_r492434090 ## File path: streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java ## @@ -25,6 +25,11 @@ * A window store that only supports read operations. * Implementations should be thread-safe as concurrent reads and writes are expected. * + * Note: The current implementation of either forward or backward fetches on range-key-range-time does not Review comment: ```suggestion * Note: The current implementation of either forward or backward fetches on range-key-range-time does not ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually
vvcephei commented on pull request #9262: URL: https://github.com/apache/kafka/pull/9262#issuecomment-696760191 Test failure unrelated: `Build / JDK 11 / kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup`
[GitHub] [kafka] big-andy-coates commented on pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones
big-andy-coates commented on pull request #9156: URL: https://github.com/apache/kafka/pull/9156#issuecomment-696383217
[GitHub] [kafka] avocader closed pull request #9227: KAFKA-10439: Connect's Values to parse BigInteger as Decimal with zero scale.
avocader closed pull request #9227: URL: https://github.com/apache/kafka/pull/9227
[GitHub] [kafka] dajac commented on a change in pull request #9301: KAFKA-10482: Fix flaky testDynamicListenerConnectionCreationRateQuota
dajac commented on a change in pull request #9301: URL: https://github.com/apache/kafka/pull/9301#discussion_r492540150 ## File path: core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala ## @@ -179,17 +179,23 @@ class DynamicConnectionQuotaTest extends BaseRequestTest { reconfigureServers(props, perBrokerConfig = true, (KafkaConfig.ListenersProp, newListeners)) waitForListener("EXTERNAL") +// we need to set the initialConnectionCount earlier and pass to verifyConnectionRate method +// so that the race condition won't occur for the following multi-thread test cases Review comment: This comment is irrelevant now. I would just say that this is the expected connection count after each run. ## File path: core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala ## @@ -317,6 +325,12 @@ class DynamicConnectionQuotaTest extends BaseRequestTest { } } + // make sure the connection count state is the same as the expectedConnectionCount Review comment: nit: I would remove this comment. I think that the method is self-explanatory now.
[GitHub] [kafka] mjsax commented on a change in pull request #8181: KAFKA-9584 Headers ConcurrentModificationException
mjsax commented on a change in pull request #8181: URL: https://github.com/apache/kafka/pull/8181#discussion_r492360815 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -581,8 +580,8 @@ public void punctuate(final ProcessorNode node, final long timestamp, final Punc if (processorContext.currentNode() != null) { throw new IllegalStateException(format("%sCurrent node is not null", logPrefix)); } - -updateProcessorContext(new StampedRecord(DUMMY_RECORD, timestamp), node); + +updateProcessorContext(new StampedRecord(new ConsumerRecord<>(ProcessorContextImpl.NONEXIST_TOPIC, -1, -1L, null, null), timestamp), node); Review comment: From my understanding, neither the `ConsumerRecord` nor the `ProcessorRecordContext` are the issue, but the shared `Header` object -- it's just a "side effect" that creating a new `ConsumerRecord` creates a new `Header` object internally.
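The distinction drawn above, that the shared mutable headers are the problem rather than the record itself, can be illustrated with a small sketch. `RecordSketch` is a hypothetical stand-in for a record carrying headers and does not use the Kafka `ConsumerRecord` API; the sketch also shows only state leaking between users of a shared instance, not the multi-threaded `ConcurrentModificationException` named in the PR title.

```java
import java.util.ArrayList;
import java.util.List;

final class SharedHeadersSketch {
    // Hypothetical stand-in for a record that owns a mutable header collection.
    static final class RecordSketch {
        final List<String> headers = new ArrayList<>();
    }

    // One instance reused for every call: all callers see the same header list.
    static final RecordSketch SHARED_DUMMY = new RecordSketch();

    static RecordSketch sharedRecord() {
        return SHARED_DUMMY;
    }

    // A new instance per call: each caller gets its own header list.
    static RecordSketch freshRecord() {
        return new RecordSketch();
    }
}
```

Adding a header through one `sharedRecord()` reference is visible through every other reference, while headers added to one `freshRecord()` result never reach another, which is the "side effect" the comment attributes to constructing a new record each time.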
[GitHub] [kafka] dajac commented on a change in pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…
dajac commented on a change in pull request #9318: URL: https://github.com/apache/kafka/pull/9318#discussion_r492661437 ## File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala ## @@ -997,189 +997,52 @@ object GroupMetadataManager { val MetricsGroup: String = "group-coordinator-metrics" val LoadTimeSensor: String = "GroupPartitionLoadTime" - private val CURRENT_OFFSET_KEY_SCHEMA_VERSION = 1.toShort - private val CURRENT_GROUP_KEY_SCHEMA_VERSION = 2.toShort - - private val OFFSET_COMMIT_KEY_SCHEMA = new Schema(new Field("group", STRING), -new Field("topic", STRING), -new Field("partition", INT32)) - private val OFFSET_KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("group") - private val OFFSET_KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("topic") - private val OFFSET_KEY_PARTITION_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("partition") - - private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", INT64), -new Field("metadata", STRING, "Associated metadata.", ""), -new Field("timestamp", INT64)) - private val OFFSET_VALUE_OFFSET_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset") - private val OFFSET_VALUE_METADATA_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata") - private val OFFSET_VALUE_TIMESTAMP_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp") - - private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", INT64), -new Field("metadata", STRING, "Associated metadata.", ""), -new Field("commit_timestamp", INT64), -new Field("expire_timestamp", INT64)) - private val OFFSET_VALUE_OFFSET_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset") - private val OFFSET_VALUE_METADATA_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata") - private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp") - private val OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp") - - private val OFFSET_COMMIT_VALUE_SCHEMA_V2 = 
new Schema(new Field("offset", INT64), -new Field("metadata", STRING, "Associated metadata.", ""), -new Field("commit_timestamp", INT64)) - private val OFFSET_VALUE_OFFSET_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset") - private val OFFSET_VALUE_METADATA_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata") - private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp") - - private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema( -new Field("offset", INT64), -new Field("leader_epoch", INT32), -new Field("metadata", STRING, "Associated metadata.", ""), -new Field("commit_timestamp", INT64)) - private val OFFSET_VALUE_OFFSET_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset") - private val OFFSET_VALUE_LEADER_EPOCH_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch") - private val OFFSET_VALUE_METADATA_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata") - private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp") - - private val GROUP_METADATA_KEY_SCHEMA = new Schema(new Field("group", STRING)) - private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group") - - private val MEMBER_ID_KEY = "member_id" - private val GROUP_INSTANCE_ID_KEY = "group_instance_id" - private val CLIENT_ID_KEY = "client_id" - private val CLIENT_HOST_KEY = "client_host" - private val REBALANCE_TIMEOUT_KEY = "rebalance_timeout" - private val SESSION_TIMEOUT_KEY = "session_timeout" - private val SUBSCRIPTION_KEY = "subscription" - private val ASSIGNMENT_KEY = "assignment" - - private val MEMBER_METADATA_V0 = new Schema( -new Field(MEMBER_ID_KEY, STRING), -new Field(CLIENT_ID_KEY, STRING), -new Field(CLIENT_HOST_KEY, STRING), -new Field(SESSION_TIMEOUT_KEY, INT32), -new Field(SUBSCRIPTION_KEY, BYTES), -new Field(ASSIGNMENT_KEY, BYTES)) - - private val MEMBER_METADATA_V1 = new Schema( -new Field(MEMBER_ID_KEY, STRING), -new Field(CLIENT_ID_KEY, STRING), -new 
Field(CLIENT_HOST_KEY, STRING), -new Field(REBALANCE_TIMEOUT_KEY, INT32), -new Field(SESSION_TIMEOUT_KEY, INT32), -new Field(SUBSCRIPTION_KEY, BYTES), -new Field(ASSIGNMENT_KEY, BYTES)) - - private val MEMBER_METADATA_V2 = MEMBER_METADATA_V1 - - private val MEMBER_METADATA_V3 = new Schema( -new Field(MEMBER_ID_KEY, STRING), -new Field(GROUP_INSTANCE_ID_KEY, NULLABLE_STRING), -new Field(CLIENT_ID_KEY, STRING), -new Field(CLIENT_HOST_KEY, STRING), -new Field(REBALANCE_TIMEOUT_KEY, INT32), -new Field(SESSION_TIMEOUT_KEY, INT32), -new Field(SUBSCRIPTION_KEY, BYTES), -new Field(ASSIGNMENT_KEY, BYTES)) - - private val PROTOCOL_TYPE_KEY = "protocol_type" - private val GENERATION_KEY = "generation" - private val PROTOCOL_KEY = "protocol" - private val LEADER_KEY = "leader" -
[jira] [Assigned] (KAFKA-8653) Regression in JoinGroup v0 rebalance timeout handling
[ https://issues.apache.org/jira/browse/KAFKA-8653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] James Cheng reassigned KAFKA-8653: -- Assignee: James Cheng (was: Jason Gustafson) > Regression in JoinGroup v0 rebalance timeout handling > - > > Key: KAFKA-8653 > URL: https://issues.apache.org/jira/browse/KAFKA-8653 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.3.0 >Reporter: Jason Gustafson >Assignee: James Cheng >Priority: Blocker > Fix For: 2.3.1 > > > The rebalance timeout was added to the JoinGroup protocol in version 1. Prior > to 2.3, we handled version 0 JoinGroup requests by setting the rebalance > timeout to be equal to the session timeout. We lost this logic when we > converted the API to use the generated schema definition which uses the > default value of -1. The impact of this is that the group rebalance timeout > becomes 0, so rebalances finish immediately after we enter the > PrepareRebalance state and kick out all old members. This causes consumer > groups to enter an endless rebalance loop. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-8653) Regression in JoinGroup v0 rebalance timeout handling
[ https://issues.apache.org/jira/browse/KAFKA-8653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] James Cheng reassigned KAFKA-8653: -- Assignee: (was: James Cheng) > Regression in JoinGroup v0 rebalance timeout handling > - > > Key: KAFKA-8653 > URL: https://issues.apache.org/jira/browse/KAFKA-8653 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.3.0 >Reporter: Jason Gustafson >Priority: Blocker > Fix For: 2.3.1 > > > The rebalance timeout was added to the JoinGroup protocol in version 1. Prior > to 2.3, we handled version 0 JoinGroup requests by setting the rebalance > timeout to be equal to the session timeout. We lost this logic when we > converted the API to use the generated schema definition which uses the > default value of -1. The impact of this is that the group rebalance timeout > becomes 0, so rebalances finish immediately after we enter the > PrepareRebalance state and kick out all old members. This causes consumer > groups to enter an endless rebalance loop. -- This message was sent by Atlassian Jira (v8.3.4#803005)
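The fallback described in this issue can be sketched as follows. This is only an illustration, not the broker's actual code: the class and method names (`RebalanceTimeouts`, `effectiveRebalanceTimeoutMs`) are hypothetical, and it assumes that a version 0 request surfaces the missing field as the generated default of -1.

```java
public final class RebalanceTimeouts {
    // Assumption: the generated schema reports -1 when the field is absent (JoinGroup v0).
    public static final int UNSET = -1;

    /** Falls back to the session timeout when the request carried no rebalance timeout. */
    public static int effectiveRebalanceTimeoutMs(int rebalanceTimeoutMs, int sessionTimeoutMs) {
        return rebalanceTimeoutMs == UNSET ? sessionTimeoutMs : rebalanceTimeoutMs;
    }

    public static void main(String[] args) {
        // v0 request: no rebalance timeout on the wire, so the session timeout is used.
        System.out.println(effectiveRebalanceTimeoutMs(UNSET, 10_000));
        // v1+ request: the explicit rebalance timeout is kept.
        System.out.println(effectiveRebalanceTimeoutMs(30_000, 10_000));
    }
}
```

Keeping the fallback in one place would make it harder to lose again if the request parsing changes.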
[GitHub] [kafka] hachikuji commented on a change in pull request #9103: KAFKA-10181: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics
hachikuji commented on a change in pull request #9103: URL: https://github.com/apache/kafka/pull/9103#discussion_r493160927 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsUtil.java ## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData; + +import java.util.Collection; +import java.util.Map; + +public class AlterConfigsUtil { + +public static IncrementalAlterConfigsRequestData generateIncrementalRequestData(final Map> configs, + final boolean validateOnly) { +return generateIncrementalRequestData(configs.keySet(), configs, validateOnly); +} + +public static IncrementalAlterConfigsRequestData generateIncrementalRequestData(final Collection resources, Review comment: nit: might be useful to document the expectation that `resources` is a subset of the key set of `configs`. The signature surprised me a little bit. As an aside, this kind of convenience conversion seems more appropriate for `IncrementalAlterConfigsRequest.Builder` rather than a static class. 
## File path: core/src/main/scala/kafka/api/ApiVersion.scala ## @@ -103,6 +103,9 @@ object ApiVersion { KAFKA_2_7_IV0, // Bup Fetch protocol for Raft protocol (KIP-595) KAFKA_2_7_IV1, +// Enable redirection (KIP-590) +// TODO: remove this IBP in the 2.7 release if redirection work could not be done before the freeze Review comment: Get rid of this TODO. We do not need to remove IBP internal versions. ## File path: core/src/main/scala/kafka/common/InterBrokerSendThread.scala ## @@ -147,7 +147,9 @@ abstract class InterBrokerSendThread(name: String, case class RequestAndCompletionHandler(destination: Node, request: AbstractRequest.Builder[_ <: AbstractRequest], - handler: RequestCompletionHandler) + handler: RequestCompletionHandler, + initialPrincipalName: String = null, Review comment: nit: why don't we add a case class and make this optional. for example: ```scala case class InitialPrincipal(name: String, clientId: String) ``` In addition to reducing parameters, that makes the expectation that both are provided explicit. ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -117,6 +117,89 @@ class KafkaApis(val requestChannel: RequestChannel, val adminZkClient = new AdminZkClient(zkClient) private val alterAclsPurgatory = new DelayedFuturePurgatory(purgatoryName = "AlterAcls", brokerId = config.brokerId) + /** + * The template to create a forward request handler. + * + * @tparam T request type + * @tparam R response type + * @tparam RK resource key + * @tparam RV resource value + */ + private[server] abstract class ForwardRequestHandler[T <: AbstractRequest, +R <: AbstractResponse, RK, RV](request: RequestChannel.Request) extends Logging { + +/** + * Split the given resource into authorized and unauthorized sets. + * + * @return authorized resources and unauthorized resources + */ +def resourceSplitByAuthorization(request: T): (Map[RK, RV], Map[RK, ApiError]) + +/** + * Controller handling logic of the request. 
+ */ +def process(authorizedResources: Map[RK, RV], +unauthorizedResult: Map[RK, ApiError], +request: T): Unit + +/** + * Build a forward request to the controller. + * + * @param authorizedResources authorized resources by the forwarding broker + * @param request the original request + * @return forward request builder + */ +def createRequestBuilder(authorizedResources: Map[RK, RV], + request: T): AbstractRequest.Builder[T] + +/** + * Merge the forward response with the previously unauthorized
[GitHub] [kafka] huxihx commented on pull request #9218: MINOR: Fix shouldNotResetEpochHistoryHeadIfUndefinedPassed
huxihx commented on pull request #9218: URL: https://github.com/apache/kafka/pull/9218#issuecomment-697095836 retest this please.
[jira] [Created] (KAFKA-10513) Newly added partitions are not assigned to running static consumer
Marlon Ou created KAFKA-10513: - Summary: Newly added partitions are not assigned to running static consumer Key: KAFKA-10513 URL: https://issues.apache.org/jira/browse/KAFKA-10513 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 2.6.0 Reporter: Marlon Ou If consumers are polling messages from a certain topic with static membership and we add new partitions to this topic while the consumers are running, no partition reassignment is ever triggered (and hence messages published into the new partitions are never consumed). To reproduce, simply set group instance IDs on the consumers: {code:java} props.setProperty("group.instance.id", instanceId); {code} And then while the static consumers are running, use Kafka's admin client to add more partitions to the topic: {code:java} adminClient.createPartitions(...) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mattwong949 opened a new pull request #9322: KAFKA-10512: Prevent JmxTool Crashing on Unmarshall Error
mattwong949 opened a new pull request #9322: URL: https://github.com/apache/kafka/pull/9322 The JMXTool will query for all metrics when not supplied an "--object-name" arg. However, some MBean objects can contain attributes that cannot be serialized, thus crashing the JMXTool before reporting any metrics. This PR catches those exceptions, printing an error message but allowing the tool to continue reporting all metrics, with the errored ones filtered out.
[jira] [Created] (KAFKA-10512) JmxTool Can Crash on Unmarshall Error
Matthew Wong created KAFKA-10512: Summary: JmxTool Can Crash on Unmarshall Error Key: KAFKA-10512 URL: https://issues.apache.org/jira/browse/KAFKA-10512 Project: Kafka Issue Type: Bug Components: tools Affects Versions: 2.6.0 Reporter: Matthew Wong JmxTool can potentially crash from errors when querying for MBean objects. The errors can be caused by MBean objects that have attributes which can't be serialized. When querying for all metrics, if the tool encounters such nonserializable MBean attributes, the tool will crash without outputting any metrics. Instead, the tool should print an error message and filter out the problematic objects, proceeding to print all other metrics. -- This message was sent by Atlassian Jira (v8.3.4#803005)
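The behaviour requested here (report an error for a bad attribute and keep going) can be sketched with the standard `javax.management` API. This is not JmxTool's code: the class name `TolerantJmxDump` is hypothetical, and the sketch reads the local platform MBean server as a stand-in for the remote connection on which the unmarshalling errors actually occur.

```java
import java.lang.management.ManagementFactory;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public final class TolerantJmxDump {
    /** Reads every readable attribute of every MBean, skipping the ones that fail. */
    public static Map<String, Object> readAll(MBeanServer server) {
        Map<String, Object> values = new LinkedHashMap<>();
        for (ObjectName name : server.queryNames(null, null)) {
            MBeanAttributeInfo[] attrs;
            try {
                attrs = server.getMBeanInfo(name).getAttributes();
            } catch (Exception e) {
                // The MBean may have been unregistered in the meantime; skip it.
                System.err.println("Skipping MBean " + name + ": " + e);
                continue;
            }
            for (MBeanAttributeInfo attr : attrs) {
                if (!attr.isReadable()) {
                    continue;
                }
                try {
                    values.put(name + ":" + attr.getName(), server.getAttribute(name, attr.getName()));
                } catch (Exception e) {
                    // One unreadable attribute must not abort the whole dump.
                    System.err.println("Skipping attribute " + name + ":" + attr.getName() + ": " + e);
                }
            }
        }
        return values;
    }

    public static void main(String[] args) {
        Map<String, Object> values = readAll(ManagementFactory.getPlatformMBeanServer());
        System.out.println("Collected " + values.size() + " attribute values");
    }
}
```

Catching per attribute rather than around the whole query is what lets the remaining metrics still be printed.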
[GitHub] [kafka] jeqo opened a new pull request #9321: fix: add missing default implementations
jeqo opened a new pull request #9321: URL: https://github.com/apache/kafka/pull/9321 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] gmunozfe commented on pull request #9309: KAFKA-10503: MockProducer doesn't throw ClassCastException when no
gmunozfe commented on pull request #9309: URL: https://github.com/apache/kafka/pull/9309#issuecomment-696995620 @mjsax @guozhangwang @leonardge could you review? thanks!
[jira] [Created] (KAFKA-10511) Fix minor behavior difference in `MockLog`
Jason Gustafson created KAFKA-10511: --- Summary: Fix minor behavior difference in `MockLog` Key: KAFKA-10511 URL: https://issues.apache.org/jira/browse/KAFKA-10511 Project: Kafka Issue Type: Sub-task Reporter: Jason Gustafson Assignee: Jason Gustafson Fix minor difference in the implementation of the epoch cache in MockLog. In `LeaderEpochFileCache`, we ensure new entries increase both start offset and epoch monotonically. We also do not allow duplicates. -- This message was sent by Atlassian Jira (v8.3.4#803005)
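The invariant stated in this issue (epoch and start offset both increase, no duplicates) can be illustrated with a small stand-alone class. This is a simplified sketch under stated assumptions, not `MockLog` or `LeaderEpochFileCache`: the name `EpochCacheSketch` is hypothetical, and it simply rejects a violating entry, whereas the real cache may resolve conflicts differently.

```java
import java.util.ArrayList;
import java.util.List;

public final class EpochCacheSketch {
    // Each entry is {epoch, startOffset}, kept in insertion order.
    private final List<long[]> entries = new ArrayList<>();

    /**
     * Appends the entry only if both epoch and start offset are strictly
     * greater than those of the last entry. Returns false otherwise.
     */
    public boolean assign(int epoch, long startOffset) {
        if (!entries.isEmpty()) {
            long[] last = entries.get(entries.size() - 1);
            if (epoch <= last[0] || startOffset <= last[1]) {
                return false; // duplicate or non-monotonic entry
            }
        }
        entries.add(new long[] {epoch, startOffset});
        return true;
    }

    public int size() {
        return entries.size();
    }

    public static void main(String[] args) {
        EpochCacheSketch cache = new EpochCacheSketch();
        System.out.println(cache.assign(1, 0L));  // first entry is always accepted
        System.out.println(cache.assign(1, 5L));  // same epoch again: rejected
        System.out.println(cache.assign(2, 10L)); // both values increased: accepted
    }
}
```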
[GitHub] [kafka] ableegoldman commented on pull request #9312: KAFKA-10505: Fix parsing of generation log string.
ableegoldman commented on pull request #9312: URL: https://github.com/apache/kafka/pull/9312#issuecomment-696925826 Original system test run was aborted for some reason. Kicked off a new set: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4182/
[GitHub] [kafka] avocader opened a new pull request #9320: KAFKA-10439: Connect's Values to parse BigInteger as Decimal with zero scale.
avocader opened a new pull request #9320: URL: https://github.com/apache/kafka/pull/9320 The `org.apache.kafka.connect.data.Values#parse` method parses integers that are larger than `Long.MAX_VALUE` as `double` with `Schema.FLOAT64_SCHEMA`. That means we are losing precision for these larger integers. For example: ``` SchemaAndValue schemaAndValue = Values.parseString("9223372036854775808"); ``` returns: ``` SchemaAndValue{schema=Schema{FLOAT64}, value=9.223372036854776E18} ``` Also, this method parses values that can be parsed as `FLOAT32` to `FLOAT64`. This PR changes the parsing logic to use `FLOAT32`/`FLOAT64` only for numbers that have a fractional part (`decimal.scale() != 0`), and to use an arbitrary-precision `org.apache.kafka.connect.data.Decimal` otherwise. Also, it updates the method to parse numbers that can be represented as `float` to `FLOAT64`. Added unit tests that cover parsing `BigInteger`, `Byte`, `Short`, `Integer`, `Long`, `Float`, `Double` types. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
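The precision concern can be reproduced with `java.math.BigDecimal` alone, without Connect on the classpath. The sketch below is only an illustration: the class and method names are hypothetical and it does not call `Values.parseString`. One detail worth noting is that 9223372036854775808 is exactly 2^63 and therefore happens to survive a round trip through `double`, so the sketch also checks the next integer, which does not.

```java
import java.math.BigDecimal;

public final class IntegerPrecisionDemo {
    /** True if converting the literal to double and back yields the same number. */
    public static boolean survivesDouble(String literal) {
        BigDecimal exact = new BigDecimal(literal);
        // new BigDecimal(double) gives the exact binary value of the double.
        return new BigDecimal(exact.doubleValue()).compareTo(exact) == 0;
    }

    /** Mirrors the `decimal.scale() != 0` test mentioned in the PR description. */
    public static boolean hasZeroScale(String literal) {
        return new BigDecimal(literal).scale() == 0;
    }

    public static void main(String[] args) {
        // 2^63 is a power of two, so a double can hold it exactly.
        System.out.println(survivesDouble("9223372036854775808"));
        // 2^63 + 1 is rounded to a neighbouring double, so information is lost.
        System.out.println(survivesDouble("9223372036854775809"));
        // Integer literals have scale 0; literals with a fraction do not.
        System.out.println(hasZeroScale("9223372036854775808"));
        System.out.println(hasZeroScale("1.5"));
    }
}
```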
[GitHub] [kafka] avocader closed pull request #9227: KAFKA-10439: Connect's Values to parse BigInteger as Decimal with zero scale.
avocader closed pull request #9227: URL: https://github.com/apache/kafka/pull/9227
[jira] [Resolved] (KAFKA-10492) Core Raft implementation
[ https://issues.apache.org/jira/browse/KAFKA-10492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-10492. - Resolution: Fixed > Core Raft implementation > > > Key: KAFKA-10492 > URL: https://issues.apache.org/jira/browse/KAFKA-10492 > Project: Kafka > Issue Type: Sub-task >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > This issue tracks the core implementation of the Raft protocol specified in > KIP-595: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] hachikuji merged pull request #9130: KAFKA-10492; Core Kafka Raft Implementation (KIP-595)
hachikuji merged pull request #9130: URL: https://github.com/apache/kafka/pull/9130
[GitHub] [kafka] guozhangwang commented on a change in pull request #9083: KAFKA-9450: Follow-up; Forbid process after closed
guozhangwang commented on a change in pull request #9083: URL: https://github.com/apache/kafka/pull/9083#discussion_r492950073 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java ## @@ -44,19 +45,18 @@ public KStreamFlatTransformValues(final ValueTransformerWithKeySupplier implements Processor { +public static class KStreamFlatTransformValuesProcessor extends AbstractProcessor { Review comment: As described at the top, `Let all built-in processors to extend from AbstractProcessor.` The main reason is that AbstractProcessor provides some basic functionality, so it's better to base our own implementations on it.
[GitHub] [kafka] guozhangwang commented on a change in pull request #9083: KAFKA-9450: Follow-up; Forbid process after closed
guozhangwang commented on a change in pull request #9083: URL: https://github.com/apache/kafka/pull/9083#discussion_r492944504 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java ## @@ -217,12 +217,12 @@ public StateStore getStateStore(final String name) { forward((ProcessorNode) child, key, value); } } else { -final ProcessorNode child = currentNode().getChild(sendTo); Review comment: I made the change in ProcessorNode to add back the template types: https://github.com/apache/kafka/pull/9083/files/82b6f6f5d238401097e0906c8135c5c189524666#diff-705bfd0ed3f214048b76d775708cc7d2L96 But since `currentNode()`'s template is `` its templated `getChild` and that's why I need to weaken it here --- as you can see from the above `if` branch, it now aligns consistently on the typing. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on a change in pull request #9083: KAFKA-9450: Follow-up; Forbid process after closed
abbccdda commented on a change in pull request #9083: URL: https://github.com/apache/kafka/pull/9083#discussion_r492941448 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ## @@ -1566,6 +1566,7 @@ public void shouldCheckpointForSuspendedTask() { EasyMock.verify(stateManager); } + Review comment: nit: not necessary ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java ## @@ -217,12 +217,12 @@ public StateStore getStateStore(final String name) { forward((ProcessorNode) child, key, value); } } else { -final ProcessorNode child = currentNode().getChild(sendTo); Review comment: +1 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java ## @@ -44,19 +45,18 @@ public KStreamFlatTransformValues(final ValueTransformerWithKeySupplier implements Processor { +public static class KStreamFlatTransformValuesProcessor extends AbstractProcessor { Review comment: Could you elaborate why this is better than Processor? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] piotrrzysko commented on pull request #9315: KAFKA-10496: Removed relying on external DNS servers in tests
piotrrzysko commented on pull request #9315: URL: https://github.com/apache/kafka/pull/9315#issuecomment-696860022 @jolshan @mumrah @dajac Could you please take a look at this PR?
[GitHub] [kafka] abbccdda commented on a change in pull request #9270: [WIP] KAFKA-10284: Group membership update due to static member rejoin should be persisted
abbccdda commented on a change in pull request #9270: URL: https://github.com/apache/kafka/pull/9270#discussion_r492896997 ## File path: core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala ## @@ -3882,6 +3942,21 @@ class GroupCoordinatorTest { Await.result(responseFuture, Duration(rebalanceTimeout + 100, TimeUnit.MILLISECONDS)) } + private def staticJoinGroupWithPersistence(groupId: String, + memberId: String, Review comment: nit: alignment ## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ## @@ -1040,21 +1040,36 @@ class GroupCoordinator(val brokerId: Int, group.currentState match { case Stable => -info(s"Static member joins during Stable stage will not trigger rebalance.") -group.maybeInvokeJoinCallback(member, JoinGroupResult( - members = List.empty, - memberId = newMemberId, - generationId = group.generationId, - protocolType = group.protocolType, - protocolName = group.protocolName, - // We want to avoid current leader performing trivial assignment while the group - // is in stable stage, because the new assignment in leader's next sync call - // won't be broadcast by a stable group. This could be guaranteed by - // always returning the old leader id so that the current leader won't assume itself - // as a leader based on the returned message, since the new member.id won't match - // returned leader id, therefore no assignment will be performed. 
- leaderId = currentLeader, - error = Errors.NONE)) +// check if group's selectedProtocol of next generation will change, if not, simply store group to persist the +// updated static member, if yes, rebalance should be triggered to let the group's assignment and selectProtocol consistent +val selectedProtocolOfNextGeneration = group.selectProtocol +if (group.protocolName.contains(selectedProtocolOfNextGeneration)) { + info(s"Static member which joins during Stable stage and doesn't affect selectProtocol will not trigger rebalance.") + val groupAssignment: Map[String, Array[Byte]] = group.allMemberMetadata.map(member => member.memberId -> member.assignment).toMap + groupManager.storeGroup(group, groupAssignment, error => { +group.inLock { + if (error != Errors.NONE) { +warn(s"Failed to persist metadata for group ${group.groupId}: ${error.message}") + } +} + }) + group.maybeInvokeJoinCallback(member, JoinGroupResult( +members = List.empty, +memberId = newMemberId, +generationId = group.generationId, +protocolType = group.protocolType, +protocolName = group.protocolName, +// We want to avoid current leader performing trivial assignment while the group +// is in stable stage, because the new assignment in leader's next sync call +// won't be broadcast by a stable group. This could be guaranteed by +// always returning the old leader id so that the current leader won't assume itself +// as a leader based on the returned message, since the new member.id won't match +// returned leader id, therefore no assignment will be performed. +leaderId = currentLeader, +error = Errors.NONE)) +} else { + maybePrepareRebalance(group, s"Group's selectedProtocol will change because static member ${member.memberId} with instance id $groupInstanceId joined with change of protocol") Review comment: Could you elaborate why this case is possible? 
We do have checks for `!group.supportsProtocols(protocolType, MemberMetadata.plainProtocolSet(protocols)` in the caller, so if the group protocol is incompatible, won't we just reject the rejoin? ## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ## @@ -1040,21 +1040,36 @@ class GroupCoordinator(val brokerId: Int, group.currentState match { case Stable => -info(s"Static member joins during Stable stage will not trigger rebalance.") -group.maybeInvokeJoinCallback(member, JoinGroupResult( - members = List.empty, - memberId = newMemberId, - generationId = group.generationId, - protocolType = group.protocolType, - protocolName = group.protocolName, - // We want to avoid current leader performing trivial assignment while the group - // is in stable stage, because the new assignment in leader's next sync call - // won't be broadcast by a stable group. This could be guaranteed by - // always returning the old leader id so that the current leader won't assume itself
[GitHub] [kafka] chia7712 commented on a change in pull request #9206: MINOR: rewrite zipWithIndex by normal foreach to refrain unnecessary …
chia7712 commented on a change in pull request #9206: URL: https://github.com/apache/kafka/pull/9206#discussion_r492872408 ## File path: core/src/main/scala/kafka/log/LogValidator.scala ## @@ -234,17 +234,16 @@ private[log] object LogValidator extends Logging { val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, NoCompressionCodec) -records.batches.forEach { batch => +for (batch <- records.batches.asScala) { Review comment: ok~
[GitHub] [kafka] ijuma commented on a change in pull request #9206: MINOR: rewrite zipWithIndex by normal foreach to refrain unnecessary …
ijuma commented on a change in pull request #9206: URL: https://github.com/apache/kafka/pull/9206#discussion_r492865230 ## File path: core/src/main/scala/kafka/log/LogValidator.scala ## @@ -234,17 +234,16 @@ private[log] object LogValidator extends Logging { val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, NoCompressionCodec) -records.batches.forEach { batch => +for (batch <- records.batches.asScala) { Review comment: I liked your changes to make the code more concise, I'd keep them.
[GitHub] [kafka] mjsax commented on pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones
mjsax commented on pull request #9156: URL: https://github.com/apache/kafka/pull/9156#issuecomment-696823406 Retest this please.
[GitHub] [kafka] vvcephei merged pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually
vvcephei merged pull request #9262: URL: https://github.com/apache/kafka/pull/9262
[jira] [Updated] (KAFKA-10510) Reassigning partitions should not allow increasing RF of a partition unless configured with it
[ https://issues.apache.org/jira/browse/KAFKA-10510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski updated KAFKA-10510: Description: Kafka should have some validations in place against increasing the RF of a partition through a reassignment. Users could otherwise shoot themselves in the foot by increasing the RF of a topic by reassigning its partitions to extra replicas and then have new partition creations use a lesser (the configured) replication factor. Our tools should ideally detect when RF is increasing inconsistently with the rest of the topic's partitions (or the default replication factor) was: Kafka should have some validations in place against increasing the RF of a partition through a reassignment. Users could otherwise shoot themselves in the foot by increasing the RF of a topic by reassigning its partitions to extra replicas and then have new partition creations use a lesser (the configured) replication factor. Our tools should ideally detect when RF is increasing inconsistently with the config and issue a separate command to change the config. > Reassigning partitions should not allow increasing RF of a partition unless > configured with it > -- > > Key: KAFKA-10510 > URL: https://issues.apache.org/jira/browse/KAFKA-10510 > Project: Kafka > Issue Type: Improvement >Reporter: Stanislav Kozlovski >Priority: Major > > Kafka should have some validations in place against increasing the RF of a > partition through a reassignment. Users could otherwise shoot themselves in > the foot by increasing the RF of a topic by reassigning its partitions to > extra replicas and then have new partition creations use a lesser (the > configured) replication factor. > Our tools should ideally detect when RF is increasing inconsistently with the > rest of the topic's partitions (or the default replication factor) -- This message was sent by Atlassian Jira (v8.3.4#803005)
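The kind of validation proposed here could look roughly like the sketch below. It is a hypothetical illustration, not an existing Kafka check: the class `ReassignmentRfCheck` and its method are invented for this example, and it assumes the "expected" replication factor is the largest replica count among the topic's current partitions.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public final class ReassignmentRfCheck {
    /**
     * Returns true if the target replica list is larger than the replica list
     * of every partition currently in the topic, i.e. the reassignment would
     * raise the replication factor beyond what the rest of the topic uses.
     */
    public static boolean increasesRfInconsistently(Map<Integer, List<Integer>> currentAssignment,
                                                    List<Integer> targetReplicas) {
        int maxCurrentRf = 0;
        for (List<Integer> replicas : currentAssignment.values()) {
            maxCurrentRf = Math.max(maxCurrentRf, replicas.size());
        }
        return targetReplicas.size() > maxCurrentRf;
    }

    public static void main(String[] args) {
        // Two partitions, both with replication factor 2 (broker ids are made up).
        Map<Integer, List<Integer>> current = new HashMap<>();
        current.put(0, Arrays.asList(1, 2));
        current.put(1, Arrays.asList(2, 3));

        // Moving replicas without adding any keeps the replication factor.
        System.out.println(increasesRfInconsistently(current, Arrays.asList(3, 4)));
        // Adding a third replica would be flagged.
        System.out.println(increasesRfInconsistently(current, Arrays.asList(1, 2, 3)));
    }
}
```

A real check would also need to consider the broker's default replication factor, as the updated description notes.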
[jira] [Commented] (KAFKA-10507) Limit the set of APIs returned in pre-authentication ApiVersions
[ https://issues.apache.org/jira/browse/KAFKA-10507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17200172#comment-17200172 ] Jason Gustafson commented on KAFKA-10507: - [~edenhill] It was more of a suggestion. My thinking was to avoid leaking information prior to authentication. Since librdkafka depends on it, I will just close this as "won't fix." > Limit the set of APIs returned in pre-authentication ApiVersions > - > > Key: KAFKA-10507 > URL: https://issues.apache.org/jira/browse/KAFKA-10507 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: David Jacot >Priority: Major > > We use the ApiVersions RPC to check whether the SaslHandshake and > SaslAuthenticate APIs are supported before authenticating with the broker. > Currently the response contains all APIs supported by the broker. It seems > like a good idea to reduce the set of APIs returned at this level to only > those which are supported prior to authentication.
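For readers unfamiliar with the proposal in KAFKA-10507, the idea amounts to filtering the advertised API list until the connection is authenticated. The sketch below is only an illustration of that idea: the object, the method `visibleApis`, and the use of plain strings for API names are hypothetical, and including `ApiVersions` itself in the pre-authentication set is an assumption rather than something stated in the issue.

```scala
object PreAuthApiFilterSketch {
  // Assumed pre-authentication set; the issue names SaslHandshake and SaslAuthenticate,
  // and ApiVersions is added here as an assumption so the client can still ask.
  val preAuthApis: Set[String] = Set("ApiVersions", "SaslHandshake", "SaslAuthenticate")

  // Hypothetical helper: returns the APIs a broker would advertise for this connection.
  def visibleApis(supported: Seq[String], authenticated: Boolean): Seq[String] =
    if (authenticated) supported
    else supported.filter(preAuthApis.contains)
}
```

As the comment above notes, the issue was closed without this change because librdkafka depends on the full list being returned.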
[jira] [Resolved] (KAFKA-10507) Limit the set of APIs returned in pre-authentication ApiVersions
[ https://issues.apache.org/jira/browse/KAFKA-10507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-10507. - Resolution: Won't Do > Limit the set of APIs returned in pre-authentication ApiVersions > - > > Key: KAFKA-10507 > URL: https://issues.apache.org/jira/browse/KAFKA-10507 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: David Jacot >Priority: Major > > We use the ApiVersions RPC to check whether the SaslHandshake and > SaslAuthenticate APIs are supported before authenticating with the broker. > Currently the response contains all APIs supported by the broker. It seems > like a good idea to reduce the set of APIs returned at this level to only > those which are supported prior to authentication.
[GitHub] [kafka] chia7712 commented on a change in pull request #9206: MINOR: rewrite zipWithIndex by normal foreach to refrain unnecessary …
chia7712 commented on a change in pull request #9206: URL: https://github.com/apache/kafka/pull/9206#discussion_r492834735 ## File path: core/src/main/scala/kafka/log/LogValidator.scala ## @@ -234,16 +234,17 @@ private[log] object LogValidator extends Logging { val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, NoCompressionCodec) -for (batch <- records.batches.asScala) { +records.batches.forEach { batch => validateBatch(topicPartition, firstBatch, batch, origin, toMagicValue, brokerTopicStats) val recordErrors = new ArrayBuffer[ApiRecordError](0) - for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) { + var batchIndex = 0 + batch.forEach { record => validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats).foreach(recordError => recordErrors += recordError) // we fail the batch if any record fails, so we stop appending if any record fails -if (recordErrors.isEmpty) - builder.appendWithOffset(offsetCounter.getAndIncrement(), record) +if (recordErrors.isEmpty) builder.appendWithOffset(offsetCounter.getAndIncrement(), record) +batchIndex += 1 Review comment: > Have we benchmarked this path? I didn't benchmark this path, and you are right that the optimization is small since we have to convert data in this path anyway. I will revert it to keep the patch small.
## File path: core/src/main/scala/kafka/log/LogValidator.scala ## @@ -279,14 +280,15 @@ private[log] object LogValidator extends Logging { val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, NoCompressionCodec) -for (batch <- records.batches.asScala) { +records.batches.forEach { batch => validateBatch(topicPartition, firstBatch, batch, origin, magic, brokerTopicStats) var maxBatchTimestamp = RecordBatch.NO_TIMESTAMP var offsetOfMaxBatchTimestamp = -1L val recordErrors = new ArrayBuffer[ApiRecordError](0) - for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) { + var batchIndex = 0 Review comment: copy that
[GitHub] [kafka] ijuma commented on a change in pull request #9206: MINOR: rewrite zipWithIndex by normal foreach to refrain unnecessary …
ijuma commented on a change in pull request #9206: URL: https://github.com/apache/kafka/pull/9206#discussion_r492823279 ## File path: core/src/main/scala/kafka/log/LogValidator.scala ## @@ -279,14 +280,15 @@ private[log] object LogValidator extends Logging { val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, NoCompressionCodec) -for (batch <- records.batches.asScala) { +records.batches.forEach { batch => validateBatch(topicPartition, firstBatch, batch, origin, magic, brokerTopicStats) var maxBatchTimestamp = RecordBatch.NO_TIMESTAMP var offsetOfMaxBatchTimestamp = -1L val recordErrors = new ArrayBuffer[ApiRecordError](0) - for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) { + var batchIndex = 0 Review comment: Worth adding a comment here that this is a hot path and we want to avoid any unnecessary allocations.
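The pattern under review, replacing `zipWithIndex` with a manually maintained counter, can be shown in isolation. The following is a minimal sketch and not the `LogValidator` code: the object and the helper `foreachWithIndex` are hypothetical names introduced for illustration. It iterates a `java.lang.Iterable` directly, so no Scala wrapper and no `(element, index)` tuple is created per element.

```scala
object IndexedForEachSketch {
  // Hypothetical helper: visits each element together with its position.
  // The counter is a plain var, so no tuple is allocated per element
  // (unlike iterating over `asScala.view.zipWithIndex`).
  def foreachWithIndex[A](items: java.lang.Iterable[A])(f: (A, Int) => Unit): Unit = {
    var index = 0
    items.forEach { item =>
      f(item, index)
      index += 1
    }
  }
}
```

Whether this is measurably faster in a given path is a separate question; as the thread points out, it is worth benchmarking before relying on it.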
[GitHub] [kafka] ijuma commented on a change in pull request #9206: MINOR: rewrite zipWithIndex by normal foreach to refrain unnecessary …
ijuma commented on a change in pull request #9206: URL: https://github.com/apache/kafka/pull/9206#discussion_r492821097 ## File path: core/src/main/scala/kafka/log/LogValidator.scala ## @@ -234,16 +234,17 @@ private[log] object LogValidator extends Logging { val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, NoCompressionCodec) -for (batch <- records.batches.asScala) { +records.batches.forEach { batch => validateBatch(topicPartition, firstBatch, batch, origin, toMagicValue, brokerTopicStats) val recordErrors = new ArrayBuffer[ApiRecordError](0) - for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) { + var batchIndex = 0 + batch.forEach { record => validateRecord(batch, topicPartition, record, batchIndex, now, timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats).foreach(recordError => recordErrors += recordError) // we fail the batch if any record fails, so we stop appending if any record fails -if (recordErrors.isEmpty) - builder.appendWithOffset(offsetCounter.getAndIncrement(), record) +if (recordErrors.isEmpty) builder.appendWithOffset(offsetCounter.getAndIncrement(), record) +batchIndex += 1 Review comment: Have we benchmarked this path? It seems doubtful that these micro optimizations help given that we are `converting`.
[GitHub] [kafka] ijuma commented on pull request #4090: [KAFKA-6084] Propagate JSON parsing errors in ReassignPartitionsCommand
ijuma commented on pull request #4090: URL: https://github.com/apache/kafka/pull/4090#issuecomment-696784662 Can you also rebase please so that the PR builder runs?
[GitHub] [kafka] ijuma commented on pull request #9266: KAFKA-10469: Resolve logger levels hierarchically
ijuma commented on pull request #9266: URL: https://github.com/apache/kafka/pull/9266#issuecomment-696784359 @tombentley `testDescribeConfigsForLog4jLogLevels` is failing.
[GitHub] [kafka] ijuma commented on a change in pull request #4090: [KAFKA-6084] Propagate JSON parsing errors in ReassignPartitionsCommand
ijuma commented on a change in pull request #4090: URL: https://github.com/apache/kafka/pull/4090#discussion_r492813256 ## File path: core/src/main/scala/kafka/utils/Json.scala ## @@ -69,8 +70,11 @@ object Json { * @return An `Either` which in case of `Left` means an exception and `Right` is the actual return value. */ def tryParseFull(input: String): Either[JsonProcessingException, JsonValue] = -try Right(mapper.readTree(input)).map(JsonValue(_)) -catch { case e: JsonProcessingException => Left(e) } +if (input != null && input.isEmpty) + Left(new JsonParseException(MissingNode.getInstance().traverse(), "The input string shouldn't be empty")) Review comment: This seems fine.
[GitHub] [kafka] ijuma commented on a change in pull request #4090: [KAFKA-6084] Propagate JSON parsing errors in ReassignPartitionsCommand
ijuma commented on a change in pull request #4090: URL: https://github.com/apache/kafka/pull/4090#discussion_r492811318 ## File path: core/src/main/scala/kafka/utils/Json.scala ## @@ -69,8 +70,11 @@ object Json { * @return An `Either` which in case of `Left` means an exception and `Right` is the actual return value. */ def tryParseFull(input: String): Either[JsonProcessingException, JsonValue] = -try Right(mapper.readTree(input)).map(JsonValue(_)) -catch { case e: JsonProcessingException => Left(e) } +if (input != null && input.isEmpty) Review comment: I think we should say `if (input == null || input.isEmpty)` instead. It's unexpected to get an exception instead of `Left` in a method that returns `Either`.
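The point of the suggestion above is that a method returning `Either` should report every failure through `Left` and never throw for bad input. A minimal sketch of that contract follows. It is not the `Json.tryParseFull` implementation: to stay self-contained it takes the parser as a function argument instead of calling Jackson, and `ParseError`, `tryParse`, and the error message are hypothetical names chosen for the example.

```scala
object EitherGuardSketch {
  // Hypothetical error type standing in for JsonProcessingException.
  final case class ParseError(message: String)

  // Guards null and empty input up front so the caller always gets an Either,
  // then converts parser exceptions into Left as well.
  def tryParse[A](input: String)(parse: String => A): Either[ParseError, A] =
    if (input == null || input.isEmpty)
      Left(ParseError("The input string shouldn't be null or empty"))
    else
      try Right(parse(input))
      catch { case e: IllegalArgumentException => Left(ParseError(e.getMessage)) }
}
```

For example, `EitherGuardSketch.tryParse("42")(_.toInt)` yields a `Right`, while a `null`, empty, or non-numeric input yields a `Left` instead of an exception (`NumberFormatException` is a subclass of `IllegalArgumentException`).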
[GitHub] [kafka] vvcephei commented on pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually
vvcephei commented on pull request #9262: URL: https://github.com/apache/kafka/pull/9262#issuecomment-696760191 Test failure unrelated: `Build / JDK 11 / kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup`
[GitHub] [kafka] vvcephei merged pull request #8892: KAFKA-10068: verify assignment performance with large cluster
vvcephei merged pull request #8892: URL: https://github.com/apache/kafka/pull/8892
[GitHub] [kafka] vvcephei commented on pull request #8892: KAFKA-10068: verify assignment performance with large cluster
vvcephei commented on pull request #8892: URL: https://github.com/apache/kafka/pull/8892#issuecomment-696752069 Thanks, @ableegoldman! There was only one unrelated test failure: `Build / JDK 11 / kafka.network.DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota`
[GitHub] [kafka] viktorsomogyi commented on a change in pull request #4090: [KAFKA-6084] Propagate JSON parsing errors in ReassignPartitionsCommand
viktorsomogyi commented on a change in pull request #4090: URL: https://github.com/apache/kafka/pull/4090#discussion_r492740825 ## File path: core/src/main/scala/kafka/utils/Json.scala ## @@ -69,8 +70,11 @@ object Json { * @return An `Either` which in case of `Left` means an exception and `Right` is the actual return value. */ def tryParseFull(input: String): Either[JsonProcessingException, JsonValue] = -try Right(mapper.readTree(input)).map(JsonValue(_)) -catch { case e: JsonProcessingException => Left(e) } +if (input != null && input.isEmpty) + Left(new JsonParseException(MissingNode.getInstance().traverse(), "The input string shouldn't be empty")) Review comment: It seems that JsonParseException is the most suitable JsonProcessingException that could apply here. Another possibility is to change the exception in the return type to something like IOException and use that here, but that seems vaguer than JsonParseException. A third option is to simply throw IllegalArgumentException here instead of returning Left. That would correspond to `readTree(InputStream)` as well. What do you think?
[GitHub] [kafka] viktorsomogyi commented on a change in pull request #4090: [KAFKA-6084] Propagate JSON parsing errors in ReassignPartitionsCommand
viktorsomogyi commented on a change in pull request #4090: URL: https://github.com/apache/kafka/pull/4090#discussion_r492727120 ## File path: core/src/main/scala/kafka/utils/Json.scala ## @@ -69,8 +70,11 @@ object Json { * @return An `Either` which in case of `Left` means an exception and `Right` is the actual return value. */ def tryParseFull(input: String): Either[JsonProcessingException, JsonValue] = -try Right(mapper.readTree(input)).map(JsonValue(_)) -catch { case e: JsonProcessingException => Left(e) } +if (input != null && input.isEmpty) Review comment: I wasn't sure what to do here, but `null` is checked in `readTree` and an IllegalArgumentException is thrown, so I decided to skip the check here and leave the original behavior.
[GitHub] [kafka] viktorsomogyi commented on a change in pull request #4090: [KAFKA-6084] Propagate JSON parsing errors in ReassignPartitionsCommand
viktorsomogyi commented on a change in pull request #4090: URL: https://github.com/apache/kafka/pull/4090#discussion_r492704280 ## File path: core/src/test/scala/unit/kafka/utils/JsonTest.scala ## @@ -40,25 +40,34 @@ class JsonTest { def testJsonParse(): Unit = { val jnf = JsonNodeFactory.instance -assertEquals(Json.parseFull("{}"), Some(JsonValue(new ObjectNode(jnf +assertEquals(Some(JsonValue(new ObjectNode(jnf))), Json.parseFull("{}")) +assertEquals(Right(JsonValue(new ObjectNode(jnf))), Json.tryParseFull("{}")) +assertThrows(classOf[IllegalArgumentException], () => Json.tryParseFull(null)) +assertThrows(classOf[IllegalArgumentException], () => Json.tryParseBytes(null)) -assertEquals(Json.parseFull("""{"foo":"bar"s}"""), None) +assertEquals(Option(MissingNode.getInstance()).map(JsonValue(_)), Json.parseFull("")) +assertEquals(Right(MissingNode.getInstance()).map(JsonValue(_)), Json.tryParseFull("")) Review comment: You're right, it makes sense to return `None` and `Left` in those cases. Will change it and upload it to this PR.
[GitHub] [kafka] chia7712 commented on a change in pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…
chia7712 commented on a change in pull request #9318: URL: https://github.com/apache/kafka/pull/9318#discussion_r492696416 ## File path: core/src/main/resources/common/message/GroupMetadataValue.json ## @@ -0,0 +1,103 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "name": "GroupMetadataValue", Review comment: copy that
[GitHub] [kafka] chia7712 commented on a change in pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…
chia7712 commented on a change in pull request #9318: URL: https://github.com/apache/kafka/pull/9318#discussion_r492696066 ## File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala ## @@ -997,189 +997,52 @@ object GroupMetadataManager { val MetricsGroup: String = "group-coordinator-metrics" val LoadTimeSensor: String = "GroupPartitionLoadTime" - private val CURRENT_OFFSET_KEY_SCHEMA_VERSION = 1.toShort - private val CURRENT_GROUP_KEY_SCHEMA_VERSION = 2.toShort - - private val OFFSET_COMMIT_KEY_SCHEMA = new Schema(new Field("group", STRING), -new Field("topic", STRING), -new Field("partition", INT32)) - private val OFFSET_KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("group") - private val OFFSET_KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("topic") - private val OFFSET_KEY_PARTITION_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("partition") - - private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", INT64), -new Field("metadata", STRING, "Associated metadata.", ""), -new Field("timestamp", INT64)) - private val OFFSET_VALUE_OFFSET_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset") - private val OFFSET_VALUE_METADATA_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata") - private val OFFSET_VALUE_TIMESTAMP_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp") - - private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", INT64), -new Field("metadata", STRING, "Associated metadata.", ""), -new Field("commit_timestamp", INT64), -new Field("expire_timestamp", INT64)) - private val OFFSET_VALUE_OFFSET_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset") - private val OFFSET_VALUE_METADATA_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata") - private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp") - private val OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp") - - private val 
OFFSET_COMMIT_VALUE_SCHEMA_V2 = new Schema(new Field("offset", INT64), -new Field("metadata", STRING, "Associated metadata.", ""), -new Field("commit_timestamp", INT64)) - private val OFFSET_VALUE_OFFSET_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset") - private val OFFSET_VALUE_METADATA_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata") - private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp") - - private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema( -new Field("offset", INT64), -new Field("leader_epoch", INT32), -new Field("metadata", STRING, "Associated metadata.", ""), -new Field("commit_timestamp", INT64)) - private val OFFSET_VALUE_OFFSET_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset") - private val OFFSET_VALUE_LEADER_EPOCH_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch") - private val OFFSET_VALUE_METADATA_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata") - private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp") - - private val GROUP_METADATA_KEY_SCHEMA = new Schema(new Field("group", STRING)) - private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group") - - private val MEMBER_ID_KEY = "member_id" - private val GROUP_INSTANCE_ID_KEY = "group_instance_id" - private val CLIENT_ID_KEY = "client_id" - private val CLIENT_HOST_KEY = "client_host" - private val REBALANCE_TIMEOUT_KEY = "rebalance_timeout" - private val SESSION_TIMEOUT_KEY = "session_timeout" - private val SUBSCRIPTION_KEY = "subscription" - private val ASSIGNMENT_KEY = "assignment" - - private val MEMBER_METADATA_V0 = new Schema( -new Field(MEMBER_ID_KEY, STRING), -new Field(CLIENT_ID_KEY, STRING), -new Field(CLIENT_HOST_KEY, STRING), -new Field(SESSION_TIMEOUT_KEY, INT32), -new Field(SUBSCRIPTION_KEY, BYTES), -new Field(ASSIGNMENT_KEY, BYTES)) - - private val MEMBER_METADATA_V1 = new Schema( -new Field(MEMBER_ID_KEY, STRING), -new 
Field(CLIENT_ID_KEY, STRING), -new Field(CLIENT_HOST_KEY, STRING), -new Field(REBALANCE_TIMEOUT_KEY, INT32), -new Field(SESSION_TIMEOUT_KEY, INT32), -new Field(SUBSCRIPTION_KEY, BYTES), -new Field(ASSIGNMENT_KEY, BYTES)) - - private val MEMBER_METADATA_V2 = MEMBER_METADATA_V1 - - private val MEMBER_METADATA_V3 = new Schema( -new Field(MEMBER_ID_KEY, STRING), -new Field(GROUP_INSTANCE_ID_KEY, NULLABLE_STRING), -new Field(CLIENT_ID_KEY, STRING), -new Field(CLIENT_HOST_KEY, STRING), -new Field(REBALANCE_TIMEOUT_KEY, INT32), -new Field(SESSION_TIMEOUT_KEY, INT32), -new Field(SUBSCRIPTION_KEY, BYTES), -new Field(ASSIGNMENT_KEY, BYTES)) - - private val PROTOCOL_TYPE_KEY = "protocol_type" - private val GENERATION_KEY = "generation" - private val PROTOCOL_KEY = "protocol" - private val LEADER_KEY = "leader" -
[GitHub] [kafka] dajac commented on a change in pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…
dajac commented on a change in pull request #9318: URL: https://github.com/apache/kafka/pull/9318#discussion_r492661437 ## File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala ## @@ -997,189 +997,52 @@ object GroupMetadataManager { val MetricsGroup: String = "group-coordinator-metrics" val LoadTimeSensor: String = "GroupPartitionLoadTime" - private val CURRENT_OFFSET_KEY_SCHEMA_VERSION = 1.toShort - private val CURRENT_GROUP_KEY_SCHEMA_VERSION = 2.toShort - - private val OFFSET_COMMIT_KEY_SCHEMA = new Schema(new Field("group", STRING), -new Field("topic", STRING), -new Field("partition", INT32)) - private val OFFSET_KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("group") - private val OFFSET_KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("topic") - private val OFFSET_KEY_PARTITION_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("partition") - - private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", INT64), -new Field("metadata", STRING, "Associated metadata.", ""), -new Field("timestamp", INT64)) - private val OFFSET_VALUE_OFFSET_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset") - private val OFFSET_VALUE_METADATA_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata") - private val OFFSET_VALUE_TIMESTAMP_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp") - - private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", INT64), -new Field("metadata", STRING, "Associated metadata.", ""), -new Field("commit_timestamp", INT64), -new Field("expire_timestamp", INT64)) - private val OFFSET_VALUE_OFFSET_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset") - private val OFFSET_VALUE_METADATA_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata") - private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp") - private val OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp") - - private val OFFSET_COMMIT_VALUE_SCHEMA_V2 = 
new Schema(new Field("offset", INT64), -new Field("metadata", STRING, "Associated metadata.", ""), -new Field("commit_timestamp", INT64)) - private val OFFSET_VALUE_OFFSET_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset") - private val OFFSET_VALUE_METADATA_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata") - private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp") - - private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema( -new Field("offset", INT64), -new Field("leader_epoch", INT32), -new Field("metadata", STRING, "Associated metadata.", ""), -new Field("commit_timestamp", INT64)) - private val OFFSET_VALUE_OFFSET_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset") - private val OFFSET_VALUE_LEADER_EPOCH_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch") - private val OFFSET_VALUE_METADATA_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata") - private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp") - - private val GROUP_METADATA_KEY_SCHEMA = new Schema(new Field("group", STRING)) - private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group") - - private val MEMBER_ID_KEY = "member_id" - private val GROUP_INSTANCE_ID_KEY = "group_instance_id" - private val CLIENT_ID_KEY = "client_id" - private val CLIENT_HOST_KEY = "client_host" - private val REBALANCE_TIMEOUT_KEY = "rebalance_timeout" - private val SESSION_TIMEOUT_KEY = "session_timeout" - private val SUBSCRIPTION_KEY = "subscription" - private val ASSIGNMENT_KEY = "assignment" - - private val MEMBER_METADATA_V0 = new Schema( -new Field(MEMBER_ID_KEY, STRING), -new Field(CLIENT_ID_KEY, STRING), -new Field(CLIENT_HOST_KEY, STRING), -new Field(SESSION_TIMEOUT_KEY, INT32), -new Field(SUBSCRIPTION_KEY, BYTES), -new Field(ASSIGNMENT_KEY, BYTES)) - - private val MEMBER_METADATA_V1 = new Schema( -new Field(MEMBER_ID_KEY, STRING), -new Field(CLIENT_ID_KEY, STRING), -new 
Field(CLIENT_HOST_KEY, STRING), -new Field(REBALANCE_TIMEOUT_KEY, INT32), -new Field(SESSION_TIMEOUT_KEY, INT32), -new Field(SUBSCRIPTION_KEY, BYTES), -new Field(ASSIGNMENT_KEY, BYTES)) - - private val MEMBER_METADATA_V2 = MEMBER_METADATA_V1 - - private val MEMBER_METADATA_V3 = new Schema( -new Field(MEMBER_ID_KEY, STRING), -new Field(GROUP_INSTANCE_ID_KEY, NULLABLE_STRING), -new Field(CLIENT_ID_KEY, STRING), -new Field(CLIENT_HOST_KEY, STRING), -new Field(REBALANCE_TIMEOUT_KEY, INT32), -new Field(SESSION_TIMEOUT_KEY, INT32), -new Field(SUBSCRIPTION_KEY, BYTES), -new Field(ASSIGNMENT_KEY, BYTES)) - - private val PROTOCOL_TYPE_KEY = "protocol_type" - private val GENERATION_KEY = "generation" - private val PROTOCOL_KEY = "protocol" - private val LEADER_KEY = "leader" -
[GitHub] [kafka] dajac commented on a change in pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…
dajac commented on a change in pull request #9318: URL: https://github.com/apache/kafka/pull/9318#discussion_r492663086 ## File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala ## @@ -997,189 +997,52 @@ object GroupMetadataManager { val MetricsGroup: String = "group-coordinator-metrics" val LoadTimeSensor: String = "GroupPartitionLoadTime" - private val CURRENT_OFFSET_KEY_SCHEMA_VERSION = 1.toShort - private val CURRENT_GROUP_KEY_SCHEMA_VERSION = 2.toShort - - private val OFFSET_COMMIT_KEY_SCHEMA = new Schema(new Field("group", STRING), -new Field("topic", STRING), -new Field("partition", INT32)) - private val OFFSET_KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("group") - private val OFFSET_KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("topic") - private val OFFSET_KEY_PARTITION_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("partition") - - private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", INT64), -new Field("metadata", STRING, "Associated metadata.", ""), -new Field("timestamp", INT64)) - private val OFFSET_VALUE_OFFSET_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset") - private val OFFSET_VALUE_METADATA_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata") - private val OFFSET_VALUE_TIMESTAMP_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp") - - private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", INT64), -new Field("metadata", STRING, "Associated metadata.", ""), -new Field("commit_timestamp", INT64), -new Field("expire_timestamp", INT64)) - private val OFFSET_VALUE_OFFSET_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset") - private val OFFSET_VALUE_METADATA_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata") - private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp") - private val OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp") - - private val OFFSET_COMMIT_VALUE_SCHEMA_V2 = 
new Schema(new Field("offset", INT64), -new Field("metadata", STRING, "Associated metadata.", ""), -new Field("commit_timestamp", INT64)) - private val OFFSET_VALUE_OFFSET_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset") - private val OFFSET_VALUE_METADATA_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata") - private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp") - - private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema( -new Field("offset", INT64), -new Field("leader_epoch", INT32), -new Field("metadata", STRING, "Associated metadata.", ""), -new Field("commit_timestamp", INT64)) - private val OFFSET_VALUE_OFFSET_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset") - private val OFFSET_VALUE_LEADER_EPOCH_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch") - private val OFFSET_VALUE_METADATA_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata") - private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp") - - private val GROUP_METADATA_KEY_SCHEMA = new Schema(new Field("group", STRING)) - private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group") - - private val MEMBER_ID_KEY = "member_id" - private val GROUP_INSTANCE_ID_KEY = "group_instance_id" - private val CLIENT_ID_KEY = "client_id" - private val CLIENT_HOST_KEY = "client_host" - private val REBALANCE_TIMEOUT_KEY = "rebalance_timeout" - private val SESSION_TIMEOUT_KEY = "session_timeout" - private val SUBSCRIPTION_KEY = "subscription" - private val ASSIGNMENT_KEY = "assignment" - - private val MEMBER_METADATA_V0 = new Schema( -new Field(MEMBER_ID_KEY, STRING), -new Field(CLIENT_ID_KEY, STRING), -new Field(CLIENT_HOST_KEY, STRING), -new Field(SESSION_TIMEOUT_KEY, INT32), -new Field(SUBSCRIPTION_KEY, BYTES), -new Field(ASSIGNMENT_KEY, BYTES)) - - private val MEMBER_METADATA_V1 = new Schema( -new Field(MEMBER_ID_KEY, STRING), -new Field(CLIENT_ID_KEY, STRING), -new 
Field(CLIENT_HOST_KEY, STRING), -new Field(REBALANCE_TIMEOUT_KEY, INT32), -new Field(SESSION_TIMEOUT_KEY, INT32), -new Field(SUBSCRIPTION_KEY, BYTES), -new Field(ASSIGNMENT_KEY, BYTES)) - - private val MEMBER_METADATA_V2 = MEMBER_METADATA_V1 - - private val MEMBER_METADATA_V3 = new Schema( -new Field(MEMBER_ID_KEY, STRING), -new Field(GROUP_INSTANCE_ID_KEY, NULLABLE_STRING), -new Field(CLIENT_ID_KEY, STRING), -new Field(CLIENT_HOST_KEY, STRING), -new Field(REBALANCE_TIMEOUT_KEY, INT32), -new Field(SESSION_TIMEOUT_KEY, INT32), -new Field(SUBSCRIPTION_KEY, BYTES), -new Field(ASSIGNMENT_KEY, BYTES)) - - private val PROTOCOL_TYPE_KEY = "protocol_type" - private val GENERATION_KEY = "generation" - private val PROTOCOL_KEY = "protocol" - private val LEADER_KEY = "leader" -
[GitHub] [kafka] dajac commented on a change in pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…
dajac commented on a change in pull request #9318: URL: https://github.com/apache/kafka/pull/9318#discussion_r492662801 ## File path: core/src/main/resources/common/message/GroupMetadataValue.json ## @@ -0,0 +1,103 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "name": "GroupMetadataValue", Review comment: Could we use the same formatting as the request/response? I know that the formatting that we use is a bit weird but I think that we should remain consistent.
[GitHub] [kafka] dajac commented on a change in pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…
dajac commented on a change in pull request #9318: URL: https://github.com/apache/kafka/pull/9318#discussion_r492661437 ## File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala ## @@ -997,189 +997,52 @@ object GroupMetadataManager { val MetricsGroup: String = "group-coordinator-metrics" val LoadTimeSensor: String = "GroupPartitionLoadTime" - private val CURRENT_OFFSET_KEY_SCHEMA_VERSION = 1.toShort - private val CURRENT_GROUP_KEY_SCHEMA_VERSION = 2.toShort - - private val OFFSET_COMMIT_KEY_SCHEMA = new Schema(new Field("group", STRING), -new Field("topic", STRING), -new Field("partition", INT32)) - private val OFFSET_KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("group") - private val OFFSET_KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("topic") - private val OFFSET_KEY_PARTITION_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("partition") - - private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", INT64), -new Field("metadata", STRING, "Associated metadata.", ""), -new Field("timestamp", INT64)) - private val OFFSET_VALUE_OFFSET_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset") - private val OFFSET_VALUE_METADATA_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata") - private val OFFSET_VALUE_TIMESTAMP_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp") - - private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", INT64), -new Field("metadata", STRING, "Associated metadata.", ""), -new Field("commit_timestamp", INT64), -new Field("expire_timestamp", INT64)) - private val OFFSET_VALUE_OFFSET_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset") - private val OFFSET_VALUE_METADATA_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata") - private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp") - private val OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp") - - private val OFFSET_COMMIT_VALUE_SCHEMA_V2 = 
new Schema(new Field("offset", INT64), -new Field("metadata", STRING, "Associated metadata.", ""), -new Field("commit_timestamp", INT64)) - private val OFFSET_VALUE_OFFSET_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset") - private val OFFSET_VALUE_METADATA_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata") - private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp") - - private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema( -new Field("offset", INT64), -new Field("leader_epoch", INT32), -new Field("metadata", STRING, "Associated metadata.", ""), -new Field("commit_timestamp", INT64)) - private val OFFSET_VALUE_OFFSET_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset") - private val OFFSET_VALUE_LEADER_EPOCH_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch") - private val OFFSET_VALUE_METADATA_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata") - private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp") - - private val GROUP_METADATA_KEY_SCHEMA = new Schema(new Field("group", STRING)) - private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group") - - private val MEMBER_ID_KEY = "member_id" - private val GROUP_INSTANCE_ID_KEY = "group_instance_id" - private val CLIENT_ID_KEY = "client_id" - private val CLIENT_HOST_KEY = "client_host" - private val REBALANCE_TIMEOUT_KEY = "rebalance_timeout" - private val SESSION_TIMEOUT_KEY = "session_timeout" - private val SUBSCRIPTION_KEY = "subscription" - private val ASSIGNMENT_KEY = "assignment" - - private val MEMBER_METADATA_V0 = new Schema( -new Field(MEMBER_ID_KEY, STRING), -new Field(CLIENT_ID_KEY, STRING), -new Field(CLIENT_HOST_KEY, STRING), -new Field(SESSION_TIMEOUT_KEY, INT32), -new Field(SUBSCRIPTION_KEY, BYTES), -new Field(ASSIGNMENT_KEY, BYTES)) - - private val MEMBER_METADATA_V1 = new Schema( -new Field(MEMBER_ID_KEY, STRING), -new Field(CLIENT_ID_KEY, STRING), -new 
Field(CLIENT_HOST_KEY, STRING), -new Field(REBALANCE_TIMEOUT_KEY, INT32), -new Field(SESSION_TIMEOUT_KEY, INT32), -new Field(SUBSCRIPTION_KEY, BYTES), -new Field(ASSIGNMENT_KEY, BYTES)) - - private val MEMBER_METADATA_V2 = MEMBER_METADATA_V1 - - private val MEMBER_METADATA_V3 = new Schema( -new Field(MEMBER_ID_KEY, STRING), -new Field(GROUP_INSTANCE_ID_KEY, NULLABLE_STRING), -new Field(CLIENT_ID_KEY, STRING), -new Field(CLIENT_HOST_KEY, STRING), -new Field(REBALANCE_TIMEOUT_KEY, INT32), -new Field(SESSION_TIMEOUT_KEY, INT32), -new Field(SUBSCRIPTION_KEY, BYTES), -new Field(ASSIGNMENT_KEY, BYTES)) - - private val PROTOCOL_TYPE_KEY = "protocol_type" - private val GENERATION_KEY = "generation" - private val PROTOCOL_KEY = "protocol" - private val LEADER_KEY = "leader" -
[jira] [Comment Edited] (KAFKA-10413) rebalancing leads to unevenly balanced connectors
[ https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17193402#comment-17193402 ] rameshkrishnan muthusamy edited comment on KAFKA-10413 at 9/22/20, 11:28 AM: This issue affects Kafka 2.4.x and later. The only workaround I could find is to switch back to the eager protocol, which defeats the purpose of incremental cooperative rebalancing altogether. was (Author: ramkrish1489): This is an issue starting with Kafka version 2.4.x onwards. The online work around I could find is to switch back to eager protocol which defeats the purpose of incremental cooperative rebalancing all together. > rebalancing leads to unevenly balanced connectors > > Key: KAFKA-10413 > URL: https://issues.apache.org/jira/browse/KAFKA-10413 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect > Affects Versions: 2.5.1 > Reporter: yazgoo > Priority: Major > Attachments: connect_worker_balanced.png > > > Hi, > With CP 5.5, running the Kafka Connect S3 sink on EC2 with autoscaling enabled, > if a Connect instance disappears or a new one appears, we see unbalanced > consumption, much like what is mentioned in this post: > [https://stackoverflow.com/questions/58644622/incremental-cooperative-rebalancing-leads-to-unevenly-balanced-connectors] > This usually leads to one Kafka Connect instance taking most of the load and > consumption not being able to keep up. > Currently, we're "fixing" this by deleting the connector and re-creating it, > but this is far from ideal. > Any suggestions on what we could do to mitigate this? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10413) rebalancing leads to unevenly balanced connectors
[ https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17200015#comment-17200015 ] rameshkrishnan muthusamy commented on KAFKA-10413: We have opened a PR for this: [https://github.com/apache/kafka/pull/9319] > rebalancing leads to unevenly balanced connectors > > Key: KAFKA-10413 > URL: https://issues.apache.org/jira/browse/KAFKA-10413 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect > Affects Versions: 2.5.1 > Reporter: yazgoo > Priority: Major > Attachments: connect_worker_balanced.png > > > Hi, > With CP 5.5, running the Kafka Connect S3 sink on EC2 with autoscaling enabled, > if a Connect instance disappears or a new one appears, we see unbalanced > consumption, much like what is mentioned in this post: > [https://stackoverflow.com/questions/58644622/incremental-cooperative-rebalancing-leads-to-unevenly-balanced-connectors] > This usually leads to one Kafka Connect instance taking most of the load and > consumption not being able to keep up. > Currently, we're "fixing" this by deleting the connector and re-creating it, > but this is far from ideal. > Any suggestions on what we could do to mitigate this?
[jira] [Commented] (KAFKA-7870) Error sending fetch request (sessionId=1578860481, epoch=INITIAL) to node 2: java.io.IOException: Connection to 2 was disconnected before the response was read.
[ https://issues.apache.org/jira/browse/KAFKA-7870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1724#comment-1724 ] Gowtham commented on KAFKA-7870: {code:java} o.a.k.c.FetchSessionHandler pool-18-thread-1 [INFO] [Consumer clientId=baa4f80-dbb3-4e2e-8428-0363f974d5f6, groupId=mygroup] Error sending fetch request (sessionId=1854680744, epoch=3) to node 6: {}. org.apache.kafka.common.errors.DisconnectException: null {code} We are also seeing the same error in version 2.4.0. > Error sending fetch request (sessionId=1578860481, epoch=INITIAL) to node 2: > java.io.IOException: Connection to 2 was disconnected before the response was > read. > > > Key: KAFKA-7870 > URL: https://issues.apache.org/jira/browse/KAFKA-7870 > Project: Kafka > Issue Type: Bug > Components: clients > Affects Versions: 2.1.0 > Reporter: Chakhsu Lau > Priority: Blocker > > We built a Kafka cluster with 5 brokers, but one of the brokers suddenly stopped > running during operation, and this has happened twice on the same broker. Here is the > log. Is this a bug in Kafka? > {code:java} > [2019-01-25 12:57:14,686] INFO [ReplicaFetcher replicaId=3, leaderId=2, > fetcherId=0] Error sending fetch request (sessionId=1578860481, > epoch=INITIAL) to node 2: java.io.IOException: Connection to 2 was > disconnected before the response was read. 
> (org.apache.kafka.clients.FetchSessionHandler) > [2019-01-25 12:57:14,687] WARN [ReplicaFetcher replicaId=3, leaderId=2, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=3, maxWait=500, minBytes=1, maxBytes=10485760, > fetchData={api-result-bi-heatmap-8=(offset=0, logStartOffset=0, > maxBytes=1048576, currentLeaderEpoch=Optional[4]), > api-result-bi-heatmap-save-12=(offset=0, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[4]), api-result-bi-heatmap-task-2=(offset=2, > logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[4]), > api-result-bi-flow-39=(offset=1883206, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[4]), __consumer_offsets-47=(offset=349437, > logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[4]), > api-result-bi-heatmap-track-6=(offset=1039889, logStartOffset=0, > maxBytes=1048576, currentLeaderEpoch=Optional[4]), > api-result-bi-heatmap-task-17=(offset=0, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[4]), __consumer_offsets-2=(offset=0, > logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[4]), > api-result-bi-heatmap-aggs-19=(offset=1255056, logStartOffset=0, > maxBytes=1048576, currentLeaderEpoch=Optional[4])}, > isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=1578860481, > epoch=INITIAL)) (kafka.server.ReplicaFetcherThread) > java.io.IOException: Connection to 2 was disconnected before the response was > read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97) > at > kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:97) > at > kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:190) > at > kafka.server.AbstractFetcherThread.kafka$server$AbstractFetcherThread$$processFetchRequest(AbstractFetcherThread.scala:241) > at > 
kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:129) > at scala.Option.foreach(Option.scala:257) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129) > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) > {code}
[GitHub] [kafka] ramesh-muthusamy opened a new pull request #9319: Allow even distribution of lost/new tasks when more than one worker j…
ramesh-muthusamy opened a new pull request #9319: URL: https://github.com/apache/kafka/pull/9319 Allow even distribution of lost/new tasks when more than one worker joins the group at the same time. Existing issue 1: When more than one worker joins the consumer group, the incremental cooperative assignor revokes and reassigns at most the average number of tasks per worker. Issue: The additional workers that joined the group stay idle, and further rebalances are needed before the tasks are evenly distributed. Fix: As part of the task assignment calculation following a deployment, reassignment is calculated by revoking all tasks above ceil(average) tasks per worker. Existing issue 2: When more than one worker is lost and rejoins the group, at most one worker is reassigned the lost tasks from all the workers that left the group. Issue: When more than one worker is lost and rejoins the group, only one of them is assigned all the tasks that were lost. The other workers that joined are not assigned any tasks until a future rebalance. Fix: As part of lost task reassignment, all new workers that have joined the group are considered for task assignment, and the lost tasks are assigned to them in round-robin fashion. Testing strategy: System testing in a Kube environment completed. Unit tests: updated.
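The two fixes described in this pull request can be sketched roughly as follows. This is a minimal illustration only, not the code from the PR and not the actual Kafka Connect assignor: the class `RebalanceSketch`, both method names, and the use of plain string IDs for workers and tasks are assumptions made for the example. It assumes that newly joined workers already appear in the assignment map with empty task lists.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; names and types are illustrative, not Kafka Connect APIs.
class RebalanceSketch {

    // Fix 1 (as described above): revoke every task a worker holds beyond
    // ceil(totalTasks / workerCount), so that idle workers can pick them up.
    static Map<String, List<String>> revokeAboveCeilAverage(Map<String, List<String>> assignments) {
        Map<String, List<String>> revoked = new LinkedHashMap<>();
        int workers = assignments.size();
        if (workers == 0) {
            return revoked;
        }
        int totalTasks = 0;
        for (List<String> tasks : assignments.values()) {
            totalTasks += tasks.size();
        }
        // Integer ceiling of totalTasks / workers.
        int ceilAverage = (totalTasks + workers - 1) / workers;
        for (Map.Entry<String, List<String>> entry : assignments.entrySet()) {
            List<String> tasks = entry.getValue();
            if (tasks.size() > ceilAverage) {
                revoked.put(entry.getKey(), new ArrayList<>(tasks.subList(ceilAverage, tasks.size())));
            }
        }
        return revoked;
    }

    // Fix 2 (as described above): spread the lost tasks over all new workers
    // in round-robin order instead of handing them all to a single worker.
    static Map<String, List<String>> assignRoundRobin(List<String> lostTasks, List<String> newWorkers) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String worker : newWorkers) {
            result.put(worker, new ArrayList<>());
        }
        if (newWorkers.isEmpty()) {
            return result;
        }
        int next = 0;
        for (String task : lostTasks) {
            result.get(newWorkers.get(next % newWorkers.size())).add(task);
            next++;
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> current = new LinkedHashMap<>();
        current.put("w1", Arrays.asList("t1", "t2", "t3", "t4"));
        current.put("w2", Arrays.asList("t5", "t6", "t7", "t8"));
        current.put("w3", new ArrayList<>()); // newly joined, idle
        current.put("w4", new ArrayList<>()); // newly joined, idle
        System.out.println("revoked: " + revokeAboveCeilAverage(current));
        System.out.println("assigned: "
                + assignRoundRobin(Arrays.asList("a", "b", "c"), Arrays.asList("w3", "w4")));
    }
}
```

With a ceiling rather than a floor, a worker never gives up tasks unless it holds more than its rounded-up fair share, which keeps revocations to the minimum needed for an even spread.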
[jira] [Commented] (KAFKA-10507) Limit the set of APIs returned in pre-authentication ApiVersions
[ https://issues.apache.org/jira/browse/KAFKA-10507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1716#comment-1716 ] Magnus Edenhill commented on KAFKA-10507: librdkafka will only send ApiVersionRequests prior to authentication, so this would be a breaking change and thus require an ApiVersion bump, and if we do that a client can still use an older ApiVersionRequest version to get the supported APIs prior to auth, making this pre-auth filtering a bit pointless. What is the primary motivation here? > Limit the set of APIs returned in pre-authentication ApiVersions > > Key: KAFKA-10507 > URL: https://issues.apache.org/jira/browse/KAFKA-10507 > Project: Kafka > Issue Type: Improvement > Reporter: Jason Gustafson > Assignee: David Jacot > Priority: Major > > We use the ApiVersions RPC to check whether the SaslHandshake and > SaslAuthenticate APIs are supported before authenticating with the broker. > Currently the response contains all APIs supported by the broker. It seems > like a good idea to reduce the set of APIs returned at this level to only > those which are supported prior to authentication.
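For readers following the proposal in KAFKA-10507, the filtering idea can be pictured with a small sketch. Everything here is hypothetical: the `Api` enum, the `PRE_AUTH_APIS` set, and `visibleApis` are made-up names for illustration, not broker code, and the choice of which APIs count as pre-authentication is an assumption drawn only from the three APIs named in the issue description. It does not address the compatibility concern raised in the comment above.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; none of these names are real Kafka classes.
class PreAuthApiFilterSketch {

    // A tiny stand-in for the broker's API keys.
    enum Api { PRODUCE, FETCH, METADATA, SASL_HANDSHAKE, API_VERSIONS, SASL_AUTHENTICATE }

    // Assumption: only the APIs mentioned in the issue are usable before authentication.
    static final Set<Api> PRE_AUTH_APIS =
            EnumSet.of(Api.API_VERSIONS, Api.SASL_HANDSHAKE, Api.SASL_AUTHENTICATE);

    // Returns the APIs (with their max supported version) that would be
    // advertised to a connection, depending on whether it has authenticated.
    static Map<Api, Short> visibleApis(Map<Api, Short> supported, boolean authenticated) {
        Map<Api, Short> visible = new EnumMap<>(Api.class);
        for (Map.Entry<Api, Short> entry : supported.entrySet()) {
            if (authenticated || PRE_AUTH_APIS.contains(entry.getKey())) {
                visible.put(entry.getKey(), entry.getValue());
            }
        }
        return visible;
    }

    public static void main(String[] args) {
        Map<Api, Short> supported = new EnumMap<>(Api.class);
        supported.put(Api.FETCH, (short) 11);
        supported.put(Api.API_VERSIONS, (short) 3);
        supported.put(Api.SASL_HANDSHAKE, (short) 1);
        System.out.println("before auth: " + visibleApis(supported, false));
        System.out.println("after auth:  " + visibleApis(supported, true));
    }
}
```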
[jira] [Commented] (KAFKA-10497) Convert group/transaction coordinator metadata schemas to use generated protocol
[ https://issues.apache.org/jira/browse/KAFKA-10497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17199989#comment-17199989 ] Chia-Ping Tsai commented on KAFKA-10497: [~hachikuji] I have submitted a PR (https://github.com/apache/kafka/pull/9318) to convert the group coordinator. I will file another PR to convert the transaction coordinator. > Convert group/transaction coordinator metadata schemas to use generated > protocol > > > Key: KAFKA-10497 > URL: https://issues.apache.org/jira/browse/KAFKA-10497 > Project: Kafka > Issue Type: Improvement > Reporter: Jason Gustafson > Assignee: Chia-Ping Tsai > Priority: Major > > We need to convert the internal schemas used for representing > transaction/group metadata to the generated protocol. This opens the door for > flexible version support on the next bump.
[GitHub] [kafka] chia7712 opened a new pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…
chia7712 opened a new pull request #9318: URL: https://github.com/apache/kafka/pull/9318 issue: https://issues.apache.org/jira/browse/KAFKA-10497 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)