Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-04-28 Thread via GitHub


chia7712 commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1582583036


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupExecutor.java:
##
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools.consumer.group;
+
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Utils;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static java.util.Collections.singleton;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.common.GroupType.CONSUMER;
+
+class ConsumerGroupExecutor {
+
+private ConsumerGroupExecutor() {
+}
+
+static AutoCloseable buildConsumerGroup(String brokerAddress,
+int numberOfConsumers,
+String groupId,
+String topic,
+String groupProtocol,
+Optional<String> remoteAssignor,
+Map<String, Object> customConfigs,
+boolean syncCommit) {
+return buildConsumers(
+brokerAddress,
+numberOfConsumers,
+groupId,
+groupProtocol,
+topic,
+RangeAssignor.class.getName(),
+remoteAssignor,
+customConfigs,
+syncCommit
+);
+}
+
+static AutoCloseable buildClassicGroup(String brokerAddress,
+   int numberOfConsumers,
+   String groupId,
+   String topic,
+   String assignmentStrategy,
+   Map<String, Object> customConfigs,
+   boolean syncCommit) {
+return buildConsumers(
+brokerAddress,
+numberOfConsumers,
+groupId,
+GroupProtocol.CLASSIC.name,
+topic,
+assignmentStrategy,
+Optional.empty(),
+customConfigs,
+syncCommit
+);
+}
+
+private static AutoCloseable buildConsumers(
+String brokerAddress,
+int numberOfConsumers,
+String groupId,
+String groupProtocol,
+String topic,
+String assignmentStrategy,
+Optional<String> remoteAssignor,
+Map<String, Object> customConfigs,
+boolean syncCommit
+) {
+List<Map<String, Object>> allConfigs = IntStream.range(0, numberOfConsumers)
+.mapToObj(ignored ->
+composeConfigs(
+  

[jira] [Assigned] (KAFKA-16640) Replace TestUtils#resource by scala.util.Using

2024-04-28 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16640:
--

Assignee: TengYao Chi  (was: Chia-Ping Tsai)

> Replace TestUtils#resource by scala.util.Using
> --
>
> Key: KAFKA-16640
> URL: https://issues.apache.org/jira/browse/KAFKA-16640
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: TengYao Chi
>Priority: Minor
>
> `scala.util.Using` is in both scala 2.13 and scala-collection-compat, so we 
> don't need to have a custom try-resource function.





Re: [PR] KAFKA-16205 Allow MetadataLoader to coalesce small batches [kafka]

2024-04-28 Thread via GitHub


github-actions[bot] commented on PR #15283:
URL: https://github.com/apache/kafka/pull/15283#issuecomment-2081835478

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





Re: [PR] KAFKA-16572: allow defining number of disks per broker in ClusterTest [kafka]

2024-04-28 Thread via GitHub


brandboat commented on code in PR #15745:
URL: https://github.com/apache/kafka/pull/15745#discussion_r1582482314


##
core/src/test/java/kafka/test/ClusterConfig.java:
##
@@ -160,6 +166,7 @@ public boolean equals(Object object) {
 return Objects.equals(type, clusterConfig.type)
 && Objects.equals(brokers, clusterConfig.brokers)
 && Objects.equals(controllers, clusterConfig.controllers)
+&& Objects.equals(disksPerBroker, clusterConfig.disksPerBroker)

Review Comment:
   Feel free to remove them, as they are currently unused 😃 






[jira] [Commented] (KAFKA-16640) Replace TestUtils#resource by scala.util.Using

2024-04-28 Thread TengYao Chi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17841791#comment-17841791
 ] 

TengYao Chi commented on KAFKA-16640:
-

I am able to handle this issue 😀

> Replace TestUtils#resource by scala.util.Using
> --
>
> Key: KAFKA-16640
> URL: https://issues.apache.org/jira/browse/KAFKA-16640
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> `scala.util.Using` is in both scala 2.13 and scala-collection-compat, so we 
> don't need to have a custom try-resource function.





Re: [PR] MINOR: Add replayRecords to CoordinatorResult [kafka]

2024-04-28 Thread via GitHub


dongnuo123 commented on PR #15818:
URL: https://github.com/apache/kafka/pull/15818#issuecomment-2081675669

   > @dongnuo123 Thanks for the patch. Could you also check if we have other 
cases like this one:
   > 
   > 
https://github.com/apache/kafka/blob/e7792258df934a5c8470c2925c5d164c7d5a8e6c/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java#L531
   > 
   > ? We need to ensure that we use replayRecords.
   
   This is the only case where we don't use replayRecords. 





Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-28 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1582404976


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -10921,6 +10855,544 @@ public void 
testLastConsumerProtocolMemberRebalanceTimeoutInConsumerGroup() {
 assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
 }
 
+@Test
+public void testConsumerGroupJoinThrowsExceptionIfGroupOverMaxSize() {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+.build()))
+.withConsumerGroupMaxSize(1)
+.build();
+
+JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withDefaultProtocolTypeAndProtocols()
+.build();
+
+Exception ex = assertThrows(GroupMaxSizeReachedException.class, () -> 
context.sendClassicGroupJoin(request));
+assertEquals("The consumer group has reached its maximum capacity of 1 
members.", ex.getMessage());
+}
+
+@Test
+public void testConsumerGroupJoinInvalidSessionTimeout() throws Exception {
+int minSessionTimeout = 50;
+int maxSessionTimeout = 100;
+String groupId = "group-id";
+
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withClassicGroupMinSessionTimeoutMs(minSessionTimeout)
+.withClassicGroupMaxSessionTimeoutMs(maxSessionTimeout)
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10))
+.build();
+
+JoinGroupRequestData requestWithSmallSessionTimeout = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withSessionTimeoutMs(minSessionTimeout - 1)
+.build();
+assertThrows(InvalidSessionTimeoutException.class, () -> 
context.sendClassicGroupJoin(requestWithSmallSessionTimeout));
+
+JoinGroupRequestData requestWithLargeSessionTimeout = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withSessionTimeoutMs(maxSessionTimeout + 1)
+.build();
+assertThrows(InvalidSessionTimeoutException.class, () -> 
context.sendClassicGroupJoin(requestWithLargeSessionTimeout));
+}
+
+@Test
+public void testConsumerGroupJoinThrowsExceptionIfProtocolIsNotSupported() 
{
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(10)
+
.setSupportedClassicProtocols(GroupMetadataManagerTestContext.toProtocols("roundrobin"))
+.build()))
+.build();
+
+JoinGroupRequestData requestWithEmptyProtocols = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+.withDefaultProtocolTypeAndProtocols()
+.build();
+assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithEmptyProtocols));
+
+JoinGroupRequestData requestWithInvalidProtocolType = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+.withGroupId(groupId)
+.withMemberId(UNKNOWN_MEMBER_ID)
+.withProtocolType("connect")
+.withDefaultProtocolTypeAndProtocols()
+.build();
+assertThrows(InconsistentGroupProtocolException.class, () -> 
context.sendClassicGroupJoin(requestWithInvalidProtocolType));
+}
+
+@Test
+public void testConsumerGroupJoinWithNewDynamicMember() throws Exception {
+String groupId = "group-id";
+String memberId = Uuid.randomUuid().toString();
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+Uuid barTopicId = Uuid.randomUuid();
+String barTopicName = "bar";
+
+  

Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-28 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1582403702


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1413,6 +1506,243 @@ private 
CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+/**
+ * Handle a JoinGroupRequest to a ConsumerGroup.
+ *
+ * @param group  The group to join.
+ * @param contextThe request context.
+ * @param requestThe actual JoinGroup request.
+ * @param responseFuture The join group response future.
+ *
+ * @return The result that contains records to append if the join group 
phase completes.
+ */
+private CoordinatorResult classicGroupJoinToConsumerGroup(
+ConsumerGroup group,
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture<JoinGroupResponseData> responseFuture
+) throws ApiException {
+final long currentTimeMs = time.milliseconds();
+final List<Record> records = new ArrayList<>();
+final String groupId = request.groupId();
+String memberId = request.memberId();
+final String instanceId = request.groupInstanceId();
+final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+
+throwIfConsumerGroupIsFull(group, memberId);
+throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+// TODO: need to throw an exception if group is dead?

Review Comment:
   Do we ever transition a consumer group to DEAD? It's always confusing when 
it comes to type checks in the new group coordinator.






Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-28 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1582397694


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1169,6 +1175,107 @@ private void 
throwIfStaticMemberIsUnknown(ConsumerGroupMember staticMember, Stri
 }
 }
 
+/**
+ * Validates if the received classic member protocols are supported by the 
group.
+ *
+ * @param group The ConsumerGroup.
+ * @param memberId  The joining member id.
+ * @param protocolType  The joining member protocol type.
+ * @param protocols The joining member protocol collection.
+ */
+private void throwIfClassicProtocolIsNotSupported(
+ConsumerGroup group,
+String memberId,
+String protocolType,
+JoinGroupRequestProtocolCollection protocols
+) {
+if (!group.supportsClassicProtocols(protocolType, 
ClassicGroupMember.plainProtocolSet(protocols))) {

Review Comment:
   Yeah I was also thinking about this. The last commit just made a change in 
`supportsClassicProtocols` which requires the protocolType to be 
`ConsumerProtocol.PROTOCOL_TYPE` for both empty and non-empty groups.
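   To make the rule concrete, here is a minimal, self-contained sketch of the 
check being described; the names and signature are illustrative, not Kafka's 
actual internals:

   ```java
   import java.util.Set;

   // Sketch: require the consumer protocol type for both empty and non-empty
   // groups; for non-empty groups, also require a protocol the group supports.
   static boolean supportsClassicProtocols(String protocolType, boolean groupIsEmpty,
                                           Set<String> memberProtocols,
                                           Set<String> groupSupportedProtocols) {
       if (!"consumer".equals(protocolType)) {
           return false;
       }
       return groupIsEmpty || memberProtocols.stream().anyMatch(groupSupportedProtocols::contains);
   }
   ```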






Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-28 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1582397540


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1413,6 +1506,243 @@ private 
CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+/**
+ * Handle a JoinGroupRequest to a ConsumerGroup.
+ *
+ * @param group  The group to join.
+ * @param contextThe request context.
+ * @param requestThe actual JoinGroup request.
+ * @param responseFuture The join group response future.
+ *
+ * @return The result that contains records to append if the join group 
phase completes.
+ */
+private CoordinatorResult classicGroupJoinToConsumerGroup(
+ConsumerGroup group,
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture<JoinGroupResponseData> responseFuture
+) throws ApiException {
+final long currentTimeMs = time.milliseconds();
+final List<Record> records = new ArrayList<>();
+final String groupId = request.groupId();
+String memberId = request.memberId();
+final String instanceId = request.groupInstanceId();
+final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+
+throwIfConsumerGroupIsFull(group, memberId);
+throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+// TODO: need to throw an exception if group is dead?
+
+// Get or create the member.
+if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+ConsumerGroupMember member;
+ConsumerGroupMember.Builder updatedMemberBuilder;
+boolean staticMemberReplaced = false;
+boolean newMemberCreated = false;
+if (instanceId == null) {
+// A dynamic member (re-)joins.
+throwIfRequiresKnownMemberId(groupId, memberId, isUnknownMember, 
context);
+newMemberCreated = !group.hasMember(memberId);
+member = group.getOrMaybeCreateMember(memberId, true);
+log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+} else {
+member = group.staticMember(instanceId);
+// A new static member joins or the existing static member rejoins.
+if (isUnknownMember) {
+newMemberCreated = true;
+if (member == null) {
+// New static member.
+member = group.getOrMaybeCreateMember(memberId, true);
+updatedMemberBuilder = new 
ConsumerGroupMember.Builder(member);
+log.info("[GroupId {}] Static member {} with instance id 
{} joins the consumer group.", groupId, memberId, instanceId);
+} else {
+// Replace the current static member.
+staticMemberReplaced = true;
+updatedMemberBuilder = new 
ConsumerGroupMember.Builder(memberId)
+.setAssignedPartitions(member.assignedPartitions());
+removeMember(records, groupId, member.memberId());
+log.info("[GroupId {}] Static member with unknown member 
id and instance id {} re-joins the consumer group. " +
+"Created a new member {} to replace the existing 
member {}.", groupId, instanceId, memberId, member.memberId());
+}
+} else {
+// Rejoining static member. Fence the static group with 
unmatched member id.
+throwIfStaticMemberIsUnknown(member, instanceId);
+throwIfInstanceIdIsFenced(member, groupId, memberId, 
instanceId);
+updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+log.info("[GroupId {}] Static member {} with instance id {} 
re-joins the consumer group.", groupId, memberId, instanceId);
+}
+}
+
+int groupEpoch = group.groupEpoch();
+Map<String, TopicMetadata> subscriptionMetadata = group.subscriptionMetadata();
+final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);
+final List 
ownedTopicPartitions =
+validateGenerationIdAndGetOwnedPartition(member, subscription);
+
+// 1. Create or update the member. If the member is new or has 
changed, a ConsumerGroupMemberMetadataValue
+// record is written to the __consumer_offsets partition to persist 
the change. If the subscriptions have
+// changed, the subscription metadata is updated and persisted by 
writing a ConsumerGroupPartitionMetadataValue
+// record to the __consumer_offsets partition. Finally, the group 
epoch is

Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-28 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1582397180


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -1122,4 +1122,27 @@ private  OUT handleOperationException(
 return handler.apply(apiError.error(), apiError.message());
 }
 }
+
+/**
+ * Creates the JoinGroupResponseData according to the error type.
+ *
+ * @param memberId  The member id.
+ * @param error The error.
+ * @return The JoinGroupResponseData.
+ */
+private static JoinGroupResponseData createJoinGroupResponseData(
+String memberId,
+Errors error
+) {
+switch (error) {
+case MEMBER_ID_REQUIRED:
+case INVALID_SESSION_TIMEOUT:
+return new JoinGroupResponseData()
+.setMemberId(memberId)

Review Comment:
   Seems that `INVALID_GROUP_ID` also sets the member id. I don't think we use 
the member id here either. Should we remove them?
   
https://github.com/apache/kafka/blob/e7792258df934a5c8470c2925c5d164c7d5a8e6c/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java#L325






Re: [PR] KAFKA-16211: Inconsistent config values in CreateTopicsResult and DescribeConfigsResult [kafka]

2024-04-28 Thread via GitHub


chia7712 commented on PR #15696:
URL: https://github.com/apache/kafka/pull/15696#issuecomment-2081646079

   > or would like me to add a new test in the test class for the function?
   
   This one. Normally, a patch related to a bug fix needs to offer a test to 
prove the bug does get fixed :)





Re: [PR] KAFKA-16614:Disallow @ClusterTemplate("") [kafka]

2024-04-28 Thread via GitHub


chia7712 commented on code in PR #15800:
URL: https://github.com/apache/kafka/pull/15800#discussion_r1582294569


##
core/src/test/java/kafka/test/junit/ClusterTestExtensions.java:
##
@@ -91,9 +91,6 @@ public Stream 
provideTestTemplateInvocationContex
 ClusterTemplate clusterTemplateAnnot = 
context.getRequiredTestMethod().getDeclaredAnnotation(ClusterTemplate.class);
 if (clusterTemplateAnnot != null) {
 processClusterTemplate(context, clusterTemplateAnnot, 
generatedContexts::add);
-if (generatedContexts.isEmpty()) {
-throw new IllegalStateException("ClusterConfig generator 
method should provide at least one config");

Review Comment:
   My previous comment is wrong. This check is used to make sure there is at 
least one `ClusterConfig`, so we should not remove it. However, it seems to me 
this check should be moved to `processClusterTemplate`, and we should add a 
unit test for it.
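   A self-contained sketch of the relocated check (the method shape and names 
are illustrative, not the actual `ClusterTestExtensions` code), which a unit 
test could then target directly:

   ```java
   import java.util.List;
   import java.util.function.Supplier;

   // Sketch: fail fast inside the template-processing step when the generator
   // supplies no ClusterConfig, so the error is testable in isolation.
   final class ClusterTemplateCheckSketch {
       static <T> List<T> processTemplate(Supplier<List<T>> generator) {
           List<T> configs = generator.get();
           if (configs.isEmpty()) {
               throw new IllegalStateException("ClusterConfig generator method should provide at least one config");
           }
           return configs;
       }
   }
   ```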






Re: [PR] MINOR: Replaced Utils.join() with String.join() [kafka]

2024-04-28 Thread via GitHub


chia7712 commented on code in PR #15823:
URL: https://github.com/apache/kafka/pull/15823#discussion_r1582290331


##
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java:
##
@@ -392,14 +392,14 @@ private String 
topicPartitionsToLogString(Collection<TopicPartition> partitions)
 if (!log.isTraceEnabled()) {
 return String.format("%d partition(s)", partitions.size());
 }
-return "(" + Utils.join(partitions, ", ") + ")";
+return "(" + String.join(", ", Arrays.toString(partitions.toArray())) 
+ ")";

Review Comment:
   ditto
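   For reference, a sketch of joining such a collection without 
`Arrays.toString` (which adds "[" and "]" and defeats the join); names here 
are illustrative:

   ```java
   import java.util.Collection;
   import java.util.stream.Collectors;

   // Sketch: join the elements directly with ", " and wrap in parentheses,
   // with no brackets left over from Arrays.toString.
   static String toLogString(Collection<?> partitions) {
       return partitions.stream()
               .map(Object::toString)
               .collect(Collectors.joining(", ", "(", ")"));
   }
   ```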



##
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java:
##
@@ -392,14 +392,14 @@ private String 
topicPartitionsToLogString(Collection<TopicPartition> partitions)
 if (!log.isTraceEnabled()) {
 return String.format("%d partition(s)", partitions.size());
 }
-return "(" + Utils.join(partitions, ", ") + ")";
+return "(" + String.join(", ", Arrays.toString(partitions.toArray())) 
+ ")";
 }
 
 private String topicIdPartitionsToLogString(Collection<TopicIdPartition> partitions) {
 if (!log.isTraceEnabled()) {
 return String.format("%d partition(s)", partitions.size());
 }
-return "(" + Utils.join(partitions, ", ") + ")";
+return "(" + String.join(", ", Arrays.toString(partitions.toArray())) 
+ ")";

Review Comment:
   ditto



##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -584,27 +584,6 @@ public static String formatBytes(long bytes) {
 }
 }
 
-/**
- * Create a string representation of an array joined by the given separator
- * @param strs The array of items
- * @param separator The separator
- * @return The string representation.
- */
-public static <T> String join(T[] strs, String separator) {
-return join(Arrays.asList(strs), separator);
-}
-
-/**
- * Create a string representation of a collection joined by the given 
separator
- * @param collection The list of items
- * @param separator The separator
- * @return The string representation.
- */
-public static <T> String join(Collection<T> collection, String separator) {
-Objects.requireNonNull(collection);
-return mkString(collection.stream(), "", "", separator);
-}
-
 /**
  * Create a string representation of a stream surrounded by `begin` and 
`end` and joined by `separator`.
  *

Review Comment:
   It seems `mkString(Stream<T> stream, String begin, String end, String 
separator)` can be removed too, since the only use case 
(https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java#L199)
 does not use "begin" and "end".


##
clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java:
##
@@ -131,7 +130,7 @@ public class CommonClientConfigs {
 
 public static final String SECURITY_PROTOCOL_CONFIG = "security.protocol";
 public static final String SECURITY_PROTOCOL_DOC = "Protocol used to 
communicate with brokers. Valid values are: " +
-Utils.join(SecurityProtocol.names(), ", ") + ".";
+String.join(", ", SecurityProtocol.names()).replace("[", 
"").replace("]", "") + ".";

Review Comment:
   please don't use `replace`. We can generate the correct message at once.
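   A sketch of generating the message in one step; this assumes 
`SecurityProtocol.names()` returns a collection of strings, as the original 
`Utils.join` call implies:

   ```java
   // Sketch: String.join accepts an Iterable<? extends CharSequence>, so the
   // names can be joined directly and no brackets need stripping afterwards.
   public static final String SECURITY_PROTOCOL_DOC = "Protocol used to communicate with brokers. Valid values are: " +
       String.join(", ", SecurityProtocol.names()) + ".";
   ```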






Re: [PR] MINOR: Clean up TestUtils.scala [kafka]

2024-04-28 Thread via GitHub


chia7712 commented on code in PR #15808:
URL: https://github.com/apache/kafka/pull/15808#discussion_r1582280178


##
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##
@@ -1124,24 +933,18 @@ object TestUtils extends Logging {
 }, msg = msg, pause = 0L, waitTimeMs = waitTimeMs)
   }
 
-  /**
-   * Wait for the presence of an optional value.
-   *
-   * @param func The function defining the optional value
-   * @param msg Error message in the case that the value never appears
-   * @param waitTimeMs Maximum time to wait
-   * @return The unwrapped value returned by the function
-   */
-  def awaitValue[T](func: () => Option[T], msg: => String, waitTimeMs: Long = 
JTestUtils.DEFAULT_MAX_WAIT_MS): T = {
-var value: Option[T] = None
-waitUntilTrue(() => {
-  value = func()
-  value.isDefined
-}, msg, waitTimeMs)
-value.get
+  def subscribeAndWaitForRecords(topic: String,

Review Comment:
   Why add this new method? It is unused.






[jira] [Created] (KAFKA-16640) Replace TestUtils#resource by scala.util.Using

2024-04-28 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16640:
--

 Summary: Replace TestUtils#resource by scala.util.Using
 Key: KAFKA-16640
 URL: https://issues.apache.org/jira/browse/KAFKA-16640
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


`scala.util.Using` is in both scala 2.13 and scala-collection-compat, so we 
don't need to have a custom try-resource function.





Re: [PR] MINOR: Remove unused parameters in KafkaConfig [kafka]

2024-04-28 Thread via GitHub


chia7712 commented on code in PR #15788:
URL: https://github.com/apache/kafka/pull/15788#discussion_r1582265983


##
core/src/main/scala/kafka/MetadataLogConfig.scala:
##
@@ -32,13 +32,13 @@ final case class MetadataLogConfig(
 )
 
 object MetadataLogConfig {
-  def apply(config: AbstractConfig, maxBatchSizeInBytes: Int, 
maxFetchSizeInBytes: Int): MetadataLogConfig = {
+  def apply(config: KafkaConfig, maxBatchSizeInBytes: Int, 
maxFetchSizeInBytes: Int): MetadataLogConfig = {
 new MetadataLogConfig(
-  config.getInt(KRaftConfigs.METADATA_LOG_SEGMENT_BYTES_CONFIG),
-  config.getInt(KRaftConfigs.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG),
-  config.getLong(KRaftConfigs.METADATA_LOG_SEGMENT_MILLIS_CONFIG),
-  config.getLong(KRaftConfigs.METADATA_MAX_RETENTION_BYTES_CONFIG),
-  config.getLong(KRaftConfigs.METADATA_MAX_RETENTION_MILLIS_CONFIG),
+  config.metadataLogSegmentBytes,
+  config.metadataLogSegmentMinBytes,
+  config.metadataLogSegmentMillis,
+  config.metadataRetentionBytes,
+  config.metadataRetentionMillis,
   maxBatchSizeInBytes,
   maxFetchSizeInBytes,
   ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,

Review Comment:
   Could you change line#45 to use `config.nodeId`?






Re: [PR] KAFKA-16572: allow defining number of disks per broker in ClusterTest [kafka]

2024-04-28 Thread via GitHub


chia7712 commented on code in PR #15745:
URL: https://github.com/apache/kafka/pull/15745#discussion_r1582250475


##
core/src/test/java/kafka/test/ClusterConfig.java:
##
@@ -55,14 +56,15 @@ public class ClusterConfig {
private final Map<Integer, Map<String, String>> perBrokerOverrideProperties;
 
 @SuppressWarnings("checkstyle:ParameterNumber")
-private ClusterConfig(Type type, int brokers, int controllers, String name, boolean autoStart,
+private ClusterConfig(Type type, int brokers, int controllers, int disksPerBroker, String name, boolean autoStart,
   SecurityProtocol securityProtocol, String listenerName, File trustStoreFile,
   MetadataVersion metadataVersion, Map<String, String> serverProperties, Map<String, String> producerProperties,
   Map<String, String> consumerProperties, Map<String, String> adminClientProperties, Map<String, String> saslServerProperties,
   Map<String, String> saslClientProperties, Map<Integer, Map<String, String>> perBrokerOverrideProperties) {
 this.type = Objects.requireNonNull(type);
 this.brokers = brokers;
 this.controllers = controllers;
+this.disksPerBroker = disksPerBroker;

Review Comment:
   Could you add a check to make sure the value is larger than zero?
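   For example, a minimal guard in the constructor (a sketch; the exact 
message wording is illustrative):

   ```java
   // Sketch of the requested validation for the new field:
   if (disksPerBroker <= 0) {
       throw new IllegalArgumentException("Number of disks per broker must be greater than zero.");
   }
   ```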



##
core/src/test/java/kafka/test/ClusterTestExtensionsTest.java:
##
@@ -121,6 +124,26 @@ public void testClusterTests() {
 }
 }
 
+@ClusterTests({
+@ClusterTest(clusterType = Type.ZK, disksPerBroker = 1),

Review Comment:
   We can't use `disksPerBroker = 1` since it is equal to the default value.



##
core/src/test/java/kafka/test/ClusterConfig.java:
##
@@ -160,6 +166,7 @@ public boolean equals(Object object) {
 return Objects.equals(type, clusterConfig.type)
 && Objects.equals(brokers, clusterConfig.brokers)
 && Objects.equals(controllers, clusterConfig.controllers)
+&& Objects.equals(disksPerBroker, clusterConfig.disksPerBroker)

Review Comment:
   Not sure whether we need to keep `hashCode` and `equals`. We don't actually 
use them, and they are error-prone when adding new fields. For example, this PR 
does not update `hashCode` for the new field. @brandboat WDYT?
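   To illustrate the hazard with this PR's field (the surrounding class and 
field set are simplified, for illustration only):

   ```java
   // Sketch: a field added to equals() but not hashCode() breaks the
   // equals/hashCode contract, so keeping both in sync avoids the drift.
   @Override
   public boolean equals(Object o) {
       if (this == o) return true;
       if (!(o instanceof ClusterConfig)) return false;
       ClusterConfig that = (ClusterConfig) o;
       return brokers == that.brokers
               && controllers == that.controllers
               && disksPerBroker == that.disksPerBroker; // must also appear in hashCode()
   }

   @Override
   public int hashCode() {
       return java.util.Objects.hash(brokers, controllers, disksPerBroker);
   }
   ```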



##
core/src/test/java/kafka/test/junit/ClusterTestExtensions.java:
##
@@ -162,32 +162,39 @@ private void processClusterTest(ExtensionContext context, 
ClusterTest annot, Clu
 controllers = annot.controllers();
 }
 
-if (brokers <= 0 || controllers <= 0) {
-throw new IllegalArgumentException("Number of brokers/controllers 
must be greater than zero.");
+final int disksPerBroker;
+if (annot.disksPerBroker() == 0) {
+disksPerBroker = defaults.disksPerBroker();

Review Comment:
   Could we leverage the builder instead of creating a bunch of temporary local 
variables?
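   The shape being suggested, as a hypothetical sketch (the builder methods 
here are assumed, not the actual `ClusterConfig` API):

   ```java
   // Hypothetical builder usage replacing the temporary locals; unset
   // annotation values (0) fall back to the defaults in one expression.
   ClusterConfig config = ClusterConfig.builder()
           .setBrokers(annot.brokers() == 0 ? defaults.brokers() : annot.brokers())
           .setControllers(annot.controllers() == 0 ? defaults.controllers() : annot.controllers())
           .setDisksPerBroker(annot.disksPerBroker() == 0 ? defaults.disksPerBroker() : annot.disksPerBroker())
           .build();
   ```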






[jira] [Commented] (KAFKA-16639) AsyncKafkaConsumer#close does not send heartbeat to leave group

2024-04-28 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17841678#comment-17841678
 ] 

Philip Nee commented on KAFKA-16639:


Thanks for reporting. Will do.

> AsyncKafkaConsumer#close does not send heartbeat to leave group
> ---
>
> Key: KAFKA-16639
> URL: https://issues.apache.org/jira/browse/KAFKA-16639
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>  Labels: kip-848-client-support
>
> This bug can be reproduced by immediately closing a consumer that has just 
> been created.
> The root cause is that we skip the new heartbeat used to leave the group when 
> there is an in-flight heartbeat request 
> (https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L212).
> It seems to me the simple solution is that we create a heartbeat request when 
> meeting the above situation and then send it via pollOnClose 
> (https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java#L62).
>  





Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-04-28 Thread via GitHub


chia7712 commented on PR #15766:
URL: https://github.com/apache/kafka/pull/15766#issuecomment-2081560990

   I filed https://issues.apache.org/jira/browse/KAFKA-16639 to track the 
potential bug in AsyncConsumer. Please wait for that issue. If the issue is too 
hard to resolve, we can remove the test case for the new consumer from this PR.





Re: [PR] KAFKA-15853: Move metrics configs out of KafkaConfig [kafka]

2024-04-28 Thread via GitHub


chia7712 commented on PR #15822:
URL: https://github.com/apache/kafka/pull/15822#issuecomment-2081560445

   ```
   [2024-04-28T12:34:14.550Z] [Error] 
/home/jenkins/workspace/Kafka_kafka-pr_PR-15822/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:1000:26:
 value KafkaMetricsReporterClassesProp is not a member of object 
kafka.server.KafkaConfig
   
   [2024-04-28T12:34:14.550Z] [Error] 
/home/jenkins/workspace/Kafka_kafka-pr_PR-15822/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:1001:26:
 value KafkaMetricsPollingIntervalSecondsProp is not a member of object 
kafka.server.KafkaConfig
   ```
   Could you please fix the build error?





[PR] [WIP] KAFKA-16553: Add config logging for controller [kafka]

2024-04-28 Thread via GitHub


johnnychhsu opened a new pull request, #15826:
URL: https://github.com/apache/kafka/pull/15826

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-16639) AsyncKafkaConsumer#close does not send heartbeat to leave group

2024-04-28 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17841675#comment-17841675
 ] 

Chia-Ping Tsai commented on KAFKA-16639:


[~pnee] Could you please take a look if you have a free cycle? Thanks!

> AsyncKafkaConsumer#close does not send heartbeat to leave group
> ---
>
> Key: KAFKA-16639
> URL: https://issues.apache.org/jira/browse/KAFKA-16639
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>  Labels: kip-848-client-support
>
> This bug can be reproduced by immediately closing a consumer that has just 
> been created.
> The root cause is that we skip the new heartbeat used to leave the group when 
> there is an in-flight heartbeat request 
> (https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L212).
> It seems to me the simple solution is that we create a heartbeat request when 
> meeting the above situation and then send it via pollOnClose 
> (https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java#L62).
>  





[jira] [Created] (KAFKA-16639) AsyncKafkaConsumer#close does not send heartbeat to leave group

2024-04-28 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16639:
--

 Summary: AsyncKafkaConsumer#close does not send heartbeat to leave 
group
 Key: KAFKA-16639
 URL: https://issues.apache.org/jira/browse/KAFKA-16639
 Project: Kafka
  Issue Type: Bug
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


This bug can be reproduced by immediately closing a consumer that has just 
been created.

The root cause is that we skip the new heartbeat used to leave the group when 
there is an in-flight heartbeat request 
(https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L212).

It seems to me the simple solution is that we create a heartbeat request when 
meeting the above situation and then send it via pollOnClose 
(https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java#L62).
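A very rough, self-contained sketch of this idea (all names here are hypothetical, not the actual HeartbeatRequestManager API):
{code:java}
// Sketch: instead of skipping the leave-group heartbeat when one is already
// in flight, remember it and let pollOnClose() send it during shutdown.
final class LeaveOnCloseSketch {
    private boolean heartbeatInFlight;
    private Runnable pendingLeaveHeartbeat;

    void requestLeaveGroup(Runnable sendLeaveHeartbeat) {
        if (heartbeatInFlight) {
            pendingLeaveHeartbeat = sendLeaveHeartbeat; // defer instead of dropping
        } else {
            sendLeaveHeartbeat.run();
        }
    }

    void pollOnClose() {
        if (pendingLeaveHeartbeat != null) {
            pendingLeaveHeartbeat.run();
            pendingLeaveHeartbeat = null;
        }
    }
}
{code}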

 





[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-28 Thread Sal Sorrentino (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17841670#comment-17841670
 ] 

Sal Sorrentino commented on KAFKA-16514:


Well, this is not entirely accurate. As mentioned in one of my first comments, 
by providing a `group.instance.id`, you can achieve the desired behavior 
without using any "internal" configs. However, I agree that it does seem broken 
that the only way to voluntarily leave a group is to be a static member.
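For reference, a sketch of the static-membership workaround mentioned above (the property value is illustrative):
{code:java}
// Sketch: supplying a group.instance.id to the embedded consumers makes them
// static members, which the comment above reports is enough to get the
// desired leave behavior without the internal config.
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG), "my-app-instance-1");
{code}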

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sal Sorrentino
>Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shut down a streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shutdown and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE_V2);{code}
>  





Re: [PR] KAFKA-16572: allow defining number of disks per broker in ClusterTest [kafka]

2024-04-28 Thread via GitHub


FrankYang0529 commented on PR #15745:
URL: https://github.com/apache/kafka/pull/15745#issuecomment-2081484120

   > @FrankYang0529 please fix the conflicts
   
   Hi @chia7712, I resolved the conflicts. Thank you.





Re: [PR] KAFKA-15853: Move metrics configs out of KafkaConfig [kafka]

2024-04-28 Thread via GitHub


OmniaGM commented on code in PR #15822:
URL: https://github.com/apache/kafka/pull/15822#discussion_r1582152852


##
clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java:
##
@@ -77,7 +78,7 @@ public class CommonClientConfigs {
 
 public static final String CLIENT_RACK_CONFIG = "client.rack";
 public static final String CLIENT_RACK_DOC = "A rack identifier for this 
client. This can be any string value which indicates where this client is 
physically located. It corresponds with the broker config 'broker.rack'";
-public static final String DEFAULT_CLIENT_RACK = "";
+public static final String CLIENT_RACK_DEFAULT = "";

Review Comment:
   Created this Jira https://issues.apache.org/jira/browse/KAFKA-16638 as a 
follow-up to align the naming convention for config classes. I left a note 
there that some classes need KIPs and might need a separate Jira later. 






[jira] [Created] (KAFKA-16638) Align the naming convention for config and default variables in *Config classes

2024-04-28 Thread Omnia Ibrahim (Jira)
Omnia Ibrahim created KAFKA-16638:
-

 Summary: Align the naming convention for config and default 
variables in *Config classes
 Key: KAFKA-16638
 URL: https://issues.apache.org/jira/browse/KAFKA-16638
 Project: Kafka
  Issue Type: Task
Reporter: Omnia Ibrahim


Some classes in the code are violating the naming convention for config, doc, 
and default variables, which is:
 * `_CONFIG` suffix for defining the configuration
 * `_DEFAULT` suffix for the default value
 * `_DOC` suffix for the doc

The following classes need to be updated (see the sketch below):
 * `CleanerConfig` and `RemoteLogManagerConfig` to use the `_CONFIG` suffix 
instead of `_PROP`.
 * Others like `LogConfig` and `QuorumConfig` to use the `_DEFAULT` suffix 
instead of the `DEFAULT_` prefix.
 * The same goes for `CommonClientConfigs` and `StreamsConfig`; however, these 
are public interfaces and will need a KIP to rename the default value variables 
and mark the old ones as deprecated. This might need to be broken out into a 
different Jira.
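A small illustration of the target convention (a hypothetical config, not a real Kafka one):
{code:java}
// Target naming pattern described above:
public static final String FOO_BAR_CONFIG = "foo.bar";          // _CONFIG suffix
public static final String FOO_BAR_DEFAULT = "baz";             // _DEFAULT suffix, not DEFAULT_FOO_BAR
public static final String FOO_BAR_DOC = "What foo.bar does.";  // _DOC suffix
{code}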





[jira] [Updated] (KAFKA-16637) KIP-848 does not work well

2024-04-28 Thread sanghyeok An (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sanghyeok An updated KAFKA-16637:
-
Description: 
I want to test the next generation of the consumer rebalance protocol 
(https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes).

However, it does not work well.

You can check my setup below.

 

*Docker-compose.yaml*

[https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml]

 

*Consumer Code*

[https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java]

 

*Consumer logs*

[main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector - 
initializing Kafka metrics collector
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
2ae524ed625438c5
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
1714309299215
[main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - 
[Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1
[consumer_background_thread] INFO org.apache.kafka.clients.Metadata - [Consumer 
clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk)

Stuck In here...

 

*Broker logs* 

broker    | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for 
Set(__consumer_offsets) to the active controller. 
(kafka.server.DefaultAutoTopicCreationManager)
broker    | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for 
Set(__consumer_offsets) to the active controller. 
(kafka.server.DefaultAutoTopicCreationManager)
broker    | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for 
Set(__consumer_offsets) to the active controller. 
(kafka.server.DefaultAutoTopicCreationManager)
broker    | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for 
Set(__consumer_offsets) to the active controller. 
(kafka.server.DefaultAutoTopicCreationManager)
broker    | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for 
Set(__consumer_offsets) to the active controller. 
(kafka.server.DefaultAutoTopicCreationManager)

stuck in here

  was:
I want to test the next generation of the consumer rebalance protocol 
(https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes).

However, it does not work well.

You can check my setup below.

 

### Docker-compose.yaml

[https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml]

 

### Consumer Code

[https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java]

 

### Consumer logs

[main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector - 
initializing Kafka metrics collector
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
2ae524ed625438c5
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
1714309299215
[main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - 
[Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1
[consumer_background_thread] INFO org.apache.kafka.clients.Metadata - [Consumer 
clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk)

Stuck In here...

 

### Broker logs 

broker    | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for 
Set(__consumer_offsets) to the active controller. 
(kafka.server.DefaultAutoTopicCreationManager)
broker    | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for 
Set(__consumer_offsets) to the active controller. 
(kafka.server.DefaultAutoTopicCreationManager)
broker    | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for 
Set(__consumer_offsets) to the active controller. 
(kafka.server.DefaultAutoTopicCreationManager)
broker    | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for 
Set(__consumer_offsets) to the active controller. 
(kafka.server.DefaultAutoTopicCreationManager)
broker    | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for 
Set(__consumer_offsets) to the active controller. 
(kafka.server.DefaultAutoTopicCreationManager)

stuck in here


> KIP-848 does not work well
> --
>
> Key: KAFKA-16637
> URL: https://issues.apache.org/jira/browse/KAFKA-16637
> Project: Kafka
>  Issue Type: Bug
>Reporter: sanghyeok An
>Priority: Minor
>
> I want to test the next generation of the consumer rebalance protocol 
> ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-

[jira] [Updated] (KAFKA-16637) KIP-848 does not work well

2024-04-28 Thread sanghyeok An (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sanghyeok An updated KAFKA-16637:
-
Description: 
I want to test the next generation of the consumer rebalance protocol 
(https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes).

However, it does not work well.

You can check my setup below.

 

### Docker-compose.yaml

[https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml]

 

### Consumer Code

[https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java]

 

### Consumer logs

[main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector - 
initializing Kafka metrics collector
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
2ae524ed625438c5
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
1714309299215
[main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - 
[Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1
[consumer_background_thread] INFO org.apache.kafka.clients.Metadata - [Consumer 
clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk)

Stuck In here...

 

### Broker logs 

broker    | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for 
Set(__consumer_offsets) to the active controller. 
(kafka.server.DefaultAutoTopicCreationManager)
broker    | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for 
Set(__consumer_offsets) to the active controller. 
(kafka.server.DefaultAutoTopicCreationManager)
broker    | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for 
Set(__consumer_offsets) to the active controller. 
(kafka.server.DefaultAutoTopicCreationManager)
broker    | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for 
Set(__consumer_offsets) to the active controller. 
(kafka.server.DefaultAutoTopicCreationManager)
broker    | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for 
Set(__consumer_offsets) to the active controller. 
(kafka.server.DefaultAutoTopicCreationManager)

stuck in here

> KIP-848 does not work well
> --
>
> Key: KAFKA-16637
> URL: https://issues.apache.org/jira/browse/KAFKA-16637
> Project: Kafka
>  Issue Type: Bug
>Reporter: sanghyeok An
>Priority: Minor
>
> I want to test the next generation of the consumer rebalance protocol 
> (https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes).
>  
> However, it does not work well.
> You can check my setup below.
>  
> ### Docker-compose.yaml
> [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml]
>  
> ### Consumer Code
> [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java]
>  
> ### Consumer logs
> [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector 
> - initializing Kafka metrics collector
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 2ae524ed625438c5
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1714309299215
> [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer - 
> [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1
> [consumer_background_thread] INFO org.apache.kafka.clients.Metadata - 
> [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk)
> Stuck In here...
>  
> ### Broker logs 
> broker    | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
> broker    | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for Set(__consumer_offsets) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
> stuck in here



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16637) KIP-848 does not work well

2024-04-28 Thread sanghyeok An (Jira)
sanghyeok An created KAFKA-16637:


 Summary: KIP-848 does not work well
 Key: KAFKA-16637
 URL: https://issues.apache.org/jira/browse/KAFKA-16637
 Project: Kafka
  Issue Type: Bug
Reporter: sanghyeok An






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16637) KIP-848 does not work well

2024-04-28 Thread sanghyeok An (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sanghyeok An updated KAFKA-16637:
-
Priority: Minor  (was: Major)

> KIP-848 does not work well
> --
>
> Key: KAFKA-16637
> URL: https://issues.apache.org/jira/browse/KAFKA-16637
> Project: Kafka
>  Issue Type: Bug
>Reporter: sanghyeok An
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-28 Thread via GitHub


kamalcph commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1582149811


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
    * Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
    *
    * @param highWatermarkMetadata the suggested high watermark with offset metadata
    * @return the updated high watermark offset
    */
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
     val endOffsetMetadata = localLog.logEndOffsetMetadata
-    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-      new LogOffsetMetadata(logStartOffset)
+    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment:
   Opened draft PR #15825 with the suggested approach. PTAL.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-04-28 Thread via GitHub


kamalcph opened a new pull request, #15825:
URL: https://github.com/apache/kafka/pull/15825

   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-28 Thread via GitHub


kamalcph commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1582144306


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
    * Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
    *
    * @param highWatermarkMetadata the suggested high watermark with offset metadata
    * @return the updated high watermark offset
    */
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
     val endOffsetMetadata = localLog.logEndOffsetMetadata
-    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-      new LogOffsetMetadata(logStartOffset)
+    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment:
   > For example, if we lose the local data in all replicas, the 
lastStableOffset could still be in the middle of a tiered segment and moving it 
to localLogStartOffset immediately will be incorrect.
   
   I'm not clear on this:
   
   1. Segments are eligible for upload to remote storage only when the 
`lastStableOffset` moves beyond the end offset of the segment to be uploaded. 
   2. When all the replicas lose local data (offline partition), we consider 
the data in remote storage lost as well. Currently, for this case, we don't 
have a provision to serve the remote data.
   3. When `firstUnstableOffsetMetadata` is empty, we return `highWatermark`. 
With this patch, the `highWatermark` lower boundary is set to 
`localLogStartOffset`, so there won't be an issue. 
   
   > Note that OffsetMetadata (segmentBaseOffset and relativePositionInSegment) 
is only used in DelayedFetch for estimating the amount of available bytes.
   
   The 
[LogOffsetMetadata#onOlderSegment](https://sourcegraph.com/github.com/apache/kafka@5de5d967adffd864bad3ec729760a430253abf38/-/blob/storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetMetadata.java?L54)
 method is used in the 
[hot-path](https://sourcegraph.com/github.com/apache/kafka@5de5d967adffd864bad3ec729760a430253abf38/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L324)
 of incrementing the high-watermark and expects the full metadata; otherwise it 
throws an error. Is it OK to remove the throwable from the 
LogOffsetMetadata#onOlderSegment method and return `false` when only 
`messageOffsetOnly` is available?
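
   For reference, the bounding rule this patch discusses reduces to a simple clamp; a minimal standalone sketch (illustrative names, not the actual UnifiedLog code):

   ```java
   // Clamp a proposed high watermark into [localLogStartOffset, logEndOffset].
   final class HighWatermarkBounds {
       static long bound(long proposed, long localLogStartOffset, long logEndOffset) {
           long lowerBounded = Math.max(proposed, localLogStartOffset);
           return Math.min(lowerBounded, logEndOffset);
       }
   }
   ```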



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-28 Thread via GitHub


kamalcph commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1582144306


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
    * Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
    *
    * @param highWatermarkMetadata the suggested high watermark with offset metadata
    * @return the updated high watermark offset
    */
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
     val endOffsetMetadata = localLog.logEndOffsetMetadata
-    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-      new LogOffsetMetadata(logStartOffset)
+    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment:
   > For example, if we lose the local data in all replicas, the 
lastStableOffset could still be in the middle of a tiered segment and moving it 
to localLogStartOffset immediately will be incorrect.
   
   I'm not clear on this:
   
   1. Segments are eligible for upload to remote storage only when the 
`lastStableOffset` moves beyond the end offset of the segment to be uploaded. 
   2. When all the replicas lose local data (offline partition), we consider 
the data in remote storage lost as well. Currently, for this case, we don't 
have a provision to serve the remote data.
   3. When `firstUnstableOffsetMetadata` is empty, we return `highWatermark`. 
With this patch, the `highWatermark` lower boundary is set to 
`localLogStartOffset`, so there won't be an issue. 
   
   > Note that OffsetMetadata (segmentBaseOffset and relativePositionInSegment) 
is only used in DelayedFetch for estimating the amount of available bytes.
   
   The 
[LogOffsetMetadata#onOlderSegment](https://sourcegraph.com/github.com/apache/kafka@5de5d967adffd864bad3ec729760a430253abf38/-/blob/storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetMetadata.java?L54)
 method is used in the 
[hot-path](https://sourcegraph.com/github.com/apache/kafka@5de5d967adffd864bad3ec729760a430253abf38/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L324)
 of incrementing the high-watermark and expects the full metadata; otherwise it 
throws an error. Is it OK to remove the throwable from the 
LogOffsetMetadata#onOlderSegment method and return `false` by default?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




Re: [PR] KAFKA-15853: Move metrics configs out of KafkaConfig [kafka]

2024-04-28 Thread via GitHub


OmniaGM commented on code in PR #15822:
URL: https://github.com/apache/kafka/pull/15822#discussion_r1582144106


##
clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java:
##
@@ -77,7 +78,7 @@ public class CommonClientConfigs {
 
 public static final String CLIENT_RACK_CONFIG = "client.rack";
 public static final String CLIENT_RACK_DOC = "A rack identifier for this 
client. This can be any string value which indicates where this client is 
physically located. It corresponds with the broker config 'broker.rack'";
-public static final String DEFAULT_CLIENT_RACK = "";
+public static final String CLIENT_RACK_DEFAULT = "";

Review Comment:
   had a second thought about this: instead of raising a KIP, I decided to 
just move all metric configs into one place for now, in `MetricConfigs`, as I 
want to keep KAFKA-15853 as simple as possible; it is already a huge effort to 
move these configs out of core. I will create cleanup Jiras to align the 
naming convention for some of these config files. 
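
   A sketch of the grouping described, following the usual constants-holder pattern (the key strings are existing Kafka config names; the class shape here is illustrative, not necessarily what the PR ends up with):

   ```java
   // Illustrative holder that gathers metric-related config constants in one place.
   public final class MetricConfigs {
       public static final String METRIC_NUM_SAMPLES_CONFIG = "metrics.num.samples";
       public static final String METRIC_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms";
       public static final String METRIC_RECORDING_LEVEL_CONFIG = "metrics.recording.level";
       public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";

       private MetricConfigs() {
           // Constants holder; not instantiable.
       }
   }
   ```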



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-15156) Update cipherInformation correctly in DefaultChannelMetadataRegistry

2024-04-28 Thread Walter Hernandez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walter Hernandez reassigned KAFKA-15156:


Assignee: (was: Walter Hernandez)

> Update cipherInformation correctly in DefaultChannelMetadataRegistry
> 
>
> Key: KAFKA-15156
> URL: https://issues.apache.org/jira/browse/KAFKA-15156
> Project: Kafka
>  Issue Type: Bug
>Reporter: Divij Vaidya
>Priority: Minor
>  Labels: newbie
>
> At 
> [https://github.com/apache/kafka/blob/4a61b48d3dca81e28b57f10af6052f36c50a05e3/clients/src/main/java/org/apache/kafka/common/network/DefaultChannelMetadataRegistry.java#L25]
>  
> we do not end up assigning the new value of cipherInformation to the member 
> variable. 
> The code over here, should be the following so that we can update the cipher 
> information.
> {noformat}
> if (cipherInformation == null) {
>     throw new IllegalArgumentException("cipherInformation can not be null");
> }
> this.cipherInformation = cipherInformation;{noformat}
>  
>  
> While this class is only used in tests, we should still fix this. It's a 
> minor bug.
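
For clarity, the corrected method would look roughly like this (a sketch; the method signature is assumed from the registry interface linked above):

{code:java}
@Override
public void registerCipherInformation(CipherInformation cipherInformation) {
    if (cipherInformation == null) {
        throw new IllegalArgumentException("cipherInformation can not be null");
    }
    // Previously the new value was never assigned to the member variable.
    this.cipherInformation = cipherInformation;
}
{code}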



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-16627: Remove ClusterConfig parameter in BeforeEach and AfterEach [kafka]

2024-04-28 Thread via GitHub


brandboat opened a new pull request, #15824:
URL: https://github.com/apache/kafka/pull/15824

   related to [KAFKA-16627](https://issues.apache.org/jira/browse/KAFKA-16627)
   
   In the past, we modified configs such as server broker properties by 
mutating the ClusterConfig reference passed to BeforeEach and AfterEach, based 
on the requirements of the tests.
   After 
[KAFKA-16560](https://issues.apache.org/jira/browse/KAFKA-16560), 
ClusterConfig became immutable, so modifying the ClusterConfig reference no 
longer reflects any change onto the test cluster, and passing ClusterConfig 
to BeforeEach and AfterEach became redundant.
   
   This PR removes the behavior of passing `ClusterConfig` to all test 
methods and before/after-each methods; see the sketch below.
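   
   Roughly, tests now consume the running cluster through the injected `ClusterInstance` rather than receiving a `ClusterConfig` to mutate; a hedged sketch (annotation and accessor names follow the test framework loosely):
   
   ```java
   // ClusterConfig is fixed up front via the annotation; the test method
   // only receives the running cluster.
   @ClusterTest
   void testSomething(ClusterInstance cluster) {
       String bootstrap = cluster.bootstrapServers();
       // ... exercise the cluster; there is no ClusterConfig parameter to mutate.
   }
   ```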
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Remove unused parameters in KafkaConfig [kafka]

2024-04-28 Thread via GitHub


johnnychhsu commented on PR #15788:
URL: https://github.com/apache/kafka/pull/15788#issuecomment-2081407268

   thanks @chia7712!
   just rebased and fixed the conflicts.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]

2024-04-28 Thread via GitHub


kamalcph commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1582059485


##
core/src/main/java/kafka/server/TierStateMachine.java:
##
@@ -54,5 +180,109 @@ PartitionFetchState start(TopicPartition topicPartition,
  * @return the new PartitionFetchState if the tier state machine was advanced, otherwise, return the currentFetchState
  */
 Optional<PartitionFetchState> maybeAdvanceState(TopicPartition topicPartition,
-                                                PartitionFetchState currentFetchState);
+                                                PartitionFetchState currentFetchState) {
+    // This is currently a no-op but will be used for implementing async tiering logic in KAFKA-13560.
+    return Optional.of(currentFetchState);
+}
+
+/**
+ * It tries to build the required state for this partition from leader and remote storage so that it can start
+ * fetching records from the leader. The return value is the next offset to fetch from the leader, which is the
+ * next offset following the end offset of the remote log portion.
+ */
+private Long buildRemoteLogAuxState(TopicPartition topicPartition,
+                                    Integer currentLeaderEpoch,
+                                    Long leaderLocalLogStartOffset,
+                                    Integer epochForLeaderLocalLogStartOffset,
+                                    Long leaderLogStartOffset,
+                                    UnifiedLog unifiedLog) throws IOException, RemoteStorageException {
+
+    long nextOffset;
+
+    if (unifiedLog.remoteStorageSystemEnable() && unifiedLog.config().remoteStorageEnable()) {
+        if (replicaMgr.remoteLogManager().isEmpty()) throw new IllegalStateException("RemoteLogManager is not yet instantiated");
+
+        RemoteLogManager rlm = replicaMgr.remoteLogManager().get();
+
+        // Find the respective leader epoch for (leaderLocalLogStartOffset - 1). We need to build the leader epoch cache
+        // until that offset
+        long previousOffsetToLeaderLocalLogStartOffset = leaderLocalLogStartOffset - 1;
+        int targetEpoch;
+        // If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1)
+        // will have the same epoch.
+        if (epochForLeaderLocalLogStartOffset == 0) {
+            targetEpoch = epochForLeaderLocalLogStartOffset;
+        } else {

Review Comment:
   I would suggest taking up the refactoring in a separate PR so that this PR 
can be reviewed effectively. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Replaced Utils.join() with String.join() [kafka]

2024-04-28 Thread via GitHub


chiacyu commented on code in PR #15823:
URL: https://github.com/apache/kafka/pull/15823#discussion_r1582059240


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1491,7 +1490,7 @@ public void assign(Collection<TopicPartition> partitions) {
     // See the ApplicationEventProcessor.process() method that handles this event for more detail.
     applicationEventHandler.add(new AssignmentChangeEvent(subscriptions.allConsumed(), time.milliseconds()));
 
-    log.info("Assigned to partition(s): {}", join(partitions, ", "));
+    log.info("Assigned to partition(s): {}", String.join(", ", Arrays.toString(partitions.toArray())));

Review Comment:
   Got it, thanks for the advice!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]

2024-04-28 Thread via GitHub


kamalcph commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1582055490


##
storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java:
##
@@ -74,7 +74,7 @@ public Properties topicConfig() {
 public void maybeWaitForAtLeastOneSegmentUpload(scala.collection.Seq<TopicPartition> topicPartitions) {
     JavaConverters.seqAsJavaList(topicPartitions).forEach(topicPartition -> {
         List<BrokerLocalStorage> localStorages = JavaConverters.bufferAsJavaList(brokers()).stream()
-            .map(b -> new BrokerLocalStorage(b.config().brokerId(), b.config().logDirs().head(), STORAGE_WAIT_TIMEOUT_SEC))
+            .map(b -> new BrokerLocalStorage(b.config().brokerId(), JavaConverters.asJava(b.config().logDirs().toSet()), STORAGE_WAIT_TIMEOUT_SEC))

Review Comment:
   ditto



##
storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java:
##
@@ -31,31 +31,36 @@
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public final class BrokerLocalStorage {
 
 private final Integer brokerId;
-private final File brokerStorageDirectory;
+private final Set<File> brokerStorageDirectorys;

Review Comment:
   nit: `brokerStorageDirectorys` -> `brokerStorageDirectories`
   



##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -3287,11 +3287,9 @@ class ReplicaManagerTest {
 val path1 = TestUtils.tempRelativeDir("data").getAbsolutePath
 val path2 = TestUtils.tempRelativeDir("data2").getAbsolutePath
 if (enableRemoteStorage) {

Review Comment:
   nit: do we need this `if` check?



##
storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java:
##
@@ -141,7 +146,11 @@ private boolean isOffsetPresentInFirstLocalSegment(TopicPartition topicPartition
 if (offsetToSearch.equals(firstLogFileBaseOffset)) {
 return true;
 }
-File partitionDir = new File(brokerStorageDirectory.getAbsolutePath(), topicPartition.toString());
+File partitionDir = brokerStorageDirectorys.stream()
+.filter(dir -> dirContainsTopicPartition(topicPartition, dir))
+.findFirst()
+.orElseThrow(() -> new IllegalArgumentException(String.format("[BrokerId=%d] Directory for the topic-partition %s " +
+"was not found", brokerId, topicPartition)));

Review Comment:
   previously, we were returning the `partitionDir` instead of  `logDir`:
   
   ```suggestion
   File logDir = brokerStorageDirectorys.stream()
           .filter(dir -> dirContainsTopicPartition(topicPartition, dir))
           .findFirst()
           .orElseThrow(() -> new IllegalArgumentException(String.format("[BrokerId=%d] Directory for the topic-partition %s " +
                   "was not found", brokerId, topicPartition)));
   File partitionDir = new File(logDir.getAbsolutePath(), topicPartition.toString());
   ```



##
storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java:
##
@@ -31,31 +31,36 @@
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public final class BrokerLocalStorage {
 
 private final Integer brokerId;
-private final File brokerStorageDirectory;
+private final Set<File> brokerStorageDirectorys;
 private final Integer storageWaitTimeoutSec;
 
 private final int storagePollPeriodSec = 1;
 private final Time time = Time.SYSTEM;
 
 public BrokerLocalStorage(Integer brokerId,
-  String storageDirname,
+  Set<String> storageDirnames,
   Integer storageWaitTimeoutSec) {
 this.brokerId = brokerId;
-this.brokerStorageDirectory = new File(storageDirname);
+this.brokerStorageDirectorys = storageDirnames.stream().map(File::new).collect(Collectors.toSet());
 this.storageWaitTimeoutSec = storageWaitTimeoutSec;
 }
 
 public Integer getBrokerId() {
 return brokerId;
 }
 
+public Set<File> getBrokerStorageDirectory() {

Review Comment:
   rename `getBrokerStorageDirectory` -> `getBrokerStorageDirectories`



##
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java:
##
@@ -313,6 +314,14 @@ public TieredStorageTestBuilder reassignReplica(String topic,
 return this;
 }
 
+public TieredStorageTestBuilder alterLogDir(String topic,
+Integer partition,

Review Comment:
   nit: parameter alignment




Re: [PR] MINOR: Replaced Utils.join() with String.join() [kafka]

2024-04-28 Thread via GitHub


chia7712 commented on code in PR #15823:
URL: https://github.com/apache/kafka/pull/15823#discussion_r1582049275


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1491,7 +1490,7 @@ public void assign(Collection<TopicPartition> partitions) {
     // See the ApplicationEventProcessor.process() method that handles this event for more detail.
     applicationEventHandler.add(new AssignmentChangeEvent(subscriptions.allConsumed(), time.milliseconds()));
 
-    log.info("Assigned to partition(s): {}", join(partitions, ", "));
+    log.info("Assigned to partition(s): {}", String.join(", ", Arrays.toString(partitions.toArray())));

Review Comment:
   how about 
`partitions.stream().map(TopicPartition::toString).collect(Collectors.joining(","))`?
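
   The difference is easy to see in isolation; a standalone sketch with hypothetical partition names:

   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.stream.Collectors;

   public class JoinSketch {
       public static void main(String[] args) {
           List<String> partitions = Arrays.asList("t-0", "t-1");
           // Arrays.toString already yields a single string, so String.join has
           // nothing to join and the brackets leak into the log line:
           System.out.println(String.join(", ", Arrays.toString(partitions.toArray())));
           // prints: [t-0, t-1]

           // The suggested stream form joins the elements directly:
           System.out.println(partitions.stream().map(Object::toString)
                   .collect(Collectors.joining(", ")));
           // prints: t-0, t-1
       }
   }
   ```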



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15897) Flaky Test: testWrongIncarnationId() – kafka.server.ControllerRegistrationManagerTest

2024-04-28 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17841597#comment-17841597
 ] 

Chia-Ping Tsai commented on KAFKA-15897:


It seems the root cause is that `context.mockChannelManager.poll()` 
([https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala#L221)]
 is executed after the event queue thread adds the `ControllerRegistrationRequest` 
([https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala#L233).]
 The request is consumed without a response, as we never set one 
([https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala#L226)].
 Hence, the following check 
([https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala#L230)]
 fails because the request is gone.

I feel the first poll 
([https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala#L221)]
 is unnecessary, since the check only waits for the event queue thread to send 
the controller registration request. That does NOT need the poll, and removing 
the poll prevents the above race condition, as illustrated below.
 

 

> Flaky Test: testWrongIncarnationId() – 
> kafka.server.ControllerRegistrationManagerTest
> -
>
> Key: KAFKA-15897
> URL: https://issues.apache.org/jira/browse/KAFKA-15897
> Project: Kafka
>  Issue Type: Test
>Reporter: Apoorv Mittal
>Assignee: Chia-Ping Tsai
>Priority: Major
>  Labels: flaky-test
>
> Build run: 
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14699/21/tests/
>  
> {code:java}
> org.opentest4j.AssertionFailedError: expected: <(false,1,0)> but was: 
> <(true,0,0)>Stacktraceorg.opentest4j.AssertionFailedError: expected: 
> <(false,1,0)> but was: <(true,0,0)>at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>at 
> app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)  
> at 
> app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)  
> at 
> app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177)  
> at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1141)   
>   at 
> app//kafka.server.ControllerRegistrationManagerTest.$anonfun$testWrongIncarnationId$3(ControllerRegistrationManagerTest.scala:228)
>at 
> app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379)
>  at 
> app//kafka.server.ControllerRegistrationManagerTest.testWrongIncarnationId(ControllerRegistrationManagerTest.scala:226)
>   at 
> java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)at 
> java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>   at 
> java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568)
> at 
> app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
>   at 
> app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>  at 
> app//org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:45)
>  at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
> at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
>   at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
>at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
> at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
>  at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
> at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
>at 
> app//org.junit.jupiter.e