Re: [PR] KAFKA-16373: KIP-1028: Adding code to support Apache Kafka Docker Official Images [kafka]
VedarthConfluent commented on code in PR #16027: URL: https://github.com/apache/kafka/pull/16027#discussion_r1611042138 ## .github/workflows/prepare_docker_official_image_source.yml: ## @@ -0,0 +1,53 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: Docker Prepare Docker Official Image Source + +on: + workflow_dispatch: +inputs: + image_type: +type: choice +description: Docker image type to build and test +options: + - "jvm" + kafka_version: +description: Kafka version for which the source for docker official image is to be built +required: true + +jobs: + build: +runs-on: ubuntu-latest +steps: +- uses: actions/checkout@v3 +- name: Set up Python 3.10 + uses: actions/setup-python@v3 + with: +python-version: "3.10" +- name: Install dependencies + run: | +python -m pip install --upgrade pip +pip install -r docker/requirements.txt +- name: Build New Artifact Review Comment: nit: Maybe Build Docker Official Image Source? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16373: KIP-1028: Adding code to support Apache Kafka Docker Official Images [kafka]
VedarthConfluent commented on code in PR #16027: URL: https://github.com/apache/kafka/pull/16027#discussion_r1611041547 ## .github/workflows/prepare_docker_official_image_source.yml: ## @@ -0,0 +1,53 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: Docker Prepare Docker Official Image Source + +on: + workflow_dispatch: +inputs: + image_type: +type: choice +description: Docker image type to build and test +options: + - "jvm" + kafka_version: +description: Kafka version for which the source for docker official image is to be built Review Comment: Same as https://github.com/apache/kafka/pull/16027/files#r1611040507 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16373: KIP-1028: Adding code to support Apache Kafka Docker Official Images [kafka]
VedarthConfluent commented on code in PR #16027: URL: https://github.com/apache/kafka/pull/16027#discussion_r1611040507 ## .github/workflows/docker_official_image_build_and_test.yml: ## @@ -0,0 +1,66 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: Docker Official Image Build Test + +on: + workflow_dispatch: +inputs: + image_type: +type: choice +description: Docker image type to build and test +options: + - "jvm" + kafka_version: +description: Kafka version for which the source for docker official image is to be built Review Comment: Simplify and mention that version should be >= 3.7.0 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-16257) SchemaProjector should be extensible to logical types
[ https://issues.apache.org/jira/browse/KAFKA-16257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fan Yang reassigned KAFKA-16257: Assignee: Fan Yang > SchemaProjector should be extensible to logical types > - > > Key: KAFKA-16257 > URL: https://issues.apache.org/jira/browse/KAFKA-16257 > Project: Kafka > Issue Type: New Feature > Components: connect >Reporter: Greg Harris >Assignee: Fan Yang >Priority: Minor > Labels: needs-kip > > The SchemaProjector currently only supports projecting primitive Number > types, and cannot handle the common logical types that have proliferated in the > Connect ecosystem. > The SchemaProjector or a replacement should have the ability to extend its > functionality to support these logical types. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16820) Kafka Broker fails to connect to Kraft Controller with no DNS matching
[ https://issues.apache.org/jira/browse/KAFKA-16820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848821#comment-17848821 ] Vikash Mishra commented on KAFKA-16820: --- Even though the brokers are provided with controller IPs, they still try to communicate using the controller's DNS name; see the error below. This differs from the behavior without KRaft, where inter-broker communication uses IPs when they are provided and the SSL handshake works using ip_san. {code:java} failed due to authentication error with controller (kafka.server.NodeToControllerRequestThread) org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed Caused by: javax.net.ssl.SSLHandshakeException: No subject alternative DNS name matching cp-internal-onecloud-kfkc1.node.cp-internal-onecloud.consul found.{code} Is there a configuration to disable this and use IP-based communication between broker and controller when IPs are provided during bootstrap, or is this the default behavior going forward? Looping in [~soarez] [~jlprat], release managers of 3.7.1 & 3.8.0, to confirm whether the reported issue has already been identified as a known issue in the upcoming releases. > Kafka Broker fails to connect to Kraft Controller with no DNS matching > --- > > Key: KAFKA-16820 > URL: https://issues.apache.org/jira/browse/KAFKA-16820 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.7.0, 3.6.1, 3.8.0 >Reporter: Arushi Helms >Priority: Major > Attachments: Screenshot 2024-05-22 at 1.09.11 PM-1.png > > > > We are migrating our Kafka cluster from zookeeper to Kraft mode. We are > running individual brokers and controllers with TLS enabled and IPs are given > for communication. 
> TLS enabled setup works fine among the brokers and the certificate looks > something like: > {noformat} > Common Name: *.kafka.service.consul > Subject Alternative Names: *.kafka.service.consul, IP > Address:10.87.171.84{noformat} > Note: > * The DNS name for the node does not match the CN but since we are using IPs > as communication, we have provided IPs as SAN. > * Same with the controllers, IPs are given as SAN in the certificate. > * Issue is not related to the migration so just sharing configuration > relevant for the TLS piece. > In the current setup I am running 3 brokers and 3 controllers. > Relevant controller configurations from one of the controllers: > {noformat} > KAFKA_CFG_PROCESS_ROLES=controller > KAFKA_KRAFT_CLUSTER_ID=5kztjhJ4SxSu-kdiEYDUow > KAFKA_CFG_NODE_ID=6 > KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=4@10.87.170.83:9097,5@10.87.170.9:9097,6@10.87.170.6:9097 > > KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER > KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INSIDE_SSL > KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:SSL,INSIDE_SSL:SSL > KAFKA_CFG_LISTENERS=CONTROLLER://10.87.170.6:9097{noformat} > Relevant broker configuration from one of the brokers: > {noformat} > KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER > KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INSIDE_SSL > KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=4@10.87.170.83:9097,5@10.87.170.9:9097,6@10.87.170.6:9097 > > KAFKA_CFG_PROCESS_ROLES=broker > KAFKA_CFG_NODE_ID=3 > KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE_SSL:SSL,OUTSIDE_SSL:SSL,CONTROLLER:SSL > > KAFKA_CFG_LISTENERS=INSIDE_SSL://10.87.170.78:9093,OUTSIDE_SSL://10.87.170.78:9096 > > KAFKA_CFG_ADVERTISED_LISTENERS=INSIDE_SSL://10.87.170.78:9093,OUTSIDE_SSL://10.87.170.78:9096{noformat} > > ISSUE 1: > With this setup Kafka broker is failing to connect to the controller, see the > following error: > {noformat} > 2024-05-22 17:53:46,413] ERROR > [broker-2-to-controller-heartbeat-channel-manager]: Request > BrokerRegistrationRequestData(brokerId=2, 
clusterId='5kztjhJ4SxSu-kdiEYDUow', > incarnationId=7741fgH6T4SQqGsho8E6mw, listeners=[Listener(name='INSIDE_SSL', > host='10.87.170.81', port=9093, securityProtocol=1), Listener(name='INSIDE', > host='10.87.170.81', port=9094, securityProtocol=0), Listener(name='OUTSIDE', > host='10.87.170.81', port=9092, securityProtocol=0), > Listener(name='OUTSIDE_SSL', host='10.87.170.81', port=9096, > securityProtocol=1)], features=[Feature(name='metadata.version', > minSupportedVersion=1, maxSupportedVersion=19)], rack=null, > isMigratingZkBroker=false, logDirs=[TJssfKDD-iBFYfIYCKOcew], > previousBrokerEpoch=-1) failed due to authentication error with controller > (kafka.server.NodeToControllerRequestThread)org.apache.kafka.common.errors.SslAuthenticationException: > SSL handshake failedCaused by: javax.net.ssl.SSLHandshakeException: No > subject alternative DNS name matching > cp-internal-onecloud-kfkc1.node.cp-internal-onecloud.consul found.at >
[PR] Improve logical type compatibility in SchemaProjector [kafka]
fanyang opened a new pull request, #16035: URL: https://github.com/apache/kafka/pull/16035 This PR improves logical type compatibility in SchemaProjector. A Decimal type with a higher scale is compatible with one with the same or a lower scale. The Timestamp type is compatible with the Date and Time types. The Date and Time types are incompatible with each other. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
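The compatibility rules stated in this PR description can be sketched as a small predicate. This is only an illustration, not the code from the PR: `LogicalTypeCompat`, `Kind`, `LogicalType` and `canProject` are hypothetical names, the sketch does not use the real Connect `Schema` API, and the direction of each rule (a target Decimal needs at least the source's scale; a Timestamp target accepts a Date or Time source) is an assumed reading of the description.

```java
import java.util.Objects;

/** Hypothetical stand-in for Connect logical types; not the real Schema API. */
final class LogicalTypeCompat {
    enum Kind { DECIMAL, DATE, TIME, TIMESTAMP }

    /** A logical type plus a scale; the scale is only meaningful for DECIMAL. */
    static final class LogicalType {
        final Kind kind;
        final int scale;

        LogicalType(Kind kind, int scale) {
            this.kind = Objects.requireNonNull(kind);
            this.scale = scale;
        }

        static LogicalType of(Kind kind) {
            return new LogicalType(kind, 0);
        }

        static LogicalType decimal(int scale) {
            return new LogicalType(Kind.DECIMAL, scale);
        }
    }

    /** Returns true if a source value could be projected to the target type under the assumed reading. */
    static boolean canProject(LogicalType source, LogicalType target) {
        if (source.kind == target.kind) {
            // Assumption: a Decimal target must have at least the source's scale.
            return source.kind != Kind.DECIMAL || target.scale >= source.scale;
        }
        if (target.kind == Kind.TIMESTAMP) {
            // Assumption: a Timestamp target can hold a Date or a Time source.
            return source.kind == Kind.DATE || source.kind == Kind.TIME;
        }
        // Everything else, including Date <-> Time, is treated as incompatible.
        return false;
    }
}
```

Under these assumptions, `canProject(decimal(2), decimal(4))` holds while `canProject(decimal(4), decimal(2))` does not, and Date and Time reject each other in both directions.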
Re: [PR] KAFKA-15541: Add iterator-duration metrics [kafka]
mjsax merged PR #16028: URL: https://github.com/apache/kafka/pull/16028
Re: [PR] KAFKA-15541: Add iterator-duration metrics [kafka]
mjsax commented on code in PR #16028: URL: https://github.com/apache/kafka/pull/16028#discussion_r1610944460 ## streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java: ## @@ -149,6 +149,14 @@ private StateStoreMetrics() {} private static final String NUM_OPEN_ITERATORS_DESCRIPTION = "The current number of iterators on the store that have been created, but not yet closed"; +private static final String ITERATOR_DURATION = "iterator-duration"; +private static final String ITERATOR_DURATION_DESCRIPTION = +"time spent between creating an Iterator and closing it, in nanoseconds"; Review Comment: ```suggestion "time spent between creating an iterator and closing it, in nanoseconds"; ```
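For readers unfamiliar with what "time spent between creating an iterator and closing it, in nanoseconds" means in practice, here is a minimal sketch of one way such a duration could be measured. It is not the Kafka Streams implementation: `TimedIterator`, the injected `nanoClock` and the `durationRecorder` callback are hypothetical names introduced for this illustration.

```java
import java.util.Iterator;
import java.util.Objects;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

/** Hypothetical wrapper that reports how long an iterator stayed open, in nanoseconds. */
final class TimedIterator<T> implements Iterator<T>, AutoCloseable {
    private final Iterator<T> delegate;
    private final LongSupplier nanoClock;        // e.g. System::nanoTime in production
    private final LongConsumer durationRecorder; // e.g. a sensor's record call (assumption)
    private final long createdAtNanos;
    private boolean closed = false;

    TimedIterator(Iterator<T> delegate, LongSupplier nanoClock, LongConsumer durationRecorder) {
        this.delegate = Objects.requireNonNull(delegate);
        this.nanoClock = Objects.requireNonNull(nanoClock);
        this.durationRecorder = Objects.requireNonNull(durationRecorder);
        // The measured interval starts when the iterator is created.
        this.createdAtNanos = nanoClock.getAsLong();
    }

    @Override
    public boolean hasNext() {
        return delegate.hasNext();
    }

    @Override
    public T next() {
        return delegate.next();
    }

    @Override
    public void close() {
        // Record the duration once, even if close() is called repeatedly.
        if (!closed) {
            closed = true;
            durationRecorder.accept(nanoClock.getAsLong() - createdAtNanos);
        }
    }
}
```

Injecting the clock keeps the sketch testable with a fake time source; a real caller would likely pass `System::nanoTime`.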
Re: [PR] KAFKA-14412: Add statestore.uncommitted.max.bytes [kafka]
github-actions[bot] commented on PR #15264: URL: https://github.com/apache/kafka/pull/15264#issuecomment-2126168320 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1610893769 ## core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala: ## @@ -164,18 +169,71 @@ class DelayedFetchTest { assertTrue(delayedFetch.tryComplete()) assertTrue(delayedFetch.isCompleted) assertTrue(fetchResultOpt.isDefined) + +val fetchResult = fetchResultOpt.get +assertEquals(Errors.NONE, fetchResult.error) + } + + @ParameterizedTest(name = "testDelayedFetchWithMessageOnlyHighWatermark endOffset={0}") + @ValueSource(longs = Array(0, 500)) + def testDelayedFetchWithMessageOnlyHighWatermark(endOffset: Long): Unit = { +val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic") +val fetchOffset = 450L +val logStartOffset = 5L +val currentLeaderEpoch = Optional.of[Integer](10) +val replicaId = 1 + +val fetchStatus = FetchPartitionStatus( + startOffsetMetadata = new LogOffsetMetadata(fetchOffset), + fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch)) +val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500) + +var fetchResultOpt: Option[FetchPartitionData] = None +def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = { + fetchResultOpt = Some(responses.head._2) +} + +val delayedFetch = new DelayedFetch( + params = fetchParams, + fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus), + replicaManager = replicaManager, + quota = replicaQuota, + responseCallback = callback +) + +val partition: Partition = mock(classOf[Partition]) + when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition) +// Note that the high-watermark does not contain the complete metadata +val endOffsetMetadata = new LogOffsetMetadata(endOffset, -1L, -1) +when(partition.fetchOffsetSnapshot( + currentLeaderEpoch, + fetchOnlyFromLeader = true)) + .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, 
endOffsetMetadata, endOffsetMetadata)) +when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false) +expectReadFromReplica(fetchParams, topicIdPartition, fetchStatus.fetchInfo, Errors.NONE) + +// 1. When `endOffset` is 0, it refers to the truncation case +// 2. When `endOffset` is 500, it refers to the normal case +val expected = endOffset == 0 Review Comment: When fetchOffset > endOffset, `forceComplete` gets called. In this case, 450 > 0, so it triggers force completion. This is the only behavior change after the refactor. Previously, when fetchOffset > endOffset: 1. If the offsets lie on the same segment, we wait for the end offset to move or for the request to time out. 2. If the endOffset lies on an older segment than the fetch offset, we complete the request by calling force-complete. We can retain the same behavior if required. ``` if (fetchOffset.messageOffset > endOffset.messageOffset) { if (endOffset.onOlderSegment(fetchOffset)) { // Case F, this can happen when the new fetch operation is on a truncated leader debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.") return forceComplete() } } else if (fetchOffset.messageOffset < endOffset.messageOffset) { ... ```
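The behavior change described in this comment can be restated as two small decision functions. This is a simplified model written for illustration, not the `DelayedFetch` code: `DelayedFetchDecision`, `Outcome`, `beforeRefactor` and `afterRefactor` are hypothetical names, and the sketch covers only the `fetchOffset > endOffset` branch discussed here.

```java
/** Hypothetical model of the fetchOffset > endOffset branch only. */
final class DelayedFetchDecision {
    enum Outcome { FORCE_COMPLETE, KEEP_WAITING }

    private static void requireFetchAhead(long fetchOffset, long endOffset) {
        if (fetchOffset <= endOffset) {
            throw new IllegalArgumentException("this sketch only models fetchOffset > endOffset");
        }
    }

    /**
     * Before the refactor, per the comment: force-complete only when the end offset
     * sits on an older segment; otherwise wait for the end offset to move or a timeout.
     */
    static Outcome beforeRefactor(long fetchOffset, long endOffset, boolean endOnOlderSegment) {
        requireFetchAhead(fetchOffset, endOffset);
        return endOnOlderSegment ? Outcome.FORCE_COMPLETE : Outcome.KEEP_WAITING;
    }

    /** After the refactor, per the comment: always force-complete in this branch. */
    static Outcome afterRefactor(long fetchOffset, long endOffset) {
        requireFetchAhead(fetchOffset, endOffset);
        return Outcome.FORCE_COMPLETE;
    }
}
```

With the test's values (fetch offset 450, end offset 0), the two functions differ only when both offsets are on the same segment: the old logic keeps waiting, the new logic completes immediately.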
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1610893769 ## core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala: ## @@ -164,18 +169,71 @@ class DelayedFetchTest { assertTrue(delayedFetch.tryComplete()) assertTrue(delayedFetch.isCompleted) assertTrue(fetchResultOpt.isDefined) + +val fetchResult = fetchResultOpt.get +assertEquals(Errors.NONE, fetchResult.error) + } + + @ParameterizedTest(name = "testDelayedFetchWithMessageOnlyHighWatermark endOffset={0}") + @ValueSource(longs = Array(0, 500)) + def testDelayedFetchWithMessageOnlyHighWatermark(endOffset: Long): Unit = { +val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic") +val fetchOffset = 450L +val logStartOffset = 5L +val currentLeaderEpoch = Optional.of[Integer](10) +val replicaId = 1 + +val fetchStatus = FetchPartitionStatus( + startOffsetMetadata = new LogOffsetMetadata(fetchOffset), + fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch)) +val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500) + +var fetchResultOpt: Option[FetchPartitionData] = None +def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = { + fetchResultOpt = Some(responses.head._2) +} + +val delayedFetch = new DelayedFetch( + params = fetchParams, + fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus), + replicaManager = replicaManager, + quota = replicaQuota, + responseCallback = callback +) + +val partition: Partition = mock(classOf[Partition]) + when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition) +// Note that the high-watermark does not contain the complete metadata +val endOffsetMetadata = new LogOffsetMetadata(endOffset, -1L, -1) +when(partition.fetchOffsetSnapshot( + currentLeaderEpoch, + fetchOnlyFromLeader = true)) + .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, 
endOffsetMetadata, endOffsetMetadata)) +when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false) +expectReadFromReplica(fetchParams, topicIdPartition, fetchStatus.fetchInfo, Errors.NONE) + +// 1. When `endOffset` is 0, it refers to the truncation case +// 2. When `endOffset` is 500, it refers to the normal case +val expected = endOffset == 0 Review Comment: When fetchOffset > endOffset, `forceComplete` gets called. In this case, 450 > 0, so it triggers force completion. This is the only behavior change after the refactor. Previously, when fetchOffset > endOffset: 1. If the offsets lie on the same segment, we wait for the end offset to move or for the request to time out. 2. If they lie on different segments, we complete the request by calling force-complete. We can retain the same behavior if required. ``` if (fetchOffset.messageOffset > endOffset.messageOffset) { if (endOffset.onOlderSegment(fetchOffset)) { // Case F, this can happen when the new fetch operation is on a truncated leader debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.") return forceComplete() } } else if (fetchOffset.messageOffset < endOffset.messageOffset) { ... ```
[jira] [Commented] (KAFKA-16811) Punctuate Ratio metric almost impossible to track
[ https://issues.apache.org/jira/browse/KAFKA-16811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848797#comment-17848797 ] Rohan Desai commented on KAFKA-16811: - Alternatively, or perhaps in addition to a pre-windowed value, we can add a metric that measures the total time spent in punctuate since the StreamThread was created. Users can then compute the time spent in a window of their choice, downstream of the application, by subtracting the value of the metric at the beginning of the window from its value at the end of the window. > Punctuate Ratio metric almost impossible to track > - > > Key: KAFKA-16811 > URL: https://issues.apache.org/jira/browse/KAFKA-16811 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Sebastien Viale >Priority: Minor > > The Punctuate ratio metric is returned after the last record of the poll > loop. It is recomputed in every poll loop. > After a punctuate, the value is close to 1, but there is little chance that > the metric is sampled at this time. > So its value is almost always 0. > A solution could be to apply a kind of "sliding window" to it and report the > value for the last x seconds.
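The downstream computation suggested in this comment amounts to differencing two samples of a cumulative counter. The sketch below assumes a hypothetical cumulative metric reported in milliseconds and sampled together with a wall-clock timestamp; `PunctuateWindow`, `Sample` and `ratio` are illustrative names, not part of Kafka Streams.

```java
/** Hypothetical downstream helper for a cumulative "total time in punctuate" metric. */
final class PunctuateWindow {
    /** One scrape of the metric: when it was taken and the cumulative value, both in milliseconds. */
    static final class Sample {
        final long timestampMs;
        final long totalPunctuateMs;

        Sample(long timestampMs, long totalPunctuateMs) {
            this.timestampMs = timestampMs;
            this.totalPunctuateMs = totalPunctuateMs;
        }
    }

    /** Fraction of the window between the two samples that was spent in punctuate. */
    static double ratio(Sample start, Sample end) {
        long elapsedMs = end.timestampMs - start.timestampMs;
        if (elapsedMs <= 0) {
            throw new IllegalArgumentException("end sample must be later than start sample");
        }
        // Value at the end of the window minus value at the beginning of the window.
        long spentMs = end.totalPunctuateMs - start.totalPunctuateMs;
        return (double) spentMs / elapsedMs;
    }
}
```

Because the counter only grows, the result does not depend on when within a poll loop the samples were taken, which is the sampling problem the issue describes.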
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1610895466 ## core/src/main/scala/kafka/server/DelayedFetch.scala: ## @@ -91,19 +91,23 @@ class DelayedFetch( // Go directly to the check for Case G if the message offsets are the same. If the log segment // has just rolled, then the high watermark offset will remain the same but be on the old segment, // which would incorrectly be seen as an instance of Case F. -if (endOffset.messageOffset != fetchOffset.messageOffset) { - if (endOffset.onOlderSegment(fetchOffset)) { -// Case F, this can happen when the new fetch operation is on a truncated leader -debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.") -return forceComplete() +if (fetchOffset.messageOffset > endOffset.messageOffset) { + // Case F, this can happen when the new fetch operation is on a truncated leader + debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.") + return forceComplete() +} else if (fetchOffset.messageOffset < endOffset.messageOffset) { + if (fetchOffset.messageOffsetOnly() || endOffset.messageOffsetOnly()) { Review Comment: Not clear on this one. The current `if` checks are easy to read; we can add a debug log to avoid the empty `if` block.
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1610893769 ## core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala: ## @@ -164,18 +169,71 @@ class DelayedFetchTest { assertTrue(delayedFetch.tryComplete()) assertTrue(delayedFetch.isCompleted) assertTrue(fetchResultOpt.isDefined) + +val fetchResult = fetchResultOpt.get +assertEquals(Errors.NONE, fetchResult.error) + } + + @ParameterizedTest(name = "testDelayedFetchWithMessageOnlyHighWatermark endOffset={0}") + @ValueSource(longs = Array(0, 500)) + def testDelayedFetchWithMessageOnlyHighWatermark(endOffset: Long): Unit = { +val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic") +val fetchOffset = 450L +val logStartOffset = 5L +val currentLeaderEpoch = Optional.of[Integer](10) +val replicaId = 1 + +val fetchStatus = FetchPartitionStatus( + startOffsetMetadata = new LogOffsetMetadata(fetchOffset), + fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch)) +val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500) + +var fetchResultOpt: Option[FetchPartitionData] = None +def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = { + fetchResultOpt = Some(responses.head._2) +} + +val delayedFetch = new DelayedFetch( + params = fetchParams, + fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus), + replicaManager = replicaManager, + quota = replicaQuota, + responseCallback = callback +) + +val partition: Partition = mock(classOf[Partition]) + when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition) +// Note that the high-watermark does not contain the complete metadata +val endOffsetMetadata = new LogOffsetMetadata(endOffset, -1L, -1) +when(partition.fetchOffsetSnapshot( + currentLeaderEpoch, + fetchOnlyFromLeader = true)) + .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, 
endOffsetMetadata, endOffsetMetadata)) +when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false) +expectReadFromReplica(fetchParams, topicIdPartition, fetchStatus.fetchInfo, Errors.NONE) + +// 1. When `endOffset` is 0, it refers to the truncation case +// 2. When `endOffset` is 500, it refers to the normal case +val expected = endOffset == 0 Review Comment: When fetchOffset > endOffset, `forceComplete` gets called. In this case, 450 > 0, so it triggers force completion.
[jira] [Commented] (KAFKA-16160) AsyncKafkaConsumer is trying to connect to a disconnected node in a tight loop
[ https://issues.apache.org/jira/browse/KAFKA-16160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848791#comment-17848791 ] Phuc Hong Tran commented on KAFKA-16160: [~pnee] I understand > AsyncKafkaConsumer is trying to connect to a disconnected node in a tight loop > -- > > Key: KAFKA-16160 > URL: https://issues.apache.org/jira/browse/KAFKA-16160 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Phuc Hong Tran >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > Observing some excessive logging running AsyncKafkaConsumer and observing > excessive logging of : > {code:java} > 1271 [2024-01-15 09:43:36,627] DEBUG [Consumer clientId=console-consumer, > groupId=concurrent_consumer] Node is not ready, handle the request in the > next event loop: node=worker4:9092 (id: 2147483644 rack: null), > request=UnsentRequest{requestBuil > der=ConsumerGroupHeartbeatRequestData(groupId='concurrent_consumer', > memberId='laIqS789StuhXFpTwjh6hA', memberEpoch=1, instanceId=null, > rackId=null, rebalanceTimeoutMs=30, subscribedTopicNames=[output-topic], > serverAssignor=null, topicP > artitions=[TopicPartitions(topicId=I5P5lIXvR1Cjc8hfoJg5bg, partitions=[0])]), > handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@918925b, > node=Optional[worker4:9092 (id: 2147483644 rack: null)] , > timer=org.apache.kafka.common.utils.Timer@55ed4733} > (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate) {code} > This seems to be triggered by a tight poll loop of the network thread. The > right thing to do is to backoff a bit for that given node and retry later. > This should be a blocker for 3.8 release. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14517) Implement regex subscriptions
[ https://issues.apache.org/jira/browse/KAFKA-14517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848792#comment-17848792 ] Phuc Hong Tran commented on KAFKA-14517: Thanks again [~lianetm] > Implement regex subscriptions > - > > Key: KAFKA-14517 > URL: https://issues.apache.org/jira/browse/KAFKA-14517 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-preview >
Re: [PR] KAFKA-16771 First log directory printed twice when formatting storage [kafka]
gongxuanzhang commented on code in PR #16010: URL: https://github.com/apache/kafka/pull/16010#discussion_r1610875448 ## core/src/test/scala/unit/kafka/tools/StorageToolTest.scala: ## @@ -488,4 +488,26 @@ Found problem: assertEquals(1, exitStatus) } } + + @Test + def testFormatMultiEmptyDirectory(): Unit = { Review Comment: > There is already a similar test case `testFormatSucceedsIfAllDirectoriesAreAvailable`, so could you please add new check to `testFormatSucceedsIfAllDirectoriesAreAvailable`? for example, we can split the `stream#toString` by new line literal and then check the duplicate contents. I think the print order should be consistent with the configuration order; that is just as important as formatting each directory only once. Additionally, printing is done through callbacks, so just changing `HashMap` to `LinkedHashMap` does not seem to work: we would get an inverted order. I think we should change the related `HashMap` and `HashSet` to `LinkedHashMap` and `LinkedHashSet` and create a new Copier in each loop. As for the test cases, I will add them.
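The point about `HashSet` versus `LinkedHashSet` can be illustrated outside of the storage tool. The sketch below is not the `StorageTool` code: `OrderedDirectories` and `distinctInConfigOrder` are hypothetical names, and the directory paths in the usage note are made up. It relies only on the documented guarantee that `LinkedHashSet` iterates in insertion order.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

/** Hypothetical helper: de-duplicate configured directories without losing their order. */
final class OrderedDirectories {
    /**
     * Returns each configured directory once, in the order it first appears.
     * A plain HashSet would also drop duplicates but gives no ordering guarantee.
     */
    static List<String> distinctInConfigOrder(List<String> configured) {
        return new ArrayList<>(new LinkedHashSet<>(configured));
    }
}
```

For a configuration such as `/data/a`, `/data/b`, `/data/a`, this returns `/data/a` followed by `/data/b`, so each directory is reported once and in configuration order.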
Re: [PR] KAFKA-15541: Add num-open-iterators metric [kafka]
mjsax commented on PR #15975: URL: https://github.com/apache/kafka/pull/15975#issuecomment-2126049466 Makes sense. Could you file a Jira for the KIP and the follow-up cleanup for KS code to use it :)
Re: [PR] KAFKA-16709: abortAndPauseCleaning only when future log is not existed [kafka]
showuon commented on code in PR #15951: URL: https://github.com/apache/kafka/pull/15951#discussion_r1610854305 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -2114,16 +2114,12 @@ class ReplicaManager(val config: KafkaConfig, partition.log.foreach { _ => val leader = BrokerEndPoint(config.brokerId, "localhost", -1) - // Add future replica log to partition's map - partition.createLogIfNotExists( -isNew = false, -isFutureReplica = true, -offsetCheckpoints, -topicIds(partition.topic)) - - // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move - // replica from source dir to destination dir - logManager.abortAndPauseCleaning(topicPartition) + // Add future replica log to partition's map if it's not existed Review Comment: @chia7712, thanks for the comment. For this: > In short, alterReplicaLogDirs adds alter thread [0] only if it succeeds to create future log of partition. Maybe maybeAddLogDirFetchers should follow same rule? Or we can add comments to say "that is fine as replicaAlterLogDirsManager.addFetcherForPartitions will be a no-op in this case"? I chose the latter option because if we only create the fetcher when the future log does not exist, there is a potential side effect: the fetcher is removed when leadership changes but does not get added back later. I've added the comment in this commit: https://github.com/apache/kafka/pull/15951/commits/0d78e493e484dd4f27ba6a127616d802999f22d0 . Thanks.
Re: [PR] KAFKA-5072[WIP]: Kafka topics should allow custom metadata configs within some config namespace [kafka]
andreyolv commented on PR #2873: URL: https://github.com/apache/kafka/pull/2873#issuecomment-2125996595 Extremely important feature for governance. Any release forecast?
Re: [PR] KAFKA-15045: (KIP-924 pt. 10) Topic partition rack annotation simplified [kafka]
apourchet commented on code in PR #16034: URL: https://github.com/apache/kafka/pull/16034#discussion_r1610816330 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java: ## @@ -513,50 +515,45 @@ private ApplicationState buildApplicationState(final TopologyMetadata topologyMe + "tasks for source topics vs changelog topics."); } -final Set sourceTopicPartitions = new HashSet<>(); -final Set nonSourceChangelogTopicPartitions = new HashSet<>(); -for (final Map.Entry> entry : sourcePartitionsForTask.entrySet()) { -final TaskId taskId = entry.getKey(); -final Set taskSourcePartitions = entry.getValue(); -final Set taskChangelogPartitions = changelogPartitionsForTask.get(taskId); -final Set taskNonSourceChangelogPartitions = new HashSet<>(taskChangelogPartitions); -taskNonSourceChangelogPartitions.removeAll(taskSourcePartitions); - -sourceTopicPartitions.addAll(taskSourcePartitions); - nonSourceChangelogTopicPartitions.addAll(taskNonSourceChangelogPartitions); -} +final Set logicalTaskIds = unmodifiableSet(sourcePartitionsForTask.keySet()); +final Set allTopicPartitions = new HashSet<>(); +final Map> topicPartitionsForTask = new HashMap<>(); +logicalTaskIds.forEach(taskId -> { +final Set topicPartitions = new HashSet<>(); + +for (final TopicPartition topicPartition : sourcePartitionsForTask.get(taskId)) { +final boolean isSource = true; +final boolean isChangelog = changelogPartitionsForTask.get(taskId).contains(topicPartition); +final DefaultTaskTopicPartition racklessTopicPartition = new DefaultTaskTopicPartition( +topicPartition, isSource, isChangelog, null); +allTopicPartitions.add(racklessTopicPartition); +topicPartitionsForTask.get(taskId).add(racklessTopicPartition); Review Comment: Good catch!
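In the diff quoted above, `topicPartitionsForTask` is created as an empty `HashMap` and then read with `get(taskId)` before any entry has been put for that task, which appears to be the problem being acknowledged. The sketch below reproduces that pattern with stand-in types (plain strings instead of `TaskId` and `TopicPartition`; the class and method names are hypothetical) and shows one common way to avoid the resulting `NullPointerException`. It is an illustration under those assumptions, not the change that was applied in the PR.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class PerTaskGroupingSketch {

    // Groups partition names by task id. Strings stand in for TaskId and TopicPartition.
    static Map<String, Set<String>> group(Map<String, List<String>> sourcePartitionsForTask) {
        Map<String, Set<String>> partitionsForTask = new HashMap<>();
        sourcePartitionsForTask.forEach((taskId, partitions) -> {
            // partitionsForTask.get(taskId).add(...) would throw a NullPointerException here,
            // because the map has no entry for taskId yet. computeIfAbsent creates it on demand.
            Set<String> perTask = partitionsForTask.computeIfAbsent(taskId, id -> new HashSet<>());
            perTask.addAll(partitions);
        });
        return partitionsForTask;
    }

    public static void main(String[] args) {
        Map<String, List<String>> input = new HashMap<>();
        input.put("0_0", List.of("input-0", "input-1"));
        input.put("0_1", List.of("input-2"));
        System.out.println(group(input));
    }
}
```

Collecting into a local set per task and putting it into the map once the loop finishes would work equally well; `computeIfAbsent` is simply the shorter form.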
Re: [PR] KAFKA-16598 Migrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]
m1a2st commented on code in PR #15779: URL: https://github.com/apache/kafka/pull/15779#discussion_r1610809626 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java: ## @@ -62,506 +86,756 @@ * - scope=topics+partitions, scenario=to-earliest * - export/import */ -public class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest { -private String[] basicArgs() { +@ExtendWith(value = ClusterTestExtensions.class) +public class ResetConsumerGroupOffsetTest { + +private static final String TOPIC_PREFIX = "foo-"; +private static final String GROUP_PREFIX = "test.group-"; + +private static void generator(ClusterGenerator clusterGenerator) { Review Comment: OK, I have rebased it.
[jira] [Commented] (KAFKA-16814) KRaft broker cannot startup when `partition.metadata` is missing
[ https://issues.apache.org/jira/browse/KAFKA-16814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848781#comment-17848781 ] Luke Chen commented on KAFKA-16814: --- [~muralibasani] , no, it's not related to remote storage. I didn't enable it. > KRaft broker cannot startup when `partition.metadata` is missing > > > Key: KAFKA-16814 > URL: https://issues.apache.org/jira/browse/KAFKA-16814 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Luke Chen >Priority: Major > > When starting up kafka logManager, we'll check stray replicas to avoid some > corner cases. But this check might cause broker unable to startup if > `partition.metadata` is missing because when startup kafka, we load log from > file, and the topicId of the log is coming from `partition.metadata` file. > So, if `partition.metadata` is missing, the topicId will be None, and the > `LogManager#isStrayKraftReplica` will fail with no topicID error. > The `partition.metadata` missing could be some storage failure, or another > possible path is unclean shutdown after topic is created in the replica, but > before data is flushed into `partition.metadata` file. This is possible > because we do the flush in async way > [here|https://github.com/apache/kafka/blob/5552f5c26df4eb07b2d6ee218e4a29e4ca790d5c/core/src/main/scala/kafka/log/UnifiedLog.scala#L229]. > > > {code:java} > ERROR Encountered fatal fault: Error starting LogManager > (org.apache.kafka.server.fault.ProcessTerminatingFaultHandler) > java.lang.RuntimeException: The log dir > Log(dir=/tmp/kraft-broker-logs/quickstart-events-0, topic=quickstart-events, > partition=0, highWatermark=0, lastStableOffset=0, logStartOffset=0, > logEndOffset=0) does not have a topic ID, which is not allowed when running > in KRaft mode. 
> at > kafka.log.LogManager$.$anonfun$isStrayKraftReplica$1(LogManager.scala:1609) > at scala.Option.getOrElse(Option.scala:201) > at kafka.log.LogManager$.isStrayKraftReplica(LogManager.scala:1608) > at > kafka.server.metadata.BrokerMetadataPublisher.$anonfun$initializeManagers$1(BrokerMetadataPublisher.scala:294) > at > kafka.server.metadata.BrokerMetadataPublisher.$anonfun$initializeManagers$1$adapted(BrokerMetadataPublisher.scala:294) > at kafka.log.LogManager.loadLog(LogManager.scala:359) > at kafka.log.LogManager.$anonfun$loadLogs$15(LogManager.scala:493) > at > java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:577) > at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) > at java.base/java.lang.Thread.run(Thread.java:1623) {code} > > Because if we don't do the isStrayKraftReplica check, the topicID and the > `partition.metadata` will get recovered after getting topic partition update > and becoming leader or follower later. I'm proposing we skip the > `isStrayKraftReplica` check if topicID is None, instead of throwing exception > to terminate the kafka. `isStrayKraftReplica` check is just for a corner case > only, it should be fine IMO. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
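The proposal at the end of the description (skip the stray-replica check when the topic ID is missing, rather than throwing) can be sketched as follows. This is a deliberately simplified stand-in written in Java, not the `LogManager` code: the class `StrayReplicaCheckSketch`, the method `isStray`, and the reduction of the check to a membership test are all assumptions made for illustration.

```java
import java.util.Optional;
import java.util.Set;
import java.util.UUID;

public class StrayReplicaCheckSketch {

    // Hypothetical, simplified check: a replica counts as stray only when its topic ID
    // is known and is absent from the set of topic IDs in the metadata.
    static boolean isStray(Optional<UUID> topicId, Set<UUID> knownTopicIds) {
        if (!topicId.isPresent()) {
            // No topic ID could be loaded (for example, partition.metadata is missing):
            // skip the check instead of failing startup.
            return false;
        }
        return !knownTopicIds.contains(topicId.get());
    }

    public static void main(String[] args) {
        UUID known = UUID.randomUUID();
        Set<UUID> metadata = Set.of(known);
        System.out.println(isStray(Optional.empty(), metadata));
        System.out.println(isStray(Optional.of(known), metadata));
        System.out.println(isStray(Optional.of(UUID.randomUUID()), metadata));
    }
}
```

The trade-off is the one named in the description: a log without a topic ID is never treated as stray at startup, on the expectation that the ID is recovered once the broker receives the topic partition update.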
Re: [PR] KAFKA-9228: Restart tasks on runtime-only connector config changes [kafka]
gharris1727 commented on code in PR #16001: URL: https://github.com/apache/kafka/pull/16001#discussion_r1610727814 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ## @@ -1039,7 +1039,12 @@ public static List> reverseTransform(String connName, return result; } -public boolean taskConfigsChanged(ClusterConfigState configState, String connName, List> taskProps) { +public boolean taskConfigsChanged( +ClusterConfigState configState, +String connName, +List> taskProps, +ConfigHash connectorConfigHash Review Comment: optional: You could use the old signature, and compute ConfigHash from the configState variable. This would switch the AbstractHerderTest cases to test the real hash algorithm, rather than using mocked hashes, and less mocking seems better to me. ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConfigHash.java: ## @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.connect.runtime; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.util.ConnectUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.core.HttpHeaders; +import java.io.IOException; +import java.util.Map; +import java.util.TreeMap; + +/** + * A deterministic hash of a connector configuration. This can be used to detect changes + * in connector configurations across worker lifetimes, which is sometimes necessary when + * connectors are reconfigured in a way that affects their tasks' runtime behavior but does + * not affect their tasks' configurations (for example, changing the key converter class). + * + * @see https://issues.apache.org/jira/browse/KAFKA-9228;>KAFKA-9228 + */ +public class ConfigHash { + +private static final Logger log = LoggerFactory.getLogger(ConfigHash.class); + +public static final ConfigHash NO_HASH = new ConfigHash(null); +public static final String CONNECTOR_CONFIG_HASH_HEADER = "X-Connect-Connector-Config-Hash"; + +private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + +private final Integer hash; + +// Visible for testing +ConfigHash(Integer hash) { +this.hash = hash; +} + +/** + * Read and parse a hash from the headers of a REST request. 
+ * + * @param connector the name of the connector; only used for error logging + * purposes and may be null + * @param headers the headers from which to read and parse the hash; + *may be null + * + * @return the parsed hash; never null, but may be {@link #NO_HASH} if + * no hash header is present + * + * @throws ConnectException if the expected header is present for the hash, + * but it cannot be parsed as a 32-bit signed integer + */ +public static ConfigHash fromHeaders(String connector, HttpHeaders headers) { +if (headers == null) +return NO_HASH; + +String header = headers.getHeaderString(CONNECTOR_CONFIG_HASH_HEADER); +if (header == null) +return NO_HASH; + +int hash; +try { +hash = Integer.parseInt(header); +} catch (NumberFormatException e) { +if (connector == null) +connector = ""; + +if (log.isTraceEnabled()) { +log.error("Invalid connector config hash header for connector {}", connector); Review Comment: nit: error log in trace if-condition ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConfigHash.java: ## @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not
Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]
appchemist commented on PR #15961: URL: https://github.com/apache/kafka/pull/15961#issuecomment-2125950049 @lianetm I got it, thank you! @kirktrue Right! I think what you said is simpler and more intuitive.
[jira] [Updated] (KAFKA-15630) Improve documentation of offset.lag.max
[ https://issues.apache.org/jira/browse/KAFKA-15630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-15630: Labels: newbie (was: ) > Improve documentation of offset.lag.max > --- > > Key: KAFKA-15630 > URL: https://issues.apache.org/jira/browse/KAFKA-15630 > Project: Kafka > Issue Type: Improvement > Components: docs, mirrormaker >Reporter: Mickael Maison >Priority: Major > Labels: newbie > > It would be good to expand on the role of this configuration on offset > translation and mention that it can be set to a smaller value, or even 0, to > help in scenarios when records may not flow constantly. > The documentation string is here: > [https://github.com/apache/kafka/blob/06739d5aa026e7db62ff0bd7da57e079cca35f07/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java#L104] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15630) Improve documentation of offset.lag.max
[ https://issues.apache.org/jira/browse/KAFKA-15630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-15630: Description: It would be good to expand on the role of this configuration on offset translation and mention that it can be set to a smaller value, or even 0, to help in scenarios when records may not flow constantly. The documentation string is here: [https://github.com/apache/kafka/blob/06739d5aa026e7db62ff0bd7da57e079cca35f07/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java#L104] was:It would be good to expand on the role of this configuration on offset translation and mention that it can be set to a smaller value, or even 0, to help in scenarios when records may not flow constantly. > Improve documentation of offset.lag.max > --- > > Key: KAFKA-15630 > URL: https://issues.apache.org/jira/browse/KAFKA-15630 > Project: Kafka > Issue Type: Improvement > Components: docs, mirrormaker >Reporter: Mickael Maison >Priority: Major > > It would be good to expand on the role of this configuration on offset > translation and mention that it can be set to a smaller value, or even 0, to > help in scenarios when records may not flow constantly. > The documentation string is here: > [https://github.com/apache/kafka/blob/06739d5aa026e7db62ff0bd7da57e079cca35f07/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java#L104] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16798) Mirrormaker2 dedicated mode - sync.group.offsets.interval not working
[ https://issues.apache.org/jira/browse/KAFKA-16798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848770#comment-17848770 ] Greg Harris commented on KAFKA-16798: - Hi [~sektor.coder] There's already a ticket to improve the in-line documentation: KAFKA-15630 but it hasn't had any activity on it. If you're interested please pick that ticket up, as documentation writing is best done with fresh eyes. > Mirrormaker2 dedicated mode - sync.group.offsets.interval not working > - > > Key: KAFKA-16798 > URL: https://issues.apache.org/jira/browse/KAFKA-16798 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.7.0 >Reporter: Thanos Athanasopoulos >Priority: Major > > Single instance MirrorMaker2 in dedicated mode, active passive replication > logic. > sync.group.offsets.interval.seconds=2 configuration is enabled and active > {noformat} > [root@x-x ~]# docker logs cc-mm 2>&1 -f | grep -i > "auto.commit.interval\|checkpoint.interval\|consumer.commit.interval\|sync.topics.interval\|sync.group.offsets.interval\|offset-syncs.interval.seconds > " > auto.commit.interval.ms = 5000 > auto.commit.interval.ms = 5000 > auto.commit.interval.ms = 5000 > auto.commit.interval.ms = 5000 > auto.commit.interval.ms = 5000 > auto.commit.interval.ms = 5000 > sync.group.offsets.interval.seconds = 2 > sync.group.offsets.interval.seconds = 2 > auto.commit.interval.ms = 5000 > sync.group.offsets.interval.seconds = 2 > auto.commit.interval.ms = 5000 > auto.commit.interval.ms = 5000 > auto.commit.interval.ms = 5000 > auto.commit.interval.ms = 5000 > auto.commit.interval.ms = 5000 > auto.commit.interval.ms = 5000 > auto.commit.interval.ms = 5000 > auto.commit.interval.ms = 5000 > sync.group.offsets.interval.seconds = 2 > auto.commit.interval.ms = 5000 > auto.commit.interval.ms = 5000 > sync.group.offsets.interval.seconds = 2 > sync.group.offsets.interval.seconds = 2 > auto.commit.interval.ms = 5000 > {noformat} > but is not working, the commit of 
offsets happens *always every 60 seconds* > as you can see in the logs > {noformat} > [2024-05-20 09:32:44,847] INFO [MirrorSourceConnector|task-0|offsets] > WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 20 > acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233) > [2024-05-20 09:32:44,852] INFO [MirrorSourceConnector|task-1|offsets] > WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 > acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233) > [2024-05-20 09:32:44,875] INFO [MirrorHeartbeatConnector|task-0|offsets] > WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 > acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233) > [2024-05-20 09:32:44,881] INFO [MirrorCheckpointConnector|task-0|offsets] > WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets for 1 > acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233) > [2024-05-20 09:32:44,890] INFO [MirrorHeartbeatConnector|task-0|offsets] > WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 > acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233) > [2024-05-20 09:33:44,850] INFO [MirrorSourceConnector|task-0|offsets] > WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 21 > acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233) > [2024-05-20 09:33:44,854] INFO [MirrorSourceConnector|task-1|offsets] > WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 > acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233) > [2024-05-20 09:33:44,878] INFO [MirrorHeartbeatConnector|task-0|offsets] > WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 > acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233) > [2024-05-20 09:33:44,883] INFO [MirrorCheckpointConnector|task-0|offsets] > 
WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets for 2 > acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233) > [2024-05-20 09:33:44,895] INFO [MirrorHeartbeatConnector|task-0|offsets] > WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 > acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233) > [2024-05-20 09:34:44,852] INFO [MirrorSourceConnector|task-0|offsets] > WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 20 >
Re: [PR] KAFKA-15045: (KIP-924 pt. 10) Topic partition rack annotation simplified [kafka]
ableegoldman commented on code in PR #16034: URL: https://github.com/apache/kafka/pull/16034#discussion_r1610769106 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackUtils.java: ## @@ -38,26 +40,37 @@ public final class RackUtils { private RackUtils() { } -public static Map> getRacksForTopicPartition(final Cluster cluster, - final InternalTopicManager internalTopicManager, - final Set topicPartitions, - final boolean isChangelog) { -final Set topicsToDescribe = new HashSet<>(); -if (isChangelog) { - topicsToDescribe.addAll(topicPartitions.stream().map(TopicPartition::topic).collect( -Collectors.toSet())); -} else { -topicsToDescribe.addAll(topicsWithMissingMetadata(cluster, topicPartitions)); -} +public static void annotateWithTopicPartitionsWithRackInfo(final Cluster cluster, +final InternalTopicManager internalTopicManager, Review Comment: nit: fix the formatting/indentation (although this will change when you change the method name anyway) ## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackUtils.java: ## @@ -38,26 +40,37 @@ public final class RackUtils { private RackUtils() { } -public static Map> getRacksForTopicPartition(final Cluster cluster, - final InternalTopicManager internalTopicManager, - final Set topicPartitions, - final boolean isChangelog) { -final Set topicsToDescribe = new HashSet<>(); -if (isChangelog) { - topicsToDescribe.addAll(topicPartitions.stream().map(TopicPartition::topic).collect( -Collectors.toSet())); -} else { -topicsToDescribe.addAll(topicsWithMissingMetadata(cluster, topicPartitions)); -} +public static void annotateWithTopicPartitionsWithRackInfo(final Cluster cluster, Review Comment: this doesn't sound quite right, should it be `annotateTopicPartitionsWithRackInfo`? 
(i.e. there is an extra "with") ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java: ## @@ -513,50 +515,45 @@ private ApplicationState buildApplicationState(final TopologyMetadata topologyMe + "tasks for source topics vs changelog topics."); } -final Set sourceTopicPartitions = new HashSet<>(); -final Set nonSourceChangelogTopicPartitions = new HashSet<>(); -for (final Map.Entry> entry : sourcePartitionsForTask.entrySet()) { -final TaskId taskId = entry.getKey(); -final Set taskSourcePartitions = entry.getValue(); -final Set taskChangelogPartitions = changelogPartitionsForTask.get(taskId); -final Set taskNonSourceChangelogPartitions = new HashSet<>(taskChangelogPartitions); -taskNonSourceChangelogPartitions.removeAll(taskSourcePartitions); - -sourceTopicPartitions.addAll(taskSourcePartitions); - nonSourceChangelogTopicPartitions.addAll(taskNonSourceChangelogPartitions); -} +final Set logicalTaskIds = unmodifiableSet(sourcePartitionsForTask.keySet()); +final Set allTopicPartitions = new HashSet<>(); +final Map> topicPartitionsForTask = new HashMap<>(); +logicalTaskIds.forEach(taskId -> { +final Set topicPartitions = new HashSet<>(); + +for (final TopicPartition topicPartition : sourcePartitionsForTask.get(taskId)) { +final boolean isSource = true; +final boolean isChangelog = changelogPartitionsForTask.get(taskId).contains(topicPartition); +final DefaultTaskTopicPartition racklessTopicPartition = new DefaultTaskTopicPartition( +topicPartition, isSource, isChangelog, null); +allTopicPartitions.add(racklessTopicPartition); +topicPartitionsForTask.get(taskId).add(racklessTopicPartition); Review Comment: both here and for the changelog loop below ```suggestion topicPartitions.add(racklessTopicPartition); ```
Re: [PR] KAFKA-15045: (KIP-924 pt. 9) TaskAssignmentUtils implementation of optimizeRackAwareActiveTasks [kafka]
apourchet commented on code in PR #16033: URL: https://github.com/apache/kafka/pull/16033#discussion_r1610768911 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java: ## @@ -486,12 +486,12 @@ public long optimizeStandbyTasks(final SortedMap clientStates .sorted() .collect(Collectors.toList()); -final Map taskClientMap = new HashMap<>(); final List clients = Stream.of(clientList.get(i), clientList.get(j)) .sorted().collect( Collectors.toList()); -final Map originalAssignedTaskNumber = new HashMap<>(); +final Map taskClientMap = new HashMap<>(); +final Map originalAssignedTaskNumber = new HashMap<>(); Review Comment: I moved this lower down to show them clearly initialized empty right before the function call. It turns out these parameters are used as outputs, which is why I added the `AssignmentGraph` inner class to the `TaskAssignmentUtils`, in order to simplify the usage of the graph functions.
Re: [PR] KAFKA-15045: (KIP-924 pt. 9) TaskAssignmentUtils implementation of optimizeRackAwareActiveTasks [kafka]
apourchet commented on code in PR #16033: URL: https://github.com/apache/kafka/pull/16033#discussion_r1610767256 ## streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskInfo.java: ## @@ -50,6 +50,12 @@ public interface TaskInfo { */ Set stateStoreNames(); +/** + * + * @return The topic partitions in use by this task. + */ +Set topicPartitions(); Review Comment: This is temporary to get things to compile until we merge part 10 https://github.com/apache/kafka/pull/16034/files
Re: [PR] KAFKA-15045: (KIP-924 pt. 8) Added TopicPartitionAssignmentInfo [kafka]
ableegoldman merged PR #16024: URL: https://github.com/apache/kafka/pull/16024
[PR] KAFKA-15045: (KIP-924 pt. 10) Topic partition rack annotation simplified [kafka]
apourchet opened a new pull request, #16034: URL: https://github.com/apache/kafka/pull/16034 This PR uses the new TaskTopicPartition structure to simplify the build process for the ApplicationState, which is the input to the new TaskAssignor#assign call.
[jira] [Comment Edited] (KAFKA-16798) Mirrormaker2 dedicated mode - sync.group.offsets.interval not working
[ https://issues.apache.org/jira/browse/KAFKA-16798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848767#comment-17848767 ]

Thanos Athanasopoulos edited comment on KAFKA-16798 at 5/22/24 10:44 PM:
-------------------------------------------------------------------------

[~gharris1727] that was the culprit ! I set offset.lag.max=0 in mm2.properties and now it syncs asap.

Will head for some reading in the resources you shared and then some performance tuning in real world scenarios.

I am not sure how things work in Kafka ecosystem regarding documentation, I can help improve it or start a kind of KB regarding this case.

Thanks for the support.

was (Author: JIRAUSER305481):
    [~gharris1727] that was the culprit ! I set offset.lag.max=0 in mm2.properties and now it syncs asap.

    Will head for some reading in the resources you shared and then some performance tuning in real world scenarios.

    Thank you !

> Mirrormaker2 dedicated mode - sync.group.offsets.interval not working
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-16798
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16798
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 3.7.0
>            Reporter: Thanos Athanasopoulos
>            Priority: Major
>
> Single instance MirrorMaker2 in dedicated mode, active passive replication logic.
> sync.group.offsets.interval.seconds=2 configuration is enabled and active
> {noformat}
> [root@x-x ~]# docker logs cc-mm 2>&1 -f | grep -i "auto.commit.interval\|checkpoint.interval\|consumer.commit.interval\|sync.topics.interval\|sync.group.offsets.interval\|offset-syncs.interval.seconds "
> auto.commit.interval.ms = 5000
> auto.commit.interval.ms = 5000
> auto.commit.interval.ms = 5000
> auto.commit.interval.ms = 5000
> auto.commit.interval.ms = 5000
> auto.commit.interval.ms = 5000
> sync.group.offsets.interval.seconds = 2
> sync.group.offsets.interval.seconds = 2
> auto.commit.interval.ms = 5000
> sync.group.offsets.interval.seconds = 2
> auto.commit.interval.ms = 5000
> auto.commit.interval.ms = 5000
> auto.commit.interval.ms = 5000
> auto.commit.interval.ms = 5000
> auto.commit.interval.ms = 5000
> auto.commit.interval.ms = 5000
> auto.commit.interval.ms = 5000
> auto.commit.interval.ms = 5000
> sync.group.offsets.interval.seconds = 2
> auto.commit.interval.ms = 5000
> auto.commit.interval.ms = 5000
> sync.group.offsets.interval.seconds = 2
> sync.group.offsets.interval.seconds = 2
> auto.commit.interval.ms = 5000
> {noformat}
> but is not working, the commit of offsets happens *always every 60 seconds* as you can see in the logs
> {noformat}
> [2024-05-20 09:32:44,847] INFO [MirrorSourceConnector|task-0|offsets] WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 20 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,852] INFO [MirrorSourceConnector|task-1|offsets] WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,875] INFO [MirrorHeartbeatConnector|task-0|offsets] WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,881] INFO [MirrorCheckpointConnector|task-0|offsets] WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets for 1 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,890] INFO [MirrorHeartbeatConnector|task-0|offsets] WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,850] INFO [MirrorSourceConnector|task-0|offsets] WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 21 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,854] INFO [MirrorSourceConnector|task-1|offsets] WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,878] INFO [MirrorHeartbeatConnector|task-0|offsets] WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,883] INFO [MirrorCheckpointConnector|task-0|offsets] WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets
[jira] [Commented] (KAFKA-16798) Mirrormaker2 dedicated mode - sync.group.offsets.interval not working
[ https://issues.apache.org/jira/browse/KAFKA-16798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848767#comment-17848767 ]

Thanos Athanasopoulos commented on KAFKA-16798:
-----------------------------------------------

[~gharris1727] that was the culprit ! I set offset.lag.max=0 in mm2.properties and now it syncs asap.

Will go for some reading in the resources you shared and then some performance tuning in real world scenarios.

Thank you !

> Mirrormaker2 dedicated mode - sync.group.offsets.interval not working
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-16798
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16798
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 3.7.0
>            Reporter: Thanos Athanasopoulos
>            Priority: Major
>
> Single instance MirrorMaker2 in dedicated mode, active passive replication logic.
> sync.group.offsets.interval.seconds=2 configuration is enabled and active
> {noformat}
> [root@x-x ~]# docker logs cc-mm 2>&1 -f | grep -i "auto.commit.interval\|checkpoint.interval\|consumer.commit.interval\|sync.topics.interval\|sync.group.offsets.interval\|offset-syncs.interval.seconds "
> auto.commit.interval.ms = 5000
> auto.commit.interval.ms = 5000
> auto.commit.interval.ms = 5000
> auto.commit.interval.ms = 5000
> auto.commit.interval.ms = 5000
> auto.commit.interval.ms = 5000
> sync.group.offsets.interval.seconds = 2
> sync.group.offsets.interval.seconds = 2
> auto.commit.interval.ms = 5000
> sync.group.offsets.interval.seconds = 2
> auto.commit.interval.ms = 5000
> auto.commit.interval.ms = 5000
> auto.commit.interval.ms = 5000
> auto.commit.interval.ms = 5000
> auto.commit.interval.ms = 5000
> auto.commit.interval.ms = 5000
> auto.commit.interval.ms = 5000
> auto.commit.interval.ms = 5000
> sync.group.offsets.interval.seconds = 2
> auto.commit.interval.ms = 5000
> auto.commit.interval.ms = 5000
> sync.group.offsets.interval.seconds = 2
> sync.group.offsets.interval.seconds = 2
> auto.commit.interval.ms = 5000
> {noformat}
> but is not working, the commit of offsets happens *always every 60 seconds* as you can see in the logs
> {noformat}
> [2024-05-20 09:32:44,847] INFO [MirrorSourceConnector|task-0|offsets] WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 20 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,852] INFO [MirrorSourceConnector|task-1|offsets] WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,875] INFO [MirrorHeartbeatConnector|task-0|offsets] WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,881] INFO [MirrorCheckpointConnector|task-0|offsets] WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets for 1 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,890] INFO [MirrorHeartbeatConnector|task-0|offsets] WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,850] INFO [MirrorSourceConnector|task-0|offsets] WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 21 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,854] INFO [MirrorSourceConnector|task-1|offsets] WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,878] INFO [MirrorHeartbeatConnector|task-0|offsets] WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,883] INFO [MirrorCheckpointConnector|task-0|offsets] WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets for 2 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,895] INFO [MirrorHeartbeatConnector|task-0|offsets] WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:34:44,852] INFO [MirrorSourceConnector|task-0|offsets] WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 20
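A note on why offset.lag.max=0 changes the behaviour reported above: as far as I understand it, MirrorMaker 2 only emits a new offset sync for a partition once the replicated offsets have drifted far enough from the last sync, and offset.lag.max (default 100) is that threshold. The Java sketch below is a deliberately simplified, hypothetical model of that gating and not MirrorMaker 2's actual code; the class OffsetSyncGate and its methods are invented for illustration.

```java
// Simplified, hypothetical model of lag-gated offset syncs.
// This is NOT MirrorMaker 2's implementation; all names are illustrative.
final class OffsetSyncGate {
    private final long maxOffsetLag;
    private long lastSyncedOffset = -1L; // -1 means "no sync emitted yet"

    OffsetSyncGate(long maxOffsetLag) {
        this.maxOffsetLag = maxOffsetLag;
    }

    /** Returns true when a new offset sync would be emitted for this record. */
    boolean shouldSync(long downstreamOffset) {
        boolean firstRecord = lastSyncedOffset < 0;
        boolean lagReached = downstreamOffset - lastSyncedOffset >= maxOffsetLag;
        if (firstRecord || lagReached) {
            lastSyncedOffset = downstreamOffset;
            return true;
        }
        return false;
    }

    /** Counts how many syncs are emitted while replicating `records` records. */
    static int countSyncs(long maxOffsetLag, int records) {
        OffsetSyncGate gate = new OffsetSyncGate(maxOffsetLag);
        int syncs = 0;
        for (long offset = 0; offset < records; offset++) {
            if (gate.shouldSync(offset)) {
                syncs++;
            }
        }
        return syncs;
    }
}
```

In this model, a low-traffic topic with about 20 records per interval (as in the quoted logs) gets a single sync with a threshold of 100, while a threshold of 0 produces a sync for every record, which would match the observation that translated group offsets only became fresh after the setting was changed.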
Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]
dongnuo123 commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1610735381

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ##
@@ -12401,6 +12436,363 @@ public void testClassicGroupSyncToConsumerGroupRebalanceInProgress() throws Exce
                 .withProtocolName("range")
                 .build())
         );
+        context.assertJoinTimeout(groupId, memberId, 1);
+    }
+
+    @Test
+    public void testClassicGroupHeartbeatToConsumerGroupMaintainsSession() throws Exception {
+        String groupId = "group-id";
+        String memberId = Uuid.randomUuid().toString();
+        int sessionTimeout = 5000;
+
+        List protocols = Collections.singletonList(
+            new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+                .setName("range")
+                .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
+                    new ConsumerPartitionAssignor.Subscription(
+                        Arrays.asList("foo"),
+                        null,
+                        Collections.emptyList()
+                    )
+                )))
+        );
+
+        // Consumer group with a member using the classic protocol.
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(new MockPartitionAssignor("range")))
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setClassicMemberMetadata(
+                        new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                            .setSessionTimeoutMs(sessionTimeout)
+                            .setSupportedProtocols(protocols)
+                    )
+                    .setMemberEpoch(10)
+                    .build()))
+            .build();
+
+        // Heartbeat to schedule the session timeout.
+        HeartbeatRequestData request = new HeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId)
+            .setGenerationId(10);
+        context.sendClassicGroupHeartbeat(request);
+        context.assertSessionTimeout(groupId, memberId, sessionTimeout);
+
+        // Advance clock by 1/2 of session timeout.
+        GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout / 2));
+
+        HeartbeatResponseData heartbeatResponse = context.sendClassicGroupHeartbeat(request).response();
+        assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode());
+        context.assertSessionTimeout(groupId, memberId, sessionTimeout);
+
+        // Advance clock by 1/2 of session timeout.
+        GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout / 2));
+
+        heartbeatResponse = context.sendClassicGroupHeartbeat(request).response();
+        assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode());
+        context.assertSessionTimeout(groupId, memberId, sessionTimeout);
+    }
+
+    @Test
+    public void testClassicGroupHeartbeatToConsumerGroupRebalanceInProgress() throws Exception {
+        String groupId = "group-id";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String memberId3 = Uuid.randomUuid().toString();
+        Uuid fooTopicId = Uuid.randomUuid();
+        Uuid barTopicId = Uuid.randomUuid();
+        int sessionTimeout = 5000;
+        int rebalanceTimeout = 1;
+
+        List protocols = Collections.singletonList(
+            new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+                .setName("range")
+                .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
+                    new ConsumerPartitionAssignor.Subscription(
+                        Arrays.asList("foo"),
+                        null,
+                        Collections.emptyList()
+                    )
+                )))
+        );
+
+        // Member 1 has a member epoch smaller than the group epoch.
+        ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1)
+            .setRebalanceTimeoutMs(rebalanceTimeout)
+            .setClassicMemberMetadata(
+                new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                    .setSessionTimeoutMs(sessionTimeout)
+                    .setSupportedProtocols(protocols)
+            )
+            .setMemberEpoch(9)
+            .build();
+
+        // Member 2 has unrevoked partition.
+        ConsumerGroupMember member2 = new ConsumerGroupMember.Builder(memberId2)
+            .setState(MemberState.UNREVOKED_PARTITIONS)
+            .setRebalanceTimeoutMs(rebalanceTimeout)
+
[jira] [Assigned] (KAFKA-15284) Implement ConsumerGroupProtocolVersionResolver to determine consumer group protocol
[ https://issues.apache.org/jira/browse/KAFKA-15284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True reassigned KAFKA-15284:
---------------------------------

    Assignee:     (was: Kirk True)

> Implement ConsumerGroupProtocolVersionResolver to determine consumer group protocol
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-15284
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15284
>             Project: Kafka
>          Issue Type: New Feature
>          Components: clients, consumer
>            Reporter: Kirk True
>            Priority: Major
>              Labels: kip-848-client-support
>             Fix For: 4.0.0
>
> At client initialization, we need to determine which of the {{ConsumerDelegate}} implementations to use:
> # {{LegacyKafkaConsumerDelegate}}
> # {{AsyncKafkaConsumerDelegate}}
> There are conditions defined by KIP-848 that determine client eligibility to use the new protocol. This will be modeled by the (deep breath) {{ConsumerGroupProtocolVersionResolver}}.
> Known tasks:
> * Determine at what point in the {{Consumer}} initialization the network communication should happen
> * Determine what RPCs to invoke in order to determine eligibility (API versions, IBP version, etc.)
> * Implement the network client lifecycle (startup, communication, shutdown, etc.)
> * Determine the fallback path in case the client is not eligible to use the protocol

--
This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]
dongnuo123 commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1610729396

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ##
@@ -12143,7 +12183,6 @@ public void testClassicGroupSyncToConsumerGroupWithAllConsumerProtocolVersions()
         // Consumer group with two members.
         // Member 1 uses the classic protocol and member 2 uses the consumer protocol.
         GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
-            .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)

Review Comment:
It actually doesn't matter. This was first added because I copied and pasted it from the downgrade unit test.
Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]
dongnuo123 commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1610728790

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -4274,6 +4343,81 @@ private void validateClassicGroupHeartbeat(
         }
     }

+    /**
+     * Handle a classic group HeartbeatRequest to a consumer group. A response with
+     * REBALANCE_IN_PROGRESS is returned if 1) the member epoch is smaller than the
+     * group epoch, 2) the member is in UNREVOKED_PARTITIONS, or 3) the member is in
+     * UNRELEASED_PARTITIONS and all its partitions pending assignment are free.
+     *
+     * @param group     The ConsumerGroup.
+     * @param context   The request context.
+     * @param request   The actual Heartbeat request.
+     *
+     * @return The coordinator result that contains the heartbeat response.
+     */
+    private CoordinatorResult classicGroupHeartbeatToConsumerGroup(
+        ConsumerGroup group,
+        RequestContext context,
+        HeartbeatRequestData request
+    ) throws UnknownMemberIdException, FencedInstanceIdException, IllegalGenerationException {
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        String instanceId = request.groupInstanceId();
+        ConsumerGroupMember member = validateConsumerGroupMember(group, memberId, instanceId);
+
+        throwIfMemberDoesNotUseClassicProtocol(member);
+        throwIfGenerationIdUnmatched(memberId, member.memberEpoch(), request.generationId());
+
+        scheduleConsumerGroupSessionTimeout(groupId, memberId, member.classicProtocolSessionTimeout().get());
+
+        Errors error = Errors.NONE;
+        // The member should rejoin if any of the following conditions is met.
+        // 1) The group epoch is bumped so the member need to rejoin to catch up.
+        // 2) The member needs to revoke some partitions and rejoin to reconcile with the new epoch.
+        // 3) The member's partitions pending assignment are free, so it can rejoin to get the complete assignment.
+        if (member.memberEpoch() < group.groupEpoch() ||
+            member.state() == MemberState.UNREVOKED_PARTITIONS ||
+            (member.state() == MemberState.UNRELEASED_PARTITIONS && !group.waitingOnUnreleasedPartition(member))) {

Review Comment:
> the helper checks that the latest state does in fact have all partitions released but we want it to rejoin to get the updated assignment

Yes, this is correct.

> Will this member be updated to STABLE state in the next CurrentAssignmentBuilder#computeNextAssignment

Yes, it will, in the reconciliation part of `classicGroupJoinToConsumerGroup`:
https://github.com/apache/kafka/blob/27a6c156c49e375edea0e6f33a35c64c615db1b5/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L1737-L1745
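For readers skimming the thread, the three rejoin conditions in the quoted diff can be read as one predicate. The sketch below restates them in standalone Java; the enum and the boolean parameter are simplified stand-ins I made up for illustration, not the real `ConsumerGroupMember` or `ConsumerGroup` types from the group coordinator.

```java
// Standalone restatement of the rejoin check from the quoted diff.
// The types here are hypothetical stand-ins, not group-coordinator classes.
final class RejoinCheckSketch {
    enum MemberState { STABLE, UNREVOKED_PARTITIONS, UNRELEASED_PARTITIONS }

    /**
     * Returns true when the heartbeat response should carry REBALANCE_IN_PROGRESS,
     * i.e. when the classic-protocol member is expected to rejoin.
     */
    static boolean shouldRejoin(
        int memberEpoch,
        int groupEpoch,
        MemberState state,
        boolean waitingOnUnreleasedPartition
    ) {
        // 1) The group epoch was bumped and the member has to catch up.
        boolean behindGroupEpoch = memberEpoch < groupEpoch;
        // 2) The member still has partitions to revoke.
        boolean mustRevoke = state == MemberState.UNREVOKED_PARTITIONS;
        // 3) The partitions the member was waiting for have been released.
        boolean pendingPartitionsFree =
            state == MemberState.UNRELEASED_PARTITIONS && !waitingOnUnreleasedPartition;
        return behindGroupEpoch || mustRevoke || pendingPartitionsFree;
    }
}
```

A stable member whose epoch equals the group epoch gets no rebalance signal in this model, and a member in UNRELEASED_PARTITIONS is only told to rejoin once nothing it is waiting for is still owned by another member.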
Re: [PR] KAFKA-9228: Restart tasks on runtime-only connector config changes [kafka]
C0urante commented on code in PR #16001:
URL: https://github.com/apache/kafka/pull/16001#discussion_r1610679966

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConfigHash.java: ##
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.util.ConnectUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.HttpHeaders;
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * A deterministic hash of a connector configuration. This can be used to detect changes
+ * in connector configurations across worker lifetimes, which is sometimes necessary when
+ * connectors are reconfigured in a way that affects their tasks' runtime behavior but does
+ * not affect their tasks' configurations (for example, changing the key converter class).
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/KAFKA-9228">KAFKA-9228</a>
+ */
+public class ConfigHash {
+
+    private static final Logger log = LoggerFactory.getLogger(ConfigHash.class);
+
+    public static final ConfigHash NO_HASH = new ConfigHash(null);
+    public static final String CONNECTOR_CONFIG_HASH_HEADER = "X-Connect-Connector-Config-Hash";
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private final Integer hash;
+
+    // Visible for testing
+    ConfigHash(Integer hash) {
+        this.hash = hash;
+    }
+
+    /**
+     * Read and parse a hash from the headers of a REST request.
+     *
+     * @param connector the name of the connector; only used for error logging
+     *                  purposes and may be null
+     * @param headers the headers from which to read and parse the hash;
+     *                may be null
+     *
+     * @return the parsed hash; never null, but may be {@link #NO_HASH} if
+     * no hash header is present
+     *
+     * @throws ConnectException if the expected header is present for the hash,
+     * but it cannot be parsed as a 32-bit signed integer
+     */
+    public static ConfigHash fromHeaders(String connector, HttpHeaders headers) {
+        if (headers == null)
+            return NO_HASH;
+
+        String header = headers.getHeaderString(CONNECTOR_CONFIG_HASH_HEADER);
+        if (header == null)
+            return NO_HASH;
+
+        int hash;
+        try {
+            hash = Integer.parseInt(header);
+        } catch (NumberFormatException e) {
+            if (connector == null)
+                connector = "";
+
+            if (log.isTraceEnabled()) {
+                log.error("Invalid connector config hash header for connector {}", connector);
+                log.trace("Invalid connector config hash header for connector {}: '{}'", connector, header);
+            } else {
+                log.error(
+                    "Invalid connector config hash header for connector {}. "
+                        + "Please enable TRACE logging to see the invalid value",
+                    connector
+                );
+            }
+            throw new ConnectException("Invalid hash header; expected a 32-bit signed integer");
+        }
+        return new ConfigHash(hash);
+    }
+
+    /**
+     * Generate a deterministic hash from the config. For configurations
+     * with identical key-value pairs, this hash will always be the same, and
+     * {@link #shouldUpdateTasks(ConfigHash, ConfigHash)} will return {@code false}
+     * for any two such configurations. Note that, for security reasons, those
+     * {@link ConfigHash} instances will still not {@link #equals(Object) equal}
+     * each other.
+     *
+     * @param config the configuration to hash; may be null
+     *
+     * @return the resulting hash; may be {@link #NO_HASH} if the configuration
+     * was null
+     *
+     * @throws ConnectException if the configuration cannot be serialized to JSON
+     * for the purposes of hashing
+
Re: [PR] KAFKA-9228: Restart tasks on runtime-only connector config changes [kafka]
C0urante commented on PR #16001:
URL: https://github.com/apache/kafka/pull/16001#issuecomment-2125789945

After some offline discussion with @gharris1727 we realized there was a potential for infinite rebalance loops with the previous implementation when config providers resolved values differently depending on the worker on which they were running. Assume some non-leader worker `W` hosts some connector `C`:

1. `W` computes the hash of a config-provider-resolved connector config
2. After observing that the hash differs from the latest hash present in the config topic for that connector, `W` forwards a request to the leader to publish task configs for `C` to the config topic
3. The leader accepts this request, performs its own config provider resolution on the connector config, and writes a hash of that config-provider-resolved config to the config topic
4. After the ensuing rebalance, `W` sees a different hash in the config topic than the one that it computed in step 1, and the process repeats

In order to address this, I've published a commit that causes non-leader workers to send their own connector config hash to the leader as an HTTP request header when forwarding task configs to the leader.

I've also introduced a dedicated `ConfigHash` class that should make it significantly harder for config hash values to be leaked, including via log message. This class does not directly expose the underlying 32-bit integer and instead only permits indirect access to the value.
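To make the loop and the fix easier to picture, here is a small standalone Java sketch. It is only my reading of the comment above, under two labeled assumptions: that the leader publishes the forwarded hash when one is present, and that a sorted `key=value` string is an acceptable stand-in for the JSON serialization used in the PR. `ConfigHashSketch`, `hash` and `hashToPublish` are hypothetical names, not part of Kafka Connect.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch; not the ConfigHash class from the PR.
final class ConfigHashSketch {

    /** Order-independent hash: keys are sorted before the config is serialized. */
    static int hash(Map<String, String> config) {
        StringBuilder serialized = new StringBuilder();
        for (Map.Entry<String, String> entry : new TreeMap<>(config).entrySet()) {
            serialized.append(entry.getKey()).append('=').append(entry.getValue()).append('\n');
        }
        // String.hashCode() is specified by the JDK, so it is stable across workers.
        return serialized.toString().hashCode();
    }

    /**
     * Assumption: the leader prefers the hash forwarded by the non-leader worker
     * (for example via a request header) and only falls back to hashing its own
     * config-provider-resolved view when no hash was forwarded.
     */
    static int hashToPublish(Integer forwardedHash, Map<String, String> leaderResolvedConfig) {
        return forwardedHash != null ? forwardedHash : hash(leaderResolvedConfig);
    }
}
```

With this shape, worker `W` finds its own hash in the config topic after the rebalance even if the leader would have resolved the config differently, so step 4 of the loop no longer triggers another round.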
[jira] [Assigned] (KAFKA-16815) Handle FencedInstanceId on heartbeat for new consumer
[ https://issues.apache.org/jira/browse/KAFKA-16815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lianet Magrans reassigned KAFKA-16815:
--------------------------------------

    Assignee: Lianet Magrans

> Handle FencedInstanceId on heartbeat for new consumer
> -----------------------------------------------------
>
>                 Key: KAFKA-16815
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16815
>             Project: Kafka
>          Issue Type: Task
>          Components: clients, consumer
>            Reporter: Lianet Magrans
>            Assignee: Lianet Magrans
>            Priority: Major
>              Labels: kip-848-client-support
>
> With the new consumer group protocol, a member could receive a FencedInstanceIdError in the heartbeat response. This could be the case when an active member using a group instance id is removed from the group by an admin client. If a second member joins with the same instance id, the first member will receive a FencedInstanceId on the next heartbeat response. This should be treated as a fatal error (consumer should not attempt to rejoin).
> Currently, the FencedInstanceId is not explicitly handled by the client in the HeartbeatRequestManager. It ends up being treated as a fatal error, see [here|https://github.com/apache/kafka/blob/5552f5c26df4eb07b2d6ee218e4a29e4ca790d5c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L417] (just because it lands on the "unexpected" error category). We should handle it explicitly, just to make sure that we express that it is an expected error: log a proper message for it and fail (handleFatalFailure). We should also ensure that the error is included in the tests that cover the HB request error handling ([here|https://github.com/apache/kafka/blob/5552f5c26df4eb07b2d6ee218e4a29e4ca790d5c/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java#L798])
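The explicit handling asked for in the issue might look roughly like the sketch below. It is a standalone illustration with invented enums and method names; it does not reproduce the real `HeartbeatRequestManager` or `Errors` classes, and it leaves out every other heartbeat error code.

```java
// Hypothetical sketch of explicit FencedInstanceId handling.
// Enums and methods are simplified stand-ins, not Kafka client classes.
final class HeartbeatErrorSketch {
    enum Error { NONE, FENCED_INSTANCE_ID, UNKNOWN_SERVER_ERROR }
    enum Action { CONTINUE, FAIL }

    /** Decide what the client does with the error in a heartbeat response. */
    static Action actionFor(Error error) {
        switch (error) {
            case NONE:
                return Action.CONTINUE;
            case FENCED_INSTANCE_ID:
                // Expected error, handled on purpose: fatal, no attempt to rejoin.
                return Action.FAIL;
            default:
                // Anything unrecognised stays in the "unexpected" category.
                return Action.FAIL;
        }
    }

    /** Message to log; the fenced case gets its own wording. */
    static String messageFor(Error error, String instanceId) {
        switch (error) {
            case NONE:
                return "";
            case FENCED_INSTANCE_ID:
                return "Member with group instance id " + instanceId
                    + " was fenced because another member joined with the same instance id";
            default:
                return "Unexpected heartbeat error: " + error;
        }
    }
}
```

The outcome for the fenced case is the same as today (a fatal failure); what changes in this sketch is that the case is named and logged with a specific message instead of falling through to the generic branch.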
[jira] [Resolved] (KAFKA-16160) AsyncKafkaConsumer is trying to connect to a disconnected node in a tight loop
[ https://issues.apache.org/jira/browse/KAFKA-16160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Philip Nee resolved KAFKA-16160.
-------------------------------
    Resolution: Cannot Reproduce

> AsyncKafkaConsumer is trying to connect to a disconnected node in a tight loop
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-16160
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16160
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>            Reporter: Philip Nee
>            Assignee: Phuc Hong Tran
>            Priority: Major
>              Labels: consumer-threading-refactor
>             Fix For: 3.8.0
>
> Observing excessive logging when running AsyncKafkaConsumer, in particular
> repeated occurrences of:
> {code:java}
> 1271 [2024-01-15 09:43:36,627] DEBUG [Consumer clientId=console-consumer,
> groupId=concurrent_consumer] Node is not ready, handle the request in the
> next event loop: node=worker4:9092 (id: 2147483644 rack: null),
> request=UnsentRequest{requestBuilder=ConsumerGroupHeartbeatRequestData(groupId='concurrent_consumer',
> memberId='laIqS789StuhXFpTwjh6hA', memberEpoch=1, instanceId=null,
> rackId=null, rebalanceTimeoutMs=30, subscribedTopicNames=[output-topic],
> serverAssignor=null, topicPartitions=[TopicPartitions(topicId=I5P5lIXvR1Cjc8hfoJg5bg, partitions=[0])]),
> handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@918925b,
> node=Optional[worker4:9092 (id: 2147483644 rack: null)] ,
> timer=org.apache.kafka.common.utils.Timer@55ed4733}
> (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate) {code}
> This seems to be triggered by a tight poll loop of the network thread. The
> right thing to do is to back off a bit for that given node and retry later.
> This should be a blocker for the 3.8 release.
[jira] [Commented] (KAFKA-16160) AsyncKafkaConsumer is trying to connect to a disconnected node in a tight loop
[ https://issues.apache.org/jira/browse/KAFKA-16160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848748#comment-17848748 ]

Philip Nee commented on KAFKA-16160:
------------------------------------
Hey [~phuctran] - I'm going to close this issue because I haven't been seeing this in test logs. Given that we fixed a few relevant issues like inflight logics...
[jira] [Updated] (KAFKA-16160) AsyncKafkaConsumer is trying to connect to a disconnected node in a tight loop
[ https://issues.apache.org/jira/browse/KAFKA-16160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jon Chiu updated KAFKA-16160:
-----------------------------
    Priority: Major  (was: Blocker)
[jira] [Assigned] (KAFKA-16623) KafkaAsyncConsumer system tests warn about revoking partitions that weren't previously assigned
[ https://issues.apache.org/jira/browse/KAFKA-16623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jon Chiu reassigned KAFKA-16623:
--------------------------------
    Assignee: Lianet Magrans  (was: Kirk True)

> KafkaAsyncConsumer system tests warn about revoking partitions that weren't previously assigned
> -----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-16623
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16623
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer, system tests
>    Affects Versions: 3.8.0
>            Reporter: Kirk True
>            Assignee: Lianet Magrans
>            Priority: Critical
>              Labels: consumer-threading-refactor, kip-848-client-support, system-tests
>             Fix For: 3.8.0
>
> When running system tests for the KafkaAsyncConsumer, we occasionally see
> this warning:
> {noformat}
>   File "/usr/lib/python3.7/threading.py", line 917, in _bootstrap_inner
>     self.run()
>   File "/usr/lib/python3.7/threading.py", line 865, in run
>     self._target(*self._args, **self._kwargs)
>   File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/services/background_thread.py", line 38, in _protected_worker
>     self._worker(idx, node)
>   File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py", line 304, in _worker
>     handler.handle_partitions_revoked(event, node, self.logger)
>   File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/verifiable_consumer.py", line 163, in handle_partitions_revoked
>     (tp, node.account.hostname)
> AssertionError: Topic partition TopicPartition(topic='test_topic', partition=0) cannot be revoked from worker20 as it was not previously assigned to that consumer
> {noformat}
> In test_fencing_static_consumer, there are two sets of consumers that use
> group instance IDs: the initial set and the "conflict" set. It appears that
> one of the "conflicting" consumers hijacks the partition ownership from the
> coordinator's perspective when the initial consumer leaves the group.
Re: [PR] MINOR: Fix rate metric spikes [kafka]
emitskevich-blp commented on code in PR #15889:
URL: https://github.com/apache/kafka/pull/15889#discussion_r1610656953

##
clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:
##
@@ -50,10 +50,13 @@ public void record(MetricConfig config, double value, long timeMs) {
             sample = advance(config, timeMs);
         update(sample, config, value, timeMs);
         sample.eventCount += 1;
+        sample.lastEventMs = timeMs;
     }

     private Sample advance(MetricConfig config, long timeMs) {
-        this.current = (this.current + 1) % config.samples();
+        // need to keep one extra sample (see purgeObsoleteSamples() logic)

Review Comment:
   Addressed, but I slightly changed the suggestion; please check whether it's readable. Technically, this extra space is not there to remember the last recording time; it is there to keep the entire extra sample with all of its collected data.
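As a rough illustration of "keeping one extra sample", the sketch below models a ring of sample slots whose capacity is one more than the configured sample count. The diff quoted above is cut off before the new modulus, so the `configuredSamples + 1` capacity is an assumption, and `SampleRingSketch` with its `long[]` slots is a hypothetical stand-in for `SampledStat` and `Sample`.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical ring of sample slots that keeps one slot more than configured.
 * Each slot is {startTimeMs, eventCount}; this is not Kafka's SampledStat.
 */
final class SampleRingSketch {
    private final int configuredSamples;
    private final List<long[]> samples = new ArrayList<>();
    private int current = -1; // no slot selected yet

    SampleRingSketch(int configuredSamples) {
        this.configuredSamples = configuredSamples;
    }

    /** Moves to the next slot, creating it on first use and recycling it afterwards. */
    long[] advance(long nowMs) {
        // Assumption: one extra slot, so the oldest sample that still overlaps
        // the monitored window is not overwritten too early.
        int capacity = configuredSamples + 1;
        current = (current + 1) % capacity;
        if (current >= samples.size()) {
            samples.add(new long[] {nowMs, 0});
        } else {
            long[] recycled = samples.get(current);
            recycled[0] = nowMs; // new start time
            recycled[1] = 0;     // event count reset
        }
        return samples.get(current);
    }

    int size() {
        return samples.size();
    }

    int currentIndex() {
        return current;
    }
}
```

With `configuredSamples = 2`, the ring grows to three slots and then wraps, so the slot being recycled is always one that lies entirely outside the two-window span.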
[jira] [Updated] (KAFKA-16639) AsyncKafkaConsumer#close does not send heartbeat to leave group
[ https://issues.apache.org/jira/browse/KAFKA-16639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-16639:
------------------------------
    Priority: Critical  (was: Major)

> AsyncKafkaConsumer#close does not send heartbeat to leave group
> ---------------------------------------------------------------
>
>                 Key: KAFKA-16639
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16639
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>            Reporter: Chia-Ping Tsai
>            Assignee: Philip Nee
>            Priority: Critical
>              Labels: kip-848-client-support
>             Fix For: 3.8.0
>
> This bug can be reproduced by immediately closing a consumer that has just
> been created.
> The root cause is that we skip the new heartbeat used to leave the group when
> there is an in-flight heartbeat request
> (https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L212).
> It seems to me the simple solution is to create a heartbeat request when
> we hit the above situation and then send it via pollOnClose
> (https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java#L62).
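The proposed fix can be pictured with a toy model: the regular poll path skips sending while a heartbeat is in flight, whereas the close path always produces the leave request. Everything below is a hypothetical simplification (string request names, the `LeaveOnCloseSketch` class) and not the real `HeartbeatRequestManager` or `RequestManager` code.

```java
import java.util.Optional;

/** Hypothetical toy model of the regular and close-time heartbeat paths. */
final class LeaveOnCloseSketch {
    private boolean heartbeatInFlight;
    private boolean leaving;

    void markInFlight(boolean inFlight) {
        this.heartbeatInFlight = inFlight;
    }

    void startLeaving() {
        this.leaving = true;
    }

    /** Regular poll: nothing is sent while a previous heartbeat is in flight. */
    Optional<String> poll() {
        if (heartbeatInFlight) {
            return Optional.empty(); // this is where the leave heartbeat gets lost
        }
        return Optional.of(leaving ? "LEAVE_GROUP_HEARTBEAT" : "HEARTBEAT");
    }

    /** Close path: build the leave heartbeat even if another one is in flight. */
    Optional<String> pollOnClose() {
        return leaving ? Optional.of("LEAVE_GROUP_HEARTBEAT") : Optional.empty();
    }
}
```

In this model, a consumer that is closed right after creation (first heartbeat still in flight) gets nothing from `poll()` but still gets the leave request from `pollOnClose()`.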
Re: [PR] MINOR: Fix rate metric spikes [kafka]
emitskevich-blp commented on code in PR #15889:
URL: https://github.com/apache/kafka/pull/15889#discussion_r1610653813

##
clients/src/test/java/org/apache/kafka/common/metrics/stats/SampledStatTest.java:
##
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class SampledStatTest {
+
+    private SampledStat stat;
+    private Time time;
+
+    @BeforeEach
+    public void setup() {
+        stat = new SampleCount(0);
+        time = new MockTime();
+    }
+
+    @Test
+    @DisplayName("Sample should be purged if doesn't overlap the window")
+    public void testSampleIsPurgedIfDoesntOverlap() {
+        MetricConfig config = new MetricConfig().timeWindow(1, SECONDS).samples(2);
+
+        // Monitored window: 2s. Complete a sample and wait 2.5s after.
+        completeSample(config);
+        time.sleep(2500);
+
+        double numSamples = stat.measure(config, time.milliseconds());
+        assertEquals(0, numSamples);
+    }
+
+    @Test
+    @DisplayName("Sample should be kept if overlaps the window")
+    public void testSampleIsKeptIfOverlaps() {
+        MetricConfig config = new MetricConfig().timeWindow(1, SECONDS).samples(2);
+
+        // Monitored window: 2s. Complete a sample and wait 1.5s after.
+        completeSample(config);
+        time.sleep(1500);
+
+        double numSamples = stat.measure(config, time.milliseconds());
+        assertEquals(1, numSamples);
+    }
+
+    @Test
+    @DisplayName("Sample should be kept if overlaps the window and is n+1")
+    public void testSampleIsKeptIfOverlapsAndExtra() {
+        MetricConfig config = new MetricConfig().timeWindow(1, SECONDS).samples(2);
+
+        // Monitored window: 2s. Create 2 samples with gaps in between and
+        // take a measurement at 2.2s from the start.
+        completeSample(config);
+        time.sleep(100);
+        completeSample(config);
+        time.sleep(100);
+        stat.record(config, 1, time.milliseconds());
+
+        double numSamples = stat.measure(config, time.milliseconds());
+        assertEquals(3, numSamples);
+    }
+
+    // Creates a sample with events at the start and at the end. Positions clock at the end.
+    private void completeSample(MetricConfig config) {
+        stat.record(config, 1, time.milliseconds());
+        time.sleep(config.timeWindowMs() - 1);
+        stat.record(config, 1, time.milliseconds());
+        time.sleep(1);
+    }
+
+    // measure() of this impl returns the number of samples
+    static class SampleCount extends SampledStat {
+
+        SampleCount(double initialValue) {

Review Comment:
   Applied

##
clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:
##
@@ -40,7 +40,7 @@ public abstract class SampledStat implements MeasurableStat {

     public SampledStat(double initialValue) {
         this.initialValue = initialValue;
-        this.samples = new ArrayList<>(2);
+        this.samples = new ArrayList<>(3);

Review Comment:
   Addressed
[PR] KAFKA-15045: (KIP-924 pt. 9) TaskAssignmentUtils implementation of optimizeRackAwareActiveTasks [kafka]
apourchet opened a new pull request, #16033:
URL: https://github.com/apache/kafka/pull/16033

This PR implements the rack aware optimization of active tasks that can be used by the assignors themselves. It takes in the full output of the assignment and tries to reorganize it so as to minimize traffic and non-overlap costs.
Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]
jsancio commented on code in PR #15986:
URL: https://github.com/apache/kafka/pull/15986#discussion_r1610641974

##
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##
@@ -1366,6 +1368,26 @@ class KafkaConfigTest {
     assertEquals(expectedVoters, addresses)
   }

+  @Test
+  def testParseQuorumBootstrapServers(): Unit = {

Review Comment:
   Okay. It does look like there are bugs in the implementation of `Utils.getHost` and `Utils.getPort`. I don't really want to fix them in this PR. I created this issue: https://issues.apache.org/jira/browse/KAFKA-16824
[jira] [Created] (KAFKA-16824) Utils.getHost and Utils.getPort do not catch a lot of invalid host and ports
José Armando García Sancio created KAFKA-16824:
--------------------------------------------------

             Summary: Utils.getHost and Utils.getPort do not catch a lot of invalid host and ports
                 Key: KAFKA-16824
                 URL: https://issues.apache.org/jira/browse/KAFKA-16824
             Project: Kafka
          Issue Type: Bug
            Reporter: José Armando García Sancio

For example, they are not able to detect at least these malformed hosts and ports:
# ho(st:9092
# host:-92
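For illustration, a stricter `host:port` check that rejects both examples from the ticket could look like the sketch below. It is not the implementation of `Utils.getHost` or `Utils.getPort`; the class `HostPortSketch`, its regular expression, and the choice to accept ports 0 to 65535 are assumptions made for this example.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical strict host:port parser; not Kafka's Utils.getHost/getPort. */
final class HostPortSketch {
    // Host: either a bracketed IPv6 literal, or letters/digits/dots/hyphens
    // that start and end with a letter or digit. Port: 1 to 5 decimal digits.
    private static final Pattern HOST_PORT = Pattern.compile(
        "^(\\[[0-9a-fA-F:.]+\\]|[A-Za-z0-9]([A-Za-z0-9.-]*[A-Za-z0-9])?):([0-9]{1,5})$");

    /** Returns {host, port} if the address is well formed, otherwise empty. */
    static Optional<String[]> parse(String address) {
        Matcher m = HOST_PORT.matcher(address);
        if (!m.matches()) {
            return Optional.empty();
        }
        int port = Integer.parseInt(m.group(3));
        if (port > 65535) {
            return Optional.empty(); // five digits can still exceed the valid range
        }
        return Optional.of(new String[] {m.group(1), String.valueOf(port)});
    }
}
```

With this pattern, `ho(st:9092` fails because `(` is not an allowed host character, and `host:-92` fails because the port group accepts digits only.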
[PR] KAFKA-16737: Enable unit tests for new consumer & cleanup TODOs [kafka]
lianetm opened a new pull request, #16032:
URL: https://github.com/apache/kafka/pull/16032

KafkaConsumerTest contained lots of unit tests that were not enabled for the new consumer, with TODOs for verifying whether they could be enabled. This PR enables all the unit tests that apply to and pass for the new consumer (~16 unit tests enabled) and cleans up the TODOs.

The tests that remain applied to the legacy consumer only cannot run for the new consumer, due to:
1. Bugs in the new consumer. Only 5 tests fall in this category; for those I left TODOs referencing the Jira I filed, to make sure the tests are enabled as soon as the bugs are fixed (these are the only TODOs intentionally left).
2. The way the test is written, or what it is testing, is specific to the legacy consumer.
Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]
gharris1727 commented on code in PR #13277:
URL: https://github.com/apache/kafka/pull/13277#discussion_r1610617003

##
clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java:
##
@@ -219,6 +219,19 @@ public class CommonClientConfigs {
     public static final String DEFAULT_API_TIMEOUT_MS_DOC = "Specifies the timeout (in milliseconds) for client APIs. " +
             "This configuration is used as the default timeout for all client operations that do not specify a timeout parameter.";

+    public static final String METADATA_RECOVERY_STRATEGY_CONFIG = "metadata.recovery.strategy";
+    public static final String METADATA_RECOVERY_STRATEGY_DOC = "Controls how the client recovers when none of the brokers known to it is available. " +
+            "If set to none, the client fails. If set to rebootstrap, " +
+            "the client repeats the bootstrap process using bootstrap.servers. " +
+            "Rebootstrapping is useful when a client communicates with brokers so infrequently " +
+            "that the set of brokers may change entirely before the client refreshes metadata. " +
+            "Opportunities to rebootstrapping depend on connection establishing and reconnect timeouts and the broker count. " +
+            "The timeouts may prevent identifying brokers as unavailable simultaneously, which is necessary to trigger rebootstrapping. " +

Review Comment:
   This section is very confusing for me, and I think this should be reworded. Maybe it can flow like this:
   "Metadata recovery is triggered when all last-known brokers appear unavailable simultaneously. Brokers appear unavailable when disconnected and no current retry attempt is in-progress."

##
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##

Review Comment:
   Stuff in this package is not included in the public API: https://kafka.apache.org/37/javadoc/index.html only the stuff in admin/producer/consumer (and not in sub-packages) is truly public.
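As a small illustration of the configuration under review, the sketch below sets the key and value that appear in the quoted diff (`metadata.recovery.strategy`, with `none` or `rebootstrap`) and models the trigger condition using the reviewer's suggested wording. The class `RebootstrapConfigSketch` and the `shouldRebootstrap` helper are hypothetical and only paraphrase that wording; they are not client code from the PR.

```java
import java.util.Collection;
import java.util.Properties;

/** Hypothetical sketch around the metadata.recovery.strategy setting. */
final class RebootstrapConfigSketch {

    /** Builds client properties that opt in to rebootstrapping. */
    static Properties clientProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("metadata.recovery.strategy", "rebootstrap");
        return props;
    }

    /**
     * Paraphrase of the suggested doc text: recovery is triggered only when
     * every last-known broker appears unavailable at the same time.
     * Each element is true if that broker currently appears available.
     */
    static boolean shouldRebootstrap(String strategy, Collection<Boolean> brokerAvailable) {
        if (!"rebootstrap".equals(strategy) || brokerAvailable.isEmpty()) {
            return false;
        }
        for (boolean available : brokerAvailable) {
            if (available) {
                return false; // at least one broker is still usable
            }
        }
        return true;
    }
}
```

A single broker that is still connected, or still in the middle of a retry, is enough to keep the client from rebootstrapping in this model.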
[jira] [Created] (KAFKA-16823) Extract LegacyConsumer-specific unit tests from generic KafkaConsumerTest
Lianet Magrans created KAFKA-16823:
--------------------------------------

             Summary: Extract LegacyConsumer-specific unit tests from generic KafkaConsumerTest
                 Key: KAFKA-16823
                 URL: https://issues.apache.org/jira/browse/KAFKA-16823
             Project: Kafka
          Issue Type: Task
          Components: clients, consumer
            Reporter: Lianet Magrans

Currently the KafkaConsumerTest file contains unit tests that apply to both consumer implementations, but also tests that apply to the legacy consumer only. We should consider splitting the tests that apply to the legacy consumer only into their own LegacyConsumerTest file (aligning with the existing AsyncKafkaConsumerTest). End result would be:

KafkaConsumerTest -> unit tests that apply to both consumers.
LegacyKafkaConsumerTest -> unit tests that apply only to the LegacyKafkaConsumer, either because of the logic they test, or the way they are written (file to be created with this task)
AsyncKafkaConsumerTest -> unit tests that apply only to the AsyncKafkaConsumer (this file already exists)
Re: [PR] KAFKA-10551: Add topic id support to produce request and response [kafka]
jolshan commented on code in PR #15968:
URL: https://github.com/apache/kafka/pull/15968#discussion_r1610605263

##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1361,10 +1366,10 @@ class ReplicaManager(val config: KafkaConfig,
    */
   private def appendToLocalLog(internalTopicsAllowed: Boolean,
                                origin: AppendOrigin,
-                               entriesPerPartition: Map[TopicPartition, MemoryRecords],
+                               entriesPerPartition: Map[TopicIdPartition, MemoryRecords],

Review Comment:
   Ok -- once we start using these across the log layer it makes sense.
Re: [PR] MINOR: Fix rate metric spikes [kafka]
junrao commented on code in PR #15889:
URL: https://github.com/apache/kafka/pull/15889#discussion_r1610602287

##
clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:
##
@@ -106,44 +109,51 @@ public String toString() {

     public abstract double combine(List<Sample> samples, MetricConfig config, long now);

-    /* Timeout any windows that have expired in the absence of any events */
+    // purge any samples that lack observed events within the monitored window
     protected void purgeObsoleteSamples(MetricConfig config, long now) {
         long expireAge = config.samples() * config.timeWindowMs();
         for (Sample sample : samples) {
-            if (now - sample.lastWindowMs >= expireAge)
+            // samples overlapping the monitored window are kept,
+            // even if they started before it
+            if (now - sample.lastEventMs >= expireAge) {
                 sample.reset(now);
+            }
         }
     }

     protected static class Sample {
         public double initialValue;
         public long eventCount;
-        public long lastWindowMs;
+        public long startTimeMs;
+        public long lastEventMs;
         public double value;

         public Sample(double initialValue, long now) {
             this.initialValue = initialValue;
             this.eventCount = 0;
-            this.lastWindowMs = now;
+            this.startTimeMs = now;
+            this.lastEventMs = now;
             this.value = initialValue;
         }

         public void reset(long now) {
             this.eventCount = 0;
-            this.lastWindowMs = now;
+            this.startTimeMs = now;
+            this.lastEventMs = now;
             this.value = initialValue;
         }

         public boolean isComplete(long timeMs, MetricConfig config) {
-            return timeMs - lastWindowMs >= config.timeWindowMs() || eventCount >= config.eventWindow();
+            return timeMs - startTimeMs >= config.timeWindowMs() || eventCount >= config.eventWindow();

Review Comment:
   > in this case, totalElapsedTimeMs will be equal to (config.samples() - 1) * timeWindowMs, and then we will get smaller measure due to the larger denominator

   @chia7712: Could you explain your point a bit more? In the current design, we always ensure at least `(config.samples() - 1) * timeWindowMs` in `totalElapsedTimeMs`. This is to avoid a potentially extremely large measured value when the accumulated sample window is small (e.g. when the metric is first recorded).
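The effect of that minimum on the denominator can be shown with a small worked example. The sketch below is a simplification under stated assumptions: it floors the elapsed time with `Math.max`, which captures the guarantee described in the comment but is not claimed to be the exact computation in Kafka's rate metric, and `RateWindowSketch` and its method names are hypothetical.

```java
/** Hypothetical illustration of flooring the rate denominator. */
final class RateWindowSketch {

    /**
     * Elapsed time used as the denominator, never smaller than
     * (samples - 1) * timeWindowMs. Simplified with Math.max.
     */
    static long windowSizeMs(long oldestSampleStartMs, long nowMs, int samples, long timeWindowMs) {
        long elapsedMs = nowMs - oldestSampleStartMs;
        long minWindowMs = (long) (samples - 1) * timeWindowMs;
        return Math.max(elapsedMs, minWindowMs);
    }

    /** Events per second over the given window. */
    static double ratePerSecond(double total, long windowMs) {
        return total / (windowMs / 1000.0);
    }
}
```

For example, with 2 samples of 1000 ms each, 5 events recorded 10 ms after the metric is created would give 5 / 0.010 s = 500 events per second without the floor, but 5 / 1.0 s = 5 events per second with it.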
Re: [PR] KAFKA-15045: (KIP-924 pt. 7) Simplify requirements for rack aware graphs [kafka]
ableegoldman merged PR #16004:
URL: https://github.com/apache/kafka/pull/16004
Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]
ableegoldman merged PR #15972:
URL: https://github.com/apache/kafka/pull/15972
[jira] [Updated] (KAFKA-16819) CoordinatorRequestManager seems to return 0ms during the coordinator discovery
[ https://issues.apache.org/jira/browse/KAFKA-16819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-16819:
------------------------------
    Labels: consumer-threading-refactor  (was: )

> CoordinatorRequestManager seems to return 0ms during the coordinator discovery
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-16819
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16819
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>            Reporter: Philip Nee
>            Assignee: Philip Nee
>            Priority: Minor
>              Labels: consumer-threading-refactor
>
> In KAFKA-15250 we discovered the ConsumerNetworkThread is looping without
> much backoff. The in-flight check PR fixed a lot of it; however, during the
> coordinator discovery phase, CoordinatorRequestManager keeps
> returning 0 until the coordinator node is found.
>
> The impact is minor, but we should expect the coordinator manager to
> back off until the exponential backoff expires (so it should return around 100 ms).
[jira] [Updated] (KAFKA-16819) CoordinatorRequestManager seems to return 0ms during the coordinator discovery
[ https://issues.apache.org/jira/browse/KAFKA-16819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-16819:
------------------------------
    Component/s: clients
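The behavior the ticket expects can be sketched as a request manager that reports the remaining backoff instead of 0 while discovery is pending. This is a minimal model under assumptions: `DiscoveryBackoffSketch` is hypothetical, the 100 ms initial value only echoes the figure mentioned in the ticket, and jitter is deliberately left out.

```java
/** Hypothetical exponential backoff tracker for coordinator discovery. */
final class DiscoveryBackoffSketch {
    private final long initialBackoffMs;
    private final long maxBackoffMs;
    private int attempts = 0;
    private long nextAttemptAtMs = 0;

    DiscoveryBackoffSketch(long initialBackoffMs, long maxBackoffMs) {
        this.initialBackoffMs = initialBackoffMs;
        this.maxBackoffMs = maxBackoffMs;
    }

    /** Records a discovery attempt and schedules the next one (no jitter). */
    void onAttempt(long nowMs) {
        // Double the backoff on every attempt, capped at maxBackoffMs;
        // the shift is bounded to avoid overflow.
        long backoffMs = Math.min(maxBackoffMs, initialBackoffMs << Math.min(attempts, 20));
        attempts++;
        nextAttemptAtMs = nowMs + backoffMs;
    }

    /** How long the network thread may wait before polling again. */
    long timeUntilNextPollMs(long nowMs) {
        return Math.max(0, nextAttemptAtMs - nowMs);
    }
}
```

Returning the remaining time rather than 0 lets the calling loop sleep until the next attempt is due instead of spinning.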
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610572095

##
core/src/main/scala/kafka/server/DelayedFetch.scala:
##
@@ -91,19 +91,23 @@ class DelayedFetch(
             // Go directly to the check for Case G if the message offsets are the same. If the log segment
             // has just rolled, then the high watermark offset will remain the same but be on the old segment,
             // which would incorrectly be seen as an instance of Case F.
-            if (endOffset.messageOffset != fetchOffset.messageOffset) {
-              if (endOffset.onOlderSegment(fetchOffset)) {
-                // Case F, this can happen when the new fetch operation is on a truncated leader
-                debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
-                return forceComplete()
+            if (fetchOffset.messageOffset > endOffset.messageOffset) {
+              // Case F, this can happen when the new fetch operation is on a truncated leader
+              debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
+              return forceComplete()
+            } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
+              if (fetchOffset.messageOffsetOnly() || endOffset.messageOffsetOnly()) {

Review Comment:
   Could we fold this into the condition above?

##
core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala:
##
@@ -164,18 +169,71 @@ class DelayedFetchTest {
     assertTrue(delayedFetch.tryComplete())
     assertTrue(delayedFetch.isCompleted)
     assertTrue(fetchResultOpt.isDefined)
+
+    val fetchResult = fetchResultOpt.get
+    assertEquals(Errors.NONE, fetchResult.error)
+  }
+
+  @ParameterizedTest(name = "testDelayedFetchWithMessageOnlyHighWatermark endOffset={0}")
+  @ValueSource(longs = Array(0, 500))
+  def testDelayedFetchWithMessageOnlyHighWatermark(endOffset: Long): Unit = {
+    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+    val fetchOffset = 450L
+    val logStartOffset = 5L
+    val currentLeaderEpoch = Optional.of[Integer](10)
+    val replicaId = 1
+
+    val fetchStatus = FetchPartitionStatus(
+      startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+      fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+    val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)
+
+    var fetchResultOpt: Option[FetchPartitionData] = None
+    def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
+      fetchResultOpt = Some(responses.head._2)
+    }
+
+    val delayedFetch = new DelayedFetch(
+      params = fetchParams,
+      fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
+      replicaManager = replicaManager,
+      quota = replicaQuota,
+      responseCallback = callback
+    )
+
+    val partition: Partition = mock(classOf[Partition])
+    when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition)
+    // Note that the high-watermark does not contain the complete metadata
+    val endOffsetMetadata = new LogOffsetMetadata(endOffset, -1L, -1)
+    when(partition.fetchOffsetSnapshot(
+      currentLeaderEpoch,
+      fetchOnlyFromLeader = true))
+      .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata))
+    when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false)
+    expectReadFromReplica(fetchParams, topicIdPartition, fetchStatus.fetchInfo, Errors.NONE)
+
+    // 1. When `endOffset` is 0, it refers to the truncation case
+    // 2. When `endOffset` is 500, it refers to the normal case
+    val expected = endOffset == 0

Review Comment:
   Hmm, in both cases, we are not forcing the completion of the delayed fetch, right?
Re: [PR] KAFKA-15713: KRaft support in AclCommandTest [kafka]
chia7712 commented on code in PR #15830: URL: https://github.com/apache/kafka/pull/15830#discussion_r1610529172 ## core/src/test/scala/unit/kafka/admin/AclCommandTest.scala: ## @@ -122,19 +128,27 @@ class AclCommandTest extends QuorumTestHarness with Logging { super.tearDown() } + override protected def kraftControllerConfigs(): Seq[Properties] = { +val controllerConfig = new Properties +controllerConfig.put(KafkaConfig.AuthorizerClassNameProp, classOf[StandardAuthorizer].getName) +controllerConfig.put(StandardAuthorizer.SUPER_USERS_CONFIG, "User:ANONYMOUS") +Seq(controllerConfig) + } + @Test def testAclCliWithAuthorizer(): Unit = { testAclCli(zkArgs) } - @Test - def testAclCliWithAdminAPI(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("zk")) Review Comment: Maybe we can create an `AclCommand.AdminClientService` and test its `listAcls` method with retries, instead of grabbing the output from `main`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16822) Abstract consumer group in coordinator to share functionality with share group
Apoorv Mittal created KAFKA-16822: - Summary: Abstract consumer group in coordinator to share functionality with share group Key: KAFKA-16822 URL: https://issues.apache.org/jira/browse/KAFKA-16822 Project: Kafka Issue Type: Sub-task Reporter: Apoorv Mittal Assignee: Apoorv Mittal -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16766) New consumer offsetsForTimes timeout exception does not have the proper message
[ https://issues.apache.org/jira/browse/KAFKA-16766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16766: --- Description: If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer will throw a org.apache.kafka.common.errors.TimeoutException as expected, but with the following as message: "java.util.concurrent.TimeoutException". We should provide a clearer message, and I would even say we keep the same message that the LegacyConsumer shows in this case, ex: "Failed to get offsets by times in 6ms". To fix this we should consider catching the timeout exception in the consumer when offsetsForTimes result times out ([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]), and propagate it with the message specific to offsetsForTimes. Same situation exists for beginningOffsets and endOffsets. All 3 funcs show the same timeout message in the LegacyConsumer (defined [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java#L182]), but do not have a clear message in the Async, so we should fix them all 3. With the fix, we should write tests for each func, like the ones defined for the Legacy Consumer ([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L3244-L3276]). Note that we would need different tests, added to AsyncKafkaConsumerTest, given that the AsyncKafkaConsumer issues a FindCoordinator request in this case (on manager poll), but the LegacyKafkaConsumer does not, so it does not account for that when matching requests/responses in the current tests. 
was: If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer will throw a org.apache.kafka.common.errors.TimeoutException as expected, but with the following as message: "java.util.concurrent.TimeoutException". We should provide a clearer message, and I would even say we keep the same message that the LegacyConsumer shows in this case, ex: "Failed to get offsets by times in 6ms". To fix this we should consider catching the timeout exception in the consumer when offsetsForTimes result times out ([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]), and propagate it with the message specific to offsetsForTimes. Same situation exists for beginningOffsets and endOffsets. All 3 funcs show the same timeout message in the LegacyConsumer (defined [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java#L182]), but do not have a clear message in the Async, so we should fix them all 3. With the fix, we should write tests for each func, like the ones defined for the Legacy Consumer ([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L3244-L3276]). Note that we would need different tests, added to AsyncKafkaConsumerTest, given that the async consumer issues a FindCoordinator request in this case, but the AsyncConsumer does, so it does not account for that when matching requests/responses in the current tests. 
> New consumer offsetsForTimes timeout exception does not have the proper > message > --- > > Key: KAFKA-16766 > URL: https://issues.apache.org/jira/browse/KAFKA-16766 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Priority: Major > Labels: kip-848-client-support > Fix For: 3.8.0 > > > If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer > will throw a org.apache.kafka.common.errors.TimeoutException as expected, but > with the following as message: "java.util.concurrent.TimeoutException". > We should provide a clearer message, and I would even say we keep the same > message that the LegacyConsumer shows in this case, ex: "Failed to get > offsets by times in 6ms". > To fix this we should consider catching the timeout exception in the consumer > when offsetsForTimes result times out > ([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]), > and propagate it with the message
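The fix proposed in the ticket (catch the low-level timeout and rethrow it with an operation-specific message) could look roughly like the following. This is a minimal sketch, not the actual AsyncKafkaConsumer code: `OffsetsTimeoutSketch`, `ClientTimeoutException` (a stand-in for `org.apache.kafka.common.errors.TimeoutException`) and `awaitOffsets` are hypothetical names, and the only detail taken from the ticket is the message format "Failed to get offsets by times in <n>ms".

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class OffsetsTimeoutSketch {

    /** Hypothetical stand-in for org.apache.kafka.common.errors.TimeoutException. */
    static class ClientTimeoutException extends RuntimeException {
        ClientTimeoutException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /**
     * Waits for the result and translates the JDK timeout into an exception whose
     * message names the operation and the timeout, instead of leaking the bare
     * "java.util.concurrent.TimeoutException" text.
     */
    static <T> T awaitOffsets(CompletableFuture<T> future, long timeoutMs) {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (java.util.concurrent.TimeoutException e) {
            throw new ClientTimeoutException(
                "Failed to get offsets by times in " + timeoutMs + "ms", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        }
    }

    public static void main(String[] args) {
        // A future that never completes simulates a broker that does not answer.
        CompletableFuture<Long> neverCompletes = new CompletableFuture<>();
        try {
            awaitOffsets(neverCompletes, 50);
        } catch (ClientTimeoutException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The same wrapper shape would presumably apply to beginningOffsets and endOffsets, which the ticket says share the message in the legacy consumer.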
Re: [PR] MINOR: Fix rate metric spikes [kafka]
chia7712 commented on code in PR #15889: URL: https://github.com/apache/kafka/pull/15889#discussion_r1610336329 ## clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java: ## @@ -40,7 +40,7 @@ public abstract class SampledStat implements MeasurableStat { public SampledStat(double initialValue) { this.initialValue = initialValue; -this.samples = new ArrayList<>(2); +this.samples = new ArrayList<>(3); Review Comment: It would be nice to add comments for this magic number :) ## clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java: ## @@ -106,44 +109,51 @@ public String toString() { public abstract double combine(List samples, MetricConfig config, long now); -/* Timeout any windows that have expired in the absence of any events */ +// purge any samples that lack observed events within the monitored window protected void purgeObsoleteSamples(MetricConfig config, long now) { long expireAge = config.samples() * config.timeWindowMs(); for (Sample sample : samples) { -if (now - sample.lastWindowMs >= expireAge) +// samples overlapping the monitored window are kept, +// even if they started before it +if (now - sample.lastEventMs >= expireAge) { sample.reset(now); +} } } protected static class Sample { public double initialValue; public long eventCount; -public long lastWindowMs; +public long startTimeMs; +public long lastEventMs; public double value; public Sample(double initialValue, long now) { this.initialValue = initialValue; this.eventCount = 0; -this.lastWindowMs = now; +this.startTimeMs = now; +this.lastEventMs = now; this.value = initialValue; } public void reset(long now) { this.eventCount = 0; -this.lastWindowMs = now; +this.startTimeMs = now; +this.lastEventMs = now; this.value = initialValue; } public boolean isComplete(long timeMs, MetricConfig config) { -return timeMs - lastWindowMs >= config.timeWindowMs() || eventCount >= config.eventWindow(); +return timeMs - startTimeMs >= config.timeWindowMs() || eventCount >= 
config.eventWindow(); Review Comment: This is unrelated to this PR, but it seems `eventCount` could be another issue if we set a non-maximum value. For example, all samples could fall within a single `timeWindowMs` due to a bunch of records. In this case, `totalElapsedTimeMs` will be equal to `(config.samples() - 1) * timeWindowMs`, and we will then get a smaller measurement due to the larger denominator. Maybe we can remove `eventWindow`, as it is unused in production. This can be another PR if it is valid. ## clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java: ## @@ -50,10 +50,13 @@ public void record(MetricConfig config, double value, long timeMs) { sample = advance(config, timeMs); update(sample, config, value, timeMs); sample.eventCount += 1; +sample.lastEventMs = timeMs; } private Sample advance(MetricConfig config, long timeMs) { -this.current = (this.current + 1) % config.samples(); +// need to keep one extra sample (see purgeObsoleteSamples() logic) Review Comment: How about saying "we have one extra sample to remember the last recording time in order to keep the overlapping sample (see purgeObsoleteSamples() logic)"? ## clients/src/test/java/org/apache/kafka/common/metrics/stats/SampledStatTest.java: ## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.metrics.stats; + +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +
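To make the purge change in the diff above concrete, here is a small, simplified model of the two expiry rules. It is only a sketch: `PurgeSketch` and its helper methods are hypothetical, the field names `startTimeMs` and `lastEventMs` are borrowed from the diff, and it assumes the old `lastWindowMs` behaved like a start timestamp because it was only set on construction and reset.

```java
public class PurgeSketch {

    /** Simplified sample: remembers when it started and when it last saw an event. */
    static final class Sample {
        long startTimeMs;
        long lastEventMs;
        long eventCount;

        Sample(long now) {
            this.startTimeMs = now;
            this.lastEventMs = now;
        }

        void record(long timeMs) {
            eventCount += 1;
            lastEventMs = timeMs;
        }
    }

    /** Old-style rule (assumed): expire based on when the sample started. */
    static boolean expiredByStart(Sample s, long now, long expireAge) {
        return now - s.startTimeMs >= expireAge;
    }

    /** New-style rule from the diff: expire based on the last recorded event. */
    static boolean expiredByLastEvent(Sample s, long now, long expireAge) {
        return now - s.lastEventMs >= expireAge;
    }

    public static void main(String[] args) {
        long timeWindowMs = 30_000;
        int samples = 2;
        long expireAge = samples * timeWindowMs; // 60_000 ms monitored window

        Sample s = new Sample(0);   // sample starts at t = 0
        s.record(25_000);           // last event at t = 25 s
        long now = 61_000;

        // 61_000 - 0 >= 60_000, so the start-based rule resets the sample.
        System.out.println("expired by start:      " + expiredByStart(s, now, expireAge));
        // 61_000 - 25_000 = 36_000 < 60_000, so the event-based rule keeps it,
        // because the sample still overlaps the monitored window.
        System.out.println("expired by last event: " + expiredByLastEvent(s, now, expireAge));
    }
}
```

Keeping a sample that started before the window but recorded inside it is what the "one extra sample" comment in the diff refers to.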
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1610507371 ## server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java: ## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Collections; +import java.util.Map; + +public enum TestFeatureVersion implements FeatureVersion { + +TEST_0(0, MetadataVersion.IBP_3_3_IV0, Collections.emptyMap()), Review Comment: I tried this, but the logic for defaults breaks when we don't have a 0 version.
```
val allFeaturesAndLevels: List[FeatureVersion] = allFeatures.map { feature =>
  val level: java.lang.Short = specifiedFeatures.getOrElse(feature.featureName, feature.defaultValue(metadataVersionForDefault))
  feature.fromFeatureLevel(level)
}
```
Here, I suppose we could do some work to filter out 0s and assume in the rest of the code that a missing value in the map = 0. If that is preferred I can do that instead, but I also don't think the 0 version necessarily hurts anything. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
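The alternative mentioned in the comment above (drop level-0 entries and treat a missing key as 0) can be sketched as follows. This is an illustration only: `FeatureDefaultsSketch`, `resolve`, `dropDisabled`, `levelOf` and the feature names are hypothetical, and plain maps stand in for the `FeatureVersion` types used in the PR.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FeatureDefaultsSketch {

    /** Use the explicitly specified level if present, otherwise the default (0 if none). */
    static Map<String, Short> resolve(List<String> allFeatures,
                                      Map<String, Short> specified,
                                      Map<String, Short> defaults) {
        Map<String, Short> resolved = new LinkedHashMap<>();
        for (String name : allFeatures) {
            short level = specified.getOrDefault(name, defaults.getOrDefault(name, (short) 0));
            resolved.put(name, level);
        }
        return resolved;
    }

    /** Alternative from the comment: keep only enabled features in the map. */
    static Map<String, Short> dropDisabled(Map<String, Short> resolved) {
        Map<String, Short> enabled = new LinkedHashMap<>(resolved);
        enabled.values().removeIf(level -> level == 0);
        return enabled;
    }

    /** Readers of the filtered map must then treat a missing entry as level 0. */
    static short levelOf(Map<String, Short> enabled, String name) {
        return enabled.getOrDefault(name, (short) 0);
    }

    public static void main(String[] args) {
        Map<String, Short> specified = new HashMap<>();
        specified.put("feature.a", (short) 2);      // hypothetical feature names
        Map<String, Short> defaults = new HashMap<>();
        defaults.put("feature.a", (short) 1);
        defaults.put("feature.b", (short) 0);

        Map<String, Short> resolved =
            resolve(Arrays.asList("feature.a", "feature.b"), specified, defaults);
        Map<String, Short> enabled = dropDisabled(resolved);

        System.out.println(resolved);
        System.out.println(enabled);
        System.out.println(levelOf(enabled, "feature.b"));
    }
}
```

The trade-off is the one the comment hints at: filtering keeps the map small, but every consumer of the map has to agree that "absent" means level 0.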
[jira] [Created] (KAFKA-16821) Create a new interface to store member metadata
Ritika Reddy created KAFKA-16821: Summary: Create a new interface to store member metadata Key: KAFKA-16821 URL: https://issues.apache.org/jira/browse/KAFKA-16821 Project: Kafka Issue Type: Sub-task Reporter: Ritika Reddy Assignee: Ritika Reddy Attachments: Screenshot 2024-05-14 at 11.03.10 AM.png !Screenshot 2024-05-14 at 11.03.10 AM.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]
jeffkbkim commented on code in PR #15988: URL: https://github.com/apache/kafka/pull/15988#discussion_r1610481770 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -12401,6 +12436,363 @@ public void testClassicGroupSyncToConsumerGroupRebalanceInProgress() throws Exce .withProtocolName("range") .build()) ); +context.assertJoinTimeout(groupId, memberId, 1); +} + +@Test +public void testClassicGroupHeartbeatToConsumerGroupMaintainsSession() throws Exception { +String groupId = "group-id"; +String memberId = Uuid.randomUuid().toString(); +int sessionTimeout = 5000; + +List protocols = Collections.singletonList( +new ConsumerGroupMemberMetadataValue.ClassicProtocol() +.setName("range") + .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription( +new ConsumerPartitionAssignor.Subscription( +Arrays.asList("foo"), +null, +Collections.emptyList() +) +))) +); + +// Consumer group with a member using the classic protocol. +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) +.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) +.withMember(new ConsumerGroupMember.Builder(memberId) +.setClassicMemberMetadata( +new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() +.setSessionTimeoutMs(sessionTimeout) +.setSupportedProtocols(protocols) +) +.setMemberEpoch(10) +.build())) +.build(); + +// Heartbeat to schedule the session timeout. +HeartbeatRequestData request = new HeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId) +.setGenerationId(10); +context.sendClassicGroupHeartbeat(request); +context.assertSessionTimeout(groupId, memberId, sessionTimeout); + +// Advance clock by 1/2 of session timeout. 
+ GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout / 2)); + +HeartbeatResponseData heartbeatResponse = context.sendClassicGroupHeartbeat(request).response(); +assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode()); +context.assertSessionTimeout(groupId, memberId, sessionTimeout); + +// Advance clock by 1/2 of session timeout. + GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout / 2)); + +heartbeatResponse = context.sendClassicGroupHeartbeat(request).response(); +assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode()); +context.assertSessionTimeout(groupId, memberId, sessionTimeout); +} + +@Test +public void testClassicGroupHeartbeatToConsumerGroupRebalanceInProgress() throws Exception { +String groupId = "group-id"; +String memberId1 = Uuid.randomUuid().toString(); +String memberId2 = Uuid.randomUuid().toString(); +String memberId3 = Uuid.randomUuid().toString(); +Uuid fooTopicId = Uuid.randomUuid(); +Uuid barTopicId = Uuid.randomUuid(); +int sessionTimeout = 5000; +int rebalanceTimeout = 1; + +List protocols = Collections.singletonList( +new ConsumerGroupMemberMetadataValue.ClassicProtocol() +.setName("range") + .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription( +new ConsumerPartitionAssignor.Subscription( +Arrays.asList("foo"), +null, +Collections.emptyList() +) +))) +); + +// Member 1 has a member epoch smaller than the group epoch. +ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1) +.setRebalanceTimeoutMs(rebalanceTimeout) +.setClassicMemberMetadata( +new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() +.setSessionTimeoutMs(sessionTimeout) +.setSupportedProtocols(protocols) +) +.setMemberEpoch(9) +.build(); + +// Member 2 has unrevoked partition. +ConsumerGroupMember member2 = new ConsumerGroupMember.Builder(memberId2) +.setState(MemberState.UNREVOKED_PARTITIONS) +.setRebalanceTimeoutMs(rebalanceTimeout) +
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1610472137 ## server-common/src/main/java/org/apache/kafka/server/common/Features.java: ## @@ -64,14 +63,16 @@ public enum Features { PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature -> feature.usedInProduction).collect(Collectors.toList()); +PRODUCTION_FEATURE_NAMES = PRODUCTION_FEATURES.stream().map(feature -> +feature.name).collect(Collectors.toList()); } public String featureName() { return name; } public FeatureVersion[] features() { Review Comment: sure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1610471028 ## server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java: ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.common; + +import java.util.Map; + +public interface FeatureVersion { + +/** + * The level of the feature. 0 means the feature is disabled. + */ +short featureLevel(); + +/** + * The name of the feature. + */ +String featureName(); + +/** + * The minimum metadata version which sets this feature version as default. When bootstrapping using only + * a metadata version, a reasonable default for all other features is chosen based on this value. + * This should be defined as the next metadata version to be released when the feature version becomes production ready. + * (Ie, if the current production MV is 17 when a feature version is released, its mapping should be to MV 18) Review Comment: How do we know the release version when we create the feature? The release version can change, so should we change it as each new MV is added? I suppose you mean the latest MV when the feature is released. 
This is also what I did but adding 1 to every MV for the reasons I explained [here](https://github.com/apache/kafka/pull/15685#discussion_r1610248735) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]
jeffkbkim commented on code in PR #15988: URL: https://github.com/apache/kafka/pull/15988#discussion_r1610471371 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -12401,6 +12436,363 @@ public void testClassicGroupSyncToConsumerGroupRebalanceInProgress() throws Exce .withProtocolName("range") .build()) ); +context.assertJoinTimeout(groupId, memberId, 1); +} + +@Test +public void testClassicGroupHeartbeatToConsumerGroupMaintainsSession() throws Exception { +String groupId = "group-id"; +String memberId = Uuid.randomUuid().toString(); +int sessionTimeout = 5000; + +List protocols = Collections.singletonList( +new ConsumerGroupMemberMetadataValue.ClassicProtocol() +.setName("range") + .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription( +new ConsumerPartitionAssignor.Subscription( +Arrays.asList("foo"), +null, +Collections.emptyList() +) +))) +); + +// Consumer group with a member using the classic protocol. +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) +.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) +.withMember(new ConsumerGroupMember.Builder(memberId) +.setClassicMemberMetadata( +new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() +.setSessionTimeoutMs(sessionTimeout) +.setSupportedProtocols(protocols) +) +.setMemberEpoch(10) +.build())) +.build(); + +// Heartbeat to schedule the session timeout. +HeartbeatRequestData request = new HeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId) +.setGenerationId(10); +context.sendClassicGroupHeartbeat(request); +context.assertSessionTimeout(groupId, memberId, sessionTimeout); + +// Advance clock by 1/2 of session timeout. 
+ GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout / 2)); + +HeartbeatResponseData heartbeatResponse = context.sendClassicGroupHeartbeat(request).response(); +assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode()); +context.assertSessionTimeout(groupId, memberId, sessionTimeout); + +// Advance clock by 1/2 of session timeout. + GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout / 2)); + +heartbeatResponse = context.sendClassicGroupHeartbeat(request).response(); +assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode()); +context.assertSessionTimeout(groupId, memberId, sessionTimeout); +} + +@Test +public void testClassicGroupHeartbeatToConsumerGroupRebalanceInProgress() throws Exception { +String groupId = "group-id"; +String memberId1 = Uuid.randomUuid().toString(); +String memberId2 = Uuid.randomUuid().toString(); +String memberId3 = Uuid.randomUuid().toString(); +Uuid fooTopicId = Uuid.randomUuid(); +Uuid barTopicId = Uuid.randomUuid(); +int sessionTimeout = 5000; +int rebalanceTimeout = 1; + +List protocols = Collections.singletonList( +new ConsumerGroupMemberMetadataValue.ClassicProtocol() +.setName("range") + .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription( +new ConsumerPartitionAssignor.Subscription( +Arrays.asList("foo"), +null, +Collections.emptyList() +) +))) +); + +// Member 1 has a member epoch smaller than the group epoch. +ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1) +.setRebalanceTimeoutMs(rebalanceTimeout) +.setClassicMemberMetadata( +new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() +.setSessionTimeoutMs(sessionTimeout) +.setSupportedProtocols(protocols) +) +.setMemberEpoch(9) +.build(); + +// Member 2 has unrevoked partition. +ConsumerGroupMember member2 = new ConsumerGroupMember.Builder(memberId2) +.setState(MemberState.UNREVOKED_PARTITIONS) +.setRebalanceTimeoutMs(rebalanceTimeout) +
[jira] [Updated] (KAFKA-16820) Kafka Broker fails to connect to Kraft Controller with no DNS matching
[ https://issues.apache.org/jira/browse/KAFKA-16820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arushi Helms updated KAFKA-16820: - Description: We are migrating our Kafka cluster from zookeeper to Kraft mode. We are running individual brokers and controllers with TLS enabled and IPs are given for communication. TLS enabled setup works fine among the brokers and the certificate looks something like: {noformat} Common Name: *.kafka.service.consul Subject Alternative Names: *.kafka.service.consul, IP Address:10.87.171.84{noformat} Note: * The DNS name for the node does not match the CN but since we are using IPs as communication, we have provided IPs as SAN. * Same with the controllers, IPs are given as SAN in the certificate. * Issue is not related to the migration so just sharing configuration relevant for the TLS piece. In the current setup I am running 3 brokers and 3 controllers. Relevant controller configurations from one of the controllers: {noformat} KAFKA_CFG_PROCESS_ROLES=controller KAFKA_KRAFT_CLUSTER_ID=5kztjhJ4SxSu-kdiEYDUow KAFKA_CFG_NODE_ID=6 KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=4@10.87.170.83:9097,5@10.87.170.9:9097,6@10.87.170.6:9097 KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INSIDE_SSL KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:SSL,INSIDE_SSL:SSL KAFKA_CFG_LISTENERS=CONTROLLER://10.87.170.6:9097{noformat} Relevant broker configuration from one of the brokers: {noformat} KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INSIDE_SSL KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=4@10.87.170.83:9097,5@10.87.170.9:9097,6@10.87.170.6:9097 KAFKA_CFG_PROCESS_ROLES=broker KAFKA_CFG_NODE_ID=3 KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE_SSL:SSL,OUTSIDE_SSL:SSL,CONTROLLER:SSL KAFKA_CFG_LISTENERS=INSIDE_SSL://10.87.170.78:9093,OUTSIDE_SSL://10.87.170.78:9096 KAFKA_CFG_ADVERTISED_LISTENERS=INSIDE_SSL://10.87.170.78:9093,OUTSIDE_SSL://10.87.170.78:9096{noformat} ISSUE 
1: With this setup Kafka broker is failing to connect to the controller, see the following error: {noformat} 2024-05-22 17:53:46,413] ERROR [broker-2-to-controller-heartbeat-channel-manager]: Request BrokerRegistrationRequestData(brokerId=2, clusterId='5kztjhJ4SxSu-kdiEYDUow', incarnationId=7741fgH6T4SQqGsho8E6mw, listeners=[Listener(name='INSIDE_SSL', host='10.87.170.81', port=9093, securityProtocol=1), Listener(name='INSIDE', host='10.87.170.81', port=9094, securityProtocol=0), Listener(name='OUTSIDE', host='10.87.170.81', port=9092, securityProtocol=0), Listener(name='OUTSIDE_SSL', host='10.87.170.81', port=9096, securityProtocol=1)], features=[Feature(name='metadata.version', minSupportedVersion=1, maxSupportedVersion=19)], rack=null, isMigratingZkBroker=false, logDirs=[TJssfKDD-iBFYfIYCKOcew], previousBrokerEpoch=-1) failed due to authentication error with controller (kafka.server.NodeToControllerRequestThread)org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failedCaused by: javax.net.ssl.SSLHandshakeException: No subject alternative DNS name matching cp-internal-onecloud-kfkc1.node.cp-internal-onecloud.consul found. 
at java.base/sun.security.ssl.Alert.createSSLException(Alert.java:131) at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:378) at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:321) at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:316) at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1351) at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.onConsumeCertificate(CertificateMessage.java:1226) at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.consume(CertificateMessage.java:1169) at java.base/sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:396) at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:480) at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1277) at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1264) at java.base/java.security.AccessController.doPrivileged(AccessController.java:712) at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask.run(SSLEngineImpl.java:1209) at org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:435) at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:523) at org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:373) at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:293) at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:178)
Re: [PR] Update RemoteLogManager configuration in broker server - KAFKA-16790 [kafka]
muralibasani commented on PR #16005:
URL: https://github.com/apache/kafka/pull/16005#issuecomment-2125492819

@nikramakrishnan I think calling applyDelta on replicaManager wouldn't invoke configure on rlmm. We should probably start mocking from the BrokerServer.startup method and create a topic, which would invoke both configure and applyDelta. WDYT?
Re: [PR] KAFKA-16516: Fix the controller node provider for broker to control channel [kafka]
jsancio commented on code in PR #16008:
URL: https://github.com/apache/kafka/pull/16008#discussion_r1610457959

## core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala: ##
@@ -120,15 +118,15 @@ object RaftControllerNodeProvider {
  */
 class RaftControllerNodeProvider(
   val raftManager: RaftManager[ApiMessageAndVersion],
-  controllerQuorumVoterNodes: Seq[Node],
   val listenerName: ListenerName,
   val securityProtocol: SecurityProtocol,
   val saslMechanism: String
 ) extends ControllerNodeProvider with Logging {
-  private val idToNode = controllerQuorumVoterNodes.map(node => node.id() -> node).toMap
+
+  private def idToNode(id: Int): Option[Node] = raftManager.voterNode(id, listenerName.value())

   override def getControllerInfo(): ControllerInformation =
-    ControllerInformation(raftManager.leaderAndEpoch.leaderId.asScala.map(idToNode),
+    ControllerInformation(raftManager.leaderAndEpoch.leaderId.asScala.flatMap(idToNode),

Review Comment:
Okay. In a future PR, I'll change this to `Option[Node] RaftManager.leaderNode()` since it looks like the node is always the leader node and not some arbitrary voter.
[jira] [Updated] (KAFKA-16820) Kafka Broker fails to connect to Kraft Controller with no DNS matching
[ https://issues.apache.org/jira/browse/KAFKA-16820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arushi Helms updated KAFKA-16820: - Description: We are migrating our Kafka cluster from zookeeper to Kraft mode. We are running individual brokers and controllers with TLS enabled and IPs are given for communication. TLS enabled setup works fine among the brokers and the certificate looks something like: h5. {noformat} Common Name: *.kafka.service.consul Subject Alternative Names: *.kafka.service.consul, IP Address:10.87.171.84{noformat} Note: The DNS name for the node does not match the CN but since we are using IPs as communication, we have provided IPs as SAN. Same with the controllers, IPs are given as SAN in the certificate. In the current setup I am running 3 brokers and 3 controllers. Relevant controller configurations from one of the controllers: {noformat} KAFKA_CFG_PROCESS_ROLES=controller KAFKA_KRAFT_CLUSTER_ID=5kztjhJ4SxSu-kdiEYDUow KAFKA_CFG_NODE_ID=6 KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=4@10.87.170.83:9097,5@10.87.170.9:9097,6@10.87.170.6:9097 KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INSIDE_SSL KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:SSL,INSIDE_SSL:SSL KAFKA_CFG_LISTENERS=CONTROLLER://10.87.170.6:9097{noformat} Relevant broker configuration from one of the brokers: {noformat} KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INSIDE_SSL KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=4@10.87.170.83:9097,5@10.87.170.9:9097,6@10.87.170.6:9097 KAFKA_CFG_PROCESS_ROLES=broker KAFKA_CFG_NODE_ID=3 KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE_SSL:SSL,OUTSIDE_SSL:SSL,CONTROLLER:SSL KAFKA_CFG_LISTENERS=INSIDE_SSL://10.87.170.78:9093,OUTSIDE_SSL://10.87.170.78:9096 KAFKA_CFG_ADVERTISED_LISTENERS=INSIDE_SSL://10.87.170.78:9093,OUTSIDE_SSL://10.87.170.78:9096{noformat} ISSUE 1: With this setup Kafka broker is failing to connect to the controller, see the following error: 
{noformat} 2024-05-22 17:53:46,413] ERROR [broker-2-to-controller-heartbeat-channel-manager]: Request BrokerRegistrationRequestData(brokerId=2, clusterId='5kztjhJ4SxSu-kdiEYDUow', incarnationId=7741fgH6T4SQqGsho8E6mw, listeners=[Listener(name='INSIDE_SSL', host='10.87.170.81', port=9093, securityProtocol=1), Listener(name='INSIDE', host='10.87.170.81', port=9094, securityProtocol=0), Listener(name='OUTSIDE', host='10.87.170.81', port=9092, securityProtocol=0), Listener(name='OUTSIDE_SSL', host='10.87.170.81', port=9096, securityProtocol=1)], features=[Feature(name='metadata.version', minSupportedVersion=1, maxSupportedVersion=19)], rack=null, isMigratingZkBroker=false, logDirs=[TJssfKDD-iBFYfIYCKOcew], previousBrokerEpoch=-1) failed due to authentication error with controller (kafka.server.NodeToControllerRequestThread)org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failedCaused by: javax.net.ssl.SSLHandshakeException: No subject alternative DNS name matching cp-internal-onecloud-kfkc1.node.cp-internal-onecloud.consul found. 
at java.base/sun.security.ssl.Alert.createSSLException(Alert.java:131) at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:378) at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:321) at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:316) at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1351) at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.onConsumeCertificate(CertificateMessage.java:1226) at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.consume(CertificateMessage.java:1169) at java.base/sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:396) at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:480) at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1277) at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1264) at java.base/java.security.AccessController.doPrivileged(AccessController.java:712) at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask.run(SSLEngineImpl.java:1209) at org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:435) at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:523) at org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:373) at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:293) at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:178) at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543) at
[jira] [Created] (KAFKA-16820) Kafka Broker fails to connect to Kraft Controller with no DNS matching
Arushi Helms created KAFKA-16820: Summary: Kafka Broker fails to connect to Kraft Controller with no DNS matching Key: KAFKA-16820 URL: https://issues.apache.org/jira/browse/KAFKA-16820 Project: Kafka Issue Type: Bug Components: kraft Affects Versions: 3.6.1, 3.7.0, 3.8.0 Reporter: Arushi Helms Attachments: Screenshot 2024-05-22 at 1.09.11 PM-1.png We are migrating our Kafka cluster from zookeeper to Kraft mode. We are running individual brokers and controllers with TLS enabled and IPs are given for communication. TLS enabled setup works fine among the brokers and the certificate looks something like: h5. {noformat} Common Name: *.kafka.service.consul Subject Alternative Names: *.kafka.service.consul, IP Address:10.87.171.84{noformat} Note: The DNS name for the node does not match the CN but since we are using IPs as communication, we have provided IPs as SAN. Same with the controllers, IPs are given as SAN in the certificate. In the current setup I am running 3 brokers and 3 controllers. 
Relevant controller configurations from one of the controllers: {{}} {noformat} KAFKA_CFG_PROCESS_ROLES=controller KAFKA_KRAFT_CLUSTER_ID=5kztjhJ4SxSu-kdiEYDUow KAFKA_CFG_NODE_ID=6 KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=4@10.87.170.83:9097,5@10.87.170.9:9097,6@10.87.170.6:9097 KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INSIDE_SSL KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:SSL,INSIDE_SSL:SSL KAFKA_CFG_LISTENERS=CONTROLLER://10.87.170.6:9097{noformat} {{}} Relevant broker configuration from one of the brokers: {noformat} KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INSIDE_SSL KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=4@10.87.170.83:9097,5@10.87.170.9:9097,6@10.87.170.6:9097 KAFKA_CFG_PROCESS_ROLES=broker KAFKA_CFG_NODE_ID=3 KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE_SSL:SSL,OUTSIDE_SSL:SSL,CONTROLLER:SSL KAFKA_CFG_LISTENERS=INSIDE_SSL://10.87.170.78:9093,OUTSIDE_SSL://10.87.170.78:9096 KAFKA_CFG_ADVERTISED_LISTENERS=INSIDE_SSL://10.87.170.78:9093,OUTSIDE_SSL://10.87.170.78:9096{noformat} {{}} ISSUE 1: With this setup Kafka broker is failing to connect to the controller, see the following error: {noformat} 2024-05-22 17:53:46,413] ERROR [broker-2-to-controller-heartbeat-channel-manager]: Request BrokerRegistrationRequestData(brokerId=2, clusterId='5kztjhJ4SxSu-kdiEYDUow', incarnationId=7741fgH6T4SQqGsho8E6mw, listeners=[Listener(name='INSIDE_SSL', host='10.87.170.81', port=9093, securityProtocol=1), Listener(name='INSIDE', host='10.87.170.81', port=9094, securityProtocol=0), Listener(name='OUTSIDE', host='10.87.170.81', port=9092, securityProtocol=0), Listener(name='OUTSIDE_SSL', host='10.87.170.81', port=9096, securityProtocol=1)], features=[Feature(name='metadata.version', minSupportedVersion=1, maxSupportedVersion=19)], rack=null, isMigratingZkBroker=false, logDirs=[TJssfKDD-iBFYfIYCKOcew], previousBrokerEpoch=-1) failed due to authentication error with controller 
(kafka.server.NodeToControllerRequestThread)org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failedCaused by: javax.net.ssl.SSLHandshakeException: No subject alternative DNS name matching cp-internal-onecloud-kfkc1.node.cp-internal-onecloud.consul found. at java.base/sun.security.ssl.Alert.createSSLException(Alert.java:131) at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:378) at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:321) at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:316) at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1351) at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.onConsumeCertificate(CertificateMessage.java:1226) at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.consume(CertificateMessage.java:1169) at java.base/sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:396) at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:480) at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1277) at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1264) at java.base/java.security.AccessController.doPrivileged(AccessController.java:712) at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask.run(SSLEngineImpl.java:1209) at org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:435) at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:523) at
Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]
jeffkbkim commented on code in PR #15988: URL: https://github.com/apache/kafka/pull/15988#discussion_r1610462585 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -12401,6 +12436,363 @@ public void testClassicGroupSyncToConsumerGroupRebalanceInProgress() throws Exce .withProtocolName("range") .build()) ); +context.assertJoinTimeout(groupId, memberId, 1); +} + +@Test +public void testClassicGroupHeartbeatToConsumerGroupMaintainsSession() throws Exception { +String groupId = "group-id"; +String memberId = Uuid.randomUuid().toString(); +int sessionTimeout = 5000; + +List protocols = Collections.singletonList( +new ConsumerGroupMemberMetadataValue.ClassicProtocol() +.setName("range") + .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription( +new ConsumerPartitionAssignor.Subscription( +Arrays.asList("foo"), +null, +Collections.emptyList() +) +))) +); + +// Consumer group with a member using the classic protocol. +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) +.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) +.withMember(new ConsumerGroupMember.Builder(memberId) +.setClassicMemberMetadata( +new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() +.setSessionTimeoutMs(sessionTimeout) +.setSupportedProtocols(protocols) +) +.setMemberEpoch(10) +.build())) +.build(); + +// Heartbeat to schedule the session timeout. +HeartbeatRequestData request = new HeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId) +.setGenerationId(10); +context.sendClassicGroupHeartbeat(request); +context.assertSessionTimeout(groupId, memberId, sessionTimeout); + +// Advance clock by 1/2 of session timeout. 
+ GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout / 2)); + +HeartbeatResponseData heartbeatResponse = context.sendClassicGroupHeartbeat(request).response(); +assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode()); +context.assertSessionTimeout(groupId, memberId, sessionTimeout); + +// Advance clock by 1/2 of session timeout. + GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout / 2)); + +heartbeatResponse = context.sendClassicGroupHeartbeat(request).response(); +assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode()); +context.assertSessionTimeout(groupId, memberId, sessionTimeout); +} + +@Test +public void testClassicGroupHeartbeatToConsumerGroupRebalanceInProgress() throws Exception { +String groupId = "group-id"; +String memberId1 = Uuid.randomUuid().toString(); +String memberId2 = Uuid.randomUuid().toString(); +String memberId3 = Uuid.randomUuid().toString(); +Uuid fooTopicId = Uuid.randomUuid(); +Uuid barTopicId = Uuid.randomUuid(); +int sessionTimeout = 5000; +int rebalanceTimeout = 1; + +List protocols = Collections.singletonList( +new ConsumerGroupMemberMetadataValue.ClassicProtocol() +.setName("range") + .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription( +new ConsumerPartitionAssignor.Subscription( +Arrays.asList("foo"), Review Comment: nit: Collections.singletonList("foo") -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]
jeffkbkim commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1610460306

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ##
@@ -12401,6 +12436,363 @@ public void testClassicGroupSyncToConsumerGroupRebalanceInProgress() throws Exce
 .withProtocolName("range")
 .build())
 );
+context.assertJoinTimeout(groupId, memberId, 1);
+}
+
+@Test
+public void testClassicGroupHeartbeatToConsumerGroupMaintainsSession() throws Exception {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+int sessionTimeout = 5000;
+
+List protocols = Collections.singletonList(
+new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+.setName("range")
+.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
+new ConsumerPartitionAssignor.Subscription(
+Arrays.asList("foo"),

Review Comment:
nit: I think we can use `Collections.singletonList("foo"),`
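For readers unfamiliar with the nit above, here is a minimal, standalone sketch of the difference between the two calls. The class and method names (`SingletonListSketch`, `viaArraysAsList`, `viaSingletonList`, `allowsSet`) are hypothetical and exist only for illustration; they are not part of the PR.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustrative only: compares the two ways of building a one-element list.
final class SingletonListSketch {

    // Fixed-size list backed by an array; elements can still be replaced via set().
    static List<String> viaArraysAsList() {
        return Arrays.asList("foo");
    }

    // Immutable one-element list; no backing array is allocated.
    static List<String> viaSingletonList() {
        return Collections.singletonList("foo");
    }

    // Returns true if the list accepts set(), false if it rejects the mutation.
    static boolean allowsSet(List<String> list) {
        try {
            list.set(0, "bar");
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }
}
```

Both lists compare equal, so the swap should not change what the test asserts; the singleton form mainly signals intent and cannot be modified by accident.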
Re: [PR] KAFKA-16252; Dynamic KRaft network manager and channel [kafka]
hachikuji commented on code in PR #15986: URL: https://github.com/apache/kafka/pull/15986#discussion_r1610398594 ## core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala: ## @@ -118,6 +120,10 @@ class RaftManagerTest { new Metrics(Time.SYSTEM), Option.empty, CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumVoters)), + config.quorumBootstrapServers Review Comment: nit: we seem to have this same little snippet in a few places. Is there somewhere we could add a helper? ## core/src/test/resources/log4j.properties: ## @@ -20,6 +20,8 @@ log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n log4j.logger.kafka=WARN log4j.logger.org.apache.kafka=WARN +# TODO; remove this line Review Comment: Reminder about TODO ## raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java: ## @@ -231,4 +266,26 @@ public String toString() { return "non-empty list"; } } + +public static class ControllerQuorumBootstrapServersValidator implements ConfigDef.Validator { +@Override +public void ensureValid(String name, Object value) { +if (value == null) { +throw new ConfigException(name, null); +} + +@SuppressWarnings("unchecked") +List entries = (List) value; + +// Attempt to parse the connect strings +for (String entry : entries) { +QuorumConfig.parseBootstrapServer(entry); Review Comment: nit: drop unnecessary `QuorumConfig` prefix? ## core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala: ## @@ -1366,6 +1368,26 @@ class KafkaConfigTest { assertEquals(expectedVoters, addresses) } + @Test + def testParseQuorumBootstrapServers(): Unit = { Review Comment: Perhaps we should have some invalid test cases as well? 
## raft/src/main/java/org/apache/kafka/raft/RequestManager.java: ## @@ -17,108 +17,196 @@ package org.apache.kafka.raft; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.OptionalInt; +import java.util.Optional; import java.util.OptionalLong; import java.util.Random; -import java.util.Set; +import org.apache.kafka.common.Node; public class RequestManager { -private final Map connections = new HashMap<>(); -private final List voters = new ArrayList<>(); +private final Map connections = new HashMap<>(); +private final ArrayList bootstrapServers; private final int retryBackoffMs; private final int requestTimeoutMs; private final Random random; -public RequestManager(Set voterIds, - int retryBackoffMs, - int requestTimeoutMs, - Random random) { - +public RequestManager( +Collection bootstrapServers, +int retryBackoffMs, +int requestTimeoutMs, +Random random +) { +this.bootstrapServers = new ArrayList<>(bootstrapServers); this.retryBackoffMs = retryBackoffMs; this.requestTimeoutMs = requestTimeoutMs; -this.voters.addAll(voterIds); this.random = random; - -for (Integer voterId: voterIds) { -ConnectionState connection = new ConnectionState(voterId); -connections.put(voterId, connection); -} -} - -public ConnectionState getOrCreate(int id) { -return connections.computeIfAbsent(id, key -> new ConnectionState(id)); } -public OptionalInt findReadyVoter(long currentTimeMs) { -int startIndex = random.nextInt(voters.size()); -OptionalInt res = OptionalInt.empty(); -for (int i = 0; i < voters.size(); i++) { -int index = (startIndex + i) % voters.size(); -Integer voterId = voters.get(index); -ConnectionState connection = connections.get(voterId); -boolean isReady = connection.isReady(currentTimeMs); +public Optional findReadyBootstrapServer(long currentTimeMs) { +int startIndex = random.nextInt(bootstrapServers.size()); +Optional res = Optional.empty(); +for (int i = 0; i < 
bootstrapServers.size(); i++) { +int index = (startIndex + i) % bootstrapServers.size(); +Node node = bootstrapServers.get(index); -if (isReady) { -res = OptionalInt.of(voterId); -} else if (connection.inFlightCorrelationId.isPresent()) { -res = OptionalInt.empty(); +if (isReady(node, currentTimeMs)) { +res = Optional.of(node); +} else if (hasInflightRequest(node, currentTimeMs)) { +res = Optional.empty(); break; } } + return res; } -public long
Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]
jeffkbkim commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1610459144

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ##
@@ -12143,7 +12183,6 @@ public void testClassicGroupSyncToConsumerGroupWithAllConsumerProtocolVersions()
 // Consumer group with two members.
 // Member 1 uses the classic protocol and member 2 uses the consumer protocol.
 GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
-.withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)

Review Comment:
Why were these removed?
Re: [PR] KAFKA-16793: Heartbeat API for upgrading ConsumerGroup [kafka]
jeffkbkim commented on code in PR #15988:
URL: https://github.com/apache/kafka/pull/15988#discussion_r1610456745

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -4274,6 +4343,81 @@ private void validateClassicGroupHeartbeat(
 }
 }

+/**
+ * Handle a classic group HeartbeatRequest to a consumer group. A response with
+ * REBALANCE_IN_PROGRESS is returned if 1) the member epoch is smaller than the
+ * group epoch, 2) the member is in UNREVOKED_PARTITIONS, or 3) the member is in
+ * UNRELEASED_PARTITIONS and all its partitions pending assignment are free.
+ *
+ * @param group The ConsumerGroup.
+ * @param context The request context.
+ * @param request The actual Heartbeat request.
+ *
+ * @return The coordinator result that contains the heartbeat response.
+ */
+private CoordinatorResult classicGroupHeartbeatToConsumerGroup(
+ConsumerGroup group,
+RequestContext context,
+HeartbeatRequestData request
+) throws UnknownMemberIdException, FencedInstanceIdException, IllegalGenerationException {
+String groupId = request.groupId();
+String memberId = request.memberId();
+String instanceId = request.groupInstanceId();
+ConsumerGroupMember member = validateConsumerGroupMember(group, memberId, instanceId);
+
+throwIfMemberDoesNotUseClassicProtocol(member);
+throwIfGenerationIdUnmatched(memberId, member.memberEpoch(), request.generationId());
+
+scheduleConsumerGroupSessionTimeout(groupId, memberId, member.classicProtocolSessionTimeout().get());
+
+Errors error = Errors.NONE;
+// The member should rejoin if any of the following conditions is met.
+// 1) The group epoch is bumped so the member need to rejoin to catch up.
+// 2) The member needs to revoke some partitions and rejoin to reconcile with the new epoch.
+// 3) The member's partitions pending assignment are free, so it can rejoin to get the complete assignment.
+if (member.memberEpoch() < group.groupEpoch() ||
+member.state() == MemberState.UNREVOKED_PARTITIONS ||
+(member.state() == MemberState.UNRELEASED_PARTITIONS && !group.waitingOnUnreleasedPartition(member))) {

Review Comment:
I'm not sure I fully understand this part. UNRELEASED_PARTITIONS means that the member is waiting on partitions. However, I'm guessing the helper checks that, in the latest state, all of those partitions have in fact been released, and that we want the member to rejoin to get the updated assignment. Is this correct? Will this member be updated to the STABLE state in the next CurrentAssignmentBuilder#computeNextAssignment?
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
artemlivshits commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1610437612 ## server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java: ## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Collections; +import java.util.Map; + +public enum TestFeatureVersion implements FeatureVersion { + +TEST_0(0, MetadataVersion.IBP_3_3_IV0, Collections.emptyMap()), Review Comment: Version 0 == "feature doesn't exist", does it need to be explicitly codified the enum? ## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ## @@ -470,8 +471,8 @@ BrokerFeature processRegistrationFeature( // A feature is not found in the finalizedFeature map if it is unknown to the controller or set to 0 (feature not enabled). // As more features roll out, it may be common to leave a feature disabled, so this log is debug level in the case Review Comment: Comment seems to be outdated w.r.t. latest logic? 
## server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java: ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.common; + +import java.util.Map; + +public interface FeatureVersion { + +/** + * The level of the feature. 0 means the feature is disabled. + */ +short featureLevel(); + +/** + * The name of the feature. + */ +String featureName(); + +/** + * The minimum metadata version which sets this feature version as default. When bootstrapping using only + * a metadata version, a reasonable default for all other features is chosen based on this value. + * This should be defined as the next metadata version to be released when the feature version becomes production ready. + * (Ie, if the current production MV is 17 when a feature version is released, its mapping should be to MV 18) Review Comment: Can we make it map to the release version? If not, can we add a comment explaining the logic? 
Intuitively, if we have a feature like

- FOO(1, 42) // released version 1 when MV got bumped to 42
- FOO(2, 77) // released version 2 when MV got bumped to 77
- FOO(3, 77) // released version 3 at the same time as 2

we should be able to discover that if we got MV 45, then FOO=1, because 1 was available at 42 and 2 and 3 are not available yet. If we got MV 78, then it should be 3, because that was the max available at 77. If we got MV 30, then we should get 0 (implicit), because the feature wasn't available yet.

Using the same table, we can determine that if feature version 1 was selected with MV 41, then it's an invalid combination; similarly, if feature version 2 or 3 was selected with MV 45, it's an invalid combination. And we can select 1, 2 or 3 when MV is 77.

## server-common/src/main/java/org/apache/kafka/server/common/Features.java: ##
@@ -64,14 +63,16 @@ public enum Features {
 PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature -> feature.usedInProduction).collect(Collectors.toList());
+PRODUCTION_FEATURE_NAMES = PRODUCTION_FEATURES.stream().map(feature ->
+feature.name).collect(Collectors.toList());
 }

 public String featureName() {
 return name;
 }
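The FOO(level, MV) table from the FeatureVersion.java review comment above can be sketched as a small lookup. This is only an illustration of the reviewer's reasoning, not the PR's implementation: `FooFeatureSketch`, `Release`, `defaultLevel`, and `isValidCombination` are hypothetical names, and metadata versions are modelled as plain integers.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the FOO table from the review comment; not Kafka's API.
final class FooFeatureSketch {

    // One row of the table: a feature level and the MV at which it was released.
    static final class Release {
        final int level;
        final int releasedAtMv;

        Release(int level, int releasedAtMv) {
            this.level = level;
            this.releasedAtMv = releasedAtMv;
        }
    }

    // FOO(1, 42), FOO(2, 77), FOO(3, 77), exactly as in the comment.
    static final List<Release> FOO = Arrays.asList(
        new Release(1, 42),
        new Release(2, 77),
        new Release(3, 77)
    );

    // Highest level whose release MV is <= the given MV; 0 means "not available yet".
    static int defaultLevel(int metadataVersion) {
        int best = 0;
        for (Release release : FOO) {
            if (release.releasedAtMv <= metadataVersion && release.level > best) {
                best = release.level;
            }
        }
        return best;
    }

    // A (level, MV) pair is valid if the level is 0 or was already released at that MV.
    static boolean isValidCombination(int level, int metadataVersion) {
        if (level == 0) {
            return true;
        }
        for (Release release : FOO) {
            if (release.level == level) {
                return release.releasedAtMv <= metadataVersion;
            }
        }
        return false; // unknown level
    }
}
```

With this table, `defaultLevel(45)` yields 1, `defaultLevel(78)` yields 3, and `defaultLevel(30)` yields 0, while `isValidCombination(1, 41)` and `isValidCombination(2, 45)` are rejected, matching the cases listed in the comment.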
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
junrao commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1610452888 ## core/src/main/scala/kafka/server/DelayedFetch.scala: ## @@ -91,19 +91,24 @@ class DelayedFetch( // Go directly to the check for Case G if the message offsets are the same. If the log segment // has just rolled, then the high watermark offset will remain the same but be on the old segment, // which would incorrectly be seen as an instance of Case F. -if (endOffset.messageOffset != fetchOffset.messageOffset) { - if (endOffset.onOlderSegment(fetchOffset)) { -// Case F, this can happen when the new fetch operation is on a truncated leader -debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.") -return forceComplete() +if (fetchOffset.messageOffset > endOffset.messageOffset) { + // Case F, this can happen when the new fetch operation is on a truncated leader + debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.") + return forceComplete() +} else if (fetchOffset.messageOffset < endOffset.messageOffset) { + if (endOffset.messageOffsetOnly() || fetchOffset.messageOffsetOnly()) { +// If we don't know the position of the offset on log segments, just pessimistically assume that we +// only gained 1 byte when fetchOffset < endOffset, otherwise do nothing. This can happen when the +// high-watermark is stale, but should be rare. +accumulatedSize += 1 Review Comment: Yes, in that case, we just wait until endOffset moves to within the local log segments. If the timeout hits, we will just return empty. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on PR #15825:
URL: https://github.com/apache/kafka/pull/15825#issuecomment-2125471280

@junrao Thanks for the review! Addressed all your comments. PTAL.
[jira] [Commented] (KAFKA-15250) ConsumerNetworkThread is running tight loop
[ https://issues.apache.org/jira/browse/KAFKA-15250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17848703#comment-17848703 ]

Philip Nee commented on KAFKA-15250:

After the in-flight check fix, I think most request managers are backing off correctly. Closing this issue.

> ConsumerNetworkThread is running tight loop
> ---
>
> Key: KAFKA-15250
> URL: https://issues.apache.org/jira/browse/KAFKA-15250
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Reporter: Philip Nee
> Assignee: Philip Nee
> Priority: Critical
> Labels: consumer-threading-refactor, events
> Fix For: 3.8.0
>
> The DefaultBackgroundThread is running tight loops and wasting CPU cycles. I
> think we need to reexamine the timeout passed to networkclientDelegate.poll.

--
This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15250) ConsumerNetworkThread is running tight loop
[ https://issues.apache.org/jira/browse/KAFKA-15250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-15250. Resolution: Fixed > ConsumerNetworkThread is running tight loop > --- > > Key: KAFKA-15250 > URL: https://issues.apache.org/jira/browse/KAFKA-15250 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Critical > Labels: consumer-threading-refactor, events > Fix For: 3.8.0 > > > The DefaultBackgroundThread is running tight loops and wasting CPU cycles. I > think we need to reexamine the timeout pass to networkclientDelegate.poll. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16819) CoordinatorRequestManager seems to return 0ms during the coordinator discovery
Philip Nee created KAFKA-16819:
--

Summary: CoordinatorRequestManager seems to return 0ms during the coordinator discovery
Key: KAFKA-16819
URL: https://issues.apache.org/jira/browse/KAFKA-16819
Project: Kafka
Issue Type: Bug
Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee

In KAFKA-15250 we discovered that the ConsumerNetworkThread was looping without much backoff. The in-flight check PR fixed most of it; however, during the coordinator discovery phase, CoordinatorRequestManager keeps returning 0 until the coordinator node is found. The impact is minor, but we should expect the coordinator manager to back off until the exponential backoff expires (so it should return around 100 ms).
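The behaviour the issue asks for can be sketched as follows. This is a minimal illustration only, not the actual CoordinatorRequestManager code: the class `CoordinatorBackoffSketch`, its method names, and the doubling policy are all assumptions made for the example, and the 100 ms initial value is taken from the figure mentioned in the issue text.

```java
/**
 * Hypothetical sketch (not Kafka code): a request manager that reports the
 * remaining backoff instead of 0 while the coordinator is still unknown.
 */
final class CoordinatorBackoffSketch {
    private final long initialBackoffMs;
    private final long maxBackoffMs;
    private int failedAttempts = 0;
    private long lastAttemptMs = 0L;

    CoordinatorBackoffSketch(long initialBackoffMs, long maxBackoffMs) {
        this.initialBackoffMs = initialBackoffMs;
        this.maxBackoffMs = maxBackoffMs;
    }

    /** Record a lookup attempt that did not find the coordinator. */
    void onFailedAttempt(long nowMs) {
        failedAttempts++;
        lastAttemptMs = nowMs;
    }

    /** Backoff for the current attempt count: doubles per failure, capped at maxBackoffMs. */
    long currentBackoffMs() {
        if (failedAttempts == 0) {
            return 0L;
        }
        int shift = Math.min(failedAttempts - 1, 20); // cap the shift to avoid overflow
        return Math.min(maxBackoffMs, initialBackoffMs << shift);
    }

    /** How long the polling thread should wait before asking again. */
    long timeUntilNextPollMs(long nowMs) {
        long elapsedMs = nowMs - lastAttemptMs;
        return Math.max(0L, currentBackoffMs() - elapsedMs);
    }

    public static void main(String[] args) {
        CoordinatorBackoffSketch sketch = new CoordinatorBackoffSketch(100L, 1000L);
        sketch.onFailedAttempt(0L);
        System.out.println(sketch.timeUntilNextPollMs(40L));
    }
}
```

With a return value like this, the network thread can pass the remaining backoff as its poll timeout rather than spinning with a 0 ms timeout.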
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610404165

## storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java: ##
@@ -445,7 +445,7 @@ public FetchDataInfo read(long startOffset, int maxSize, long maxPosition, boole
 adjustedMaxSize = Math.max(maxSize, startOffsetAndSize.size);
 // return a log segment but with zero size in the case below
-if (adjustedMaxSize == 0)
+if (adjustedMaxSize == 0 || maxPosition == -1)

Review Comment:
LogSegment is in Java, and using Optional as a method parameter in LogSegment#read triggers a warning in IntelliJ and requires some refactoring (an `int` passed for a `long` parameter is converted implicitly, but that does not work with Optional), so I went with the negative-value approach:

```
'Optional' used as type for parameter 'maxPosition'
```
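The trade-off described in this comment can be illustrated with a small stand-alone sketch. It is not the real LogSegment#read: the class `SentinelMaxPositionSketch`, the constant `NO_MAX_POSITION`, and the simplified `readableBytes` method are hypothetical names introduced for the example. It only shows how a `-1` sentinel in a `long` parameter short-circuits to an empty result while still accepting `int` arguments through implicit widening.

```java
/** Hypothetical sketch (not Kafka code) of a sentinel-valued maxPosition parameter. */
final class SentinelMaxPositionSketch {
    /** Assumed sentinel meaning "the maximum readable position is unknown". */
    static final long NO_MAX_POSITION = -1L;

    /**
     * Returns how many bytes a read starting at startPosition may return.
     * A negative maxSize is rejected; an unknown maxPosition yields an empty read.
     */
    static int readableBytes(int startPosition, int maxSize, long maxPosition) {
        if (maxSize < 0) {
            throw new IllegalArgumentException("Invalid max size " + maxSize);
        }
        if (maxSize == 0 || maxPosition == NO_MAX_POSITION) {
            return 0; // empty result, mirroring the zero-size branch in the quoted hunk
        }
        long available = Math.max(0L, maxPosition - startPosition);
        return (int) Math.min(available, (long) maxSize);
    }

    public static void main(String[] args) {
        int positionAsInt = 4; // an int argument widens to long without any call-site change
        System.out.println(readableBytes(0, 10, positionAsInt));
        System.out.println(readableBytes(0, 10, NO_MAX_POSITION));
    }
}
```

An `Optional<Long>` parameter would force every caller that currently passes an `int` to wrap the value explicitly, which appears to be the refactoring cost the comment refers to.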
Re: [PR] KAFKA-16625: Reverse lookup map from topic partitions to members [kafka]
rreddy-22 commented on code in PR #15974: URL: https://github.com/apache/kafka/pull/15974#discussion_r1610424080 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java: ## @@ -186,9 +190,33 @@ private void createAssignmentSpec() { Collections.emptyMap() )); } -assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); +groupSpec = new GroupSpecImpl(members, HOMOGENEOUS, Collections.emptyMap()); } +public Map> invertedTargetAssignment( Review Comment: I think it's fine, it's just going to be used in these two benchmarks and they have some differences -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16625: Reverse lookup map from topic partitions to members [kafka]
jeffkbkim commented on code in PR #15974: URL: https://github.com/apache/kafka/pull/15974#discussion_r1610394065 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java: ## @@ -79,8 +80,16 @@ public void testTwoMembersNoTopicSubscription() { Collections.emptyMap() )); -AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS); -GroupAssignment groupAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); +GroupSpecImpl groupSpec = new GroupSpecImpl( Review Comment: nit: should all the `GroupSpecImpl groupSpec` initializations in the tests be `GroupSpec groupSpec`? ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java: ## @@ -82,4 +83,34 @@ public static void assertAssignment( assertEquals(expectedAssignment.get(memberId), computedAssignmentForMember); }); } + +/** + * Generate a reverse look up map of partition to member target assignments from the given member spec. + * + * @param memberSpecA map where the key is the member Id and the value is an + * AssignmentMemberSpec object containing the member's partition assignments. + * @return Map of topic partition to member assignments. 
+ */ +public static Map> invertedTargetAssignment( +Map memberSpec +) { +Map> invertedTargetAssignment = new HashMap<>(); +for (Map.Entry memberEntry : memberSpec.entrySet()) { +String memberId = memberEntry.getKey(); +Map> topicsAndPartitions = memberEntry.getValue().assignedPartitions(); + +for (Map.Entry> topicEntry : topicsAndPartitions.entrySet()) { +Uuid topicId = topicEntry.getKey(); +Set partitions = topicEntry.getValue(); + +invertedTargetAssignment.putIfAbsent(topicId, new HashMap<>()); +Map partitionMap = invertedTargetAssignment.get(topicId); Review Comment: i think computeIfAbsent would work ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignorException.java: ## @@ -19,7 +19,7 @@ import org.apache.kafka.common.errors.ApiException; /** - * Exception thrown by {@link PartitionAssignor#assign(AssignmentSpec)}. The exception + * Exception thrown by {@link PartitionAssignor#assign(GroupSpecImpl, SubscribedTopicDescriber)}}. The exception Review Comment: i think this needs to be GroupSpec ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java: ## @@ -186,9 +190,33 @@ private void createAssignmentSpec() { Collections.emptyMap() )); } -assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); +groupSpec = new GroupSpecImpl(members, HOMOGENEOUS, Collections.emptyMap()); } +public Map> invertedTargetAssignment( +GroupAssignment groupAssignment +) { +Map> invertedTargetAssignment = new HashMap<>(); +for (Map.Entry memberEntry : groupAssignment.members().entrySet()) { +String memberId = memberEntry.getKey(); +Map> topicsAndPartitions = memberEntry.getValue().targetPartitions(); + +for (Map.Entry> topicEntry : topicsAndPartitions.entrySet()) { +Uuid topicId = topicEntry.getKey(); +Set partitions = topicEntry.getValue(); + +invertedTargetAssignment.putIfAbsent(topicId, new HashMap<>()); +Map partitionMap = invertedTargetAssignment.get(topicId); + +for (Integer partitionId : 
partitions) { +partitionMap.put(partitionId, memberId); +} +} +} +return invertedTargetAssignment; +} + + Review Comment: nit: newline ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java: ## @@ -186,9 +190,33 @@ private void createAssignmentSpec() { Collections.emptyMap() )); } -assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); +groupSpec = new GroupSpecImpl(members, HOMOGENEOUS, Collections.emptyMap()); } +public Map> invertedTargetAssignment( Review Comment: is it not possible to unify this with ServerSideAssignorBenchmark#invertedTargetAssignment? seems like some details are different but they have a lot in common -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
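The `computeIfAbsent` suggestion made in this review can be sketched on its own. The example below is a simplified stand-in, not the code under review: it uses `String` topic ids instead of `Uuid`, and the class and method names (`InvertedAssignmentSketch`, `invert`) are hypothetical. The shape of the loop follows the quoted helper.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch: invert member -> (topic -> partitions) into topic -> (partition -> member). */
final class InvertedAssignmentSketch {
    static Map<String, Map<Integer, String>> invert(
        Map<String, Map<String, Set<Integer>>> assignmentByMember
    ) {
        Map<String, Map<Integer, String>> inverted = new HashMap<>();
        for (Map.Entry<String, Map<String, Set<Integer>>> memberEntry : assignmentByMember.entrySet()) {
            String memberId = memberEntry.getKey();
            for (Map.Entry<String, Set<Integer>> topicEntry : memberEntry.getValue().entrySet()) {
                // One lookup replaces the putIfAbsent + get pair, and the inner map
                // is only allocated when the topic is seen for the first time.
                Map<Integer, String> partitionMap =
                    inverted.computeIfAbsent(topicEntry.getKey(), topicId -> new HashMap<>());
                for (Integer partitionId : topicEntry.getValue()) {
                    partitionMap.put(partitionId, memberId);
                }
            }
        }
        return inverted;
    }

    public static void main(String[] args) {
        Map<String, Map<String, Set<Integer>>> assignment = new HashMap<>();
        assignment.put("member-a", Map.of("topic-1", Set.of(0, 1)));
        assignment.put("member-b", Map.of("topic-1", Set.of(2)));
        System.out.println(invert(assignment));
    }
}
```

Compared with `putIfAbsent(topicId, new HashMap<>())`, this avoids creating a throwaway `HashMap` on every iteration where the topic already has an entry.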
Re: [PR] MINOR: Fix rate metric spikes [kafka]
emitskevich-blp commented on code in PR #15889: URL: https://github.com/apache/kafka/pull/15889#discussion_r1610406308 ## clients/src/test/java/org/apache/kafka/common/metrics/stats/RateTest.java: ## @@ -64,4 +69,31 @@ public void testRateWithNoPriorAvailableSamples(int numSample, int sampleWindowS double expectedRatePerSec = sampleValue / windowSize; assertEquals(expectedRatePerSec, observedRate, EPS); } + +// Record an event every 100 ms on average, moving some 1 ms back or forth for fine-grained +// window control. The expected rate, hence, is 10-11 events/sec depending on the moment of +// measurement. Start assertions from the second window. This test is to address past issue, +// when measurements in the end of the sample led to value spikes. Review Comment: Applied ## clients/src/test/java/org/apache/kafka/common/metrics/stats/RateTest.java: ## @@ -64,4 +69,30 @@ public void testRateWithNoPriorAvailableSamples(int numSample, int sampleWindowS double expectedRatePerSec = sampleValue / windowSize; assertEquals(expectedRatePerSec, observedRate, EPS); } + +// Record an event every 100 ms on average, moving some 1 ms back or forth for fine-grained +// window control. The expected rate, hence, is 10-11 events/sec depending on the moment of +// measurement. Start assertions from the second window. 
+@Test +public void testRateIsConsistentAfterTheFirstWindow() { +MetricConfig config = new MetricConfig().timeWindow(1, SECONDS).samples(2); +List steps = Arrays.asList(0, 99, 100, 100, 100, 100, 100, 100, 100, 100, 100); + +// start the first window and record events at 0,99,199,...,999 ms +for (int stepMs : steps) { +time.sleep(stepMs); +rate.record(config, 1, time.milliseconds()); +} + +// making a gap of 100 ms between windows +time.sleep(101); + +// start the second window and record events at 0,99,199,...,999 ms +for (int stepMs : steps) { +time.sleep(stepMs); +rate.record(config, 1, time.milliseconds()); +double observedRate = rate.measure(config, time.milliseconds()); Review Comment: Applied -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
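The "10-11 events/sec depending on the moment of measurement" remark in the test comment can be checked with a small stand-alone calculation. This sketch does not reproduce Kafka's `Rate` implementation or its sample-window bookkeeping. It only replays the step list from the test and counts events in a trailing one-second window. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: replay the test's step list and count events in a trailing window. */
final class RateWindowSketch {
    /** Turn per-event sleep steps into absolute event timestamps, starting at startMs. */
    static List<Long> eventTimes(long startMs, int[] stepsMs) {
        List<Long> times = new ArrayList<>();
        long nowMs = startMs;
        for (int stepMs : stepsMs) {
            nowMs += stepMs;
            times.add(nowMs);
        }
        return times;
    }

    /** Events per second over the half-open trailing window (nowMs - windowMs, nowMs]. */
    static double trailingRatePerSec(List<Long> times, long nowMs, long windowMs) {
        long count = times.stream()
            .filter(t -> t > nowMs - windowMs && t <= nowMs)
            .count();
        return count * 1000.0 / windowMs;
    }

    public static void main(String[] args) {
        int[] steps = {0, 99, 100, 100, 100, 100, 100, 100, 100, 100, 100};
        List<Long> times = eventTimes(0L, steps); // 0, 99, 199, ..., 999
        System.out.println(trailingRatePerSec(times, 999L, 1000L));
        System.out.println(trailingRatePerSec(times, 1000L, 1000L));
    }
}
```

Eleven events land at 0, 99, 199, ..., 999 ms, so a one-second window sees either ten or eleven of them depending on whether the event at 0 ms is still inside the window.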
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1610404165 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java: ## @@ -445,7 +445,7 @@ public FetchDataInfo read(long startOffset, int maxSize, long maxPosition, boole adjustedMaxSize = Math.max(maxSize, startOffsetAndSize.size); // return a log segment but with zero size in the case below -if (adjustedMaxSize == 0) +if (adjustedMaxSize == 0 || maxPosition == -1) Review Comment: LogSegment is in Java and using Optional as a method parameter in LogSegment#read shows warnings in the intelliJ and require some refactoring, so went with the negative value approach: ``` 'Optional' used as type for parameter 'maxPosition' ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610401875

## core/src/main/scala/kafka/server/DelayedFetch.scala: ##
@@ -91,19 +91,24 @@ class DelayedFetch(
 // Go directly to the check for Case G if the message offsets are the same. If the log segment
 // has just rolled, then the high watermark offset will remain the same but be on the old segment,
 // which would incorrectly be seen as an instance of Case F.
-if (endOffset.messageOffset != fetchOffset.messageOffset) {
-  if (endOffset.onOlderSegment(fetchOffset)) {
-// Case F, this can happen when the new fetch operation is on a truncated leader
-debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
-return forceComplete()
+if (fetchOffset.messageOffset > endOffset.messageOffset) {
+  // Case F, this can happen when the new fetch operation is on a truncated leader
+  debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
+  return forceComplete()
+} else if (fetchOffset.messageOffset < endOffset.messageOffset) {
+  if (endOffset.messageOffsetOnly() || fetchOffset.messageOffsetOnly()) {
+// If we don't know the position of the offset on log segments, just pessimistically assume that we
+// only gained 1 byte when fetchOffset < endOffset, otherwise do nothing. This can happen when the
+// high-watermark is stale, but should be rare.
+accumulatedSize += 1

Review Comment:
To confirm: should we avoid accumulating the 1 byte when fetchOffset < endOffset? The FETCH request will be parked in purgatory for 500 ms, and I don't see any issues with that.
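The three-way comparison in the quoted hunk can be modelled in isolation. The sketch below is a simplified, hypothetical rendering in Java, not the Scala code from DelayedFetch: `OffsetMeta`, `bytesGained`, and the `FORCE_COMPLETE` sentinel are names made up for the example, and the "both positions known" branch assumes the two offsets sit on the same segment, which the real code does not assume. It reflects the hunk as quoted here, before the question about dropping the 1-byte accumulation was settled.

```java
/** Hypothetical, simplified model of the comparison in the quoted DelayedFetch hunk. */
final class DelayedFetchSketch {
    /** Sentinel result meaning "complete the delayed fetch immediately". */
    static final int FORCE_COMPLETE = -1;

    /** Minimal offset metadata; a negative position means only the message offset is known. */
    static final class OffsetMeta {
        final long messageOffset;
        final int positionInSegment;

        OffsetMeta(long messageOffset, int positionInSegment) {
            this.messageOffset = messageOffset;
            this.positionInSegment = positionInSegment;
        }

        boolean messageOffsetOnly() {
            return positionInSegment < 0;
        }
    }

    /** Returns FORCE_COMPLETE, or the number of bytes to add to the accumulated size. */
    static int bytesGained(OffsetMeta fetchOffset, OffsetMeta endOffset) {
        if (fetchOffset.messageOffset > endOffset.messageOffset) {
            return FORCE_COMPLETE; // Case F: fetching beyond the end offset (truncated leader)
        }
        if (fetchOffset.messageOffset < endOffset.messageOffset) {
            if (endOffset.messageOffsetOnly() || fetchOffset.messageOffsetOnly()) {
                return 1; // position unknown: pessimistically count a single byte
            }
            // Assumption for this sketch only: both offsets are on the same segment.
            return endOffset.positionInSegment - fetchOffset.positionInSegment;
        }
        return 0; // equal message offsets: nothing new to read
    }

    public static void main(String[] args) {
        System.out.println(bytesGained(new OffsetMeta(5L, -1), new OffsetMeta(10L, 200)));
    }
}
```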
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610399596

## core/src/test/scala/unit/kafka/log/LogSegmentTest.scala: ##
@@ -143,6 +143,39 @@ class LogSegmentTest {
 checkEquals(ms2.records.iterator, read.records.records.iterator)
 }
+ @Test
+ def testReadWhenNoMaxPosition(): Unit = {
+val maxPosition = -1
+val maxSize = 1
+val seg = createSegment(40)
+val ms = records(50, "hello", "there")
+seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms)
+for (minOneMessage <- Array(true, false)) {
+  // read before first offset
+  var read = seg.read(48, maxSize, maxPosition, minOneMessage)

Review Comment:
Yes, when maxSize is negative (-1), the following error is thrown:

```
java.lang.IllegalArgumentException: Invalid max size -1 for log read from segment FileRecords(size=78, file=/var/folders/bq/w6tnkbq964q8sqpvj4fbmjr4gq/T/kafka-18165220027855018994/0040.log, start=0, end=2147483647)
	at org.apache.kafka.storage.internals.log.LogSegment.read(LogSegment.java:432)
```