[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

2020-09-22 Thread GitBox


abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r493209974



##
File path: core/src/main/scala/kafka/api/ApiVersion.scala
##
@@ -103,6 +103,9 @@ object ApiVersion {
 KAFKA_2_7_IV0,
 // Bup Fetch protocol for Raft protocol (KIP-595)
 KAFKA_2_7_IV1,
+// Enable redirection (KIP-590)
+// TODO: remove this IBP in the 2.7 release if redirection work could not 
be done before the freeze

Review comment:
   But in case we release AK 2.7, wouldn't this flag give user the 
confidence to upgrade to, which we don't want to happen?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] michael-carter-instaclustr commented on pull request #8844: KAFKA-9887 fix failed task or connector count on startup failure

2020-09-22 Thread GitBox


michael-carter-instaclustr commented on pull request #8844:
URL: https://github.com/apache/kafka/pull/8844#issuecomment-697142928


   Hey @C0urante, I'm not sure how long it usually takes for a committer to get 
around to looking at things like this, maybe several months is normal, but just 
thought I'd check with you, do you have any tips on how to get a committers 
attention?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9103: KAFKA-10181: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

2020-09-22 Thread GitBox


abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r493206068



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsUtil.java
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
+
+import java.util.Collection;
+import java.util.Map;
+
+public class AlterConfigsUtil {
+
+public static IncrementalAlterConfigsRequestData 
generateIncrementalRequestData(final Map> configs,
+   
 final boolean validateOnly) {
+return generateIncrementalRequestData(configs.keySet(), configs, 
validateOnly);
+}
+
+public static IncrementalAlterConfigsRequestData 
generateIncrementalRequestData(final Collection resources,

Review comment:
   The primary reason is that we would trigger the disallowed import if we 
do it in the request builder:
   ```
   [ant:checkstyle] [ERROR] 
/Users/boyang.chen/code/kafka/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java:20:1:
 Disallowed import - org.apache.kafka.clients.admin.AlterConfigOp. 
[ImportControl]
   ```
   Let me check if we could make exceptions here





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] huxihx commented on pull request #9218: MINOR: Fix shouldNotResetEpochHistoryHeadIfUndefinedPassed

2020-09-22 Thread GitBox


huxihx commented on pull request #9218:
URL: https://github.com/apache/kafka/pull/9218#issuecomment-697095836


   retest this please.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9103: KAFKA-10181: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

2020-09-22 Thread GitBox


hachikuji commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r493160927



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsUtil.java
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
+
+import java.util.Collection;
+import java.util.Map;
+
+public class AlterConfigsUtil {
+
+public static IncrementalAlterConfigsRequestData 
generateIncrementalRequestData(final Map> configs,
+   
 final boolean validateOnly) {
+return generateIncrementalRequestData(configs.keySet(), configs, 
validateOnly);
+}
+
+public static IncrementalAlterConfigsRequestData 
generateIncrementalRequestData(final Collection resources,

Review comment:
   nit: might be useful to document the expectation that `resources` is a 
subset of the key set of `configs`. The signature surprised me a little bit.
   
   As an aside, this kind of convenience conversion seems more appropriate for 
`IncrementalAlterConfigsRequest.Builder` rather than a static class.

##
File path: core/src/main/scala/kafka/api/ApiVersion.scala
##
@@ -103,6 +103,9 @@ object ApiVersion {
 KAFKA_2_7_IV0,
 // Bup Fetch protocol for Raft protocol (KIP-595)
 KAFKA_2_7_IV1,
+// Enable redirection (KIP-590)
+// TODO: remove this IBP in the 2.7 release if redirection work could not 
be done before the freeze

Review comment:
   Get rid of this TODO. We do not need to remove IBP internal versions.

##
File path: core/src/main/scala/kafka/common/InterBrokerSendThread.scala
##
@@ -147,7 +147,9 @@ abstract class InterBrokerSendThread(name: String,
 
 case class RequestAndCompletionHandler(destination: Node,
request: AbstractRequest.Builder[_ <: 
AbstractRequest],
-   handler: RequestCompletionHandler)
+   handler: RequestCompletionHandler,
+   initialPrincipalName: String = null,

Review comment:
   nit: why don't we add a case class and make this optional. for example:
   
   ```scala
   case class InitialPrincipal(name: String, clientId: String)
   ```
   In addition to reducing parameters, that makes the expectation that both are 
provided explicit.
   

##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -117,6 +117,89 @@ class KafkaApis(val requestChannel: RequestChannel,
   val adminZkClient = new AdminZkClient(zkClient)
   private val alterAclsPurgatory = new DelayedFuturePurgatory(purgatoryName = 
"AlterAcls", brokerId = config.brokerId)
 
+  /**
+   * The template to create a forward request handler.
+   *
+   * @tparam T request type
+   * @tparam R response type
+   * @tparam RK resource key
+   * @tparam RV resource value
+   */
+  private[server] abstract class ForwardRequestHandler[T <: AbstractRequest,
+R <: AbstractResponse, RK, RV](request: RequestChannel.Request) extends 
Logging {
+
+/**
+ * Split the given resource into authorized and unauthorized sets.
+ *
+ * @return authorized resources and unauthorized resources
+ */
+def resourceSplitByAuthorization(request: T): (Map[RK, RV], Map[RK, 
ApiError])
+
+/**
+ * Controller handling logic of the request.
+ */
+def process(authorizedResources: Map[RK, RV],
+unauthorizedResult: Map[RK, ApiError],
+request: T): Unit
+
+/**
+ * Build a forward request to the controller.
+ *
+ * @param authorizedResources authorized resources by the forwarding broker
+ * @param request the original request
+ * @return forward request builder
+ */
+def createRequestBuilder(authorizedResources: Map[RK, RV],
+ request: T): AbstractRequest.Builder[T]
+
+/**
+ * Merge the forward response with the previously unauthorized 

[GitHub] [kafka] ijuma commented on a change in pull request #4090: [KAFKA-6084] Propagate JSON parsing errors in ReassignPartitionsCommand

2020-09-22 Thread GitBox


ijuma commented on a change in pull request #4090:
URL: https://github.com/apache/kafka/pull/4090#discussion_r492811318



##
File path: core/src/main/scala/kafka/utils/Json.scala
##
@@ -69,8 +70,11 @@ object Json {
* @return An `Either` which in case of `Left` means an exception and 
`Right` is the actual return value.
*/
   def tryParseFull(input: String): Either[JsonProcessingException, JsonValue] =
-try Right(mapper.readTree(input)).map(JsonValue(_))
-catch { case e: JsonProcessingException => Left(e) }
+if (input != null && input.isEmpty)

Review comment:
   I think we should say `if (input == null || input.isEmpty)` instead. 
It's unexpected to get an exception instead of `Left` in a method that returns 
`Either`.

##
File path: core/src/main/scala/kafka/utils/Json.scala
##
@@ -69,8 +70,11 @@ object Json {
* @return An `Either` which in case of `Left` means an exception and 
`Right` is the actual return value.
*/
   def tryParseFull(input: String): Either[JsonProcessingException, JsonValue] =
-try Right(mapper.readTree(input)).map(JsonValue(_))
-catch { case e: JsonProcessingException => Left(e) }
+if (input != null && input.isEmpty)
+  Left(new JsonParseException(MissingNode.getInstance().traverse(), "The 
input string shouldn't be empty"))

Review comment:
   This seems fine.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #9299: MINOR: Use `Map.foreachKv` to avoid tuple allocation in Scala 2.13

2020-09-22 Thread GitBox


ijuma commented on a change in pull request #9299:
URL: https://github.com/apache/kafka/pull/9299#discussion_r492361262



##
File path: core/src/main/scala/kafka/utils/Implicits.scala
##
@@ -46,4 +47,21 @@ object Implicits {
 
   }
 
+  /**
+   * Exposes `foreachKv` which maps to `foreachEntry` in Scala 2.13 and 
`foreach` in Scala 2.12
+   * (with the help of scala.collection.compat). `foreachEntry` avoids the 
tuple allocation and
+   * is more efficient.
+   *
+   * This was not named `foreachEntry` to avoid `unused import` warnings in 
Scala 2.13 (the implicit
+   * would not be triggered in Scala 2.13 since `Map.foreachEntry` would have 
precedence).
+   */
+  @nowarn("cat=unused-imports")
+  implicit class MapExtensionMethods[K, V](private val self: 
scala.collection.Map[K, V]) extends AnyVal {
+import scala.collection.compat._
+def foreachKv[U](f: (K, V) => U): Unit = {

Review comment:
   Checked with @hachikuji and he's fine with this change.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #4090: [KAFKA-6084] Propagate JSON parsing errors in ReassignPartitionsCommand

2020-09-22 Thread GitBox


ijuma commented on pull request #4090:
URL: https://github.com/apache/kafka/pull/4090#issuecomment-696784662


   Can you also rebase please so that the PR builder runs?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] viktorsomogyi commented on a change in pull request #4090: [KAFKA-6084] Propagate JSON parsing errors in ReassignPartitionsCommand

2020-09-22 Thread GitBox


viktorsomogyi commented on a change in pull request #4090:
URL: https://github.com/apache/kafka/pull/4090#discussion_r492704280



##
File path: core/src/test/scala/unit/kafka/utils/JsonTest.scala
##
@@ -40,25 +40,34 @@ class JsonTest {
   def testJsonParse(): Unit = {
 val jnf = JsonNodeFactory.instance
 
-assertEquals(Json.parseFull("{}"), Some(JsonValue(new ObjectNode(jnf
+assertEquals(Some(JsonValue(new ObjectNode(jnf))), Json.parseFull("{}"))
+assertEquals(Right(JsonValue(new ObjectNode(jnf))), 
Json.tryParseFull("{}"))
+assertThrows(classOf[IllegalArgumentException], () => 
Json.tryParseFull(null))
+assertThrows(classOf[IllegalArgumentException], () => 
Json.tryParseBytes(null))
 
-assertEquals(Json.parseFull("""{"foo":"bar"s}"""), None)
+assertEquals(Option(MissingNode.getInstance()).map(JsonValue(_)), 
Json.parseFull(""))
+assertEquals(Right(MissingNode.getInstance()).map(JsonValue(_)), 
Json.tryParseFull(""))

Review comment:
   You're right, it makes sense to return `None` and `Left` in those cases. 
Will change it and upload it to this PR.

##
File path: core/src/main/scala/kafka/utils/Json.scala
##
@@ -69,8 +70,11 @@ object Json {
* @return An `Either` which in case of `Left` means an exception and 
`Right` is the actual return value.
*/
   def tryParseFull(input: String): Either[JsonProcessingException, JsonValue] =
-try Right(mapper.readTree(input)).map(JsonValue(_))
-catch { case e: JsonProcessingException => Left(e) }
+if (input != null && input.isEmpty)

Review comment:
   Wasn't sure what to do here but `null` is checked in `readTree` and an 
IllegalArgumentException is thrown so I decided to go skip here and leave the 
original behavior.

##
File path: core/src/main/scala/kafka/utils/Json.scala
##
@@ -69,8 +70,11 @@ object Json {
* @return An `Either` which in case of `Left` means an exception and 
`Right` is the actual return value.
*/
   def tryParseFull(input: String): Either[JsonProcessingException, JsonValue] =
-try Right(mapper.readTree(input)).map(JsonValue(_))
-catch { case e: JsonProcessingException => Left(e) }
+if (input != null && input.isEmpty)

Review comment:
   Wasn't sure what to do here but `null` is checked in `readTree` and an 
IllegalArgumentException is thrown so I decided to skip here and leave the 
original behavior.

##
File path: core/src/main/scala/kafka/utils/Json.scala
##
@@ -69,8 +70,11 @@ object Json {
* @return An `Either` which in case of `Left` means an exception and 
`Right` is the actual return value.
*/
   def tryParseFull(input: String): Either[JsonProcessingException, JsonValue] =
-try Right(mapper.readTree(input)).map(JsonValue(_))
-catch { case e: JsonProcessingException => Left(e) }
+if (input != null && input.isEmpty)
+  Left(new JsonParseException(MissingNode.getInstance().traverse(), "The 
input string shouldn't be empty"))

Review comment:
   It seems like that the most adequate JsonProcessingException is 
JsonParseException that could apply here. Another possibility is to change the 
exception in return type to something like IOException and throw that here but 
it seems it's more vague than throwing JsonParseException.
   Another option is to simple throw IllegalArgumentException here instead of 
Left. That would correspond to the `readTree(InputStream)` as well. What do you 
think?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #9266: KAFKA-10469: Resolve logger levels hierarchically

2020-09-22 Thread GitBox


ijuma commented on pull request #9266:
URL: https://github.com/apache/kafka/pull/9266#issuecomment-696784359


   @tombentley `testDescribeConfigsForLog4jLogLevels` is failing.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #9251: KAFKA-10459: Document IQ APIs where order does not hold between stores

2020-09-22 Thread GitBox


mjsax commented on pull request #9251:
URL: https://github.com/apache/kafka/pull/9251#issuecomment-696467547


   Thanks for the PR @showuon!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #9312: KAFKA-10505: Fix parsing of generation log string.

2020-09-22 Thread GitBox


ableegoldman commented on pull request #9312:
URL: https://github.com/apache/kafka/pull/9312#issuecomment-696925826


   Original system test run was aborted for some reason. Kicked off a new set: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4182/



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

2020-09-22 Thread GitBox


mjsax commented on pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#issuecomment-696823406


   Retest this please.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax merged pull request #9251: KAFKA-10459: Document IQ APIs where order does not hold between stores

2020-09-22 Thread GitBox


mjsax merged pull request #9251:
URL: https://github.com/apache/kafka/pull/9251


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] gmunozfe commented on pull request #9309: KAFKA-10503: MockProducer doesn't throw ClassCastException when no

2020-09-22 Thread GitBox


gmunozfe commented on pull request #9309:
URL: https://github.com/apache/kafka/pull/9309#issuecomment-696995620


   @mjsax @guozhangwang @leonardge could you review? thanks!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8892: KAFKA-10068: verify assignment performance with large cluster

2020-09-22 Thread GitBox


vvcephei commented on pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#issuecomment-696752069


   Thanks, @ableegoldman !
   
   There was only one unrelated test failure:
   `Build / JDK 11 / 
kafka.network.DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota`



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9270: [WIP] KAFKA-10284: Group membership update due to static member rejoin should be persisted

2020-09-22 Thread GitBox


abbccdda commented on a change in pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#discussion_r492896997



##
File path: 
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
##
@@ -3882,6 +3942,21 @@ class GroupCoordinatorTest {
 Await.result(responseFuture, Duration(rebalanceTimeout + 100, 
TimeUnit.MILLISECONDS))
   }
 
+  private def staticJoinGroupWithPersistence(groupId: String,
+ memberId: String,

Review comment:
   nit: alignment

##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -1040,21 +1040,36 @@ class GroupCoordinator(val brokerId: Int,
 
 group.currentState match {
   case Stable =>
-info(s"Static member joins during Stable stage will not trigger 
rebalance.")
-group.maybeInvokeJoinCallback(member, JoinGroupResult(
-  members = List.empty,
-  memberId = newMemberId,
-  generationId = group.generationId,
-  protocolType = group.protocolType,
-  protocolName = group.protocolName,
-  // We want to avoid current leader performing trivial assignment 
while the group
-  // is in stable stage, because the new assignment in leader's next 
sync call
-  // won't be broadcast by a stable group. This could be guaranteed by
-  // always returning the old leader id so that the current leader 
won't assume itself
-  // as a leader based on the returned message, since the new 
member.id won't match
-  // returned leader id, therefore no assignment will be performed.
-  leaderId = currentLeader,
-  error = Errors.NONE))
+// check if group's selectedProtocol of next generation will change, 
if not, simply store group to persist the
+// updated static member, if yes, rebalance should be triggered to let 
the group's assignment and selectProtocol consistent
+val selectedProtocolOfNextGeneration = group.selectProtocol
+if (group.protocolName.contains(selectedProtocolOfNextGeneration)) {
+  info(s"Static member which joins during Stable stage and doesn't 
affect selectProtocol will not trigger rebalance.")
+  val groupAssignment: Map[String, Array[Byte]] = 
group.allMemberMetadata.map(member => member.memberId -> 
member.assignment).toMap
+  groupManager.storeGroup(group, groupAssignment, error => {
+group.inLock {
+  if (error != Errors.NONE) {
+warn(s"Failed to persist metadata for group ${group.groupId}: 
${error.message}")
+  }
+}
+  })
+  group.maybeInvokeJoinCallback(member, JoinGroupResult(
+members = List.empty,
+memberId = newMemberId,
+generationId = group.generationId,
+protocolType = group.protocolType,
+protocolName = group.protocolName,
+// We want to avoid current leader performing trivial assignment 
while the group
+// is in stable stage, because the new assignment in leader's next 
sync call
+// won't be broadcast by a stable group. This could be guaranteed 
by
+// always returning the old leader id so that the current leader 
won't assume itself
+// as a leader based on the returned message, since the new 
member.id won't match
+// returned leader id, therefore no assignment will be performed.
+leaderId = currentLeader,
+error = Errors.NONE))
+} else {
+  maybePrepareRebalance(group, s"Group's selectedProtocol will change 
because static member ${member.memberId} with instance id $groupInstanceId 
joined with change of protocol")

Review comment:
   Could you elaborate why this case is possible? We do have checks for 
`!group.supportsProtocols(protocolType, 
MemberMetadata.plainProtocolSet(protocols)` in the caller, so if the group 
protocol is incompatible, won't we just reject the rejoin?

##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -1040,21 +1040,36 @@ class GroupCoordinator(val brokerId: Int,
 
 group.currentState match {
   case Stable =>
-info(s"Static member joins during Stable stage will not trigger 
rebalance.")
-group.maybeInvokeJoinCallback(member, JoinGroupResult(
-  members = List.empty,
-  memberId = newMemberId,
-  generationId = group.generationId,
-  protocolType = group.protocolType,
-  protocolName = group.protocolName,
-  // We want to avoid current leader performing trivial assignment 
while the group
-  // is in stable stage, because the new assignment in leader's next 
sync call
-  // won't be broadcast by a stable group. This could be guaranteed by
-  // always returning the old leader id so that the current leader 
won't assume itself

[GitHub] [kafka] chia7712 commented on pull request #9162: MINOR: refactor Log to get rid of "return" in nested anonymous function

2020-09-22 Thread GitBox


chia7712 commented on pull request #9162:
URL: https://github.com/apache/kafka/pull/9162#issuecomment-696523232


   ```
   Build / JDK 15 / 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
   Build / JDK 8 / kafka.api.ConsumerBounceTest.testClose
   ```
   
   unrelated failure. rebase to trigger QA



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #9083: KAFKA-9450: Follow-up; Forbid process after closed

2020-09-22 Thread GitBox


guozhangwang commented on a change in pull request #9083:
URL: https://github.com/apache/kafka/pull/9083#discussion_r492944504



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##
@@ -217,12 +217,12 @@ public StateStore getStateStore(final String name) {
 forward((ProcessorNode) child, key, value);
 }
 } else {
-final ProcessorNode child = 
currentNode().getChild(sendTo);

Review comment:
   I made the change in ProcessorNode to add back the template types: 
https://github.com/apache/kafka/pull/9083/files/82b6f6f5d238401097e0906c8135c5c189524666#diff-705bfd0ed3f214048b76d775708cc7d2L96
   
   But since `currentNode()`'s template is `` its templated 
`getChild` and that's why I need to weaken it here --- as you can see from the 
above `if` branch, it now aligns consistently on the typing.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java
##
@@ -44,19 +45,18 @@ public KStreamFlatTransformValues(final 
ValueTransformerWithKeySupplier 
implements Processor {
+public static class KStreamFlatTransformValuesProcessor 
extends AbstractProcessor {

Review comment:
   As described in at the top, `Let all built-in processors to extend from 
AbstractProcessor.` The main reason is that AbstractProcessor provides some 
basic functionalities and hence it's better to let our own impl to base on them.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji merged pull request #9130: KAFKA-10492; Core Kafka Raft Implementation (KIP-595)

2020-09-22 Thread GitBox


hachikuji merged pull request #9130:
URL: https://github.com/apache/kafka/pull/9130


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] nizhikov commented on pull request #9312: KAFKA-10505: Fix parsing of generation log string.

2020-09-22 Thread GitBox


nizhikov commented on pull request #9312:
URL: https://github.com/apache/kafka/pull/9312#issuecomment-696559508


   @mjsax These tests are broken with the 
7e7bb184d2abe34280a7f0eb0f0d9fc0e32389f2
   
   Line logging only generationId - 
https://github.com/apache/kafka/commit/7e7bb184d2abe34280a7f0eb0f0d9fc0e32389f2#diff-30048900127c988a69027b70b0ce4eacL504
   
   New line with the whole Generation object logging - 
https://github.com/apache/kafka/commit/7e7bb184d2abe34280a7f0eb0f0d9fc0e32389f2#diff-30048900127c988a69027b70b0ce4eacR590



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley edited a comment on pull request #9266: KAFKA-10469: Resolve logger levels hierarchically

2020-09-22 Thread GitBox


tombentley edited a comment on pull request #9266:
URL: https://github.com/apache/kafka/pull/9266#issuecomment-696622982


   @ijuma I've fixed an infinite loop, could you trigger the build again please?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9083: KAFKA-9450: Follow-up; Forbid process after closed

2020-09-22 Thread GitBox


abbccdda commented on a change in pull request #9083:
URL: https://github.com/apache/kafka/pull/9083#discussion_r492941448



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##
@@ -1566,6 +1566,7 @@ public void shouldCheckpointForSuspendedTask() {
 EasyMock.verify(stateManager);
 }
 
+

Review comment:
   nit: not necessary

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##
@@ -217,12 +217,12 @@ public StateStore getStateStore(final String name) {
 forward((ProcessorNode) child, key, value);
 }
 } else {
-final ProcessorNode child = 
currentNode().getChild(sendTo);

Review comment:
   +1

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java
##
@@ -44,19 +45,18 @@ public KStreamFlatTransformValues(final 
ValueTransformerWithKeySupplier 
implements Processor {
+public static class KStreamFlatTransformValuesProcessor 
extends AbstractProcessor {

Review comment:
   Could you elaborate why this is better than Processor?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8892: KAFKA-10068: verify assignment performance with large cluster

2020-09-22 Thread GitBox


vvcephei commented on a change in pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#discussion_r492436680



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
##
@@ -77,13 +86,8 @@
 public static final TaskId TASK_2_3 = new TaskId(2, 3);
 
 public static final Set EMPTY_TASKS = emptySet();
-public static final List EMPTY_TASK_LIST = emptyList();
-public static final Map EMPTY_TASK_OFFSET_SUMS = emptyMap();
 public static final Map EMPTY_CHANGELOG_END_OFFSETS 
= new HashMap<>();
 
-private AssignmentTestUtils() {}

Review comment:
   Ah, note the `private`. The purpose of this constructor is to make it 
uninstantiable. I.e., it's "self-documenting" that it should only be a 
container for static members.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on pull request #9266: KAFKA-10469: Resolve logger levels hierarchically

2020-09-22 Thread GitBox


tombentley commented on pull request #9266:
URL: https://github.com/apache/kafka/pull/9266#issuecomment-696622982


   @ijuma I've fixed an infinite loop, could you trigger the build again pleast?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9206: MINOR: rewrite zipWithIndex by normal foreach to refrain unnecessary …

2020-09-22 Thread GitBox


chia7712 commented on a change in pull request #9206:
URL: https://github.com/apache/kafka/pull/9206#discussion_r492834735



##
File path: core/src/main/scala/kafka/log/LogValidator.scala
##
@@ -234,16 +234,17 @@ private[log] object LogValidator extends Logging {
 
 val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, 
NoCompressionCodec)
 
-for (batch <- records.batches.asScala) {
+records.batches.forEach { batch =>
   validateBatch(topicPartition, firstBatch, batch, origin, toMagicValue, 
brokerTopicStats)
 
   val recordErrors = new ArrayBuffer[ApiRecordError](0)
-  for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) {
+  var batchIndex = 0
+  batch.forEach { record =>
 validateRecord(batch, topicPartition, record, batchIndex, now, 
timestampType,
   timestampDiffMaxMs, compactedTopic, 
brokerTopicStats).foreach(recordError => recordErrors += recordError)
 // we fail the batch if any record fails, so we stop appending if any 
record fails
-if (recordErrors.isEmpty)
-  builder.appendWithOffset(offsetCounter.getAndIncrement(), record)
+if (recordErrors.isEmpty) 
builder.appendWithOffset(offsetCounter.getAndIncrement(), record)
+batchIndex += 1

Review comment:
   > Have we benchmarked this path?
   
   I didn't benchmark this path and you are right that optimization is small as 
we have to convert data in this path. I will revert it to make small patch. 

##
File path: core/src/main/scala/kafka/log/LogValidator.scala
##
@@ -279,14 +280,15 @@ private[log] object LogValidator extends Logging {
 
 val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, 
NoCompressionCodec)
 
-for (batch <- records.batches.asScala) {
+records.batches.forEach { batch =>
   validateBatch(topicPartition, firstBatch, batch, origin, magic, 
brokerTopicStats)
 
   var maxBatchTimestamp = RecordBatch.NO_TIMESTAMP
   var offsetOfMaxBatchTimestamp = -1L
 
   val recordErrors = new ArrayBuffer[ApiRecordError](0)
-  for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) {
+  var batchIndex = 0

Review comment:
   copy that

##
File path: core/src/main/scala/kafka/log/LogValidator.scala
##
@@ -234,17 +234,16 @@ private[log] object LogValidator extends Logging {
 
 val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, 
NoCompressionCodec)
 
-records.batches.forEach { batch =>
+for (batch <- records.batches.asScala) {

Review comment:
   ok~





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei merged pull request #8892: KAFKA-10068: verify assignment performance with large cluster

2020-09-22 Thread GitBox


vvcephei merged pull request #8892:
URL: https://github.com/apache/kafka/pull/8892


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #9312: KAFKA-10505: Fix parsing of generation log string.

2020-09-22 Thread GitBox


mjsax commented on pull request #9312:
URL: https://github.com/apache/kafka/pull/9312#issuecomment-696464404


   @nizhikov @ableegoldman Do we know when this broke? What was the old log 
line and what is the new one?
   
   Triggered system test run: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4181/



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9206: MINOR: rewrite zipWithIndex by normal foreach to refrain unnecessary …

2020-09-22 Thread GitBox


chia7712 commented on pull request #9206:
URL: https://github.com/apache/kafka/pull/9206#issuecomment-696522007


   rebase to run JMH again. This patch is still a benefit to validation :)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9311: KAFKA-9910: Implement new transaction timed out error

2020-09-22 Thread GitBox


abbccdda commented on a change in pull request #9311:
URL: https://github.com/apache/kafka/pull/9311#discussion_r491655717



##
File path: 
clients/src/main/java/org/apache/kafka/common/errors/TransactionTimeoutException.java
##
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * This exception indicates that the last ongoing transaction timed out on the 
coordinator.
+ * When encountering this exception, the producer should retry initialization 
with current epoch.
+ */
+public class TransactionTimeoutException extends ApiException {

Review comment:
   `TransactionTimedOut`

##
File path: 
clients/src/main/java/org/apache/kafka/common/errors/TransactionTimeoutException.java
##
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * This exception indicates that the last ongoing transaction timed out on the 
coordinator.
+ * When encountering this exception, the producer should retry initialization 
with current epoch.
+ */
+public class TransactionTimeoutException extends ApiException {
+
+private static final long serialVersionUID = 1L;
+
+public TransactionTimeoutException() {
+super();
+}
+
+public TransactionTimeoutException(String message, Throwable cause) {
+super(message, cause);
+}
+
+public TransactionTimeoutException(String message) {

Review comment:
   Do we need all other constructors?

##
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
##
@@ -1577,6 +1578,59 @@ public void 
testInvalidProducerEpochConvertToProducerFencedInAddPartitionToTxn()
 
verifyProducerFencedForAddPartitionsToTxn(Errors.INVALID_PRODUCER_EPOCH);
 }
 
+@Test
+public void testTxnTimeoutForAddPartitionsToTxn() throws 
InterruptedException {
+doInitTransactions();
+
+transactionManager.beginTransaction();
+transactionManager.failIfNotReadyForSend();
+Future responseFuture = appendToAccumulator(tp0);
+transactionManager.maybeAddPartitionToTransaction(tp0);
+
+assertFalse(responseFuture.isDone());
+prepareAddPartitionsToTxnResponse(Errors.TRANSACTION_TIMED_OUT, tp0, 
epoch, producerId);
+
+verifyTxnTimeout(responseFuture);
+}
+
+@Test
+public void testTxnTimeoutForAddOffsetsToTxn() throws InterruptedException 
{
+doInitTransactions();
+
+transactionManager.beginTransaction();
+transactionManager.failIfNotReadyForSend();
+transactionManager.sendOffsetsToTransaction(Collections.emptyMap(), 
new ConsumerGroupMetadata(consumerGroupId));
+
+Future responseFuture = appendToAccumulator(tp0);
+
+assertFalse(responseFuture.isDone());
+prepareAddOffsetsToTxnResponse(Errors.TRANSACTION_TIMED_OUT, 
consumerGroupId, producerId, epoch);
+
+verifyTxnTimeout(responseFuture);
+}
+
+@Test
+public void testTxnTimeoutInEndTxn() throws InterruptedException {
+
+doInitTransactions();
+
+transactionManager.beginTransaction();
+transactionManager.failIfNotReadyForSend();
+transactionManager.maybeAddPartitionToTransaction(tp0);
+TransactionalRequestResult commitResult = 
transactionManager.beginCommit();
+
+Future responseFuture = 

[GitHub] [kafka] ijuma commented on a change in pull request #9206: MINOR: rewrite zipWithIndex by normal foreach to refrain unnecessary …

2020-09-22 Thread GitBox


ijuma commented on a change in pull request #9206:
URL: https://github.com/apache/kafka/pull/9206#discussion_r492821097



##
File path: core/src/main/scala/kafka/log/LogValidator.scala
##
@@ -234,16 +234,17 @@ private[log] object LogValidator extends Logging {
 
 val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, 
NoCompressionCodec)
 
-for (batch <- records.batches.asScala) {
+records.batches.forEach { batch =>
   validateBatch(topicPartition, firstBatch, batch, origin, toMagicValue, 
brokerTopicStats)
 
   val recordErrors = new ArrayBuffer[ApiRecordError](0)
-  for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) {
+  var batchIndex = 0
+  batch.forEach { record =>
 validateRecord(batch, topicPartition, record, batchIndex, now, 
timestampType,
   timestampDiffMaxMs, compactedTopic, 
brokerTopicStats).foreach(recordError => recordErrors += recordError)
 // we fail the batch if any record fails, so we stop appending if any 
record fails
-if (recordErrors.isEmpty)
-  builder.appendWithOffset(offsetCounter.getAndIncrement(), record)
+if (recordErrors.isEmpty) 
builder.appendWithOffset(offsetCounter.getAndIncrement(), record)
+batchIndex += 1

Review comment:
   Have we benchmarked this path? It seems doubtful that these micro 
optimizations help given that we are `converting`.

##
File path: core/src/main/scala/kafka/log/LogValidator.scala
##
@@ -279,14 +280,15 @@ private[log] object LogValidator extends Logging {
 
 val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, 
NoCompressionCodec)
 
-for (batch <- records.batches.asScala) {
+records.batches.forEach { batch =>
   validateBatch(topicPartition, firstBatch, batch, origin, magic, 
brokerTopicStats)
 
   var maxBatchTimestamp = RecordBatch.NO_TIMESTAMP
   var offsetOfMaxBatchTimestamp = -1L
 
   val recordErrors = new ArrayBuffer[ApiRecordError](0)
-  for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) {
+  var batchIndex = 0

Review comment:
   Worth adding a comment here that this is a hot path and we want to avoid 
any unnecessary allocations.

##
File path: core/src/main/scala/kafka/log/LogValidator.scala
##
@@ -234,17 +234,16 @@ private[log] object LogValidator extends Logging {
 
 val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, 
NoCompressionCodec)
 
-records.batches.forEach { batch =>
+for (batch <- records.batches.asScala) {

Review comment:
   I liked your changes to make the code more concise, I'd keep them.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8892: KAFKA-10068: verify assignment performance with large cluster

2020-09-22 Thread GitBox


ableegoldman commented on a change in pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#discussion_r492382332



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
##
@@ -77,13 +86,8 @@
 public static final TaskId TASK_2_3 = new TaskId(2, 3);
 
 public static final Set EMPTY_TASKS = emptySet();
-public static final List EMPTY_TASK_LIST = emptyList();
-public static final Map EMPTY_TASK_OFFSET_SUMS = emptyMap();
 public static final Map EMPTY_CHANGELOG_END_OFFSETS 
= new HashMap<>();
 
-private AssignmentTestUtils() {}

Review comment:
   > Do we need to instantiate this class now
   
   No...isn't that exactly the reason we don't need this?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##
@@ -288,17 +288,17 @@ public void computeTaskLags(final UUID uuid, final 
Map allTaskEndO
 final Long endOffsetSum = taskEntry.getValue();
 final Long offsetSum = taskOffsetSums.getOrDefault(task, 0L);
 
-if (endOffsetSum < offsetSum) {
+if (offsetSum == Task.LATEST_OFFSET) {
+taskLagTotals.put(task, Task.LATEST_OFFSET);
+} else if (offsetSum == UNKNOWN_OFFSET_SUM) {
+taskLagTotals.put(task, UNKNOWN_OFFSET_SUM);
+} else if (endOffsetSum < offsetSum) {

Review comment:
   I don't know about fixing this before, but I found this while debugging 
these tests and fixed it on the side in this PR. To be fair, it wasn't hurting 
anything since we happen to put the right thing in the `taskLagTotals` map, but 
the warning logged was definitely incorrect.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
##
@@ -77,13 +86,8 @@
 public static final TaskId TASK_2_3 = new TaskId(2, 3);
 
 public static final Set EMPTY_TASKS = emptySet();
-public static final List EMPTY_TASK_LIST = emptyList();
-public static final Map EMPTY_TASK_OFFSET_SUMS = emptyMap();
 public static final Map EMPTY_CHANGELOG_END_OFFSETS 
= new HashMap<>();
 
-private AssignmentTestUtils() {}

Review comment:
   Ah yeah ok. I'll put it back





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] piotrrzysko commented on pull request #9315: KAFKA-10496: Removed relying on external DNS servers in tests

2020-09-22 Thread GitBox


piotrrzysko commented on pull request #9315:
URL: https://github.com/apache/kafka/pull/9315#issuecomment-696860022


   @jolshan @mumrah @dajac Could you please take look at this PR? 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint

2020-09-22 Thread GitBox


showuon commented on pull request #9121:
URL: https://github.com/apache/kafka/pull/9121#issuecomment-696467921


   @mjsax , sorry, I found you're online now. So could you take a look again 
for the long pending PR? It's 2nd review, so it should be easier. Thanks. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #9299: MINOR: Use `Map.forKeyValue` to avoid tuple allocation in Scala 2.13

2020-09-22 Thread GitBox


ijuma merged pull request #9299:
URL: https://github.com/apache/kafka/pull/9299


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…

2020-09-22 Thread GitBox


chia7712 commented on a change in pull request #9318:
URL: https://github.com/apache/kafka/pull/9318#discussion_r492696066



##
File path: 
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##
@@ -997,189 +997,52 @@ object GroupMetadataManager {
   val MetricsGroup: String = "group-coordinator-metrics"
   val LoadTimeSensor: String = "GroupPartitionLoadTime"
 
-  private val CURRENT_OFFSET_KEY_SCHEMA_VERSION = 1.toShort
-  private val CURRENT_GROUP_KEY_SCHEMA_VERSION = 2.toShort
-
-  private val OFFSET_COMMIT_KEY_SCHEMA = new Schema(new Field("group", STRING),
-new Field("topic", STRING),
-new Field("partition", INT32))
-  private val OFFSET_KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("group")
-  private val OFFSET_KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("topic")
-  private val OFFSET_KEY_PARTITION_FIELD = 
OFFSET_COMMIT_KEY_SCHEMA.get("partition")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", 
INT64),
-new Field("metadata", STRING, "Associated metadata.", ""),
-new Field("timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V0 = 
OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset")
-  private val OFFSET_VALUE_METADATA_FIELD_V0 = 
OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata")
-  private val OFFSET_VALUE_TIMESTAMP_FIELD_V0 = 
OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", 
INT64),
-new Field("metadata", STRING, "Associated metadata.", ""),
-new Field("commit_timestamp", INT64),
-new Field("expire_timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset")
-  private val OFFSET_VALUE_METADATA_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata")
-  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp")
-  private val OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V2 = new Schema(new Field("offset", 
INT64),
-new Field("metadata", STRING, "Associated metadata.", ""),
-new Field("commit_timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V2 = 
OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset")
-  private val OFFSET_VALUE_METADATA_FIELD_V2 = 
OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata")
-  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2 = 
OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema(
-new Field("offset", INT64),
-new Field("leader_epoch", INT32),
-new Field("metadata", STRING, "Associated metadata.", ""),
-new Field("commit_timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset")
-  private val OFFSET_VALUE_LEADER_EPOCH_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch")
-  private val OFFSET_VALUE_METADATA_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata")
-  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp")
-
-  private val GROUP_METADATA_KEY_SCHEMA = new Schema(new Field("group", 
STRING))
-  private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group")
-
-  private val MEMBER_ID_KEY = "member_id"
-  private val GROUP_INSTANCE_ID_KEY = "group_instance_id"
-  private val CLIENT_ID_KEY = "client_id"
-  private val CLIENT_HOST_KEY = "client_host"
-  private val REBALANCE_TIMEOUT_KEY = "rebalance_timeout"
-  private val SESSION_TIMEOUT_KEY = "session_timeout"
-  private val SUBSCRIPTION_KEY = "subscription"
-  private val ASSIGNMENT_KEY = "assignment"
-
-  private val MEMBER_METADATA_V0 = new Schema(
-new Field(MEMBER_ID_KEY, STRING),
-new Field(CLIENT_ID_KEY, STRING),
-new Field(CLIENT_HOST_KEY, STRING),
-new Field(SESSION_TIMEOUT_KEY, INT32),
-new Field(SUBSCRIPTION_KEY, BYTES),
-new Field(ASSIGNMENT_KEY, BYTES))
-
-  private val MEMBER_METADATA_V1 = new Schema(
-new Field(MEMBER_ID_KEY, STRING),
-new Field(CLIENT_ID_KEY, STRING),
-new Field(CLIENT_HOST_KEY, STRING),
-new Field(REBALANCE_TIMEOUT_KEY, INT32),
-new Field(SESSION_TIMEOUT_KEY, INT32),
-new Field(SUBSCRIPTION_KEY, BYTES),
-new Field(ASSIGNMENT_KEY, BYTES))
-
-  private val MEMBER_METADATA_V2 = MEMBER_METADATA_V1
-
-  private val MEMBER_METADATA_V3 = new Schema(
-new Field(MEMBER_ID_KEY, STRING),
-new Field(GROUP_INSTANCE_ID_KEY, NULLABLE_STRING),
-new Field(CLIENT_ID_KEY, STRING),
-new Field(CLIENT_HOST_KEY, STRING),
-new Field(REBALANCE_TIMEOUT_KEY, INT32),
-new Field(SESSION_TIMEOUT_KEY, INT32),
-new Field(SUBSCRIPTION_KEY, BYTES),
-new Field(ASSIGNMENT_KEY, BYTES))
-
-  private val PROTOCOL_TYPE_KEY = "protocol_type"
-  private val GENERATION_KEY = "generation"
-  private val PROTOCOL_KEY = "protocol"
-  private val LEADER_KEY = "leader"
-  

[GitHub] [kafka] vvcephei merged pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually

2020-09-22 Thread GitBox


vvcephei merged pull request #9262:
URL: https://github.com/apache/kafka/pull/9262


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9284: KAFKA-10479 Throw exception if users try to update configs of existen…

2020-09-22 Thread GitBox


chia7712 commented on pull request #9284:
URL: https://github.com/apache/kafka/pull/9284#issuecomment-696528358


   > So the previous behavior would have caused the updated configuration to be 
persisted, but it wouldn't take effect until the broker was restarted. Is that 
right?
   
   You do give a better explanation :)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #9317: KAFKA-10509: Added throttle connection accept rate metric (KIP-612)

2020-09-22 Thread GitBox


dajac commented on a change in pull request #9317:
URL: https://github.com/apache/kafka/pull/9317#discussion_r492570653



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1414,7 +1420,8 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
 
   class ListenerConnectionQuota(lock: Object, listener: ListenerName) extends 
ListenerReconfigurable {
 @volatile private var _maxConnections = Int.MaxValue
-val connectionRateSensor = createConnectionRateQuotaSensor(Int.MaxValue, 
Some(listener.value))
+val connectionRateSensor: Sensor = 
createConnectionRateQuotaSensor(Int.MaxValue, Some(listener.value))
+val connectionRateThrottleSensor: Sensor = 
createConnectionRateThrottleSensor()

Review comment:
   nit: Do we really need to provide the type here? It seems that we 
usually don't provide it in that file.

##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1292,6 +1292,12 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
 counts.synchronized {
   val startThrottleTimeMs = time.milliseconds
   val throttleTimeMs = 
math.max(recordConnectionAndGetThrottleTimeMs(listenerName, 
startThrottleTimeMs), 0)
+  if (throttleTimeMs > 0) {
+// record throttle time due to hitting connection rate limit
+// connection could be throttled longer if the limit of the number of 
active connections is reached as well
+maxConnectionsPerListener.get(listenerName)
+  
.foreach(_.connectionRateThrottleSensor.record(throttleTimeMs.toDouble, 
startThrottleTimeMs))

Review comment:
   It is a bit annoying that we have to lookup the 
`ListenerConnectionQuota` twice. Once in `recordConnectionAndGetThrottleTimeMs` 
and once here. I wonder if we could look it up once to record the rate and to 
record the throttle time.
   
   I am thinking about the following to be more concrete. We could add two 
methods in `ListenerConnectionQuota`: 1) `record` and 2) `recordThrottleTime`. 
They would encapsulate the logic to respectively record the rate (for the 
listener and the broker like we do now) and the throttle time. So here we could 
look up the `ListenerConnectionQuota` record and get the throttle time, and 
record the throttle time if > 0.
   
   That could improve the readability a bit. I am not sure that it would make a 
difference from a performance point of view. WDYT?

##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1447,13 +1454,33 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
   }
 }
 
+def removeSensors(): Unit = {

Review comment:
   nit: What about naming this `close`? It feels a bit more natural to call 
`close` when we don't need the object anymore and it would be more aligned with 
the `close` methods that we already have in this file. For instance, 
`ConnectionQuotas#close` which also cleanup the metrics.
   
   BTW, not related to this PR but it seems that we don't close the 
`ListenerConnectionQuota` when the `ConnectionQuotas` is closed. I suppose that 
we leave metrics around. Is it the case?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #9301: KAFKA-10482: Fix flaky testDynamicListenerConnectionCreationRateQuota

2020-09-22 Thread GitBox


showuon commented on pull request #9301:
URL: https://github.com/apache/kafka/pull/9301#issuecomment-696577916


   @dajac , thanks for the comments. I've updated in this commit: 
https://github.com/apache/kafka/pull/9301/commits/8e2714c6b4d0b73c36893c365998380cfaf724f6.
 Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9251: KAFKA-10459: Document IQ APIs where order does not hold between stores

2020-09-22 Thread GitBox


mjsax commented on a change in pull request #9251:
URL: https://github.com/apache/kafka/pull/9251#discussion_r492434090



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
##
@@ -25,6 +25,11 @@
  * A window store that only supports read operations.
  * Implementations should be thread-safe as concurrent reads and writes are 
expected.
  *
+ * Note: The current implementation of either forward or backward fetches on 
range-key-range-time does not

Review comment:
   ```suggestion
* Note: The current implementation of either forward or backward fetches 
on range-key-range-time does not
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually

2020-09-22 Thread GitBox


vvcephei commented on pull request #9262:
URL: https://github.com/apache/kafka/pull/9262#issuecomment-696760191


   Test failure unrelated: `Build / JDK 11 / 
kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup`



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] big-andy-coates commented on pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

2020-09-22 Thread GitBox


big-andy-coates commented on pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#issuecomment-696383217







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] avocader closed pull request #9227: KAFKA-10439: Connect's Values to parse BigInteger as Decimal with zero scale.

2020-09-22 Thread GitBox


avocader closed pull request #9227:
URL: https://github.com/apache/kafka/pull/9227


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #9301: KAFKA-10482: Fix flaky testDynamicListenerConnectionCreationRateQuota

2020-09-22 Thread GitBox


dajac commented on a change in pull request #9301:
URL: https://github.com/apache/kafka/pull/9301#discussion_r492540150



##
File path: 
core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
##
@@ -179,17 +179,23 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
 reconfigureServers(props, perBrokerConfig = true, 
(KafkaConfig.ListenersProp, newListeners))
 waitForListener("EXTERNAL")
 
+// we need to set the initialConnectionCount earlier and pass to 
verifyConnectionRate method
+// so that the race condition won't occur for the following multi-thread 
test cases

Review comment:
   This comment is irrelevant now. I would just say that this is the 
expected connection count after each run.

##
File path: 
core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
##
@@ -317,6 +325,12 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
 }
   }
 
+  // make sure the connection count state is the same as the 
expectedConnectionCount

Review comment:
   nit: I would remove this comment. I think that the method is 
self-explanatory now.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #8181: KAFKA-9584 Headers ConcurrentModificationException

2020-09-22 Thread GitBox


mjsax commented on a change in pull request #8181:
URL: https://github.com/apache/kafka/pull/8181#discussion_r492360815



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -581,8 +580,8 @@ public void punctuate(final ProcessorNode node, final long 
timestamp, final Punc
 if (processorContext.currentNode() != null) {
 throw new IllegalStateException(format("%sCurrent node is not 
null", logPrefix));
 }
-
-updateProcessorContext(new StampedRecord(DUMMY_RECORD, timestamp), 
node);
+
+updateProcessorContext(new StampedRecord(new 
ConsumerRecord<>(ProcessorContextImpl.NONEXIST_TOPIC, -1, -1L, null, null), 
timestamp), node);

Review comment:
   From my understanding, neither the `ConsumerRecord` nor the 
`ProcessroRecordContext` are the issue, but the shared `Header` object -- it's 
just a "side effect" that creating a new `ConsumerRecord` creates an new 
`Header` object internally.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…

2020-09-22 Thread GitBox


dajac commented on a change in pull request #9318:
URL: https://github.com/apache/kafka/pull/9318#discussion_r492661437



##
File path: 
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##
@@ -997,189 +997,52 @@ object GroupMetadataManager {
   val MetricsGroup: String = "group-coordinator-metrics"
   val LoadTimeSensor: String = "GroupPartitionLoadTime"
 
-  private val CURRENT_OFFSET_KEY_SCHEMA_VERSION = 1.toShort
-  private val CURRENT_GROUP_KEY_SCHEMA_VERSION = 2.toShort
-
-  private val OFFSET_COMMIT_KEY_SCHEMA = new Schema(new Field("group", STRING),
-new Field("topic", STRING),
-new Field("partition", INT32))
-  private val OFFSET_KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("group")
-  private val OFFSET_KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("topic")
-  private val OFFSET_KEY_PARTITION_FIELD = 
OFFSET_COMMIT_KEY_SCHEMA.get("partition")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", 
INT64),
-new Field("metadata", STRING, "Associated metadata.", ""),
-new Field("timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V0 = 
OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset")
-  private val OFFSET_VALUE_METADATA_FIELD_V0 = 
OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata")
-  private val OFFSET_VALUE_TIMESTAMP_FIELD_V0 = 
OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", 
INT64),
-new Field("metadata", STRING, "Associated metadata.", ""),
-new Field("commit_timestamp", INT64),
-new Field("expire_timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset")
-  private val OFFSET_VALUE_METADATA_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata")
-  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp")
-  private val OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V2 = new Schema(new Field("offset", 
INT64),
-new Field("metadata", STRING, "Associated metadata.", ""),
-new Field("commit_timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V2 = 
OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset")
-  private val OFFSET_VALUE_METADATA_FIELD_V2 = 
OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata")
-  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2 = 
OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema(
-new Field("offset", INT64),
-new Field("leader_epoch", INT32),
-new Field("metadata", STRING, "Associated metadata.", ""),
-new Field("commit_timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset")
-  private val OFFSET_VALUE_LEADER_EPOCH_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch")
-  private val OFFSET_VALUE_METADATA_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata")
-  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp")
-
-  private val GROUP_METADATA_KEY_SCHEMA = new Schema(new Field("group", 
STRING))
-  private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group")
-
-  private val MEMBER_ID_KEY = "member_id"
-  private val GROUP_INSTANCE_ID_KEY = "group_instance_id"
-  private val CLIENT_ID_KEY = "client_id"
-  private val CLIENT_HOST_KEY = "client_host"
-  private val REBALANCE_TIMEOUT_KEY = "rebalance_timeout"
-  private val SESSION_TIMEOUT_KEY = "session_timeout"
-  private val SUBSCRIPTION_KEY = "subscription"
-  private val ASSIGNMENT_KEY = "assignment"
-
-  private val MEMBER_METADATA_V0 = new Schema(
-new Field(MEMBER_ID_KEY, STRING),
-new Field(CLIENT_ID_KEY, STRING),
-new Field(CLIENT_HOST_KEY, STRING),
-new Field(SESSION_TIMEOUT_KEY, INT32),
-new Field(SUBSCRIPTION_KEY, BYTES),
-new Field(ASSIGNMENT_KEY, BYTES))
-
-  private val MEMBER_METADATA_V1 = new Schema(
-new Field(MEMBER_ID_KEY, STRING),
-new Field(CLIENT_ID_KEY, STRING),
-new Field(CLIENT_HOST_KEY, STRING),
-new Field(REBALANCE_TIMEOUT_KEY, INT32),
-new Field(SESSION_TIMEOUT_KEY, INT32),
-new Field(SUBSCRIPTION_KEY, BYTES),
-new Field(ASSIGNMENT_KEY, BYTES))
-
-  private val MEMBER_METADATA_V2 = MEMBER_METADATA_V1
-
-  private val MEMBER_METADATA_V3 = new Schema(
-new Field(MEMBER_ID_KEY, STRING),
-new Field(GROUP_INSTANCE_ID_KEY, NULLABLE_STRING),
-new Field(CLIENT_ID_KEY, STRING),
-new Field(CLIENT_HOST_KEY, STRING),
-new Field(REBALANCE_TIMEOUT_KEY, INT32),
-new Field(SESSION_TIMEOUT_KEY, INT32),
-new Field(SUBSCRIPTION_KEY, BYTES),
-new Field(ASSIGNMENT_KEY, BYTES))
-
-  private val PROTOCOL_TYPE_KEY = "protocol_type"
-  private val GENERATION_KEY = "generation"
-  private val PROTOCOL_KEY = "protocol"
-  private val LEADER_KEY = "leader"
-  

[jira] [Assigned] (KAFKA-8653) Regression in JoinGroup v0 rebalance timeout handling

2020-09-22 Thread James Cheng (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

James Cheng reassigned KAFKA-8653:
--

Assignee: James Cheng  (was: Jason Gustafson)

> Regression in JoinGroup v0 rebalance timeout handling
> -
>
> Key: KAFKA-8653
> URL: https://issues.apache.org/jira/browse/KAFKA-8653
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.3.0
>Reporter: Jason Gustafson
>Assignee: James Cheng
>Priority: Blocker
> Fix For: 2.3.1
>
>
> The rebalance timeout was added to the JoinGroup protocol in version 1. Prior 
> to 2.3, we handled version 0 JoinGroup requests by setting the rebalance 
> timeout to be equal to the session timeout. We lost this logic when we 
> converted the API to use the generated schema definition which uses the 
> default value of -1. The impact of this is that the group rebalance timeout 
> becomes 0, so rebalances finish immediately after we enter the 
> PrepareRebalance state and kick out all old members. This causes consumer 
> groups to enter an endless rebalance loop.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-8653) Regression in JoinGroup v0 rebalance timeout handling

2020-09-22 Thread James Cheng (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

James Cheng reassigned KAFKA-8653:
--

Assignee: (was: James Cheng)

> Regression in JoinGroup v0 rebalance timeout handling
> -
>
> Key: KAFKA-8653
> URL: https://issues.apache.org/jira/browse/KAFKA-8653
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.3.0
>Reporter: Jason Gustafson
>Priority: Blocker
> Fix For: 2.3.1
>
>
> The rebalance timeout was added to the JoinGroup protocol in version 1. Prior 
> to 2.3, we handled version 0 JoinGroup requests by setting the rebalance 
> timeout to be equal to the session timeout. We lost this logic when we 
> converted the API to use the generated schema definition which uses the 
> default value of -1. The impact of this is that the group rebalance timeout 
> becomes 0, so rebalances finish immediately after we enter the 
> PrepareRebalance state and kick out all old members. This causes consumer 
> groups to enter an endless rebalance loop.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #9103: KAFKA-10181: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

2020-09-22 Thread GitBox


hachikuji commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r493160927



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsUtil.java
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
+
+import java.util.Collection;
+import java.util.Map;
+
+public class AlterConfigsUtil {
+
+public static IncrementalAlterConfigsRequestData 
generateIncrementalRequestData(final Map> configs,
+   
 final boolean validateOnly) {
+return generateIncrementalRequestData(configs.keySet(), configs, 
validateOnly);
+}
+
+public static IncrementalAlterConfigsRequestData 
generateIncrementalRequestData(final Collection resources,

Review comment:
   nit: might be useful to document the expectation that `resources` is a 
subset of the key set of `configs`. The signature surprised me a little bit.
   
   As an aside, this kind of convenience conversion seems more appropriate for 
`IncrementalAlterConfigsRequest.Builder` rather than a static class.

##
File path: core/src/main/scala/kafka/api/ApiVersion.scala
##
@@ -103,6 +103,9 @@ object ApiVersion {
 KAFKA_2_7_IV0,
 // Bup Fetch protocol for Raft protocol (KIP-595)
 KAFKA_2_7_IV1,
+// Enable redirection (KIP-590)
+// TODO: remove this IBP in the 2.7 release if redirection work could not 
be done before the freeze

Review comment:
   Get rid of this TODO. We do not need to remove IBP internal versions.

##
File path: core/src/main/scala/kafka/common/InterBrokerSendThread.scala
##
@@ -147,7 +147,9 @@ abstract class InterBrokerSendThread(name: String,
 
 case class RequestAndCompletionHandler(destination: Node,
request: AbstractRequest.Builder[_ <: 
AbstractRequest],
-   handler: RequestCompletionHandler)
+   handler: RequestCompletionHandler,
+   initialPrincipalName: String = null,

Review comment:
   nit: why don't we add a case class and make this optional. for example:
   
   ```scala
   case class InitialPrincipal(name: String, clientId: String)
   ```
   In addition to reducing parameters, that makes the expectation that both are 
provided explicit.
   

##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -117,6 +117,89 @@ class KafkaApis(val requestChannel: RequestChannel,
   val adminZkClient = new AdminZkClient(zkClient)
   private val alterAclsPurgatory = new DelayedFuturePurgatory(purgatoryName = 
"AlterAcls", brokerId = config.brokerId)
 
+  /**
+   * The template to create a forward request handler.
+   *
+   * @tparam T request type
+   * @tparam R response type
+   * @tparam RK resource key
+   * @tparam RV resource value
+   */
+  private[server] abstract class ForwardRequestHandler[T <: AbstractRequest,
+R <: AbstractResponse, RK, RV](request: RequestChannel.Request) extends 
Logging {
+
+/**
+ * Split the given resource into authorized and unauthorized sets.
+ *
+ * @return authorized resources and unauthorized resources
+ */
+def resourceSplitByAuthorization(request: T): (Map[RK, RV], Map[RK, 
ApiError])
+
+/**
+ * Controller handling logic of the request.
+ */
+def process(authorizedResources: Map[RK, RV],
+unauthorizedResult: Map[RK, ApiError],
+request: T): Unit
+
+/**
+ * Build a forward request to the controller.
+ *
+ * @param authorizedResources authorized resources by the forwarding broker
+ * @param request the original request
+ * @return forward request builder
+ */
+def createRequestBuilder(authorizedResources: Map[RK, RV],
+ request: T): AbstractRequest.Builder[T]
+
+/**
+ * Merge the forward response with the previously unauthorized 

[GitHub] [kafka] huxihx commented on pull request #9218: MINOR: Fix shouldNotResetEpochHistoryHeadIfUndefinedPassed

2020-09-22 Thread GitBox


huxihx commented on pull request #9218:
URL: https://github.com/apache/kafka/pull/9218#issuecomment-697095836


   retest this please.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10513) Newly added partitions are not assigned to running static consumer

2020-09-22 Thread Marlon Ou (Jira)
Marlon Ou created KAFKA-10513:
-

 Summary: Newly added partitions are not assigned to running static 
consumer
 Key: KAFKA-10513
 URL: https://issues.apache.org/jira/browse/KAFKA-10513
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 2.6.0
Reporter: Marlon Ou


If consumers are polling messages from a certain topic with static membership 
and we add new partitions to this topic while the consumers are running, no 
partition reassignment is ever triggered (and hence messages published into the 
new partitions are never consumed). 

To reproduce, simply set group instance IDs on the consumers: 
{code:java}
props.setProperty("group.instance.id", instanceId);
{code}
And then while the static consumers are running, use Kafka's admin client to 
add more partitions to the topic:
{code:java}
adminClient.createPartitions(...)
{code}
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mattwong949 opened a new pull request #9322: KAFKA-10512: Prevent JmxTool Crashing on Unmarshall Error

2020-09-22 Thread GitBox


mattwong949 opened a new pull request #9322:
URL: https://github.com/apache/kafka/pull/9322


   The JMXTool will query for all metrics when not supplied an "--object-name" 
arg. However, some MBean objects can contain attributes that cannot be 
serialized, thus crashing the JMXTool before reporting any metrics. This PR 
catches those exceptions, printing an error message but allowing the tool to 
continue to reporting all metrics w/ the errored ones filtered out



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10512) JmxTool Can Crash on Unmarshall Error

2020-09-22 Thread Matthew Wong (Jira)
Matthew Wong created KAFKA-10512:


 Summary: JmxTool Can Crash on Unmarshall Error
 Key: KAFKA-10512
 URL: https://issues.apache.org/jira/browse/KAFKA-10512
 Project: Kafka
  Issue Type: Bug
  Components: tools
Affects Versions: 2.6.0
Reporter: Matthew Wong


JmxTool can potentially crash from errors when querying for MBean objects. The 
errors can be caused by MBean objects that have attributes which can't be 
serialized.

When querying for all metrics, if the tool encounters such nonserializable 
MBean attributes, the tool will crash without outputting any metrics. Instead, 
the tool should print an error message and filter out the problematic objects, 
proceeding to print all other metrics.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jeqo opened a new pull request #9321: fix: add missing default implementations

2020-09-22 Thread GitBox


jeqo opened a new pull request #9321:
URL: https://github.com/apache/kafka/pull/9321


   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] gmunozfe commented on pull request #9309: KAFKA-10503: MockProducer doesn't throw ClassCastException when no

2020-09-22 Thread GitBox


gmunozfe commented on pull request #9309:
URL: https://github.com/apache/kafka/pull/9309#issuecomment-696995620


   @mjsax @guozhangwang @leonardge could you review? thanks!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10511) Fix minor behavior difference in `MockLog`

2020-09-22 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-10511:
---

 Summary: Fix minor behavior difference in `MockLog`
 Key: KAFKA-10511
 URL: https://issues.apache.org/jira/browse/KAFKA-10511
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jason Gustafson
Assignee: Jason Gustafson


Fix minor difference in the implementation of the epoch cache in MockLog. In 
`LeaderEpochFileCache`, we ensure new entries increase both start offset and 
epoch monotonically. We also do not allow duplicates.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on pull request #9312: KAFKA-10505: Fix parsing of generation log string.

2020-09-22 Thread GitBox


ableegoldman commented on pull request #9312:
URL: https://github.com/apache/kafka/pull/9312#issuecomment-696925826


   Original system test run was aborted for some reason. Kicked off a new set: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4182/



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] avocader opened a new pull request #9320: KAFKA-10439: Connect's Values to parse BigInteger as Decimal with zero scale.

2020-09-22 Thread GitBox


avocader opened a new pull request #9320:
URL: https://github.com/apache/kafka/pull/9320


   The `org.apache.kafka.connect.data.Values#parse` method parses integers, 
which are larger than `Long.MAX_VALUE` as `double` with `Schema.FLOAT64_SCHEMA`.
   
   That means we are losing precision for these larger integers.
   
   For example:
   ```
   SchemaAndValue schemaAndValue = Values.parseString("9223372036854775808");
   ```
   returns:
   ```
   SchemaAndValue{schema=Schema{FLOAT64}, value=9.223372036854776E18}
   ```
   Also, this method parses values, that can be parsed as `FLOAT32` to 
`FLOAT64`.
   
   This PR changes parsing logic, to use `FLOAT32`/`FLOAT64` for numbers that 
don't have and fraction part(`decimal.scale()!=0`) only, and use an 
arbitrary-precision `org.apache.kafka.connect.data.Decimal` otherwise.
   Also, it updates the method to parse numbers, that can be represented as 
`float` to `FLOAT64`.
   
   Added unit tests, that cover parsing `BigInteger`, `Byte`, `Short`, 
`Integer`, `Long`, `Float`, `Double` types.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] avocader closed pull request #9227: KAFKA-10439: Connect's Values to parse BigInteger as Decimal with zero scale.

2020-09-22 Thread GitBox


avocader closed pull request #9227:
URL: https://github.com/apache/kafka/pull/9227


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-10492) Core Raft implementation

2020-09-22 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-10492.
-
Resolution: Fixed

> Core Raft implementation
> 
>
> Key: KAFKA-10492
> URL: https://issues.apache.org/jira/browse/KAFKA-10492
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> This issue tracks the core implementation of the Raft protocol specified in 
> KIP-595: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji merged pull request #9130: KAFKA-10492; Core Kafka Raft Implementation (KIP-595)

2020-09-22 Thread GitBox


hachikuji merged pull request #9130:
URL: https://github.com/apache/kafka/pull/9130


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #9083: KAFKA-9450: Follow-up; Forbid process after closed

2020-09-22 Thread GitBox


guozhangwang commented on a change in pull request #9083:
URL: https://github.com/apache/kafka/pull/9083#discussion_r492950073



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java
##
@@ -44,19 +45,18 @@ public KStreamFlatTransformValues(final 
ValueTransformerWithKeySupplier 
implements Processor {
+public static class KStreamFlatTransformValuesProcessor 
extends AbstractProcessor {

Review comment:
   As described in at the top, `Let all built-in processors to extend from 
AbstractProcessor.` The main reason is that AbstractProcessor provides some 
basic functionalities and hence it's better to let our own impl to base on them.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #9083: KAFKA-9450: Follow-up; Forbid process after closed

2020-09-22 Thread GitBox


guozhangwang commented on a change in pull request #9083:
URL: https://github.com/apache/kafka/pull/9083#discussion_r492944504



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##
@@ -217,12 +217,12 @@ public StateStore getStateStore(final String name) {
 forward((ProcessorNode) child, key, value);
 }
 } else {
-final ProcessorNode child = 
currentNode().getChild(sendTo);

Review comment:
   I made the change in ProcessorNode to add back the template types: 
https://github.com/apache/kafka/pull/9083/files/82b6f6f5d238401097e0906c8135c5c189524666#diff-705bfd0ed3f214048b76d775708cc7d2L96
   
   But since `currentNode()`'s template is `` its templated 
`getChild` and that's why I need to weaken it here --- as you can see from the 
above `if` branch, it now aligns consistently on the typing.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9083: KAFKA-9450: Follow-up; Forbid process after closed

2020-09-22 Thread GitBox


abbccdda commented on a change in pull request #9083:
URL: https://github.com/apache/kafka/pull/9083#discussion_r492941448



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##
@@ -1566,6 +1566,7 @@ public void shouldCheckpointForSuspendedTask() {
 EasyMock.verify(stateManager);
 }
 
+

Review comment:
   nit: not necessary

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##
@@ -217,12 +217,12 @@ public StateStore getStateStore(final String name) {
 forward((ProcessorNode) child, key, value);
 }
 } else {
-final ProcessorNode child = 
currentNode().getChild(sendTo);

Review comment:
   +1

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java
##
@@ -44,19 +45,18 @@ public KStreamFlatTransformValues(final 
ValueTransformerWithKeySupplier 
implements Processor {
+public static class KStreamFlatTransformValuesProcessor 
extends AbstractProcessor {

Review comment:
   Could you elaborate why this is better than Processor?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] piotrrzysko commented on pull request #9315: KAFKA-10496: Removed relying on external DNS servers in tests

2020-09-22 Thread GitBox


piotrrzysko commented on pull request #9315:
URL: https://github.com/apache/kafka/pull/9315#issuecomment-696860022


   @jolshan @mumrah @dajac Could you please take look at this PR? 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9270: [WIP] KAFKA-10284: Group membership update due to static member rejoin should be persisted

2020-09-22 Thread GitBox


abbccdda commented on a change in pull request #9270:
URL: https://github.com/apache/kafka/pull/9270#discussion_r492896997



##
File path: 
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
##
@@ -3882,6 +3942,21 @@ class GroupCoordinatorTest {
 Await.result(responseFuture, Duration(rebalanceTimeout + 100, 
TimeUnit.MILLISECONDS))
   }
 
+  private def staticJoinGroupWithPersistence(groupId: String,
+ memberId: String,

Review comment:
   nit: alignment

##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -1040,21 +1040,36 @@ class GroupCoordinator(val brokerId: Int,
 
 group.currentState match {
   case Stable =>
-info(s"Static member joins during Stable stage will not trigger 
rebalance.")
-group.maybeInvokeJoinCallback(member, JoinGroupResult(
-  members = List.empty,
-  memberId = newMemberId,
-  generationId = group.generationId,
-  protocolType = group.protocolType,
-  protocolName = group.protocolName,
-  // We want to avoid current leader performing trivial assignment 
while the group
-  // is in stable stage, because the new assignment in leader's next 
sync call
-  // won't be broadcast by a stable group. This could be guaranteed by
-  // always returning the old leader id so that the current leader 
won't assume itself
-  // as a leader based on the returned message, since the new 
member.id won't match
-  // returned leader id, therefore no assignment will be performed.
-  leaderId = currentLeader,
-  error = Errors.NONE))
+// check if group's selectedProtocol of next generation will change, 
if not, simply store group to persist the
+// updated static member, if yes, rebalance should be triggered to let 
the group's assignment and selectProtocol consistent
+val selectedProtocolOfNextGeneration = group.selectProtocol
+if (group.protocolName.contains(selectedProtocolOfNextGeneration)) {
+  info(s"Static member which joins during Stable stage and doesn't 
affect selectProtocol will not trigger rebalance.")
+  val groupAssignment: Map[String, Array[Byte]] = 
group.allMemberMetadata.map(member => member.memberId -> 
member.assignment).toMap
+  groupManager.storeGroup(group, groupAssignment, error => {
+group.inLock {
+  if (error != Errors.NONE) {
+warn(s"Failed to persist metadata for group ${group.groupId}: 
${error.message}")
+  }
+}
+  })
+  group.maybeInvokeJoinCallback(member, JoinGroupResult(
+members = List.empty,
+memberId = newMemberId,
+generationId = group.generationId,
+protocolType = group.protocolType,
+protocolName = group.protocolName,
+// We want to avoid current leader performing trivial assignment 
while the group
+// is in stable stage, because the new assignment in leader's next 
sync call
+// won't be broadcast by a stable group. This could be guaranteed 
by
+// always returning the old leader id so that the current leader 
won't assume itself
+// as a leader based on the returned message, since the new 
member.id won't match
+// returned leader id, therefore no assignment will be performed.
+leaderId = currentLeader,
+error = Errors.NONE))
+} else {
+  maybePrepareRebalance(group, s"Group's selectedProtocol will change 
because static member ${member.memberId} with instance id $groupInstanceId 
joined with change of protocol")

Review comment:
   Could you elaborate why this case is possible? We do have checks for 
`!group.supportsProtocols(protocolType, 
MemberMetadata.plainProtocolSet(protocols)` in the caller, so if the group 
protocol is incompatible, won't we just reject the rejoin?

##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -1040,21 +1040,36 @@ class GroupCoordinator(val brokerId: Int,
 
 group.currentState match {
   case Stable =>
-info(s"Static member joins during Stable stage will not trigger 
rebalance.")
-group.maybeInvokeJoinCallback(member, JoinGroupResult(
-  members = List.empty,
-  memberId = newMemberId,
-  generationId = group.generationId,
-  protocolType = group.protocolType,
-  protocolName = group.protocolName,
-  // We want to avoid current leader performing trivial assignment 
while the group
-  // is in stable stage, because the new assignment in leader's next 
sync call
-  // won't be broadcast by a stable group. This could be guaranteed by
-  // always returning the old leader id so that the current leader 
won't assume itself

[GitHub] [kafka] chia7712 commented on a change in pull request #9206: MINOR: rewrite zipWithIndex by normal foreach to refrain unnecessary …

2020-09-22 Thread GitBox


chia7712 commented on a change in pull request #9206:
URL: https://github.com/apache/kafka/pull/9206#discussion_r492872408



##
File path: core/src/main/scala/kafka/log/LogValidator.scala
##
@@ -234,17 +234,16 @@ private[log] object LogValidator extends Logging {
 
 val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, 
NoCompressionCodec)
 
-records.batches.forEach { batch =>
+for (batch <- records.batches.asScala) {

Review comment:
   ok~





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #9206: MINOR: rewrite zipWithIndex by normal foreach to refrain unnecessary …

2020-09-22 Thread GitBox


ijuma commented on a change in pull request #9206:
URL: https://github.com/apache/kafka/pull/9206#discussion_r492865230



##
File path: core/src/main/scala/kafka/log/LogValidator.scala
##
@@ -234,17 +234,16 @@ private[log] object LogValidator extends Logging {
 
 val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, 
NoCompressionCodec)
 
-records.batches.forEach { batch =>
+for (batch <- records.batches.asScala) {

Review comment:
   I liked your changes to make the code more concise, I'd keep them.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

2020-09-22 Thread GitBox


mjsax commented on pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#issuecomment-696823406


   Retest this please.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei merged pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually

2020-09-22 Thread GitBox


vvcephei merged pull request #9262:
URL: https://github.com/apache/kafka/pull/9262


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10510) Reassigning partitions should not allow increasing RF of a partition unless configured with it

2020-09-22 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-10510:

Description: 
Kafka should have some validations in place against increasing the RF of a 
partition through a reassignment. Users could otherwise shoot themselves in the 
foot by increasing the RF of a topic by reassigning its partitions to extra 
replicas and then have new partition creations use a lesser (the configured) 
replication factor.

Our tools should ideally detect when RF is increasing inconsistently with the 
rest of the topic's partitions (or the default replication factor)

  was:
Kafka should have some validations in place against increasing the RF of a 
partition through a reassignment. Users could otherwise shoot themselves in the 
foot by increasing the RF of a topic by reassigning its partitions to extra 
replicas and then have new partition creations use a lesser (the configured) 
replication factor.

Our tools should ideally detect when RF is increasing inconsistently with the 
config and issue a separate command to change the config.


> Reassigning partitions should not allow increasing RF of a partition unless 
> configured with it
> --
>
> Key: KAFKA-10510
> URL: https://issues.apache.org/jira/browse/KAFKA-10510
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Stanislav Kozlovski
>Priority: Major
>
> Kafka should have some validations in place against increasing the RF of a 
> partition through a reassignment. Users could otherwise shoot themselves in 
> the foot by increasing the RF of a topic by reassigning its partitions to 
> extra replicas and then have new partition creations use a lesser (the 
> configured) replication factor.
> Our tools should ideally detect when RF is increasing inconsistently with the 
> rest of the topic's partitions (or the default replication factor)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10507) Limit the set of APIs returned in pre-authentication ApiVersions

2020-09-22 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17200172#comment-17200172
 ] 

Jason Gustafson commented on KAFKA-10507:
-

[~edenhill] It was more of a suggestion. I was thinking to avoid leaking 
information prior to authentication. Since librdkafka depends on it, I will 
just close this as "won't fix."

> Limit the set of APIs returned in pre-authentication ApiVersions 
> -
>
> Key: KAFKA-10507
> URL: https://issues.apache.org/jira/browse/KAFKA-10507
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: David Jacot
>Priority: Major
>
> We use the ApiVersions RPC to check whether the SaslHandshake and 
> SaslAuthenticate APIs are supported before authenticating with the broker. 
> Currently the response contains all APIs supported by the broker. It seems 
> like a good idea to reduce the set of APIs returned at this level to only 
> those which are supported prior to authentication. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10507) Limit the set of APIs returned in pre-authentication ApiVersions

2020-09-22 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-10507.
-
Resolution: Won't Do

> Limit the set of APIs returned in pre-authentication ApiVersions 
> -
>
> Key: KAFKA-10507
> URL: https://issues.apache.org/jira/browse/KAFKA-10507
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: David Jacot
>Priority: Major
>
> We use the ApiVersions RPC to check whether the SaslHandshake and 
> SaslAuthenticate APIs are supported before authenticating with the broker. 
> Currently the response contains all APIs supported by the broker. It seems 
> like a good idea to reduce the set of APIs returned at this level to only 
> those which are supported prior to authentication. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on a change in pull request #9206: MINOR: rewrite zipWithIndex by normal foreach to refrain unnecessary …

2020-09-22 Thread GitBox


chia7712 commented on a change in pull request #9206:
URL: https://github.com/apache/kafka/pull/9206#discussion_r492834735



##
File path: core/src/main/scala/kafka/log/LogValidator.scala
##
@@ -234,16 +234,17 @@ private[log] object LogValidator extends Logging {
 
 val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, 
NoCompressionCodec)
 
-for (batch <- records.batches.asScala) {
+records.batches.forEach { batch =>
   validateBatch(topicPartition, firstBatch, batch, origin, toMagicValue, 
brokerTopicStats)
 
   val recordErrors = new ArrayBuffer[ApiRecordError](0)
-  for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) {
+  var batchIndex = 0
+  batch.forEach { record =>
 validateRecord(batch, topicPartition, record, batchIndex, now, 
timestampType,
   timestampDiffMaxMs, compactedTopic, 
brokerTopicStats).foreach(recordError => recordErrors += recordError)
 // we fail the batch if any record fails, so we stop appending if any 
record fails
-if (recordErrors.isEmpty)
-  builder.appendWithOffset(offsetCounter.getAndIncrement(), record)
+if (recordErrors.isEmpty) 
builder.appendWithOffset(offsetCounter.getAndIncrement(), record)
+batchIndex += 1

Review comment:
   > Have we benchmarked this path?
   
   I didn't benchmark this path and you are right that optimization is small as 
we have to convert data in this path. I will revert it to make small patch. 

##
File path: core/src/main/scala/kafka/log/LogValidator.scala
##
@@ -279,14 +280,15 @@ private[log] object LogValidator extends Logging {
 
 val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, 
NoCompressionCodec)
 
-for (batch <- records.batches.asScala) {
+records.batches.forEach { batch =>
   validateBatch(topicPartition, firstBatch, batch, origin, magic, 
brokerTopicStats)
 
   var maxBatchTimestamp = RecordBatch.NO_TIMESTAMP
   var offsetOfMaxBatchTimestamp = -1L
 
   val recordErrors = new ArrayBuffer[ApiRecordError](0)
-  for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) {
+  var batchIndex = 0

Review comment:
   copy that





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #9206: MINOR: rewrite zipWithIndex by normal foreach to refrain unnecessary …

2020-09-22 Thread GitBox


ijuma commented on a change in pull request #9206:
URL: https://github.com/apache/kafka/pull/9206#discussion_r492823279



##
File path: core/src/main/scala/kafka/log/LogValidator.scala
##
@@ -279,14 +280,15 @@ private[log] object LogValidator extends Logging {
 
 val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, 
NoCompressionCodec)
 
-for (batch <- records.batches.asScala) {
+records.batches.forEach { batch =>
   validateBatch(topicPartition, firstBatch, batch, origin, magic, 
brokerTopicStats)
 
   var maxBatchTimestamp = RecordBatch.NO_TIMESTAMP
   var offsetOfMaxBatchTimestamp = -1L
 
   val recordErrors = new ArrayBuffer[ApiRecordError](0)
-  for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) {
+  var batchIndex = 0

Review comment:
   Worth adding a comment here that this is a hot path and we want to avoid 
any unnecessary allocations.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #9206: MINOR: rewrite zipWithIndex by normal foreach to refrain unnecessary …

2020-09-22 Thread GitBox


ijuma commented on a change in pull request #9206:
URL: https://github.com/apache/kafka/pull/9206#discussion_r492821097



##
File path: core/src/main/scala/kafka/log/LogValidator.scala
##
@@ -234,16 +234,17 @@ private[log] object LogValidator extends Logging {
 
 val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, 
NoCompressionCodec)
 
-for (batch <- records.batches.asScala) {
+records.batches.forEach { batch =>
   validateBatch(topicPartition, firstBatch, batch, origin, toMagicValue, 
brokerTopicStats)
 
   val recordErrors = new ArrayBuffer[ApiRecordError](0)
-  for ((record, batchIndex) <- batch.asScala.view.zipWithIndex) {
+  var batchIndex = 0
+  batch.forEach { record =>
 validateRecord(batch, topicPartition, record, batchIndex, now, 
timestampType,
   timestampDiffMaxMs, compactedTopic, 
brokerTopicStats).foreach(recordError => recordErrors += recordError)
 // we fail the batch if any record fails, so we stop appending if any 
record fails
-if (recordErrors.isEmpty)
-  builder.appendWithOffset(offsetCounter.getAndIncrement(), record)
+if (recordErrors.isEmpty) 
builder.appendWithOffset(offsetCounter.getAndIncrement(), record)
+batchIndex += 1

Review comment:
   Have we benchmarked this path? It seems doubtful that these micro 
optimizations help given that we are `converting`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #4090: [KAFKA-6084] Propagate JSON parsing errors in ReassignPartitionsCommand

2020-09-22 Thread GitBox


ijuma commented on pull request #4090:
URL: https://github.com/apache/kafka/pull/4090#issuecomment-696784662


   Can you also rebase please so that the PR builder runs?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #9266: KAFKA-10469: Resolve logger levels hierarchically

2020-09-22 Thread GitBox


ijuma commented on pull request #9266:
URL: https://github.com/apache/kafka/pull/9266#issuecomment-696784359


   @tombentley `testDescribeConfigsForLog4jLogLevels` is failing.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #4090: [KAFKA-6084] Propagate JSON parsing errors in ReassignPartitionsCommand

2020-09-22 Thread GitBox


ijuma commented on a change in pull request #4090:
URL: https://github.com/apache/kafka/pull/4090#discussion_r492813256



##
File path: core/src/main/scala/kafka/utils/Json.scala
##
@@ -69,8 +70,11 @@ object Json {
* @return An `Either` which in case of `Left` means an exception and 
`Right` is the actual return value.
*/
   def tryParseFull(input: String): Either[JsonProcessingException, JsonValue] =
-try Right(mapper.readTree(input)).map(JsonValue(_))
-catch { case e: JsonProcessingException => Left(e) }
+if (input != null && input.isEmpty)
+  Left(new JsonParseException(MissingNode.getInstance().traverse(), "The 
input string shouldn't be empty"))

Review comment:
   This seems fine.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #4090: [KAFKA-6084] Propagate JSON parsing errors in ReassignPartitionsCommand

2020-09-22 Thread GitBox


ijuma commented on a change in pull request #4090:
URL: https://github.com/apache/kafka/pull/4090#discussion_r492811318



##
File path: core/src/main/scala/kafka/utils/Json.scala
##
@@ -69,8 +70,11 @@ object Json {
* @return An `Either` which in case of `Left` means an exception and 
`Right` is the actual return value.
*/
   def tryParseFull(input: String): Either[JsonProcessingException, JsonValue] =
-try Right(mapper.readTree(input)).map(JsonValue(_))
-catch { case e: JsonProcessingException => Left(e) }
+if (input != null && input.isEmpty)

Review comment:
   I think we should say `if (input == null || input.isEmpty)` instead. 
It's unexpected to get an exception instead of `Left` in a method that returns 
`Either`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually

2020-09-22 Thread GitBox


vvcephei commented on pull request #9262:
URL: https://github.com/apache/kafka/pull/9262#issuecomment-696760191


   Test failure unrelated: `Build / JDK 11 / 
kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup`



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei merged pull request #8892: KAFKA-10068: verify assignment performance with large cluster

2020-09-22 Thread GitBox


vvcephei merged pull request #8892:
URL: https://github.com/apache/kafka/pull/8892


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8892: KAFKA-10068: verify assignment performance with large cluster

2020-09-22 Thread GitBox


vvcephei commented on pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#issuecomment-696752069


   Thanks, @ableegoldman !
   
   There was only one unrelated test failure:
   `Build / JDK 11 / 
kafka.network.DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota`



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] viktorsomogyi commented on a change in pull request #4090: [KAFKA-6084] Propagate JSON parsing errors in ReassignPartitionsCommand

2020-09-22 Thread GitBox


viktorsomogyi commented on a change in pull request #4090:
URL: https://github.com/apache/kafka/pull/4090#discussion_r492740825



##
File path: core/src/main/scala/kafka/utils/Json.scala
##
@@ -69,8 +70,11 @@ object Json {
* @return An `Either` which in case of `Left` means an exception and 
`Right` is the actual return value.
*/
   def tryParseFull(input: String): Either[JsonProcessingException, JsonValue] =
-try Right(mapper.readTree(input)).map(JsonValue(_))
-catch { case e: JsonProcessingException => Left(e) }
+if (input != null && input.isEmpty)
+  Left(new JsonParseException(MissingNode.getInstance().traverse(), "The 
input string shouldn't be empty"))

Review comment:
   It seems like that the most adequate JsonProcessingException is 
JsonParseException that could apply here. Another possibility is to change the 
exception in return type to something like IOException and throw that here but 
it seems it's more vague than throwing JsonParseException.
   Another option is to simple throw IllegalArgumentException here instead of 
Left. That would correspond to the `readTree(InputStream)` as well. What do you 
think?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] viktorsomogyi commented on a change in pull request #4090: [KAFKA-6084] Propagate JSON parsing errors in ReassignPartitionsCommand

2020-09-22 Thread GitBox


viktorsomogyi commented on a change in pull request #4090:
URL: https://github.com/apache/kafka/pull/4090#discussion_r492727120



##
File path: core/src/main/scala/kafka/utils/Json.scala
##
@@ -69,8 +70,11 @@ object Json {
* @return An `Either` which in case of `Left` means an exception and 
`Right` is the actual return value.
*/
   def tryParseFull(input: String): Either[JsonProcessingException, JsonValue] =
-try Right(mapper.readTree(input)).map(JsonValue(_))
-catch { case e: JsonProcessingException => Left(e) }
+if (input != null && input.isEmpty)

Review comment:
   Wasn't sure what to do here but `null` is checked in `readTree` and an 
IllegalArgumentException is thrown so I decided to go skip here and leave the 
original behavior.

##
File path: core/src/main/scala/kafka/utils/Json.scala
##
@@ -69,8 +70,11 @@ object Json {
* @return An `Either` which in case of `Left` means an exception and 
`Right` is the actual return value.
*/
   def tryParseFull(input: String): Either[JsonProcessingException, JsonValue] =
-try Right(mapper.readTree(input)).map(JsonValue(_))
-catch { case e: JsonProcessingException => Left(e) }
+if (input != null && input.isEmpty)

Review comment:
   Wasn't sure what to do here but `null` is checked in `readTree` and an 
IllegalArgumentException is thrown so I decided to skip here and leave the 
original behavior.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] viktorsomogyi commented on a change in pull request #4090: [KAFKA-6084] Propagate JSON parsing errors in ReassignPartitionsCommand

2020-09-22 Thread GitBox


viktorsomogyi commented on a change in pull request #4090:
URL: https://github.com/apache/kafka/pull/4090#discussion_r492704280



##
File path: core/src/test/scala/unit/kafka/utils/JsonTest.scala
##
@@ -40,25 +40,34 @@ class JsonTest {
   def testJsonParse(): Unit = {
 val jnf = JsonNodeFactory.instance
 
-assertEquals(Json.parseFull("{}"), Some(JsonValue(new ObjectNode(jnf
+assertEquals(Some(JsonValue(new ObjectNode(jnf))), Json.parseFull("{}"))
+assertEquals(Right(JsonValue(new ObjectNode(jnf))), 
Json.tryParseFull("{}"))
+assertThrows(classOf[IllegalArgumentException], () => 
Json.tryParseFull(null))
+assertThrows(classOf[IllegalArgumentException], () => 
Json.tryParseBytes(null))
 
-assertEquals(Json.parseFull("""{"foo":"bar"s}"""), None)
+assertEquals(Option(MissingNode.getInstance()).map(JsonValue(_)), 
Json.parseFull(""))
+assertEquals(Right(MissingNode.getInstance()).map(JsonValue(_)), 
Json.tryParseFull(""))

Review comment:
   You're right, it makes sense to return `None` and `Left` in those cases. 
Will change it and upload it to this PR.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…

2020-09-22 Thread GitBox


chia7712 commented on a change in pull request #9318:
URL: https://github.com/apache/kafka/pull/9318#discussion_r492696416



##
File path: core/src/main/resources/common/message/GroupMetadataValue.json
##
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "name": "GroupMetadataValue",

Review comment:
   copy that





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…

2020-09-22 Thread GitBox


chia7712 commented on a change in pull request #9318:
URL: https://github.com/apache/kafka/pull/9318#discussion_r492696066



##
File path: 
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##
@@ -997,189 +997,52 @@ object GroupMetadataManager {
   val MetricsGroup: String = "group-coordinator-metrics"
   val LoadTimeSensor: String = "GroupPartitionLoadTime"
 
-  private val CURRENT_OFFSET_KEY_SCHEMA_VERSION = 1.toShort
-  private val CURRENT_GROUP_KEY_SCHEMA_VERSION = 2.toShort
-
-  private val OFFSET_COMMIT_KEY_SCHEMA = new Schema(new Field("group", STRING),
-new Field("topic", STRING),
-new Field("partition", INT32))
-  private val OFFSET_KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("group")
-  private val OFFSET_KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("topic")
-  private val OFFSET_KEY_PARTITION_FIELD = 
OFFSET_COMMIT_KEY_SCHEMA.get("partition")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", 
INT64),
-new Field("metadata", STRING, "Associated metadata.", ""),
-new Field("timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V0 = 
OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset")
-  private val OFFSET_VALUE_METADATA_FIELD_V0 = 
OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata")
-  private val OFFSET_VALUE_TIMESTAMP_FIELD_V0 = 
OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", 
INT64),
-new Field("metadata", STRING, "Associated metadata.", ""),
-new Field("commit_timestamp", INT64),
-new Field("expire_timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset")
-  private val OFFSET_VALUE_METADATA_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata")
-  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp")
-  private val OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V2 = new Schema(new Field("offset", 
INT64),
-new Field("metadata", STRING, "Associated metadata.", ""),
-new Field("commit_timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V2 = 
OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset")
-  private val OFFSET_VALUE_METADATA_FIELD_V2 = 
OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata")
-  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2 = 
OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema(
-new Field("offset", INT64),
-new Field("leader_epoch", INT32),
-new Field("metadata", STRING, "Associated metadata.", ""),
-new Field("commit_timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset")
-  private val OFFSET_VALUE_LEADER_EPOCH_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch")
-  private val OFFSET_VALUE_METADATA_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata")
-  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp")
-
-  private val GROUP_METADATA_KEY_SCHEMA = new Schema(new Field("group", 
STRING))
-  private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group")
-
-  private val MEMBER_ID_KEY = "member_id"
-  private val GROUP_INSTANCE_ID_KEY = "group_instance_id"
-  private val CLIENT_ID_KEY = "client_id"
-  private val CLIENT_HOST_KEY = "client_host"
-  private val REBALANCE_TIMEOUT_KEY = "rebalance_timeout"
-  private val SESSION_TIMEOUT_KEY = "session_timeout"
-  private val SUBSCRIPTION_KEY = "subscription"
-  private val ASSIGNMENT_KEY = "assignment"
-
-  private val MEMBER_METADATA_V0 = new Schema(
-new Field(MEMBER_ID_KEY, STRING),
-new Field(CLIENT_ID_KEY, STRING),
-new Field(CLIENT_HOST_KEY, STRING),
-new Field(SESSION_TIMEOUT_KEY, INT32),
-new Field(SUBSCRIPTION_KEY, BYTES),
-new Field(ASSIGNMENT_KEY, BYTES))
-
-  private val MEMBER_METADATA_V1 = new Schema(
-new Field(MEMBER_ID_KEY, STRING),
-new Field(CLIENT_ID_KEY, STRING),
-new Field(CLIENT_HOST_KEY, STRING),
-new Field(REBALANCE_TIMEOUT_KEY, INT32),
-new Field(SESSION_TIMEOUT_KEY, INT32),
-new Field(SUBSCRIPTION_KEY, BYTES),
-new Field(ASSIGNMENT_KEY, BYTES))
-
-  private val MEMBER_METADATA_V2 = MEMBER_METADATA_V1
-
-  private val MEMBER_METADATA_V3 = new Schema(
-new Field(MEMBER_ID_KEY, STRING),
-new Field(GROUP_INSTANCE_ID_KEY, NULLABLE_STRING),
-new Field(CLIENT_ID_KEY, STRING),
-new Field(CLIENT_HOST_KEY, STRING),
-new Field(REBALANCE_TIMEOUT_KEY, INT32),
-new Field(SESSION_TIMEOUT_KEY, INT32),
-new Field(SUBSCRIPTION_KEY, BYTES),
-new Field(ASSIGNMENT_KEY, BYTES))
-
-  private val PROTOCOL_TYPE_KEY = "protocol_type"
-  private val GENERATION_KEY = "generation"
-  private val PROTOCOL_KEY = "protocol"
-  private val LEADER_KEY = "leader"
-  

[GitHub] [kafka] dajac commented on a change in pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…

2020-09-22 Thread GitBox


dajac commented on a change in pull request #9318:
URL: https://github.com/apache/kafka/pull/9318#discussion_r492661437



##
File path: 
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##
@@ -997,189 +997,52 @@ object GroupMetadataManager {
   val MetricsGroup: String = "group-coordinator-metrics"
   val LoadTimeSensor: String = "GroupPartitionLoadTime"
 
-  private val CURRENT_OFFSET_KEY_SCHEMA_VERSION = 1.toShort
-  private val CURRENT_GROUP_KEY_SCHEMA_VERSION = 2.toShort
-
-  private val OFFSET_COMMIT_KEY_SCHEMA = new Schema(new Field("group", STRING),
-new Field("topic", STRING),
-new Field("partition", INT32))
-  private val OFFSET_KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("group")
-  private val OFFSET_KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("topic")
-  private val OFFSET_KEY_PARTITION_FIELD = 
OFFSET_COMMIT_KEY_SCHEMA.get("partition")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", 
INT64),
-new Field("metadata", STRING, "Associated metadata.", ""),
-new Field("timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V0 = 
OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset")
-  private val OFFSET_VALUE_METADATA_FIELD_V0 = 
OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata")
-  private val OFFSET_VALUE_TIMESTAMP_FIELD_V0 = 
OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", 
INT64),
-new Field("metadata", STRING, "Associated metadata.", ""),
-new Field("commit_timestamp", INT64),
-new Field("expire_timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset")
-  private val OFFSET_VALUE_METADATA_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata")
-  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp")
-  private val OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V2 = new Schema(new Field("offset", 
INT64),
-new Field("metadata", STRING, "Associated metadata.", ""),
-new Field("commit_timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V2 = 
OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset")
-  private val OFFSET_VALUE_METADATA_FIELD_V2 = 
OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata")
-  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2 = 
OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema(
-new Field("offset", INT64),
-new Field("leader_epoch", INT32),
-new Field("metadata", STRING, "Associated metadata.", ""),
-new Field("commit_timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset")
-  private val OFFSET_VALUE_LEADER_EPOCH_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch")
-  private val OFFSET_VALUE_METADATA_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata")
-  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp")
-
-  private val GROUP_METADATA_KEY_SCHEMA = new Schema(new Field("group", 
STRING))
-  private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group")
-
-  private val MEMBER_ID_KEY = "member_id"
-  private val GROUP_INSTANCE_ID_KEY = "group_instance_id"
-  private val CLIENT_ID_KEY = "client_id"
-  private val CLIENT_HOST_KEY = "client_host"
-  private val REBALANCE_TIMEOUT_KEY = "rebalance_timeout"
-  private val SESSION_TIMEOUT_KEY = "session_timeout"
-  private val SUBSCRIPTION_KEY = "subscription"
-  private val ASSIGNMENT_KEY = "assignment"
-
-  private val MEMBER_METADATA_V0 = new Schema(
-new Field(MEMBER_ID_KEY, STRING),
-new Field(CLIENT_ID_KEY, STRING),
-new Field(CLIENT_HOST_KEY, STRING),
-new Field(SESSION_TIMEOUT_KEY, INT32),
-new Field(SUBSCRIPTION_KEY, BYTES),
-new Field(ASSIGNMENT_KEY, BYTES))
-
-  private val MEMBER_METADATA_V1 = new Schema(
-new Field(MEMBER_ID_KEY, STRING),
-new Field(CLIENT_ID_KEY, STRING),
-new Field(CLIENT_HOST_KEY, STRING),
-new Field(REBALANCE_TIMEOUT_KEY, INT32),
-new Field(SESSION_TIMEOUT_KEY, INT32),
-new Field(SUBSCRIPTION_KEY, BYTES),
-new Field(ASSIGNMENT_KEY, BYTES))
-
-  private val MEMBER_METADATA_V2 = MEMBER_METADATA_V1
-
-  private val MEMBER_METADATA_V3 = new Schema(
-new Field(MEMBER_ID_KEY, STRING),
-new Field(GROUP_INSTANCE_ID_KEY, NULLABLE_STRING),
-new Field(CLIENT_ID_KEY, STRING),
-new Field(CLIENT_HOST_KEY, STRING),
-new Field(REBALANCE_TIMEOUT_KEY, INT32),
-new Field(SESSION_TIMEOUT_KEY, INT32),
-new Field(SUBSCRIPTION_KEY, BYTES),
-new Field(ASSIGNMENT_KEY, BYTES))
-
-  private val PROTOCOL_TYPE_KEY = "protocol_type"
-  private val GENERATION_KEY = "generation"
-  private val PROTOCOL_KEY = "protocol"
-  private val LEADER_KEY = "leader"
-  

[GitHub] [kafka] dajac commented on a change in pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…

2020-09-22 Thread GitBox


dajac commented on a change in pull request #9318:
URL: https://github.com/apache/kafka/pull/9318#discussion_r492663086



##
File path: 
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##
@@ -997,189 +997,52 @@ object GroupMetadataManager {
   val MetricsGroup: String = "group-coordinator-metrics"
   val LoadTimeSensor: String = "GroupPartitionLoadTime"
 
-  private val CURRENT_OFFSET_KEY_SCHEMA_VERSION = 1.toShort
-  private val CURRENT_GROUP_KEY_SCHEMA_VERSION = 2.toShort
-
-  private val OFFSET_COMMIT_KEY_SCHEMA = new Schema(new Field("group", STRING),
-new Field("topic", STRING),
-new Field("partition", INT32))
-  private val OFFSET_KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("group")
-  private val OFFSET_KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("topic")
-  private val OFFSET_KEY_PARTITION_FIELD = 
OFFSET_COMMIT_KEY_SCHEMA.get("partition")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", 
INT64),
-new Field("metadata", STRING, "Associated metadata.", ""),
-new Field("timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V0 = 
OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset")
-  private val OFFSET_VALUE_METADATA_FIELD_V0 = 
OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata")
-  private val OFFSET_VALUE_TIMESTAMP_FIELD_V0 = 
OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", 
INT64),
-new Field("metadata", STRING, "Associated metadata.", ""),
-new Field("commit_timestamp", INT64),
-new Field("expire_timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset")
-  private val OFFSET_VALUE_METADATA_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata")
-  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp")
-  private val OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V2 = new Schema(new Field("offset", 
INT64),
-new Field("metadata", STRING, "Associated metadata.", ""),
-new Field("commit_timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V2 = 
OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset")
-  private val OFFSET_VALUE_METADATA_FIELD_V2 = 
OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata")
-  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2 = 
OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema(
-new Field("offset", INT64),
-new Field("leader_epoch", INT32),
-new Field("metadata", STRING, "Associated metadata.", ""),
-new Field("commit_timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset")
-  private val OFFSET_VALUE_LEADER_EPOCH_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch")
-  private val OFFSET_VALUE_METADATA_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata")
-  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp")
-
-  private val GROUP_METADATA_KEY_SCHEMA = new Schema(new Field("group", 
STRING))
-  private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group")
-
-  private val MEMBER_ID_KEY = "member_id"
-  private val GROUP_INSTANCE_ID_KEY = "group_instance_id"
-  private val CLIENT_ID_KEY = "client_id"
-  private val CLIENT_HOST_KEY = "client_host"
-  private val REBALANCE_TIMEOUT_KEY = "rebalance_timeout"
-  private val SESSION_TIMEOUT_KEY = "session_timeout"
-  private val SUBSCRIPTION_KEY = "subscription"
-  private val ASSIGNMENT_KEY = "assignment"
-
-  private val MEMBER_METADATA_V0 = new Schema(
-new Field(MEMBER_ID_KEY, STRING),
-new Field(CLIENT_ID_KEY, STRING),
-new Field(CLIENT_HOST_KEY, STRING),
-new Field(SESSION_TIMEOUT_KEY, INT32),
-new Field(SUBSCRIPTION_KEY, BYTES),
-new Field(ASSIGNMENT_KEY, BYTES))
-
-  private val MEMBER_METADATA_V1 = new Schema(
-new Field(MEMBER_ID_KEY, STRING),
-new Field(CLIENT_ID_KEY, STRING),
-new Field(CLIENT_HOST_KEY, STRING),
-new Field(REBALANCE_TIMEOUT_KEY, INT32),
-new Field(SESSION_TIMEOUT_KEY, INT32),
-new Field(SUBSCRIPTION_KEY, BYTES),
-new Field(ASSIGNMENT_KEY, BYTES))
-
-  private val MEMBER_METADATA_V2 = MEMBER_METADATA_V1
-
-  private val MEMBER_METADATA_V3 = new Schema(
-new Field(MEMBER_ID_KEY, STRING),
-new Field(GROUP_INSTANCE_ID_KEY, NULLABLE_STRING),
-new Field(CLIENT_ID_KEY, STRING),
-new Field(CLIENT_HOST_KEY, STRING),
-new Field(REBALANCE_TIMEOUT_KEY, INT32),
-new Field(SESSION_TIMEOUT_KEY, INT32),
-new Field(SUBSCRIPTION_KEY, BYTES),
-new Field(ASSIGNMENT_KEY, BYTES))
-
-  private val PROTOCOL_TYPE_KEY = "protocol_type"
-  private val GENERATION_KEY = "generation"
-  private val PROTOCOL_KEY = "protocol"
-  private val LEADER_KEY = "leader"
-  

[GitHub] [kafka] dajac commented on a change in pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…

2020-09-22 Thread GitBox


dajac commented on a change in pull request #9318:
URL: https://github.com/apache/kafka/pull/9318#discussion_r492662801



##
File path: core/src/main/resources/common/message/GroupMetadataValue.json
##
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "name": "GroupMetadataValue",

Review comment:
   Could we use the same formatting as the request/response? I know that 
the formatting that we use is a bit weird but I think that we should remain 
consistent.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…

2020-09-22 Thread GitBox


dajac commented on a change in pull request #9318:
URL: https://github.com/apache/kafka/pull/9318#discussion_r492661437



##
File path: 
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##
@@ -997,189 +997,52 @@ object GroupMetadataManager {
   val MetricsGroup: String = "group-coordinator-metrics"
   val LoadTimeSensor: String = "GroupPartitionLoadTime"
 
-  private val CURRENT_OFFSET_KEY_SCHEMA_VERSION = 1.toShort
-  private val CURRENT_GROUP_KEY_SCHEMA_VERSION = 2.toShort
-
-  private val OFFSET_COMMIT_KEY_SCHEMA = new Schema(new Field("group", STRING),
-new Field("topic", STRING),
-new Field("partition", INT32))
-  private val OFFSET_KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("group")
-  private val OFFSET_KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("topic")
-  private val OFFSET_KEY_PARTITION_FIELD = 
OFFSET_COMMIT_KEY_SCHEMA.get("partition")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", 
INT64),
-new Field("metadata", STRING, "Associated metadata.", ""),
-new Field("timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V0 = 
OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset")
-  private val OFFSET_VALUE_METADATA_FIELD_V0 = 
OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata")
-  private val OFFSET_VALUE_TIMESTAMP_FIELD_V0 = 
OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", 
INT64),
-new Field("metadata", STRING, "Associated metadata.", ""),
-new Field("commit_timestamp", INT64),
-new Field("expire_timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset")
-  private val OFFSET_VALUE_METADATA_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata")
-  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp")
-  private val OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V2 = new Schema(new Field("offset", 
INT64),
-new Field("metadata", STRING, "Associated metadata.", ""),
-new Field("commit_timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V2 = 
OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset")
-  private val OFFSET_VALUE_METADATA_FIELD_V2 = 
OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata")
-  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2 = 
OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema(
-new Field("offset", INT64),
-new Field("leader_epoch", INT32),
-new Field("metadata", STRING, "Associated metadata.", ""),
-new Field("commit_timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset")
-  private val OFFSET_VALUE_LEADER_EPOCH_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch")
-  private val OFFSET_VALUE_METADATA_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata")
-  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp")
-
-  private val GROUP_METADATA_KEY_SCHEMA = new Schema(new Field("group", 
STRING))
-  private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group")
-
-  private val MEMBER_ID_KEY = "member_id"
-  private val GROUP_INSTANCE_ID_KEY = "group_instance_id"
-  private val CLIENT_ID_KEY = "client_id"
-  private val CLIENT_HOST_KEY = "client_host"
-  private val REBALANCE_TIMEOUT_KEY = "rebalance_timeout"
-  private val SESSION_TIMEOUT_KEY = "session_timeout"
-  private val SUBSCRIPTION_KEY = "subscription"
-  private val ASSIGNMENT_KEY = "assignment"
-
-  private val MEMBER_METADATA_V0 = new Schema(
-new Field(MEMBER_ID_KEY, STRING),
-new Field(CLIENT_ID_KEY, STRING),
-new Field(CLIENT_HOST_KEY, STRING),
-new Field(SESSION_TIMEOUT_KEY, INT32),
-new Field(SUBSCRIPTION_KEY, BYTES),
-new Field(ASSIGNMENT_KEY, BYTES))
-
-  private val MEMBER_METADATA_V1 = new Schema(
-new Field(MEMBER_ID_KEY, STRING),
-new Field(CLIENT_ID_KEY, STRING),
-new Field(CLIENT_HOST_KEY, STRING),
-new Field(REBALANCE_TIMEOUT_KEY, INT32),
-new Field(SESSION_TIMEOUT_KEY, INT32),
-new Field(SUBSCRIPTION_KEY, BYTES),
-new Field(ASSIGNMENT_KEY, BYTES))
-
-  private val MEMBER_METADATA_V2 = MEMBER_METADATA_V1
-
-  private val MEMBER_METADATA_V3 = new Schema(
-new Field(MEMBER_ID_KEY, STRING),
-new Field(GROUP_INSTANCE_ID_KEY, NULLABLE_STRING),
-new Field(CLIENT_ID_KEY, STRING),
-new Field(CLIENT_HOST_KEY, STRING),
-new Field(REBALANCE_TIMEOUT_KEY, INT32),
-new Field(SESSION_TIMEOUT_KEY, INT32),
-new Field(SUBSCRIPTION_KEY, BYTES),
-new Field(ASSIGNMENT_KEY, BYTES))
-
-  private val PROTOCOL_TYPE_KEY = "protocol_type"
-  private val GENERATION_KEY = "generation"
-  private val PROTOCOL_KEY = "protocol"
-  private val LEADER_KEY = "leader"
-  

[jira] [Comment Edited] (KAFKA-10413) rebalancing leads to unevenly balanced connectors

2020-09-22 Thread rameshkrishnan muthusamy (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17193402#comment-17193402
 ] 

rameshkrishnan muthusamy edited comment on KAFKA-10413 at 9/22/20, 11:28 AM:
-

This is an issue starting with Kafka version 2.4.x onwards. The only work 
around I could find is to switch back to eager protocol which defeats the 
purpose of incremental cooperative rebalancing all together. 


was (Author: ramkrish1489):
This is an issue starting with Kafka version 2.4.x onwards. The online work 
around I could find is to switch back to eager protocol which defeats the 
purpose of incremental cooperative rebalancing all together. 

> rebalancing leads to unevenly balanced connectors
> -
>
> Key: KAFKA-10413
> URL: https://issues.apache.org/jira/browse/KAFKA-10413
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.5.1
>Reporter: yazgoo
>Priority: Major
> Attachments: connect_worker_balanced.png
>
>
> Hi,
> With CP 5.5, running kafka connect s3 sink on EC2 whith autoscaling enabled, 
> if a connect instance disappear, or a new one appear, we're seeing unbalanced 
> consumption, much like mentionned in this post:
> [https://stackoverflow.com/questions/58644622/incremental-cooperative-rebalancing-leads-to-unevenly-balanced-connectors]
> This usually leads to one kafka connect instance taking most of the load and 
> consumption not being able to keep on.
> Currently, we're "fixing" this by deleting the connector and re-creating it, 
> but this is far from ideal.
> Any suggestion on what we could do to mitigate this ?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10413) rebalancing leads to unevenly balanced connectors

2020-09-22 Thread rameshkrishnan muthusamy (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17200015#comment-17200015
 ] 

rameshkrishnan muthusamy commented on KAFKA-10413:
--

We have PR opened on  this [https://github.com/apache/kafka/pull/9319] 

> rebalancing leads to unevenly balanced connectors
> -
>
> Key: KAFKA-10413
> URL: https://issues.apache.org/jira/browse/KAFKA-10413
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.5.1
>Reporter: yazgoo
>Priority: Major
> Attachments: connect_worker_balanced.png
>
>
> Hi,
> With CP 5.5, running kafka connect s3 sink on EC2 whith autoscaling enabled, 
> if a connect instance disappear, or a new one appear, we're seeing unbalanced 
> consumption, much like mentionned in this post:
> [https://stackoverflow.com/questions/58644622/incremental-cooperative-rebalancing-leads-to-unevenly-balanced-connectors]
> This usually leads to one kafka connect instance taking most of the load and 
> consumption not being able to keep on.
> Currently, we're "fixing" this by deleting the connector and re-creating it, 
> but this is far from ideal.
> Any suggestion on what we could do to mitigate this ?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7870) Error sending fetch request (sessionId=1578860481, epoch=INITIAL) to node 2: java.io.IOException: Connection to 2 was disconnected before the response was read.

2020-09-22 Thread Gowtham (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1724#comment-1724
 ] 

Gowtham commented on KAFKA-7870:


 
{code:java}
o.a.k.c.FetchSessionHandler pool-18-thread-1 [INFO] [Consumer 
clientId=baa4f80-dbb3-4e2e-8428-0363f974d5f6, groupId=mygroup] Error sending 
fetch request (sessionId=1854680744, epoch=3) to node 6: {}.
org.apache.kafka.common.errors.DisconnectException: null
{code}
 

We are also facing the same in 2.4.0 version. 

> Error sending fetch request (sessionId=1578860481, epoch=INITIAL) to node 2: 
> java.io.IOException: Connection to 2 was disconnected before the response was 
> read.
> 
>
> Key: KAFKA-7870
> URL: https://issues.apache.org/jira/browse/KAFKA-7870
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.1.0
>Reporter: Chakhsu Lau
>Priority: Blocker
>
> We build a kafka cluster with 5 brokers. But one of brokers suddenly stopped 
> running during the run. And it happened twice in the same broker. Here is the 
> log and is this a bug in kafka ?
> {code:java}
> [2019-01-25 12:57:14,686] INFO [ReplicaFetcher replicaId=3, leaderId=2, 
> fetcherId=0] Error sending fetch request (sessionId=1578860481, 
> epoch=INITIAL) to node 2: java.io.IOException: Connection to 2 was 
> disconnected before the response was read. 
> (org.apache.kafka.clients.FetchSessionHandler)
> [2019-01-25 12:57:14,687] WARN [ReplicaFetcher replicaId=3, leaderId=2, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=3, maxWait=500, minBytes=1, maxBytes=10485760, 
> fetchData={api-result-bi-heatmap-8=(offset=0, logStartOffset=0, 
> maxBytes=1048576, currentLeaderEpoch=Optional[4]), 
> api-result-bi-heatmap-save-12=(offset=0, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[4]), api-result-bi-heatmap-task-2=(offset=2, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[4]), 
> api-result-bi-flow-39=(offset=1883206, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[4]), __consumer_offsets-47=(offset=349437, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[4]), 
> api-result-bi-heatmap-track-6=(offset=1039889, logStartOffset=0, 
> maxBytes=1048576, currentLeaderEpoch=Optional[4]), 
> api-result-bi-heatmap-task-17=(offset=0, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[4]), __consumer_offsets-2=(offset=0, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[4]), 
> api-result-bi-heatmap-aggs-19=(offset=1255056, logStartOffset=0, 
> maxBytes=1048576, currentLeaderEpoch=Optional[4])}, 
> isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=1578860481, 
> epoch=INITIAL)) (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 2 was disconnected before the response was 
> read
> at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97)
> at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:97)
> at 
> kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:190)
> at 
> kafka.server.AbstractFetcherThread.kafka$server$AbstractFetcherThread$$processFetchRequest(AbstractFetcherThread.scala:241)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:130)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:129)
> at scala.Option.foreach(Option.scala:257)
> at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
> at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ramesh-muthusamy opened a new pull request #9319: Allow even distribution of lost/new tasks when more than one worker j…

2020-09-22 Thread GitBox


ramesh-muthusamy opened a new pull request #9319:
URL: https://github.com/apache/kafka/pull/9319


   Allow even distribution of lost/new tasks when more than one worker joins 
the group at the same time
   
   Issue description:
   Existing issue 1 description : When more than one worker joins the consumer 
group the incremental co operative assignor revokes and re assigns atmost 
average number of tasks per worker.
   
   Issue: This results in the additional workers joining the group stay idle 
and would require more future rebalances to happen to have even distribution of 
tasks.
   
   Fix: As part of task assignment calculation following a deployment, the 
reassignment of tasks are calculated by revoking all the tasks above 
ceil(average) number of tasks.
   
   Existing issue 2 description: When more than one worker is lost and rejoins 
the group at most one worker will be re assigned with the lost tasks from all 
the workers that left the group.
   
   Issue: In scenarios where more than one worker is lost and rejoins the group 
only one among them gets assigned all the partitions that were lost in the 
past. The additional workers that have joined would not get any task assigned 
to them until a rebalance that happens in future.
   
   Fix: As part fo lost task re assignment all the new workers that have joined 
the group would be considered for task assignment and would be assigned in a 
round robin fashion with the new tasks.
   
   Testing strategy : System testing in a Kube environment completed.
   UT : updated to UT



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10507) Limit the set of APIs returned in pre-authentication ApiVersions

2020-09-22 Thread Magnus Edenhill (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1716#comment-1716
 ] 

Magnus Edenhill commented on KAFKA-10507:
-

librdkafka will only send ApiVersionRequests prior to authentication, so this 
would be a breaking change and thus require an ApiVersion bump, and if we do 
that a client can still use an older ApiVersionRequest version to get the 
supported APIs prior to auth, making this pre-auth filtering a bit pointless.

What is the primary motivation here?

> Limit the set of APIs returned in pre-authentication ApiVersions 
> -
>
> Key: KAFKA-10507
> URL: https://issues.apache.org/jira/browse/KAFKA-10507
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: David Jacot
>Priority: Major
>
> We use the ApiVersions RPC to check whether the SaslHandshake and 
> SaslAuthenticate APIs are supported before authenticating with the broker. 
> Currently the response contains all APIs supported by the broker. It seems 
> like a good idea to reduce the set of APIs returned at this level to only 
> those which are supported prior to authentication. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10497) Convert group/transaction coordinator metadata schemas to use generated protocol

2020-09-22 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17199989#comment-17199989
 ] 

Chia-Ping Tsai commented on KAFKA-10497:


[~hachikuji] I have submitted a PR (https://github.com/apache/kafka/pull/9318) 
to convert group coordinator. Will file another PR to convert transaction. 

> Convert group/transaction coordinator metadata schemas to use generated 
> protocol
> 
>
> Key: KAFKA-10497
> URL: https://issues.apache.org/jira/browse/KAFKA-10497
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> We need to convert the internal schemas used for representing 
> transaction/group metadata to the generated protocol. This opens the door for 
> flexible version support on the next bump. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 opened a new pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…

2020-09-22 Thread GitBox


chia7712 opened a new pull request #9318:
URL: https://github.com/apache/kafka/pull/9318


   issue: https://issues.apache.org/jira/browse/KAFKA-10497
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




  1   2   >