Re: [PR] KAFKA-16373: KIP-1028: Adding code to support Apache Kafka Docker Official Images [kafka]

2024-05-22 Thread via GitHub


VedarthConfluent commented on code in PR #16027:
URL: https://github.com/apache/kafka/pull/16027#discussion_r1611042138


##
.github/workflows/prepare_docker_official_image_source.yml:
##
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: Docker Prepare Docker Official Image Source
+
+on:
+  workflow_dispatch:
+    inputs:
+      image_type:
+        type: choice
+        description: Docker image type to build and test
+        options:
+          - "jvm"
+      kafka_version:
+        description: Kafka version for which the source for docker official image is to be built
+        required: true
+
+jobs:
+  build:
+    runs-on: ubuntu-latest
+    steps:
+    - uses: actions/checkout@v3
+    - name: Set up Python 3.10
+      uses: actions/setup-python@v3
+      with:
+        python-version: "3.10"
+    - name: Install dependencies
+      run: |
+        python -m pip install --upgrade pip
+        pip install -r docker/requirements.txt
+    - name: Build New Artifact

Review Comment:
   nit: Maybe `Build Docker Official Image Source`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16373: KIP-1028: Adding code to support Apache Kafka Docker Official Images [kafka]

2024-05-22 Thread via GitHub


VedarthConfluent commented on code in PR #16027:
URL: https://github.com/apache/kafka/pull/16027#discussion_r1611041547


##
.github/workflows/prepare_docker_official_image_source.yml:
##
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: Docker Prepare Docker Official Image Source
+
+on:
+  workflow_dispatch:
+    inputs:
+      image_type:
+        type: choice
+        description: Docker image type to build and test
+        options:
+          - "jvm"
+      kafka_version:
+        description: Kafka version for which the source for docker official image is to be built

Review Comment:
   Same as https://github.com/apache/kafka/pull/16027/files#r1611040507



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16373: KIP-1028: Adding code to support Apache Kafka Docker Official Images [kafka]

2024-05-22 Thread via GitHub


VedarthConfluent commented on code in PR #16027:
URL: https://github.com/apache/kafka/pull/16027#discussion_r1611040507


##
.github/workflows/docker_official_image_build_and_test.yml:
##
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: Docker Official Image Build Test
+
+on:
+  workflow_dispatch:
+    inputs:
+      image_type:
+        type: choice
+        description: Docker image type to build and test
+        options:
+          - "jvm"
+      kafka_version:
+        description: Kafka version for which the source for docker official image is to be built

Review Comment:
   Simplify this description, and mention that the version should be >= 3.7.0.
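   One possible wording, for illustration only (the exact phrasing is the author's call):
   ```yaml
      kafka_version:
        description: Kafka version (>= 3.7.0) for which the docker official image source is to be built
        required: true
   ```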



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16257) SchemaProjector should be extensible to logical types

2024-05-22 Thread Fan Yang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fan Yang reassigned KAFKA-16257:


Assignee: Fan Yang

> SchemaProjector should be extensible to logical types
> -
>
> Key: KAFKA-16257
> URL: https://issues.apache.org/jira/browse/KAFKA-16257
> Project: Kafka
>  Issue Type: New Feature
>  Components: connect
>Reporter: Greg Harris
>Assignee: Fan Yang
>Priority: Minor
>  Labels: needs-kip
>
> The SchemaProjector currently only supports projecting primitive Number 
> types, and cannot handle common logical types as have proliferated in the 
> Connect ecosystem.
> The SchemaProjector or a replacement should have the ability to extend its 
> functionality to support these logical types.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16820) Kafka Broker fails to connect to Kraft Controller with no DNS matching

2024-05-22 Thread Vikash Mishra (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848821#comment-17848821
 ] 

Vikash Mishra commented on KAFKA-16820:
---

Despite the fact that the brokers are provided with controller IPs, the broker still tries 
to communicate using the controller's DNS name; see the error below. This differs from the 
non-KRaft behavior, where inter-broker communication uses the provided IPs and the SSL 
handshake succeeds against the IP SAN.
{code:java}
failed due to authentication error with controller (kafka.server.NodeToControllerRequestThread)
org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLHandshakeException: No subject alternative DNS name matching 
cp-internal-onecloud-kfkc1.node.cp-internal-onecloud.consul found.{code}
Is there a configuration to disable this and use IP-based communication between broker 
and controller when IPs are provided during bootstrap, or is this the default behavior 
going forward?

Looping in [~soarez] [~jlprat], release managers of 3.7.1 & 3.8.0, to confirm whether the 
reported issue has already been identified for the upcoming releases and is a known issue.
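
For reference, a hedged illustration of the usual knob for relaxing hostname checks; whether it is appropriate for the controller listener in this setup is an assumption, not something confirmed in this thread:
{code}
# Hedged sketch, not verified against this KRaft controller setup: an empty
# ssl.endpoint.identification.algorithm disables server hostname verification during the
# TLS handshake, so the certificate no longer needs a matching DNS SAN.
ssl.endpoint.identification.algorithm=
{code}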

> Kafka Broker fails to connect to Kraft Controller with no DNS matching 
> ---
>
> Key: KAFKA-16820
> URL: https://issues.apache.org/jira/browse/KAFKA-16820
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.7.0, 3.6.1, 3.8.0
>Reporter: Arushi Helms
>Priority: Major
> Attachments: Screenshot 2024-05-22 at 1.09.11 PM-1.png
>
>
>  
> We are migrating our Kafka cluster from zookeeper to Kraft mode. We are 
> running individual brokers and controllers with TLS enabled and IPs are given 
> for communication. 
> TLS enabled setup works fine among the brokers and the certificate looks 
> something like:
> {noformat}
> Common Name: *.kafka.service.consul
> Subject Alternative Names: *.kafka.service.consul, IP 
> Address:10.87.171.84{noformat}
> Note:
>  * The DNS name for the node does not match the CN but since we are using IPs 
> as communication, we have provided IPs as SAN.
>  * Same with the controllers, IPs are given as SAN in the certificate. 
>  * Issue is not related to the migration so just sharing configuration 
> relevant for the TLS piece. 
> In the current setup I am running 3 brokers and 3 controllers. 
> Relevant controller configurations from one of the controllers:
> {noformat}
> KAFKA_CFG_PROCESS_ROLES=controller 
> KAFKA_KRAFT_CLUSTER_ID=5kztjhJ4SxSu-kdiEYDUow
> KAFKA_CFG_NODE_ID=6 
> KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=4@10.87.170.83:9097,5@10.87.170.9:9097,6@10.87.170.6:9097
>  
> KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER 
> KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INSIDE_SSL
> KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:SSL,INSIDE_SSL:SSL 
> KAFKA_CFG_LISTENERS=CONTROLLER://10.87.170.6:9097{noformat}
> Relevant broker configuration from one of the brokers:
> {noformat}
> KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
> KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INSIDE_SSL
> KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=4@10.87.170.83:9097,5@10.87.170.9:9097,6@10.87.170.6:9097
>  
> KAFKA_CFG_PROCESS_ROLES=broker 
> KAFKA_CFG_NODE_ID=3 
> KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE_SSL:SSL,OUTSIDE_SSL:SSL,CONTROLLER:SSL
>  
> KAFKA_CFG_LISTENERS=INSIDE_SSL://10.87.170.78:9093,OUTSIDE_SSL://10.87.170.78:9096
>  
> KAFKA_CFG_ADVERTISED_LISTENERS=INSIDE_SSL://10.87.170.78:9093,OUTSIDE_SSL://10.87.170.78:9096{noformat}
>  
> ISSUE 1: 
> With this setup Kafka broker is failing to connect to the controller, see the 
> following error:
> {noformat}
> 2024-05-22 17:53:46,413] ERROR 
> [broker-2-to-controller-heartbeat-channel-manager]: Request 
> BrokerRegistrationRequestData(brokerId=2, clusterId='5kztjhJ4SxSu-kdiEYDUow', 
> incarnationId=7741fgH6T4SQqGsho8E6mw, listeners=[Listener(name='INSIDE_SSL', 
> host='10.87.170.81', port=9093, securityProtocol=1), Listener(name='INSIDE', 
> host='10.87.170.81', port=9094, securityProtocol=0), Listener(name='OUTSIDE', 
> host='10.87.170.81', port=9092, securityProtocol=0), 
> Listener(name='OUTSIDE_SSL', host='10.87.170.81', port=9096, 
> securityProtocol=1)], features=[Feature(name='metadata.version', 
> minSupportedVersion=1, maxSupportedVersion=19)], rack=null, 
> isMigratingZkBroker=false, logDirs=[TJssfKDD-iBFYfIYCKOcew], 
> previousBrokerEpoch=-1) failed due to authentication error with controller 
> (kafka.server.NodeToControllerRequestThread)org.apache.kafka.common.errors.SslAuthenticationException:
>  SSL handshake failedCaused by: javax.net.ssl.SSLHandshakeException: No 
> subject alternative DNS name matching 
> cp-internal-onecloud-kfkc1.node.cp-internal-onecloud.consul found.at 
> 

[PR] Improve logical type compatibility in SchemaProjector [kafka]

2024-05-22 Thread via GitHub


fanyang opened a new pull request, #16035:
URL: https://github.com/apache/kafka/pull/16035

   This PR improves logical type compatibility in SchemaProjector.
   A Decimal type with a higher scale is compatible with one with the same or lower scale.
   The Timestamp type is compatible with the Date and Time types.
   The Date and Time types are incompatible with each other.
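   For illustration, a minimal sketch of the Decimal case described above (hedged: the exact direction and conversion semantics are whatever this PR implements; on current trunk `SchemaProjector.project` rejects Decimal schemas whose scale parameters differ):
   ```java
   import java.math.BigDecimal;

   import org.apache.kafka.connect.data.Decimal;
   import org.apache.kafka.connect.data.Schema;
   import org.apache.kafka.connect.data.SchemaProjector;

   public class DecimalProjectionSketch {
       public static void main(String[] args) {
           Schema sourceSchema = Decimal.schema(4);      // Decimal logical type, scale 4
           Schema targetSchema = Decimal.schema(2);      // Decimal logical type, scale 2
           BigDecimal value = new BigDecimal("12.3400"); // a scale-4 value

           // With the compatibility proposed here, projecting across compatible Decimal
           // scales succeeds instead of throwing SchemaProjectorException.
           Object projected = SchemaProjector.project(sourceSchema, value, targetSchema);
           System.out.println(projected);
       }
   }
   ```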
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15541: Add iterator-duration metrics [kafka]

2024-05-22 Thread via GitHub


mjsax merged PR #16028:
URL: https://github.com/apache/kafka/pull/16028


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15541: Add iterator-duration metrics [kafka]

2024-05-22 Thread via GitHub


mjsax commented on code in PR #16028:
URL: https://github.com/apache/kafka/pull/16028#discussion_r1610944460


##
streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java:
##
@@ -149,6 +149,14 @@ private StateStoreMetrics() {}
     private static final String NUM_OPEN_ITERATORS_DESCRIPTION =
         "The current number of iterators on the store that have been created, but not yet closed";
 
+    private static final String ITERATOR_DURATION = "iterator-duration";
+    private static final String ITERATOR_DURATION_DESCRIPTION =
+        "time spent between creating an Iterator and closing it, in nanoseconds";

Review Comment:
   ```suggestion
        "time spent between creating an iterator and closing it, in nanoseconds";
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14412: Add statestore.uncommitted.max.bytes [kafka]

2024-05-22 Thread via GitHub


github-actions[bot] commented on PR #15264:
URL: https://github.com/apache/kafka/pull/15264#issuecomment-2126168320

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has  merge conflicts, please update it with the latest from 
trunk (or appropriate release branch)  If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-22 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610893769


##
core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala:
##
@@ -164,18 +169,71 @@ class DelayedFetchTest {
 assertTrue(delayedFetch.tryComplete())
 assertTrue(delayedFetch.isCompleted)
 assertTrue(fetchResultOpt.isDefined)
+
+val fetchResult = fetchResultOpt.get
+assertEquals(Errors.NONE, fetchResult.error)
+  }
+
+  @ParameterizedTest(name = "testDelayedFetchWithMessageOnlyHighWatermark 
endOffset={0}")
+  @ValueSource(longs = Array(0, 500))
+  def testDelayedFetchWithMessageOnlyHighWatermark(endOffset: Long): Unit = {
+val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+val fetchOffset = 450L
+val logStartOffset = 5L
+val currentLeaderEpoch = Optional.of[Integer](10)
+val replicaId = 1
+
+val fetchStatus = FetchPartitionStatus(
+  startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+  fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, 
fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)
+
+var fetchResultOpt: Option[FetchPartitionData] = None
+def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit 
= {
+  fetchResultOpt = Some(responses.head._2)
+}
+
+val delayedFetch = new DelayedFetch(
+  params = fetchParams,
+  fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
+  replicaManager = replicaManager,
+  quota = replicaQuota,
+  responseCallback = callback
+)
+
+val partition: Partition = mock(classOf[Partition])
+
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition)
+// Note that the high-watermark does not contain the complete metadata
+val endOffsetMetadata = new LogOffsetMetadata(endOffset, -1L, -1)
+when(partition.fetchOffsetSnapshot(
+  currentLeaderEpoch,
+  fetchOnlyFromLeader = true))
+  .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, 
endOffsetMetadata, endOffsetMetadata))
+when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false)
+expectReadFromReplica(fetchParams, topicIdPartition, 
fetchStatus.fetchInfo, Errors.NONE)
+
+// 1. When `endOffset` is 0, it refers to the truncation case
+// 2. When `endOffset` is 500, it refers to the normal case
+val expected = endOffset == 0

Review Comment:
   When fetchOffset > endOffset, `forceComplete` gets called. In this case, 450 > 0, so it triggers force completion.
   
   This is the only behavior change after the refactor. Previously, when fetchOffset > endOffset:
   
   1. If the offsets lie on the same segment, we wait for the end offset to move (or time out the request).
   2. If the endOffset lies on an older segment than the fetch offset, we complete the request by calling force-complete.
   
   We can retain the same behavior if required.
   
   ```
   if (fetchOffset.messageOffset > endOffset.messageOffset) {
     if (endOffset.onOlderSegment(fetchOffset)) {
       // Case F, this can happen when the new fetch operation is on a truncated leader
       debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
       return forceComplete()
     }
   } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
     ...
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-22 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610893769


##
core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala:
##
@@ -164,18 +169,71 @@ class DelayedFetchTest {
 assertTrue(delayedFetch.tryComplete())
 assertTrue(delayedFetch.isCompleted)
 assertTrue(fetchResultOpt.isDefined)
+
+val fetchResult = fetchResultOpt.get
+assertEquals(Errors.NONE, fetchResult.error)
+  }
+
+  @ParameterizedTest(name = "testDelayedFetchWithMessageOnlyHighWatermark 
endOffset={0}")
+  @ValueSource(longs = Array(0, 500))
+  def testDelayedFetchWithMessageOnlyHighWatermark(endOffset: Long): Unit = {
+val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+val fetchOffset = 450L
+val logStartOffset = 5L
+val currentLeaderEpoch = Optional.of[Integer](10)
+val replicaId = 1
+
+val fetchStatus = FetchPartitionStatus(
+  startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+  fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, 
fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)
+
+var fetchResultOpt: Option[FetchPartitionData] = None
+def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit 
= {
+  fetchResultOpt = Some(responses.head._2)
+}
+
+val delayedFetch = new DelayedFetch(
+  params = fetchParams,
+  fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
+  replicaManager = replicaManager,
+  quota = replicaQuota,
+  responseCallback = callback
+)
+
+val partition: Partition = mock(classOf[Partition])
+
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition)
+// Note that the high-watermark does not contain the complete metadata
+val endOffsetMetadata = new LogOffsetMetadata(endOffset, -1L, -1)
+when(partition.fetchOffsetSnapshot(
+  currentLeaderEpoch,
+  fetchOnlyFromLeader = true))
+  .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, 
endOffsetMetadata, endOffsetMetadata))
+when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false)
+expectReadFromReplica(fetchParams, topicIdPartition, 
fetchStatus.fetchInfo, Errors.NONE)
+
+// 1. When `endOffset` is 0, it refers to the truncation case
+// 2. When `endOffset` is 500, it refers to the normal case
+val expected = endOffset == 0

Review Comment:
   When fetchOffset > endOffset, `forceComplete` gets called. In this case, 450 > 0, so it triggers force completion.
   
   This is the only behavior change after the refactor. Previously, when fetchOffset > endOffset:
   
   1. If the offsets lie on the same segment, we wait for the end offset to move (or time out the request).
   2. If they lie on different segments, we complete the request by calling force-complete.
   
   We can retain the same behavior if required.
   
   ```
   if (fetchOffset.messageOffset > endOffset.messageOffset) {
     if (endOffset.onOlderSegment(fetchOffset)) {
       // Case F, this can happen when the new fetch operation is on a truncated leader
       debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
       return forceComplete()
     }
   } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
     ...
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16811) Punctuate Ratio metric almost impossible to track

2024-05-22 Thread Rohan Desai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848797#comment-17848797
 ] 

Rohan Desai commented on KAFKA-16811:
-

Alternatively, or perhaps in addition to a pre-windowed value, we can add a 
metric that measures the total time spent in punctuate since the StreamThread 
was created. Then, users can compute the time spent in the window of their 
choice downstream of the application by taking the value of the metric at the 
beginning of the window and subtracting that from the value of the metric at 
the end of the window.
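
A hedged sketch of that downstream computation; the cumulative-metric name and the readCumulativePunctuateTimeNs stand-in are hypothetical, not an existing Kafka Streams metric or API:
{code:java}
public class PunctuateRatioSketch {
    // Stand-in for however the monitoring system samples the hypothetical cumulative
    // "total time spent in punctuate since the StreamThread was created" metric.
    static long readCumulativePunctuateTimeNs() {
        return System.nanoTime() % 1_000_000L; // placeholder value for the sketch
    }

    public static void main(String[] args) throws InterruptedException {
        long windowMs = 1_000L;
        long startNs = readCumulativePunctuateTimeNs(); // sample at window start
        Thread.sleep(windowMs);                         // the user-chosen window
        long endNs = readCumulativePunctuateTimeNs();   // sample at window end
        double ratio = (endNs - startNs) / (windowMs * 1_000_000.0);
        System.out.printf("punctuate ratio over the last %d ms: %.4f%n", windowMs, ratio);
    }
}
{code}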

> Punctuate Ratio metric almost impossible to track
> -
>
> Key: KAFKA-16811
> URL: https://issues.apache.org/jira/browse/KAFKA-16811
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sebastien Viale
>Priority: Minor
>
> The Punctuate ratio metric is returned after the last record of the poll 
> loop. It is recomputed in every poll loop.
> After a punctuate, the value is close to 1, but there is little chance that the 
> metric is sampled at this time, so its value is almost always 0.
> A solution could be to apply a kind of "sliding window" to it and report the 
> value for the last x seconds.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-22 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610895466


##
core/src/main/scala/kafka/server/DelayedFetch.scala:
##
@@ -91,19 +91,23 @@ class DelayedFetch(
         // Go directly to the check for Case G if the message offsets are the same. If the log segment
         // has just rolled, then the high watermark offset will remain the same but be on the old segment,
         // which would incorrectly be seen as an instance of Case F.
-        if (endOffset.messageOffset != fetchOffset.messageOffset) {
-          if (endOffset.onOlderSegment(fetchOffset)) {
-            // Case F, this can happen when the new fetch operation is on a truncated leader
-            debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
-            return forceComplete()
+        if (fetchOffset.messageOffset > endOffset.messageOffset) {
+          // Case F, this can happen when the new fetch operation is on a truncated leader
+          debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
+          return forceComplete()
+        } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
+          if (fetchOffset.messageOffsetOnly() || endOffset.messageOffsetOnly()) {

Review Comment:
   Not clear on this one. The current `if` checks are easy to read; we can add one debug log to avoid the empty `if` block.
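   For illustration, a hedged sketch of that suggestion; the wording of the debug message is made up:
   ```scala
   } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
     if (fetchOffset.messageOffsetOnly() || endOffset.messageOffsetOnly()) {
       // Instead of leaving this branch empty, log why the segment/byte checks are skipped.
       debug(s"Skipping segment and accumulated-bytes checks for $topicIdPartition because " +
         s"one of the offsets ($fetchOffset, $endOffset) carries only the message offset.")
     } else {
       // ... existing segment and accumulated-bytes checks ...
     }
   }
   ```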



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-22 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610893769


##
core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala:
##
@@ -164,18 +169,71 @@ class DelayedFetchTest {
 assertTrue(delayedFetch.tryComplete())
 assertTrue(delayedFetch.isCompleted)
 assertTrue(fetchResultOpt.isDefined)
+
+val fetchResult = fetchResultOpt.get
+assertEquals(Errors.NONE, fetchResult.error)
+  }
+
+  @ParameterizedTest(name = "testDelayedFetchWithMessageOnlyHighWatermark 
endOffset={0}")
+  @ValueSource(longs = Array(0, 500))
+  def testDelayedFetchWithMessageOnlyHighWatermark(endOffset: Long): Unit = {
+val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+val fetchOffset = 450L
+val logStartOffset = 5L
+val currentLeaderEpoch = Optional.of[Integer](10)
+val replicaId = 1
+
+val fetchStatus = FetchPartitionStatus(
+  startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+  fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, 
fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)
+
+var fetchResultOpt: Option[FetchPartitionData] = None
+def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit 
= {
+  fetchResultOpt = Some(responses.head._2)
+}
+
+val delayedFetch = new DelayedFetch(
+  params = fetchParams,
+  fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
+  replicaManager = replicaManager,
+  quota = replicaQuota,
+  responseCallback = callback
+)
+
+val partition: Partition = mock(classOf[Partition])
+
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition)
+// Note that the high-watermark does not contain the complete metadata
+val endOffsetMetadata = new LogOffsetMetadata(endOffset, -1L, -1)
+when(partition.fetchOffsetSnapshot(
+  currentLeaderEpoch,
+  fetchOnlyFromLeader = true))
+  .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, 
endOffsetMetadata, endOffsetMetadata))
+when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false)
+expectReadFromReplica(fetchParams, topicIdPartition, 
fetchStatus.fetchInfo, Errors.NONE)
+
+// 1. When `endOffset` is 0, it refers to the truncation case
+// 2. When `endOffset` is 500, it refers to the normal case
+val expected = endOffset == 0

Review Comment:
   When fetchOffset > endOffset, `forceComplete` gets called. In this case, 450 > 0, so it triggers force completion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16160) AsyncKafkaConsumer is trying to connect to a disconnected node in a tight loop

2024-05-22 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848791#comment-17848791
 ] 

Phuc Hong Tran commented on KAFKA-16160:


[~pnee] I understand 

> AsyncKafkaConsumer is trying to connect to a disconnected node in a tight loop
> --
>
> Key: KAFKA-16160
> URL: https://issues.apache.org/jira/browse/KAFKA-16160
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> Observing some excessive logging running AsyncKafkaConsumer and observing 
> excessive logging of :
> {code:java}
> 1271 [2024-01-15 09:43:36,627] DEBUG [Consumer clientId=console-consumer, 
> groupId=concurrent_consumer] Node is not ready, handle the request in the 
> next event loop: node=worker4:9092 (id: 2147483644 rack: null), 
> request=UnsentRequest{requestBuil     
> der=ConsumerGroupHeartbeatRequestData(groupId='concurrent_consumer', 
> memberId='laIqS789StuhXFpTwjh6hA', memberEpoch=1, instanceId=null, 
> rackId=null, rebalanceTimeoutMs=30, subscribedTopicNames=[output-topic], 
> serverAssignor=null, topicP     
> artitions=[TopicPartitions(topicId=I5P5lIXvR1Cjc8hfoJg5bg, partitions=[0])]), 
> handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@918925b,
>  node=Optional[worker4:9092 (id: 2147483644 rack: null)]     , 
> timer=org.apache.kafka.common.utils.Timer@55ed4733} 
> (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate) {code}
> This seems to be triggered by a tight poll loop of the network thread.  The 
> right thing to do is to backoff a bit for that given node and retry later.
> This should be a blocker for 3.8 release.
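
A hedged, generic illustration of the "back off for that node and retry later" idea (plain Java; this is not the NetworkClientDelegate code):
{code:java}
public class NodeBackoffSketch {
    // Exponential backoff with a cap: attempt 0 -> 50 ms, 1 -> 100 ms, ..., capped at 1000 ms.
    static long backoffMs(int failedAttempts) {
        long base = 50L;
        long max = 1_000L;
        long backoff = base * (1L << Math.min(failedAttempts, 10));
        return Math.min(backoff, max);
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt < 6; attempt++) {
            System.out.printf("attempt %d -> wait %d ms before retrying the node%n",
                    attempt, backoffMs(attempt));
        }
    }
}
{code}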



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14517) Implement regex subscriptions

2024-05-22 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848792#comment-17848792
 ] 

Phuc Hong Tran commented on KAFKA-14517:


Thanks again [~lianetm] 

> Implement regex subscriptions
> -
>
> Key: KAFKA-14517
> URL: https://issues.apache.org/jira/browse/KAFKA-14517
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-preview
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16771 First log directory printed twice when formatting storage [kafka]

2024-05-22 Thread via GitHub


gongxuanzhang commented on code in PR #16010:
URL: https://github.com/apache/kafka/pull/16010#discussion_r1610875448


##
core/src/test/scala/unit/kafka/tools/StorageToolTest.scala:
##
@@ -488,4 +488,26 @@ Found problem:
   assertEquals(1, exitStatus)
 }
   }
+
+  @Test
+  def testFormatMultiEmptyDirectory(): Unit = {

Review Comment:
   > There is already a similar test case 
`testFormatSucceedsIfAllDirectoriesAreAvailable`, so could you please add new 
check to `testFormatSucceedsIfAllDirectoriesAreAvailable`? for example, we can 
split the `stream#toString` by new line literal and then check the duplicate 
contents.
   
   I think the print order should be consistent with the configuration order; that is just as important as formatting each directory only once.
   Additionally, printing is done through callbacks, so simply changing `HashMap` to `LinkedHashMap` does not seem possible on its own.
   Otherwise we end up with an inverted order, so I think we should change the related `HashMap` and `HashSet` to `LinkedHashMap` and `LinkedHashSet` and create a new Copier in each loop.
   As for the test cases, I will do that.
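   For context, a small hedged Scala illustration of the ordering difference (this is not the StorageTool code itself):
   ```scala
   import scala.collection.mutable

   object OrderSketch extends App {
     val hashed = mutable.HashSet("log-dir-c", "log-dir-a", "log-dir-b")
     val linked = mutable.LinkedHashSet("log-dir-c", "log-dir-a", "log-dir-b")
     // HashSet iteration order is undefined, so printing from it need not follow the
     // configuration order; LinkedHashSet preserves insertion order.
     println(hashed.mkString(", "))
     println(linked.mkString(", ")) // log-dir-c, log-dir-a, log-dir-b
   }
   ```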



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15541: Add num-open-iterators metric [kafka]

2024-05-22 Thread via GitHub


mjsax commented on PR #15975:
URL: https://github.com/apache/kafka/pull/15975#issuecomment-2126049466

   Makes sense. Could you file a Jira for the KIP and the follow-up cleanup for the KS code to use it? :) 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]

2024-05-22 Thread via GitHub


showuon commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1610854305


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2114,16 +2114,12 @@ class ReplicaManager(val config: KafkaConfig,
 partition.log.foreach { _ =>
   val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
-  // Add future replica log to partition's map
-  partition.createLogIfNotExists(
-isNew = false,
-isFutureReplica = true,
-offsetCheckpoints,
-topicIds(partition.topic))
-
-  // pause cleaning for partitions that are being moved and start 
ReplicaAlterDirThread to move
-  // replica from source dir to destination dir
-  logManager.abortAndPauseCleaning(topicPartition)
+  // Add future replica log to partition's map if it's not existed

Review Comment:
   @chia7712 , thanks for the comment. 
   For this:
   > In short, alterReplicaLogDirs adds alter thread [0] only if it succeeds to 
create future log of partition. Maybe maybeAddLogDirFetchers should follow same 
rule? Or we can add comments to say "that is fine as 
replicaAlterLogDirsManager.addFetcherForPartitions will be a no-op in this case?
   
   I chose the latter option because if we only create the fetcher when the future log did not already exist, there is a potential side effect: the fetcher could be removed on a leadership change but never added back later. I've added the comment in this commit: 
https://github.com/apache/kafka/pull/15951/commits/0d78e493e484dd4f27ba6a127616d802999f22d0
 . Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-5072[WIP]: Kafka topics should allow custom metadata configs within some config namespace [kafka]

2024-05-22 Thread via GitHub


andreyolv commented on PR #2873:
URL: https://github.com/apache/kafka/pull/2873#issuecomment-2125996595

   Extremely important feature for governance. Any release forecast?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15045: (KIP-924 pt. 10) Topic partition rack annotation simplified [kafka]

2024-05-22 Thread via GitHub


apourchet commented on code in PR #16034:
URL: https://github.com/apache/kafka/pull/16034#discussion_r1610816330


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -513,50 +515,45 @@ private ApplicationState buildApplicationState(final 
TopologyMetadata topologyMe
   + "tasks for source topics vs 
changelog topics.");
 }
 
-final Set sourceTopicPartitions = new HashSet<>();
-final Set nonSourceChangelogTopicPartitions = new 
HashSet<>();
-for (final Map.Entry> entry : 
sourcePartitionsForTask.entrySet()) {
-final TaskId taskId = entry.getKey();
-final Set taskSourcePartitions = entry.getValue();
-final Set taskChangelogPartitions = 
changelogPartitionsForTask.get(taskId);
-final Set taskNonSourceChangelogPartitions = new 
HashSet<>(taskChangelogPartitions);
-taskNonSourceChangelogPartitions.removeAll(taskSourcePartitions);
-
-sourceTopicPartitions.addAll(taskSourcePartitions);
-
nonSourceChangelogTopicPartitions.addAll(taskNonSourceChangelogPartitions);
-}
+final Set logicalTaskIds = 
unmodifiableSet(sourcePartitionsForTask.keySet());
+final Set allTopicPartitions = new 
HashSet<>();
+final Map> topicPartitionsForTask = 
new HashMap<>();
+logicalTaskIds.forEach(taskId -> {
+final Set topicPartitions = new HashSet<>();
+
+for (final TopicPartition topicPartition : 
sourcePartitionsForTask.get(taskId)) {
+final boolean isSource = true;
+final boolean isChangelog = 
changelogPartitionsForTask.get(taskId).contains(topicPartition);
+final DefaultTaskTopicPartition racklessTopicPartition = new 
DefaultTaskTopicPartition(
+topicPartition, isSource, isChangelog, null);
+allTopicPartitions.add(racklessTopicPartition);
+topicPartitionsForTask.get(taskId).add(racklessTopicPartition);

Review Comment:
   Good catch!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16598 Migrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]

2024-05-22 Thread via GitHub


m1a2st commented on code in PR #15779:
URL: https://github.com/apache/kafka/pull/15779#discussion_r1610809626


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java:
##
@@ -62,506 +86,756 @@
  * - scope=topics+partitions, scenario=to-earliest
  * - export/import
  */
-public class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
-private String[] basicArgs() {
+@ExtendWith(value = ClusterTestExtensions.class)
+public class ResetConsumerGroupOffsetTest {
+
+private static final String TOPIC_PREFIX = "foo-";
+private static final String GROUP_PREFIX = "test.group-";
+
+private static void generator(ClusterGenerator clusterGenerator) {

Review Comment:
   Ok, I'll rebase it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16814) KRaft broker cannot startup when `partition.metadata` is missing

2024-05-22 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848781#comment-17848781
 ] 

Luke Chen commented on KAFKA-16814:
---

[~muralibasani], no, it's not related to remote storage; I didn't enable it.

> KRaft broker cannot startup when `partition.metadata` is missing
> 
>
> Key: KAFKA-16814
> URL: https://issues.apache.org/jira/browse/KAFKA-16814
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Luke Chen
>Priority: Major
>
> When starting up the kafka LogManager, we check for stray replicas to avoid some 
> corner cases. But this check might prevent the broker from starting up if 
> `partition.metadata` is missing, because on startup we load the log from file and 
> the topicId of the log comes from the `partition.metadata` file. 
> So, if `partition.metadata` is missing, the topicId will be None, and 
> `LogManager#isStrayKraftReplica` will fail with a "no topicID" error.
> The `partition.metadata` missing could be some storage failure, or another 
> possible path is unclean shutdown after topic is created in the replica, but 
> before data is flushed into `partition.metadata` file. This is possible 
> because we do the flush in async way 
> [here|https://github.com/apache/kafka/blob/5552f5c26df4eb07b2d6ee218e4a29e4ca790d5c/core/src/main/scala/kafka/log/UnifiedLog.scala#L229].
>  
>  
> {code:java}
> ERROR Encountered fatal fault: Error starting LogManager 
> (org.apache.kafka.server.fault.ProcessTerminatingFaultHandler)
> java.lang.RuntimeException: The log dir 
> Log(dir=/tmp/kraft-broker-logs/quickstart-events-0, topic=quickstart-events, 
> partition=0, highWatermark=0, lastStableOffset=0, logStartOffset=0, 
> logEndOffset=0) does not have a topic ID, which is not allowed when running 
> in KRaft mode.
>     at 
> kafka.log.LogManager$.$anonfun$isStrayKraftReplica$1(LogManager.scala:1609)
>     at scala.Option.getOrElse(Option.scala:201)
>     at kafka.log.LogManager$.isStrayKraftReplica(LogManager.scala:1608)
>     at 
> kafka.server.metadata.BrokerMetadataPublisher.$anonfun$initializeManagers$1(BrokerMetadataPublisher.scala:294)
>     at 
> kafka.server.metadata.BrokerMetadataPublisher.$anonfun$initializeManagers$1$adapted(BrokerMetadataPublisher.scala:294)
>     at kafka.log.LogManager.loadLog(LogManager.scala:359)
>     at kafka.log.LogManager.$anonfun$loadLogs$15(LogManager.scala:493)
>     at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:577)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
>     at java.base/java.lang.Thread.run(Thread.java:1623) {code}
>  
> If we don't do the isStrayKraftReplica check, the topicID and the 
> `partition.metadata` file will be recovered later, after the replica receives a topic 
> partition update and becomes leader or follower. I'm proposing we skip the 
> `isStrayKraftReplica` check if the topicID is None, instead of throwing an exception 
> that terminates kafka. The `isStrayKraftReplica` check is only for a corner case, 
> so this should be fine IMO.
>  
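
A hedged Scala sketch of the proposed handling; the names and signature are simplified stand-ins, not the actual LogManager code:
{code:scala}
object StrayReplicaCheckSketch {
  // Skip the stray-replica check (and log) when the log has no topic ID, instead of
  // throwing and terminating the broker. Later metadata updates can recover the topic ID.
  def isStrayKraftReplica(topicId: Option[String], isStray: String => Boolean): Boolean =
    topicId match {
      case Some(id) => isStray(id) // normal path: run the real stray-replica check
      case None =>
        println("Log has no topic ID (missing partition.metadata); skipping stray-replica check")
        false
    }

  def main(args: Array[String]): Unit = {
    println(isStrayKraftReplica(Some("topic-id-123"), _ => true)) // true
    println(isStrayKraftReplica(None, _ => true))                 // false, with a log line
  }
}
{code}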



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-9228: Restart tasks on runtime-only connector config changes [kafka]

2024-05-22 Thread via GitHub


gharris1727 commented on code in PR #16001:
URL: https://github.com/apache/kafka/pull/16001#discussion_r1610727814


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##
@@ -1039,7 +1039,12 @@ public static List> 
reverseTransform(String connName,
 return result;
 }
 
-public boolean taskConfigsChanged(ClusterConfigState configState, String 
connName, List> taskProps) {
+public boolean taskConfigsChanged(
+ClusterConfigState configState,
+String connName,
+List> taskProps,
+ConfigHash connectorConfigHash

Review Comment:
   optional: You could use the old signature, and compute ConfigHash from the 
configState variable.
   
   This would switch the AbstractHerderTest cases to test the real hash 
algorithm, rather than using mocked hashes, and less mocking seems better to me.



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConfigHash.java:
##
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.util.ConnectUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.HttpHeaders;
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * A deterministic hash of a connector configuration. This can be used to 
detect changes
+ * in connector configurations across worker lifetimes, which is sometimes 
necessary when
+ * connectors are reconfigured in a way that affects their tasks' runtime 
behavior but does
+ * not affect their tasks' configurations (for example, changing the key 
converter class).
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/KAFKA-9228">KAFKA-9228</a>
+ */
+public class ConfigHash {
+
+private static final Logger log = 
LoggerFactory.getLogger(ConfigHash.class);
+
+public static final ConfigHash NO_HASH = new ConfigHash(null);
+public static final String CONNECTOR_CONFIG_HASH_HEADER = 
"X-Connect-Connector-Config-Hash";
+
+private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+private final Integer hash;
+
+// Visible for testing
+ConfigHash(Integer hash) {
+this.hash = hash;
+}
+
+/**
+ * Read and parse a hash from the headers of a REST request.
+ *
+ * @param connector the name of the connector; only used for error logging
+ *  purposes and may be null
+ * @param headers the headers from which to read and parse the hash;
+ *may be null
+ *
+ * @return the parsed hash; never null, but may be {@link #NO_HASH} if
+ * no hash header is present
+ *
+ * @throws ConnectException if the expected header is present for the hash,
+ * but it cannot be parsed as a 32-bit signed integer
+ */
+public static ConfigHash fromHeaders(String connector, HttpHeaders 
headers) {
+if (headers == null)
+return NO_HASH;
+
+String header = headers.getHeaderString(CONNECTOR_CONFIG_HASH_HEADER);
+if (header == null)
+return NO_HASH;
+
+int hash;
+try {
+hash = Integer.parseInt(header);
+} catch (NumberFormatException e) {
+if (connector == null)
+connector = "";
+
+if (log.isTraceEnabled()) {
+log.error("Invalid connector config hash header for connector 
{}", connector);

Review Comment:
   nit: this is an error-level log inside a trace-enabled if-condition



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConfigHash.java:
##
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not 

Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-05-22 Thread via GitHub


appchemist commented on PR #15961:
URL: https://github.com/apache/kafka/pull/15961#issuecomment-2125950049

   @lianetm I got it, thank you!
   @kirktrue Right! I think what you said is simpler and more intuitive.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15630) Improve documentation of offset.lag.max

2024-05-22 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-15630:

Labels: newbie  (was: )

> Improve documentation of offset.lag.max
> ---
>
> Key: KAFKA-15630
> URL: https://issues.apache.org/jira/browse/KAFKA-15630
> Project: Kafka
>  Issue Type: Improvement
>  Components: docs, mirrormaker
>Reporter: Mickael Maison
>Priority: Major
>  Labels: newbie
>
> It would be good to expand on the role of this configuration on offset 
> translation and mention that it can be set to a smaller value, or even 0, to 
> help in scenarios when records may not flow constantly.
> The documentation string is here: 
> [https://github.com/apache/kafka/blob/06739d5aa026e7db62ff0bd7da57e079cca35f07/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java#L104]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15630) Improve documentation of offset.lag.max

2024-05-22 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-15630:

Description: 
It would be good to expand on the role of this configuration on offset 
translation and mention that it can be set to a smaller value, or even 0, to 
help in scenarios when records may not flow constantly.

The documentation string is here: 
[https://github.com/apache/kafka/blob/06739d5aa026e7db62ff0bd7da57e079cca35f07/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java#L104]
 

  was:It would be good to expand on the role of this configuration on offset 
translation and mention that it can be set to a smaller value, or even 0, to 
help in scenarios when records may not flow constantly.


> Improve documentation of offset.lag.max
> ---
>
> Key: KAFKA-15630
> URL: https://issues.apache.org/jira/browse/KAFKA-15630
> Project: Kafka
>  Issue Type: Improvement
>  Components: docs, mirrormaker
>Reporter: Mickael Maison
>Priority: Major
>
> It would be good to expand on the role of this configuration on offset 
> translation and mention that it can be set to a smaller value, or even 0, to 
> help in scenarios when records may not flow constantly.
> The documentation string is here: 
> [https://github.com/apache/kafka/blob/06739d5aa026e7db62ff0bd7da57e079cca35f07/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java#L104]
>  
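
For illustration, a hedged snippet of the kind of example the documentation could include (the source->target alias is a placeholder for the user's own replication flow name):
{noformat}
# Hedged example for mm2.properties: a small (or zero) offset.lag.max makes offset syncs
# be emitted even when records flow slowly, which helps consumer offset translation.
source->target.offset.lag.max=0
{noformat}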



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16798) Mirrormaker2 dedicated mode - sync.group.offsets.interval not working

2024-05-22 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848770#comment-17848770
 ] 

Greg Harris commented on KAFKA-16798:
-

Hi [~sektor.coder], there's already a ticket to improve the in-line documentation, 
KAFKA-15630, but it hasn't had any activity on it. If you're interested, please pick 
that ticket up, as documentation writing is best done with fresh eyes.

> Mirrormaker2 dedicated mode - sync.group.offsets.interval not working
> -
>
> Key: KAFKA-16798
> URL: https://issues.apache.org/jira/browse/KAFKA-16798
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.7.0
>Reporter: Thanos Athanasopoulos
>Priority: Major
>
> Single instance MirrorMaker2 in dedicated mode, active passive replication 
> logic.
> sync.group.offsets.interval.seconds=2 configuration is enabled and active
> {noformat}
> [root@x-x ~]# docker logs cc-mm 2>&1 -f | grep -i 
> "auto.commit.interval\|checkpoint.interval\|consumer.commit.interval\|sync.topics.interval\|sync.group.offsets.interval\|offset-syncs.interval.seconds
>                                  "
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         sync.group.offsets.interval.seconds = 2
>         sync.group.offsets.interval.seconds = 2
>         auto.commit.interval.ms = 5000
>         sync.group.offsets.interval.seconds = 2
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         sync.group.offsets.interval.seconds = 2
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         sync.group.offsets.interval.seconds = 2
>         sync.group.offsets.interval.seconds = 2
>         auto.commit.interval.ms = 5000
> {noformat}
> but is not working, the commit of offsets happens *always every 60 seconds* 
> as you can see in the logs
> {noformat}
> [2024-05-20 09:32:44,847] INFO [MirrorSourceConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 20 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,852] INFO [MirrorSourceConnector|task-1|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,875] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,881] INFO [MirrorCheckpointConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets for 1 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,890] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,850] INFO [MirrorSourceConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 21 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,854] INFO [MirrorSourceConnector|task-1|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,878] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,883] INFO [MirrorCheckpointConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets for 2 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,895] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:34:44,852] INFO [MirrorSourceConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 20 
> 

Re: [PR] KAFKA-15045: (KIP-924 pt. 10) Topic partition rack annotation simplified [kafka]

2024-05-22 Thread via GitHub


ableegoldman commented on code in PR #16034:
URL: https://github.com/apache/kafka/pull/16034#discussion_r1610769106


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackUtils.java:
##
@@ -38,26 +40,37 @@ public final class RackUtils {
 
 private RackUtils() { }
 
-    public static Map<TopicPartition, Set<String>> getRacksForTopicPartition(final Cluster cluster,
-                                                                              final InternalTopicManager internalTopicManager,
-                                                                              final Set<TopicPartition> topicPartitions,
-                                                                              final boolean isChangelog) {
-        final Set<String> topicsToDescribe = new HashSet<>();
-        if (isChangelog) {
-            topicsToDescribe.addAll(topicPartitions.stream().map(TopicPartition::topic).collect(
-                Collectors.toSet()));
-        } else {
-            topicsToDescribe.addAll(topicsWithMissingMetadata(cluster, topicPartitions));
-        }
+    public static void annotateWithTopicPartitionsWithRackInfo(final Cluster cluster,
+                                                               final InternalTopicManager internalTopicManager,

Review Comment:
   nit: fix the formatting/indentation (although this will change when you 
change the method name anyway)



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackUtils.java:
##
@@ -38,26 +40,37 @@ public final class RackUtils {
 
 private RackUtils() { }
 
-    public static Map<TopicPartition, Set<String>> getRacksForTopicPartition(final Cluster cluster,
-                                                                              final InternalTopicManager internalTopicManager,
-                                                                              final Set<TopicPartition> topicPartitions,
-                                                                              final boolean isChangelog) {
-        final Set<String> topicsToDescribe = new HashSet<>();
-        if (isChangelog) {
-            topicsToDescribe.addAll(topicPartitions.stream().map(TopicPartition::topic).collect(
-                Collectors.toSet()));
-        } else {
-            topicsToDescribe.addAll(topicsWithMissingMetadata(cluster, topicPartitions));
-        }
+    public static void annotateWithTopicPartitionsWithRackInfo(final Cluster cluster,

Review Comment:
   this doesn't sound quite right; should it be 
`annotateTopicPartitionsWithRackInfo`? (i.e., there is an extra "with")



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -513,50 +515,45 @@ private ApplicationState buildApplicationState(final 
TopologyMetadata topologyMe
   + "tasks for source topics vs 
changelog topics.");
 }
 
-        final Set sourceTopicPartitions = new HashSet<>();
-        final Set nonSourceChangelogTopicPartitions = new HashSet<>();
-        for (final Map.Entry> entry : sourcePartitionsForTask.entrySet()) {
-            final TaskId taskId = entry.getKey();
-            final Set taskSourcePartitions = entry.getValue();
-            final Set taskChangelogPartitions = changelogPartitionsForTask.get(taskId);
-            final Set taskNonSourceChangelogPartitions = new HashSet<>(taskChangelogPartitions);
-            taskNonSourceChangelogPartitions.removeAll(taskSourcePartitions);
-
-            sourceTopicPartitions.addAll(taskSourcePartitions);
-            nonSourceChangelogTopicPartitions.addAll(taskNonSourceChangelogPartitions);
-        }
+        final Set logicalTaskIds = unmodifiableSet(sourcePartitionsForTask.keySet());
+        final Set allTopicPartitions = new HashSet<>();
+        final Map> topicPartitionsForTask = new HashMap<>();
+        logicalTaskIds.forEach(taskId -> {
+            final Set topicPartitions = new HashSet<>();
+
+            for (final TopicPartition topicPartition : sourcePartitionsForTask.get(taskId)) {
+                final boolean isSource = true;
+                final boolean isChangelog = changelogPartitionsForTask.get(taskId).contains(topicPartition);
+                final DefaultTaskTopicPartition racklessTopicPartition = new DefaultTaskTopicPartition(
+                    topicPartition, isSource, isChangelog, null);
+                allTopicPartitions.add(racklessTopicPartition);
+                topicPartitionsForTask.get(taskId).add(racklessTopicPartition);

Review Comment:
   both here and for the changelog loop below
   
   ```suggestion
   topicPartitions.add(racklessTopicPartition);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please 

Re: [PR] KAFKA-15045: (KIP-924 pt. 9) TaskAssignmentUtils implementation of optimizeRackAwareActiveTasks [kafka]

2024-05-22 Thread via GitHub


apourchet commented on code in PR #16033:
URL: https://github.com/apache/kafka/pull/16033#discussion_r1610768911


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java:
##
@@ -486,12 +486,12 @@ public long optimizeStandbyTasks(final SortedMap clientStates
 .sorted()
 .collect(Collectors.toList());
 
-final Map taskClientMap = new HashMap<>();
 final List clients = Stream.of(clientList.get(i), 
clientList.get(j))
 .sorted().collect(
 Collectors.toList());
-final Map originalAssignedTaskNumber = new 
HashMap<>();
 
+final Map taskClientMap = new HashMap<>();
+final Map originalAssignedTaskNumber = new 
HashMap<>();

Review Comment:
   I moved these lower down to show them clearly initialized empty right before 
the function call. It turns out these parameters are used as outputs, which is 
why I added the `AssignmentGraph` inner class to `TaskAssignmentUtils`, in 
order to simplify the usage of the graph functions.
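
For readers skimming the thread, a rough sketch of the bundling idea described 
above; the field types and the `Graph` placeholder below are assumptions, not the 
actual `AssignmentGraph` from this PR:

```java
// Hypothetical sketch: bundle the constructed graph together with the maps that the
// graph-building helpers fill in as output parameters, so callers no longer need to
// pre-declare empty maps and thread them through each call site themselves.
// Assumes java.util.Map, java.util.UUID, and some Graph type are in scope.
final class AssignmentGraph {
    final Graph<Integer> graph;                            // placeholder graph type
    final Map<TaskId, UUID> taskClientMap;                 // populated while building the graph
    final Map<UUID, Integer> originalAssignedTaskNumber;   // populated while building the graph

    AssignmentGraph(final Graph<Integer> graph,
                    final Map<TaskId, UUID> taskClientMap,
                    final Map<UUID, Integer> originalAssignedTaskNumber) {
        this.graph = graph;
        this.taskClientMap = taskClientMap;
        this.originalAssignedTaskNumber = originalAssignedTaskNumber;
    }
}
```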



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15045: (KIP-924 pt. 9) TaskAssignmentUtils implementation of optimizeRackAwareActiveTasks [kafka]

2024-05-22 Thread via GitHub


apourchet commented on code in PR #16033:
URL: https://github.com/apache/kafka/pull/16033#discussion_r1610767256


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskInfo.java:
##
@@ -50,6 +50,12 @@ public interface TaskInfo {
  */
 Set stateStoreNames();
 
+/**
+ *
+ * @return The topic partitions in use by this task.
+ */
+Set topicPartitions();

Review Comment:
   This is temporary to get things to compile until we merge part 10 
https://github.com/apache/kafka/pull/16034/files



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15045: (KIP-924 pt. 8) Added TopicPartitionAssignmentInfo [kafka]

2024-05-22 Thread via GitHub


ableegoldman merged PR #16024:
URL: https://github.com/apache/kafka/pull/16024


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15045: (KIP-924 pt. 10) Topic partition rack annotation simplified [kafka]

2024-05-22 Thread via GitHub


apourchet opened a new pull request, #16034:
URL: https://github.com/apache/kafka/pull/16034

   This PR uses the new TaskTopicPartition structure to simplify the build
   process for the ApplicationState, which is the input to the new
   TaskAssignor#assign call.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-16798) Mirrormaker2 dedicated mode - sync.group.offsets.interval not working

2024-05-22 Thread Thanos Athanasopoulos (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848767#comment-17848767
 ] 

Thanos Athanasopoulos edited comment on KAFKA-16798 at 5/22/24 10:44 PM:
-

[~gharris1727] that was the culprit !

I set offset.lag.max=0 in mm2.properties and now it syncs asap.

Will head for some reading in the resources you shared and then some 
performance tuning in real world scenarios.

I am not sure how things work in the Kafka ecosystem regarding documentation; I can 
help improve it or start a kind of knowledge base for this case.



Thanks for the support.


was (Author: JIRAUSER305481):
[~gharris1727] that was the culprit !

I set offset.lag.max=0 in mm2.properties and now it syncs asap.

Will head for some reading in the resources you shared and then some 
performance tuning in real world scenarios.

Thank you !

> Mirrormaker2 dedicated mode - sync.group.offsets.interval not working
> -
>
> Key: KAFKA-16798
> URL: https://issues.apache.org/jira/browse/KAFKA-16798
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.7.0
>Reporter: Thanos Athanasopoulos
>Priority: Major
>
> Single instance MirrorMaker2 in dedicated mode, active passive replication 
> logic.
> sync.group.offsets.interval.seconds=2 configuration is enabled and active
> {noformat}
> [root@x-x ~]# docker logs cc-mm 2>&1 -f | grep -i 
> "auto.commit.interval\|checkpoint.interval\|consumer.commit.interval\|sync.topics.interval\|sync.group.offsets.interval\|offset-syncs.interval.seconds
>                                  "
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         sync.group.offsets.interval.seconds = 2
>         sync.group.offsets.interval.seconds = 2
>         auto.commit.interval.ms = 5000
>         sync.group.offsets.interval.seconds = 2
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         sync.group.offsets.interval.seconds = 2
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         sync.group.offsets.interval.seconds = 2
>         sync.group.offsets.interval.seconds = 2
>         auto.commit.interval.ms = 5000
> {noformat}
> but is not working, the commit of offsets happens *always every 60 seconds* 
> as you can see in the logs
> {noformat}
> [2024-05-20 09:32:44,847] INFO [MirrorSourceConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 20 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,852] INFO [MirrorSourceConnector|task-1|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,875] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,881] INFO [MirrorCheckpointConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets for 1 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,890] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,850] INFO [MirrorSourceConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 21 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,854] INFO [MirrorSourceConnector|task-1|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,878] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,883] INFO [MirrorCheckpointConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets 

[jira] [Comment Edited] (KAFKA-16798) Mirrormaker2 dedicated mode - sync.group.offsets.interval not working

2024-05-22 Thread Thanos Athanasopoulos (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848767#comment-17848767
 ] 

Thanos Athanasopoulos edited comment on KAFKA-16798 at 5/22/24 10:39 PM:
-

[~gharris1727] that was the culprit !

I set offset.lag.max=0 in mm2.properties and now it syncs asap.

Will head for some reading in the resources you shared and then some 
performance tuning in real world scenarios.

Thank you !


was (Author: JIRAUSER305481):
[~gharris1727] that was the culprit !

I set offset.lag.max=0 in mm2.properties and now it syncs asap.

 

Will head for some reading in the resources you shared and then some 
performance tuning in real world scenarios.

 

Thank you !

> Mirrormaker2 dedicated mode - sync.group.offsets.interval not working
> -
>
> Key: KAFKA-16798
> URL: https://issues.apache.org/jira/browse/KAFKA-16798
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.7.0
>Reporter: Thanos Athanasopoulos
>Priority: Major
>
> Single instance MirrorMaker2 in dedicated mode, active passive replication 
> logic.
> sync.group.offsets.interval.seconds=2 configuration is enabled and active
> {noformat}
> [root@x-x ~]# docker logs cc-mm 2>&1 -f | grep -i 
> "auto.commit.interval\|checkpoint.interval\|consumer.commit.interval\|sync.topics.interval\|sync.group.offsets.interval\|offset-syncs.interval.seconds
>                                  "
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         sync.group.offsets.interval.seconds = 2
>         sync.group.offsets.interval.seconds = 2
>         auto.commit.interval.ms = 5000
>         sync.group.offsets.interval.seconds = 2
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         sync.group.offsets.interval.seconds = 2
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         sync.group.offsets.interval.seconds = 2
>         sync.group.offsets.interval.seconds = 2
>         auto.commit.interval.ms = 5000
> {noformat}
> but is not working, the commit of offsets happens *always every 60 seconds* 
> as you can see in the logs
> {noformat}
> [2024-05-20 09:32:44,847] INFO [MirrorSourceConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 20 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,852] INFO [MirrorSourceConnector|task-1|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,875] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,881] INFO [MirrorCheckpointConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets for 1 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,890] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,850] INFO [MirrorSourceConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 21 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,854] INFO [MirrorSourceConnector|task-1|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,878] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,883] INFO [MirrorCheckpointConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets for 2 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,895] INFO 

[jira] [Comment Edited] (KAFKA-16798) Mirrormaker2 dedicated mode - sync.group.offsets.interval not working

2024-05-22 Thread Thanos Athanasopoulos (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848767#comment-17848767
 ] 

Thanos Athanasopoulos edited comment on KAFKA-16798 at 5/22/24 10:38 PM:
-

[~gharris1727] that was the culprit !

I set offset.lag.max=0 in mm2.properties and now it syncs asap.

 

Will head for some reading in the resources you shared and then some 
performance tuning in real world scenarios.

 

Thank you !


was (Author: JIRAUSER305481):
[~gharris1727] that was the culprit !

I set offset.lag.max=0 in mm2.properties and now it syncs asap.

Will go for some reading in the resources you shared and then some performance 
tuning in real world scenarios.

Thank you !

> Mirrormaker2 dedicated mode - sync.group.offsets.interval not working
> -
>
> Key: KAFKA-16798
> URL: https://issues.apache.org/jira/browse/KAFKA-16798
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.7.0
>Reporter: Thanos Athanasopoulos
>Priority: Major
>
> Single instance MirrorMaker2 in dedicated mode, active passive replication 
> logic.
> sync.group.offsets.interval.seconds=2 configuration is enabled and active
> {noformat}
> [root@x-x ~]# docker logs cc-mm 2>&1 -f | grep -i 
> "auto.commit.interval\|checkpoint.interval\|consumer.commit.interval\|sync.topics.interval\|sync.group.offsets.interval\|offset-syncs.interval.seconds
>                                  "
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         sync.group.offsets.interval.seconds = 2
>         sync.group.offsets.interval.seconds = 2
>         auto.commit.interval.ms = 5000
>         sync.group.offsets.interval.seconds = 2
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         sync.group.offsets.interval.seconds = 2
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         sync.group.offsets.interval.seconds = 2
>         sync.group.offsets.interval.seconds = 2
>         auto.commit.interval.ms = 5000
> {noformat}
> but is not working, the commit of offsets happens *always every 60 seconds* 
> as you can see in the logs
> {noformat}
> [2024-05-20 09:32:44,847] INFO [MirrorSourceConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 20 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,852] INFO [MirrorSourceConnector|task-1|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,875] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,881] INFO [MirrorCheckpointConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets for 1 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,890] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,850] INFO [MirrorSourceConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 21 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,854] INFO [MirrorSourceConnector|task-1|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,878] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,883] INFO [MirrorCheckpointConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets for 2 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,895] INFO 

[jira] [Commented] (KAFKA-16798) Mirrormaker2 dedicated mode - sync.group.offsets.interval not working

2024-05-22 Thread Thanos Athanasopoulos (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848767#comment-17848767
 ] 

Thanos Athanasopoulos commented on KAFKA-16798:
---

[~gharris1727] that was the culprit !

I set offset.lag.max=0 in mm2.properties and now it syncs asap.

Will go for some reading in the resources you shared and then some performance 
tuning in real world scenarios.

Thank you !
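
For reference, a minimal mm2.properties excerpt that combines the settings discussed 
above might look like the following (the cluster aliases "primary" and "backup" are 
placeholders; only the offset-related lines matter here):
{noformat}
clusters = primary, backup
primary->backup.enabled = true
primary->backup.emit.checkpoints.enabled = true
primary->backup.sync.group.offsets.enabled = true
primary->backup.sync.group.offsets.interval.seconds = 2
# translate offsets eagerly instead of waiting for the default lag window
primary->backup.offset.lag.max = 0
{noformat}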

> Mirrormaker2 dedicated mode - sync.group.offsets.interval not working
> -
>
> Key: KAFKA-16798
> URL: https://issues.apache.org/jira/browse/KAFKA-16798
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.7.0
>Reporter: Thanos Athanasopoulos
>Priority: Major
>
> Single instance MirrorMaker2 in dedicated mode, active passive replication 
> logic.
> sync.group.offsets.interval.seconds=2 configuration is enabled and active
> {noformat}
> [root@x-x ~]# docker logs cc-mm 2>&1 -f | grep -i 
> "auto.commit.interval\|checkpoint.interval\|consumer.commit.interval\|sync.topics.interval\|sync.group.offsets.interval\|offset-syncs.interval.seconds
>                                  "
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         sync.group.offsets.interval.seconds = 2
>         sync.group.offsets.interval.seconds = 2
>         auto.commit.interval.ms = 5000
>         sync.group.offsets.interval.seconds = 2
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         sync.group.offsets.interval.seconds = 2
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         sync.group.offsets.interval.seconds = 2
>         sync.group.offsets.interval.seconds = 2
>         auto.commit.interval.ms = 5000
> {noformat}
> but is not working, the commit of offsets happens *always every 60 seconds* 
> as you can see in the logs
> {noformat}
> [2024-05-20 09:32:44,847] INFO [MirrorSourceConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 20 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,852] INFO [MirrorSourceConnector|task-1|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,875] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,881] INFO [MirrorCheckpointConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets for 1 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,890] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,850] INFO [MirrorSourceConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 21 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,854] INFO [MirrorSourceConnector|task-1|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,878] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,883] INFO [MirrorCheckpointConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets for 2 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,895] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:34:44,852] INFO [MirrorSourceConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 20 
> 

Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]

2024-05-22 Thread via GitHub


dongnuo123 commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1610735381


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -12401,6 +12436,363 @@ public void 
testClassicGroupSyncToConsumerGroupRebalanceInProgress() throws Exce
 .withProtocolName("range")
 .build())
 );
+context.assertJoinTimeout(groupId, memberId, 1);
+}
+
+@Test
+public void testClassicGroupHeartbeatToConsumerGroupMaintainsSession() 
throws Exception {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+int sessionTimeout = 5000;
+
+List protocols = 
Collections.singletonList(
+new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+.setName("range")
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
+new ConsumerPartitionAssignor.Subscription(
+Arrays.asList("foo"),
+null,
+Collections.emptyList()
+)
+)))
+);
+
+// Consumer group with a member using the classic protocol.
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(new 
MockPartitionAssignor("range")))
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setClassicMemberMetadata(
+new 
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+.setSessionTimeoutMs(sessionTimeout)
+.setSupportedProtocols(protocols)
+)
+.setMemberEpoch(10)
+.build()))
+.build();
+
+// Heartbeat to schedule the session timeout.
+HeartbeatRequestData request = new HeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setGenerationId(10);
+context.sendClassicGroupHeartbeat(request);
+context.assertSessionTimeout(groupId, memberId, sessionTimeout);
+
+// Advance clock by 1/2 of session timeout.
+
GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout
 / 2));
+
+HeartbeatResponseData heartbeatResponse = 
context.sendClassicGroupHeartbeat(request).response();
+assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode());
+context.assertSessionTimeout(groupId, memberId, sessionTimeout);
+
+// Advance clock by 1/2 of session timeout.
+
GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout
 / 2));
+
+heartbeatResponse = 
context.sendClassicGroupHeartbeat(request).response();
+assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode());
+context.assertSessionTimeout(groupId, memberId, sessionTimeout);
+}
+
+@Test
+public void testClassicGroupHeartbeatToConsumerGroupRebalanceInProgress() 
throws Exception {
+String groupId = "group-id";
+String memberId1 = Uuid.randomUuid().toString();
+String memberId2 = Uuid.randomUuid().toString();
+String memberId3 = Uuid.randomUuid().toString();
+Uuid fooTopicId = Uuid.randomUuid();
+Uuid barTopicId = Uuid.randomUuid();
+int sessionTimeout = 5000;
+int rebalanceTimeout = 1;
+
+List protocols = 
Collections.singletonList(
+new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+.setName("range")
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
+new ConsumerPartitionAssignor.Subscription(
+Arrays.asList("foo"),
+null,
+Collections.emptyList()
+)
+)))
+);
+
+// Member 1 has a member epoch smaller than the group epoch.
+ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
+.setRebalanceTimeoutMs(rebalanceTimeout)
+.setClassicMemberMetadata(
+new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+.setSessionTimeoutMs(sessionTimeout)
+.setSupportedProtocols(protocols)
+)
+.setMemberEpoch(9)
+.build();
+
+// Member 2 has unrevoked partition.
+ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder(memberId2)
+.setState(MemberState.UNREVOKED_PARTITIONS)
+.setRebalanceTimeoutMs(rebalanceTimeout)
+

[jira] [Assigned] (KAFKA-15284) Implement ConsumerGroupProtocolVersionResolver to determine consumer group protocol

2024-05-22 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-15284:
-

Assignee: (was: Kirk True)

> Implement ConsumerGroupProtocolVersionResolver to determine consumer group 
> protocol
> ---
>
> Key: KAFKA-15284
> URL: https://issues.apache.org/jira/browse/KAFKA-15284
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Kirk True
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 4.0.0
>
>
> At client initialization, we need to determine which of the 
> {{ConsumerDelegate}} implementations to use:
>  # {{LegacyKafkaConsumerDelegate}}
>  # {{AsyncKafkaConsumerDelegate}}
> There are conditions defined by KIP-848 that determine client eligibility to 
> use the new protocol. This will be modeled by the—deep 
> breath—{{{}ConsumerGroupProtocolVersionResolver{}}}.
> Known tasks:
>  * Determine at what point in the {{Consumer}} initialization the network 
> communication should happen
>  * Determine what RPCs to invoke in order to determine eligibility (API 
> versions, IBP version, etc.)
>  * Implement the network client lifecycle (startup, communication, shutdown, 
> etc.)
>  * Determine the fallback path in case the client is not eligible to use the 
> protocol



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]

2024-05-22 Thread via GitHub


dongnuo123 commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1610729396


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -12143,7 +12183,6 @@ public void 
testClassicGroupSyncToConsumerGroupWithAllConsumerProtocolVersions()
 // Consumer group with two members.
 // Member 1 uses the classic protocol and member 2 uses the 
consumer protocol.
 GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
-
.withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)

Review Comment:
   It actually doesn't matter. This was first added because I copied and pasted 
it from the downgrade unit test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]

2024-05-22 Thread via GitHub


dongnuo123 commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1610728790


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -4274,6 +4343,81 @@ private void validateClassicGroupHeartbeat(
 }
 }
 
+/**
+ * Handle a classic group HeartbeatRequest to a consumer group. A response 
with
+ * REBALANCE_IN_PROGRESS is returned if 1) the member epoch is smaller 
than the
+ * group epoch, 2) the member is in UNREVOKED_PARTITIONS, or 3) the member 
is in
+ * UNRELEASED_PARTITIONS and all its partitions pending assignment are 
free.
+ *
+ * @param group  The ConsumerGroup.
+ * @param contextThe request context.
+ * @param requestThe actual Heartbeat request.
+ *
+ * @return The coordinator result that contains the heartbeat response.
+ */
+private CoordinatorResult 
classicGroupHeartbeatToConsumerGroup(
+ConsumerGroup group,
+RequestContext context,
+HeartbeatRequestData request
+) throws UnknownMemberIdException, FencedInstanceIdException, 
IllegalGenerationException {
+String groupId = request.groupId();
+String memberId = request.memberId();
+String instanceId = request.groupInstanceId();
+ConsumerGroupMember member = validateConsumerGroupMember(group, 
memberId, instanceId);
+
+throwIfMemberDoesNotUseClassicProtocol(member);
+throwIfGenerationIdUnmatched(memberId, member.memberEpoch(), 
request.generationId());
+
+scheduleConsumerGroupSessionTimeout(groupId, memberId, 
member.classicProtocolSessionTimeout().get());
+
+Errors error = Errors.NONE;
+// The member should rejoin if any of the following conditions is met.
+// 1) The group epoch is bumped so the member need to rejoin to catch 
up.
+// 2) The member needs to revoke some partitions and rejoin to 
reconcile with the new epoch.
+// 3) The member's partitions pending assignment are free, so it can 
rejoin to get the complete assignment.
+if (member.memberEpoch() < group.groupEpoch() ||
+member.state() == MemberState.UNREVOKED_PARTITIONS ||
+(member.state() == MemberState.UNRELEASED_PARTITIONS && 
!group.waitingOnUnreleasedPartition(member))) {

Review Comment:
   > the helper checks that the latest state does in fact have all partitions 
released but we want it to rejoin to get the updated assignment
   
   Yes this is correct.
   
   > Will this member be updated to STABLE state in the next 
CurrentAssignmentBuilder#computeNextAssignment
   
   Yes, it will, in the reconciliation part of 
`classicGroupJoinToConsumerGroup` 
https://github.com/apache/kafka/blob/27a6c156c49e375edea0e6f33a35c64c615db1b5/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L1737-L1745



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-9228: Restart tasks on runtime-only connector config changes [kafka]

2024-05-22 Thread via GitHub


C0urante commented on code in PR #16001:
URL: https://github.com/apache/kafka/pull/16001#discussion_r1610679966


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConfigHash.java:
##
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.util.ConnectUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.HttpHeaders;
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * A deterministic hash of a connector configuration. This can be used to 
detect changes
+ * in connector configurations across worker lifetimes, which is sometimes 
necessary when
+ * connectors are reconfigured in a way that affects their tasks' runtime 
behavior but does
+ * not affect their tasks' configurations (for example, changing the key 
converter class).
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/KAFKA-9228">KAFKA-9228</a>
+ */
+public class ConfigHash {
+
+private static final Logger log = 
LoggerFactory.getLogger(ConfigHash.class);
+
+public static final ConfigHash NO_HASH = new ConfigHash(null);
+public static final String CONNECTOR_CONFIG_HASH_HEADER = 
"X-Connect-Connector-Config-Hash";
+
+private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+private final Integer hash;
+
+// Visible for testing
+ConfigHash(Integer hash) {
+this.hash = hash;
+}
+
+/**
+ * Read and parse a hash from the headers of a REST request.
+ *
+ * @param connector the name of the connector; only used for error logging
+ *  purposes and may be null
+ * @param headers the headers from which to read and parse the hash;
+ *may be null
+ *
+ * @return the parsed hash; never null, but may be {@link #NO_HASH} if
+ * no hash header is present
+ *
+ * @throws ConnectException if the expected header is present for the hash,
+ * but it cannot be parsed as a 32-bit signed integer
+ */
+public static ConfigHash fromHeaders(String connector, HttpHeaders 
headers) {
+if (headers == null)
+return NO_HASH;
+
+String header = headers.getHeaderString(CONNECTOR_CONFIG_HASH_HEADER);
+if (header == null)
+return NO_HASH;
+
+int hash;
+try {
+hash = Integer.parseInt(header);
+} catch (NumberFormatException e) {
+if (connector == null)
+connector = "";
+
+if (log.isTraceEnabled()) {
+log.error("Invalid connector config hash header for connector 
{}", connector);
+log.trace("Invalid connector config hash header for connector 
{}: '{}'", connector, header);
+} else {
+log.error(
+"Invalid connector config hash header for connector 
{}. "
++ "Please enable TRACE logging to see the 
invalid value",
+connector
+);
+}
+throw new ConnectException("Invalid hash header; expected a 32-bit 
signed integer");
+}
+return new ConfigHash(hash);
+}
+
+/**
+ * Generate a deterministic hash from the config. For configurations
+ * with identical key-value pairs, this hash will always be the same, and
+ * {@link #shouldUpdateTasks(ConfigHash, ConfigHash)} will return {@code 
false}
+ * for any two such configurations. Note that, for security reasons, those
+ * {@link ConfigHash} instances will still not {@link #equals(Object) 
equal}
+ * each other.
+ *
+ * @param config the configuration to hash; may be null
+ *
+ * @return the resulting hash; may be {@link #NO_HASH} if the configuration
+ * was null
+ *
+ * @throws ConnectException if the configuration cannot be serialized to 
JSON
+ * for the purposes of hashing
+ 

Re: [PR] KAFKA-9228: Restart tasks on runtime-only connector config changes [kafka]

2024-05-22 Thread via GitHub


C0urante commented on PR #16001:
URL: https://github.com/apache/kafka/pull/16001#issuecomment-2125789945

   After some offline discussion with @gharris1727 we realized there was a 
potential for infinite rebalance loops with the previous implementation when 
config providers resolved values differently depending on the worker on which 
they were running.
   
   Assume some non-leader worker `W` hosts some connector `C`:
   1. `W` computes the hash of a config-provider-resolved connector config
   2. After observing that the hash differs from the latest hash present in the 
config topic for that connector, `W` forwards a request to the leader to 
publish task configs for `C` to the config topic
   3. The leader accepts this request, performs its own config provider 
resolution on the connector config, and writes a hash of that 
config-provider-resolved config to the config topic
   4. After the ensuing rebalance, `W` sees a different hash in the config 
topic than the one that it computed in step 1, and the process repeats
   
   In order to address this, I've published a commit that causes non-leader 
workers to send their own connector config hash as an HTTP request header when 
forwarding task configs to the leader.
   
   I've also introduced a dedicated `ConfigHash` class that should make it 
significantly harder for config hash values to be leaked, including via log 
message. This class does not directly expose the underlying 32-bit integer and 
instead only permits indirect access to the value.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16815) Handle FencedInstanceId on heartbeat for new consumer

2024-05-22 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans reassigned KAFKA-16815:
--

Assignee: Lianet Magrans

> Handle FencedInstanceId on heartbeat for new consumer
> -
>
> Key: KAFKA-16815
> URL: https://issues.apache.org/jira/browse/KAFKA-16815
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
>
> With the new consumer group protocol, a member could receive a 
> FencedInstanceIdError in the heartbeat response. This could be the case when 
> an active member using a group instance id is removed from the group by an 
> admin client. If a second member joins with the same instance id, the first 
> member will receive a FencedInstanceId on the next heartbeat response. This 
> should be treated as a fatal error (consumer should not attempt to rejoin). 
> Currently, the FencedInstanceId is not explicitly handled by the client in 
> the HeartbeatRequestManager. It ends up being treated as a fatal error, see 
> [here|https://github.com/apache/kafka/blob/5552f5c26df4eb07b2d6ee218e4a29e4ca790d5c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L417]
>  (just because it lands on the "unexpected" error category). We should handle 
> it explicitly, just to make sure that we express that it's is an expected 
> error: log a proper message for it and fail (handleFatalFailure). We should 
> also that the error is included in the tests that cover the HB request error 
> handling 
> ([here|https://github.com/apache/kafka/blob/5552f5c26df4eb07b2d6ee218e4a29e4ca790d5c/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java#L798])
>     
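
A minimal sketch of the explicit handling described above (only 
Errors.FENCED_INSTANCE_ID is a known constant; logger, groupInstanceId, errorMessage, 
and handleFatalFailure are taken from the issue text or are placeholders, so the real 
code in HeartbeatRequestManager may differ):
{code:java}
// Sketch only: give FENCED_INSTANCE_ID its own branch in the heartbeat response
// error handling instead of letting it fall through to the generic "unexpected
// error" path, so the log message states that the fencing is expected and fatal.
case FENCED_INSTANCE_ID:
    logger.error("Heartbeat failed: group instance id {} was fenced because another " +
        "member joined with the same instance id: {}", groupInstanceId, errorMessage);
    handleFatalFailure(error.exception(errorMessage));
    break;
{code}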



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16160) AsyncKafkaConsumer is trying to connect to a disconnected node in a tight loop

2024-05-22 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-16160.

Resolution: Cannot Reproduce

> AsyncKafkaConsumer is trying to connect to a disconnected node in a tight loop
> --
>
> Key: KAFKA-16160
> URL: https://issues.apache.org/jira/browse/KAFKA-16160
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> Observing some excessive logging running AsyncKafkaConsumer and observing 
> excessive logging of :
> {code:java}
> 1271 [2024-01-15 09:43:36,627] DEBUG [Consumer clientId=console-consumer, 
> groupId=concurrent_consumer] Node is not ready, handle the request in the 
> next event loop: node=worker4:9092 (id: 2147483644 rack: null), 
> request=UnsentRequest{requestBuil     
> der=ConsumerGroupHeartbeatRequestData(groupId='concurrent_consumer', 
> memberId='laIqS789StuhXFpTwjh6hA', memberEpoch=1, instanceId=null, 
> rackId=null, rebalanceTimeoutMs=30, subscribedTopicNames=[output-topic], 
> serverAssignor=null, topicP     
> artitions=[TopicPartitions(topicId=I5P5lIXvR1Cjc8hfoJg5bg, partitions=[0])]), 
> handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@918925b,
>  node=Optional[worker4:9092 (id: 2147483644 rack: null)]     , 
> timer=org.apache.kafka.common.utils.Timer@55ed4733} 
> (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate) {code}
> This seems to be triggered by a tight poll loop of the network thread.  The 
> right thing to do is to backoff a bit for that given node and retry later.
> This should be a blocker for 3.8 release.
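
A minimal illustration of the per-node backoff idea suggested in the description 
(class, method, and constant names here are placeholders, not the consumer's actual 
retry code):
{code:java}
import java.util.HashMap;
import java.util.Map;

// Sketch only: remember when a node was last found "not ready" and skip it until a
// short backoff expires, instead of re-attempting the send on every pass of the
// network thread's event loop.
final class NodeBackoff {
    private static final long RETRY_BACKOFF_MS = 100L;
    private final Map<String, Long> retryDeadlineMs = new HashMap<>();

    boolean canSendTo(final String nodeId, final long nowMs) {
        final Long deadline = retryDeadlineMs.get(nodeId);
        return deadline == null || nowMs >= deadline;
    }

    void onNodeNotReady(final String nodeId, final long nowMs) {
        retryDeadlineMs.put(nodeId, nowMs + RETRY_BACKOFF_MS);
    }
}
{code}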



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16160) AsyncKafkaConsumer is trying to connect to a disconnected node in a tight loop

2024-05-22 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848748#comment-17848748
 ] 

Philip Nee commented on KAFKA-16160:


Hey [~phuctran] - I'm going to close this issue because I haven't been seeing 
this in the test logs, given that we fixed a few relevant issues like the 
in-flight request logic.

> AsyncKafkaConsumer is trying to connect to a disconnected node in a tight loop
> --
>
> Key: KAFKA-16160
> URL: https://issues.apache.org/jira/browse/KAFKA-16160
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> Observing some excessive logging running AsyncKafkaConsumer and observing 
> excessive logging of :
> {code:java}
> 1271 [2024-01-15 09:43:36,627] DEBUG [Consumer clientId=console-consumer, 
> groupId=concurrent_consumer] Node is not ready, handle the request in the 
> next event loop: node=worker4:9092 (id: 2147483644 rack: null), 
> request=UnsentRequest{requestBuil     
> der=ConsumerGroupHeartbeatRequestData(groupId='concurrent_consumer', 
> memberId='laIqS789StuhXFpTwjh6hA', memberEpoch=1, instanceId=null, 
> rackId=null, rebalanceTimeoutMs=30, subscribedTopicNames=[output-topic], 
> serverAssignor=null, topicP     
> artitions=[TopicPartitions(topicId=I5P5lIXvR1Cjc8hfoJg5bg, partitions=[0])]), 
> handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@918925b,
>  node=Optional[worker4:9092 (id: 2147483644 rack: null)]     , 
> timer=org.apache.kafka.common.utils.Timer@55ed4733} 
> (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate) {code}
> This seems to be triggered by a tight poll loop of the network thread.  The 
> right thing to do is to backoff a bit for that given node and retry later.
> This should be a blocker for 3.8 release.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16160) AsyncKafkaConsumer is trying to connect to a disconnected node in a tight loop

2024-05-22 Thread Jon Chiu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jon Chiu updated KAFKA-16160:
-
Priority: Major  (was: Blocker)

> AsyncKafkaConsumer is trying to connect to a disconnected node in a tight loop
> --
>
> Key: KAFKA-16160
> URL: https://issues.apache.org/jira/browse/KAFKA-16160
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> Observing some excessive logging running AsyncKafkaConsumer and observing 
> excessive logging of :
> {code:java}
> 1271 [2024-01-15 09:43:36,627] DEBUG [Consumer clientId=console-consumer, 
> groupId=concurrent_consumer] Node is not ready, handle the request in the 
> next event loop: node=worker4:9092 (id: 2147483644 rack: null), 
> request=UnsentRequest{requestBuil     
> der=ConsumerGroupHeartbeatRequestData(groupId='concurrent_consumer', 
> memberId='laIqS789StuhXFpTwjh6hA', memberEpoch=1, instanceId=null, 
> rackId=null, rebalanceTimeoutMs=30, subscribedTopicNames=[output-topic], 
> serverAssignor=null, topicP     
> artitions=[TopicPartitions(topicId=I5P5lIXvR1Cjc8hfoJg5bg, partitions=[0])]), 
> handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@918925b,
>  node=Optional[worker4:9092 (id: 2147483644 rack: null)]     , 
> timer=org.apache.kafka.common.utils.Timer@55ed4733} 
> (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate) {code}
> This seems to be triggered by a tight poll loop of the network thread.  The 
> right thing to do is to backoff a bit for that given node and retry later.
> This should be a blocker for 3.8 release.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16623) KafkaAsyncConsumer system tests warn about revoking partitions that weren't previously assigned

2024-05-22 Thread Jon Chiu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jon Chiu reassigned KAFKA-16623:


Assignee: Lianet Magrans  (was: Kirk True)

> KafkaAsyncConsumer system tests warn about revoking partitions that weren't 
> previously assigned
> ---
>
> Key: KAFKA-16623
> URL: https://issues.apache.org/jira/browse/KAFKA-16623
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.8.0
>Reporter: Kirk True
>Assignee: Lianet Magrans
>Priority: Critical
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> system-tests
> Fix For: 3.8.0
>
>
> When running system tests for the KafkaAsyncConsumer, we occasionally see 
> this warning:
> {noformat}
>   File "/usr/lib/python3.7/threading.py", line 917, in _bootstrap_inner
> self.run()
>   File "/usr/lib/python3.7/threading.py", line 865, in run
> self._target(*self._args, **self._kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/background_thread.py",
>  line 38, in _protected_worker
> self._worker(idx, node)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py",
>  line 304, in _worker
> handler.handle_partitions_revoked(event, node, self.logger)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py",
>  line 163, in handle_partitions_revoked
> (tp, node.account.hostname)
> AssertionError: Topic partition TopicPartition(topic='test_topic', 
> partition=0) cannot be revoked from worker20 as it was not previously 
> assigned to that consumer
> {noformat}
> In test_fencing_static_consumer, there are two sets of consumers that use 
> group instance IDs: the initial set and the "conflict" set. It appears that 
> one of the "conflicting" consumers hijacks the partition ownership from the 
> coordinator's perspective when the initial consumer leaves the group.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Fix rate metric spikes [kafka]

2024-05-22 Thread via GitHub


emitskevich-blp commented on code in PR #15889:
URL: https://github.com/apache/kafka/pull/15889#discussion_r1610656953


##
clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:
##
@@ -50,10 +50,13 @@ public void record(MetricConfig config, double value, long 
timeMs) {
 sample = advance(config, timeMs);
 update(sample, config, value, timeMs);
 sample.eventCount += 1;
+sample.lastEventMs = timeMs;
 }
 
 private Sample advance(MetricConfig config, long timeMs) {
-this.current = (this.current + 1) % config.samples();
+// need to keep one extra sample (see purgeObsoleteSamples() logic)

Review Comment:
   Addressed, but I slightly changed the suggestion; please check if it's 
readable. Technically, this extra space is not there to remember the last 
recording time; it's there to keep the entire extra sample with all of its collected data.
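
For readers following along without the full diff, a plausible reading of the 
advance() change being discussed (the replacement line itself is not quoted above, so 
this is an assumption, not the exact code):

```java
// Sketch: rotate through one extra slot so the oldest, still-overlapping sample is
// kept around until purgeObsoleteSamples() decides it no longer overlaps the window.
this.current = (this.current + 1) % (config.samples() + 1);
```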



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Fix rate metric spikes [kafka]

2024-05-22 Thread via GitHub


emitskevich-blp commented on code in PR #15889:
URL: https://github.com/apache/kafka/pull/15889#discussion_r1610656953


##
clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:
##
@@ -50,10 +50,13 @@ public void record(MetricConfig config, double value, long 
timeMs) {
 sample = advance(config, timeMs);
 update(sample, config, value, timeMs);
 sample.eventCount += 1;
+sample.lastEventMs = timeMs;
 }
 
 private Sample advance(MetricConfig config, long timeMs) {
-this.current = (this.current + 1) % config.samples();
+// need to keep one extra sample (see purgeObsoleteSamples() logic)

Review Comment:
   I slightly changed the suggestion; please check if it's readable. Technically, 
this extra space is not there to remember the last recording time; it's there to 
keep the entire extra sample with all of its collected data.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16639) AsyncKafkaConsumer#close does not send heartbeat to leave group

2024-05-22 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16639:
--
Priority: Critical  (was: Major)

> AsyncKafkaConsumer#close does not send heartbeat to leave group
> ---
>
> Key: KAFKA-16639
> URL: https://issues.apache.org/jira/browse/KAFKA-16639
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Chia-Ping Tsai
>Assignee: Philip Nee
>Priority: Critical
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> This bug can be reproduced by immediately closing a consumer that has just 
> been created.
> The root cause is that we skip the new heartbeat used to leave the group when 
> there is an in-flight heartbeat request 
> ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L212).]
> It seems to me the simple solution is that we create a heartbeat request when 
> we meet the above situation and then send it by pollOnClose 
> ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java#L62).]
>  
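
A minimal reproduction sketch of the scenario described above. Bootstrap 
servers, group id and topic are placeholders, and the group.protocol=consumer 
setting is assumed to select the new AsyncKafkaConsumer:

```
// Reproduction sketch for the scenario above; values are placeholders.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

public class CloseWithoutLeaveRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "repro-group");
        props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer"); // assumed: selects the new consumer
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("repro-topic"));
        // Closing right away: with a heartbeat still in flight, the leave-group heartbeat is skipped.
        consumer.close();
    }
}
```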



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Fix rate metric spikes [kafka]

2024-05-22 Thread via GitHub


emitskevich-blp commented on code in PR #15889:
URL: https://github.com/apache/kafka/pull/15889#discussion_r1610653813


##
clients/src/test/java/org/apache/kafka/common/metrics/stats/SampledStatTest.java:
##
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class SampledStatTest {
+
+private SampledStat stat;
+private Time time;
+
+@BeforeEach
+public void setup() {
+stat = new SampleCount(0);
+time = new MockTime();
+}
+
+@Test
+@DisplayName("Sample should be purged if doesn't overlap the window")
+public void testSampleIsPurgedIfDoesntOverlap() {
+MetricConfig config = new MetricConfig().timeWindow(1, 
SECONDS).samples(2);
+
+// Monitored window: 2s. Complete a sample and wait 2.5s after.
+completeSample(config);
+time.sleep(2500);
+
+double numSamples = stat.measure(config, time.milliseconds());
+assertEquals(0, numSamples);
+}
+
+@Test
+@DisplayName("Sample should be kept if overlaps the window")
+public void testSampleIsKeptIfOverlaps() {
+MetricConfig config = new MetricConfig().timeWindow(1, 
SECONDS).samples(2);
+
+// Monitored window: 2s. Complete a sample and wait 1.5s after.
+completeSample(config);
+time.sleep(1500);
+
+double numSamples = stat.measure(config, time.milliseconds());
+assertEquals(1, numSamples);
+}
+
+@Test
+@DisplayName("Sample should be kept if overlaps the window and is n+1")
+public void testSampleIsKeptIfOverlapsAndExtra() {
+MetricConfig config = new MetricConfig().timeWindow(1, 
SECONDS).samples(2);
+
+// Monitored window: 2s. Create 2 samples with gaps in between and
+// take a measurement at 2.2s from the start.
+completeSample(config);
+time.sleep(100);
+completeSample(config);
+time.sleep(100);
+stat.record(config, 1, time.milliseconds());
+
+double numSamples = stat.measure(config, time.milliseconds());
+assertEquals(3, numSamples);
+}
+
+// Creates a sample with events at the start and at the end. Positions 
clock at the end.
+private void completeSample(MetricConfig config) {
+stat.record(config, 1, time.milliseconds());
+time.sleep(config.timeWindowMs() - 1);
+stat.record(config, 1, time.milliseconds());
+time.sleep(1);
+}
+
+// measure() of this impl returns the number of samples
+static class SampleCount extends SampledStat {
+
+SampleCount(double initialValue) {

Review Comment:
   Applied



##
clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:
##
@@ -40,7 +40,7 @@ public abstract class SampledStat implements MeasurableStat {
 
 public SampledStat(double initialValue) {
 this.initialValue = initialValue;
-this.samples = new ArrayList<>(2);
+this.samples = new ArrayList<>(3);

Review Comment:
   Addressed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15045: (KIP-924 pt. 9) TaskAssignmentUtils implementation of optimizeRackAwareActiveTasks [kafka]

2024-05-22 Thread via GitHub


apourchet opened a new pull request, #16033:
URL: https://github.com/apache/kafka/pull/16033

   This PR implements the rack aware optimization of active tasks that can be 
used by the assignors themselves. It takes in the full output of the assignment 
and tries to reorganize it so as to minimize traffic and non-overlap costs.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]

2024-05-22 Thread via GitHub


jsancio commented on code in PR #15986:
URL: https://github.com/apache/kafka/pull/15986#discussion_r1610641974


##
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##
@@ -1366,6 +1368,26 @@ class KafkaConfigTest {
 assertEquals(expectedVoters, addresses)
   }
 
+  @Test
+  def testParseQuorumBootstrapServers(): Unit = {

Review Comment:
   Okay. It does look like there are bugs in the implementation of 
`Utils.getHost` and `Utils.getPort`. I don't really want to fix them in this 
PR. I created this issue: https://issues.apache.org/jira/browse/KAFKA-16824



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16824) Utils.getHost and Utils.getPort do not catch a lot of invalid host and ports

2024-05-22 Thread Jira
José Armando García Sancio created KAFKA-16824:
--

 Summary: Utils.getHost and Utils.getPort do not catch a lot of 
invalid host and ports
 Key: KAFKA-16824
 URL: https://issues.apache.org/jira/browse/KAFKA-16824
 Project: Kafka
  Issue Type: Bug
Reporter: José Armando García Sancio


For example, it is not able to detect at least these malformed hosts and ports (see the sketch after the list):
 # ho(st:9092
 # host:-92
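
A quick sketch of the gap: per the ticket, neither input is rejected by the two 
utilities (the exact values returned depend on the current parsing regex).

```
// Sketch: feed the malformed inputs from the ticket to the two utilities.
import org.apache.kafka.common.utils.Utils;

public class HostPortCheck {
    public static void main(String[] args) {
        for (String address : new String[]{"ho(st:9092", "host:-92"}) {
            // Neither address is flagged as invalid; a host and port are still extracted.
            System.out.println(address + " -> host=" + Utils.getHost(address)
                    + ", port=" + Utils.getPort(address));
        }
    }
}
```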



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-16737: Enable unit tests for new consumer & cleanup TODOs [kafka]

2024-05-22 Thread via GitHub


lianetm opened a new pull request, #16032:
URL: https://github.com/apache/kafka/pull/16032

   KafkaConsumerTest contained lots of unit tests that were not enabled for the 
new consumer, with TODOs for verifying whether they could be enabled. This PR 
enables all the unit tests that apply to and pass for the new consumer (~16 unit 
tests enabled) and cleans up the TODOs.
   
   The tests that remain enabled only for the legacy consumer cannot run for the 
new consumer, due to:
   
   1. bugs in the new consumer. Only 5 tests fall in this case; for those I left 
TODOs referencing the Jira I filed, so that the tests can be enabled as soon as 
the bugs are fixed (these are the only TODOs intentionally left).
   2. the way the test is written, or what it is testing, being specific to the 
legacy consumer.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-05-22 Thread via GitHub


gharris1727 commented on code in PR #13277:
URL: https://github.com/apache/kafka/pull/13277#discussion_r1610617003


##
clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java:
##
@@ -219,6 +219,19 @@ public class CommonClientConfigs {
 public static final String DEFAULT_API_TIMEOUT_MS_DOC = "Specifies the 
timeout (in milliseconds) for client APIs. " +
 "This configuration is used as the default timeout for all client 
operations that do not specify a timeout parameter.";
 
+public static final String METADATA_RECOVERY_STRATEGY_CONFIG = 
"metadata.recovery.strategy";
+public static final String METADATA_RECOVERY_STRATEGY_DOC = "Controls how 
the client recovers when none of the brokers known to it is available. " +
+"If set to none, the client fails. If set to 
rebootstrap, " +
+"the client repeats the bootstrap process using 
bootstrap.servers. " +
+"Rebootstrapping is useful when a client communicates with brokers 
so infrequently " +
+"that the set of brokers may change entirely before the client 
refreshes metadata. " +
+"Opportunities to rebootstrapping depend on connection 
establishing and reconnect timeouts and the broker count. " +
+"The timeouts may prevent identifying brokers as unavailable 
simultaneously, which is necessary to trigger rebootstrapping. " +

Review Comment:
   This section is very confusing for me, and I think this should be reworded.
   
   Maybe it can flow like this: "Metadata recovery is triggered when all 
last-known brokers appear unavailable simultaneously. Brokers appear 
unavailable when disconnected and no current retry attempt is in-progress."
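
   For context, a minimal sketch of opting a client into the behavior being 
documented, using the config key and values shown in the diff above; broker 
addresses are placeholders.
   
   ```
   // Sketch based on the config key/values from the diff; addresses are placeholders.
   import java.util.Properties;

   public class RebootstrapConfigExample {
       public static void main(String[] args) {
           Properties props = new Properties();
           props.put("bootstrap.servers", "broker-1:9092,broker-2:9092");
           // "none" (default): the client fails when all known brokers appear unavailable.
           // "rebootstrap": the client repeats the bootstrap process using bootstrap.servers.
           props.put("metadata.recovery.strategy", "rebootstrap");
       }
   }
   ```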



##
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##


Review Comment:
   Stuff in this package is not included in the public API: 
https://kafka.apache.org/37/javadoc/index.html only the stuff in 
admin/producer/consumer (and not in sub-packages) is truly public.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16823) Extract LegacyConsumer-specific unit tests from generic KafkaConsumerTest

2024-05-22 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16823:
--

 Summary: Extract LegacyConsumer-specific unit tests from generic 
KafkaConsumerTest 
 Key: KAFKA-16823
 URL: https://issues.apache.org/jira/browse/KAFKA-16823
 Project: Kafka
  Issue Type: Task
  Components: clients, consumer
Reporter: Lianet Magrans


Currently the KafkaConsumerTest file contains unit tests that apply to both 
consumer implementations, but also tests that apply to the legacy consumer 
only. We should consider splitting the tests that apply to the legacy only into 
their own LegacyConsumerTest file (aligning with the existing 
AsyncKafkaConsumerTest). End result would be: 

KafkaConsumerTest -> unit tests that apply to both consumers. 

LegacyKafkaConsumerTest -> unit tests that apply only to the 
LegacyKafkaConsumer, either because of the logic they test, or the way they are 
written (file to be created with this task)

AsyncKafkaConsumerTest -> unit tests that apply only to the AsyncKafkaConsumer 
(this file already exist)

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-10551: Add topic id support to produce request and response [kafka]

2024-05-22 Thread via GitHub


jolshan commented on code in PR #15968:
URL: https://github.com/apache/kafka/pull/15968#discussion_r1610605263


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1361,10 +1366,10 @@ class ReplicaManager(val config: KafkaConfig,
*/
   private def appendToLocalLog(internalTopicsAllowed: Boolean,
origin: AppendOrigin,
-   entriesPerPartition: Map[TopicPartition, 
MemoryRecords],
+   entriesPerPartition: Map[TopicIdPartition, 
MemoryRecords],

Review Comment:
   Ok -- once we start using these across the log layer it makes sense. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Fix rate metric spikes [kafka]

2024-05-22 Thread via GitHub


junrao commented on code in PR #15889:
URL: https://github.com/apache/kafka/pull/15889#discussion_r1610602287


##
clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:
##
@@ -106,44 +109,51 @@ public String toString() {
 
 public abstract double combine(List samples, MetricConfig config, 
long now);
 
-/* Timeout any windows that have expired in the absence of any events */
+// purge any samples that lack observed events within the monitored window
 protected void purgeObsoleteSamples(MetricConfig config, long now) {
 long expireAge = config.samples() * config.timeWindowMs();
 for (Sample sample : samples) {
-if (now - sample.lastWindowMs >= expireAge)
+// samples overlapping the monitored window are kept,
+// even if they started before it
+if (now - sample.lastEventMs >= expireAge) {
 sample.reset(now);
+}
 }
 }
 
 protected static class Sample {
 public double initialValue;
 public long eventCount;
-public long lastWindowMs;
+public long startTimeMs;
+public long lastEventMs;
 public double value;
 
 public Sample(double initialValue, long now) {
 this.initialValue = initialValue;
 this.eventCount = 0;
-this.lastWindowMs = now;
+this.startTimeMs = now;
+this.lastEventMs = now;
 this.value = initialValue;
 }
 
 public void reset(long now) {
 this.eventCount = 0;
-this.lastWindowMs = now;
+this.startTimeMs = now;
+this.lastEventMs = now;
 this.value = initialValue;
 }
 
 public boolean isComplete(long timeMs, MetricConfig config) {
-return timeMs - lastWindowMs >= config.timeWindowMs() || 
eventCount >= config.eventWindow();
+return timeMs - startTimeMs >= config.timeWindowMs() || eventCount 
>= config.eventWindow();

Review Comment:
   > in this case, totalElapsedTimeMs will be equal to (config.samples() - 1) * 
timeWindowMs, and then we will get smaller measure due to the larger denominator
   
   @chia7712: Could you explain your point a bit more? In the current design, 
we always ensure at least `(config.samples() - 1) * timeWindowMs` in 
`totalElapsedTimeMs`. This is to avoid the potential extreme large measured 
value when the accumulated sample window is small (e.g. when the metric is 
first recorded).
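
   A standalone arithmetic sketch of that point; the sample count, window size 
and event count are assumed numbers, not values taken from this PR.
   
   ```
   // Arithmetic sketch only, with assumed numbers: why the elapsed-time floor matters.
   public class RateFloorSketch {
       public static void main(String[] args) {
           long timeWindowMs = 30_000;    // assumed window size
           int samples = 2;               // assumed sample count
           long observedElapsedMs = 50;   // metric was first recorded 50 ms ago
           double recorded = 1_000;       // events recorded so far

           double naive = recorded / (observedElapsedMs / 1000.0);
           long flooredElapsedMs = Math.max(observedElapsedMs, (samples - 1) * timeWindowMs);
           double withFloor = recorded / (flooredElapsedMs / 1000.0);

           // naive ~ 20000 events/s (the spike), withFloor ~ 33 events/s
           System.out.printf("naive=%.1f/s withFloor=%.1f/s%n", naive, withFloor);
       }
   }
   ```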



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15045: (KIP-924 pt. 7) Simplify requirements for rack aware graphs [kafka]

2024-05-22 Thread via GitHub


ableegoldman merged PR #16004:
URL: https://github.com/apache/kafka/pull/16004


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]

2024-05-22 Thread via GitHub


ableegoldman merged PR #15972:
URL: https://github.com/apache/kafka/pull/15972


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16819) CoordinatorRequestManager seems to return 0ms during the coordinator discovery

2024-05-22 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16819:
--
Labels: consumer-threading-refactor  (was: )

> CoordinatorRequestManager seems to return 0ms during the coordinator discovery
> --
>
> Key: KAFKA-16819
> URL: https://issues.apache.org/jira/browse/KAFKA-16819
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor
>
> In KAFKA-15250 we discovered the ConsumerNetworkThread is looping without 
> much backoff.  The in-flight check PR fixed a lot of it; however, during the 
> coordinator discovery phase, CoordinatorRequestManager would keep on 
> returning 0 before the coordinator node was found.
>  
> The impact is minor, but we should expect the coordinator manager to back off 
> until the exponential backoff expires (so it should return around 100ms).
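
For illustration only, the kind of back-off progression the ticket expects, 
sketched with Kafka's ExponentialBackoff utility; apart from the ~100 ms initial 
interval mentioned above, the parameters are assumptions.

```
// Illustrative only; constructor arguments other than the 100 ms start are assumptions.
import org.apache.kafka.common.utils.ExponentialBackoff;

public class BackoffSketch {
    public static void main(String[] args) {
        ExponentialBackoff backoff = new ExponentialBackoff(100, 2, 1_000, 0.2);
        for (long attempt = 0; attempt < 5; attempt++) {
            System.out.println("attempt " + attempt + " -> wait ~" + backoff.backoff(attempt) + " ms");
        }
    }
}
```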



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16819) CoordinatorRequestManager seems to return 0ms during the coordinator discovery

2024-05-22 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16819:
--
Component/s: clients

> CoordinatorRequestManager seems to return 0ms during the coordinator discovery
> --
>
> Key: KAFKA-16819
> URL: https://issues.apache.org/jira/browse/KAFKA-16819
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Minor
>
> In KAFKA-15250 we discovered the ConsumerNetworkThread is looping without 
> much backoff.  The in-flight check PR fixed a lot of it; however, during the 
> coordinator discovery phase, CoordinatorRequestManager would keep on 
> returning 0 before the coordinator node was found.
>  
> The impact is minor, but we should expect the coordinator manager to back off 
> until the exponential backoff expires (so it should return around 100ms).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-22 Thread via GitHub


junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610572095


##
core/src/main/scala/kafka/server/DelayedFetch.scala:
##
@@ -91,19 +91,23 @@ class DelayedFetch(
 // Go directly to the check for Case G if the message offsets are 
the same. If the log segment
 // has just rolled, then the high watermark offset will remain the 
same but be on the old segment,
 // which would incorrectly be seen as an instance of Case F.
-if (endOffset.messageOffset != fetchOffset.messageOffset) {
-  if (endOffset.onOlderSegment(fetchOffset)) {
-// Case F, this can happen when the new fetch operation is on 
a truncated leader
-debug(s"Satisfying fetch $this since it is fetching later 
segments of partition $topicIdPartition.")
-return forceComplete()
+if (fetchOffset.messageOffset > endOffset.messageOffset) {
+  // Case F, this can happen when the new fetch operation is on a 
truncated leader
+  debug(s"Satisfying fetch $this since it is fetching later 
segments of partition $topicIdPartition.")
+  return forceComplete()
+} else if (fetchOffset.messageOffset < endOffset.messageOffset) {
+  if (fetchOffset.messageOffsetOnly() || 
endOffset.messageOffsetOnly()) {

Review Comment:
   Could we fold this into the condition above?



##
core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala:
##
@@ -164,18 +169,71 @@ class DelayedFetchTest {
 assertTrue(delayedFetch.tryComplete())
 assertTrue(delayedFetch.isCompleted)
 assertTrue(fetchResultOpt.isDefined)
+
+val fetchResult = fetchResultOpt.get
+assertEquals(Errors.NONE, fetchResult.error)
+  }
+
+  @ParameterizedTest(name = "testDelayedFetchWithMessageOnlyHighWatermark 
endOffset={0}")
+  @ValueSource(longs = Array(0, 500))
+  def testDelayedFetchWithMessageOnlyHighWatermark(endOffset: Long): Unit = {
+val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+val fetchOffset = 450L
+val logStartOffset = 5L
+val currentLeaderEpoch = Optional.of[Integer](10)
+val replicaId = 1
+
+val fetchStatus = FetchPartitionStatus(
+  startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+  fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, 
fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)
+
+var fetchResultOpt: Option[FetchPartitionData] = None
+def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit 
= {
+  fetchResultOpt = Some(responses.head._2)
+}
+
+val delayedFetch = new DelayedFetch(
+  params = fetchParams,
+  fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
+  replicaManager = replicaManager,
+  quota = replicaQuota,
+  responseCallback = callback
+)
+
+val partition: Partition = mock(classOf[Partition])
+
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition)
+// Note that the high-watermark does not contain the complete metadata
+val endOffsetMetadata = new LogOffsetMetadata(endOffset, -1L, -1)
+when(partition.fetchOffsetSnapshot(
+  currentLeaderEpoch,
+  fetchOnlyFromLeader = true))
+  .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, 
endOffsetMetadata, endOffsetMetadata))
+when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false)
+expectReadFromReplica(fetchParams, topicIdPartition, 
fetchStatus.fetchInfo, Errors.NONE)
+
+// 1. When `endOffset` is 0, it refers to the truncation case
+// 2. When `endOffset` is 500, it refers to the normal case
+val expected = endOffset == 0

Review Comment:
   Hmm, in both cases, we are not forcing the completion of the delayed fetch, 
right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15713: KRaft support in AclCommandTest [kafka]

2024-05-22 Thread via GitHub


chia7712 commented on code in PR #15830:
URL: https://github.com/apache/kafka/pull/15830#discussion_r1610529172


##
core/src/test/scala/unit/kafka/admin/AclCommandTest.scala:
##
@@ -122,19 +128,27 @@ class AclCommandTest extends QuorumTestHarness with 
Logging {
 super.tearDown()
   }
 
+  override protected def kraftControllerConfigs(): Seq[Properties] = {
+val controllerConfig = new Properties
+controllerConfig.put(KafkaConfig.AuthorizerClassNameProp, 
classOf[StandardAuthorizer].getName)
+controllerConfig.put(StandardAuthorizer.SUPER_USERS_CONFIG, 
"User:ANONYMOUS")
+Seq(controllerConfig)
+  }
+
   @Test
   def testAclCliWithAuthorizer(): Unit = {
 testAclCli(zkArgs)
   }
 
-  @Test
-  def testAclCliWithAdminAPI(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk"))

Review Comment:
   Maybe we can create `AclCommand.AdminClientService` to test its method 
`listAcls` with retry instead of grabbing the output from `main` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16822) Abstract consumer group in coordinator to share functionality with share group

2024-05-22 Thread Apoorv Mittal (Jira)
Apoorv Mittal created KAFKA-16822:
-

 Summary: Abstract consumer group in coordinator to share 
functionality with share group
 Key: KAFKA-16822
 URL: https://issues.apache.org/jira/browse/KAFKA-16822
 Project: Kafka
  Issue Type: Sub-task
Reporter: Apoorv Mittal
Assignee: Apoorv Mittal






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16766) New consumer offsetsForTimes timeout exception does not have the proper message

2024-05-22 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16766:
---
Description: 
If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer 
will throw a org.apache.kafka.common.errors.TimeoutException as expected, but 
with the following as message: "java.util.concurrent.TimeoutException".

We should provide a clearer message, and I would even say we keep the same 
message that the LegacyConsumer shows in this case, ex: "Failed to get offsets 
by times in 6ms".

To fix this we should consider catching the timeout exception in the consumer 
when offsetsForTimes result times out 
([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]),
 and propagate it with the message specific to offsetsForTimes.

Same situation exists for beginningOffsets and endOffsets. All 3 funcs show the 
same timeout message in the LegacyConsumer (defined 
[here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java#L182]),
 but do not have a clear message in the Async, so we should fix them all 3.

With the fix, we should write tests for each func, like the ones defined for 
the Legacy Consumer 
([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L3244-L3276]).
 Note that we would need different tests, added to AsyncKafkaConsumerTest, 
given that the AsyncKafkaConsumer issues a FindCoordinator request in this case 
(on manager poll), but the LegacyKafkaConsumer does not, so it does not account 
for that when matching requests/responses in the current tests.

  was:
If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer 
will throw a org.apache.kafka.common.errors.TimeoutException as expected, but 
with the following as message: "java.util.concurrent.TimeoutException". 

We should provide a clearer message, and I would even say we keep the same 
message that the LegacyConsumer shows in this case, ex: "Failed to get offsets 
by times in 6ms".

To fix this we should consider catching the timeout exception in the consumer 
when offsetsForTimes result times out 
([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]),
 and propagate it with the message specific to offsetsForTimes.

Same situation exists for beginningOffsets and endOffsets. All 3 funcs show the 
same timeout message in the LegacyConsumer (defined 
[here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java#L182]),
 but do not have a clear message in the Async, so we should fix them all 3.

With the fix, we should write tests for each func, like the ones defined for 
the Legacy Consumer 
([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L3244-L3276]).
 Note that we would need different tests, added to AsyncKafkaConsumerTest, 
given that the async consumer issues a FindCoordinator request in this case, 
but the AsyncConsumer does, so it does not account for that when matching 
requests/responses in the current tests.


> New consumer offsetsForTimes timeout exception does not have the proper 
> message
> ---
>
> Key: KAFKA-16766
> URL: https://issues.apache.org/jira/browse/KAFKA-16766
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer 
> will throw a org.apache.kafka.common.errors.TimeoutException as expected, but 
> with the following as message: "java.util.concurrent.TimeoutException".
> We should provide a clearer message, and I would even say we keep the same 
> message that the LegacyConsumer shows in this case, ex: "Failed to get 
> offsets by times in 6ms".
> To fix this we should consider catching the timeout exception in the consumer 
> when offsetsForTimes result times out 
> ([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]),
>  and propagate it with the message 

Re: [PR] MINOR: Fix rate metric spikes [kafka]

2024-05-22 Thread via GitHub


chia7712 commented on code in PR #15889:
URL: https://github.com/apache/kafka/pull/15889#discussion_r1610336329


##
clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:
##
@@ -40,7 +40,7 @@ public abstract class SampledStat implements MeasurableStat {
 
 public SampledStat(double initialValue) {
 this.initialValue = initialValue;
-this.samples = new ArrayList<>(2);
+this.samples = new ArrayList<>(3);

Review Comment:
   It would be nice to add comments for this magic number :)



##
clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:
##
@@ -106,44 +109,51 @@ public String toString() {
 
 public abstract double combine(List samples, MetricConfig config, 
long now);
 
-/* Timeout any windows that have expired in the absence of any events */
+// purge any samples that lack observed events within the monitored window
 protected void purgeObsoleteSamples(MetricConfig config, long now) {
 long expireAge = config.samples() * config.timeWindowMs();
 for (Sample sample : samples) {
-if (now - sample.lastWindowMs >= expireAge)
+// samples overlapping the monitored window are kept,
+// even if they started before it
+if (now - sample.lastEventMs >= expireAge) {
 sample.reset(now);
+}
 }
 }
 
 protected static class Sample {
 public double initialValue;
 public long eventCount;
-public long lastWindowMs;
+public long startTimeMs;
+public long lastEventMs;
 public double value;
 
 public Sample(double initialValue, long now) {
 this.initialValue = initialValue;
 this.eventCount = 0;
-this.lastWindowMs = now;
+this.startTimeMs = now;
+this.lastEventMs = now;
 this.value = initialValue;
 }
 
 public void reset(long now) {
 this.eventCount = 0;
-this.lastWindowMs = now;
+this.startTimeMs = now;
+this.lastEventMs = now;
 this.value = initialValue;
 }
 
 public boolean isComplete(long timeMs, MetricConfig config) {
-return timeMs - lastWindowMs >= config.timeWindowMs() || 
eventCount >= config.eventWindow();
+return timeMs - startTimeMs >= config.timeWindowMs() || eventCount 
>= config.eventWindow();

Review Comment:
   This is unrelated to this PR, but it seems `eventCount` could be another 
issue if we set a non-maximum value. For example, all samples may fall within a 
single `timeWindowMs` due to a burst of records. In this case, 
`totalElapsedTimeMs` will be equal to `(config.samples() - 1) * timeWindowMs`, 
and then we will get a smaller measure due to the larger denominator.
   
   Maybe we can remove `eventWindow` as it is unused in production. This can be 
another PR if it is valid



##
clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:
##
@@ -50,10 +50,13 @@ public void record(MetricConfig config, double value, long 
timeMs) {
 sample = advance(config, timeMs);
 update(sample, config, value, timeMs);
 sample.eventCount += 1;
+sample.lastEventMs = timeMs;
 }
 
 private Sample advance(MetricConfig config, long timeMs) {
-this.current = (this.current + 1) % config.samples();
+// need to keep one extra sample (see purgeObsoleteSamples() logic)

Review Comment:
   How about saying "we have one extra sample to remember the last recording 
time in order to keep the overlapping sample (see purgeObsoleteSamples() logic)"



##
clients/src/test/java/org/apache/kafka/common/metrics/stats/SampledStatTest.java:
##
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+

Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-22 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1610507371


##
server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java:
##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Collections;
+import java.util.Map;
+
+public enum TestFeatureVersion implements FeatureVersion {
+
+TEST_0(0, MetadataVersion.IBP_3_3_IV0, Collections.emptyMap()),

Review Comment:
   I tried this, but the logic for defaults breaks when we don't have a 0 
version. 
   ```
   val allFeaturesAndLevels: List[FeatureVersion] = allFeatures.map { 
feature =>
 val level: java.lang.Short = 
specifiedFeatures.getOrElse(feature.featureName, 
feature.defaultValue(metadataVersionForDefault))
 feature.fromFeatureLevel(level)
   }
   ```
   Here, I suppose we could do some work to filter out 0s and assume, in the 
rest of the code, that a missing value in the map = 0. If that is preferred I 
can do that instead, but I also don't think the 0 version necessarily hurts 
anything.
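
   A rough sketch, in Java and with hypothetical names (the code in this PR is 
Scala), of the "filter out 0s" alternative described above:
   
   ```
   // Hypothetical helper: drop level-0 features so that "absent from the map" means disabled.
   import java.util.Map;
   import java.util.stream.Collectors;

   public class FeatureDefaultsSketch {
       static Map<String, Short> withoutDisabled(Map<String, Short> finalizedFeatures) {
           return finalizedFeatures.entrySet().stream()
                   .filter(e -> e.getValue() != 0)
                   .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
       }
   }
   ```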



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16821) Create a new interface to store member metadata

2024-05-22 Thread Ritika Reddy (Jira)
Ritika Reddy created KAFKA-16821:


 Summary: Create a new interface to store member metadata
 Key: KAFKA-16821
 URL: https://issues.apache.org/jira/browse/KAFKA-16821
 Project: Kafka
  Issue Type: Sub-task
Reporter: Ritika Reddy
Assignee: Ritika Reddy
 Attachments: Screenshot 2024-05-14 at 11.03.10 AM.png

!Screenshot 2024-05-14 at 11.03.10 AM.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]

2024-05-22 Thread via GitHub


jeffkbkim commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1610481770


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -12401,6 +12436,363 @@ public void 
testClassicGroupSyncToConsumerGroupRebalanceInProgress() throws Exce
 .withProtocolName("range")
 .build())
 );
+context.assertJoinTimeout(groupId, memberId, 1);
+}
+
+@Test
+public void testClassicGroupHeartbeatToConsumerGroupMaintainsSession() 
throws Exception {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+int sessionTimeout = 5000;
+
+List protocols = 
Collections.singletonList(
+new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+.setName("range")
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
+new ConsumerPartitionAssignor.Subscription(
+Arrays.asList("foo"),
+null,
+Collections.emptyList()
+)
+)))
+);
+
+// Consumer group with a member using the classic protocol.
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(new 
MockPartitionAssignor("range")))
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setClassicMemberMetadata(
+new 
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+.setSessionTimeoutMs(sessionTimeout)
+.setSupportedProtocols(protocols)
+)
+.setMemberEpoch(10)
+.build()))
+.build();
+
+// Heartbeat to schedule the session timeout.
+HeartbeatRequestData request = new HeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setGenerationId(10);
+context.sendClassicGroupHeartbeat(request);
+context.assertSessionTimeout(groupId, memberId, sessionTimeout);
+
+// Advance clock by 1/2 of session timeout.
+
GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout
 / 2));
+
+HeartbeatResponseData heartbeatResponse = 
context.sendClassicGroupHeartbeat(request).response();
+assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode());
+context.assertSessionTimeout(groupId, memberId, sessionTimeout);
+
+// Advance clock by 1/2 of session timeout.
+
GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout
 / 2));
+
+heartbeatResponse = 
context.sendClassicGroupHeartbeat(request).response();
+assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode());
+context.assertSessionTimeout(groupId, memberId, sessionTimeout);
+}
+
+@Test
+public void testClassicGroupHeartbeatToConsumerGroupRebalanceInProgress() 
throws Exception {
+String groupId = "group-id";
+String memberId1 = Uuid.randomUuid().toString();
+String memberId2 = Uuid.randomUuid().toString();
+String memberId3 = Uuid.randomUuid().toString();
+Uuid fooTopicId = Uuid.randomUuid();
+Uuid barTopicId = Uuid.randomUuid();
+int sessionTimeout = 5000;
+int rebalanceTimeout = 1;
+
+List protocols = 
Collections.singletonList(
+new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+.setName("range")
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
+new ConsumerPartitionAssignor.Subscription(
+Arrays.asList("foo"),
+null,
+Collections.emptyList()
+)
+)))
+);
+
+// Member 1 has a member epoch smaller than the group epoch.
+ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
+.setRebalanceTimeoutMs(rebalanceTimeout)
+.setClassicMemberMetadata(
+new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+.setSessionTimeoutMs(sessionTimeout)
+.setSupportedProtocols(protocols)
+)
+.setMemberEpoch(9)
+.build();
+
+// Member 2 has unrevoked partition.
+ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder(memberId2)
+.setState(MemberState.UNREVOKED_PARTITIONS)
+.setRebalanceTimeoutMs(rebalanceTimeout)
+

Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-22 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1610472137


##
server-common/src/main/java/org/apache/kafka/server/common/Features.java:
##
@@ -64,14 +63,16 @@ public enum Features {
 
 PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature ->
 feature.usedInProduction).collect(Collectors.toList());
+PRODUCTION_FEATURE_NAMES = PRODUCTION_FEATURES.stream().map(feature ->
+feature.name).collect(Collectors.toList());
 }
 
 public String featureName() {
 return name;
 }
 
 public FeatureVersion[] features() {

Review Comment:
   sure



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-22 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1610471028


##
server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java:
##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.common;
+
+import java.util.Map;
+
+public interface FeatureVersion {
+
+/**
+ * The level of the feature. 0 means the feature is disabled.
+ */
+short featureLevel();
+
+/**
+ * The name of the feature.
+ */
+String featureName();
+
+/**
+ * The minimum metadata version which sets this feature version as 
default. When bootstrapping using only
+ * a metadata version, a reasonable default for all other features is 
chosen based on this value.
+ * This should be defined as the next metadata version to be released when 
the feature version becomes production ready.
+ * (Ie, if the current production MV is 17 when a feature version is 
released, its mapping should be to MV 18)

Review Comment:
   How do we know the release version when we create the feature? The release 
version can change, so should we change it as each new MV is added? I suppose 
you mean the latest MV when the feature is released.
   
   This is also what I did, but adding 1 to every MV, for the reasons I explained 
[here](https://github.com/apache/kafka/pull/15685#discussion_r1610248735)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-22 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1610471028


##
server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java:
##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.common;
+
+import java.util.Map;
+
+public interface FeatureVersion {
+
+/**
+ * The level of the feature. 0 means the feature is disabled.
+ */
+short featureLevel();
+
+/**
+ * The name of the feature.
+ */
+String featureName();
+
+/**
+ * The minimum metadata version which sets this feature version as 
default. When bootstrapping using only
+ * a metadata version, a reasonable default for all other features is 
chosen based on this value.
+ * This should be defined as the next metadata version to be released when 
the feature version becomes production ready.
+ * (Ie, if the current production MV is 17 when a feature version is 
released, its mapping should be to MV 18)

Review Comment:
   How do we know the release version when we create the feature? The release 
version can change, so should we change it as each new MV is added?
   
   This is also what I did, but adding 1 to every MV, for the reasons I explained 
[here](https://github.com/apache/kafka/pull/15685#discussion_r1610248735)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]

2024-05-22 Thread via GitHub


jeffkbkim commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1610471371


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -12401,6 +12436,363 @@ public void 
testClassicGroupSyncToConsumerGroupRebalanceInProgress() throws Exce
 .withProtocolName("range")
 .build())
 );
+context.assertJoinTimeout(groupId, memberId, 1);
+}
+
+@Test
+public void testClassicGroupHeartbeatToConsumerGroupMaintainsSession() 
throws Exception {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+int sessionTimeout = 5000;
+
+List protocols = 
Collections.singletonList(
+new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+.setName("range")
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
+new ConsumerPartitionAssignor.Subscription(
+Arrays.asList("foo"),
+null,
+Collections.emptyList()
+)
+)))
+);
+
+// Consumer group with a member using the classic protocol.
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(new 
MockPartitionAssignor("range")))
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setClassicMemberMetadata(
+new 
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+.setSessionTimeoutMs(sessionTimeout)
+.setSupportedProtocols(protocols)
+)
+.setMemberEpoch(10)
+.build()))
+.build();
+
+// Heartbeat to schedule the session timeout.
+HeartbeatRequestData request = new HeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setGenerationId(10);
+context.sendClassicGroupHeartbeat(request);
+context.assertSessionTimeout(groupId, memberId, sessionTimeout);
+
+// Advance clock by 1/2 of session timeout.
+
GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout
 / 2));
+
+HeartbeatResponseData heartbeatResponse = 
context.sendClassicGroupHeartbeat(request).response();
+assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode());
+context.assertSessionTimeout(groupId, memberId, sessionTimeout);
+
+// Advance clock by 1/2 of session timeout.
+
GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout
 / 2));
+
+heartbeatResponse = 
context.sendClassicGroupHeartbeat(request).response();
+assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode());
+context.assertSessionTimeout(groupId, memberId, sessionTimeout);
+}
+
+@Test
+public void testClassicGroupHeartbeatToConsumerGroupRebalanceInProgress() 
throws Exception {
+String groupId = "group-id";
+String memberId1 = Uuid.randomUuid().toString();
+String memberId2 = Uuid.randomUuid().toString();
+String memberId3 = Uuid.randomUuid().toString();
+Uuid fooTopicId = Uuid.randomUuid();
+Uuid barTopicId = Uuid.randomUuid();
+int sessionTimeout = 5000;
+int rebalanceTimeout = 1;
+
+List protocols = 
Collections.singletonList(
+new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+.setName("range")
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
+new ConsumerPartitionAssignor.Subscription(
+Arrays.asList("foo"),
+null,
+Collections.emptyList()
+)
+)))
+);
+
+// Member 1 has a member epoch smaller than the group epoch.
+ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
+.setRebalanceTimeoutMs(rebalanceTimeout)
+.setClassicMemberMetadata(
+new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+.setSessionTimeoutMs(sessionTimeout)
+.setSupportedProtocols(protocols)
+)
+.setMemberEpoch(9)
+.build();
+
+// Member 2 has unrevoked partition.
+ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder(memberId2)
+.setState(MemberState.UNREVOKED_PARTITIONS)
+.setRebalanceTimeoutMs(rebalanceTimeout)
+

[jira] [Updated] (KAFKA-16820) Kafka Broker fails to connect to Kraft Controller with no DNS matching

2024-05-22 Thread Arushi Helms (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arushi Helms updated KAFKA-16820:
-
Description: 
 

We are migrating our Kafka cluster from ZooKeeper to KRaft mode. We are running 
individual brokers and controllers with TLS enabled, and IPs are used for 
communication.
The TLS-enabled setup works fine among the brokers, and the certificate looks 
something like:
{noformat}
Common Name: *.kafka.service.consul
Subject Alternative Names: *.kafka.service.consul, IP 
Address:10.87.171.84{noformat}
Note:
 * The DNS name for the node does not match the CN, but since we are using IPs 
for communication, we have provided IPs as SAN.
 * Same with the controllers: IPs are given as SAN in the certificate.
 * The issue is not related to the migration, so only the configuration relevant 
to the TLS piece is shared.



In the current setup I am running 3 brokers and 3 controllers. 

Relevant controller configurations from one of the controllers:
{noformat}
KAFKA_CFG_PROCESS_ROLES=controller 
KAFKA_KRAFT_CLUSTER_ID=5kztjhJ4SxSu-kdiEYDUow
KAFKA_CFG_NODE_ID=6 
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=4@10.87.170.83:9097,5@10.87.170.9:9097,6@10.87.170.6:9097
 
KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER 
KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INSIDE_SSL
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:SSL,INSIDE_SSL:SSL 
KAFKA_CFG_LISTENERS=CONTROLLER://10.87.170.6:9097{noformat}
Relevant broker configuration from one of the brokers:
{noformat}
KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INSIDE_SSL
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=4@10.87.170.83:9097,5@10.87.170.9:9097,6@10.87.170.6:9097
 
KAFKA_CFG_PROCESS_ROLES=broker 
KAFKA_CFG_NODE_ID=3 
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE_SSL:SSL,OUTSIDE_SSL:SSL,CONTROLLER:SSL
 
KAFKA_CFG_LISTENERS=INSIDE_SSL://10.87.170.78:9093,OUTSIDE_SSL://10.87.170.78:9096
 
KAFKA_CFG_ADVERTISED_LISTENERS=INSIDE_SSL://10.87.170.78:9093,OUTSIDE_SSL://10.87.170.78:9096{noformat}
 

ISSUE 1: 
With this setup Kafka broker is failing to connect to the controller, see the 
following error:
{noformat}
2024-05-22 17:53:46,413] ERROR 
[broker-2-to-controller-heartbeat-channel-manager]: Request 
BrokerRegistrationRequestData(brokerId=2, clusterId='5kztjhJ4SxSu-kdiEYDUow', 
incarnationId=7741fgH6T4SQqGsho8E6mw, listeners=[Listener(name='INSIDE_SSL', 
host='10.87.170.81', port=9093, securityProtocol=1), Listener(name='INSIDE', 
host='10.87.170.81', port=9094, securityProtocol=0), Listener(name='OUTSIDE', 
host='10.87.170.81', port=9092, securityProtocol=0), 
Listener(name='OUTSIDE_SSL', host='10.87.170.81', port=9096, 
securityProtocol=1)], features=[Feature(name='metadata.version', 
minSupportedVersion=1, maxSupportedVersion=19)], rack=null, 
isMigratingZkBroker=false, logDirs=[TJssfKDD-iBFYfIYCKOcew], 
previousBrokerEpoch=-1) failed due to authentication error with controller 
(kafka.server.NodeToControllerRequestThread)org.apache.kafka.common.errors.SslAuthenticationException:
 SSL handshake failedCaused by: javax.net.ssl.SSLHandshakeException: No subject 
alternative DNS name matching 
cp-internal-onecloud-kfkc1.node.cp-internal-onecloud.consul found.  at 
java.base/sun.security.ssl.Alert.createSSLException(Alert.java:131)  at 
java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:378) at 
java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:321) at 
java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:316) at 
java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1351)
  at 
java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.onConsumeCertificate(CertificateMessage.java:1226)
  at 
java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.consume(CertificateMessage.java:1169)
   at java.base/sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:396)
   at 
java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:480) 
 at 
java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1277)
  at 
java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1264)
  at 
java.base/java.security.AccessController.doPrivileged(AccessController.java:712)
 at 
java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask.run(SSLEngineImpl.java:1209)
  at 
org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:435)
  at 
org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:523)
at 
org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:373)
at 
org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:293)
  at 
org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:178) 

Re: [PR] Update RemoteLogManager configuration in broker server - KAFKA-16790 [kafka]

2024-05-22 Thread via GitHub


muralibasani commented on PR #16005:
URL: https://github.com/apache/kafka/pull/16005#issuecomment-2125492819

   @nikramakrishnan 
   I think calling applyDelta on replicaManager wouldn't invoke configure on 
rlmm. Probably we should start mocking from the BrokerServer.startup method and 
create a topic, which invokes both configure and applyDelta.
   Wdyt?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16516: Fix the controller node provider for broker to control channel [kafka]

2024-05-22 Thread via GitHub


jsancio commented on code in PR #16008:
URL: https://github.com/apache/kafka/pull/16008#discussion_r1610457959


##
core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala:
##
@@ -120,15 +118,15 @@ object RaftControllerNodeProvider {
  */
 class RaftControllerNodeProvider(
   val raftManager: RaftManager[ApiMessageAndVersion],
-  controllerQuorumVoterNodes: Seq[Node],
   val listenerName: ListenerName,
   val securityProtocol: SecurityProtocol,
   val saslMechanism: String
 ) extends ControllerNodeProvider with Logging {
-  private val idToNode = controllerQuorumVoterNodes.map(node => node.id() -> 
node).toMap
+
+  private def idToNode(id: Int): Option[Node] = raftManager.voterNode(id, 
listenerName.value())
 
   override def getControllerInfo(): ControllerInformation =
-
ControllerInformation(raftManager.leaderAndEpoch.leaderId.asScala.map(idToNode),
+
ControllerInformation(raftManager.leaderAndEpoch.leaderId.asScala.flatMap(idToNode),

Review Comment:
   Okay. In a future PR, I'll change this to `Option[Node] RaftManager.leaderNode()` since it looks like the node is always the leader node and not some arbitrary voter.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16820) Kafka Broker fails to connect to Kraft Controller with no DNS matching

2024-05-22 Thread Arushi Helms (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arushi Helms updated KAFKA-16820:
-
Description: 
 

We are migrating our Kafka cluster from ZooKeeper to KRaft mode. We are running individual brokers and controllers with TLS enabled, and IPs are used for communication.
The TLS-enabled setup works fine among the brokers, and the certificate looks something like:
{noformat}
Common Name: *.kafka.service.consul
Subject Alternative Names: *.kafka.service.consul, IP 
Address:10.87.171.84{noformat}
Note: The DNS name for the node does not match the CN, but since we are using IPs for communication, we have provided IPs as SANs. 

Same with the controllers, IPs are given as SAN in the certificate. 

In the current setup I am running 3 brokers and 3 controllers. 

Relevant controller configurations from one of the controllers:
{noformat}
KAFKA_CFG_PROCESS_ROLES=controller 
KAFKA_KRAFT_CLUSTER_ID=5kztjhJ4SxSu-kdiEYDUow
KAFKA_CFG_NODE_ID=6 
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=4@10.87.170.83:9097,5@10.87.170.9:9097,6@10.87.170.6:9097
 
KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER 
KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INSIDE_SSL
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:SSL,INSIDE_SSL:SSL 
KAFKA_CFG_LISTENERS=CONTROLLER://10.87.170.6:9097{noformat}
Relevant broker configuration from one of the brokers:
{noformat}
KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INSIDE_SSL
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=4@10.87.170.83:9097,5@10.87.170.9:9097,6@10.87.170.6:9097
 
KAFKA_CFG_PROCESS_ROLES=broker 
KAFKA_CFG_NODE_ID=3 
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE_SSL:SSL,OUTSIDE_SSL:SSL,CONTROLLER:SSL
 
KAFKA_CFG_LISTENERS=INSIDE_SSL://10.87.170.78:9093,OUTSIDE_SSL://10.87.170.78:9096
 
KAFKA_CFG_ADVERTISED_LISTENERS=INSIDE_SSL://10.87.170.78:9093,OUTSIDE_SSL://10.87.170.78:9096{noformat}
 

ISSUE 1: 
With this setup the Kafka broker fails to connect to the controller; see the following error:
{noformat}
2024-05-22 17:53:46,413] ERROR 
[broker-2-to-controller-heartbeat-channel-manager]: Request 
BrokerRegistrationRequestData(brokerId=2, clusterId='5kztjhJ4SxSu-kdiEYDUow', 
incarnationId=7741fgH6T4SQqGsho8E6mw, listeners=[Listener(name='INSIDE_SSL', 
host='10.87.170.81', port=9093, securityProtocol=1), Listener(name='INSIDE', 
host='10.87.170.81', port=9094, securityProtocol=0), Listener(name='OUTSIDE', 
host='10.87.170.81', port=9092, securityProtocol=0), 
Listener(name='OUTSIDE_SSL', host='10.87.170.81', port=9096, 
securityProtocol=1)], features=[Feature(name='metadata.version', 
minSupportedVersion=1, maxSupportedVersion=19)], rack=null, 
isMigratingZkBroker=false, logDirs=[TJssfKDD-iBFYfIYCKOcew], 
previousBrokerEpoch=-1) failed due to authentication error with controller 
(kafka.server.NodeToControllerRequestThread)org.apache.kafka.common.errors.SslAuthenticationException:
 SSL handshake failedCaused by: javax.net.ssl.SSLHandshakeException: No subject 
alternative DNS name matching 
cp-internal-onecloud-kfkc1.node.cp-internal-onecloud.consul found.  at 
java.base/sun.security.ssl.Alert.createSSLException(Alert.java:131)  at 
java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:378) at 
java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:321) at 
java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:316) at 
java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1351)
  at 
java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.onConsumeCertificate(CertificateMessage.java:1226)
  at 
java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.consume(CertificateMessage.java:1169)
   at java.base/sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:396)
   at 
java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:480) 
 at 
java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1277)
  at 
java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1264)
  at 
java.base/java.security.AccessController.doPrivileged(AccessController.java:712)
 at 
java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask.run(SSLEngineImpl.java:1209)
  at 
org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:435)
  at 
org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:523)
at 
org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:373)
at 
org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:293)
  at 
org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:178)  at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)   
 at 

[jira] [Created] (KAFKA-16820) Kafka Broker fails to connect to Kraft Controller with no DNS matching

2024-05-22 Thread Arushi Helms (Jira)
Arushi Helms created KAFKA-16820:


 Summary: Kafka Broker fails to connect to Kraft Controller with no 
DNS matching 
 Key: KAFKA-16820
 URL: https://issues.apache.org/jira/browse/KAFKA-16820
 Project: Kafka
  Issue Type: Bug
  Components: kraft
Affects Versions: 3.6.1, 3.7.0, 3.8.0
Reporter: Arushi Helms
 Attachments: Screenshot 2024-05-22 at 1.09.11 PM-1.png

 

We are migrating our Kafka cluster from ZooKeeper to KRaft mode. We are running individual brokers and controllers with TLS enabled, and IPs are used for communication.
The TLS-enabled setup works fine among the brokers, and the certificate looks something like:
{noformat}
Common Name: *.kafka.service.consul
Subject Alternative Names: *.kafka.service.consul, IP 
Address:10.87.171.84{noformat}

Note: The DNS name for the node does not match the CN, but since we are using IPs for communication, we have provided IPs as SANs. 

Same with the controllers, IPs are given as SAN in the certificate. 

In the current setup I am running 3 brokers and 3 controllers. 

Relevant controller configurations from one of the controllers:

{noformat}
KAFKA_CFG_PROCESS_ROLES=controller 
KAFKA_KRAFT_CLUSTER_ID=5kztjhJ4SxSu-kdiEYDUow
KAFKA_CFG_NODE_ID=6 
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=4@10.87.170.83:9097,5@10.87.170.9:9097,6@10.87.170.6:9097
 
KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER 
KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INSIDE_SSL
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:SSL,INSIDE_SSL:SSL 
KAFKA_CFG_LISTENERS=CONTROLLER://10.87.170.6:9097{noformat}

 

Relevant broker configuration from one of the brokers:

 
{noformat}
KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INSIDE_SSL 
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=4@10.87.170.83:9097,5@10.87.170.9:9097,6@10.87.170.6:9097
 
KAFKA_CFG_PROCESS_ROLES=broker 
KAFKA_CFG_NODE_ID=3 
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE_SSL:SSL,OUTSIDE_SSL:SSL,CONTROLLER:SSL
 
KAFKA_CFG_LISTENERS=INSIDE_SSL://10.87.170.78:9093,OUTSIDE_SSL://10.87.170.78:9096
 
KAFKA_CFG_ADVERTISED_LISTENERS=INSIDE_SSL://10.87.170.78:9093,OUTSIDE_SSL://10.87.170.78:9096{noformat}

 

ISSUE 1: 
With this setup the Kafka broker fails to connect to the controller; see the following error:
{noformat}
2024-05-22 17:53:46,413] ERROR 
[broker-2-to-controller-heartbeat-channel-manager]: Request 
BrokerRegistrationRequestData(brokerId=2, clusterId='5kztjhJ4SxSu-kdiEYDUow', 
incarnationId=7741fgH6T4SQqGsho8E6mw, listeners=[Listener(name='INSIDE_SSL', 
host='10.87.170.81', port=9093, securityProtocol=1), Listener(name='INSIDE', 
host='10.87.170.81', port=9094, securityProtocol=0), Listener(name='OUTSIDE', 
host='10.87.170.81', port=9092, securityProtocol=0), 
Listener(name='OUTSIDE_SSL', host='10.87.170.81', port=9096, 
securityProtocol=1)], features=[Feature(name='metadata.version', 
minSupportedVersion=1, maxSupportedVersion=19)], rack=null, 
isMigratingZkBroker=false, logDirs=[TJssfKDD-iBFYfIYCKOcew], 
previousBrokerEpoch=-1) failed due to authentication error with controller 
(kafka.server.NodeToControllerRequestThread)org.apache.kafka.common.errors.SslAuthenticationException:
 SSL handshake failedCaused by: javax.net.ssl.SSLHandshakeException: No subject 
alternative DNS name matching 
cp-internal-onecloud-kfkc1.node.cp-internal-onecloud.consul found.  at 
java.base/sun.security.ssl.Alert.createSSLException(Alert.java:131)  at 
java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:378) at 
java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:321) at 
java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:316) at 
java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1351)
  at 
java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.onConsumeCertificate(CertificateMessage.java:1226)
  at 
java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.consume(CertificateMessage.java:1169)
   at java.base/sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:396)
   at 
java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:480) 
 at 
java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1277)
  at 
java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1264)
  at 
java.base/java.security.AccessController.doPrivileged(AccessController.java:712)
 at 
java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask.run(SSLEngineImpl.java:1209)
  at 
org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:435)
  at 
org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:523)
at 

Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]

2024-05-22 Thread via GitHub


jeffkbkim commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1610462585


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -12401,6 +12436,363 @@ public void 
testClassicGroupSyncToConsumerGroupRebalanceInProgress() throws Exce
 .withProtocolName("range")
 .build())
 );
+context.assertJoinTimeout(groupId, memberId, 1);
+}
+
+@Test
+public void testClassicGroupHeartbeatToConsumerGroupMaintainsSession() 
throws Exception {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+int sessionTimeout = 5000;
+
+List protocols = 
Collections.singletonList(
+new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+.setName("range")
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
+new ConsumerPartitionAssignor.Subscription(
+Arrays.asList("foo"),
+null,
+Collections.emptyList()
+)
+)))
+);
+
+// Consumer group with a member using the classic protocol.
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(new 
MockPartitionAssignor("range")))
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setClassicMemberMetadata(
+new 
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+.setSessionTimeoutMs(sessionTimeout)
+.setSupportedProtocols(protocols)
+)
+.setMemberEpoch(10)
+.build()))
+.build();
+
+// Heartbeat to schedule the session timeout.
+HeartbeatRequestData request = new HeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setGenerationId(10);
+context.sendClassicGroupHeartbeat(request);
+context.assertSessionTimeout(groupId, memberId, sessionTimeout);
+
+// Advance clock by 1/2 of session timeout.
+
GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout
 / 2));
+
+HeartbeatResponseData heartbeatResponse = 
context.sendClassicGroupHeartbeat(request).response();
+assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode());
+context.assertSessionTimeout(groupId, memberId, sessionTimeout);
+
+// Advance clock by 1/2 of session timeout.
+
GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout
 / 2));
+
+heartbeatResponse = 
context.sendClassicGroupHeartbeat(request).response();
+assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode());
+context.assertSessionTimeout(groupId, memberId, sessionTimeout);
+}
+
+@Test
+public void testClassicGroupHeartbeatToConsumerGroupRebalanceInProgress() 
throws Exception {
+String groupId = "group-id";
+String memberId1 = Uuid.randomUuid().toString();
+String memberId2 = Uuid.randomUuid().toString();
+String memberId3 = Uuid.randomUuid().toString();
+Uuid fooTopicId = Uuid.randomUuid();
+Uuid barTopicId = Uuid.randomUuid();
+int sessionTimeout = 5000;
+int rebalanceTimeout = 1;
+
+List protocols = 
Collections.singletonList(
+new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+.setName("range")
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
+new ConsumerPartitionAssignor.Subscription(
+Arrays.asList("foo"),

Review Comment:
   nit: Collections.singletonList("foo")



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]

2024-05-22 Thread via GitHub


jeffkbkim commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1610460306


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -12401,6 +12436,363 @@ public void 
testClassicGroupSyncToConsumerGroupRebalanceInProgress() throws Exce
 .withProtocolName("range")
 .build())
 );
+context.assertJoinTimeout(groupId, memberId, 1);
+}
+
+@Test
+public void testClassicGroupHeartbeatToConsumerGroupMaintainsSession() 
throws Exception {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+int sessionTimeout = 5000;
+
+List protocols = 
Collections.singletonList(
+new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+.setName("range")
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
+new ConsumerPartitionAssignor.Subscription(
+Arrays.asList("foo"),

Review Comment:
   nit: i think we can use `Collections.singletonList("foo"),`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]

2024-05-22 Thread via GitHub


hachikuji commented on code in PR #15986:
URL: https://github.com/apache/kafka/pull/15986#discussion_r1610398594


##
core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala:
##
@@ -118,6 +120,10 @@ class RaftManagerTest {
   new Metrics(Time.SYSTEM),
   Option.empty,
   
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumVoters)),
+  config.quorumBootstrapServers

Review Comment:
   nit: we seem to have this same little snippet in a few places. Is there 
somewhere we could add a helper?



##
core/src/test/resources/log4j.properties:
##
@@ -20,6 +20,8 @@ log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m 
(%c:%L)%n
 
 log4j.logger.kafka=WARN
 log4j.logger.org.apache.kafka=WARN
+# TODO; remove this line

Review Comment:
   Reminder about TODO



##
raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java:
##
@@ -231,4 +266,26 @@ public String toString() {
 return "non-empty list";
 }
 }
+
+public static class ControllerQuorumBootstrapServersValidator implements 
ConfigDef.Validator {
+@Override
+public void ensureValid(String name, Object value) {
+if (value == null) {
+throw new ConfigException(name, null);
+}
+
+@SuppressWarnings("unchecked")
+List entries = (List) value;
+
+// Attempt to parse the connect strings
+for (String entry : entries) {
+QuorumConfig.parseBootstrapServer(entry);

Review Comment:
   nit: drop unnecessary `QuorumConfig` prefix?



##
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##
@@ -1366,6 +1368,26 @@ class KafkaConfigTest {
 assertEquals(expectedVoters, addresses)
   }
 
+  @Test
+  def testParseQuorumBootstrapServers(): Unit = {

Review Comment:
   Perhaps we should have some invalid test cases as well?
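   
   For illustration, a minimal sketch of the kind of negative cases that could be added, written here as a standalone JUnit test directly against QuorumConfig rather than KafkaConfigTest (the test name and inputs are made up, and it assumes parseBootstrapServer is accessible and rejects malformed host:port entries with a ConfigException):
   
   ```java
   import org.apache.kafka.common.config.ConfigException;
   import org.apache.kafka.raft.QuorumConfig;
   import org.junit.jupiter.api.Test;
   
   import static org.junit.jupiter.api.Assertions.assertThrows;
   
   public class QuorumBootstrapServersNegativeTest {
       @Test
       public void testParseInvalidQuorumBootstrapServers() {
           // Entries missing a host, missing a port, or carrying an unparsable port.
           for (String invalid : new String[]{"", ":9092", "localhost", "localhost:", "localhost:notaport"}) {
               assertThrows(ConfigException.class, () -> QuorumConfig.parseBootstrapServer(invalid));
           }
       }
   }
   ```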



##
raft/src/main/java/org/apache/kafka/raft/RequestManager.java:
##
@@ -17,108 +17,196 @@
 package org.apache.kafka.raft;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.OptionalInt;
+import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Random;
-import java.util.Set;
+import org.apache.kafka.common.Node;
 
 public class RequestManager {
-private final Map connections = new HashMap<>();
-private final List voters = new ArrayList<>();
+private final Map connections = new HashMap<>();
+private final ArrayList bootstrapServers;
 
 private final int retryBackoffMs;
 private final int requestTimeoutMs;
 private final Random random;
 
-public RequestManager(Set voterIds,
-  int retryBackoffMs,
-  int requestTimeoutMs,
-  Random random) {
-
+public RequestManager(
+Collection bootstrapServers,
+int retryBackoffMs,
+int requestTimeoutMs,
+Random random
+) {
+this.bootstrapServers = new ArrayList<>(bootstrapServers);
 this.retryBackoffMs = retryBackoffMs;
 this.requestTimeoutMs = requestTimeoutMs;
-this.voters.addAll(voterIds);
 this.random = random;
-
-for (Integer voterId: voterIds) {
-ConnectionState connection = new ConnectionState(voterId);
-connections.put(voterId, connection);
-}
-}
-
-public ConnectionState getOrCreate(int id) {
-return connections.computeIfAbsent(id, key -> new ConnectionState(id));
 }
 
-public OptionalInt findReadyVoter(long currentTimeMs) {
-int startIndex = random.nextInt(voters.size());
-OptionalInt res = OptionalInt.empty();
-for (int i = 0; i < voters.size(); i++) {
-int index = (startIndex + i) % voters.size();
-Integer voterId = voters.get(index);
-ConnectionState connection = connections.get(voterId);
-boolean isReady = connection.isReady(currentTimeMs);
+public Optional findReadyBootstrapServer(long currentTimeMs) {
+int startIndex = random.nextInt(bootstrapServers.size());
+Optional res = Optional.empty();
+for (int i = 0; i < bootstrapServers.size(); i++) {
+int index = (startIndex + i) % bootstrapServers.size();
+Node node = bootstrapServers.get(index);
 
-if (isReady) {
-res = OptionalInt.of(voterId);
-} else if (connection.inFlightCorrelationId.isPresent()) {
-res = OptionalInt.empty();
+if (isReady(node, currentTimeMs)) {
+res = Optional.of(node);
+} else if (hasInflightRequest(node, currentTimeMs)) {
+res = Optional.empty();
 break;
 }
 }
+
 return res;
 }
 
-public long 

Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]

2024-05-22 Thread via GitHub


jeffkbkim commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1610459144


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -12143,7 +12183,6 @@ public void 
testClassicGroupSyncToConsumerGroupWithAllConsumerProtocolVersions()
 // Consumer group with two members.
 // Member 1 uses the classic protocol and member 2 uses the 
consumer protocol.
 GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
-
.withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)

Review Comment:
   why were these removed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]

2024-05-22 Thread via GitHub


jeffkbkim commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1610456745


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -4274,6 +4343,81 @@ private void validateClassicGroupHeartbeat(
 }
 }
 
+/**
+ * Handle a classic group HeartbeatRequest to a consumer group. A response 
with
+ * REBALANCE_IN_PROGRESS is returned if 1) the member epoch is smaller 
than the
+ * group epoch, 2) the member is in UNREVOKED_PARTITIONS, or 3) the member 
is in
+ * UNRELEASED_PARTITIONS and all its partitions pending assignment are 
free.
+ *
+ * @param group  The ConsumerGroup.
+ * @param contextThe request context.
+ * @param requestThe actual Heartbeat request.
+ *
+ * @return The coordinator result that contains the heartbeat response.
+ */
+private CoordinatorResult 
classicGroupHeartbeatToConsumerGroup(
+ConsumerGroup group,
+RequestContext context,
+HeartbeatRequestData request
+) throws UnknownMemberIdException, FencedInstanceIdException, 
IllegalGenerationException {
+String groupId = request.groupId();
+String memberId = request.memberId();
+String instanceId = request.groupInstanceId();
+ConsumerGroupMember member = validateConsumerGroupMember(group, 
memberId, instanceId);
+
+throwIfMemberDoesNotUseClassicProtocol(member);
+throwIfGenerationIdUnmatched(memberId, member.memberEpoch(), 
request.generationId());
+
+scheduleConsumerGroupSessionTimeout(groupId, memberId, 
member.classicProtocolSessionTimeout().get());
+
+Errors error = Errors.NONE;
+// The member should rejoin if any of the following conditions is met.
+// 1) The group epoch is bumped so the member need to rejoin to catch 
up.
+// 2) The member needs to revoke some partitions and rejoin to 
reconcile with the new epoch.
+// 3) The member's partitions pending assignment are free, so it can 
rejoin to get the complete assignment.
+if (member.memberEpoch() < group.groupEpoch() ||
+member.state() == MemberState.UNREVOKED_PARTITIONS ||
+(member.state() == MemberState.UNRELEASED_PARTITIONS && 
!group.waitingOnUnreleasedPartition(member))) {

Review Comment:
   i'm not sure i fully understand this part.
   
   UNRELEASED_PARTITIONS means that the member is waiting on partitions. 
However, i'm guessing the helper checks that the latest state does in fact have 
all partitions released but we want it to rejoin to get the updated assignment. 
Is this correct?
   
   Will this member be updated to STABLE state in the next 
CurrentAssignmentBuilder#computeNextAssignment?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-22 Thread via GitHub


artemlivshits commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1610437612


##
server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java:
##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Collections;
+import java.util.Map;
+
+public enum TestFeatureVersion implements FeatureVersion {
+
+TEST_0(0, MetadataVersion.IBP_3_3_IV0, Collections.emptyMap()),

Review Comment:
   Version 0 == "feature doesn't exist"; does it need to be explicitly codified in the enum?



##
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##
@@ -470,8 +471,8 @@ BrokerFeature processRegistrationFeature(
 // A feature is not found in the finalizedFeature map if it is unknown 
to the controller or set to 0 (feature not enabled).
 // As more features roll out, it may be common to leave a feature 
disabled, so this log is debug level in the case

Review Comment:
   Comment seems to be outdated w.r.t. latest logic?



##
server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java:
##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.common;
+
+import java.util.Map;
+
+public interface FeatureVersion {
+
+/**
+ * The level of the feature. 0 means the feature is disabled.
+ */
+short featureLevel();
+
+/**
+ * The name of the feature.
+ */
+String featureName();
+
+/**
+ * The minimum metadata version which sets this feature version as 
default. When bootstrapping using only
+ * a metadata version, a reasonable default for all other features is 
chosen based on this value.
+ * This should be defined as the next metadata version to be released when 
the feature version becomes production ready.
+ * (Ie, if the current production MV is 17 when a feature version is 
released, its mapping should be to MV 18)

Review Comment:
   Can we make it map to the release version?  If not, can we add a comment 
explaining the logic?
   
   Intuitively, if we have a feature like
   - FOO(1, 42) // released version 1 when MV got bumped to 42 
   - FOO(2, 77) // released version 2 when MV got bumped to 77
   - FOO(3, 77) // released version 3 at the same time as 2
   
   we should be able to discover that if we got MV 45, then FOO=1 because 1 was 
available at 42 and 2&3 are not available yet.  If we got 78 then it should be 
3 because it was max available at 77.
   
   If we got MV 30 then we should get 0 (implicit) because it wasn't available 
yet.
   
   Using the same table, we can determine that if feature version 1 was selected with MV 41, then it's an invalid combination; similarly, if feature 2 or 3 was selected with MV 45, it's an invalid combination. And we can select 1, 2, or 3 when MV is 77.
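   
   As a self-contained illustration of that lookup (the FOO table and method below are hypothetical, not Kafka code): pick the highest level whose bootstrap MV is at or below the given MV, falling back to 0 when the feature is not available yet.
   
   ```java
   import java.util.Arrays;
   import java.util.Comparator;
   
   public class DefaultFeatureLevelSketch {
       // {featureLevel, metadataVersionAtWhichItBecameDefault}, mirroring the FOO example above.
       private static final int[][] FOO = {{1, 42}, {2, 77}, {3, 77}};
   
       static int defaultLevelFor(int metadataVersion) {
           return Arrays.stream(FOO)
               .filter(entry -> entry[1] <= metadataVersion)
               .map(entry -> entry[0])
               .max(Comparator.naturalOrder())
               .orElse(0); // 0 == feature not available yet
       }
   
       public static void main(String[] args) {
           System.out.println(defaultLevelFor(30)); // 0, FOO not available yet
           System.out.println(defaultLevelFor(45)); // 1, only FOO(1, 42) is out
           System.out.println(defaultLevelFor(78)); // 3, the max level released at MV 77
       }
   }
   ```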



##
server-common/src/main/java/org/apache/kafka/server/common/Features.java:
##
@@ -64,14 +63,16 @@ public enum Features {
 
 PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature ->
 feature.usedInProduction).collect(Collectors.toList());
+PRODUCTION_FEATURE_NAMES = PRODUCTION_FEATURES.stream().map(feature ->
+feature.name).collect(Collectors.toList());
 }
 
 public String featureName() {
 return name;
 }

Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-22 Thread via GitHub


junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610452888


##
core/src/main/scala/kafka/server/DelayedFetch.scala:
##
@@ -91,19 +91,24 @@ class DelayedFetch(
 // Go directly to the check for Case G if the message offsets are 
the same. If the log segment
 // has just rolled, then the high watermark offset will remain the 
same but be on the old segment,
 // which would incorrectly be seen as an instance of Case F.
-if (endOffset.messageOffset != fetchOffset.messageOffset) {
-  if (endOffset.onOlderSegment(fetchOffset)) {
-// Case F, this can happen when the new fetch operation is on 
a truncated leader
-debug(s"Satisfying fetch $this since it is fetching later 
segments of partition $topicIdPartition.")
-return forceComplete()
+if (fetchOffset.messageOffset > endOffset.messageOffset) {
+  // Case F, this can happen when the new fetch operation is on a 
truncated leader
+  debug(s"Satisfying fetch $this since it is fetching later 
segments of partition $topicIdPartition.")
+  return forceComplete()
+} else if (fetchOffset.messageOffset < endOffset.messageOffset) {
+  if (endOffset.messageOffsetOnly() || 
fetchOffset.messageOffsetOnly()) {
+// If we don't know the position of the offset on log 
segments, just pessimistically assume that we
+// only gained 1 byte when fetchOffset < endOffset, otherwise 
do nothing. This can happen when the
+// high-watermark is stale, but should be rare.
+accumulatedSize += 1

Review Comment:
   Yes, in that case, we just wait until endOffset moves to within the local 
log segments. If the timeout hits, we will just return empty.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-22 Thread via GitHub


kamalcph commented on PR #15825:
URL: https://github.com/apache/kafka/pull/15825#issuecomment-2125471280

   @junrao 
   
   Thanks for the review! Addressed all your comments. PTAL.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15250) ConsumerNetworkThread is running tight loop

2024-05-22 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848703#comment-17848703
 ] 

Philip Nee commented on KAFKA-15250:


After the inflight check fix, I think most request managers are backing off correctly. Closing this issue.

> ConsumerNetworkThread is running tight loop
> ---
>
> Key: KAFKA-15250
> URL: https://issues.apache.org/jira/browse/KAFKA-15250
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Critical
>  Labels: consumer-threading-refactor, events
> Fix For: 3.8.0
>
>
> The DefaultBackgroundThread is running tight loops and wasting CPU cycles. I 
> think we need to reexamine the timeout passed to networkClientDelegate.poll.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15250) ConsumerNetworkThread is running tight loop

2024-05-22 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-15250.

Resolution: Fixed

> ConsumerNetworkThread is running tight loop
> ---
>
> Key: KAFKA-15250
> URL: https://issues.apache.org/jira/browse/KAFKA-15250
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Critical
>  Labels: consumer-threading-refactor, events
> Fix For: 3.8.0
>
>
> The DefaultBackgroundThread is running tight loops and wasting CPU cycles. I 
> think we need to reexamine the timeout passed to networkClientDelegate.poll.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16819) CoordinatorRequestManager seems to return 0ms during the coordinator discovery

2024-05-22 Thread Philip Nee (Jira)
Philip Nee created KAFKA-16819:
--

 Summary: CoordinatorRequestManager seems to return 0ms during the 
coordinator discovery
 Key: KAFKA-16819
 URL: https://issues.apache.org/jira/browse/KAFKA-16819
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


In KAFKA-15250 we discovered the ConsumerNetworkThread is looping without much 
backoff.  The in-flight check PR fixed a lot of it; however, during the 
coordinator discovery phase, CoordinatorRequestManager would keep on returning 
0 before the coordinator node was found.

 

The impact is minor, but we should expect the coordinator manager to back off until the exponential backoff expires (so it should return around 100 ms).
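
For reference, a minimal sketch of the bounded exponential backoff described above (plain arithmetic, not the actual CoordinatorRequestManager code; the method and parameter names are illustrative, with retryBackoffMs standing in for the client's retry.backoff.ms and the cap chosen arbitrarily):
{noformat}
public class CoordinatorBackoffSketch {
    // How long poll() should ask to wait before retrying FindCoordinator: start at the
    // base backoff and double per failed attempt, capped at a maximum.
    static long nextBackoffMs(int failedAttempts, long retryBackoffMs, long retryBackoffMaxMs) {
        if (failedAttempts <= 0) {
            return retryBackoffMs;
        }
        double candidate = retryBackoffMs * Math.pow(2, failedAttempts);
        return (long) Math.min(candidate, retryBackoffMaxMs);
    }

    public static void main(String[] args) {
        System.out.println(nextBackoffMs(0, 100L, 1000L)); // 100 ms, not 0 ms
        System.out.println(nextBackoffMs(3, 100L, 1000L)); // 800 ms
    }
}{noformat}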



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-22 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610404165


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##
@@ -445,7 +445,7 @@ public FetchDataInfo read(long startOffset, int maxSize, 
long maxPosition, boole
 adjustedMaxSize = Math.max(maxSize, startOffsetAndSize.size);
 
 // return a log segment but with zero size in the case below
-if (adjustedMaxSize == 0)
+if (adjustedMaxSize == 0 || maxPosition == -1)

Review Comment:
   LogSegment is in Java, and using Optional as a method parameter in LogSegment#read shows warnings in IntelliJ and requires some refactoring (passing an `int` for a `long` parameter is converted implicitly, but not with Optional), so I went with the negative-value approach:
   
   ```
   'Optional' used as type for parameter 'maxPosition' 
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-22 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610404165


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##
@@ -445,7 +445,7 @@ public FetchDataInfo read(long startOffset, int maxSize, 
long maxPosition, boole
 adjustedMaxSize = Math.max(maxSize, startOffsetAndSize.size);
 
 // return a log segment but with zero size in the case below
-if (adjustedMaxSize == 0)
+if (adjustedMaxSize == 0 || maxPosition == -1)

Review Comment:
   LogSegment is in Java, and using Optional as a method parameter in LogSegment#read shows warnings in IntelliJ and requires some refactoring (passing an `int` for a `long` parameter is converted implicitly, but not with Optional), so I went with the negative-value approach:
   
   ```
   'Optional' used as type for parameter 'maxPosition' 
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16625: Reverse lookup map from topic partitions to members [kafka]

2024-05-22 Thread via GitHub


rreddy-22 commented on code in PR #15974:
URL: https://github.com/apache/kafka/pull/15974#discussion_r1610424080


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java:
##
@@ -186,9 +190,33 @@ private void createAssignmentSpec() {
 Collections.emptyMap()
 ));
 }
-assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
+groupSpec = new GroupSpecImpl(members, HOMOGENEOUS, 
Collections.emptyMap());
 }
 
+public Map> invertedTargetAssignment(

Review Comment:
   I think it's fine, it's just going to be used in these two benchmarks and 
they have some differences



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16625: Reverse lookup map from topic partitions to members [kafka]

2024-05-22 Thread via GitHub


jeffkbkim commented on code in PR #15974:
URL: https://github.com/apache/kafka/pull/15974#discussion_r1610394065


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java:
##
@@ -79,8 +80,16 @@ public void testTwoMembersNoTopicSubscription() {
 Collections.emptyMap()
 ));
 
-AssignmentSpec assignmentSpec = new AssignmentSpec(members, 
HETEROGENEOUS);
-GroupAssignment groupAssignment = assignor.assign(assignmentSpec, 
subscribedTopicMetadata);
+GroupSpecImpl groupSpec = new GroupSpecImpl(

Review Comment:
   nit: should all the `GroupSpecImpl groupSpec` initializations in the tests 
be `GroupSpec groupSpec`?



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java:
##
@@ -82,4 +83,34 @@ public static void assertAssignment(
 assertEquals(expectedAssignment.get(memberId), 
computedAssignmentForMember);
 });
 }
+
+/**
+ * Generate a reverse look up map of partition to member target 
assignments from the given member spec.
+ *
+ * @param memberSpec A map where the key is the member Id and the 
value is an
+ *  AssignmentMemberSpec object containing the 
member's partition assignments.
+ * @return Map of topic partition to member assignments.
+ */
+public static Map> invertedTargetAssignment(
+Map memberSpec
+) {
+Map> invertedTargetAssignment = new 
HashMap<>();
+for (Map.Entry memberEntry : 
memberSpec.entrySet()) {
+String memberId = memberEntry.getKey();
+Map> topicsAndPartitions = 
memberEntry.getValue().assignedPartitions();
+
+for (Map.Entry> topicEntry : 
topicsAndPartitions.entrySet()) {
+Uuid topicId = topicEntry.getKey();
+Set partitions = topicEntry.getValue();
+
+invertedTargetAssignment.putIfAbsent(topicId, new HashMap<>());
+Map partitionMap = 
invertedTargetAssignment.get(topicId);

Review Comment:
   i think computeIfAbsent would work
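   
   For illustration, the same helper written with the suggested `computeIfAbsent` (a sketch only; the `Uuid`/`AssignmentMemberSpec` generics are spelled out here because the mail archive strips angle brackets, and the Kafka/java.util imports are assumed to match the surrounding test util):
   
   ```java
   public static Map<Uuid, Map<Integer, String>> invertedTargetAssignment(
       Map<String, AssignmentMemberSpec> memberSpec
   ) {
       Map<Uuid, Map<Integer, String>> invertedTargetAssignment = new HashMap<>();
       for (Map.Entry<String, AssignmentMemberSpec> memberEntry : memberSpec.entrySet()) {
           String memberId = memberEntry.getKey();
           for (Map.Entry<Uuid, Set<Integer>> topicEntry : memberEntry.getValue().assignedPartitions().entrySet()) {
               // computeIfAbsent replaces the putIfAbsent + get pair from the patch.
               Map<Integer, String> partitionMap =
                   invertedTargetAssignment.computeIfAbsent(topicEntry.getKey(), topicId -> new HashMap<>());
               for (Integer partitionId : topicEntry.getValue()) {
                   partitionMap.put(partitionId, memberId);
               }
           }
       }
       return invertedTargetAssignment;
   }
   ```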



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignorException.java:
##
@@ -19,7 +19,7 @@
 import org.apache.kafka.common.errors.ApiException;
 
 /**
- * Exception thrown by {@link PartitionAssignor#assign(AssignmentSpec)}. The 
exception
+ * Exception thrown by {@link PartitionAssignor#assign(GroupSpecImpl, 
SubscribedTopicDescriber)}}. The exception

Review Comment:
   i think this needs to be GroupSpec



##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java:
##
@@ -186,9 +190,33 @@ private void createAssignmentSpec() {
 Collections.emptyMap()
 ));
 }
-assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
+groupSpec = new GroupSpecImpl(members, HOMOGENEOUS, 
Collections.emptyMap());
 }
 
+public Map> invertedTargetAssignment(
+GroupAssignment groupAssignment
+) {
+Map> invertedTargetAssignment = new 
HashMap<>();
+for (Map.Entry memberEntry : 
groupAssignment.members().entrySet()) {
+String memberId = memberEntry.getKey();
+Map> topicsAndPartitions = 
memberEntry.getValue().targetPartitions();
+
+for (Map.Entry> topicEntry : 
topicsAndPartitions.entrySet()) {
+Uuid topicId = topicEntry.getKey();
+Set partitions = topicEntry.getValue();
+
+invertedTargetAssignment.putIfAbsent(topicId, new HashMap<>());
+Map partitionMap = 
invertedTargetAssignment.get(topicId);
+
+for (Integer partitionId : partitions) {
+partitionMap.put(partitionId, memberId);
+}
+}
+}
+return invertedTargetAssignment;
+}
+
+

Review Comment:
   nit: newline



##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java:
##
@@ -186,9 +190,33 @@ private void createAssignmentSpec() {
 Collections.emptyMap()
 ));
 }
-assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS);
+groupSpec = new GroupSpecImpl(members, HOMOGENEOUS, 
Collections.emptyMap());
 }
 
+public Map> invertedTargetAssignment(

Review Comment:
   is it not possible to unify this with 
ServerSideAssignorBenchmark#invertedTargetAssignment?
   
   seems like some details are different but they have a lot in common
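   
   One possible shape for such a shared helper, sketched here with hypothetical names (the member type and the partition-extracting accessor are the two things that differ between the benchmarks; Kafka and java.util imports are omitted):
   
   ```java
   public final class InvertedAssignmentUtil {
       private InvertedAssignmentUtil() { }
   
       // Builds topicId -> (partitionId -> memberId) from any per-member map, given a function
       // that extracts the member's topicId -> partitions assignment.
       public static <M> Map<Uuid, Map<Integer, String>> invert(
           Map<String, M> members,
           Function<M, Map<Uuid, Set<Integer>>> partitionsOf
       ) {
           Map<Uuid, Map<Integer, String>> inverted = new HashMap<>();
           members.forEach((memberId, member) ->
               partitionsOf.apply(member).forEach((topicId, partitions) ->
                   partitions.forEach(partitionId ->
                       inverted.computeIfAbsent(topicId, id -> new HashMap<>())
                           .put(partitionId, memberId))));
           return inverted;
       }
   }
   ```
   
   The two benchmarks could then call it as, e.g., `invert(members, AssignmentMemberSpec::assignedPartitions)` and `invert(groupAssignment.members(), member -> member.targetPartitions())`, assuming those accessors match the surrounding diffs.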



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Fix rate metric spikes [kafka]

2024-05-22 Thread via GitHub


emitskevich-blp commented on code in PR #15889:
URL: https://github.com/apache/kafka/pull/15889#discussion_r1610406308


##
clients/src/test/java/org/apache/kafka/common/metrics/stats/RateTest.java:
##
@@ -64,4 +69,31 @@ public void testRateWithNoPriorAvailableSamples(int 
numSample, int sampleWindowS
 double expectedRatePerSec = sampleValue / windowSize;
 assertEquals(expectedRatePerSec, observedRate, EPS);
 }
+
+// Record an event every 100 ms on average, moving some 1 ms back or forth 
for fine-grained 
+// window control. The expected rate, hence, is 10-11 events/sec depending 
on the moment of 
+// measurement. Start assertions from the second window. This test is to 
address past issue,
+// when measurements in the end of the sample led to value spikes.

Review Comment:
   Applied



##
clients/src/test/java/org/apache/kafka/common/metrics/stats/RateTest.java:
##
@@ -64,4 +69,30 @@ public void testRateWithNoPriorAvailableSamples(int 
numSample, int sampleWindowS
 double expectedRatePerSec = sampleValue / windowSize;
 assertEquals(expectedRatePerSec, observedRate, EPS);
 }
+
+// Record an event every 100 ms on average, moving some 1 ms back or forth 
for fine-grained 
+// window control. The expected rate, hence, is 10-11 events/sec depending 
on the moment of 
+// measurement. Start assertions from the second window.
+@Test
+public void testRateIsConsistentAfterTheFirstWindow() {
+MetricConfig config = new MetricConfig().timeWindow(1, 
SECONDS).samples(2);
+List steps = Arrays.asList(0, 99, 100, 100, 100, 100, 100, 
100, 100, 100, 100);
+
+// start the first window and record events at 0,99,199,...,999 ms 
+for (int stepMs : steps) {
+time.sleep(stepMs);
+rate.record(config, 1, time.milliseconds());
+}
+
+// making a gap of 100 ms between windows
+time.sleep(101);
+
+// start the second window and record events at 0,99,199,...,999 ms
+for (int stepMs : steps) {
+time.sleep(stepMs);
+rate.record(config, 1, time.milliseconds());
+double observedRate = rate.measure(config, time.milliseconds());

Review Comment:
   Applied



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-22 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610404165


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##
@@ -445,7 +445,7 @@ public FetchDataInfo read(long startOffset, int maxSize, 
long maxPosition, boole
 adjustedMaxSize = Math.max(maxSize, startOffsetAndSize.size);
 
 // return a log segment but with zero size in the case below
-if (adjustedMaxSize == 0)
+if (adjustedMaxSize == 0 || maxPosition == -1)

Review Comment:
   LogSegment is in Java, and using Optional as a method parameter in LogSegment#read shows warnings in IntelliJ and requires some refactoring, so I went with the negative-value approach:
   
   ```
   'Optional' used as type for parameter 'maxPosition' 
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-22 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610401875


##
core/src/main/scala/kafka/server/DelayedFetch.scala:
##
@@ -91,19 +91,24 @@ class DelayedFetch(
 // Go directly to the check for Case G if the message offsets are 
the same. If the log segment
 // has just rolled, then the high watermark offset will remain the 
same but be on the old segment,
 // which would incorrectly be seen as an instance of Case F.
-if (endOffset.messageOffset != fetchOffset.messageOffset) {
-  if (endOffset.onOlderSegment(fetchOffset)) {
-// Case F, this can happen when the new fetch operation is on 
a truncated leader
-debug(s"Satisfying fetch $this since it is fetching later 
segments of partition $topicIdPartition.")
-return forceComplete()
+if (fetchOffset.messageOffset > endOffset.messageOffset) {
+  // Case F, this can happen when the new fetch operation is on a 
truncated leader
+  debug(s"Satisfying fetch $this since it is fetching later 
segments of partition $topicIdPartition.")
+  return forceComplete()
+} else if (fetchOffset.messageOffset < endOffset.messageOffset) {
+  if (endOffset.messageOffsetOnly() || 
fetchOffset.messageOffsetOnly()) {
+// If we don't know the position of the offset on log 
segments, just pessimistically assume that we
+// only gained 1 byte when fetchOffset < endOffset, otherwise 
do nothing. This can happen when the
+// high-watermark is stale, but should be rare.
+accumulatedSize += 1

Review Comment:
   To confirm, Should we avoid accumulating the 1 byte when fetchOffset < 
endOffset? The FETCH request will be parked in the purgatory for 500 ms, I 
don't see any issues with it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-22 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610399596


##
core/src/test/scala/unit/kafka/log/LogSegmentTest.scala:
##
@@ -143,6 +143,39 @@ class LogSegmentTest {
 checkEquals(ms2.records.iterator, read.records.records.iterator)
   }
 
+  @Test
+  def testReadWhenNoMaxPosition(): Unit = {
+val maxPosition = -1
+val maxSize = 1
+val seg = createSegment(40)
+val ms = records(50, "hello", "there")
+seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms)
+for (minOneMessage <- Array(true, false)) {
+  // read before first offset
+  var read = seg.read(48, maxSize, maxPosition, minOneMessage)

Review Comment:
   yes, when maxSize is negative (-1), then the following error will be thrown:
   
   ```
   java.lang.IllegalArgumentException: Invalid max size -1 for log read from 
segment FileRecords(size=78, 
file=/var/folders/bq/w6tnkbq964q8sqpvj4fbmjr4gq/T/kafka-18165220027855018994/0040.log,
 start=0, end=2147483647)
at 
org.apache.kafka.storage.internals.log.LogSegment.read(LogSegment.java:432)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


