Re: [PR] KAFKA-16373: Adding code to support Apache Kafka Docker Official Images [kafka]
KrishVora01 commented on code in PR #15704: URL: https://github.com/apache/kafka/pull/15704#discussion_r1566786457

## .github/workflows/prepare_docker_official_image_source.yml:
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: Docker Prepare Docker Official Image Source
+
+on:
+  workflow_dispatch:
+    inputs:
+      image_type:
+        type: choice
+        description: Docker image type to build and test
+        options:
+          - "jvm"
+      kafka_url:
+        description: Kafka url to be used to build the source for docker official image

Review Comment: This change has been made 👍

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16373: Adding code to support Apache Kafka Docker Official Images [kafka]
KrishVora01 commented on code in PR #15704: URL: https://github.com/apache/kafka/pull/15704#discussion_r1566786231

## docker/docker_official_image_build_test.py:
@@ -0,0 +1,128 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Python script to build and test a docker image
+This script is used to generate a test report
+
+Usage:
+    docker_build_test.py --help
+        Get detailed description of each option
+
+Example command:-
+    docker_build_test.py <image_name> --image-tag <image_tag> --image-type <image_type> --kafka-url <kafka_url>
+
+    This command will build an image with <image_name> as image name, <image_tag> as image_tag (it will be latest by default),
+    <image_type> as image type (jvm by default) and <kafka_url> for the kafka inside the image, and run tests on the image.
+    -b can be passed as additional argument if you just want to build the image.
+    -t can be passed if you just want to run tests on the image.
+"""
+
+from datetime import date
+import argparse
+from distutils.dir_util import copy_tree
+import shutil
+from test.docker_sanity_test import run_tests
+from common import execute, jvm_image
+import tempfile
+import os
+
+
+def set_executable_permissions(directory):
+    """
+    Sets executable permissions for all files in the specified directory and its subdirectories.
+    """
+    for root, _, files in os.walk(directory):
+        for file in files:
+            path = os.path.join(root, file)
+            os.chmod(path, os.stat(path).st_mode | 0o111)
+
+
+def build_jvm(image, tag, kafka_version):
+    image = f'{image}:{tag}'
+    current_dir = os.path.dirname(os.path.realpath(__file__))
+    temp_dir_path = tempfile.mkdtemp()
+    directories = [
+        f'{current_dir}/docker_official_images/{kafka_version}/jvm',
+        f'{current_dir}/docker_official_images/{kafka_version}/jvm/resources'
+    ]
+    for directory in directories:
+        set_executable_permissions(directory)
+    copy_tree(f"{current_dir}/docker_official_images/{kafka_version}/jvm",
+              f"{temp_dir_path}/jvm")
+    copy_tree(f"{current_dir}/docker_official_images/{kafka_version}/jvm/resources",
+              f"{temp_dir_path}/jvm/resources")
+    command = f"docker build -f $DOCKER_FILE -t {image} $DOCKER_DIR"
+    command = command.replace("$DOCKER_FILE", f"{temp_dir_path}/jvm/Dockerfile")
+    command = command.replace("$DOCKER_DIR", f"{temp_dir_path}/jvm")
+    try:
+        execute(command.split())
+    except:
+        raise SystemError("Docker Image Build failed")
+    finally:
+        shutil.rmtree(temp_dir_path)
+
+
+def run_jvm_tests(image, tag, kafka_url):
+    temp_dir_path = tempfile.mkdtemp()
+    try:
+        current_dir = os.path.dirname(os.path.realpath(__file__))
+        copy_tree(f"{current_dir}/test/fixtures", f"{temp_dir_path}/fixtures")
+        execute(["wget", "-nv", "-O", f"{temp_dir_path}/kafka.tgz", kafka_url])
+        execute(["mkdir", f"{temp_dir_path}/fixtures/kafka"])
+        execute(["tar", "xfz", f"{temp_dir_path}/kafka.tgz", "-C",
+                 f"{temp_dir_path}/fixtures/kafka", "--strip-components", "1"])
+        failure_count = run_tests(f"{image}:{tag}", "jvm", temp_dir_path)
+    except:
+        raise SystemError("Failed to run the tests")
+    finally:
+        shutil.rmtree(temp_dir_path)
+    test_report_location_text = f"To view test report please check {current_dir}/test/report_jvm.html"

Review Comment: This change has been made 👍
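The permission helper in the quoted script ORs each file's mode with `0o111`, which sets the execute bit for owner, group, and other while leaving the existing read/write bits untouched. A minimal standalone sketch of the same idea on a throwaway directory (file names here are illustrative, POSIX semantics assumed):

```python
import os
import stat
import tempfile

def set_executable_permissions(directory):
    # Walk the tree and add the execute bits (u+x, g+x, o+x) to every file,
    # preserving whatever read/write bits each file already has.
    for root, _, files in os.walk(directory):
        for name in files:
            path = os.path.join(root, name)
            os.chmod(path, os.stat(path).st_mode | 0o111)

# Demonstration on a temporary directory with one ordinary file.
with tempfile.TemporaryDirectory() as d:
    script = os.path.join(d, "entrypoint.sh")
    with open(script, "w") as f:
        f.write("#!/bin/sh\necho hello\n")
    os.chmod(script, 0o644)            # rw-r--r-- : not executable yet
    set_executable_permissions(d)
    mode = stat.S_IMODE(os.stat(script).st_mode)
    print(oct(mode))                   # -> 0o755 : execute bits added, rw bits kept
```

Because the new mode is derived from the current mode rather than hard-coded, files that were `0o600` become `0o711`, files that were `0o644` become `0o755`, and so on.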
[PR] KAFKA-14509: [3/4] Add integration test for consumerGroupDescribe Api [kafka]
riedelmax opened a new pull request, #15727: URL: https://github.com/apache/kafka/pull/15727

*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*

*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-16373: Adding code to support Apache Kafka Docker Official Images [kafka]
KrishVora01 commented on code in PR #15704: URL: https://github.com/apache/kafka/pull/15704#discussion_r1566779353

## docker/docker_official_image_build_test.py:
@@ -0,0 +1,128 @@
[... same hunk as quoted in the message above ...]
+    test_report_location_text = f"To view test report please check {current_dir}/test/report_jvm.html"
+    if failure_count != 0:
+        raise SystemError(
+            f"{failure_count} tests have failed. {test_report_location_text}")
+    else:
+        print(f"All tests passed successfully. {test_report_location_text}")
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "image", help="Image name that you want to keep for the Docker image")
+    parser.add_argument("--image-tag", "-tag", default="latest",
+                        dest="tag", help="Image tag that you want to add to the image")
+    parser.add_argument("--image-type", "-type", choices=[
+        "jvm"], default="jvm", dest="image_type", help="Image type you want to build")
+    parser.add_argument("--kafka-url", "-u", dest="kafka_url",
+                        help="Kafka url to be
Re: [PR] KAFKA-16373: Adding code to support Apache Kafka Docker Official Images [kafka]
KrishVora01 commented on code in PR #15704: URL: https://github.com/apache/kafka/pull/15704#discussion_r1566778083

## docker/generate_kafka_pr_template.sh:
@@ -0,0 +1,76 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Ensure script exits on error or unset variable
+set -eu
+
+# Define the 'self' variable with the script's basename
+self="$(basename "$BASH_SOURCE")"
+
+# Navigate to the script's directory and then to docker_official_images
+cd "$(dirname "$(readlink -f "$BASH_SOURCE")")/docker_official_images"
+
+# Source common utilities
+source ../common.sh
+
+# Initialize an empty variable for the highest version
+highest_version=""
+
+# Output header information
+cat <<-EOH
+# This file is generated via https://github.com/apache/kafka/blob/$(fileCommit "../$self")/docker/generate_kafka_pr_template.sh
+
+Maintainers: The Apache Kafka Project (@ApacheKafka)
+GitRepo: https://github.com/apache/kafka.git
+EOH
+
+# Find all versions, excluding -rc, sort them, and determine the globally highest version
+versions=$(find . -mindepth 1 -maxdepth 1 -type d ! -name "*-rc" | sort -V)
+for dir in $versions; do
+    version=$(basename "$dir")
+    highest_version="$version" # Continuously update to ensure the last is the highest
+done

Review Comment: This change has been made 👍
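The shell loop in the quoted script leans on `sort -V` (version sort): after iterating the sorted list, the last directory seen is the highest release. A rough Python equivalent of that selection step (hypothetical directory names; real `sort -V` handles more formats than this simple numeric-tuple split):

```python
def highest_version(dirs):
    """Return the highest version string, excluding release candidates,
    by comparing numeric components (a simplified stand-in for `sort -V`)."""
    releases = [d for d in dirs if not d.endswith("-rc")]
    return max(releases, key=lambda v: tuple(int(p) for p in v.split(".")))

# Hypothetical docker_official_images/ subdirectories:
dirs = ["3.7.0", "3.6.2", "3.7.1-rc", "3.10.0"]
print(highest_version(dirs))  # -> 3.10.0
```

Note why version-aware comparison matters: a plain lexicographic sort would rank "3.7.0" above "3.10.0", because "7" compares greater than "1" character by character.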
Re: [PR] KAFKA-16373: Adding code to support Apache Kafka Docker Official Images [kafka]
KrishVora01 commented on code in PR #15704: URL: https://github.com/apache/kafka/pull/15704#discussion_r1566755671 ## docker/generate_kafka_pr_template.sh: ## @@ -0,0 +1,61 @@ +#!/usr/bin/env bash Review Comment: This has been added
Re: [PR] JMH Benchmarks for testing the performance of the Server Side Rebalances: KIP_848 [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1566753533

## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java:
@@ -0,0 +1,185 @@
+package org.apache.kafka.jmh.group_coordinator;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+    @Param({"10", "50", "100"})
+    private int partitionsPerTopicCount;
+
+    @Param({"100"})
+    private int topicCount;
+
+    @Param({"500", "1000"})
+    private int memberCount;
+
+    @Param({"true", "false"})
+    private boolean isRackAware;
+
+    @Param({"true", "false"})
+    private boolean isSubscriptionUniform;
+
+    @Param({"true", "false"})
+    private boolean isRangeAssignor;
+
+    @Param({"true", "false"})
+    private boolean isReassignment;
+
+    private PartitionAssignor partitionAssignor;
+
+    private final int numberOfRacks = 3;
+
+    private AssignmentSpec assignmentSpec;
+
+    private SubscribedTopicDescriber subscribedTopicDescriber;
+
+    @Setup(Level.Trial)
+    public void setup() {
+        Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
+        Map<Integer, Set<String>> partitionRacks = isRackAware ?
+            mkMapOfPartitionRacks(partitionsPerTopicCount) :
+            Collections.emptyMap();
+
+        for (int i = 1; i <= topicCount; i++) {
+            Uuid topicUuid = Uuid.randomUuid();
+            String topicName = "topic" + i;
+            topicMetadata.put(topicUuid, new TopicMetadata(
+                topicUuid, topicName, partitionsPerTopicCount, partitionRacks));
+        }
+
+        addTopicSubscriptions(topicMetadata);
+        this.subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata);
+
+        if (isRangeAssignor) {
+            this.partitionAssignor = new RangeAssignor();
+        } else {
+            this.partitionAssignor = new UniformAssignor();
+        }
+
+        if (isReassignment) {
+            GroupAssignment initialAssignment = partitionAssignor.assign(assignmentSpec, subscribedTopicDescriber);
+            Map<String, MemberAssignment> members;
+
+            members = initialAssignment.members();
+
+            // Update the AssignmentSpec with the results from the initial assignment.
+            Map<String, AssignmentMemberSpec> updatedMembers = new HashMap<>();
+
+            members.forEach((memberId, memberAssignment) -> {
+                AssignmentMemberSpec memberSpec = assignmentSpec.members().get(memberId);
+                updatedMembers.put(memberId, new AssignmentMemberSpec(
+                    memberSpec.instanceId(),
+                    memberSpec.rackId(),
+                    memberSpec.subscribedTopicIds(),
+                    memberAssignment.targetPartitions()
+                ));
+            });
+
+            // Add new member to trigger a reassignment.
+            Optional<String> rackId = isRackAware ? Optional.of("rack" + (memberCount + 1) % numberOfRacks) : Optional.empty();

Review Comment: added anyways
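The reassignment scenario in the benchmark above runs one initial assignment, feeds each member's resulting partitions back into its spec as the "current" assignment, and then adds one extra member, so the measured run exercises an incremental rebalance rather than a cold start. A toy sketch of that flow in Python (plain dicts stand in for the assignor types, and round-robin stands in for the real assignors; all names here are illustrative):

```python
def round_robin_assign(members, partitions):
    # Stand-in assignor: deal partitions out to members in sorted order.
    assignment = {m: [] for m in members}
    ids = sorted(members)
    for i, p in enumerate(partitions):
        assignment[ids[i % len(ids)]].append(p)
    return assignment

partitions = list(range(6))
member_specs = {"m1": {"current": []}, "m2": {"current": []}}

# Initial assignment: every member starts with an empty "current" assignment.
initial = round_robin_assign(member_specs, partitions)

# Feed the result back so each spec now carries its current partitions...
for member, owned in initial.items():
    member_specs[member]["current"] = owned

# ...then add one new member to trigger an incremental reassignment.
member_specs["m3"] = {"current": []}
reassigned = round_robin_assign(member_specs, partitions)
print(sorted(len(p) for p in reassigned.values()))  # -> [2, 2, 2]
```

The point of seeding "current" assignments is that a sticky assignor's incremental path only differs from a fresh assignment when members already own partitions; without that feedback step the benchmark would measure the cold-start path twice.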
Re: [PR] KAFKA-15875 Make Snapshot public [kafka]
github-actions[bot] commented on PR #14816: URL: https://github.com/apache/kafka/pull/14816#issuecomment-2058166826

This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] KAFKA-16543: There may be ambiguous deletions in the `cleanupGroupMetadata` when the generation of the group is less than or equal to 0 [kafka]
hudeqi closed pull request #15706: KAFKA-16543: There may be ambiguous deletions in the `cleanupGroupMetadata` when the generation of the group is less than or equal to 0 URL: https://github.com/apache/kafka/pull/15706
Re: [PR] KAFKA-16543: There may be ambiguous deletions in the `cleanupGroupMetadata` when the generation of the group is less than or equal to 0 [kafka]
hudeqi commented on PR #15706: URL: https://github.com/apache/kafka/pull/15706#issuecomment-2058152299

> Hi @hudeqi. Thanks for the patch. I would like to better understand it. My first question is how would Flink commit offsets with a generationId equal to -1? The generation of the group is only managed by the group. It is not possible to alter it from an external system. The -1 passed in the offset commit request is only used for validation purposes.
>
> The reason why we don't write a tombstone in this case is because the group was never materialized in the log if it stayed at generation 0. I am not sure it is a worthwhile optimization though.

@dajac Thank you for your review.

The answer to the first question: Flink only uses Kafka to commit and store offsets, and its group is not managed by Kafka. By default, the committed generation value is always -1. Since the generation is only changed when members are managed by Kafka, Flink's generation remains -1 and will not be changed.

The answer to the second question: in the case of using Kafka only to store and commit offsets, the group is still initialized in the `groupMetadataCache` in memory, and the group's metadata and offsetMetadata are still written to `__consumer_offsets` in the log. Therefore, considering data consistency, they should all be cleaned up when the group is purged.
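The consistency argument above is that when a group is removed from the in-memory cache, tombstones should be appended for every key that was ever written to `__consumer_offsets` for that group, regardless of its generation. A simplified, hypothetical model of that cleanup decision in Python (not Kafka's actual implementation; the compacted log is modeled as a dict from key to value, where a None value plays the role of a tombstone):

```python
def cleanup_group(log, cache, group_id):
    """Remove a group from the in-memory cache and append tombstones
    (None values) for every key previously written for that group."""
    cache.pop(group_id, None)
    for key in [k for k in log if k[0] == group_id and log[k] is not None]:
        log[key] = None  # tombstone: compaction will eventually drop the key

# A group that only stored offsets (generation stayed at -1 / 0) still has
# keys in the log, so it still needs tombstones on cleanup.
log = {
    ("flink-group", "metadata"): {"generation": 0},
    ("flink-group", "offset:topicA-0"): 42,
}
cache = {"flink-group": object()}
cleanup_group(log, cache, "flink-group")
print(cache)                                  # -> {}
print(all(v is None for v in log.values()))   # -> True
```

In this toy model, skipping the metadata tombstone for low-generation groups would leave the ("flink-group", "metadata") key live in the log after the cache entry is gone, which is exactly the cache/log inconsistency the comment describes.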
Re: [PR] KAFKA-15685: added compatibility for MinGW [kafka]
Zta commented on PR #13321: URL: https://github.com/apache/kafka/pull/13321#issuecomment-2058148690

My OS is RHEL 8.7 and 8.8. In 3.5.1, I ran this script with kafka.tools.GetOffsetShell just fine. In 3.7.0, I got the following error:

Error: Could not find or load main class kafka.tools.GetOffsetShell
Caused by: java.lang.ClassNotFoundException: kafka.tools.GetOffsetShell
[PR] MINOR: fix some typos [kafka]
Nicklee007 opened a new pull request, #15725: URL: https://github.com/apache/kafka/pull/15725 Fix some typos.
Re: [PR] MINOR: Add test for PartitionMetadataFile [kafka]
KevinZTW commented on code in PR #15714: URL: https://github.com/apache/kafka/pull/15714#discussion_r1566619569

## storage/src/test/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataFileTest.java:
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.checkpoint;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InconsistentTopicIdException;
+
+import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class PartitionMetadataFileTest {
+    private final File dir = TestUtils.tempDirectory();

Review Comment: Oh thanks! I forgot the hook would clean up files in each test automatically
Re: [PR] MINOR: Add test for PartitionMetadataFile [kafka]
KevinZTW commented on code in PR #15714: URL: https://github.com/apache/kafka/pull/15714#discussion_r1566612658

## storage/src/test/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataFileTest.java:
@@ -0,0 +1,98 @@
[... license header and imports as quoted in the message above ...]
+public class PartitionMetadataFileTest {
+    private final File dir = TestUtils.tempDirectory();
+
+    @Test
+    public void testSetRecordWithDifferentTopicId() {
+        File file = PartitionMetadataFile.newFile(dir);
+        PartitionMetadataFile partitionMetadataFile = new PartitionMetadataFile(file, null);
+        Uuid topicId = Uuid.randomUuid();
+        assertDoesNotThrow(() -> partitionMetadataFile.record(topicId));
+        Uuid differentTopicId = Uuid.randomUuid();
+        assertThrows(InconsistentTopicIdException.class, () -> partitionMetadataFile.record(differentTopicId));
+    }
+
+    @Test
+    public void testSetRecordWithSameTopicId() {
+        File file = PartitionMetadataFile.newFile(dir);
+        PartitionMetadataFile partitionMetadataFile = new PartitionMetadataFile(file, null);
+        Uuid topicId = Uuid.randomUuid();
+        assertDoesNotThrow(() -> partitionMetadataFile.record(topicId));
+        assertDoesNotThrow(() -> partitionMetadataFile.record(topicId));
+    }
+
+    @Test
+    public void testMaybeFlushWithTopicIdPresent() {
+        File file = PartitionMetadataFile.newFile(dir);
+        PartitionMetadataFile partitionMetadataFile = new PartitionMetadataFile(file, null);
+
+        Uuid topicId = Uuid.randomUuid();
+        assertDoesNotThrow(() -> partitionMetadataFile.record(topicId));
+        assertDoesNotThrow(partitionMetadataFile::maybeFlush);
+
+        assertDoesNotThrow(() -> {
+            List<String> lines = Files.readAllLines(file.toPath());
+            assertEquals(2, lines.size());
+            assertEquals("version: 0", lines.get(0));
+            assertEquals("topic_id: " + topicId, lines.get(1));
+        });
+    }
+
+    @Test
+    public void testMaybeFlushWithNoTopicIdPresent() {
+        File file = PartitionMetadataFile.newFile(dir);
+        PartitionMetadataFile partitionMetadataFile = new PartitionMetadataFile(file, null);
+
+        assertDoesNotThrow(partitionMetadataFile::maybeFlush);
+        assertEquals(0, file.length());
+    }
+
+    @Test
+    public void testRead() {
+        File file = PartitionMetadataFile.newFile(dir);
+        LogDirFailureChannel channel = Mockito.mock(LogDirFailureChannel.class);
+        PartitionMetadataFile partitionMetadataFile = new PartitionMetadataFile(file, channel);
+
+        Uuid topicId = Uuid.randomUuid();
+        assertDoesNotThrow(() -> partitionMetadataFile.record(topicId));

Review Comment: The only exception that would be generated is `InconsistentTopicIdException` in this case, I think
Re: [PR] MINOR: Add test for PartitionMetadataFile [kafka]
KevinZTW commented on code in PR #15714: URL: https://github.com/apache/kafka/pull/15714#discussion_r1566612024 ## storage/src/test/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataFileTest.java: ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.storage.internals.checkpoint; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.InconsistentTopicIdException; + +import org.apache.kafka.storage.internals.log.LogDirFailureChannel; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.io.File; +import java.nio.file.Files; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class PartitionMetadataFileTest { +private final File dir = TestUtils.tempDirectory(); + +@Test +public void testSetRecordWithDifferentTopicId() { +File file = PartitionMetadataFile.newFile(dir); +PartitionMetadataFile partitionMetadataFile = new PartitionMetadataFile(file, null); +Uuid topicId = Uuid.randomUuid(); +assertDoesNotThrow(() -> partitionMetadataFile.record(topicId)); +Uuid differentTopicId = Uuid.randomUuid(); +assertThrows(InconsistentTopicIdException.class, () -> partitionMetadataFile.record(differentTopicId)); +} + +@Test +public void testSetRecordWithSameTopicId() { +File file = PartitionMetadataFile.newFile(dir); +PartitionMetadataFile partitionMetadataFile = new PartitionMetadataFile(file, null); +Uuid topicId = Uuid.randomUuid(); +assertDoesNotThrow(() -> partitionMetadataFile.record(topicId)); +assertDoesNotThrow(() -> partitionMetadataFile.record(topicId)); +} + +@Test +public void testMaybeFlushWithTopicIdPresent() { +File file = PartitionMetadataFile.newFile(dir); +PartitionMetadataFile partitionMetadataFile = new PartitionMetadataFile(file, null); + +Uuid topicId = Uuid.randomUuid(); +assertDoesNotThrow(() -> partitionMetadataFile.record(topicId)); +assertDoesNotThrow(partitionMetadataFile::maybeFlush); + +assertDoesNotThrow(() -> { +List lines = Files.readAllLines(file.toPath()); +assertEquals(2, lines.size()); 
+assertEquals("version: 0", lines.get(0)); +assertEquals("topic_id: " + topicId, lines.get(1)); +}); +} + +@Test +public void testMaybeFlushWithNoTopicIdPresent() { +File file = PartitionMetadataFile.newFile(dir); +PartitionMetadataFile partitionMetadataFile = new PartitionMetadataFile(file, null); + +assertDoesNotThrow(partitionMetadataFile::maybeFlush); +assertEquals(0, file.length()); +} + +@Test +public void testRead() { +File file = PartitionMetadataFile.newFile(dir); +LogDirFailureChannel channel = Mockito.mock(LogDirFailureChannel.class); +PartitionMetadataFile partitionMetadataFile = new PartitionMetadataFile(file, channel); + +Uuid topicId = Uuid.randomUuid(); +assertDoesNotThrow(() -> partitionMetadataFile.record(topicId)); Review Comment: Oh, the reason I used `assertDoesNotThrow` here is that we expect the first call to `record` the `topicId` not to throw any exception, since there's no `dirtyTopicIdOpt`. Do you think that makes sense? Or would it be redundant? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Add test for PartitionMetadataFile [kafka]
KevinZTW commented on code in PR #15714: URL: https://github.com/apache/kafka/pull/15714#discussion_r1566609363 ## storage/src/test/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataFileTest.java: ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.storage.internals.checkpoint; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.InconsistentTopicIdException; + +import org.apache.kafka.storage.internals.log.LogDirFailureChannel; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.io.File; +import java.nio.file.Files; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class PartitionMetadataFileTest { +private final File dir = TestUtils.tempDirectory(); + +@Test +public void testSetRecordWithDifferentTopicId() { +File file = PartitionMetadataFile.newFile(dir); +PartitionMetadataFile partitionMetadataFile = new PartitionMetadataFile(file, null); +Uuid topicId = Uuid.randomUuid(); +assertDoesNotThrow(() -> partitionMetadataFile.record(topicId)); +Uuid differentTopicId = Uuid.randomUuid(); +assertThrows(InconsistentTopicIdException.class, () -> partitionMetadataFile.record(differentTopicId)); +} + +@Test +public void testSetRecordWithSameTopicId() { +File file = PartitionMetadataFile.newFile(dir); +PartitionMetadataFile partitionMetadataFile = new PartitionMetadataFile(file, null); +Uuid topicId = Uuid.randomUuid(); +assertDoesNotThrow(() -> partitionMetadataFile.record(topicId)); +assertDoesNotThrow(() -> partitionMetadataFile.record(topicId)); +} + +@Test +public void testMaybeFlushWithTopicIdPresent() { +File file = PartitionMetadataFile.newFile(dir); +PartitionMetadataFile partitionMetadataFile = new PartitionMetadataFile(file, null); + +Uuid topicId = Uuid.randomUuid(); +assertDoesNotThrow(() -> partitionMetadataFile.record(topicId)); +assertDoesNotThrow(partitionMetadataFile::maybeFlush); + +assertDoesNotThrow(() -> { +List lines = Files.readAllLines(file.toPath()); +assertEquals(2, lines.size()); 
+assertEquals("version: 0", lines.get(0)); Review Comment: ok! thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16556: SubscriptionState should not prematurely reset 'pending' partitions [kafka]
kirktrue commented on PR #15724: URL: https://github.com/apache/kafka/pull/15724#issuecomment-2058081763 @lucasbru—please kindly take a look at this issue that's occasionally causing consumers to ignore committed offsets. The result is the consumer resets the partition's position back to 0, causing reprocessing of messages. cc @lianetm @philipnee -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Add test for PartitionMetadataFile [kafka]
KevinZTW commented on code in PR #15714: URL: https://github.com/apache/kafka/pull/15714#discussion_r1566606914 ## storage/src/test/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataFileTest.java: ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.storage.internals.checkpoint; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.InconsistentTopicIdException; + +import org.apache.kafka.storage.internals.log.LogDirFailureChannel; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.io.File; +import java.nio.file.Files; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class PartitionMetadataFileTest { +private final File dir = TestUtils.tempDirectory(); + +@Test +public void testSetRecordWithDifferentTopicId() { +File file = PartitionMetadataFile.newFile(dir); +PartitionMetadataFile partitionMetadataFile = new PartitionMetadataFile(file, null); +Uuid topicId = Uuid.randomUuid(); +assertDoesNotThrow(() -> partitionMetadataFile.record(topicId)); +Uuid differentTopicId = Uuid.randomUuid(); +assertThrows(InconsistentTopicIdException.class, () -> partitionMetadataFile.record(differentTopicId)); +} + +@Test +public void testSetRecordWithSameTopicId() { +File file = PartitionMetadataFile.newFile(dir); +PartitionMetadataFile partitionMetadataFile = new PartitionMetadataFile(file, null); +Uuid topicId = Uuid.randomUuid(); +assertDoesNotThrow(() -> partitionMetadataFile.record(topicId)); +assertDoesNotThrow(() -> partitionMetadataFile.record(topicId)); +} + +@Test +public void testMaybeFlushWithTopicIdPresent() { +File file = PartitionMetadataFile.newFile(dir); +PartitionMetadataFile partitionMetadataFile = new PartitionMetadataFile(file, null); + +Uuid topicId = Uuid.randomUuid(); +assertDoesNotThrow(() -> partitionMetadataFile.record(topicId)); +assertDoesNotThrow(partitionMetadataFile::maybeFlush); + +assertDoesNotThrow(() -> { +List lines = Files.readAllLines(file.toPath()); +assertEquals(2, lines.size()); Review 
Comment: I see, thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16556: SubscriptionState should not prematurely reset 'pending' partitions [kafka]
kirktrue opened a new pull request, #15724: URL: https://github.com/apache/kafka/pull/15724 Partitions that are marked as `pendingOnAssignedCallback` should not be reset in `resetInitializingPositions()`. Pending partitions are omitted from the set returned by `initializingPartitions()`. As a result, the Consumer does not include them in the set of partitions for which it attempts to load committed offsets. The code used by the Consumer to reset positions (`resetInitializingPositions()`) should likewise skip partitions marked as pending. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
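The rule described above can be sketched with a toy model. This is a hypothetical, simplified sketch — the class and field names echo the discussion, not the real Kafka internals: `initializingPartitions()` filters out 'pending' partitions, so `resetInitializingPositions()` must apply the same filter, otherwise a pending partition gets its position reset prematurely.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the 'pending' flag handling discussed in
// this PR; it is not the real SubscriptionState class.
class SubscriptionStateSketch {
    static class PartitionState {
        boolean pendingOnAssignedCallback; // true until onPartitionsAssigned() completes
        Long position;                     // null => position still uninitialized
    }

    final Map<String, PartitionState> assignment = new HashMap<>();

    // Partitions still running their rebalance callback are omitted, so no
    // committed offsets are fetched for them yet.
    Set<String> initializingPartitions() {
        Set<String> result = new HashSet<>();
        for (Map.Entry<String, PartitionState> e : assignment.entrySet()) {
            PartitionState s = e.getValue();
            if (!s.pendingOnAssignedCallback && s.position == null) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    // The fix: skip 'pending' partitions here as well, instead of resetting
    // them to the default position (0 in this sketch).
    void resetInitializingPositions() {
        for (PartitionState s : assignment.values()) {
            if (s.position == null && !s.pendingOnAssignedCallback) {
                s.position = 0L;
            }
        }
    }
}
```

With this filter in place, a partition whose callback has not completed is neither reset nor fetched; once the 'pending' flag is cleared, it appears in `initializingPartitions()` and its committed offset can be loaded.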
[jira] [Updated] (KAFKA-16556) SubscriptionState should not prematurely reset 'pending' partitions
[ https://issues.apache.org/jira/browse/KAFKA-16556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16556: -- Summary: SubscriptionState should not prematurely reset 'pending' partitions (was: Race condition between ConsumerRebalanceListener and SubscriptionState causes commit offsets to be reset) > SubscriptionState should not prematurely reset 'pending' partitions > --- > > Key: KAFKA-16556 > URL: https://issues.apache.org/jira/browse/KAFKA-16556 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: kip-848-client-support > Fix For: 3.8.0 > > > There appears to be a race condition between invoking the > {{ConsumerRebalanceListener}} callbacks on reconciliation and > {{initWithCommittedOffsetsIfNeeded}} in the consumer. > > The membership manager adds the newly assigned partitions to the > {{{}SubscriptionState{}}}, but marks them as > {{{}pendingOnAssignedCallback{}}}. Then, after the > {{ConsumerRebalanceListener.onPartitionsAssigned()}} completes, the > membership manager will invoke {{enablePartitionsAwaitingCallback}} to set > all of those partitions' 'pending' flag to false. > > During the main {{Consumer.poll()}} loop, {{AsyncKafkaConsumer}} may need to > call {{initWithCommittedOffsetsIfNeeded()}} if the positions aren't already > cached. Inside {{{}initWithCommittedOffsetsIfNeeded{}}}, the consumer calls > the subscription's {{initializingPartitions}} method to get a set of the > partitions for which to fetch their committed offsets. However, > {{SubscriptionState.initializingPartitions()}} only returns partitions that > have the {{pendingOnAssignedCallback}} flag set to false. > > The result is: > * If the {{MembershipManagerImpl.assignPartitions()}} future is completed > on the background thread first, the 'pending' flag is set to false. 
On the > application thread, when {{SubscriptionState.initializingPartitions()}} is > called, it returns the partition, and we fetch its committed offsets > * If instead the application thread calls > {{SubscriptionState.initializingPartitions()}} first, the partition's > 'pending' flag is still set to true, and so the partition is omitted from > the returned set. The {{updateFetchPositions()}} method then continues on and > re-initializes the partition's fetch offset to 0. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
HenryCaiHaiying commented on code in PR #15625: URL: https://github.com/apache/kafka/pull/15625#discussion_r1566566286 ## core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java: ## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.log.remote.quota; + +import kafka.server.QuotaType; +import kafka.server.SensorAccess; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Quota; +import org.apache.kafka.common.metrics.QuotaViolationException; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.SimpleRate; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.runtime.BoxedUnit; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class RLMQuotaManager { +private static final Logger LOGGER = LoggerFactory.getLogger(RLMQuotaManager.class); + +private final RLMQuotaManagerConfig config; +private final Metrics metrics; +private final QuotaType quotaType; +private final String description; +private final Time time; + +private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); +private final SensorAccess sensorAccess; +private Quota quota; + +public RLMQuotaManager(RLMQuotaManagerConfig config, Metrics metrics, QuotaType quotaType, String description, Time time) { +this.config = config; +this.metrics = metrics; +this.quotaType = quotaType; +this.description = description; +this.time = time; + +this.quota = new Quota(config.getQuotaBytesPerSecond(), true); +this.sensorAccess = new SensorAccess(lock, metrics); +} + +public void updateQuota(Quota newQuota) { +lock.writeLock().lock(); +try { +this.quota = newQuota; + +Map allMetrics = metrics.metrics(); +MetricName quotaMetricName = metricName(); +allMetrics.forEach((metricName, metric) -> { +if (metricName.name().equals(quotaMetricName.name()) && metricName.group().equals(quotaMetricName.group())) { +Map metricTags = 
metricName.tags(); +LOGGER.info("Sensor for quota-id {} already exists. Setting quota to {} in MetricConfig", metricTags, newQuota); Review Comment: LOGGER.warn ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]
kirktrue commented on PR #15723: URL: https://github.com/apache/kafka/pull/15723#issuecomment-2057988468 @lucasbru—please kindly take a look at this issue that's occasionally causing duplicate heartbeat requests. cc @lianetm @philipnee -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]
kirktrue opened a new pull request, #15723: URL: https://github.com/apache/kafka/pull/15723 In some cases, the network layer is _very_ fast and can send out multiple requests within the same millisecond timestamp. The previous logic for tracking inflight status used timestamps: if the timestamp from the last received response was less than the timestamp from the last sent request, we'd interpret that as having an inflight request. However, this approach would incorrectly return `false` from `RequestState.requestInFlight()` if the two timestamps were _equal_. One result of this faulty logic is that in such cases, the consumer would accidentally send multiple heartbeat requests to the consumer group coordinator. The consumer group coordinator would interpret these requests as 'join group' requests and create members for each request. Therefore, the coordinator was under the false understanding that there were more members in the group than there really were. Consequently, if your luck was _really_ bad, the coordinator might assign partitions to one of the duplicate members. Those partitions would be assigned to a phantom consumer that was not reading any data, and this led to flaky tests. The new implementation in `RequestState` replaces the timestamp comparison with a simple boolean flag that is set in `onSendAttempt` and cleared in `onSuccessfulAttempt`, `onFailedAttempt`, and `reset`. A new unit test has been added, the change has been verified against all of the consumer unit and integration tests, and it has removed all known occurrences of phantom consumer group members in the system tests. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
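A minimal sketch of the difference described in the PR above, with hypothetical class names (the real `RequestState` carries more bookkeeping): the timestamp comparison misses the case where the last send and the last response share a millisecond, while an explicit flag cannot.

```java
// Buggy approach sketched from the PR description: inflight status is inferred
// by comparing millisecond timestamps, so lastSentMs == lastReceivedMs is
// misread as "no request in flight".
class TimestampInflightTracker {
    long lastSentMs = -1;
    long lastReceivedMs = -1;

    boolean requestInFlight() {
        return lastSentMs > lastReceivedMs; // equal timestamps fall through
    }
}

// The fix in sketch form: an explicit flag set on send and cleared on
// completion, failure, or reset.
class FlagInflightTracker {
    private boolean inflight = false;

    void onSendAttempt()       { inflight = true; }
    void onSuccessfulAttempt() { inflight = false; }
    void onFailedAttempt()     { inflight = false; }
    void reset()               { inflight = false; }

    boolean requestInFlight()  { return inflight; }
}
```

If a response to request 1 arrives and request 2 is sent within the same millisecond, the timestamp version reports no inflight request and a duplicate heartbeat can go out; the flag version stays `true` until the outcome of request 2 is known.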
[jira] [Updated] (KAFKA-15974) Enforce that event processing respects user-provided timeout
[ https://issues.apache.org/jira/browse/KAFKA-15974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15974: -- Reviewer: Lucas Brutschy (was: Bruno Cadonna) > Enforce that event processing respects user-provided timeout > > > Key: KAFKA-15974 > URL: https://issues.apache.org/jira/browse/KAFKA-15974 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: consumer-threading-refactor, timeout > Fix For: 3.8.0 > > > The intention of the {{CompletableApplicationEvent}} is for a {{Consumer}} to > block waiting for the event to complete. The application thread will block > for the timeout, but there is not yet a consistent manner in which events are > timed out. > Enforce at the event handler/event processing layer that timeouts are > respected per the design in KAFKA-15848. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
CalvinConfluent commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1566471100 ## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java: ## @@ -423,6 +441,17 @@ public void replay(ConfigRecord record) { log.info("Replayed ConfigRecord for {} which set configuration {} to {}", configResource, record.name(), record.value()); } +if (record.name() == TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG) { +try { +if (type == Type.TOPIC) { + minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate(Optional.of(record.resourceName())); +} else { + minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate(Optional.empty()); +} +} catch (InterruptedException | ExecutionException e) { +throw new Throwable("Fail to append partition updates for the min isr update: " + e.getMessage()); +} Review Comment: Removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
CalvinConfluent commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1566455070 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -2360,4 +2363,9 @@ void setNewNextWriteOffset(long newNextWriteOffset) { void handleUncleanBrokerShutdown(int brokerId, List records) { replicationControl.handleBrokerUncleanShutdown(brokerId, records); } + +void maybeTriggerMinIsrConfigUpdate(Optional topicName) throws InterruptedException, ExecutionException { +appendWriteEvent("partitionUpdateForMinIsrChange", OptionalLong.empty(), +() -> replicationControl.getPartitionElrUpdatesForConfigChanges(topicName)).get(); +} Review Comment: Got it, we basically only need to call the appendWriteEvents and do not wait for the replay(). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16556) Race condition between ConsumerRebalanceListener and SubscriptionState causes commit offsets to be reset
[ https://issues.apache.org/jira/browse/KAFKA-16556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16556: -- Summary: Race condition between ConsumerRebalanceListener and SubscriptionState causes commit offsets to be reset (was: Race condition between ConsumerRebalanceListener and SubscriptionState) > Race condition between ConsumerRebalanceListener and SubscriptionState causes > commit offsets to be reset > > > Key: KAFKA-16556 > URL: https://issues.apache.org/jira/browse/KAFKA-16556 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: kip-848-client-support > Fix For: 3.8.0 > > > There appears to be a race condition between invoking the > {{ConsumerRebalanceListener}} callbacks on reconciliation and > {{initWithCommittedOffsetsIfNeeded}} in the consumer. > > The membership manager adds the newly assigned partitions to the > {{{}SubscriptionState{}}}, but marks them as > {{{}pendingOnAssignedCallback{}}}. Then, after the > {{ConsumerRebalanceListener.onPartitionsAssigned()}} completes, the > membership manager will invoke {{enablePartitionsAwaitingCallback}} to set > all of those partitions' 'pending' flag to false. > > During the main {{Consumer.poll()}} loop, {{AsyncKafkaConsumer}} may need to > call {{initWithCommittedOffsetsIfNeeded()}} if the positions aren't already > cached. Inside {{{}initWithCommittedOffsetsIfNeeded{}}}, the consumer calls > the subscription's {{initializingPartitions}} method to get a set of the > partitions for which to fetch their committed offsets. However, > {{SubscriptionState.initializingPartitions()}} only returns partitions that > have the {{pendingOnAssignedCallback}} flag set to false. > > The result is: > * If the {{MembershipManagerImpl.assignPartitions()}} future is completed > on the background thread first, the 'pending' flag is set to false. 
On the > application thread, when {{SubscriptionState.initializingPartitions()}} is > called, it returns the partition, and we fetch its committed offsets > * If instead the application thread calls > {{SubscriptionState.initializingPartitions()}} first, the partition's > 'pending' flag is still set to true, and so the partition is omitted from > the returned set. The {{updateFetchPositions()}} method then continues on and > re-initializes the partition's fetch offset to 0. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: fix duplicated return and other streams docs typos [kafka]
AyoubOm commented on code in PR #15713: URL: https://github.com/apache/kafka/pull/15713#discussion_r1566436979 ## streams/src/main/java/org/apache/kafka/streams/kstream/NamedOperation.java: ## @@ -17,7 +17,7 @@ package org.apache.kafka.streams.kstream; /** - * Default interface which can be used to personalized the named of operations, internal topics or store. + * Default interface which can be used to customize the named of operations, internal topics or store. Review Comment: Thanks @johnnychhsu, fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: Fix io-[wait-]ratio metrics description [kafka]
emitskevich-blp opened a new pull request, #15722: URL: https://github.com/apache/kafka/pull/15722 [Implementation of KIP-773](https://github.com/apache/kafka/pull/11302) deprecated the `iotime-total` and `io-waittime-total` metrics. The intent was not to deprecate `io-ratio` and `io-wait-ratio`; nevertheless, those two metrics now have `*Deprecated*` in their descriptions. Here is the reason:
1. register `io-ratio` (desc: `*Deprecated* The fraction of time ...`) -> registered
2. register `iotime-total` (desc: `*Deprecated* The total time ...`) -> registered
3. register `io-ratio` (desc: `The fraction of time ...`) -> **skipped, the same name already exists in registry**
4. register `io-time-ns-total` (desc: `The total time ...`) -> registered

As a result, `io-ratio` has an incorrect description, and the same is true for `io-wait-ratio`. This PR fixes these descriptions. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
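The registration sequence from the PR description can be reproduced with a toy registry. This is an illustrative sketch of "first registration under a given name wins" behavior, not the real `org.apache.kafka.common.metrics.Metrics` API:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy metric registry: a second registration under an existing name is
// silently skipped, so whichever description arrives first sticks.
class MetricRegistrySketch {
    private final Map<String, String> descriptions = new LinkedHashMap<>();

    void register(String name, String description) {
        descriptions.putIfAbsent(name, description); // duplicate names are ignored
    }

    String descriptionOf(String name) {
        return descriptions.get(name);
    }
}
```

Replaying the four steps — `io-ratio` with the `*Deprecated*` text first, then `iotime-total`, then `io-ratio` again with the clean text (skipped), then `io-time-ns-total` — leaves `io-ratio` carrying the description from step 1, which is the symptom the PR corrects.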
[jira] [Created] (KAFKA-16558) Implement HeartbeatRequestState.toStringBase()
Kirk True created KAFKA-16558: - Summary: Implement HeartbeatRequestState.toStringBase() Key: KAFKA-16558 URL: https://issues.apache.org/jira/browse/KAFKA-16558 Project: Kafka Issue Type: Bug Components: clients, consumer Affects Versions: 3.7.0 Reporter: Kirk True Assignee: Kirk True Fix For: 3.8.0 The code incorrectly overrides the {{toString()}} method instead of overriding {{{}toStringBase(){}}}. This affects debugging and troubleshooting consumer issues. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16558) Implement HeartbeatRequestState.toStringBase()
[ https://issues.apache.org/jira/browse/KAFKA-16558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16558: -- Description: The inner class {{HeartbeatRequestState}} does not override the {{toStringBase()}} method. This affects debugging and troubleshooting consumer issues. (was: The code incorrectly overrides the {{toString()}} method instead of overriding {{{}toStringBase(){}}}. This affects debugging and troubleshooting consumer issues.) > Implement HeartbeatRequestState.toStringBase() > -- > > Key: KAFKA-16558 > URL: https://issues.apache.org/jira/browse/KAFKA-16558 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, logging > Fix For: 3.8.0 > > > The inner class {{HeartbeatRequestState}} does not override the > {{toStringBase()}} method. This affects debugging and troubleshooting > consumer issues. -- This message was sent by Atlassian Jira (v8.20.10#820010)
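A sketch of the pattern the ticket asks for, with invented names and values (the real `RequestState`/`HeartbeatRequestState` fields differ): the base class composes `toString()` from an overridable `toStringBase()`, so a subclass should extend `toStringBase()` and chain to `super`; otherwise its state never reaches the logs.

```java
// Base class builds toString() once from an overridable hook.
class RequestStateSketch {
    protected String toStringBase() {
        return "attempts=3, backoffMs=100"; // illustrative base-class state
    }

    @Override
    public final String toString() {
        return getClass().getSimpleName() + "{" + toStringBase() + "}";
    }
}

// The fix for KAFKA-16558 in sketch form: override toStringBase() and chain to
// super so both base-class and subclass state appear in debug output.
class HeartbeatRequestStateSketch extends RequestStateSketch {
    private final long heartbeatIntervalMs = 3000; // invented example field

    @Override
    protected String toStringBase() {
        return super.toStringBase() + ", heartbeatIntervalMs=" + heartbeatIntervalMs;
    }
}
```

Making the base `toString()` final, as here, is one way to steer subclasses toward the hook instead of accidentally replacing the whole representation.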
[jira] [Updated] (KAFKA-16557) Fix OffsetFetchRequestState.toString()
[ https://issues.apache.org/jira/browse/KAFKA-16557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16557: -- Summary: Fix OffsetFetchRequestState.toString() (was: Fix CommitRequestManager’s OffsetFetchRequestState.toString()) > Fix OffsetFetchRequestState.toString() > -- > > Key: KAFKA-16557 > URL: https://issues.apache.org/jira/browse/KAFKA-16557 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, logging > Fix For: 3.8.0 > > > The code incorrectly overrides the {{toString()}} method instead of > overriding {{{}toStringBase(){}}}. This affects debugging and troubleshooting > consumer issues. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16557) Fix CommitRequestManager’s OffsetFetchRequestState.toString()
Kirk True created KAFKA-16557: - Summary: Fix CommitRequestManager’s OffsetFetchRequestState.toString() Key: KAFKA-16557 URL: https://issues.apache.org/jira/browse/KAFKA-16557 Project: Kafka Issue Type: Bug Components: clients, consumer Affects Versions: 3.7.0 Reporter: Kirk True Assignee: Kirk True Fix For: 3.8.0 The code incorrectly overrides the {{toString()}} method instead of overriding {{{}toStringBase(){}}}. This affects debugging and troubleshooting consumer issues. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
CalvinConfluent commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1566242343 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -2360,4 +2363,9 @@ void setNewNextWriteOffset(long newNextWriteOffset) { void handleUncleanBrokerShutdown(int brokerId, List records) { replicationControl.handleBrokerUncleanShutdown(brokerId, records); } + +void maybeTriggerMinIsrConfigUpdate(Optional topicName) throws InterruptedException, ExecutionException { +appendWriteEvent("partitionUpdateForMinIsrChange", OptionalLong.empty(), +() -> replicationControl.getPartitionElrUpdatesForConfigChanges(topicName)).get(); +} Review Comment: Then we only need to call appendWriteEvent here? We don't have to wait for the replay(). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16556) Race condition between ConsumerRebalanceListener and SubscriptionState
[ https://issues.apache.org/jira/browse/KAFKA-16556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16556: -- Description: There appears to be a race condition between invoking the {{ConsumerRebalanceListener}} callbacks on reconciliation and {{initWithCommittedOffsetsIfNeeded}} in the consumer. The membership manager adds the newly assigned partitions to the {{{}SubscriptionState{}}}, but marks them as {{{}pendingOnAssignedCallback{}}}. Then, after the {{ConsumerRebalanceListener.onPartitionsAssigned()}} completes, the membership manager will invoke {{enablePartitionsAwaitingCallback}} to set all of those partitions' 'pending' flag to false. During the main {{Consumer.poll()}} loop, {{AsyncKafkaConsumer}} may need to call {{initWithCommittedOffsetsIfNeeded()}} if the positions aren't already cached. Inside {{{}initWithCommittedOffsetsIfNeeded{}}}, the consumer calls the subscription's {{initializingPartitions}} method to get a set of the partitions for which to fetch their committed offsets. However, {{SubscriptionState.initializingPartitions()}} only returns partitions that have the {{pendingOnAssignedCallback}} flag set to false. The result is: * If the {{MembershipManagerImpl.assignPartitions()}} future is completed on the background thread first, the 'pending' flag is set to false. On the application thread, when {{SubscriptionState.initializingPartitions()}} is called, it returns the partition, and we fetch its committed offsets * If instead the application thread calls {{SubscriptionState.initializingPartitions()}} first, the partition's 'pending' flag is still set to true, and so the partition is omitted from the returned set. The {{updateFetchPositions()}} method then continues on and re-initializes the partition's fetch offset to 0. 
> Race condition between ConsumerRebalanceListener and SubscriptionState > -- > > Key: KAFKA-16556 > URL: https://issues.apache.org/jira/browse/KAFKA-16556 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: kip-848-client-support > Fix For: 3.8.0 > > > There appears to be a race condition between invoking the > {{ConsumerRebalanceListener}} callbacks on reconciliation and > {{initWithCommittedOffsetsIfNeeded}} in the consumer. > > The membership manager adds the newly assigned partitions to the > {{{}SubscriptionState{}}}, but marks them as > {{{}pendingOnAssignedCallback{}}}. Then, after the > {{ConsumerRebalanceListener.onPartitionsAssigned()}} completes, the > membership manager will invoke {{enablePartitionsAwaitingCallback}} to set > all of those partitions' 'pending' flag to false. > > During the main {{Consumer.poll()}} loop, {{AsyncKafkaConsumer}} may need to > call {{initWithCommittedOffsetsIfNeeded()}} if the positions aren't already > cached. Inside {{{}initWithCommittedOffsetsIfNeeded{}}}, the consumer calls > the subscription's {{initializingPartitions}} method to
[jira] [Assigned] (KAFKA-16556) Race condition between ConsumerRebalanceListener and SubscriptionState
[ https://issues.apache.org/jira/browse/KAFKA-16556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16556: - Assignee: Kirk True > Race condition between ConsumerRebalanceListener and SubscriptionState > -- > > Key: KAFKA-16556 > URL: https://issues.apache.org/jira/browse/KAFKA-16556 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: kip-848-client-support > Fix For: 3.8.0 > > > There appears to be a race condition between invoking the > {{ConsumerRebalanceListener}} callbacks on reconciliation and > {{initWithCommittedOffsetsIfNeeded}} in the consumer. > > The membership manager adds the newly assigned partitions to the > {{{}SubscriptionState{}}}, but marks them as > {{{}pendingOnAssignedCallback{}}}. Then, after the > {{ConsumerRebalanceListener.onPartitionsAssigned()}} completes, the > membership manager will invoke {{enablePartitionsAwaitingCallback}} to set > all of those partitions' 'pending' flag to false. > > During the main {{Consumer.poll()}} loop, {{AsyncKafkaConsumer}} may need to > call {{initWithCommittedOffsetsIfNeeded()}} if the positions aren't already > cached. Inside {{{}initWithCommittedOffsetsIfNeeded{}}}, the consumer calls > the subscription's {{initializingPartitions}} method to get a set of the > partitions for which to fetch their committed offsets. However, > {{SubscriptionState.initializingPartitions()}} only returns partitions that > have the {{pendingOnAssignedCallback}} flag set to false. > > The result is: * If the {{MembershipManagerImpl.assignPartitions()}} future > is completed on the background thread first, the 'pending' flag is set to > false. 
On the application thread, when > {{SubscriptionState.initializingPartitions()}} is called, it returns the > partition, and we fetch its committed offsets > * If instead the application thread calls > {{SubscriptionState.initializingPartitions()}} first, the partition's > 'pending' flag is still set to true, and so the partition is omitted from > the returned set. The {{updateFetchPositions()}} method then continues on and > re-initializes the partition's fetch offset to 0.
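The two interleavings in the description above reduce to a small model. The classes below are simplified stand-ins that mirror the names in the issue, not the real consumer internals.

```python
# Toy model of the pendingOnAssignedCallback race described in KAFKA-16556.

class SubscriptionStateModel:
    def __init__(self):
        self.pending = {}  # partition -> pendingOnAssignedCallback flag

    def assign_from_subscribed(self, partitions):
        # Membership manager adds partitions, marked pending until the
        # ConsumerRebalanceListener.onPartitionsAssigned() callback completes.
        for tp in partitions:
            self.pending[tp] = True

    def enable_partitions_awaiting_callback(self, partitions):
        for tp in partitions:
            self.pending[tp] = False

    def initializing_partitions(self):
        # Only partitions whose callback has completed are returned.
        return {tp for tp, flag in self.pending.items() if not flag}


def fetch_committed_offsets(callback_completes_first):
    state = SubscriptionStateModel()
    partitions = {"topic-0"}
    state.assign_from_subscribed(partitions)  # background thread reconciles
    if callback_completes_first:
        state.enable_partitions_awaiting_callback(partitions)
    # Application thread now runs initWithCommittedOffsetsIfNeeded():
    return state.initializing_partitions()


print(fetch_committed_offsets(True))   # {'topic-0'}: committed offsets fetched
print(fetch_committed_offsets(False))  # set(): omitted, fetch offset reset to 0
```

Which branch runs is decided purely by thread scheduling, which is why the bug surfaces intermittently rather than on every rebalance.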
[jira] [Created] (KAFKA-16556) Race condition between ConsumerRebalanceListener and SubscriptionState
Kirk True created KAFKA-16556: - Summary: Race condition between ConsumerRebalanceListener and SubscriptionState Key: KAFKA-16556 URL: https://issues.apache.org/jira/browse/KAFKA-16556 Project: Kafka Issue Type: Bug Components: clients, consumer Affects Versions: 3.7.0 Reporter: Kirk True Fix For: 3.8.0 There appears to be a race condition between invoking the {{ConsumerRebalanceListener}} callbacks on reconciliation and {{initWithCommittedOffsetsIfNeeded}} in the consumer. The membership manager adds the newly assigned partitions to the {{{}SubscriptionState{}}}, but marks them as {{{}pendingOnAssignedCallback{}}}. Then, after the {{ConsumerRebalanceListener.onPartitionsAssigned()}} completes, the membership manager will invoke {{enablePartitionsAwaitingCallback}} to set all of those partitions' 'pending' flag to false. During the main {{Consumer.poll()}} loop, {{AsyncKafkaConsumer}} may need to call {{initWithCommittedOffsetsIfNeeded()}} if the positions aren't already cached. Inside {{{}initWithCommittedOffsetsIfNeeded{}}}, the consumer calls the subscription's {{initializingPartitions}} method to get a set of the partitions for which to fetch their committed offsets. However, {{SubscriptionState.initializingPartitions()}} only returns partitions that have the {{pendingOnAssignedCallback}} flag set to false. The result is: * If the {{MembershipManagerImpl.assignPartitions()}} future is completed on the background thread first, the 'pending' flag is set to false. On the application thread, when {{SubscriptionState.initializingPartitions()}} is called, it returns the partition, and we fetch its committed offsets * If instead the application thread calls {{SubscriptionState.initializingPartitions()}} first, the partition's 'pending' flag is still set to true, and so the partition is omitted from the returned set. The {{updateFetchPositions()}} method then continues on and re-initializes the partition's fetch offset to 0. 
[jira] [Created] (KAFKA-16555) Consumer's RequestState has incorrect logic to determine if inflight
Kirk True created KAFKA-16555: - Summary: Consumer's RequestState has incorrect logic to determine if inflight Key: KAFKA-16555 URL: https://issues.apache.org/jira/browse/KAFKA-16555 Project: Kafka Issue Type: Task Components: clients, consumer Affects Versions: 3.7.0 Reporter: Kirk True Assignee: Kirk True Fix For: 3.8.0 When running system tests for the new consumer, I've hit an issue where the {{HeartbeatRequestManager}} is sending out multiple concurrent {{CONSUMER_GROUP_REQUEST}} RPCs. The effect is the coordinator creates multiple members which causes downstream assignment problems. Here's the order of events: * Time 202: {{HeartbeatRequestManager.poll()}} determines it's OK to send a request. In so doing, it updates the {{RequestState}}'s {{lastSentMs}} to the current timestamp, 202 * Time 236: the response is received and response handler is invoked, setting the {{RequestState}}'s {{lastReceivedMs}} to the current timestamp, 236 * Time 236: {{HeartbeatRequestManager.poll()}} is invoked again, and it sees that it's OK to send a request. It creates another request, once again updating the {{RequestState}}'s {{lastSentMs}} to the current timestamp, 236 * Time 237: {{HeartbeatRequestManager.poll()}} is invoked again, and ERRONEOUSLY decides it's OK to send another request, despite one already in flight. Here's the problem with {{requestInFlight()}}: {code:java} public boolean requestInFlight() { return this.lastSentMs > -1 && this.lastReceivedMs < this.lastSentMs; } {code} In our case, {{lastReceivedMs}} is 236 and {{lastSentMs}} is _also_ 236. So the received timestamp is _equal_ to the sent timestamp, not _less_.
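The failing check is easy to reproduce with the timestamps from the sequence above. The flag-based variant at the end is one possible fix, sketched here for contrast; it is not taken from the eventual patch.

```python
# Minimal model of the timestamp-based requestInFlight() check quoted above.

class TimestampRequestState:
    """Mirrors the quoted requestInFlight() logic."""
    def __init__(self):
        self.last_sent_ms = -1
        self.last_received_ms = -1

    def on_send(self, now_ms):
        self.last_sent_ms = now_ms

    def on_receive(self, now_ms):
        self.last_received_ms = now_ms

    def request_in_flight(self):
        return self.last_sent_ms > -1 and self.last_received_ms < self.last_sent_ms


class FlaggedRequestState(TimestampRequestState):
    """Hypothetical fix: track in-flight status explicitly instead of
    comparing millisecond timestamps that can collide."""
    def __init__(self):
        super().__init__()
        self._in_flight = False

    def on_send(self, now_ms):
        super().on_send(now_ms)
        self._in_flight = True

    def on_receive(self, now_ms):
        super().on_receive(now_ms)
        self._in_flight = False

    def request_in_flight(self):
        return self._in_flight


def replay(state):
    state.on_send(202)     # first heartbeat sent
    state.on_receive(236)  # response arrives
    state.on_send(236)     # next heartbeat sent in the same millisecond
    return state.request_in_flight()  # checked at time 237


print(replay(TimestampRequestState()))  # False: the in-flight request is missed
print(replay(FlaggedRequestState()))    # True
```

The timestamp comparison only misbehaves when send and receive land in the same millisecond, which is exactly the fast-response case the system test hit.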
[PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]
dongnuo123 opened a new pull request, #15721: URL: https://github.com/apache/kafka/pull/15721 Online downgrade from a consumer group to a classic group is triggered when the last consumer that uses the consumer protocol leaves the group. A rebalance is manually triggered after the group conversion. This patch adds consumer group downgrade validation and conversion. https://issues.apache.org/jira/browse/KAFKA-16554 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] JMH Benchmarks for testing the performance of the Server Side Rebalances: KIP_848 [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1566238375 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/TargetAssignmentBuilderBenchmark.java: ## @@ -0,0 +1,259 @@ +package org.apache.kafka.jmh.group_coordinator; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.Assignment; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.consumer.VersionedMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import 
java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class TargetAssignmentBuilderBenchmark { + +@Param({"1000", "1"}) +private int memberCount; + +@Param({"10", "50"}) +private int partitionsPerTopicCount; + +@Param({"1000"}) +private int topicCount; + +@Param({"true", "false"}) +private boolean isSubscriptionUniform; + +@Param({"true", "false"}) +private boolean isRangeAssignor; + +@Param({"true", "false"}) +private boolean isRackAware; Review Comment: The numbers change based on the assignor and these factors though, right? Which assignor and params would you want for the baseline of this benchmark?
Re: [PR] JMH Benchmarks for testing the performance of the Server Side Rebalances: KIP_848 [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1566237268 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java: ## @@ -0,0 +1,185 @@ +package org.apache.kafka.jmh.group_coordinator; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; + +@State(Scope.Benchmark) 
+@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + +@Param({"10", "50", "100"}) +private int partitionsPerTopicCount; + +@Param({"100"}) +private int topicCount; + +@Param({"500", "1000"}) +private int memberCount; + +@Param({"true", "false"}) +private boolean isRackAware; + +@Param({"true", "false"}) +private boolean isSubscriptionUniform; + +@Param({"true", "false"}) +private boolean isRangeAssignor; + +@Param({"true", "false"}) +private boolean isReassignment; + +private PartitionAssignor partitionAssignor; + +private final int numberOfRacks = 3; + +private AssignmentSpec assignmentSpec; + +private SubscribedTopicDescriber subscribedTopicDescriber; + +@Setup(Level.Trial) +public void setup() { +Map topicMetadata = new HashMap<>(); +Map> partitionRacks = isRackAware ? +mkMapOfPartitionRacks(partitionsPerTopicCount) : +Collections.emptyMap(); + +for (int i = 1; i <= topicCount; i++) { +Uuid topicUuid = Uuid.randomUuid(); +String topicName = "topic" + i; +topicMetadata.put(topicUuid, new TopicMetadata( +topicUuid, topicName, partitionsPerTopicCount, partitionRacks)); +} + +addTopicSubscriptions(topicMetadata); +this.subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata); + +if (isRangeAssignor) { +this.partitionAssignor = new RangeAssignor(); +} else { +this.partitionAssignor = new UniformAssignor(); +} + +if (isReassignment) { +GroupAssignment initialAssignment = partitionAssignor.assign(assignmentSpec, subscribedTopicDescriber); +Map members; + +members = initialAssignment.members(); + +// Update the AssignmentSpec with the results from the initial assignment. 
+Map updatedMembers = new HashMap<>(); + +members.forEach((memberId, memberAssignment) -> { +AssignmentMemberSpec memberSpec = assignmentSpec.members().get(memberId); +updatedMembers.put(memberId, new AssignmentMemberSpec( +memberSpec.instanceId(), +memberSpec.rackId(), +memberSpec.subscribedTopicIds(), +memberAssignment.targetPartitions() +)); +}); + +// Add new member to trigger a reassignment. +Optional rackId = isRackAware ? Optional.of("rack" + (memberCount + 1) % numberOfRacks) : Optional.empty(); + +updatedMembers.put("newMember", new AssignmentMemberSpec( +Optional.empty(), +rackId, +topicMetadata.keySet(), +Collections.emptyMap() +
Re: [PR] JMH Benchmarks for testing the performance of the Server Side Rebalances: KIP_848 [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1566234265 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java: ## @@ -0,0 +1,185 @@ +package org.apache.kafka.jmh.group_coordinator; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; + +@State(Scope.Benchmark) 
+@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + +@Param({"10", "50", "100"}) +private int partitionsPerTopicCount; + +@Param({"100"}) +private int topicCount; + +@Param({"500", "1000"}) +private int memberCount; + +@Param({"true", "false"}) +private boolean isRackAware; + +@Param({"true", "false"}) +private boolean isSubscriptionUniform; + +@Param({"true", "false"}) +private boolean isRangeAssignor; + +@Param({"true", "false"}) +private boolean isReassignment; + +private PartitionAssignor partitionAssignor; + +private final int numberOfRacks = 3; + +private AssignmentSpec assignmentSpec; + +private SubscribedTopicDescriber subscribedTopicDescriber; + +@Setup(Level.Trial) +public void setup() { +Map topicMetadata = new HashMap<>(); +Map> partitionRacks = isRackAware ? +mkMapOfPartitionRacks(partitionsPerTopicCount) : +Collections.emptyMap(); + +for (int i = 1; i <= topicCount; i++) { +Uuid topicUuid = Uuid.randomUuid(); +String topicName = "topic" + i; +topicMetadata.put(topicUuid, new TopicMetadata( +topicUuid, topicName, partitionsPerTopicCount, partitionRacks)); +} + +addTopicSubscriptions(topicMetadata); +this.subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata); + +if (isRangeAssignor) { +this.partitionAssignor = new RangeAssignor(); +} else { +this.partitionAssignor = new UniformAssignor(); +} + +if (isReassignment) { +GroupAssignment initialAssignment = partitionAssignor.assign(assignmentSpec, subscribedTopicDescriber); +Map members; + +members = initialAssignment.members(); + +// Update the AssignmentSpec with the results from the initial assignment. 
+Map updatedMembers = new HashMap<>(); + +members.forEach((memberId, memberAssignment) -> { +AssignmentMemberSpec memberSpec = assignmentSpec.members().get(memberId); +updatedMembers.put(memberId, new AssignmentMemberSpec( +memberSpec.instanceId(), +memberSpec.rackId(), +memberSpec.subscribedTopicIds(), +memberAssignment.targetPartitions() +)); +}); + +// Add new member to trigger a reassignment. +Optional rackId = isRackAware ? Optional.of("rack" + (memberCount + 1) % numberOfRacks) : Optional.empty(); Review Comment: It's pretty simple and just one line right? Do we really need a helper method for it?
Re: [PR] JMH Benchmarks for testing the performance of the Server Side Rebalances: KIP_848 [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1566233067 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java: ## @@ -0,0 +1,185 @@ +package org.apache.kafka.jmh.group_coordinator; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; + +@State(Scope.Benchmark) 
+@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + +@Param({"10", "50", "100"}) +private int partitionsPerTopicCount; + +@Param({"100"}) +private int topicCount; + +@Param({"500", "1000"}) +private int memberCount; + +@Param({"true", "false"}) +private boolean isRackAware; + +@Param({"true", "false"}) +private boolean isSubscriptionUniform; + +@Param({"true", "false"}) +private boolean isRangeAssignor; + +@Param({"true", "false"}) +private boolean isReassignment; + +private PartitionAssignor partitionAssignor; + +private final int numberOfRacks = 3; + +private AssignmentSpec assignmentSpec; + +private SubscribedTopicDescriber subscribedTopicDescriber; + +@Setup(Level.Trial) +public void setup() { +Map topicMetadata = new HashMap<>(); +Map> partitionRacks = isRackAware ? +mkMapOfPartitionRacks(partitionsPerTopicCount) : +Collections.emptyMap(); + +for (int i = 1; i <= topicCount; i++) { +Uuid topicUuid = Uuid.randomUuid(); +String topicName = "topic" + i; +topicMetadata.put(topicUuid, new TopicMetadata( +topicUuid, topicName, partitionsPerTopicCount, partitionRacks)); +} + +addTopicSubscriptions(topicMetadata); +this.subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata); + +if (isRangeAssignor) { +this.partitionAssignor = new RangeAssignor(); +} else { +this.partitionAssignor = new UniformAssignor(); +} + +if (isReassignment) { +GroupAssignment initialAssignment = partitionAssignor.assign(assignmentSpec, subscribedTopicDescriber); +Map members; + +members = initialAssignment.members(); Review Comment: oh yes missed changing it
Re: [PR] JMH Benchmarks for testing the performance of the Server Side Rebalances: KIP_848 [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1566224264 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/AssignPartitionsMicroBenchmark.java: ## @@ -0,0 +1,153 @@ +package org.apache.kafka.jmh.group_coordinator; Review Comment: I figured we would add all future benchmarks related to the group coordinator in this package. Like how the target assignment builder isn't directly an assignor benchmark, lmk what you think
Re: [PR] JMH Benchmarks for testing the performance of the Server Side Rebalances: KIP_848 [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1566219575 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java: ## @@ -0,0 +1,185 @@ +package org.apache.kafka.jmh.group_coordinator; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; + +@State(Scope.Benchmark) 
+@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + +@Param({"10", "50", "100"}) +private int partitionsPerTopicCount; + +@Param({"100"}) +private int topicCount; + +@Param({"500", "1000"}) +private int memberCount; + +@Param({"true", "false"}) +private boolean isRackAware; + +@Param({"true", "false"}) +private boolean isSubscriptionUniform; + +@Param({"true", "false"}) +private boolean isRangeAssignor; + +@Param({"true", "false"}) +private boolean isReassignment; Review Comment: That makes sense, I'll adopt something similar -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
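For context on what these benchmarks exercise, here is a minimal plain-Java sketch of range-style assignment for a single topic, a simplification of Kafka's `RangeAssignor`. The class and method names are illustrative, not from the PR:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RangeAssignSketch {
    // Distributes a topic's partitions over members in contiguous ranges,
    // giving the first (numPartitions % numMembers) members one extra partition.
    static Map<String, List<Integer>> assign(int numPartitions, List<String> members) {
        Map<String, List<Integer>> assignment = new HashMap<>();
        int quota = numPartitions / members.size();
        int extra = numPartitions % members.size();
        int next = 0;
        for (int i = 0; i < members.size(); i++) {
            int count = quota + (i < extra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int p = 0; p < count; p++) {
                partitions.add(next++);
            }
            assignment.put(members.get(i), partitions);
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> a = assign(7, List.of("m1", "m2", "m3"));
        System.out.println(a.get("m1")); // [0, 1, 2]
        System.out.println(a.get("m2")); // [3, 4]
        System.out.println(a.get("m3")); // [5, 6]
    }
}
```

The benchmark parameters above (`partitionsPerTopicCount`, `topicCount`, `memberCount`) scale exactly this kind of work across many topics and members.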
[jira] [Created] (KAFKA-16554) Online downgrade triggering and group type conversion
Dongnuo Lyu created KAFKA-16554: --- Summary: Online downgrade triggering and group type conversion Key: KAFKA-16554 URL: https://issues.apache.org/jira/browse/KAFKA-16554 Project: Kafka Issue Type: Sub-task Reporter: Dongnuo Lyu -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] JMH Benchmarks for testing the performance of the Server Side Rebalances: KIP_848 [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1566211483 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ClientSideAssignorBenchmark.java: ## @@ -0,0 +1,198 @@ +package org.apache.kafka.jmh.group_coordinator; + +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; +import org.apache.kafka.clients.consumer.CooperativeStickyAssignor; +import org.apache.kafka.clients.consumer.RangeAssignor; +import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; +import static org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ClientSideAssignorBenchmark { + +@Param({"10", "50", "100"}) +private int partitionsPerTopicCount; + +@Param({"100"}) +private int topicCount; + 
+@Param({"500", "1000", "1"}) +private int memberCount; + +@Param({"true", "false"}) +private boolean isRackAware; + +@Param({"true", "false"}) +private boolean isSubscriptionUniform; + +@Param({"true", "false"}) +private boolean isRangeAssignor; + +@Param({"true", "false"}) +private boolean isReassignment; + +private Map subscriptions = new HashMap<>(); + +private final int numBrokerRacks = 3; + +private final int replicationFactor = 2; + +protected AbstractPartitionAssignor assignor; Review Comment: And we use the PartitionAssignor interface in the server side benchmark -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] JMH Benchmarks for testing the performance of the Server Side Rebalances: KIP_848 [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1566211207 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ClientSideAssignorBenchmark.java: ## @@ -0,0 +1,198 @@ +package org.apache.kafka.jmh.group_coordinator; + +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; +import org.apache.kafka.clients.consumer.CooperativeStickyAssignor; +import org.apache.kafka.clients.consumer.RangeAssignor; +import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; +import static org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ClientSideAssignorBenchmark { + +@Param({"10", "50", "100"}) +private int partitionsPerTopicCount; + +@Param({"100"}) +private int topicCount; + 
+@Param({"500", "1000", "1"}) +private int memberCount; + +@Param({"true", "false"}) +private boolean isRackAware; + +@Param({"true", "false"}) +private boolean isSubscriptionUniform; + +@Param({"true", "false"}) +private boolean isRangeAssignor; + +@Param({"true", "false"}) +private boolean isReassignment; + +private Map subscriptions = new HashMap<>(); + +private final int numBrokerRacks = 3; + +private final int replicationFactor = 2; + +protected AbstractPartitionAssignor assignor; Review Comment: Hmm makes sense, I guess then it would be more accurate since it would include the time taken to create the metadata to pass to the assignor in the assign function, I'll look into it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] JMH Benchmarks for testing the performance of the Server Side Rebalances: KIP_848 [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1566204495 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java: ## @@ -0,0 +1,185 @@ +package org.apache.kafka.jmh.group_coordinator; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; + +@State(Scope.Benchmark) 
+@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + +@Param({"10", "50", "100"}) +private int partitionsPerTopicCount; + +@Param({"100"}) +private int topicCount; + +@Param({"500", "1000"}) +private int memberCount; Review Comment: Hmm interesting, that way we can have more realistic ratios of members to topics/partitions, let me look into it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15615: Improve handling of fetching during metadata updates. [kafka]
appchemist commented on PR #15647: URL: https://github.com/apache/kafka/pull/15647#issuecomment-2057454239 > thanks for the PR! just curious, what is the problem without the newly added state? does it cause any issues? @johnnychhsu When a Kafka consumer encounters a FENCED_LEADER_EPOCH error, or another error that requires a metadata update, a metadata update is triggered. However, the subscription's fetch state remains FETCHING, so in this state the consumer keeps fetching from the old leader (the preferred read replica has been reset and the metadata has not yet been updated). This patch makes the consumer wait, without sending fetch requests, while the metadata is being updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
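The waiting behaviour described in this comment can be sketched as a small state guard. This is an illustration only; the actual PR works through the consumer's subscription-state machinery, and the state names here are hypothetical:

```java
public class FetchStateSketch {
    enum FetchState { FETCHING, AWAIT_METADATA_UPDATE }

    // A partition is only fetchable while in FETCHING; after an error that
    // invalidates cached leader info (e.g. FENCED_LEADER_EPOCH) it is parked
    // until fresh metadata arrives, instead of re-fetching from a stale leader.
    static boolean isFetchable(FetchState state) {
        return state == FetchState.FETCHING;
    }

    public static void main(String[] args) {
        FetchState state = FetchState.FETCHING;
        // leader-epoch error observed -> park the partition
        state = FetchState.AWAIT_METADATA_UPDATE;
        System.out.println(isFetchable(state)); // false
        // metadata refreshed -> resume fetching
        state = FetchState.FETCHING;
        System.out.println(isFetchable(state)); // true
    }
}
```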
Re: [PR] JMH Benchmarks for testing the performance of the Server Side Rebalances: KIP_848 [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1566203863 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java: ## @@ -0,0 +1,185 @@ +package org.apache.kafka.jmh.group_coordinator; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; + +@State(Scope.Benchmark) 
+@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + +@Param({"10", "50", "100"}) +private int partitionsPerTopicCount; + +@Param({"100"}) +private int topicCount; + +@Param({"500", "1000"}) +private int memberCount; Review Comment: Yes! I was going to ask what cases we want in the final benchmarks, they are not uniform or finalized yet -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] JMH Benchmarks for testing the performance of the Server Side Rebalances: KIP_848 [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1566200808 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/AssignPartitionsMicroBenchmark.java: ## @@ -0,0 +1,153 @@ +package org.apache.kafka.jmh.group_coordinator; Review Comment: Oh yes! Thanks I completely missed that! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] JMH Benchmarks for testing the performance of the Server Side Rebalances: KIP_848 [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1566199529 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/AssignPartitionsMicroBenchmark.java: ## @@ -0,0 +1,153 @@ +package org.apache.kafka.jmh.group_coordinator; + +import org.apache.kafka.clients.consumer.CooperativeStickyAssignor; +import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.server.common.TopicIdPartition; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 7) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class AssignPartitionsMicroBenchmark { Review Comment: Yep agreed, I'll remove it -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]
C0urante commented on PR #14309: URL: https://github.com/apache/kafka/pull/14309#issuecomment-2057408474 Hi @gharris1727 sorry for the delay. I've resolved the merge conflicts; let me know what you think if you have a moment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13105) Expose a method in KafkaConfig to write the configs to a logger
[ https://issues.apache.org/jira/browse/KAFKA-13105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17837366#comment-17837366 ] Johnny Hsu commented on KAFKA-13105: hi [~cmccabe] I would like to work on this if no one is on it now :D > Expose a method in KafkaConfig to write the configs to a logger > --- > > Key: KAFKA-13105 > URL: https://issues.apache.org/jira/browse/KAFKA-13105 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Priority: Minor > Labels: 4.0-blocker > > We should expose a method in KafkaConfig to write the configs to a logger. > Currently there is no good way to write them out except creating a new > KafkaConfig object with doLog = true, which is unintuitive. -- This message was sent by Atlassian Jira (v8.20.10#820010)
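The shape of the improvement requested in this ticket could look something like the following sketch: a pure rendering step plus a logger-style consumer, so no second `KafkaConfig` instance with `doLog = true` is needed. All names here are hypothetical, not an actual Kafka API:

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

public class ConfigLogSketch {
    // Renders each config entry on its own line, sorted by key.
    static String render(Map<String, Object> configs) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Object> e : new TreeMap<>(configs).entrySet()) {
            sb.append(e.getKey()).append(" = ").append(e.getValue()).append('\n');
        }
        return sb.toString();
    }

    // Hands the rendered configs to any logger-like consumer.
    static void logTo(Map<String, Object> configs, Consumer<String> logger) {
        logger.accept(render(configs));
    }

    public static void main(String[] args) {
        logTo(Map.of("num.partitions", 1, "log.dir", "/tmp/kafka-logs"), System.out::print);
    }
}
```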
Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]
OmniaGM commented on code in PR #15569: URL: https://github.com/apache/kafka/pull/15569#discussion_r1566156280 ## server/src/main/java/org/apache/kafka/server/config/KafkaLogConfigs.java: ## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.config; + +import org.apache.kafka.common.config.TopicConfig; +import static org.apache.kafka.server.config.ServerTopicConfigSynonyms.LOG_PREFIX; + +/** + * Common home for broker-side log configs which need to be accessible from the libraries shared + * between the broker and the multiple modules in Kafka. + * + * Note this is an internal API and subject to change without notice. 
+ */ +public class KafkaLogConfigs { +public static final String NUM_PARTITIONS_CONFIG = "num.partitions"; +public static final int NUM_PARTITIONS_DEFAULT = 1; +public static final String NUM_PARTITIONS_DOC = "The default number of log partitions per topic"; + +public static final String LOG_DIRS_CONFIG = LOG_PREFIX + "dirs"; +public static final String LOG_DIR_CONFIG = LOG_PREFIX + "dir"; +public static final String LOG_DIR_DEFAULT = "/tmp/kafka-logs"; +public static final String LOG_DIR_DOC = "The directory in which the log data is kept (supplemental for " + LOG_DIRS_CONFIG + " property)"; +public static final String LOG_DIRS_DOC = "A comma-separated list of the directories where the log data is stored. If not set, the value in " + LOG_DIR_CONFIG + " is used."; + +public static final String LOG_SEGMENT_BYTES_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.SEGMENT_BYTES_CONFIG); +public static final String LOG_SEGMENT_BYTES_DOC = "The maximum size of a single log file"; + +public static final String LOG_ROLL_TIME_MILLIS_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.SEGMENT_MS_CONFIG); +public static final String LOG_ROLL_TIME_HOURS_CONFIG = LOG_PREFIX + "roll.hours"; +public static final String LOG_ROLL_TIME_MILLIS_DOC = "The maximum time before a new log segment is rolled out (in milliseconds). If not set, the value in " + LOG_ROLL_TIME_HOURS_CONFIG + " is used"; +public static final String LOG_ROLL_TIME_HOURS_DOC = "The maximum time before a new log segment is rolled out (in hours), secondary to " + LOG_ROLL_TIME_MILLIS_CONFIG + " property"; + +public static final String LOG_ROLL_TIME_JITTER_MILLIS_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.SEGMENT_JITTER_MS_CONFIG); +public static final String LOG_ROLL_TIME_JITTER_HOURS_CONFIG = LOG_PREFIX + "roll.jitter.hours"; +public static final String LOG_ROLL_TIME_JITTER_MILLIS_DOC = "The maximum jitter to subtract from logRollTimeMillis (in milliseconds). 
If not set, the value in " + LOG_ROLL_TIME_JITTER_HOURS_CONFIG + " is used"; +public static final String LOG_ROLL_TIME_JITTER_HOURS_DOC = "The maximum jitter to subtract from logRollTimeMillis (in hours), secondary to " + LOG_ROLL_TIME_JITTER_MILLIS_CONFIG + " property"; + + +public static final String LOG_RETENTION_TIME_MILLIS_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.RETENTION_MS_CONFIG); +public static final String LOG_RETENTION_TIME_MINUTES_CONFIG = LOG_PREFIX + "retention.minutes"; +public static final String LOG_RETENTION_TIME_HOURS_CONFIG = LOG_PREFIX + "retention.hours"; +public static final String LOG_RETENTION_TIME_MILLIS_DOC = "The number of milliseconds to keep a log file before deleting it (in milliseconds), If not set, the value in " + LOG_RETENTION_TIME_MINUTES_CONFIG + " is used. If set to -1, no time limit is applied."; +public static final String LOG_RETENTION_TIME_MINUTES_DOC = "The number of minutes to keep a log file before deleting it (in minutes), secondary to " + LOG_RETENTION_TIME_MILLIS_CONFIG + " property. If not set, the value in " + LOG_RETENTION_TIME_HOURS_CONFIG + " is used"; +public static final String LOG_RETENTION_TIME_HOURS_DOC = "The number of hours to keep a log file before deleting it (in hours), tertiary to " + LOG_RETENTION_TIME_MILLIS_CONFIG + " property"; + +public static final String LOG_RETENTION_BYTES_CONFIG = ServerTopicConfigSynonyms.ser
[PR] [WIP] KAFKA-16475: add test for TopicImageNodeTest [kafka]
johnnychhsu opened a new pull request, #15720: URL: https://github.com/apache/kafka/pull/15720 ## Context Add unit test for TopicImageNodeTest Jira ticket: https://issues.apache.org/jira/browse/KAFKA-16475 ## Test ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]
chia7712 commented on code in PR #15569: URL: https://github.com/apache/kafka/pull/15569#discussion_r1566129710 ## server/src/main/java/org/apache/kafka/server/config/KafkaLogConfigs.java: ## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.config; + +import org.apache.kafka.common.config.TopicConfig; +import static org.apache.kafka.server.config.ServerTopicConfigSynonyms.LOG_PREFIX; + +/** + * Common home for broker-side log configs which need to be accessible from the libraries shared + * between the broker and the multiple modules in Kafka. + * + * Note this is an internal API and subject to change without notice. 
+ */ +public class KafkaLogConfigs { +public static final String NUM_PARTITIONS_CONFIG = "num.partitions"; +public static final int NUM_PARTITIONS_DEFAULT = 1; +public static final String NUM_PARTITIONS_DOC = "The default number of log partitions per topic"; + +public static final String LOG_DIRS_CONFIG = LOG_PREFIX + "dirs"; +public static final String LOG_DIR_CONFIG = LOG_PREFIX + "dir"; +public static final String LOG_DIR_DEFAULT = "/tmp/kafka-logs"; +public static final String LOG_DIR_DOC = "The directory in which the log data is kept (supplemental for " + LOG_DIRS_CONFIG + " property)"; +public static final String LOG_DIRS_DOC = "A comma-separated list of the directories where the log data is stored. If not set, the value in " + LOG_DIR_CONFIG + " is used."; + +public static final String LOG_SEGMENT_BYTES_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.SEGMENT_BYTES_CONFIG); +public static final String LOG_SEGMENT_BYTES_DOC = "The maximum size of a single log file"; + +public static final String LOG_ROLL_TIME_MILLIS_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.SEGMENT_MS_CONFIG); +public static final String LOG_ROLL_TIME_HOURS_CONFIG = LOG_PREFIX + "roll.hours"; +public static final String LOG_ROLL_TIME_MILLIS_DOC = "The maximum time before a new log segment is rolled out (in milliseconds). If not set, the value in " + LOG_ROLL_TIME_HOURS_CONFIG + " is used"; +public static final String LOG_ROLL_TIME_HOURS_DOC = "The maximum time before a new log segment is rolled out (in hours), secondary to " + LOG_ROLL_TIME_MILLIS_CONFIG + " property"; + +public static final String LOG_ROLL_TIME_JITTER_MILLIS_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.SEGMENT_JITTER_MS_CONFIG); +public static final String LOG_ROLL_TIME_JITTER_HOURS_CONFIG = LOG_PREFIX + "roll.jitter.hours"; +public static final String LOG_ROLL_TIME_JITTER_MILLIS_DOC = "The maximum jitter to subtract from logRollTimeMillis (in milliseconds). 
If not set, the value in " + LOG_ROLL_TIME_JITTER_HOURS_CONFIG + " is used"; +public static final String LOG_ROLL_TIME_JITTER_HOURS_DOC = "The maximum jitter to subtract from logRollTimeMillis (in hours), secondary to " + LOG_ROLL_TIME_JITTER_MILLIS_CONFIG + " property"; + + +public static final String LOG_RETENTION_TIME_MILLIS_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.RETENTION_MS_CONFIG); +public static final String LOG_RETENTION_TIME_MINUTES_CONFIG = LOG_PREFIX + "retention.minutes"; +public static final String LOG_RETENTION_TIME_HOURS_CONFIG = LOG_PREFIX + "retention.hours"; +public static final String LOG_RETENTION_TIME_MILLIS_DOC = "The number of milliseconds to keep a log file before deleting it (in milliseconds), If not set, the value in " + LOG_RETENTION_TIME_MINUTES_CONFIG + " is used. If set to -1, no time limit is applied."; +public static final String LOG_RETENTION_TIME_MINUTES_DOC = "The number of minutes to keep a log file before deleting it (in minutes), secondary to " + LOG_RETENTION_TIME_MILLIS_CONFIG + " property. If not set, the value in " + LOG_RETENTION_TIME_HOURS_CONFIG + " is used"; +public static final String LOG_RETENTION_TIME_HOURS_DOC = "The number of hours to keep a log file before deleting it (in hours), tertiary to " + LOG_RETENTION_TIME_MILLIS_CONFIG + " property"; + +public static final String LOG_RETENTION_BYTES_CONFIG = ServerTopicConfigSynonyms.se
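The naming pattern in this class — deriving a server-side `log.`-prefixed key from a topic-level key — can be sketched in plain Java. The class and helper below are illustrative stand-ins, not Kafka's actual `ServerTopicConfigSynonyms`, which additionally special-cases irregular names (e.g. `segment.ms` → `log.roll.ms`, as the diff above shows):

```java
// Illustrative sketch of the key-derivation pattern in KafkaLogConfigs.
// LOG_PREFIX and serverSynonym are stand-ins for ServerTopicConfigSynonyms.
public class LogConfigKeys {
    static final String LOG_PREFIX = "log.";

    // Topic-level key, standing in for TopicConfig.SEGMENT_BYTES_CONFIG.
    static final String TOPIC_SEGMENT_BYTES = "segment.bytes";

    // Server-side synonym: prefix the topic-level key with "log.".
    static String serverSynonym(String topicLevelKey) {
        return LOG_PREFIX + topicLevelKey;
    }

    static final String LOG_DIRS_CONFIG = LOG_PREFIX + "dirs";
    static final String LOG_SEGMENT_BYTES_CONFIG = serverSynonym(TOPIC_SEGMENT_BYTES);

    public static void main(String[] args) {
        System.out.println(LOG_DIRS_CONFIG);          // log.dirs
        System.out.println(LOG_SEGMENT_BYTES_CONFIG); // log.segment.bytes
    }
}
```

Keeping the prefix in one place is what lets the doc strings above reference each other by constant (`LOG_DIR_CONFIG`, `LOG_DIRS_CONFIG`) instead of by literal string.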
Re: [PR] KAFKA-15853: Move KafkaConfig Replication properties and docs out of … [kafka]
OmniaGM commented on code in PR #15575: URL: https://github.com/apache/kafka/pull/15575#discussion_r1566114227 ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -1458,40 +1395,40 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami def logMessageDownConversionEnable: Boolean = getBoolean(KafkaConfig.LogMessageDownConversionEnableProp) /** * Replication configuration ***/ - val controllerSocketTimeoutMs: Int = getInt(KafkaConfig.ControllerSocketTimeoutMsProp) - val defaultReplicationFactor: Int = getInt(KafkaConfig.DefaultReplicationFactorProp) - val replicaLagTimeMaxMs = getLong(KafkaConfig.ReplicaLagTimeMaxMsProp) - val replicaSocketTimeoutMs = getInt(KafkaConfig.ReplicaSocketTimeoutMsProp) - val replicaSocketReceiveBufferBytes = getInt(KafkaConfig.ReplicaSocketReceiveBufferBytesProp) - val replicaFetchMaxBytes = getInt(KafkaConfig.ReplicaFetchMaxBytesProp) - val replicaFetchWaitMaxMs = getInt(KafkaConfig.ReplicaFetchWaitMaxMsProp) - val replicaFetchMinBytes = getInt(KafkaConfig.ReplicaFetchMinBytesProp) - val replicaFetchResponseMaxBytes = getInt(KafkaConfig.ReplicaFetchResponseMaxBytesProp) - val replicaFetchBackoffMs = getInt(KafkaConfig.ReplicaFetchBackoffMsProp) - def numReplicaFetchers = getInt(KafkaConfig.NumReplicaFetchersProp) - val replicaHighWatermarkCheckpointIntervalMs = getLong(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp) - val fetchPurgatoryPurgeIntervalRequests = getInt(KafkaConfig.FetchPurgatoryPurgeIntervalRequestsProp) - val producerPurgatoryPurgeIntervalRequests = getInt(KafkaConfig.ProducerPurgatoryPurgeIntervalRequestsProp) - val deleteRecordsPurgatoryPurgeIntervalRequests = getInt(KafkaConfig.DeleteRecordsPurgatoryPurgeIntervalRequestsProp) - val autoLeaderRebalanceEnable = getBoolean(KafkaConfig.AutoLeaderRebalanceEnableProp) - val leaderImbalancePerBrokerPercentage = getInt(KafkaConfig.LeaderImbalancePerBrokerPercentageProp) - val leaderImbalanceCheckIntervalSeconds: Long = 
getLong(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp) - def uncleanLeaderElectionEnable: java.lang.Boolean = getBoolean(KafkaConfig.UncleanLeaderElectionEnableProp) + val controllerSocketTimeoutMs: Int = getInt(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG) + val defaultReplicationFactor: Int = getInt(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG) + val replicaLagTimeMaxMs = getLong(ReplicationConfigs.REPLICA_LAG_TIME_MAX_MS_CONFIG) + val replicaSocketTimeoutMs = getInt(ReplicationConfigs.REPLICA_SOCKET_TIMEOUT_MS_CONFIG) + val replicaSocketReceiveBufferBytes = getInt(ReplicationConfigs.REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_CONFIG) + val replicaFetchMaxBytes = getInt(ReplicationConfigs.REPLICA_FETCH_MAX_BYTES_CONFIG) + val replicaFetchWaitMaxMs = getInt(ReplicationConfigs.REPLICA_FETCH_WAIT_MAX_MS_CONFIG) + val replicaFetchMinBytes = getInt(ReplicationConfigs.REPLICA_FETCH_MIN_BYTES_CONFIG) + val replicaFetchResponseMaxBytes = getInt(ReplicationConfigs.REPLICA_FETCH_RESPONSE_MAX_BYTES_CONFIG) + val replicaFetchBackoffMs = getInt(ReplicationConfigs.REPLICA_FETCH_BACKOFF_MS_CONFIG) + def numReplicaFetchers = getInt(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG) + val replicaHighWatermarkCheckpointIntervalMs = getLong(ReplicationConfigs.REPLICA_HIGH_WATERMARK_CHECKPOINT_INTERVAL_MS_CONFIG) + val fetchPurgatoryPurgeIntervalRequests = getInt(ReplicationConfigs.FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG) + val producerPurgatoryPurgeIntervalRequests = getInt(ReplicationConfigs.PRODUCER_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG) + val deleteRecordsPurgatoryPurgeIntervalRequests = getInt(ReplicationConfigs.DELETE_RECORDS_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG) + val autoLeaderRebalanceEnable = getBoolean(ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG) + val leaderImbalancePerBrokerPercentage = getInt(ReplicationConfigs.LEADER_IMBALANCE_PER_BROKER_PERCENTAGE_CONFIG) + val leaderImbalanceCheckIntervalSeconds: Long = 
getLong(ReplicationConfigs.LEADER_IMBALANCE_CHECK_INTERVAL_SECONDS_CONFIG) + def uncleanLeaderElectionEnable: Boolean = getBoolean(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG) Review Comment: All of this will be converted to java soon as part of the same KAFKA-15853 ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -1458,40 +1395,40 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami def logMessageDownConversionEnable: Boolean = getBoolean(KafkaConfig.LogMessageDownConversionEnableProp) /** * Replication configuration ***/ - val controllerSocketTimeoutMs: Int = getInt(KafkaConfig.ControllerSocketTimeoutMsProp) - val defaultReplicationFactor: Int = getInt(KafkaConfig.DefaultReplicationFactorProp) - val replicaLagTimeMaxMs = getLong(KafkaConfig.ReplicaLagTimeMaxMsProp) - val replicaSocketTimeoutMs = ge
Re: [PR] KAFKA-15853: Move KafkaConfig Replication properties and docs out of … [kafka]
OmniaGM commented on code in PR #15575: URL: https://github.com/apache/kafka/pull/15575#discussion_r1566111409 ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -1458,40 +1395,40 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami def logMessageDownConversionEnable: Boolean = getBoolean(KafkaConfig.LogMessageDownConversionEnableProp) /** * Replication configuration ***/ - val controllerSocketTimeoutMs: Int = getInt(KafkaConfig.ControllerSocketTimeoutMsProp) - val defaultReplicationFactor: Int = getInt(KafkaConfig.DefaultReplicationFactorProp) - val replicaLagTimeMaxMs = getLong(KafkaConfig.ReplicaLagTimeMaxMsProp) - val replicaSocketTimeoutMs = getInt(KafkaConfig.ReplicaSocketTimeoutMsProp) - val replicaSocketReceiveBufferBytes = getInt(KafkaConfig.ReplicaSocketReceiveBufferBytesProp) - val replicaFetchMaxBytes = getInt(KafkaConfig.ReplicaFetchMaxBytesProp) - val replicaFetchWaitMaxMs = getInt(KafkaConfig.ReplicaFetchWaitMaxMsProp) - val replicaFetchMinBytes = getInt(KafkaConfig.ReplicaFetchMinBytesProp) - val replicaFetchResponseMaxBytes = getInt(KafkaConfig.ReplicaFetchResponseMaxBytesProp) - val replicaFetchBackoffMs = getInt(KafkaConfig.ReplicaFetchBackoffMsProp) - def numReplicaFetchers = getInt(KafkaConfig.NumReplicaFetchersProp) - val replicaHighWatermarkCheckpointIntervalMs = getLong(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp) - val fetchPurgatoryPurgeIntervalRequests = getInt(KafkaConfig.FetchPurgatoryPurgeIntervalRequestsProp) - val producerPurgatoryPurgeIntervalRequests = getInt(KafkaConfig.ProducerPurgatoryPurgeIntervalRequestsProp) - val deleteRecordsPurgatoryPurgeIntervalRequests = getInt(KafkaConfig.DeleteRecordsPurgatoryPurgeIntervalRequestsProp) - val autoLeaderRebalanceEnable = getBoolean(KafkaConfig.AutoLeaderRebalanceEnableProp) - val leaderImbalancePerBrokerPercentage = getInt(KafkaConfig.LeaderImbalancePerBrokerPercentageProp) - val leaderImbalanceCheckIntervalSeconds: Long = 
getLong(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp) - def uncleanLeaderElectionEnable: java.lang.Boolean = getBoolean(KafkaConfig.UncleanLeaderElectionEnableProp) + val controllerSocketTimeoutMs: Int = getInt(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG) + val defaultReplicationFactor: Int = getInt(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG) + val replicaLagTimeMaxMs = getLong(ReplicationConfigs.REPLICA_LAG_TIME_MAX_MS_CONFIG) + val replicaSocketTimeoutMs = getInt(ReplicationConfigs.REPLICA_SOCKET_TIMEOUT_MS_CONFIG) + val replicaSocketReceiveBufferBytes = getInt(ReplicationConfigs.REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_CONFIG) + val replicaFetchMaxBytes = getInt(ReplicationConfigs.REPLICA_FETCH_MAX_BYTES_CONFIG) + val replicaFetchWaitMaxMs = getInt(ReplicationConfigs.REPLICA_FETCH_WAIT_MAX_MS_CONFIG) + val replicaFetchMinBytes = getInt(ReplicationConfigs.REPLICA_FETCH_MIN_BYTES_CONFIG) + val replicaFetchResponseMaxBytes = getInt(ReplicationConfigs.REPLICA_FETCH_RESPONSE_MAX_BYTES_CONFIG) + val replicaFetchBackoffMs = getInt(ReplicationConfigs.REPLICA_FETCH_BACKOFF_MS_CONFIG) + def numReplicaFetchers = getInt(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG) + val replicaHighWatermarkCheckpointIntervalMs = getLong(ReplicationConfigs.REPLICA_HIGH_WATERMARK_CHECKPOINT_INTERVAL_MS_CONFIG) + val fetchPurgatoryPurgeIntervalRequests = getInt(ReplicationConfigs.FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG) + val producerPurgatoryPurgeIntervalRequests = getInt(ReplicationConfigs.PRODUCER_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG) + val deleteRecordsPurgatoryPurgeIntervalRequests = getInt(ReplicationConfigs.DELETE_RECORDS_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG) + val autoLeaderRebalanceEnable = getBoolean(ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG) + val leaderImbalancePerBrokerPercentage = getInt(ReplicationConfigs.LEADER_IMBALANCE_PER_BROKER_PERCENTAGE_CONFIG) + val leaderImbalanceCheckIntervalSeconds: Long = 
getLong(ReplicationConfigs.LEADER_IMBALANCE_CHECK_INTERVAL_SECONDS_CONFIG) + def uncleanLeaderElectionEnable: Boolean = getBoolean(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG) Review Comment: revert it as this was already the original declaration -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16316: Configure reprocessing with addGlobalStateStore [kafka]
AyoubOm commented on code in PR #15619: URL: https://github.com/apache/kafka/pull/15619#discussion_r1566096443 ## streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java: ## @@ -613,6 +616,74 @@ public synchronized StreamsBuilder addGlobalStore(final StoreBuilder< return this; } +/** + * Adds a global {@link StateStore} to the topology. + * The {@link StateStore} sources its data from all partitions of the provided input topic. + * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance. + * + * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions + * of the input topic. + * + * The provided {@link ProcessorSupplier} will be used to create an + * {@link Processor} that will receive all records forwarded from the {@link SourceNode}. + * The supplier should always generate a new instance. Creating a single {@link Processor} object + * and returning the same object reference in {@link ProcessorSupplier#get()} is a + * violation of the supplier pattern and leads to runtime exceptions. + * This {@link Processor} should be used to keep the {@link StateStore} up-to-date. + * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used. + * + * It is not required to connect a global store to the {@link Processor Processors}, + * {@link Transformer Transformers}, or {@link ValueTransformer ValueTransformer}; those have read-only access to all global stores by default. 
+ * + * @param storeBuilder user defined {@link StoreBuilder}; can't be {@code null} + * @param topic the topic to source the data from + * @param consumed the instance of {@link Consumed} used to define optional parameters; can't be {@code null} + * @param stateUpdateSupplier the instance of {@link ProcessorSupplier} + * @param reprocessOnRestorerestore by reprocessing the data using a processor supplied by stateUpdateSupplier or loads the data in byte for byte Review Comment: nit: ```suggestion * @param reprocessOnRestorerestore by reprocessing the data using a processor supplied by stateUpdateSupplier or load the data in byte for byte ``` Wondering if it would make it clearer to say "if true, restore ..., otherwise load .." ## streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java: ## @@ -613,6 +616,74 @@ public synchronized StreamsBuilder addGlobalStore(final StoreBuilder< return this; } +/** + * Adds a global {@link StateStore} to the topology. + * The {@link StateStore} sources its data from all partitions of the provided input topic. + * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance. + * + * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions + * of the input topic. + * + * The provided {@link ProcessorSupplier} will be used to create an Review Comment: nit: ```suggestion * The provided {@link ProcessorSupplier} will be used to create a ``` ## streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java: ## @@ -613,6 +616,74 @@ public synchronized StreamsBuilder addGlobalStore(final StoreBuilder< return this; } +/** + * Adds a global {@link StateStore} to the topology. + * The {@link StateStore} sources its data from all partitions of the provided input topic. + * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance. 
+ * + * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions + * of the input topic. + * + * The provided {@link ProcessorSupplier} will be used to create an + * {@link Processor} that will receive all records forwarded from the {@link SourceNode}. + * The supplier should always generate a new instance. Creating a single {@link Processor} object + * and returning the same object reference in {@link ProcessorSupplier#get()} is a + * violation of the supplier pattern and leads to runtime exceptions. + * This {@link Processor} should be used to keep the {@link StateStore} up-to-date. + * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used. + * + * It is not required to connect a global store to the {@link Processor Processors}, + * {@link Transformer Transformers}, or {@link ValueTransformer ValueTransformer}; those have read-only access to all global stores by default. + * + * @param storeBuilder user defined {@link StoreBuilder}; can't be {@code null} + * @param topic t
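The warning in the javadoc above — that `ProcessorSupplier#get()` must return a new instance on every call — is the general supplier contract. A minimal sketch with plain `java.util.function.Supplier` (the `Processor` class here is a hypothetical stand-in, not Kafka Streams' interface):

```java
import java.util.function.Supplier;

// Contrast a supplier that violates the contract (shared instance)
// with one that honors it (fresh instance per get()).
public class SupplierDemo {
    static class Processor {}

    public static void main(String[] args) {
        // Wrong: one shared instance handed out on every call.
        Processor shared = new Processor();
        Supplier<Processor> broken = () -> shared;

        // Right: a fresh instance per call.
        Supplier<Processor> correct = Processor::new;

        System.out.println(broken.get() == broken.get());   // true  — same object, violates the contract
        System.out.println(correct.get() == correct.get()); // false — new instance each time
    }
}
```

In Kafka Streams, the shared-instance variant is what the javadoc says leads to runtime exceptions, since one processor object would be wired into multiple tasks.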
Re: [PR] KAFKA-15853: Move KafkaConfig Replication properties and docs out of … [kafka]
chia7712 commented on code in PR #15575: URL: https://github.com/apache/kafka/pull/15575#discussion_r1566098229 ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -1458,40 +1395,40 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami def logMessageDownConversionEnable: Boolean = getBoolean(KafkaConfig.LogMessageDownConversionEnableProp) /** * Replication configuration ***/ - val controllerSocketTimeoutMs: Int = getInt(KafkaConfig.ControllerSocketTimeoutMsProp) - val defaultReplicationFactor: Int = getInt(KafkaConfig.DefaultReplicationFactorProp) - val replicaLagTimeMaxMs = getLong(KafkaConfig.ReplicaLagTimeMaxMsProp) - val replicaSocketTimeoutMs = getInt(KafkaConfig.ReplicaSocketTimeoutMsProp) - val replicaSocketReceiveBufferBytes = getInt(KafkaConfig.ReplicaSocketReceiveBufferBytesProp) - val replicaFetchMaxBytes = getInt(KafkaConfig.ReplicaFetchMaxBytesProp) - val replicaFetchWaitMaxMs = getInt(KafkaConfig.ReplicaFetchWaitMaxMsProp) - val replicaFetchMinBytes = getInt(KafkaConfig.ReplicaFetchMinBytesProp) - val replicaFetchResponseMaxBytes = getInt(KafkaConfig.ReplicaFetchResponseMaxBytesProp) - val replicaFetchBackoffMs = getInt(KafkaConfig.ReplicaFetchBackoffMsProp) - def numReplicaFetchers = getInt(KafkaConfig.NumReplicaFetchersProp) - val replicaHighWatermarkCheckpointIntervalMs = getLong(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp) - val fetchPurgatoryPurgeIntervalRequests = getInt(KafkaConfig.FetchPurgatoryPurgeIntervalRequestsProp) - val producerPurgatoryPurgeIntervalRequests = getInt(KafkaConfig.ProducerPurgatoryPurgeIntervalRequestsProp) - val deleteRecordsPurgatoryPurgeIntervalRequests = getInt(KafkaConfig.DeleteRecordsPurgatoryPurgeIntervalRequestsProp) - val autoLeaderRebalanceEnable = getBoolean(KafkaConfig.AutoLeaderRebalanceEnableProp) - val leaderImbalancePerBrokerPercentage = getInt(KafkaConfig.LeaderImbalancePerBrokerPercentageProp) - val leaderImbalanceCheckIntervalSeconds: Long = 
getLong(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp) - def uncleanLeaderElectionEnable: java.lang.Boolean = getBoolean(KafkaConfig.UncleanLeaderElectionEnableProp) + val controllerSocketTimeoutMs: Int = getInt(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG) + val defaultReplicationFactor: Int = getInt(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG) + val replicaLagTimeMaxMs = getLong(ReplicationConfigs.REPLICA_LAG_TIME_MAX_MS_CONFIG) + val replicaSocketTimeoutMs = getInt(ReplicationConfigs.REPLICA_SOCKET_TIMEOUT_MS_CONFIG) + val replicaSocketReceiveBufferBytes = getInt(ReplicationConfigs.REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_CONFIG) + val replicaFetchMaxBytes = getInt(ReplicationConfigs.REPLICA_FETCH_MAX_BYTES_CONFIG) + val replicaFetchWaitMaxMs = getInt(ReplicationConfigs.REPLICA_FETCH_WAIT_MAX_MS_CONFIG) + val replicaFetchMinBytes = getInt(ReplicationConfigs.REPLICA_FETCH_MIN_BYTES_CONFIG) + val replicaFetchResponseMaxBytes = getInt(ReplicationConfigs.REPLICA_FETCH_RESPONSE_MAX_BYTES_CONFIG) + val replicaFetchBackoffMs = getInt(ReplicationConfigs.REPLICA_FETCH_BACKOFF_MS_CONFIG) + def numReplicaFetchers = getInt(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG) + val replicaHighWatermarkCheckpointIntervalMs = getLong(ReplicationConfigs.REPLICA_HIGH_WATERMARK_CHECKPOINT_INTERVAL_MS_CONFIG) + val fetchPurgatoryPurgeIntervalRequests = getInt(ReplicationConfigs.FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG) + val producerPurgatoryPurgeIntervalRequests = getInt(ReplicationConfigs.PRODUCER_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG) + val deleteRecordsPurgatoryPurgeIntervalRequests = getInt(ReplicationConfigs.DELETE_RECORDS_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG) + val autoLeaderRebalanceEnable = getBoolean(ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG) + val leaderImbalancePerBrokerPercentage = getInt(ReplicationConfigs.LEADER_IMBALANCE_PER_BROKER_PERCENTAGE_CONFIG) + val leaderImbalanceCheckIntervalSeconds: Long = 
getLong(ReplicationConfigs.LEADER_IMBALANCE_CHECK_INTERVAL_SECONDS_CONFIG) + def uncleanLeaderElectionEnable: Boolean = getBoolean(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG) Review Comment: We have to either remove the type declaration or keep `java.lang.Boolean`; otherwise, the build won't pass.
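As general JVM background on why the boxed/primitive distinction deserves care (illustrative Java only; the Scala compile error discussed above follows Scala's own type rules): unboxing a null `java.lang.Boolean` throws a `NullPointerException` at runtime.

```java
// Config getters typically return the boxed type, which may be null;
// assigning that to a primitive triggers implicit unboxing.
public class BoxingDemo {
    // Hypothetical getter standing in for AbstractConfig-style lookup.
    static Boolean getBoolean(String key) {
        return null; // e.g. the key has no value
    }

    public static void main(String[] args) {
        Boolean boxed = getBoolean("unclean.leader.election.enable"); // fine: boxed stays null
        System.out.println(boxed);
        try {
            boolean primitive = getBoolean("unclean.leader.election.enable"); // implicit unboxing of null
            System.out.println(primitive);
        } catch (NullPointerException e) {
            System.out.println("unboxing null throws NPE");
        }
    }
}
```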
[PR] KAFKA-16552: Create an internal config to control InitialTaskDelayMs in LogManager to speed up tests [kafka]
brandboat opened a new pull request, #15719: URL: https://github.com/apache/kafka/pull/15719 Related to KAFKA-16552: introduce a new internal config `log.initial.task.delay.ms` to control InitialTaskDelayMs in LogManager and speed up tests. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
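The mechanism being proposed — an internal knob that shrinks a scheduler's initial delay so tests don't sit idle waiting for the first run — can be sketched with a plain `ScheduledExecutorService`. Everything below is an illustrative stand-in, not `LogManager`'s actual code:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class InitialDelayDemo {
    // Schedules a no-op "log cleanup" task and reports whether it ran
    // within one second.
    static boolean runOnce(long initialTaskDelayMs) throws Exception {
        CountDownLatch ran = new CountDownLatch(1);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            scheduler.schedule(ran::countDown, initialTaskDelayMs, TimeUnit.MILLISECONDS);
            return ran.await(1, TimeUnit.SECONDS);
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // A production-style delay (e.g. 30s) would stall a test here;
        // an internal config like log.initial.task.delay.ms lets tests use ~0.
        System.out.println(runOnce(10) ? "task ran" : "timed out");
    }
}
```

Making the delay an internal (undocumented) config keeps the production default intact while letting unit tests override it.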
[jira] [Commented] (KAFKA-16474) AsyncKafkaConsumer might send out heartbeat request without waiting for its response
[ https://issues.apache.org/jira/browse/KAFKA-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17837342#comment-17837342 ] Philip Nee commented on KAFKA-16474: The log was attached: See line 7481 and 7492 in : {code:java} AssignmentValidationTest/test_valid_assignment/metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer.group_remote_assignor=uniform/2/VerifiableConsumer-0-281473320420544/ducker11/verifiable_consumer.log {code} {code:java} 7480 [2024-04-15 16:03:35,964] TRACE [Consumer clientId=consumer-test_group_id-1, groupId=test_group_id] Found least loaded node ducker10:9092 (id: 2 rack: null) connected with no in-flight requests (org.apache.kafka.clients.NetworkClient) 7481 [2024-04-15 16:03:35,964] DEBUG [Consumer clientId=consumer-test_group_id-1, groupId=test_group_id] Sending CONSUMER_GROUP_HEARTBEAT request with header RequestHeader(apiKey=CONSUMER_GROUP_HEARTBEAT, apiVersion=0, clientId=consumer-test_gro up_id-1, correlationId=108, headerVersion=2) and timeout 3 to node 2147483646: ConsumerGroupHeartbeatRequestData(groupId='test_group_id', memberId='', memberEpoch=0, instanceId=null, rackId=null, rebalanceTimeoutMs=30, subscribedTop icNames=[test_topic], serverAssignor='uniform', topicPartitions=[]) (org.apache.kafka.clients.NetworkClient) 7482 [2024-04-15 16:03:35,964] TRACE For telemetry state SUBSCRIPTION_NEEDED, returning the value 299843 ms; (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter) 7483 [2024-04-15 16:03:35,964] TRACE [Consumer clientId=consumer-test_group_id-1, groupId=test_group_id] Starting processing of 1 event (org.apache.kafka.clients.consumer.internals.events.EventProcessor) 7484 [2024-04-15 16:03:35,964] TRACE [Consumer clientId=consumer-test_group_id-1, groupId=test_group_id] Processing event: ValidatePositionsEvent{type=VALIDATE_POSITIONS, id=nMQf3YgtRU6vaJu-oVpiQg, future=java.util.concurrent.CompletableFuture@5 7bc27f5[Not completed, 1 
dependents], deadlineMs=9223372036854775807} (org.apache.kafka.clients.consumer.internals.events.EventProcessor) 7485 [2024-04-15 16:03:35,964] TRACE [Consumer clientId=consumer-test_group_id-1, groupId=test_group_id] Completed processing (org.apache.kafka.clients.consumer.internals.events.EventProcessor) 7486 [2024-04-15 16:03:35,964] TRACE [Consumer clientId=consumer-test_group_id-1, groupId=test_group_id] 999 ms remain before another request should be sent for RequestState{owner='org.apache.kafka.clients.consumer.internals.HeartbeatRequestMana ger$HeartbeatRequestState', exponentialBackoff=ExponentialBackoff{multiplier=2, expMax=3.3219280948873626, initialInterval=100, jitter=30.0}, lastSentMs=1713197015963, lastReceivedMs=1713197015963, numAttempts=1, backoffMs=1000} (org.ap ache.kafka.clients.consumer.internals.RequestState) 7487 [2024-04-15 16:03:35,964] TRACE [Consumer clientId=consumer-test_group_id-1, groupId=test_group_id] Polling for fetches with timeout 0 (org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer) 7488 [2024-04-15 16:03:35,964] TRACE [Consumer clientId=consumer-test_group_id-1, groupId=test_group_id] Found least loaded node ducker10:9092 (id: 2 rack: null) connected with no in-flight requests (org.apache.kafka.clients.NetworkClient) 7489 [2024-04-15 16:03:35,964] TRACE [Consumer clientId=consumer-test_group_id-1, groupId=test_group_id] Enqueued event: PollEvent{type=POLL, id=T9IwUDeHQoGrnqJJIyzb8Q, pollTimeMs=1713197015964} (org.apache.kafka.clients.consumer.internals.event s.ApplicationEventHandler) 7490 [2024-04-15 16:03:35,964] TRACE [Consumer clientId=consumer-test_group_id-1, groupId=test_group_id] No events to process (org.apache.kafka.clients.consumer.internals.events.EventProcessor) 7491 [2024-04-15 16:03:35,964] TRACE [Consumer clientId=consumer-test_group_id-1, groupId=test_group_id] Enqueued event: ValidatePositionsEvent{type=VALIDATE_POSITIONS, id=otKgqakkS9COd01SkJ3btQ, future=java.util.concurrent.CompletableFuture@5fb 
759d6[Not completed], deadlineMs=9223372036854775807} (org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler) 7492 [2024-04-15 16:03:35,964] DEBUG [Consumer clientId=consumer-test_group_id-1, groupId=test_group_id] Sending CONSUMER_GROUP_HEARTBEAT request with header RequestHeader(apiKey=CONSUMER_GROUP_HEARTBEAT, apiVersion=0, clientId=consumer-test_gro up_id-1, correlationId=109, headerVersion=2) and timeout 3 to node 2147483646: ConsumerGroupHeartbeatRequestData(groupId='test_group_id', memberId='', memberEpoch=0, instanceId=null, rackId=null, rebalanceTimeoutMs=30, subscribedTop icNames=[test_topic], serverAssignor='uniform', topicPartitions=[]) (org.apache.kafka.clients.NetworkClient) {code} > AsyncKafkaConsumer might send out heartbeat request without waiting for its > response > -
[jira] [Updated] (KAFKA-16474) AsyncKafkaConsumer might send out heartbeat request without waiting for its response
[ https://issues.apache.org/jira/browse/KAFKA-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-16474: --- Attachment: failing_results.zip > AsyncKafkaConsumer might send out heartbeat request without waiting for its > response > > > Key: KAFKA-16474 > URL: https://issues.apache.org/jira/browse/KAFKA-16474 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Lianet Magrans >Priority: Critical > Labels: kip-848-client-support > Fix For: 3.8.0 > > Attachments: failing_results.zip > > > KAFKA-16389 > We've discovered that in some uncommon cases, the consumer could send out > successive heartbeats without waiting for the response to come back. this > might result in causing the consumer to revoke its just assigned assignments > in some cases. For example: > > The consumer first sends out a heartbeat with epoch=0 and memberId='' > The consumer then rapidly sends out another heartbeat with epoch=0 and > memberId='' because it has not gotten any response and thus not updating its > local state > > The consumer receives assignments from the first heartbeat and reconciles its > assignment. > > Since the second heartbeat has epoch=0 and memberId='', the server will think > this is a new member joining and therefore send out an empty assignment. > > The consumer receives the response from the second heartbeat. Revoke all of > its partitions. > > There are 2 issues associate with this bug: > # inflight logic > # rapid poll: In the KAFKA-16389 we've observe consumer polling interval to > be a few ms. -- This message was sent by Atlassian Jira (v8.20.10#820010)
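The fix direction implied by the ticket — allow at most one in-flight heartbeat, and only update local state once the response arrives — can be modeled with a small guard. This is an illustrative model, not the `AsyncKafkaConsumer`'s actual request manager:

```java
// Sketch of an "at most one in-flight heartbeat" guard. Without the
// inFlight check, a rapid second poll could send epoch=0/memberId=''
// again, which the ticket shows makes the server treat it as a new member.
public class HeartbeatState {
    private boolean inFlight = false;
    private int epoch = 0;

    // Returns true only if no request is outstanding.
    synchronized boolean trySend() {
        if (inFlight) return false; // must wait for the previous response
        inFlight = true;
        return true;
    }

    synchronized void onResponse(int newEpoch) {
        inFlight = false;
        epoch = newEpoch; // local state updates only after the response arrives
    }

    public static void main(String[] args) {
        HeartbeatState state = new HeartbeatState();
        System.out.println(state.trySend()); // true  — first heartbeat goes out
        System.out.println(state.trySend()); // false — second is blocked (the bug sent it anyway)
        state.onResponse(1);
        System.out.println(state.trySend()); // true  — allowed again after the response
    }
}
```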
Re: [PR] KAFKA-16436: Online upgrade triggering and group type conversion [kafka]
dajac commented on code in PR #15662: URL: https://github.com/apache/kafka/pull/15662#discussion_r1566001316 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java: ## @@ -1046,4 +1051,101 @@ public void testIsInStatesCaseInsensitive() { assertTrue(group.isInStates(Collections.singleton("stable"), 1)); assertFalse(group.isInStates(Collections.singleton("empty"), 1)); } + +@Test +public void testClassicMembersSupportedProtocols() { +ConsumerGroup consumerGroup = createConsumerGroup("foo"); +List rangeProtocol = new ArrayList<>(); +rangeProtocol.add(new ConsumerGroupMemberMetadataValue.ClassicProtocol() +.setName("range") +.setMetadata(new byte[0])); + +List roundRobinAndRangeProtocols = new ArrayList<>(); +roundRobinAndRangeProtocols.add(new ConsumerGroupMemberMetadataValue.ClassicProtocol() +.setName("roundrobin") +.setMetadata(new byte[0])); +roundRobinAndRangeProtocols.add(new ConsumerGroupMemberMetadataValue.ClassicProtocol() +.setName("range") +.setMetadata(new byte[0])); + +ConsumerGroupMember member1 = new ConsumerGroupMember.Builder("member-1") +.setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() +.setSupportedProtocols(rangeProtocol)) +.build(); +consumerGroup.updateMember(member1); + +ConsumerGroupMember member2 = new ConsumerGroupMember.Builder("member-2") +.setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() +.setSupportedProtocols(roundRobinAndRangeProtocols)) +.build(); +consumerGroup.updateMember(member2); + +assertEquals(2, consumerGroup.classicMembersSupportedProtocols().get("range")); +assertEquals(1, consumerGroup.classicMembersSupportedProtocols().get("roundrobin")); + assertTrue(consumerGroup.supportsClassicProtocols(ConsumerProtocol.PROTOCOL_TYPE, new HashSet<>(Arrays.asList("range", "sticky"; + assertFalse(consumerGroup.supportsClassicProtocols(ConsumerProtocol.PROTOCOL_TYPE, new HashSet<>(Arrays.asList("sticky", "roundrobin"; + 
+member2 = new ConsumerGroupMember.Builder(member2) +.setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() +.setSupportedProtocols(rangeProtocol)) +.build(); +consumerGroup.updateMember(member2); + +assertEquals(2, consumerGroup.classicMembersSupportedProtocols().get("range")); + assertFalse(consumerGroup.classicMembersSupportedProtocols().containsKey("roundrobin")); + +member1 = new ConsumerGroupMember.Builder(member1) +.setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() +.setSupportedProtocols(roundRobinAndRangeProtocols)) +.build(); +consumerGroup.updateMember(member1); +member2 = new ConsumerGroupMember.Builder(member2) +.setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() +.setSupportedProtocols(roundRobinAndRangeProtocols)) +.build(); +consumerGroup.updateMember(member2); + +assertEquals(2, consumerGroup.classicMembersSupportedProtocols().get("range")); +assertEquals(2, consumerGroup.classicMembersSupportedProtocols().get("roundrobin")); + assertTrue(consumerGroup.supportsClassicProtocols(ConsumerProtocol.PROTOCOL_TYPE, new HashSet<>(Arrays.asList("sticky", "roundrobin"; +} + +@Test +public void testAllUseClassicProtocol() { Review Comment: nit: testAllMembersUseClassicProtocol -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Various cleanups in tools [kafka]
mimaison merged PR #15709: URL: https://github.com/apache/kafka/pull/15709
Re: [PR] MINOR: Various cleanups in tools [kafka]
mimaison commented on PR #15709: URL: https://github.com/apache/kafka/pull/15709#issuecomment-2057128107 None of the failures seem related, merging to trunk. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16417:When initializeResources throws an exception in TopicBasedRemoteLogMetadataManager.scala, initializationFailed needs to be set to true [kafka]
johnnychhsu commented on code in PR #15595: URL: https://github.com/apache/kafka/pull/15595#discussion_r1565975540

## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java:
## @@ -436,6 +436,7 @@ private void initializeResources() {
             log.info("Initialized topic-based RLMM resources successfully");
         } catch (Exception e) {
             log.error("Encountered error while initializing producer/consumer", e);
+            initializationFailed = true;

Review Comment: When topic creation fails, `initializationFailed` is currently set to true, but when the producer/consumer initialization fails it is not set. Do we need to set `initializationFailed` when the producer/consumer init fails? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
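The change under discussion marks initialization as failed from the `catch` block so later operations can detect a broken setup. A minimal Python sketch of that pattern (class and step names are hypothetical stand-ins, not the actual `TopicBasedRemoteLogMetadataManager` code):

```python
class MetadataManager:
    """Toy initializer that records the failure of any setup step."""

    def __init__(self, create_topic, create_clients):
        self.initialization_failed = False
        self._create_topic = create_topic
        self._create_clients = create_clients

    def initialize_resources(self):
        try:
            self._create_topic()    # topic creation already flagged failures
            self._create_clients()  # the reviewed change flags this step too
        except Exception:
            # Mirror the reviewed fix: any initialization error sets the flag
            self.initialization_failed = True


def failing_clients():
    raise RuntimeError("client init failed")
```

With this shape, a producer/consumer failure leaves `initialization_failed == True`, matching the behavior the reviewer asks about for topic creation.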
Re: [PR] KAFKA-15649: Handle directory failure timeout [kafka]
mimaison commented on code in PR #15697: URL: https://github.com/apache/kafka/pull/15697#discussion_r1565973156 ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -870,6 +875,7 @@ object KafkaConfig { .define(CreateTopicPolicyClassNameProp, CLASS, null, LOW, CreateTopicPolicyClassNameDoc) .define(AlterConfigPolicyClassNameProp, CLASS, null, LOW, AlterConfigPolicyClassNameDoc) .define(LogMessageDownConversionEnableProp, BOOLEAN, LogConfig.DEFAULT_MESSAGE_DOWNCONVERSION_ENABLE, LOW, LogMessageDownConversionEnableDoc) + .define(LogDirFailureTimeoutMsProp, LONG, Defaults.LOG_DIR_FAILURE_TIMEOUT_MS, atLeast(0), MEDIUM, LogDirFailureTimeoutMsDoc) Review Comment: In the KIP the accepted value range is defined as >= 1. I wonder if values below 1s actually make much sense. Also the importance was defined as low. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: fix typo [kafka]
johnnychhsu commented on PR #15643: URL: https://github.com/apache/kafka/pull/15643#issuecomment-2057106113 thanks for the PR. this looks good to me -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15615: Improve handling of fetching during metadata updates. [kafka]
johnnychhsu commented on PR #15647: URL: https://github.com/apache/kafka/pull/15647#issuecomment-2057103180 thanks for the PR! just curious, what is the problem without the newly added state? does it cause any issues? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
johnnychhsu commented on code in PR #15616: URL: https://github.com/apache/kafka/pull/15616#discussion_r1565898495

## storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
## @@ -800,8 +802,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri
     try {
         if (delete.execute())
             LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath());
-        else if (logIfMissing)
-            LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+        else {
+            if (logIfMissing) {

Review Comment: why can't we just add the fallback logic inside `if (logIfMissing)`, instead of adding a new if-else block inside this else block? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
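The reviewer's point is about control-flow shape: handle the missing-file case and the fallback without nesting another if-else inside the else branch. A tiny illustrative sketch of the flattened form (hypothetical names and return values, not the PR's actual code):

```python
def delete_if_exists(delete, log_if_missing, fallback):
    """Flattened branching: each case returns directly, no nested else/if."""
    if delete():
        return "deleted"          # file existed and was removed
    if log_if_missing:
        return "logged-missing"   # file absent; log it and stop
    return fallback()             # file absent and logging disabled
```

Each outcome is reachable from exactly one guard, which is what makes the flat version easier to follow than a nested block.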
Re: [PR] MINOR: Various cleanups in server and server-common [kafka]
mimaison commented on code in PR #15710: URL: https://github.com/apache/kafka/pull/15710#discussion_r1565897312 ## server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsInstance.java: ## @@ -116,8 +116,7 @@ public synchronized boolean maybeUpdatePushRequestTimestamp(long currentTime) { */ boolean canAccept = lastGetRequestTimestamp > lastPushRequestTimestamp; Review Comment: It's shorter but I'm not sure it's more readable. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16373: Adding code to support Apache Kafka Docker Official Images [kafka]
soarez commented on code in PR #15704: URL: https://github.com/apache/kafka/pull/15704#discussion_r1565856796

## docker/generate_kafka_pr_template.sh:
## @@ -0,0 +1,76 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Ensure script exits on error or unset variable
+set -eu
+
+# Define the 'self' variable with the script's basename
+self="$(basename "$BASH_SOURCE")"
+
+# Navigate to the script's directory and then to docker_official_images
+cd "$(dirname "$(readlink -f "$BASH_SOURCE")")/docker_official_images"
+
+# Source common utilities
+source ../common.sh
+
+# Initialize an empty variable for the highest version
+highest_version=""
+
+# Output header information
+cat <<-EOH
+# This file is generated via https://github.com/apache/kafka/blob/$(fileCommit "../$self")/docker/generate_kafka_pr_template.sh
+
+Maintainers: The Apache Kafka Project (@ApacheKafka)
+GitRepo: https://github.com/apache/kafka.git
+EOH
+
+# Find all versions, excluding -rc, sort them, and determine the globally highest version
+versions=$(find . -mindepth 1 -maxdepth 1 -type d ! -name "*-rc" | sort -V)
+for dir in $versions; do
+    version=$(basename "$dir")
+    highest_version="$version" # Continuously update to ensure the last is the highest
+done

Review Comment:
```suggestion
highest_version=$(find . -mindepth 1 -maxdepth 1 -type d ! -name "*-rc" | sort -Vr | xargs basename | head -n 1)
```

## docker/docker_official_image_build_test.py:
## @@ -0,0 +1,128 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Python script to build and test a docker image
+This script is used to generate a test report
+
+Usage:
+    docker_build_test.py --help
+        Get detailed description of each option
+
+Example command:-
+    docker_build_test.py <image_name> --image-tag <image_tag> --image-type <image_type> --kafka-url <kafka_url>
+
+This command will build an image with <image_name> as image name, <image_tag> as image_tag (it will be latest by default),
+<image_type> as image type (jvm by default), <kafka_url> for the kafka inside the image and run tests on the image.
+-b can be passed as additional argument if you just want to build the image.
+-t can be passed if you just want to run tests on the image.
+"""
+
+from datetime import date
+import argparse
+from distutils.dir_util import copy_tree
+import shutil
+from test.docker_sanity_test import run_tests
+from common import execute, jvm_image
+import tempfile
+import os
+
+
+def set_executable_permissions(directory):
+    """
+    Sets executable permissions for all files in the specified directory and its subdirectories.
+    """
+    for root, _, files in os.walk(directory):
+        for file in files:
+            path = os.path.join(root, file)
+            os.chmod(path, os.stat(path).st_mode | 0o111)
+
+
+def build_jvm(image, tag, kafka_version):
+    image = f'{image}:{tag}'
+    current_dir = os.path.dirname(os.path.realpath(__file__))
+    temp_dir_path = tempfile.mkdtemp()
+    directories = [
+        f'{current_dir}/docker_official_images/{kafka_version}/jvm',
+        f'{current_dir}/docker_official_images/{kafka_version}/jvm/resources'
+    ]
+    for directory in directories:
+        set_executable_permissions(directory)
+    copy_tree(f"{current_dir}/docker_official_images/{kafka_version}/jvm",
+              f"{temp_dir_path}/jvm")
+    copy_tree(f"{current_dir}/docker_official_images/{kafka_version}/jvm/resources",
+              f"{temp
Re: [PR] MINOR: fix duplicated return and other streams docs typos [kafka]
johnnychhsu commented on code in PR #15713: URL: https://github.com/apache/kafka/pull/15713#discussion_r1565877195 ## streams/src/main/java/org/apache/kafka/streams/kstream/NamedOperation.java: ## @@ -17,7 +17,7 @@ package org.apache.kafka.streams.kstream; /** - * Default interface which can be used to personalized the named of operations, internal topics or store. + * Default interface which can be used to customize the named of operations, internal topics or store. Review Comment: shall we also change the description to 'the name of operations'? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
C0urante commented on PR #13801: URL: https://github.com/apache/kafka/pull/13801#issuecomment-2056973237

Thanks Sagar, great catch! I suspected this would be a gnarly one to tackle but it's turning out to be even harder than I thought.

I think there's still an issue with the current state of the PR. It looks like we aren't blocking on the future returned by `setPrimaryThenSecondary`, which means that we may spuriously return early from `get` in the future we're returning from `ConnectorOffsetBackingStore::set` if the write to the primary store hasn't completed yet. I believe this is missed by tests because the producer writes we mock out all take place synchronously; maybe we can use the `MockProducer` more idiomatically to simulate records being ack'd after calls to `MockProducer::send` have returned?

I've sketched a new kind of `Future` implementation that seems to do the trick, though I haven't tested it rigorously:

```java
private class ChainedOffsetWriteFuture implements Future<Void> {

    private final OffsetBackingStore primaryStore;
    private final OffsetBackingStore secondaryStore;
    private final Map<ByteBuffer, ByteBuffer> completeOffsets;
    private final Map<ByteBuffer, ByteBuffer> regularOffsets;
    private final Callback<Void> callback;
    private final AtomicReference<Throwable> writeError;
    private final CountDownLatch completed;

    public ChainedOffsetWriteFuture(
            OffsetBackingStore primaryStore,
            OffsetBackingStore secondaryStore,
            Map<ByteBuffer, ByteBuffer> completeOffsets,
            Map<ByteBuffer, ByteBuffer> regularOffsets,
            Map<ByteBuffer, ByteBuffer> tombstoneOffsets,
            Callback<Void> callback
    ) {
        this.primaryStore = primaryStore;
        this.secondaryStore = secondaryStore;
        this.completeOffsets = completeOffsets;
        this.regularOffsets = regularOffsets;
        this.callback = callback;
        this.writeError = new AtomicReference<>();
        this.completed = new CountDownLatch(1);
        secondaryStore.set(tombstoneOffsets, this::onFirstWrite);
    }

    private void onFirstWrite(Throwable error, Void ignored) {
        if (error != null) {
            log.trace("Skipping offsets write to primary store because secondary tombstone write has failed", error);
            try (LoggingContext context = loggingContext()) {
                callback.onCompletion(error, ignored);
                writeError.compareAndSet(null, error);
                completed.countDown();
            }
            return;
        }
        setPrimaryThenSecondary(primaryStore, secondaryStore, completeOffsets, regularOffsets, this::onSecondWrite);
    }

    private void onSecondWrite(Throwable error, Void ignored) {
        callback.onCompletion(error, ignored);
        writeError.compareAndSet(null, error);
        completed.countDown();
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    @Override
    public boolean isCancelled() {
        return false;
    }

    @Override
    public boolean isDone() {
        return completed.getCount() == 0;
    }

    @Override
    public Void get() throws InterruptedException, ExecutionException {
        completed.await();
        if (writeError.get() != null) {
            throw new ExecutionException(writeError.get());
        }
        return null;
    }

    @Override
    public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        if (!completed.await(timeout, unit)) {
            throw new TimeoutException("Failed to complete offset write in time");
        }
        if (writeError.get() != null) {
            throw new ExecutionException(writeError.get());
        }
        return null;
    }
}
```

(I've omitted an implementation of `cancel` and `isCancelled` for now since I'm not sure it's really necessary, but LMK if I've missed a case where this would make a difference.)

The new class can be used at the end of `ConnectorOffsetBackingStore::set` like this:

```java
if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
    return new ChainedOffsetWriteFuture(
            primaryStore,
            secondaryStore,
            values,
            regularOffsets,
            tombstoneOffsets,
            callback
    );
} else {
    return setPrimaryThenSecondary(primaryStore, secondaryStore, values, regularOffsets, callback);
}
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra
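The sketch above gates `get()` on a latch that counts down only after the chained secondary-then-primary writes finish. The same latch-backed-future pattern can be condensed in Python, with `threading.Event` standing in for `CountDownLatch` (the stores and callbacks here are generic stand-ins, not Connect's API):

```python
import threading


class ChainedWriteFuture:
    """Completes after a secondary write, then a primary write, both finish.

    The first error aborts the chain and is re-raised from get(), mirroring
    how the Java sketch stores the error and wraps it in ExecutionException.
    """

    def __init__(self, write_secondary, write_primary):
        self._error = None
        self._done = threading.Event()
        # Kick off the chain: secondary first, then primary only on success.
        threading.Thread(
            target=self._run, args=(write_secondary, write_primary)
        ).start()

    def _run(self, write_secondary, write_primary):
        try:
            write_secondary()
            write_primary()
        except Exception as exc:
            self._error = exc
        finally:
            self._done.set()  # countDown(): unblock any waiting get()

    def get(self, timeout=None):
        if not self._done.wait(timeout):
            raise TimeoutError("write did not complete in time")
        if self._error is not None:
            raise self._error
```

The key property, as in the Java version, is that `get()` cannot return before the whole chain has completed, closing the "spuriously return early" gap discussed above.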
Re: [PR] MINOR: fix duplicated return and other streams docs typos [kafka]
johnnychhsu commented on PR #15713: URL: https://github.com/apache/kafka/pull/15713#issuecomment-2056975870 thanks for the pr! this looks good to me, I left a simple comment -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16436: Online upgrade triggering and group type conversion [kafka]
dajac commented on code in PR #15662: URL: https://github.com/apache/kafka/pull/15662#discussion_r1565866968

## group-coordinator/src/main/resources/common/message/ConsumerGroupMemberMetadataValue.json:
## @@ -35,6 +35,20 @@
     { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", "default": -1,
       "about": "The rebalance timeout" },
     { "name": "ServerAssignor", "versions": "0+", "nullableVersions": "0+", "type": "string",
-      "about": "The server assignor to use; or null if not used." }
+      "about": "The server assignor to use; or null if not used." },
+    { "name": "ClassicMemberMetadata", "versions": "0+", "nullableVersions": "0+", "type": "ClassicMemberMetadata",

Review Comment: Let's make it a tagged field in order to not break the backward compatibility of the record. You can do it by adding `"taggedVersions": "0+", "tag": 0`.

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
## @@ -998,4 +1090,177 @@ public ConsumerGroupDescribeResponseData.DescribedGroup asDescribedGroup(
         );
         return describedGroup;
     }
+
+    /**
+     * Create a new consumer group according to the given classic group.
+     *
+     * @param snapshotRegistry  The SnapshotRegistry.
+     * @param metrics           The GroupCoordinatorMetricsShard.
+     * @param classicGroup      The converted classic group.
+     * @param topicsImage       The TopicsImage for topic id and topic name conversion.
+     * @param log               The logger to use.
+     * @return The created ConsumerGroup.
+     */
+    public static ConsumerGroup fromClassicGroup(
+        SnapshotRegistry snapshotRegistry,
+        GroupCoordinatorMetricsShard metrics,
+        ClassicGroup classicGroup,
+        TopicsImage topicsImage,
+        Logger log
+    ) {
+        String groupId = classicGroup.groupId();
+        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
+        consumerGroup.setGroupEpoch(classicGroup.generationId());
+        consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
+
+        classicGroup.allMembers().forEach(classicGroupMember -> {
+            ConsumerPartitionAssignor.Subscription subscription = deserializeSubscription(
+                classicGroupMember.metadata(classicGroup.protocolName().get()),
+                log,
+                "group upgrade"
+            );
+            Map<Uuid, Set<Integer>> partitions = topicPartitionMapFromList(subscription.ownedPartitions(), topicsImage);
+
+            ConsumerGroupMember newMember = new ConsumerGroupMember.Builder(classicGroupMember.memberId())
+                .setMemberEpoch(classicGroup.generationId())
+                .setState(MemberState.STABLE)
+                .setPreviousMemberEpoch(classicGroup.generationId())
+                .setInstanceId(classicGroupMember.groupInstanceId().orElse(null))
+                .setRackId(subscription.rackId().orElse(null))
+                .setRebalanceTimeoutMs(classicGroupMember.rebalanceTimeoutMs())
+                .setClientId(classicGroupMember.clientId())
+                .setClientHost(classicGroupMember.clientHost())
+                .setSubscribedTopicNames(subscription.topics())
+                .setAssignedPartitions(partitions)
+                .setSupportedClassicProtocols(classicGroupMember.supportedProtocols())
+                .build();
+            consumerGroup.updateMember(newMember);
+            consumerGroup.updateTargetAssignment(newMember.memberId(), new Assignment(partitions));
+        });
+
+        return consumerGroup;
+    }
+
+    /**
+     * Populate the record list with the records needed to create the given consumer group.
+     *
+     * @param consumerGroup The consumer group to create.
+     * @param records       The list to which the new records are added.
+     */
+    public static void createConsumerGroupRecords(
+        ConsumerGroup consumerGroup,
+        List<Record> records
+    ) {
+        String groupId = consumerGroup.groupId;
+
+        consumerGroup.members().forEach((__, consumerGroupMember) ->
+            records.add(RecordHelpers.newMemberSubscriptionRecord(groupId, consumerGroupMember))
+        );
+
+        records.add(RecordHelpers.newGroupEpochRecord(groupId, consumerGroup.groupEpoch()));
+
+        consumerGroup.members().forEach((consumerGroupMemberId, consumerGroupMember) ->
+            records.add(RecordHelpers.newTargetAssignmentRecord(groupId, consumerGroupMemberId, consumerGroup.targetAssignment(consumerGroupMemberId).partitions()))
+        );
+
+        records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId, consumerGroup.groupEpoch()));
+
+        consumerGroup.members().forEach((__, consumerGroupMember) ->
+            records.add(RecordHelpers.newCurrentAssignmentRecord(groupId, consumerGroupMember))
+        );
+    }
+
+    /**
+     * Converts the list
Re: [PR] MINOR: Various cleanups in server and server-common [kafka]
mimaison commented on code in PR #15710: URL: https://github.com/apache/kafka/pull/15710#discussion_r1565856071 ## server/src/main/java/org/apache/kafka/server/AssignmentsManager.java: ## @@ -321,7 +321,7 @@ public void run() throws Exception { AssignReplicasToDirsResponseData data = ((AssignReplicasToDirsResponse) response.responseBody()).data(); Set failed = filterFailures(data, inflight); -Set completed = Utils.diff(HashSet::new, inflight.values().stream().collect(Collectors.toSet()), failed); +Set completed = Utils.diff(HashSet::new, new HashSet<>(inflight.values()), failed); Review Comment: I'd prefer only sticking to simple refactorings in this PR. We can do this optimization in another PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Add missing docs for migration metrics [kafka]
soarez commented on code in PR #15718: URL: https://github.com/apache/kafka/pull/15718#discussion_r1565822707 ## docs/ops.html: ## @@ -2126,7 +2142,7 @@ https://github.com/apache/kafka/blob/3617dda9a5415ee1597967f754985fb73a5350c6/core/src/main/scala/kafka/controller/KafkaController.scala#L74 ## docs/ops.html: ## @@ -2113,6 +2113,22 @@ https://github.com/apache/kafka/blob/3617dda9a5415ee1597967f754985fb73a5350c6/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java#L54-L55 ## docs/ops.html: ## @@ -2113,6 +2113,22 @@ https://github.com/apache/kafka/blob/3617dda9a5415ee1597967f754985fb73a5350c6/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationState.java#L33-L72 ## docs/ops.html: ## @@ -2113,6 +2113,22 @@ https://github.com/apache/kafka/blob/3617dda9a5415ee1597967f754985fb73a5350c6/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java#L42-L43 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16528: Client HB timing fix [kafka]
lianetm commented on code in PR #15698: URL: https://github.com/apache/kafka/pull/15698#discussion_r1565783934 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -269,6 +269,9 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long curr heartbeatRequestState.onSendAttempt(currentTimeMs); membershipManager.onHeartbeatRequestSent(); metricsManager.recordHeartbeatSentMs(currentTimeMs); +// Reset timer when sending the request, to make sure that, if waiting for the interval, +// we don't include the response time (which may introduce delay) Review Comment: You're right, I don't think it's bringing anything not clear with the func name and action themselves. Removed. This is covered in the new test I added [here](https://github.com/apache/kafka/blob/fe483ff816b62133291f77f29b00e3bc706b581f/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java#L258). I just included now an assert message along the lines of this comment to make it clearer in the test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: Add missing docs for migration metrics [kafka]
soarez opened a new pull request, #15718: URL: https://github.com/apache/kafka/pull/15718 (no comment) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16528: Client HB timing fix [kafka]
lianetm commented on PR #15698: URL: https://github.com/apache/kafka/pull/15698#issuecomment-2056883089 Hey @cadonna, thanks a lot for your feedback! All comments addressed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16528: Client HB timing fix [kafka]
lianetm commented on code in PR #15698: URL: https://github.com/apache/kafka/pull/15698#discussion_r1565815246 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -482,6 +484,14 @@ public long nextHeartbeatMs(final long currentTimeMs) { return heartbeatTimer.remainingMs(); } +public void onFailedAttempt(final long currentTimeMs) { +// Expire timer to allow sending HB after a failure. After a failure, a next HB may be +// needed with backoff (ex. errors that lead to retries, like coordinator load error), +// or immediately (ex. errors that lead to rejoining, like fencing errors). +heartbeatTimer.update(heartbeatTimer.currentTimeMs() + heartbeatTimer.remainingMs()); Review Comment: Done, good point. I changed it to reset to 0, that shows the intention of not having an interval to wait for, which is what we want on these failure scenarios. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
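The change discussed here resets the heartbeat timer to a zero interval on failure, so "no interval left to wait" is expressed directly rather than by expiring the deadline arithmetically. A small sketch of that idea (a generic deadline timer, not Kafka's actual `Timer` class):

```python
class IntervalTimer:
    """Tracks a deadline; reset(now, 0) makes the next attempt due immediately."""

    def __init__(self, now_ms, interval_ms):
        self._deadline_ms = now_ms + interval_ms

    def reset(self, now_ms, interval_ms):
        # Resetting with interval 0 states the intention plainly: there is
        # no interval to respect before the next attempt.
        self._deadline_ms = now_ms + interval_ms

    def remaining_ms(self, now_ms):
        return max(0, self._deadline_ms - now_ms)
```

On a failed heartbeat attempt, `timer.reset(now, 0)` leaves `remaining_ms(now) == 0`, so the next heartbeat is gated only by whatever backoff is applied elsewhere, not by the regular interval.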
Re: [PR] KAFKA-16528: Client HB timing fix [kafka]
lianetm commented on code in PR #15698: URL: https://github.com/apache/kafka/pull/15698#discussion_r1565783934 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -269,6 +269,9 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long curr heartbeatRequestState.onSendAttempt(currentTimeMs); membershipManager.onHeartbeatRequestSent(); metricsManager.recordHeartbeatSentMs(currentTimeMs); +// Reset timer when sending the request, to make sure that, if waiting for the interval, +// we don't include the response time (which may introduce delay) Review Comment: You're right, I don't think it's bringing anything not clear with the func name and action themselves. Removed. This is covered in the new test I added. I just included now an assert message along the lines of this comment to make it clearer in the test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Various cleanups in clients [kafka]
mimaison commented on code in PR #15705: URL: https://github.com/apache/kafka/pull/15705#discussion_r1565764251

## clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java:
## @@ -136,8 +136,8 @@ private CoordinatorKey requireSingletonAndType(Set keys) {
     }

     private void ensureSameType(Set keys) {
-        if (keys.size() < 1) {
-            throw new IllegalArgumentException("Unexpected size of key set: expected >= 1, but got " + keys.size());
+        if (keys.isEmpty()) {
+            throw new IllegalArgumentException("Unexpected size of key set: expected >= 1, but got 0");

Review Comment: It's the same really. I just simplified the error message to be a fixed string. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Various cleanups in clients [kafka]
mimaison commented on code in PR #15705: URL: https://github.com/apache/kafka/pull/15705#discussion_r1565721043 ## clients/src/main/java/org/apache/kafka/common/utils/BufferSupplier.java: ## @@ -79,12 +79,8 @@ public ByteBuffer get(int size) { @Override public void release(ByteBuffer buffer) { buffer.clear(); -Deque bufferQueue = bufferMap.get(buffer.capacity()); -if (bufferQueue == null) { -// We currently keep a single buffer in flight, so optimise for that case -bufferQueue = new ArrayDeque<>(1); -bufferMap.put(buffer.capacity(), bufferQueue); -} +Deque bufferQueue = bufferMap.computeIfAbsent(buffer.capacity(), k -> new ArrayDeque<>(1)); +// We currently keep a single buffer in flight, so optimise for that case Review Comment: Good point! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
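The `computeIfAbsent` refactor in this diff replaces the get/null-check/put dance with a single call. Python's `dict.setdefault` gives the same shape, shown here purely as an illustration of the pattern (the buffer-pool names are stand-ins):

```python
from collections import deque

buffer_map = {}


def release(capacity, buffer):
    # One call creates the per-capacity queue on first use and reuses it
    # afterwards, like Map.computeIfAbsent(capacity, k -> new ArrayDeque<>(1)).
    buffer_map.setdefault(capacity, deque()).append(buffer)
```

One design difference worth noting: `setdefault` always constructs its default argument, whereas `computeIfAbsent` only invokes the factory on a miss, which matters when the default is expensive to build.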
Re: [PR] MINOR: Various cleanups in clients [kafka]
mimaison commented on code in PR #15705: URL: https://github.com/apache/kafka/pull/15705#discussion_r1565719621 ## clients/src/main/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPool.java: ## @@ -37,13 +37,13 @@ public class GarbageCollectedMemoryPool extends SimpleMemoryPool implements Auto //serves 2 purposes - 1st it maintains the ref objects reachable (which is a requirement for them //to ever be enqueued), 2nd keeps some (small) metadata for every buffer allocated private final Map buffersInFlight = new ConcurrentHashMap<>(); -private final GarbageCollectionListener gcListener = new GarbageCollectionListener(); private final Thread gcListenerThread; -private volatile boolean alive = true; +private volatile boolean alive; public GarbageCollectedMemoryPool(long sizeBytes, int maxSingleAllocationSize, boolean strict, Sensor oomPeriodSensor) { super(sizeBytes, maxSingleAllocationSize, strict, oomPeriodSensor); this.alive = true; Review Comment: Yes I agree -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Various cleanups in clients [kafka]
mimaison commented on code in PR #15705: URL: https://github.com/apache/kafka/pull/15705#discussion_r1565718889 ## clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java: ## @@ -60,7 +58,6 @@ public SslChannelBuilder(Mode mode, this.mode = mode; this.listenerName = listenerName; this.isInterBrokerListener = isInterBrokerListener; -this.log = logContext.logger(getClass()); Review Comment: Since this field is unused so I thought we should just delete it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16553) log controller configs when startup
Chia-Ping Tsai created KAFKA-16553: -- Summary: log controller configs when startup Key: KAFKA-16553 URL: https://issues.apache.org/jira/browse/KAFKA-16553 Project: Kafka Issue Type: Improvement Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai We can't observe the controller configs from the log file. We can copy the solution used by the broker (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L492). Alternatively, this issue could be blocked by https://issues.apache.org/jira/browse/KAFKA-13105 to wait for a more graceful solution. -- This message was sent by Atlassian Jira (v8.20.10#820010)
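The broker-side approach linked above amounts to rendering every config key/value into the startup log. A minimal, hypothetical Java sketch of that idea (the class and method names are illustrative, not Kafka's actual API):

```java
import java.util.Map;
import java.util.TreeMap;

public class ControllerConfigLogger {

    // Hypothetical helper (not Kafka's actual API): render every config
    // key/value in sorted order so it can be observed in the startup log,
    // mirroring what the broker already does for its own configs.
    static String render(Map<String, Object> configs) {
        StringBuilder sb = new StringBuilder("Controller configs:\n");
        new TreeMap<>(configs).forEach((k, v) ->
                sb.append('\t').append(k).append(" = ").append(v).append('\n'));
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(render(Map.<String, Object>of(
                "process.roles", "controller",
                "node.id", 1)));
    }
}
```

Sorting the keys makes diffing two startup logs against each other straightforward, which is most of the value of logging configs in the first place.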
Re: [PR] KAFKA-16371: fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]
dajac commented on code in PR #15536: URL: https://github.com/apache/kafka/pull/15536#discussion_r1565639650 ## core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala: ## @@ -1661,6 +1661,65 @@ class GroupMetadataManagerTest { assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count")) } + @Test + def testOffsetMetadataTooLargePartialFailure(): Unit = { +val memberId = "" +val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo") +val validTopicIdPartition = new TopicIdPartition(topicIdPartition.topicId, 1, "foo") +val offset = 37 +val requireStable = true; + +groupMetadataManager.addOwnedPartition(groupPartitionId) +val group = new GroupMetadata(groupId, Empty, time) +groupMetadataManager.addGroup(group) + +val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId)) +val offsets = immutable.Map( + topicIdPartition -> OffsetAndMetadata(offset, "s" * (offsetConfig.maxMetadataSize + 1) , time.milliseconds()), + validTopicIdPartition -> OffsetAndMetadata(offset, "", time.milliseconds())) + +expectAppendMessage(Errors.NONE) + +var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None +def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = { + commitErrors = Some(errors) +} + +assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count")) +groupMetadataManager.storeOffsets(group, memberId, offsetTopicPartition, offsets, callback, verificationGuard = None) +assertTrue(group.hasOffsets) + +assertFalse(commitErrors.isEmpty) +assertEquals( + Some(Errors.OFFSET_METADATA_TOO_LARGE), + commitErrors.get.get(topicIdPartition) +) +assertEquals( + Some(Errors.NONE), + commitErrors.get.get(validTopicIdPartition) +) Review Comment: nit: Would it be possible to use `assertEquals(expectedMap, commitErrors.get)`? We usually prefer this way because it ensures that the Map only contains what we expect. 
## core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala: ## @@ -1661,6 +1661,65 @@ class GroupMetadataManagerTest { assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count")) } + @Test + def testOffsetMetadataTooLargePartialFailure(): Unit = { +val memberId = "" +val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo") +val validTopicIdPartition = new TopicIdPartition(topicIdPartition.topicId, 1, "foo") +val offset = 37 +val requireStable = true; + +groupMetadataManager.addOwnedPartition(groupPartitionId) +val group = new GroupMetadata(groupId, Empty, time) +groupMetadataManager.addGroup(group) + +val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId)) +val offsets = immutable.Map( + topicIdPartition -> OffsetAndMetadata(offset, "s" * (offsetConfig.maxMetadataSize + 1) , time.milliseconds()), + validTopicIdPartition -> OffsetAndMetadata(offset, "", time.milliseconds())) + +expectAppendMessage(Errors.NONE) + +var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None +def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = { + commitErrors = Some(errors) +} + +assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count")) +groupMetadataManager.storeOffsets(group, memberId, offsetTopicPartition, offsets, callback, verificationGuard = None) +assertTrue(group.hasOffsets) + +assertFalse(commitErrors.isEmpty) +assertEquals( + Some(Errors.OFFSET_METADATA_TOO_LARGE), + commitErrors.get.get(topicIdPartition) +) +assertEquals( + Some(Errors.NONE), + commitErrors.get.get(validTopicIdPartition) +) + +val cachedOffsets = groupMetadataManager.getOffsets( + groupId, + requireStable, + Some(Seq(topicIdPartition.topicPartition, validTopicIdPartition.topicPartition)) +) +assertEquals( + Some(OffsetFetchResponse.INVALID_OFFSET), + cachedOffsets.get(topicIdPartition.topicPartition).map(_.offset) +) +assertEquals( 
+ Some(Errors.NONE), + cachedOffsets.get(topicIdPartition.topicPartition).map(_.error) +) +assertEquals( + Some(offset), + cachedOffsets.get(validTopicIdPartition.topicPartition).map(_.offset) +) Review Comment: nit: Same comment as the previous one. ## core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala: ## @@ -1661,6 +1661,65 @@ class GroupMetadataManagerTest { assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count")) } + @Test + def testOffsetMetadataTooLargePartialFailure(): Unit = { +val memberId = "" +val topicIdPar
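The whole-map assertion the reviewer prefers catches unexpected extra entries that per-key lookups silently ignore. A self-contained Java sketch of the difference (hypothetical partition names and error strings, not the actual test's types):

```java
import java.util.Map;

public class WholeMapAssertion {
    public static void main(String[] args) {
        // Result as produced by some code under test, with an extra entry
        // that no per-key assertion ever looks at.
        Map<String, String> commitErrors = Map.of(
                "foo-0", "OFFSET_METADATA_TOO_LARGE",
                "foo-1", "NONE",
                "foo-2", "NONE"); // unexpected extra entry

        // Per-key assertions pass despite the extra entry...
        assertEquals("OFFSET_METADATA_TOO_LARGE", commitErrors.get("foo-0"));
        assertEquals("NONE", commitErrors.get("foo-1"));

        // ...while asserting on the whole map surfaces the surprise.
        Map<String, String> expected = Map.of(
                "foo-0", "OFFSET_METADATA_TOO_LARGE",
                "foo-1", "NONE");
        System.out.println("whole-map assertion would pass: "
                + expected.equals(commitErrors));
    }

    static void assertEquals(Object expected, Object actual) {
        if (!expected.equals(actual)) {
            throw new AssertionError(expected + " != " + actual);
        }
    }
}
```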
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
gaurav-narula commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1565638692 ## core/src/main/scala/kafka/log/LogManager.scala: ## @@ -1176,6 +1176,42 @@ class LogManager(logDirs: Seq[File], } } + def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = { +val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage) +abandonedFutureLogs.foreach { case (futureLog, currentLog) => + val tp = futureLog.topicPartition + + futureLog.renameDir(UnifiedLog.logDirName(tp), shouldReinitialize = true) + futureLog.removeLogMetrics() + futureLogs.remove(tp) + + currentLog.foreach { log => +log.removeLogMetrics() +log.renameDir(UnifiedLog.logDeleteDirName(tp), shouldReinitialize = false) +addLogToBeDeleted(log) +info(s"Old log for partition ${tp} is renamed to ${log.dir.getAbsolutePath} and is scheduled for deletion") + } + + currentLogs.put(tp, futureLog) + futureLog.newMetrics() + + info(s"Successfully renamed abandoned future log for $tp") Review Comment: That's a great suggestion! I'm going to try refactoring this to use `replaceCurrentWithFutureLog` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
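The recovery steps in the diff above — promote the future replica to current, and rename the abandoned current replica so it gets deleted — can be sketched generically. This is a hypothetical illustration with plain maps and strings, not the actual LogManager API:

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

public class FutureLogPromotion {
    static Map<String, String> currentLogs = new HashMap<>(); // partition -> log dir
    static Map<String, String> futureLogs = new HashMap<>();
    static Queue<String> logsToDelete = new ArrayDeque<>();

    // Promote the future log for a partition: the old current log (if any)
    // is renamed for deletion, and the future log takes its place under
    // the normal directory name.
    static void promote(String tp) {
        String futureDir = futureLogs.remove(tp);              // drop from future map
        String oldDir = currentLogs.get(tp);
        if (oldDir != null) {
            logsToDelete.add(oldDir + ".-delete");             // schedule old log deletion
        }
        currentLogs.put(tp, futureDir.replace(".future", "")); // rename + install
    }

    public static void main(String[] args) {
        currentLogs.put("foo-0", "/d1/foo-0");
        futureLogs.put("foo-0", "/d2/foo-0.future");
        promote("foo-0");
        System.out.println(currentLogs.get("foo-0")); // /d2/foo-0
        System.out.println(logsToDelete.peek());      // /d1/foo-0.-delete
    }
}
```

Reusing an existing promotion path (as `replaceCurrentWithFutureLog` is suggested to be) keeps this ordering in exactly one place, which is the point of the refactoring discussed above.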
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
showuon commented on code in PR #15690: URL: https://github.com/apache/kafka/pull/15690#discussion_r1565635052 ## core/src/main/java/kafka/server/TierStateMachine.java: ## @@ -40,7 +92,72 @@ public interface TierStateMachine { */ PartitionFetchState start(TopicPartition topicPartition, PartitionFetchState currentFetchState, - PartitionData fetchPartitionData) throws Exception; + PartitionData fetchPartitionData) throws Exception { +OffsetAndEpoch epochAndLeaderLocalStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch()); +int epoch = epochAndLeaderLocalStartOffset.leaderEpoch(); +long leaderLocalStartOffset = epochAndLeaderLocalStartOffset.offset(); + +long offsetToFetch; + replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate().mark(); + replicaMgr.brokerTopicStats().allTopicsStats().buildRemoteLogAuxStateRequestRate().mark(); + +try { +offsetToFetch = buildRemoteLogAuxState(topicPartition, currentFetchState.currentLeaderEpoch(), leaderLocalStartOffset, epoch, fetchPartitionData.logStartOffset()); +} catch (RemoteStorageException e) { + replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).failedBuildRemoteLogAuxStateRate().mark(); + replicaMgr.brokerTopicStats().allTopicsStats().failedBuildRemoteLogAuxStateRate().mark(); +throw e; +} + +OffsetAndEpoch fetchLatestOffsetResult = leader.fetchLatestOffset(topicPartition, currentFetchState.currentLeaderEpoch()); +long leaderEndOffset = fetchLatestOffsetResult.offset(); + +long initialLag = leaderEndOffset - offsetToFetch; + +return PartitionFetchState.apply(currentFetchState.topicId(), offsetToFetch, Option.apply(initialLag), currentFetchState.currentLeaderEpoch(), +Fetching$.MODULE$, replicaMgr.localLogOrException(topicPartition).latestEpoch()); Review Comment: Ah, nice catch! Interestingly, no test caught this error. Let me write a test for it.
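The start() flow quoted above follows a common metering pattern: mark the attempt rate up front, mark a separate failure rate on exception and rethrow, and on success compute the initial lag from the leader end offset. A hypothetical condensed Java sketch (plain counters stand in for the broker's rate metrics, and a constant stands in for buildRemoteLogAuxState):

```java
public class TierStateStartFlow {
    static long attempts = 0, failures = 0;

    // Count every build attempt up front, count failures separately and
    // propagate them, and compute the initial lag from the leader end
    // offset and the offset we will fetch next.
    static long start(long leaderLocalStartOffset, long leaderEndOffset,
                      boolean storageFails) throws Exception {
        attempts++;                                  // buildRemoteLogAuxStateRequestRate.mark()
        long offsetToFetch;
        try {
            if (storageFails) {
                throw new Exception("remote storage unavailable");
            }
            offsetToFetch = leaderLocalStartOffset;  // stand-in for buildRemoteLogAuxState(...)
        } catch (Exception e) {
            failures++;                              // failedBuildRemoteLogAuxStateRate.mark()
            throw e;                                 // caller still sees the error
        }
        return leaderEndOffset - offsetToFetch;      // initialLag
    }

    public static void main(String[] args) throws Exception {
        System.out.println("initial lag: " + start(100, 250, false)); // 150
        try {
            start(100, 250, true);
        } catch (Exception ignored) {
        }
        System.out.println(attempts + " attempts, " + failures + " failure(s)");
    }
}
```

Marking the attempt before the try block is what makes the failure rate meaningful as a fraction of the request rate.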
Re: [PR] MINOR: Various cleanups in storage [kafka]
mimaison merged PR #15711: URL: https://github.com/apache/kafka/pull/15711
[jira] [Resolved] (KAFKA-15230) ApiVersions data between controllers is not reliable
[ https://issues.apache.org/jira/browse/KAFKA-15230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-15230. Fix Version/s: (was: 3.7.0) Resolution: Duplicate this is fixed by https://issues.apache.org/jira/browse/KAFKA-15369 > ApiVersions data between controllers is not reliable > > > Key: KAFKA-15230 > URL: https://issues.apache.org/jira/browse/KAFKA-15230 > Project: Kafka > Issue Type: Bug >Reporter: David Arthur >Assignee: Colin McCabe >Priority: Critical > > While testing ZK migrations, I noticed a case where the controller was not > starting the migration due to the missing ApiVersions data from other > controllers. This was unexpected because the quorum was running and the > followers were replicating the metadata log as expected. After examining a > heap dump of the leader, it was in fact the case that the ApiVersions map of > NodeApiVersions was empty. > > After further investigation and offline discussion with [~jsancio], we > realized that after the initial leader election, the connection from the Raft > leader to the followers will become idle and eventually timeout and close. > This causes NetworkClient to purge the NodeApiVersions data for the closed > connections. > > There are two main side effects of this behavior: > 1) If migrations are not started within the idle timeout period (10 minutes, > by default), then they will not be able to be started. After this timeout > period, I was unable to restart the controllers in such a way that the leader > had active connections with all followers. > 2) Dynamically updating features, such as "metadata.version", is not > guaranteed to be safe > > There is a partial workaround for the migration issue. If we set " > connections.max.idle.ms" to -1, the Raft leader will never disconnect from > the followers. However, if a follower restarts, the leader will not > re-establish a connection. > > The feature update issue has no safe workarounds. 
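The partial workaround described above — keeping the Raft leader's follower connections permanently open so their ApiVersions data is never purged — is a one-line controller config change (a sketch; the property and value come directly from the report, with the caveat that it does not survive a follower restart):

```properties
# Never reap idle connections: the Raft leader keeps its follower
# connections (and their cached NodeApiVersions data) alive indefinitely.
# Per the report, this does NOT help if a follower restarts, since the
# leader will not re-establish the connection.
connections.max.idle.ms=-1
```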
[jira] [Reopened] (KAFKA-15230) ApiVersions data between controllers is not reliable
[ https://issues.apache.org/jira/browse/KAFKA-15230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reopened KAFKA-15230: Reopening to mark it as a duplicate.
[jira] [Resolved] (KAFKA-15369) Allow AdminClient to Talk Directly with the KRaft Controller Quorum and add Controller Registration
[ https://issues.apache.org/jira/browse/KAFKA-15369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-15369. Fix Version/s: 3.7.0 Assignee: Colin McCabe Resolution: Fixed > Allow AdminClient to Talk Directly with the KRaft Controller Quorum and add > Controller Registration > --- > > Key: KAFKA-15369 > URL: https://issues.apache.org/jira/browse/KAFKA-15369 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Major > Fix For: 3.7.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)