Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15766: URL: https://github.com/apache/kafka/pull/15766#discussion_r1582583036 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupExecutor.java: ## @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.tools.consumer.group; + +import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.RangeAssignor; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.Utils; + +import java.time.Duration; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static java.util.Collections.singleton; +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.common.GroupType.CONSUMER; + +class ConsumerGroupExecutor { + +private ConsumerGroupExecutor() { +} + +static AutoCloseable buildConsumerGroup(String brokerAddress, +int numberOfConsumers, +String groupId, +String topic, +String groupProtocol, +Optional remoteAssignor, +Map customConfigs, +boolean syncCommit) { +return buildConsumers( +brokerAddress, +numberOfConsumers, +groupId, +groupProtocol, +topic, +RangeAssignor.class.getName(), +remoteAssignor, +customConfigs, +syncCommit +); +} + +static AutoCloseable buildClassicGroup(String brokerAddress, + int numberOfConsumers, + String groupId, + String topic, + String assignmentStrategy, + Map customConfigs, + boolean syncCommit) { +return buildConsumers( +brokerAddress, +numberOfConsumers, +groupId, +GroupProtocol.CLASSIC.name, +topic, +assignmentStrategy, +Optional.empty(), +customConfigs, +syncCommit +); +} + +private static AutoCloseable buildConsumers( +String brokerAddress, +int numberOfConsumers, +String groupId, +String groupProtocol, +String topic, +String assignmentStrategy, +Optional remoteAssignor, +Map customConfigs, +boolean syncCommit +) { +List> allConfigs = IntStream.range(0, numberOfConsumers) +.mapToObj(ignored -> +composeConfigs( +
[jira] [Assigned] (KAFKA-16640) Replace TestUtils#resource by scala.util.Using
[ https://issues.apache.org/jira/browse/KAFKA-16640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-16640: -- Assignee: TengYao Chi (was: Chia-Ping Tsai) > Replace TestUtils#resource by scala.util.Using > -- > > Key: KAFKA-16640 > URL: https://issues.apache.org/jira/browse/KAFKA-16640 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: TengYao Chi >Priority: Minor > > `scala.util.Using` is in both scala 2.13 and scala-collection-compat so we > don't need to have custom try-resource function. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16205 Allow MetadataLoader to coalesce small batches [kafka]
github-actions[bot] commented on PR #15283: URL: https://github.com/apache/kafka/pull/15283#issuecomment-2081835478 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16572: allow defining number of disks per broker in ClusterTest [kafka]
brandboat commented on code in PR #15745: URL: https://github.com/apache/kafka/pull/15745#discussion_r1582482314 ## core/src/test/java/kafka/test/ClusterConfig.java: ## @@ -160,6 +166,7 @@ public boolean equals(Object object) { return Objects.equals(type, clusterConfig.type) && Objects.equals(brokers, clusterConfig.brokers) && Objects.equals(controllers, clusterConfig.controllers) +&& Objects.equals(disksPerBroker, clusterConfig.disksPerBroker) Review Comment: feel free to remove them as currently they are unused 😃 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16640) Replace TestUtils#resource by scala.util.Using
[ https://issues.apache.org/jira/browse/KAFKA-16640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17841791#comment-17841791 ] TengYao Chi commented on KAFKA-16640: - i am able to handle this issue 😀 > Replace TestUtils#resource by scala.util.Using > -- > > Key: KAFKA-16640 > URL: https://issues.apache.org/jira/browse/KAFKA-16640 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > > `scala.util.Using` is in both scala 2.13 and scala-collection-compat so we > don't need to have custom try-resource function. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: Add replayRecords to CoordinatorResult [kafka]
dongnuo123 commented on PR #15818: URL: https://github.com/apache/kafka/pull/15818#issuecomment-2081675669 > @dongnuo123 Thanks for the patch. Could you also check if we have other cases like this one: > > https://github.com/apache/kafka/blob/e7792258df934a5c8470c2925c5d164c7d5a8e6c/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java#L531 > > ? We need to ensure that we use replayRecords. This is the only case that we don't use replayRecords. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]
dongnuo123 commented on code in PR #15798: URL: https://github.com/apache/kafka/pull/15798#discussion_r1582404976 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -10921,6 +10855,544 @@ public void testLastConsumerProtocolMemberRebalanceTimeoutInConsumerGroup() { assertTrue(classicGroup.isInState(PREPARING_REBALANCE)); } +@Test +public void testConsumerGroupJoinThrowsExceptionIfGroupOverMaxSize() { +String groupId = "group-id"; +String memberId = Uuid.randomUuid().toString(); +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) +.withMember(new ConsumerGroupMember.Builder(memberId) +.setState(MemberState.STABLE) +.setMemberEpoch(10) +.setPreviousMemberEpoch(10) +.build())) +.withConsumerGroupMaxSize(1) +.build(); + +JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() +.withGroupId(groupId) +.withMemberId(UNKNOWN_MEMBER_ID) +.withDefaultProtocolTypeAndProtocols() +.build(); + +Exception ex = assertThrows(GroupMaxSizeReachedException.class, () -> context.sendClassicGroupJoin(request)); +assertEquals("The consumer group has reached its maximum capacity of 1 members.", ex.getMessage()); +} + +@Test +public void testConsumerGroupJoinInvalidSessionTimeout() throws Exception { +int minSessionTimeout = 50; +int maxSessionTimeout = 100; +String groupId = "group-id"; + +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withClassicGroupMinSessionTimeoutMs(minSessionTimeout) +.withClassicGroupMaxSessionTimeoutMs(maxSessionTimeout) +.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)) +.build(); + +JoinGroupRequestData requestWithSmallSessionTimeout = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() +.withGroupId(groupId) +.withMemberId(UNKNOWN_MEMBER_ID) +.withSessionTimeoutMs(minSessionTimeout - 1) +.build(); +assertThrows(InvalidSessionTimeoutException.class, () -> context.sendClassicGroupJoin(requestWithSmallSessionTimeout)); + +JoinGroupRequestData requestWithLargeSessionTimeout = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() +.withGroupId(groupId) +.withMemberId(UNKNOWN_MEMBER_ID) +.withSessionTimeoutMs(maxSessionTimeout + 1) +.build(); +assertThrows(InvalidSessionTimeoutException.class, () -> context.sendClassicGroupJoin(requestWithLargeSessionTimeout)); +} + +@Test +public void testConsumerGroupJoinThrowsExceptionIfProtocolIsNotSupported() { +String groupId = "group-id"; +String memberId = Uuid.randomUuid().toString(); +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() +.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) +.withMember(new ConsumerGroupMember.Builder(memberId) +.setState(MemberState.STABLE) +.setMemberEpoch(10) +.setPreviousMemberEpoch(10) + .setSupportedClassicProtocols(GroupMetadataManagerTestContext.toProtocols("roundrobin")) +.build())) +.build(); + +JoinGroupRequestData requestWithEmptyProtocols = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() +.withGroupId(groupId) +.withMemberId(UNKNOWN_MEMBER_ID) +.withProtocolType(ConsumerProtocol.PROTOCOL_TYPE) +.withDefaultProtocolTypeAndProtocols() +.build(); +assertThrows(InconsistentGroupProtocolException.class, () -> context.sendClassicGroupJoin(requestWithEmptyProtocols)); + +JoinGroupRequestData requestWithInvalidProtocolType = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() +.withGroupId(groupId) +.withMemberId(UNKNOWN_MEMBER_ID) +.withProtocolType("connect") +.withDefaultProtocolTypeAndProtocols() +.build(); +assertThrows(InconsistentGroupProtocolException.class, () -> context.sendClassicGroupJoin(requestWithInvalidProtocolType)); +} + +@Test +public void testConsumerGroupJoinWithNewDynamicMember() throws Exception { +String groupId = "group-id"; +String memberId = Uuid.randomUuid().toString(); +Uuid fooTopicId = Uuid.randomUuid(); +String fooTopicName = "foo"; +Uuid barTopicId = Uuid.randomUuid(); +String barTopicName = "bar"; + +
Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]
dongnuo123 commented on code in PR #15798: URL: https://github.com/apache/kafka/pull/15798#discussion_r1582403702 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1413,6 +1506,243 @@ private CoordinatorResult consumerGr return new CoordinatorResult<>(records, response); } +/** + * Handle a JoinGroupRequest to a ConsumerGroup. + * + * @param group The group to join. + * @param contextThe request context. + * @param requestThe actual JoinGroup request. + * @param responseFuture The join group response future. + * + * @return The result that contains records to append if the join group phase completes. + */ +private CoordinatorResult classicGroupJoinToConsumerGroup( +ConsumerGroup group, +RequestContext context, +JoinGroupRequestData request, +CompletableFuture responseFuture +) throws ApiException { +final long currentTimeMs = time.milliseconds(); +final List records = new ArrayList<>(); +final String groupId = request.groupId(); +String memberId = request.memberId(); +final String instanceId = request.groupInstanceId(); +final JoinGroupRequestProtocolCollection protocols = request.protocols(); +final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID); + +throwIfConsumerGroupIsFull(group, memberId); +throwIfClassicProtocolIsNotSupported(group, memberId, request.protocolType(), protocols); +// TODO: need to throw an exception if group is dead? Review Comment: Do we ever transition a consumer group to DEAD? It's always confusing when it comes to type check in the new group coordinator. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]
dongnuo123 commented on code in PR #15798: URL: https://github.com/apache/kafka/pull/15798#discussion_r1582397694 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1169,6 +1175,107 @@ private void throwIfStaticMemberIsUnknown(ConsumerGroupMember staticMember, Stri } } +/** + * Validates if the received classic member protocols are supported by the group. + * + * @param group The ConsumerGroup. + * @param memberId The joining member id. + * @param protocolType The joining member protocol type. + * @param protocols The joining member protocol collection. + */ +private void throwIfClassicProtocolIsNotSupported( +ConsumerGroup group, +String memberId, +String protocolType, +JoinGroupRequestProtocolCollection protocols +) { +if (!group.supportsClassicProtocols(protocolType, ClassicGroupMember.plainProtocolSet(protocols))) { Review Comment: Yeah I was also thinking about this. The last commit just made a change in `supportsClassicProtocols` which requires the protocolType to be `ConsumerProtocol.PROTOCOL_TYPE` for both empty and non-empty groups. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]
dongnuo123 commented on code in PR #15798: URL: https://github.com/apache/kafka/pull/15798#discussion_r1582397540 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1413,6 +1506,243 @@ private CoordinatorResult consumerGr return new CoordinatorResult<>(records, response); } +/** + * Handle a JoinGroupRequest to a ConsumerGroup. + * + * @param group The group to join. + * @param contextThe request context. + * @param requestThe actual JoinGroup request. + * @param responseFuture The join group response future. + * + * @return The result that contains records to append if the join group phase completes. + */ +private CoordinatorResult classicGroupJoinToConsumerGroup( +ConsumerGroup group, +RequestContext context, +JoinGroupRequestData request, +CompletableFuture responseFuture +) throws ApiException { +final long currentTimeMs = time.milliseconds(); +final List records = new ArrayList<>(); +final String groupId = request.groupId(); +String memberId = request.memberId(); +final String instanceId = request.groupInstanceId(); +final JoinGroupRequestProtocolCollection protocols = request.protocols(); +final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID); + +throwIfConsumerGroupIsFull(group, memberId); +throwIfClassicProtocolIsNotSupported(group, memberId, request.protocolType(), protocols); +// TODO: need to throw an exception if group is dead? + +// Get or create the member. +if (isUnknownMember) memberId = Uuid.randomUuid().toString(); +ConsumerGroupMember member; +ConsumerGroupMember.Builder updatedMemberBuilder; +boolean staticMemberReplaced = false; +boolean newMemberCreated = false; +if (instanceId == null) { +// A dynamic member (re-)joins. +throwIfRequiresKnownMemberId(groupId, memberId, isUnknownMember, context); +newMemberCreated = !group.hasMember(memberId); +member = group.getOrMaybeCreateMember(memberId, true); +log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId); +updatedMemberBuilder = new ConsumerGroupMember.Builder(member); +} else { +member = group.staticMember(instanceId); +// A new static member joins or the existing static member rejoins. +if (isUnknownMember) { +newMemberCreated = true; +if (member == null) { +// New static member. +member = group.getOrMaybeCreateMember(memberId, true); +updatedMemberBuilder = new ConsumerGroupMember.Builder(member); +log.info("[GroupId {}] Static member {} with instance id {} joins the consumer group.", groupId, memberId, instanceId); +} else { +// Replace the current static member. +staticMemberReplaced = true; +updatedMemberBuilder = new ConsumerGroupMember.Builder(memberId) +.setAssignedPartitions(member.assignedPartitions()); +removeMember(records, groupId, member.memberId()); +log.info("[GroupId {}] Static member with unknown member id and instance id {} re-joins the consumer group. " + +"Created a new member {} to replace the existing member {}.", groupId, instanceId, memberId, member.memberId()); +} +} else { +// Rejoining static member. Fence the static group with unmatched member id. +throwIfStaticMemberIsUnknown(member, instanceId); +throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId); +updatedMemberBuilder = new ConsumerGroupMember.Builder(member); +log.info("[GroupId {}] Static member {} with instance id {} re-joins the consumer group.", groupId, memberId, instanceId); +} +} + +int groupEpoch = group.groupEpoch(); +Map subscriptionMetadata = group.subscriptionMetadata(); +final ConsumerPartitionAssignor.Subscription subscription = deserializeSubscription(protocols); +final List ownedTopicPartitions = +validateGenerationIdAndGetOwnedPartition(member, subscription); + +// 1. Create or update the member. If the member is new or has changed, a ConsumerGroupMemberMetadataValue +// record is written to the __consumer_offsets partition to persist the change. If the subscriptions have +// changed, the subscription metadata is updated and persisted by writing a ConsumerGroupPartitionMetadataValue +// record to the __consumer_offsets partition. Finally, the group epoch is
Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]
dongnuo123 commented on code in PR #15798: URL: https://github.com/apache/kafka/pull/15798#discussion_r1582397180 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -1122,4 +1122,27 @@ private OUT handleOperationException( return handler.apply(apiError.error(), apiError.message()); } } + +/** + * Creates the JoinGroupResponseData according to the error type. + * + * @param memberId The member id. + * @param error The error. + * @return The JoinGroupResponseData. + */ +private static JoinGroupResponseData createJoinGroupResponseData( +String memberId, +Errors error +) { +switch (error) { +case MEMBER_ID_REQUIRED: +case INVALID_SESSION_TIMEOUT: +return new JoinGroupResponseData() +.setMemberId(memberId) Review Comment: Seems that `INVALID_GROUP_ID` also sets the member id. I don't think we use the member id here either. Should we remove them? https://github.com/apache/kafka/blob/e7792258df934a5c8470c2925c5d164c7d5a8e6c/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java#L325 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16211: Inconsistent config values in CreateTopicsResult and DescribeConfigsResult [kafka]
chia7712 commented on PR #15696: URL: https://github.com/apache/kafka/pull/15696#issuecomment-2081646079 > or would like me to add a new test in the test class for the function? this one. Normally, the patch related to bug fix needs to offer the test to prove the bug does get fixed :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16614:Disallow @ClusterTemplate("") [kafka]
chia7712 commented on code in PR #15800: URL: https://github.com/apache/kafka/pull/15800#discussion_r1582294569 ## core/src/test/java/kafka/test/junit/ClusterTestExtensions.java: ## @@ -91,9 +91,6 @@ public Stream provideTestTemplateInvocationContex ClusterTemplate clusterTemplateAnnot = context.getRequiredTestMethod().getDeclaredAnnotation(ClusterTemplate.class); if (clusterTemplateAnnot != null) { processClusterTemplate(context, clusterTemplateAnnot, generatedContexts::add); -if (generatedContexts.isEmpty()) { -throw new IllegalStateException("ClusterConfig generator method should provide at least one config"); Review Comment: My previous comment is wrong. This check is used to make sure there is at least one `ClusterConfig`, so we should not remove it. However, It seems to me this check should be moved to `processClusterTemplate` and we should add unit test for it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Replaced Utils.join() with String.join() [kafka]
chia7712 commented on code in PR #15823: URL: https://github.com/apache/kafka/pull/15823#discussion_r1582290331 ## clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java: ## @@ -392,14 +392,14 @@ private String topicPartitionsToLogString(Collection partitions) if (!log.isTraceEnabled()) { return String.format("%d partition(s)", partitions.size()); } -return "(" + Utils.join(partitions, ", ") + ")"; +return "(" + String.join(", ", Arrays.toString(partitions.toArray())) + ")"; Review Comment: ditto ## clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java: ## @@ -392,14 +392,14 @@ private String topicPartitionsToLogString(Collection partitions) if (!log.isTraceEnabled()) { return String.format("%d partition(s)", partitions.size()); } -return "(" + Utils.join(partitions, ", ") + ")"; +return "(" + String.join(", ", Arrays.toString(partitions.toArray())) + ")"; } private String topicIdPartitionsToLogString(Collection partitions) { if (!log.isTraceEnabled()) { return String.format("%d partition(s)", partitions.size()); } -return "(" + Utils.join(partitions, ", ") + ")"; +return "(" + String.join(", ", Arrays.toString(partitions.toArray())) + ")"; Review Comment: ditto ## clients/src/main/java/org/apache/kafka/common/utils/Utils.java: ## @@ -584,27 +584,6 @@ public static String formatBytes(long bytes) { } } -/** - * Create a string representation of an array joined by the given separator - * @param strs The array of items - * @param separator The separator - * @return The string representation. - */ -public static String join(T[] strs, String separator) { -return join(Arrays.asList(strs), separator); -} - -/** - * Create a string representation of a collection joined by the given separator - * @param collection The list of items - * @param separator The separator - * @return The string representation. - */ -public static String join(Collection collection, String separator) { -Objects.requireNonNull(collection); -return mkString(collection.stream(), "", "", separator); -} - /** * Create a string representation of a stream surrounded by `begin` and `end` and joined by `separator`. * Review Comment: It seems `mkString(Stream stream, String begin, String end, String separator)` can be removed too since the only one use case (https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java#L199) does not use "begin" and "end" ## clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java: ## @@ -131,7 +130,7 @@ public class CommonClientConfigs { public static final String SECURITY_PROTOCOL_CONFIG = "security.protocol"; public static final String SECURITY_PROTOCOL_DOC = "Protocol used to communicate with brokers. Valid values are: " + -Utils.join(SecurityProtocol.names(), ", ") + "."; +String.join(", ", SecurityProtocol.names()).replace("[", "").replace("]", "") + "."; Review Comment: please don't use `replace`. We can generate the correct message at once. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Clean up TestUtils.scala [kafka]
chia7712 commented on code in PR #15808: URL: https://github.com/apache/kafka/pull/15808#discussion_r1582280178 ## core/src/test/scala/unit/kafka/utils/TestUtils.scala: ## @@ -1124,24 +933,18 @@ object TestUtils extends Logging { }, msg = msg, pause = 0L, waitTimeMs = waitTimeMs) } - /** - * Wait for the presence of an optional value. - * - * @param func The function defining the optional value - * @param msg Error message in the case that the value never appears - * @param waitTimeMs Maximum time to wait - * @return The unwrapped value returned by the function - */ - def awaitValue[T](func: () => Option[T], msg: => String, waitTimeMs: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): T = { -var value: Option[T] = None -waitUntilTrue(() => { - value = func() - value.isDefined -}, msg, waitTimeMs) -value.get + def subscribeAndWaitForRecords(topic: String, Review Comment: why adding this new method? it is unused. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16640) Replace TestUtils#resource by scala.util.Using
Chia-Ping Tsai created KAFKA-16640: -- Summary: Replace TestUtils#resource by scala.util.Using Key: KAFKA-16640 URL: https://issues.apache.org/jira/browse/KAFKA-16640 Project: Kafka Issue Type: Improvement Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai `scala.util.Using` is in both scala 2.13 and scala-collection-compat so we don't need to have custom try-resource function. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: Remove unused parameters in KafkaConfig [kafka]
chia7712 commented on code in PR #15788: URL: https://github.com/apache/kafka/pull/15788#discussion_r1582265983 ## core/src/main/scala/kafka/MetadataLogConfig.scala: ## @@ -32,13 +32,13 @@ final case class MetadataLogConfig( ) object MetadataLogConfig { - def apply(config: AbstractConfig, maxBatchSizeInBytes: Int, maxFetchSizeInBytes: Int): MetadataLogConfig = { + def apply(config: KafkaConfig, maxBatchSizeInBytes: Int, maxFetchSizeInBytes: Int): MetadataLogConfig = { new MetadataLogConfig( - config.getInt(KRaftConfigs.METADATA_LOG_SEGMENT_BYTES_CONFIG), - config.getInt(KRaftConfigs.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG), - config.getLong(KRaftConfigs.METADATA_LOG_SEGMENT_MILLIS_CONFIG), - config.getLong(KRaftConfigs.METADATA_MAX_RETENTION_BYTES_CONFIG), - config.getLong(KRaftConfigs.METADATA_MAX_RETENTION_MILLIS_CONFIG), + config.metadataLogSegmentBytes, + config.metadataLogSegmentMinBytes, + config.metadataLogSegmentMillis, + config.metadataRetentionBytes, + config.metadataRetentionMillis, maxBatchSizeInBytes, maxFetchSizeInBytes, ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT, Review Comment: Could you change line#45 to use `config.nodeId`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16572: allow defining number of disks per broker in ClusterTest [kafka]
chia7712 commented on code in PR #15745: URL: https://github.com/apache/kafka/pull/15745#discussion_r1582250475 ## core/src/test/java/kafka/test/ClusterConfig.java: ## @@ -55,14 +56,15 @@ public class ClusterConfig { private final Map> perBrokerOverrideProperties; @SuppressWarnings("checkstyle:ParameterNumber") -private ClusterConfig(Type type, int brokers, int controllers, String name, boolean autoStart, +private ClusterConfig(Type type, int brokers, int controllers, int disksPerBroker, String name, boolean autoStart, SecurityProtocol securityProtocol, String listenerName, File trustStoreFile, MetadataVersion metadataVersion, Map serverProperties, Map producerProperties, Map consumerProperties, Map adminClientProperties, Map saslServerProperties, Map saslClientProperties, Map> perBrokerOverrideProperties) { this.type = Objects.requireNonNull(type); this.brokers = brokers; this.controllers = controllers; +this.disksPerBroker = disksPerBroker; Review Comment: Could you add check to make sure the value is larger than zero? ## core/src/test/java/kafka/test/ClusterTestExtensionsTest.java: ## @@ -121,6 +124,26 @@ public void testClusterTests() { } } +@ClusterTests({ +@ClusterTest(clusterType = Type.ZK, disksPerBroker = 1), Review Comment: We can't use `disksPerBroker = 1` since it is equal to the default value. ## core/src/test/java/kafka/test/ClusterConfig.java: ## @@ -160,6 +166,7 @@ public boolean equals(Object object) { return Objects.equals(type, clusterConfig.type) && Objects.equals(brokers, clusterConfig.brokers) && Objects.equals(controllers, clusterConfig.controllers) +&& Objects.equals(disksPerBroker, clusterConfig.disksPerBroker) Review Comment: not sure whether we need to keep `hashCode` and `equals`. We don't use it actually and it is error-prone when adding new fields. For example, this PR does not update `hashCode` for the new field. @brandboat WDTY? ## core/src/test/java/kafka/test/junit/ClusterTestExtensions.java: ## @@ -162,32 +162,39 @@ private void processClusterTest(ExtensionContext context, ClusterTest annot, Clu controllers = annot.controllers(); } -if (brokers <= 0 || controllers <= 0) { -throw new IllegalArgumentException("Number of brokers/controllers must be greater than zero."); +final int disksPerBroker; +if (annot.disksPerBroker() == 0) { +disksPerBroker = defaults.disksPerBroker(); Review Comment: Could we leverage the builder instead of creating a bunch of temporary local variables? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16639) AsyncKafkaConsumer#close does not send heartbeat to leave group
[ https://issues.apache.org/jira/browse/KAFKA-16639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17841678#comment-17841678 ] Philip Nee commented on KAFKA-16639: thanks for reporting. will do. > AsyncKafkaConsumer#close does not send heartbeat to leave group > --- > > Key: KAFKA-16639 > URL: https://issues.apache.org/jira/browse/KAFKA-16639 > Project: Kafka > Issue Type: Bug >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > Labels: kip-848-client-support > > This bug can be reproduced by immediately closing a consumer which is just > created. > The root cause is that we skip the new heartbeat used to leave group when > there is a in-flight heartbeat request > ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L212).] > It seems to me the simple solution is that we create a heartbeat request when > meeting above situation and then send it by pollOnClose > ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java#L62).] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
chia7712 commented on PR #15766: URL: https://github.com/apache/kafka/pull/15766#issuecomment-2081560990 I file https://issues.apache.org/jira/browse/KAFKA-16639 to trace the potential bug about AsyncConsumer. Please wait for that issue. If the issue is too hard to resolve, we can remove the test case about new consumer from this PR -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15853: Move metrics configs out of KafkaConfig [kafka]
chia7712 commented on PR #15822: URL: https://github.com/apache/kafka/pull/15822#issuecomment-2081560445 ``` [2024-04-28T12:34:14.550Z] [Error] /home/jenkins/workspace/Kafka_kafka-pr_PR-15822/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:1000:26: value KafkaMetricsReporterClassesProp is not a member of object kafka.server.KafkaConfig [2024-04-28T12:34:14.550Z] [Error] /home/jenkins/workspace/Kafka_kafka-pr_PR-15822/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:1001:26: value KafkaMetricsPollingIntervalSecondsProp is not a member of object kafka.server.KafkaConfig ``` could you please fix the build error -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [WIP] KAFKA-16553: Add config logging for controller [kafka]
johnnychhsu opened a new pull request, #15826: URL: https://github.com/apache/kafka/pull/15826 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16639) AsyncKafkaConsumer#close does not send heartbeat to leave group
[ https://issues.apache.org/jira/browse/KAFKA-16639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17841675#comment-17841675 ] Chia-Ping Tsai commented on KAFKA-16639: [~pnee] Could you please take a look if you have free cycle? thanks! > AsyncKafkaConsumer#close does not send heartbeat to leave group > --- > > Key: KAFKA-16639 > URL: https://issues.apache.org/jira/browse/KAFKA-16639 > Project: Kafka > Issue Type: Bug >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > Labels: kip-848-client-support > > This bug can be reproduced by immediately closing a consumer which is just > created. > The root cause is that we skip the new heartbeat used to leave group when > there is a in-flight heartbeat request > ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L212).] > It seems to me the simple solution is that we create a heartbeat request when > meeting above situation and then send it by pollOnClose > ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java#L62).] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16639) AsyncKafkaConsumer#close does not send heartbeat to leave group
Chia-Ping Tsai created KAFKA-16639: -- Summary: AsyncKafkaConsumer#close does not send heartbeat to leave group Key: KAFKA-16639 URL: https://issues.apache.org/jira/browse/KAFKA-16639 Project: Kafka Issue Type: Bug Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai This bug can be reproduced by immediately closing a consumer which is just created. The root cause is that we skip the new heartbeat used to leave group when there is a in-flight heartbeat request ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L212).] It seems to me the simple solution is that we create a heartbeat request when meeting above situation and then send it by pollOnClose ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java#L62).] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.
[ https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17841670#comment-17841670 ] Sal Sorrentino commented on KAFKA-16514: Well, this is not entirely accurate. As mentioned in one of my first comments, by providing a `group.instance.id`, you can achieve the desired behavior without using any "internal" configs. However, I agree that it does seem broken that the only way to voluntarily leave a group is to be static member. > Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup > flag. > --- > > Key: KAFKA-16514 > URL: https://issues.apache.org/jira/browse/KAFKA-16514 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Sal Sorrentino >Priority: Minor > > Working with Kafka Streams 3.7.0, but may affect earlier versions as well. > When attempting to shutdown a streams application and leave the associated > consumer group, the supplied `leaveGroup` option seems to have no effect. > Sample code: > {code:java} > CloseOptions options = new CloseOptions().leaveGroup(true); > stream.close(options);{code} > The expected behavior here is that the group member would shutdown and leave > the group, immediately triggering a consumer group rebalance. In practice, > the rebalance happens after the appropriate timeout configuration has expired. > I understand the default behavior in that there is an assumption that any > associated StateStores would be persisted to disk and that in the case of a > rolling restart/deployment, the rebalance delay may be preferable. However, > in our application we are using in-memory state stores and standby replicas. > There is no benefit in delaying the rebalance in this setup and we are in > need of a way to force a member to leave the group when shutting down. > The workaround we found is to set an undocumented internal StreamConfig to > enforce this behavior: > {code:java} > props.put("internal.leave.group.on.close", true); > {code} > To state the obvious, this is less than ideal. > Additional configuration details: > {code:java} > Properties props = new Properties(); > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId"); > props.put( > StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, > "localhost:9092,localhost:9093,localhost:9094"); > props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3); > props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); > props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors); > props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, > StreamsConfig.EXACTLY_ONCE_V2);{code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16572: allow defining number of disks per broker in ClusterTest [kafka]
FrankYang0529 commented on PR #15745: URL: https://github.com/apache/kafka/pull/15745#issuecomment-2081484120 > @FrankYang0529 please fix the conflicts Hi @chia7712, I resolve conflicts. Thank you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15853: Move metrics configs out of KafkaConfig [kafka]
OmniaGM commented on code in PR #15822: URL: https://github.com/apache/kafka/pull/15822#discussion_r1582152852 ## clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java: ## @@ -77,7 +78,7 @@ public class CommonClientConfigs { public static final String CLIENT_RACK_CONFIG = "client.rack"; public static final String CLIENT_RACK_DOC = "A rack identifier for this client. This can be any string value which indicates where this client is physically located. It corresponds with the broker config 'broker.rack'"; -public static final String DEFAULT_CLIENT_RACK = ""; +public static final String CLIENT_RACK_DEFAULT = ""; Review Comment: Created this jira https://issues.apache.org/jira/browse/KAFKA-16638 as a followup to align the naming convention for config classes. Left note there that some classes need KIPs and might need a separate jira later. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16638) Align the naming convention for config and default variables in *Config classes
Omnia Ibrahim created KAFKA-16638: - Summary: Align the naming convention for config and default variables in *Config classes Key: KAFKA-16638 URL: https://issues.apache.org/jira/browse/KAFKA-16638 Project: Kafka Issue Type: Task Reporter: Omnia Ibrahim Some classes in the code is violating the naming naming convention for config, doc, and default variables which is: * `_CONFIG` suffix for defining the configuration * `_DEFAULT` suffix for default value * `_DOC` suffix for doc The following classes need to be updated * `CleanerConfig` and `RemoteLogManagerConfig` to use `_CONFIG` suffix instead of `_PROP`. * Others like `LogConfig` and `QuorumConfig` to use `_DEFAULT` suffix instead of `DEFAULT_` prefix . * same goes with `CommonClientConfigs`, `StreamsConfig` however these are public interfaces and will need a KIP to rename the default value variables and mark the old one as deprecated. This might need to be broken to different Jira. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16637) KIP-848 does not work well
[ https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sanghyeok An updated KAFKA-16637: - Description: I want to test next generation of the consumer rebalance protocol ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes)] However, it does not works well. You can check my condition. *Docker-compose.yaml* [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml] *Consumer Code* [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java] *Consumer logs* [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector - initializing Kafka metrics collector [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 2ae524ed625438c5 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1714309299215 [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1 [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk) Stuck In here... *Broker logs* broker | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager) broker | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager) broker | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager) broker | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager) broker | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager) stuck in here was: I want to test next generation of the consumer rebalance protocol ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes)] However, it does not works well. You can check my condition. ### Docker-compose.yaml [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml] ### Consumer Code [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java] ### Consumer logs [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector - initializing Kafka metrics collector [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 2ae524ed625438c5 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1714309299215 [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1 [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk) Stuck In here... ### Broker logs broker | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager) broker | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager) broker | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager) broker | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager) broker | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager) stuck in here > KIP-848 does not work well > -- > > Key: KAFKA-16637 > URL: https://issues.apache.org/jira/browse/KAFKA-16637 > Project: Kafka > Issue Type: Bug >Reporter: sanghyeok An >Priority: Minor > > I want to test next generation of the consumer rebalance protocol > ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-
[jira] [Updated] (KAFKA-16637) KIP-848 does not work well
[ https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sanghyeok An updated KAFKA-16637: - Description: I want to test next generation of the consumer rebalance protocol ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes)] However, it does not works well. You can check my condition. ### Docker-compose.yaml [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml] ### Consumer Code [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java] ### Consumer logs [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector - initializing Kafka metrics collector [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 2ae524ed625438c5 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1714309299215 [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1 [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk) Stuck In here... ### Broker logs broker | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager) broker | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager) broker | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager) broker | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager) broker | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager) stuck in here > KIP-848 does not work well > -- > > Key: KAFKA-16637 > URL: https://issues.apache.org/jira/browse/KAFKA-16637 > Project: Kafka > Issue Type: Bug >Reporter: sanghyeok An >Priority: Minor > > I want to test next generation of the consumer rebalance protocol > ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes)] > > However, it does not works well. > You can check my condition. > > ### Docker-compose.yaml > [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml] > > ### Consumer Code > [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java] > > ### Consumer logs > [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector > - initializing Kafka metrics collector > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0 > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: > 2ae524ed625438c5 > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: > 1714309299215 > [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - > [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1 > [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - > [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk) > Stuck In here... > > ### Broker logs > broker | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > broker | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for > Set(__consumer_offsets) to the active controller. > (kafka.server.DefaultAutoTopicCreationManager) > stuck in here -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16637) KIP-848 does not work well
sanghyeok An created KAFKA-16637: Summary: KIP-848 does not work well Key: KAFKA-16637 URL: https://issues.apache.org/jira/browse/KAFKA-16637 Project: Kafka Issue Type: Bug Reporter: sanghyeok An -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16637) KIP-848 does not work well
[ https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sanghyeok An updated KAFKA-16637: - Priority: Minor (was: Major) > KIP-848 does not work well > -- > > Key: KAFKA-16637 > URL: https://issues.apache.org/jira/browse/KAFKA-16637 > Project: Kafka > Issue Type: Bug >Reporter: sanghyeok An >Priority: Minor > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
kamalcph commented on code in PR #15634: URL: https://github.com/apache/kafka/pull/15634#discussion_r1582149811 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long, /** * Update high watermark with offset metadata. The new high watermark will be lower - * bounded by the log start offset and upper bounded by the log end offset. + * bounded by the local-log-start-offset and upper bounded by the log-end-offset. * * @param highWatermarkMetadata the suggested high watermark with offset metadata * @return the updated high watermark offset */ def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = { val endOffsetMetadata = localLog.logEndOffsetMetadata -val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) { - new LogOffsetMetadata(logStartOffset) +val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) { Review Comment: Opened #15825 a draft PR with the suggested approach. PTAL. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
kamalcph opened a new pull request, #15825: URL: https://github.com/apache/kafka/pull/15825 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
kamalcph commented on code in PR #15634: URL: https://github.com/apache/kafka/pull/15634#discussion_r1582144306 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long, /** * Update high watermark with offset metadata. The new high watermark will be lower - * bounded by the log start offset and upper bounded by the log end offset. + * bounded by the local-log-start-offset and upper bounded by the log-end-offset. * * @param highWatermarkMetadata the suggested high watermark with offset metadata * @return the updated high watermark offset */ def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = { val endOffsetMetadata = localLog.logEndOffsetMetadata -val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) { - new LogOffsetMetadata(logStartOffset) +val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) { Review Comment: > For example, if we lose the local data in all replicas, the lastStableOffset could still be in the middle of a tiered segment and moving it to localLogStartOffset immediately will be incorrect. I'm not clear on this: 1. Segments that are eligible for upload to remote storage only when the `lastStableOffset` moves beyond the segment-to-be-uploaded-end-offset. 2. When all the replicas loses local data (offline partition), then we consider the data in remote storage also lost. Currently, for this case, we don't have provision to serve the remote data. 3. When `firstUnstableOffsetMetadata` is empty, we return `highWatermark`. With this patch, the `highWatermark` lower boundary is set to `localLogStartOffset` so there won't be an issue. > Note that OffsetMetadata (segmentBaseOffset and relativePositionInSegment) is only used in DelayedFetch for estimating the amount of available bytes. The [LogOffsetMetadata#onOlderSegment](https://sourcegraph.com/github.com/apache/kafka@5de5d967adffd864bad3ec729760a430253abf38/-/blob/storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetMetadata.java?L54) method is used in the [hot-path](https://sourcegraph.com/github.com/apache/kafka@5de5d967adffd864bad3ec729760a430253abf38/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L324) of incrementing the high-watermark and expects the full metadata, otherwise it throws an error. Is it ok to remove the throwable from LogOffsetMetadata#onOlderSegment method and return `false` when `messageOffsetOnly` available? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
kamalcph commented on code in PR #15634: URL: https://github.com/apache/kafka/pull/15634#discussion_r1582144306 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long, /** * Update high watermark with offset metadata. The new high watermark will be lower - * bounded by the log start offset and upper bounded by the log end offset. + * bounded by the local-log-start-offset and upper bounded by the log-end-offset. * * @param highWatermarkMetadata the suggested high watermark with offset metadata * @return the updated high watermark offset */ def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = { val endOffsetMetadata = localLog.logEndOffsetMetadata -val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) { - new LogOffsetMetadata(logStartOffset) +val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) { Review Comment: > For example, if we lose the local data in all replicas, the lastStableOffset could still be in the middle of a tiered segment and moving it to localLogStartOffset immediately will be incorrect. I'm not clear on this: 1. Segments that are eligible for upload to remote storage only when the `lastStableOffset` moves beyond the segment-to-be-uploaded-end-offset. 2. When all the replicas loses local data (offline partition), then we consider the data in remote storage also lost. Currently, for this case, we don't have provision to serve the remote data. 3. When `firstUnstableOffsetMetadata` is empty, we return `highWatermark`. With this patch, the `highWatermark` lower boundary is set to `localLogStartOffset` so there won't be an issue. > Note that OffsetMetadata (segmentBaseOffset and relativePositionInSegment) is only used in DelayedFetch for estimating the amount of available bytes. The [LogOffsetMetadata#onOlderSegment](https://sourcegraph.com/github.com/apache/kafka@5de5d967adffd864bad3ec729760a430253abf38/-/blob/storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetMetadata.java?L54) method is used in the [hot-path](https://sourcegraph.com/github.com/apache/kafka@5de5d967adffd864bad3ec729760a430253abf38/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L324) of incrementing the high-watermark and expects the full metadata, otherwise it throws an error. Is it ok to remove the throwable from LogOffsetMetadata#onOlderSegment method and return `false` by default? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
kamalcph commented on code in PR #15634: URL: https://github.com/apache/kafka/pull/15634#discussion_r1582144306 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long, /** * Update high watermark with offset metadata. The new high watermark will be lower - * bounded by the log start offset and upper bounded by the log end offset. + * bounded by the local-log-start-offset and upper bounded by the log-end-offset. * * @param highWatermarkMetadata the suggested high watermark with offset metadata * @return the updated high watermark offset */ def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = { val endOffsetMetadata = localLog.logEndOffsetMetadata -val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) { - new LogOffsetMetadata(logStartOffset) +val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) { Review Comment: > For example, if we lose the local data in all replicas, the lastStableOffset could still be in the middle of a tiered segment and moving it to localLogStartOffset immediately will be incorrect. I'm not clear on this: 1. Segments that are eligible for upload to remote storage only when the `lastStableOffset` moves beyond the segment-to-be-uploaded-end-offset. 2. When all the replicas loses local data (offline partition), then we consider the data in remote storage also lost. Currently, for this case, we don't have provision to serve the remote data. 3. When `firstUnstableOffsetMetadata` is empty, we return `highWatermark`. With this patch, the `highWatermark` lower boundary is set to `localLogStartOffset` so there won't be an issue. > Note that OffsetMetadata (segmentBaseOffset and relativePositionInSegment) is only used in DelayedFetch for estimating the amount of available bytes. The [LogOffsetMetadata#onOlderSegment](https://sourcegraph.com/github.com/apache/kafka@5de5d967adffd864bad3ec729760a430253abf38/-/blob/storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetMetadata.java?L54) method is used in the [hot-path](https://sourcegraph.com/github.com/apache/kafka@5de5d967adffd864bad3ec729760a430253abf38/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L324) of incrementing the high-watermark and expects the full metadata, otherwise it throws an error. Is it ok to remove the throwable from LogOffsetMetadata#onOlderSegment method and return `false` by default. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15853: Move metrics configs out of KafkaConfig [kafka]
OmniaGM commented on code in PR #15822: URL: https://github.com/apache/kafka/pull/15822#discussion_r1582144106 ## clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java: ## @@ -77,7 +78,7 @@ public class CommonClientConfigs { public static final String CLIENT_RACK_CONFIG = "client.rack"; public static final String CLIENT_RACK_DOC = "A rack identifier for this client. This can be any string value which indicates where this client is physically located. It corresponds with the broker config 'broker.rack'"; -public static final String DEFAULT_CLIENT_RACK = ""; +public static final String CLIENT_RACK_DEFAULT = ""; Review Comment: had a second thought about this and instead of raising a KIP I decided to just move all metric configs in one place for now in `MetricConfigs`. As I want to keep KAFKA-15853 as simple as possible as it already huge effort to move these config out of core. I will create a cleanup Jiras to align the naming convention for some of these config files. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-15156) Update cipherInformation correctly in DefaultChannelMetadataRegistry
[ https://issues.apache.org/jira/browse/KAFKA-15156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Walter Hernandez reassigned KAFKA-15156: Assignee: (was: Walter Hernandez) > Update cipherInformation correctly in DefaultChannelMetadataRegistry > > > Key: KAFKA-15156 > URL: https://issues.apache.org/jira/browse/KAFKA-15156 > Project: Kafka > Issue Type: Bug >Reporter: Divij Vaidya >Priority: Minor > Labels: newbie > > At > [https://github.com/apache/kafka/blob/4a61b48d3dca81e28b57f10af6052f36c50a05e3/clients/src/main/java/org/apache/kafka/common/network/DefaultChannelMetadataRegistry.java#L25] > > we do not end up assigning the new value of cipherInformation to the member > variable. > The code over here, should be the following so that we can update the cipher > information. > {noformat} > if (cipherInformation == null) { > throw Illegal exception. > } > this.cipherInformation = cipherInformation{noformat} > > > While this class is only used in tests, we should still fix this. It's a > minor bug. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-16627: Remove ClusterConfig parameter in BeforeEach and AfterEach [kafka]
brandboat opened a new pull request, #15824: URL: https://github.com/apache/kafka/pull/15824 related to [KAFKA-16627](https://issues.apache.org/jira/browse/KAFKA-16627) In the past we modify configs like server broker properties by modifying the ClusterConfig reference passed to BeforeEach and AfterEach based on the requirements of the tests. While after [KAFKA-16560](https://issues.apache.org/jira/browse/KAFKA-16560), the ClusterConfig become immutable, modify the ClusterConfig reference no longer reflects any changes to the test cluster. Then pass ClusterConfig to BeforeEach and AfterEach become redundant. This pr directly remove the behavior that pass `ClusterConfig` to all test methods and before/after each method. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Remove unused parameters in KafkaConfig [kafka]
johnnychhsu commented on PR #15788: URL: https://github.com/apache/kafka/pull/15788#issuecomment-2081407268 thanks @chia7712! just rebased and fixed the conflicts. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
kamalcph commented on code in PR #15690: URL: https://github.com/apache/kafka/pull/15690#discussion_r1582059485 ## core/src/main/java/kafka/server/TierStateMachine.java: ## @@ -54,5 +180,109 @@ PartitionFetchState start(TopicPartition topicPartition, * @return the new PartitionFetchState if the tier state machine was advanced, otherwise, return the currentFetchState */ Optional maybeAdvanceState(TopicPartition topicPartition, -PartitionFetchState currentFetchState); +PartitionFetchState currentFetchState) { +// This is currently a no-op but will be used for implementing async tiering logic in KAFKA-13560. +return Optional.of(currentFetchState); +} + +/** + * It tries to build the required state for this partition from leader and remote storage so that it can start + * fetching records from the leader. The return value is the next offset to fetch from the leader, which is the + * next offset following the end offset of the remote log portion. + */ +private Long buildRemoteLogAuxState(TopicPartition topicPartition, +Integer currentLeaderEpoch, +Long leaderLocalLogStartOffset, +Integer epochForLeaderLocalLogStartOffset, +Long leaderLogStartOffset, +UnifiedLog unifiedLog) throws IOException, RemoteStorageException { + +long nextOffset; + +if (unifiedLog.remoteStorageSystemEnable() && unifiedLog.config().remoteStorageEnable()) { +if (replicaMgr.remoteLogManager().isEmpty()) throw new IllegalStateException("RemoteLogManager is not yet instantiated"); + +RemoteLogManager rlm = replicaMgr.remoteLogManager().get(); + +// Find the respective leader epoch for (leaderLocalLogStartOffset - 1). We need to build the leader epoch cache +// until that offset +long previousOffsetToLeaderLocalLogStartOffset = leaderLocalLogStartOffset - 1; +int targetEpoch; +// If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1) +// will have the same epoch. +if (epochForLeaderLocalLogStartOffset == 0) { +targetEpoch = epochForLeaderLocalLogStartOffset; +} else { Review Comment: I would suggest to take the refactoring in the next/separate PR to review this PR effectively. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Replaced Utils.join() with String.join() [kafka]
chiacyu commented on code in PR #15823: URL: https://github.com/apache/kafka/pull/15823#discussion_r1582059240 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1491,7 +1490,7 @@ public void assign(Collection partitions) { // See the ApplicationEventProcessor.process() method that handles this event for more detail. applicationEventHandler.add(new AssignmentChangeEvent(subscriptions.allConsumed(), time.milliseconds())); -log.info("Assigned to partition(s): {}", join(partitions, ", ")); +log.info("Assigned to partition(s): {}", String.join(", ", Arrays.toString(partitions.toArray(; Review Comment: Got it, thanks for the advice! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]
kamalcph commented on code in PR #15690: URL: https://github.com/apache/kafka/pull/15690#discussion_r1582055490 ## storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java: ## @@ -74,7 +74,7 @@ public Properties topicConfig() { public void maybeWaitForAtLeastOneSegmentUpload(scala.collection.Seq topicPartitions) { JavaConverters.seqAsJavaList(topicPartitions).forEach(topicPartition -> { List localStorages = JavaConverters.bufferAsJavaList(brokers()).stream() -.map(b -> new BrokerLocalStorage(b.config().brokerId(), b.config().logDirs().head(), STORAGE_WAIT_TIMEOUT_SEC)) +.map(b -> new BrokerLocalStorage(b.config().brokerId(), JavaConverters.asJava(b.config().logDirs().toSet()), STORAGE_WAIT_TIMEOUT_SEC)) Review Comment: ditto ## storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java: ## @@ -31,31 +31,36 @@ import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; public final class BrokerLocalStorage { private final Integer brokerId; -private final File brokerStorageDirectory; +private final Set brokerStorageDirectorys; Review Comment: nit: `brokerStorageDirectorys` -> `brokerStorageDirectories` ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -3287,11 +3287,9 @@ class ReplicaManagerTest { val path1 = TestUtils.tempRelativeDir("data").getAbsolutePath val path2 = TestUtils.tempRelativeDir("data2").getAbsolutePath if (enableRemoteStorage) { Review Comment: nit: do we need this `if` check? ## storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java: ## @@ -141,7 +146,11 @@ private boolean isOffsetPresentInFirstLocalSegment(TopicPartition topicPartition if (offsetToSearch.equals(firstLogFileBaseOffset)) { return true; } -File partitionDir = new File(brokerStorageDirectory.getAbsolutePath(), topicPartition.toString()); +File partitionDir = brokerStorageDirectorys.stream() +.filter(dir -> dirContainsTopicPartition(topicPartition, dir)) +.findFirst() +.orElseThrow(() -> new IllegalArgumentException(String.format("[BrokerId=%d] Directory for the topic-partition %s " + +"was not found", brokerId, topicPartition))); Review Comment: previously, we were returning the `partitionDir` instead of `logDir`: ```suggestion File logDir = brokerStorageDirectorys.stream() .filter(dir -> dirContainsTopicPartition(topicPartition, dir)) .findFirst() .orElseThrow(() -> new IllegalArgumentException(String.format("[BrokerId=%d] Directory for the topic-partition %s " + "was not found", brokerId, topicPartition))); File partitionDir = new File(logDir.getAbsolutePath(), topicPartition.toString()); ``` ## storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java: ## @@ -31,31 +31,36 @@ import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; public final class BrokerLocalStorage { private final Integer brokerId; -private final File brokerStorageDirectory; +private final Set brokerStorageDirectorys; private final Integer storageWaitTimeoutSec; private final int storagePollPeriodSec = 1; private final Time time = Time.SYSTEM; public BrokerLocalStorage(Integer brokerId, - String storageDirname, + Set storageDirnames, Integer storageWaitTimeoutSec) { this.brokerId = brokerId; -this.brokerStorageDirectory = new File(storageDirname); +this.brokerStorageDirectorys = storageDirnames.stream().map(File::new).collect(Collectors.toSet()); this.storageWaitTimeoutSec = storageWaitTimeoutSec; } public Integer getBrokerId() { return brokerId; } +public Set getBrokerStorageDirectory() { Review Comment: rename `getBrokerStorageDirectory` -> `getBrokerStorageDirectories` ## storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java: ## @@ -313,6 +314,14 @@ public TieredStorageTestBuilder reassignReplica(String topic, return this; } +public TieredStorageTestBuilder alterLogDir(String topic, +Integer partition, Review Comment: nit: parameter alignment
Re: [PR] MINOR: Replaced Utils.join() with String.join() [kafka]
chia7712 commented on code in PR #15823: URL: https://github.com/apache/kafka/pull/15823#discussion_r1582049275 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1491,7 +1490,7 @@ public void assign(Collection partitions) { // See the ApplicationEventProcessor.process() method that handles this event for more detail. applicationEventHandler.add(new AssignmentChangeEvent(subscriptions.allConsumed(), time.milliseconds())); -log.info("Assigned to partition(s): {}", join(partitions, ", ")); +log.info("Assigned to partition(s): {}", String.join(", ", Arrays.toString(partitions.toArray(; Review Comment: how about `partitions.stream().map(TopicPartition::toString).collect(Collectors.joining(","))`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15897) Flaky Test: testWrongIncarnationId() – kafka.server.ControllerRegistrationManagerTest
[ https://issues.apache.org/jira/browse/KAFKA-15897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17841597#comment-17841597 ] Chia-Ping Tsai commented on KAFKA-15897: It seems the root cause is that `context.mockChannelManager.poll()` ([https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala#L221)] is executed after event queue thread adds the `ControllerRegistrationRequest` ([https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala#L233).] The request is consumed without no response as we don't set response ([https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala#L226)]. Hence, the following test ([https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala#L230)] gets failed as the request is gone. I feel the first poll ([https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala#L221)] is unnecessary since the check is waiting for event queue thread to send controller registration request. That does NOT need the poll, and removing the poll can prevent above race condition. > Flaky Test: testWrongIncarnationId() – > kafka.server.ControllerRegistrationManagerTest > - > > Key: KAFKA-15897 > URL: https://issues.apache.org/jira/browse/KAFKA-15897 > Project: Kafka > Issue Type: Test >Reporter: Apoorv Mittal >Assignee: Chia-Ping Tsai >Priority: Major > Labels: flaky-test > > Build run: > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14699/21/tests/ > > {code:java} > org.opentest4j.AssertionFailedError: expected: <(false,1,0)> but was: > <(true,0,0)>Stacktraceorg.opentest4j.AssertionFailedError: expected: > <(false,1,0)> but was: <(true,0,0)>at > app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) >at > app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) >at > app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197) > at > app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182) > at > app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177) > at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1141) > at > app//kafka.server.ControllerRegistrationManagerTest.$anonfun$testWrongIncarnationId$3(ControllerRegistrationManagerTest.scala:228) >at > app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379) > at > app//kafka.server.ControllerRegistrationManagerTest.testWrongIncarnationId(ControllerRegistrationManagerTest.scala:226) > at > java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method)at > java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) > at > java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568) > at > app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728) > at > app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) >at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > app//org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:45) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86) >at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) >at > app//org.junit.jupiter.e