[GitHub] [kafka] kamalcph commented on a diff in pull request #13747: MINOR: Fix ListOffsetsRequestTest.testResponseIncludesLeaderEpoch
kamalcph commented on code in PR #13747: URL: https://github.com/apache/kafka/pull/13747#discussion_r1206227365 ## core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala: ## @@ -192,7 +192,7 @@ class ListOffsetsRequestTest extends BaseRequestTest { killBroker(firstLeaderId) val secondLeaderId = TestUtils.awaitLeaderChange(servers, partition, firstLeaderId) // make sure high watermark of new leader has caught up -TestUtils.waitUntilTrue(() => sendRequest(secondLeaderId, 0L, -1).errorCode() != Errors.OFFSET_NOT_AVAILABLE.code(), +TestUtils.waitUntilTrue(() => sendRequest(secondLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, -1).errorCode != Errors.OFFSET_NOT_AVAILABLE.code, Review Comment: Thanks for fixing this! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a diff in pull request #13758: KAFKA-15010 ZK migration failover support
mumrah commented on code in PR #13758: URL: https://github.com/apache/kafka/pull/13758#discussion_r1206199732 ## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java: ## @@ -260,37 +293,37 @@ public void visitScramCredential(String userName, ScramMechanism scramMechanism, }); changedNonUserEntities.forEach(entity -> { -Map quotaMap = clientQuotasImage.entities().get(entity).quotaMap(); -operationConsumer.accept("Update client quotas for " + entity, migrationState -> +Map quotaMap = getClientQuotaMapForEntity(clientQuotasImage, entity); +opConsumer.accept(UPDATE_CLIENT_QUOTA, "Update client quotas for " + entity, migrationState -> migrationClient.configClient().writeClientQuotas(entity.entries(), quotaMap, Collections.emptyMap(), migrationState)); }); changedUsers.forEach(userName -> { ClientQuotaEntity entity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, userName)); -Map quotaMap = clientQuotasImage.entities().get(entity).quotaMap(); +Map quotaMap = getClientQuotaMapForEntity(clientQuotasImage, entity); Review Comment: There was an NPE here prior to this patch. If the SCRAM credentials changed, but ClientQuotas did not, the `get(entity)` call would return a null. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block
jeffkbkim commented on code in PR #13267: URL: https://github.com/apache/kafka/pull/13267#discussion_r1206145989 ## core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala: ## @@ -113,38 +142,113 @@ class ProducerIdManagerTest { } @ParameterizedTest - @ValueSource(ints = Array(1, 2, 10)) - def testContiguousIds(idBlockLen: Int): Unit = { + @ValueSource(ints = Array(1, 2, 10, 100)) + def testConcurrentGeneratePidRequests(idBlockLen: Int): Unit = { +// Send concurrent generateProducerId requests. Ensure that the generated producer id is unique. +// For each block (total 3 blocks), only "idBlockLen" number of requests should go through. +// All other requests should fail immediately. + +val numThreads = 5 +val latch = new CountDownLatch(idBlockLen * 3) val manager = new MockProducerIdManager(0, 0, idBlockLen) - -IntStream.range(0, idBlockLen * 3).forEach { i => - assertEquals(i, manager.generateProducerId()) +val pidMap = mutable.Map[Long, Int]() +val requestHandlerThreadPool = Executors.newFixedThreadPool(numThreads) + +for ( _ <- 0 until numThreads) { + requestHandlerThreadPool.submit(() => { +while(latch.getCount > 0) { + val result = manager.generateProducerId() + result match { +case Success(pid) => + pidMap synchronized { +if (latch.getCount != 0) { + val counter = pidMap.getOrElse(pid, 0) + pidMap += pid -> (counter + 1) + latch.countDown() +} + } + +case Failure(exception) => + assertEquals(classOf[CoordinatorLoadInProgressException], exception.getClass) + } + Thread.sleep(100) +} + }, 0) +} +assertTrue(latch.await(15000, TimeUnit.MILLISECONDS)) Review Comment: roughly 6 seconds. i have lowered this to 10s -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block
jeffkbkim commented on code in PR #13267: URL: https://github.com/apache/kafka/pull/13267#discussion_r1206145853 ## core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala: ## @@ -61,27 +63,59 @@ class ProducerIdsIntegrationTest { clusterInstance.stop() } + @ClusterTest(clusterType = Type.ZK, brokers = 1, autoStart = AutoStart.NO) + @Timeout(20) + def testHandleAllocateProducerIdsSingleRequestHandlerThread(clusterInstance: ClusterInstance): Unit = { + clusterInstance.config().serverProperties().put(KafkaConfig.NumIoThreadsProp, "1") +clusterInstance.start() +verifyUniqueIds(clusterInstance) +clusterInstance.stop() + } + + @Disabled // TODO: Enable once producer id block size is configurable Review Comment: I have appended the JIRA at the end -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon merged pull request #13763: MINOR: use debug level to log handleCommit
showuon merged PR #13763: URL: https://github.com/apache/kafka/pull/13763 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #13763: MINOR: use debug level to log handleCommit
showuon commented on PR #13763: URL: https://github.com/apache/kafka/pull/13763#issuecomment-1563700586 Failed tests are unrelated ``` Build / JDK 11 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testTrustStoreAlter(String).quorum=kraft Build / JDK 8 and Scala 2.12 / kafka.admin.TopicCommandIntegrationTest.testDescribeAndListTopicsWithoutInternalTopics(String).quorum=zk Build / JDK 17 and Scala 2.13 / kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsToZonedDateTime() Build / JDK 17 and Scala 2.13 / org.apache.kafka.tools.MetadataQuorumCommandTest.[1] Type=Raft-CoReside, Name=testDescribeQuorumStatusSuccessful, MetadataVersion=3.4-IV0, Security=PLAINTEXT Build / JDK 17 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated() ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15029) Make ProducerIdBlock size configurable
[ https://issues.apache.org/jira/browse/KAFKA-15029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Kim updated KAFKA-15029: - Description: The producer id block size is currently fixed at 1000. Increasing the size allows the pid manager to process more init pid requests before allocating more. The current 1000 is quite small and given that clusters can have thousands of producers, we should make this configurable to adapt in the future. We should also consider updating to a global block size of 5000. ProducerIdsIntegrationTest.scala has a disabled test (from KAFKA-14694) that we should also enable once we have configurable block sizes. was: The producer id block size is currently fixed at 1000. Increasing the size allows the pid manager to process more init pid requests before allocating more. The current 1000 is quite small and given that clusters can have thousands of producers, we should make this configurable to adapt in the future. We should also consider updating to a global block size of 5000. ProducerIdsIntegrationTest.scala has a disabled test that we should also enable once we have configurable block sizes. > Make ProducerIdBlock size configurable > -- > > Key: KAFKA-15029 > URL: https://issues.apache.org/jira/browse/KAFKA-15029 > Project: Kafka > Issue Type: Task >Reporter: Jeff Kim >Priority: Major > > The producer id block size is currently fixed at 1000. Increasing the size > allows the pid manager to process more init pid requests before allocating > more. The current 1000 is quite small and given that clusters can have > thousands of producers, we should make this configurable to adapt in the > future. We should also consider updating to a global block size of 5000. > > ProducerIdsIntegrationTest.scala has a disabled test (from KAFKA-14694) that > we should also enable once we have configurable block sizes. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15029) Make ProducerIdBlock size configurable
[ https://issues.apache.org/jira/browse/KAFKA-15029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Kim updated KAFKA-15029: - Description: The producer id block size is currently fixed at 1000. Increasing the size allows the pid manager to process more init pid requests before allocating more. The current 1000 is quite small and given that clusters can have thousands of producers, we should make this configurable to adapt in the future. We should also consider updating to a global block size of 5000. ProducerIdsIntegrationTest.scala has a disabled test that we should also enable once we have configurable block sizes. was:The producer id block size is currently fixed at 1000. Increasing the size allows the pid manager to process more init pid requests before allocating more. The current 1000 is quite small and given that clusters can have thousands of producers, we should make this configurable to adapt in the future. We should also consider updating to a global block size of 5000. > Make ProducerIdBlock size configurable > -- > > Key: KAFKA-15029 > URL: https://issues.apache.org/jira/browse/KAFKA-15029 > Project: Kafka > Issue Type: Task >Reporter: Jeff Kim >Priority: Major > > The producer id block size is currently fixed at 1000. Increasing the size > allows the pid manager to process more init pid requests before allocating > more. The current 1000 is quite small and given that clusters can have > thousands of producers, we should make this configurable to adapt in the > future. We should also consider updating to a global block size of 5000. > > ProducerIdsIntegrationTest.scala has a disabled test that we should also > enable once we have configurable block sizes. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15029) Make ProducerIdBlock size configurable
Jeff Kim created KAFKA-15029: Summary: Make ProducerIdBlock size configurable Key: KAFKA-15029 URL: https://issues.apache.org/jira/browse/KAFKA-15029 Project: Kafka Issue Type: Task Reporter: Jeff Kim The producer id block size is currently fixed at 1000. Increasing the size allows the pid manager to process more init pid requests before allocating more. The current 1000 is quite small and given that clusters can have thousands of producers, we should make this configurable to adapt in the future. We should also consider updating to a global block size of 5000. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15028) AddPartitionsToTxnManager metrics
Justine Olshan created KAFKA-15028: -- Summary: AddPartitionsToTxnManager metrics Key: KAFKA-15028 URL: https://issues.apache.org/jira/browse/KAFKA-15028 Project: Kafka Issue Type: Sub-task Reporter: Justine Olshan KIP-890 added metrics for the AddPartitionsToTxnManager: VerificationTimeMs – number of milliseconds from adding partition info to the manager to the time the response is sent. This will include the round trip to the transaction coordinator if it is called. This will also account for verifications that fail before the coordinator is called. VerificationFailureRate – rate of verifications that resulted in failure, either from the AddPartitionsToTxn response or through errors in the manager. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] hachikuji commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block
hachikuji commented on code in PR #13267: URL: https://github.com/apache/kafka/pull/13267#discussion_r1206088849 ## core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala: ## @@ -61,27 +63,59 @@ class ProducerIdsIntegrationTest { clusterInstance.stop() } + @ClusterTest(clusterType = Type.ZK, brokers = 1, autoStart = AutoStart.NO) + @Timeout(20) + def testHandleAllocateProducerIdsSingleRequestHandlerThread(clusterInstance: ClusterInstance): Unit = { + clusterInstance.config().serverProperties().put(KafkaConfig.NumIoThreadsProp, "1") +clusterInstance.start() +verifyUniqueIds(clusterInstance) +clusterInstance.stop() + } + + @Disabled // TODO: Enable once producer id block size is configurable Review Comment: Can we replace the TODO with a jira? ## core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala: ## @@ -113,38 +142,113 @@ class ProducerIdManagerTest { } @ParameterizedTest - @ValueSource(ints = Array(1, 2, 10)) - def testContiguousIds(idBlockLen: Int): Unit = { + @ValueSource(ints = Array(1, 2, 10, 100)) + def testConcurrentGeneratePidRequests(idBlockLen: Int): Unit = { +// Send concurrent generateProducerId requests. Ensure that the generated producer id is unique. +// For each block (total 3 blocks), only "idBlockLen" number of requests should go through. +// All other requests should fail immediately. 
+ +val numThreads = 5 +val latch = new CountDownLatch(idBlockLen * 3) val manager = new MockProducerIdManager(0, 0, idBlockLen) - -IntStream.range(0, idBlockLen * 3).forEach { i => - assertEquals(i, manager.generateProducerId()) +val pidMap = mutable.Map[Long, Int]() +val requestHandlerThreadPool = Executors.newFixedThreadPool(numThreads) + +for ( _ <- 0 until numThreads) { + requestHandlerThreadPool.submit(() => { +while(latch.getCount > 0) { + val result = manager.generateProducerId() + result match { +case Success(pid) => + pidMap synchronized { +if (latch.getCount != 0) { + val counter = pidMap.getOrElse(pid, 0) + pidMap += pid -> (counter + 1) + latch.countDown() +} + } + +case Failure(exception) => + assertEquals(classOf[CoordinatorLoadInProgressException], exception.getClass) + } + Thread.sleep(100) +} + }, 0) +} +assertTrue(latch.await(15000, TimeUnit.MILLISECONDS)) Review Comment: How long does this test take? ## core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala: ## @@ -126,17 +132,22 @@ class TransactionCoordinator(txnConfig: TransactionConfig, } else { val coordinatorEpochAndMetadata = txnManager.getTransactionState(transactionalId).flatMap { case None => - val producerId = producerIdManager.generateProducerId() - val createdMetadata = new TransactionMetadata(transactionalId = transactionalId, -producerId = producerId, -lastProducerId = RecordBatch.NO_PRODUCER_ID, -producerEpoch = RecordBatch.NO_PRODUCER_EPOCH, -lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH, -txnTimeoutMs = transactionTimeoutMs, -state = Empty, -topicPartitions = collection.mutable.Set.empty[TopicPartition], -txnLastUpdateTimestamp = time.milliseconds()) - txnManager.putTransactionStateIfNotExists(createdMetadata) + val result = producerIdManager.generateProducerId() Review Comment: nit: maybe we don't need `result`. Perhaps a little more concise to match on `producerIdManager.generateProducerId()`? Same comment below. 
## server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java: ## @@ -32,11 +34,25 @@ public class ProducerIdsBlock { private final int assignedBrokerId; private final long firstProducerId; private final int blockSize; +private final AtomicLong producerIdCounter; public ProducerIdsBlock(int assignedBrokerId, long firstProducerId, int blockSize) { this.assignedBrokerId = assignedBrokerId; this.firstProducerId = firstProducerId; this.blockSize = blockSize; +producerIdCounter = new AtomicLong(firstProducerId); +} + +/** + * Claim the next available producer id from the block. + * Returns an empty result if there are no more available producer ids in the block. + */ +public Optional claimNextId() { +long nextId = producerIdCounter.getAndIncrement(); +if (nextId > lastProducerId()) { Review Comment: This first check is duplicated below. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to Gi
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
jeffkbkim commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1206060783 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -0,0 +1,876 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.FencedMemberEpochException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.GroupMaxSizeReachedException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.NotCoordinatorException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnsupportedAssignorException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.consumer.Assignment; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder; +import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord; + +/** + * The GroupMetadataManager manages the metadata of all generic and consumer groups. It holds + * the hard and the soft state of the groups. This class has two kinds of methods: + * 1) The request handlers which handle the requests and generate a respo
[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
jolshan commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1206064411 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java: ## @@ -0,0 +1,500 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.GroupMetadataManagerTest; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ConsumerGroupTest { + +private ConsumerGroup createConsumerGroup(String groupId) { +SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); +return new ConsumerGroup(snapshotRegistry, groupId); +} + +@Test +public void testGetOrCreateMember() { +ConsumerGroup consumerGroup = createConsumerGroup("foo"); +ConsumerGroupMember member; + +// Create a group. +member = consumerGroup.getOrMaybeCreateMember("member-id", true); +assertEquals("member-id", member.memberId()); + +// Get that group back. 
+member = consumerGroup.getOrMaybeCreateMember("member-id", false); +assertEquals("member-id", member.memberId()); + +assertThrows(UnknownMemberIdException.class, () -> +consumerGroup.getOrMaybeCreateMember("does-not-exist", false)); +} + +@Test +public void testUpdateMember() { +ConsumerGroup consumerGroup = createConsumerGroup("foo"); +ConsumerGroupMember member; + +member = consumerGroup.getOrMaybeCreateMember("member", true); + +member = new ConsumerGroupMember.Builder(member) +.setSubscribedTopicNames(Arrays.asList("foo", "bar")) +.build(); + +consumerGroup.updateMember(member); + +assertEquals(member, consumerGroup.getOrMaybeCreateMember("member", false)); +} + +@Test +public void testRemoveMember() { +ConsumerGroup consumerGroup = createConsumerGroup("foo"); + +consumerGroup.getOrMaybeCreateMember("member", true); +assertTrue(consumerGroup.hasMember("member")); + +consumerGroup.removeMember("member"); +assertFalse(consumerGroup.hasMember("member")); + +} + +@Test +public void testUpdatingMemberUpdatesPartitionEpoch() { +Uuid fooTopicId = Uuid.randomUuid(); +Uuid barTopicId = Uuid.randomUuid(); +Uuid zarTopicId = Uuid.randomUuid(); + +ConsumerGroup consumerGroup = createConsumerGroup("foo"); +ConsumerGroupMember member; + +member = new ConsumerGroupMember.Builder("member") +.setMemberEpoch(10) +.setAssignedPartitions(mkAssignment( +mkTopicAssignment(fooTopicId, 1, 2, 3))) +.setPartitionsPendingRevocation(mkAssignment( +mkTopicAssignment(barTopicId, 4, 5, 6))) +.setPartitionsPendingAssignment(mkAssignment( +mkTopicAssignment(zarTopicId, 7, 8, 9))) +.build(); + +consumerGroup.updateMember(member); + +assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 1)); +assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 2)); +assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 3)); +assertEquals(10, consumerGroup.currentPartitionEpoch(ba
[GitHub] [kafka] vcrfxia commented on pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)
vcrfxia commented on PR #13756: URL: https://github.com/apache/kafka/pull/13756#issuecomment-1563584820 Yep, this week's looking a bit dicey but I'll take a look early next week! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
jolshan commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1206053946 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java: ## @@ -0,0 +1,500 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.GroupMetadataManagerTest; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ConsumerGroupTest { + +private ConsumerGroup createConsumerGroup(String groupId) { +SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); +return new ConsumerGroup(snapshotRegistry, groupId); +} + +@Test +public void testGetOrCreateMember() { +ConsumerGroup consumerGroup = createConsumerGroup("foo"); +ConsumerGroupMember member; + +// Create a group. +member = consumerGroup.getOrMaybeCreateMember("member-id", true); +assertEquals("member-id", member.memberId()); + +// Get that group back. 
+member = consumerGroup.getOrMaybeCreateMember("member-id", false); +assertEquals("member-id", member.memberId()); + +assertThrows(UnknownMemberIdException.class, () -> +consumerGroup.getOrMaybeCreateMember("does-not-exist", false)); +} + +@Test +public void testUpdateMember() { +ConsumerGroup consumerGroup = createConsumerGroup("foo"); +ConsumerGroupMember member; + +member = consumerGroup.getOrMaybeCreateMember("member", true); + +member = new ConsumerGroupMember.Builder(member) +.setSubscribedTopicNames(Arrays.asList("foo", "bar")) +.build(); + +consumerGroup.updateMember(member); + +assertEquals(member, consumerGroup.getOrMaybeCreateMember("member", false)); +} + +@Test +public void testRemoveMember() { +ConsumerGroup consumerGroup = createConsumerGroup("foo"); + +consumerGroup.getOrMaybeCreateMember("member", true); +assertTrue(consumerGroup.hasMember("member")); + +consumerGroup.removeMember("member"); +assertFalse(consumerGroup.hasMember("member")); + +} + +@Test +public void testUpdatingMemberUpdatesPartitionEpoch() { +Uuid fooTopicId = Uuid.randomUuid(); +Uuid barTopicId = Uuid.randomUuid(); +Uuid zarTopicId = Uuid.randomUuid(); + +ConsumerGroup consumerGroup = createConsumerGroup("foo"); +ConsumerGroupMember member; + +member = new ConsumerGroupMember.Builder("member") +.setMemberEpoch(10) +.setAssignedPartitions(mkAssignment( +mkTopicAssignment(fooTopicId, 1, 2, 3))) +.setPartitionsPendingRevocation(mkAssignment( +mkTopicAssignment(barTopicId, 4, 5, 6))) +.setPartitionsPendingAssignment(mkAssignment( +mkTopicAssignment(zarTopicId, 7, 8, 9))) +.build(); + +consumerGroup.updateMember(member); + +assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 1)); +assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 2)); +assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 3)); +assertEquals(10, consumerGroup.currentPartitionEpoch(ba
[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
jolshan commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1206050414 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -0,0 +1,859 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.FencedMemberEpochException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.GroupMaxSizeReachedException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.NotCoordinatorException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnsupportedAssignorException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.consumer.Assignment; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder; +import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord; + +/** + * The GroupMetadataManager manages the metadata of all generic and consumer groups. It holds + * the hard and the soft state of the groups. This class has two kinds of methods: + * 1) The request handlers which handle the requests and generate a respons
[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
jolshan commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1206048854 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -0,0 +1,535 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.coordinator.group.Group; +import org.apache.kafka.image.TopicImage; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.apache.kafka.timeline.TimelineInteger; +import org.apache.kafka.timeline.TimelineObject; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.function.Consumer; + +/** + * A Consumer Group. All the metadata in this class are backed by + * records in the __consumer_offsets partitions. 
+ */ +public class ConsumerGroup implements Group { + +public enum ConsumerGroupState { +EMPTY("empty"), +ASSIGNING("assigning"), +RECONCILING("reconciling"), +STABLE("stable"), +DEAD("dead"); + +private final String name; + +ConsumerGroupState(String name) { +this.name = name; +} + +@Override +public String toString() { +return name; +} +} + +/** + * The snapshot registry. + */ +private final SnapshotRegistry snapshotRegistry; + +/** + * The group id. + */ +private final String groupId; + +/** + * The group state. + */ +private final TimelineObject state; + +/** + * The group epoch. The epoch is incremented whenever the subscriptions + * are updated and it will trigger the computation of a new assignment + * for the group. + */ +private final TimelineInteger groupEpoch; + +/** + * The group members. + */ +private final TimelineHashMap members; + +/** + * The metadata of the subscribed topics. + */ +private final TimelineHashMap subscribedTopicMetadata; + +/** + * The assignment epoch. An assignment epoch smaller than the group epoch means + * that a new assignment is required. The assignment epoch is updated when a new + * assignment is installed. + */ +private final TimelineInteger assignmentEpoch; + +/** + * The target assignment. + */ +private final TimelineHashMap assignments; + +/** + * The current partition epoch maps each topic-partitions to their current epoch where + * the epoch is the epoch of their owners. When a member revokes a partition, it removes + * itself from this map. When a member gets a partition, it adds itself to this map. 
+ */ +private final TimelineHashMap> currentPartitionEpoch; + +public ConsumerGroup( +SnapshotRegistry snapshotRegistry, +String groupId +) { +this.snapshotRegistry = Objects.requireNonNull(snapshotRegistry); +this.groupId = Objects.requireNonNull(groupId); +this.state = new TimelineObject<>(snapshotRegistry, ConsumerGroupState.EMPTY); +this.groupEpoch = new TimelineInteger(snapshotRegistry); +this.members = new TimelineHashMap<>(snapshotRegistry, 0); +this.subscribedTopicMetadata = new TimelineHashMap<>(snapshotRegistry, 0); +this.assignmentEpoch = new TimelineInteger(snapshotRegistry); +this.assignments = new TimelineHashMap<>(snapshotRegistry, 0); +this.currentPartitionEpoch = new TimelineHashMap<>(snapshotRegistry, 0); +} + +/** + * The type of this group. + * + * @return The group type (Consumer). + */ +@Override +public GroupType type() { +return GroupType.CONSUMER; +} + +/** + * The state of this group. + * + * @return The current state as a String. + */ +@Override +public String stateAsString() { +return state.get()
[GitHub] [kafka] rondagostino commented on a diff in pull request #13759: KAFKA-15019: Improve handling of overload situations in the kcontroller
rondagostino commented on code in PR #13759: URL: https://github.com/apache/kafka/pull/13759#discussion_r1206028552 ## metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java: ## @@ -223,6 +283,21 @@ public BrokerHeartbeatState next() { } } +/** + * The maximum number of timed out heartbeats to count. + */ +static final int DEFAULT_TIMED_OUT_HEARTBEAT_COUNT_MAX = 1000; + +/** + * The time period over which to track timed out heartbeats. + */ +static final long DEFAULT_TIMED_OUT_HEARTBEAT_COUNT_WINDOW_NS = TimeUnit.MINUTES.toNanos(5); + +/** + * The number of heartbeats to notice missing before we go into overload. + */ +static final int DEFAULT_TIMED_OUT_HEARTBEAT_OVERLOAD_THRESHOLD = 3; + Review Comment: Whatever we set this to (and currently we always take this default value) will determine how long it would take for a controller to leave the overloaded state, right? So if we see a series of missed heartbeats that puts us into the overloaded state, and then we don't see any more missed heartbeats, it would take this amount of time to leave that overloaded state? Consider the following case. We are overloaded. And then some broker crashes. How long would we want it to take before we move leadership away for any partitions led by that suddenly-dead broker? Ideally the session timeout, of course, but that won't happen if we are overloaded -- will it take 5 minutes as currently coded? That seems like a long time. I think the only thing we can say is that we should move leadership away as soon as we have a correct view of the cluster. The question then becomes: how do we know when we have good information? A fixed amount of time like this with no missed heartbeats -- that seems too simple. We likely have a reasonable view of the cluster if we see enough heartbeats without missing any such that it is likely that all brokers have had a chance to communicate with us. 
So it seems like maybe the broker heartbeat interval should factor into it somehow? Let's say the broker heartbeat interval is 2 seconds. Right now we consider a heartbeat missed if it is more than half the heartbeat interval (i.e. 1 second) old. Let's say there are 60 brokers in the cluster. If we see something on the order of 60 heartbeats in a row without having missed any, then we probably have a reasonable view. So maybe it isn't having missed no more than N heartbeats in some fixed time window. Maybe we define "not overloaded" as having seen some string of heartbeats uninterrupted without seeing a missed one? So every time we miss a heartbeat we go into the "overloaded" state and we reset a counter of contiguous successfully processed heartbeats to 0. Whenever we see a heartbeat and process it in time we increase that counter. We are no longer overloaded when the `contiguousHeartbeatsProcessedInTime` counter is equal to or exceeds the broker count. I'm not sure I have it right, but I think this is a discussion worth pursuing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a diff in pull request #13759: KAFKA-15019: Improve handling of overload situations in the kcontroller
rondagostino commented on code in PR #13759: URL: https://github.com/apache/kafka/pull/13759#discussion_r1206028552 ## metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java: ## @@ -223,6 +283,21 @@ public BrokerHeartbeatState next() { } } +/** + * The maximum number of timed out heartbeats to count. + */ +static final int DEFAULT_TIMED_OUT_HEARTBEAT_COUNT_MAX = 1000; + +/** + * The time period over which to track timed out heartbeats. + */ +static final long DEFAULT_TIMED_OUT_HEARTBEAT_COUNT_WINDOW_NS = TimeUnit.MINUTES.toNanos(5); + +/** + * The number of heartbeats to notice missing before we go into overload. + */ +static final int DEFAULT_TIMED_OUT_HEARTBEAT_OVERLOAD_THRESHOLD = 3; + Review Comment: Whatever we set this to (and currently we always take this default value) will determine how long it would take for a controller to leave the overloaded state, right? So if we see a series of missed heartbeats that puts us into the overloaded state, and then we don't see any more missed heartbeats, it would take this amount of time to leave that overloaded state? Consider the following case. We are overloaded. And then some broker crashes. How long would we want it to take before we move leadership away for any partitions led by that suddenly-dead broker? Ideally the session timeout, of course, but that won't happen if we are overloaded -- will it take 5 minutes as currently coded? That seems like a long time. I think the only thing we can say is that we should move leadership away as soon as we have a correct view of the cluster. The question then becomes: how do we know when we have good information? A fixed amount of time like this with no missed heartbeats -- that seems too simple. We likely have a reasonable view of the cluster if we see enough heartbeats without missing any such that it is likely that all brokers have had a chance to communicate with us. 
So it seems like maybe the broker heartbeat interval should factor into it somehow? Let's say the broker heartbeat interval is 2 seconds. Right now we consider a heartbeat missed if it is more than half the heartbeat interval (i.e. 1 second) old. Let's say there are 60 brokers in the cluster. If we see something on the order of 60 heartbeats in a row without having missed any, then we probably have a reasonable view. So maybe it isn't having missed no more than N heartbeats in some fixed time window. Maybe we define "not overloaded" as having seen some string of heartbeats uninterrupted without seeing a missed one? So every time we miss a heartbeat we go into the "overloaded" state and we reset a counter of contiguous successfully processed heartbeats to 0. Whenever we see a heartbeat and process it in time we increase that counter. We are no longer overloaded when the `contiguousHeartbeatsProcessedInTime` counter is equal to or exceeds the broker count. I'm not sure I have it right, but I think this is a discussion worth pursuing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a diff in pull request #13759: KAFKA-15019: Improve handling of overload situations in the kcontroller
rondagostino commented on code in PR #13759: URL: https://github.com/apache/kafka/pull/13759#discussion_r1205924251 ## metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java: ## @@ -613,4 +733,32 @@ BrokerControlStates calculateNextBrokerState(int brokerId, return new BrokerControlStates(currentState, SHUTDOWN_NOW); } } + +int timedOutHeartbeats(long nowNs) { Review Comment: `private`? It seems all methods are currently package-private. Should any of them be `private` instead? ## metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java: ## @@ -55,6 +59,62 @@ * active controller does not know when the last heartbeats were received from each. */ public class BrokerHeartbeatManager { +static class Builder { +private LogContext logContext = null; +private Time time = Time.SYSTEM; +private long sessionTimeoutNs = 8000; Review Comment: Should probably default to `ClusterControlManager.DEFAULT_SESSION_TIMEOUT_NS` (which is 18_000_000_000, it appears you put in a milliseconds value, and the incorrect one at that). 
## metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java: ## @@ -613,4 +733,32 @@ BrokerControlStates calculateNextBrokerState(int brokerId, return new BrokerControlStates(currentState, SHUTDOWN_NOW); } } + +int timedOutHeartbeats(long nowNs) { +return timedOutHeartbeatCounter.count(nowNs); +} + +boolean overloaded(long nowNs) { +return timedOutHeartbeats(nowNs) > timedOutHeartbeatOverloadThreshold; +} + +void handleBrokerHeartbeatTimeout() { +long nowNs = time.nanoseconds(); +boolean alreadyOverloaded = overloaded(nowNs); +timedOutHeartbeatCounter.addEvent(nowNs); +if ((!alreadyOverloaded) && overloaded(nowNs)) { + nonFatalFaultHandler.handleFault(String.format("handleBrokerHeartbeatTimeout: the " + +"active controller missed %d heartbeats in the last %d ms and is now " + +"overloaded.", timedOutHeartbeatOverloadThreshold, +NANOSECONDS.toMillis(timedOutHeartbeatCounter.window(; +} +} + +boolean maybeLogOverloadExit(long nowNs) { +if (!needToLogOverloadExit) return false; +if (overloaded(time.nanoseconds())) return false; +log.error("The active controller is no longer overloaded."); Review Comment: Should this be at the `ERROR` level? ## metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java: ## @@ -613,4 +733,32 @@ BrokerControlStates calculateNextBrokerState(int brokerId, return new BrokerControlStates(currentState, SHUTDOWN_NOW); } } + +int timedOutHeartbeats(long nowNs) { +return timedOutHeartbeatCounter.count(nowNs); +} + +boolean overloaded(long nowNs) { +return timedOutHeartbeats(nowNs) > timedOutHeartbeatOverloadThreshold; +} + +void handleBrokerHeartbeatTimeout() { +long nowNs = time.nanoseconds(); +boolean alreadyOverloaded = overloaded(nowNs); +timedOutHeartbeatCounter.addEvent(nowNs); +if ((!alreadyOverloaded) && overloaded(nowNs)) { + nonFatalFaultHandler.handleFault(String.format("handleBrokerHeartbeatTimeout: the " + Review Comment: Looks like we need to set `needToLogOverloadExit = true` here. 
## metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java: ## @@ -223,6 +283,21 @@ public BrokerHeartbeatState next() { } } +/** + * The maximum number of timed out heartbeats to count. + */ +static final int DEFAULT_TIMED_OUT_HEARTBEAT_COUNT_MAX = 1000; + +/** + * The time period over which to track timed out heartbeats. + */ +static final long DEFAULT_TIMED_OUT_HEARTBEAT_COUNT_WINDOW_NS = TimeUnit.MINUTES.toNanos(5); + +/** + * The number of heartbeats to notice missing before we go into overload. + */ +static final int DEFAULT_TIMED_OUT_HEARTBEAT_OVERLOAD_THRESHOLD = 3; + Review Comment: Whatever we set this to (and currently we always take this default value) will determine how long it would take for a controller to leave the overloaded state, right? So if we see a series of missed heartbeats that puts us into the overloaded state, and then we don't see any more missed heartbeats, it would take this amount of time to leave that overloaded state? Consider the following case. We are overloaded. And then some broker crashes. How long would we want it to take before we move leadership away for any partitions led by that suddenly-dead broker? Ideally the session timeout, of course, but that won't happen if we are overloaded -- will it take 5 minutes as currently coded? That seems like a long
[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
jolshan commented on code in PR #13608: URL: https://github.com/apache/kafka/pull/13608#discussion_r1205982602 ## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java: ## @@ -41,18 +42,35 @@ public class ProducerStateEntry { private int coordinatorEpoch; private long lastTimestamp; private OptionalLong currentTxnFirstOffset; + +private VerificationState verificationState; + +// Before any batches are associated with the entry, the tentative sequence represents the lowest sequence seen. +private OptionalInt tentativeSequence; + +public enum VerificationState { +EMPTY, +VERIFYING, +VERIFIED +} Review Comment: I've addressed this issue by creating a verification object that is created on first attempt to verify and removed when a marker is written. When verification is needed, we pass this object through and check under log lock in the append path. In the steps above, 6 will clear this object, 7 will set a new one, so 8 will not succeed and error out. When verification is not needed (already verified), we rely on firstTxnOffset being present before appending to the log. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
jolshan commented on code in PR #13608: URL: https://github.com/apache/kafka/pull/13608#discussion_r1205980757 ## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java: ## @@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) { } } +public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) { +if (this.producerEpoch < producerEpoch) { +batchMetadata.clear(); +this.producerEpoch = producerEpoch; +return true; +} else { +return false; +} +} + +// We only set tentative sequence if no batches have been written to the log. It is used to avoid OutOfOrderSequenceExceptions +// when we saw a lower sequence during transaction verification. We will update the sequence when there is no batch metadata if: +// a) There is no tentative sequence yet +// b) A lower sequence for the same epoch is seen and should thereby block records after that +// c) A higher producer epoch is found that will reset the lowest seen sequence +public void maybeUpdateTentativeSequence(int sequence, short producerEpoch) { +if (batchMetadata.isEmpty() && Review Comment: I dug in further and interestingly we do a different check on bumped epochs after markers. We do have a second check here: ``` if (!(currentEntry.producerEpoch() == RecordBatch.NO_PRODUCER_EPOCH || inSequence(currentLastSeq, appendFirstSeq) || currentEntry.tentativeSequence().isPresent())) { throw new OutOfOrderSequenceException("Out of order sequence number for producer " + producerId + " at " + "offset " + offset + " in partition " + topicPartition + ": " + appendFirstSeq + " (incoming seq. number), " + currentLastSeq + " (current end sequence number)"); } ``` I've added the tentative sequence change because this PR actually changes this behavior on first verification. We put the producer epoch in the entry, so that first case no longer applies. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on pull request #13764: KAFKA-14691; [1/N] Add new fields to OffsetFetchRequest and OffsetFetchResponse
jolshan commented on PR #13764: URL: https://github.com/apache/kafka/pull/13764#issuecomment-1563462503 Since this is still under development, can we set the flag to indicate the latest version is unstable? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bbejeck commented on pull request #13751: KAFKA-14539: Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map
bbejeck commented on PR #13751: URL: https://github.com/apache/kafka/pull/13751#issuecomment-1563454591 Test failures are unrelated -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] machi1990 commented on pull request #13611: MINOR: remove unused variable from QuorumMetaLogListener#handleCommit method
machi1990 commented on PR #13611: URL: https://github.com/apache/kafka/pull/13611#issuecomment-1563412353 Thanks @jsancio I've added a description to the PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on pull request #13724: MINOR: more KRaft Metadata Image tests
rondagostino commented on PR #13724: URL: https://github.com/apache/kafka/pull/13724#issuecomment-1563388747 Lots of test failures, but all of the tests touched here succeeded, plus there are no changes to non-test code. So test failures are unrelated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13334: MINOR: Move plugin path parsing from DelegatingClassLoader to PluginUtils
gharris1727 commented on code in PR #13334: URL: https://github.com/apache/kafka/pull/13334#discussion_r1205901798 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java: ## @@ -188,11 +191,34 @@ public static boolean isClassFile(Path path) { return path.toString().toLowerCase(Locale.ROOT).endsWith(".class"); } -public static List pluginLocations(Path topPath) throws IOException { +public static List pluginLocations(String pluginPath) { +if (pluginPath == null) { +return Collections.emptyList(); +} +String[] pluginPathElements = COMMA_WITH_WHITESPACE.split(pluginPath.trim(), -1); +List pluginLocations = new ArrayList<>(); +for (String path : pluginPathElements) { +try { +Path pluginPathElement = Paths.get(path).toAbsolutePath(); +// Currently 'plugin.paths' property is a list of top-level directories +// containing plugins +if (Files.isDirectory(pluginPathElement)) { +pluginLocations.addAll(pluginLocations(pluginPathElement)); +} else if (isArchive(pluginPathElement)) { +pluginLocations.add(pluginPathElement); +} Review Comment: Thanks for catching that, the Paths.get() throws this exception but it's not checked. I'm not sure if the catch in initLoaders still needs the InvalidPathException, but I left it in just to be safe. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Hangleton commented on a diff in pull request #13764: KAFKA-14691; [1/N] Add new fields to OffsetFetchRequest and OffsetFetchResponse
Hangleton commented on code in PR #13764: URL: https://github.com/apache/kafka/pull/13764#discussion_r1205888922 ## clients/src/main/resources/common/message/OffsetFetchRequest.json: ## @@ -51,8 +57,9 @@ "about": "The group ID."}, { "name": "Topics", "type": "[]OffsetFetchRequestTopics", "versions": "8+", "nullableVersions": "8+", "about": "Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.", "fields": [ -{ "name": "Name", "type": "string", "versions": "8+", "entityType": "topicName", +{ "name": "Name", "type": "string", "versions": "0-8", "ignorable": true, "entityType": "topicName", "about": "The topic name."}, +{ "name": "TopicId", "type": "uuid", "versions": "9+", "ignorable": true, "about": "The unique topic ID" }, Review Comment: nit: The `"about"` field could be on a new line. ## clients/src/main/resources/common/message/OffsetFetchRequest.json: ## @@ -33,11 +33,17 @@ // Version 7 is adding the require stable flag. // // Version 8 is adding support for fetching offsets for multiple groups at a time - "validVersions": "0-8", + // + // Version 9 adds GenerationIdOrMemberEpoch, MemberId and TopicId fields (KIP-848). + "validVersions": "0-9", "flexibleVersions": "6+", "fields": [ { "name": "GroupId", "type": "string", "versions": "0-7", "entityType": "groupId", "about": "The group to fetch offsets for." }, +{ "name": "GenerationIdOrMemberEpoch", "type": "int32", "versions": "9+", "default": "-1", "ignorable": true, Review Comment: Do we want to add these fields in this PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15025) Implement min-cost flow without balancing tasks for same subtopology
Hao Li created KAFKA-15025: -- Summary: Implement min-cost flow without balancing tasks for same subtopology Key: KAFKA-15025 URL: https://issues.apache.org/jira/browse/KAFKA-15025 Project: Kafka Issue Type: Sub-task Reporter: Hao Li -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15027) Implement rack aware assignment for standby tasks
Hao Li created KAFKA-15027: -- Summary: Implement rack aware assignment for standby tasks Key: KAFKA-15027 URL: https://issues.apache.org/jira/browse/KAFKA-15027 Project: Kafka Issue Type: Sub-task Reporter: Hao Li -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15026) Implement min-cost flow balancing tasks for same subtopology
Hao Li created KAFKA-15026: -- Summary: Implement min-cost flow balancing tasks for same subtopology Key: KAFKA-15026 URL: https://issues.apache.org/jira/browse/KAFKA-15026 Project: Kafka Issue Type: Sub-task Reporter: Hao Li -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15024) Add cost function for task/client
Hao Li created KAFKA-15024: -- Summary: Add cost function for task/client Key: KAFKA-15024 URL: https://issues.apache.org/jira/browse/KAFKA-15024 Project: Kafka Issue Type: Sub-task Reporter: Hao Li -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15023) Get rack information for source topic partitions for a task
Hao Li created KAFKA-15023: -- Summary: Get rack information for source topic partitions for a task Key: KAFKA-15023 URL: https://issues.apache.org/jira/browse/KAFKA-15023 Project: Kafka Issue Type: Sub-task Reporter: Hao Li -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15022) Support rack aware task assignment in Kafka streams
[ https://issues.apache.org/jira/browse/KAFKA-15022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Li updated KAFKA-15022: --- Labels: kip kip-925 (was: ) > Support rack aware task assignment in Kafka streams > > > Key: KAFKA-15022 > URL: https://issues.apache.org/jira/browse/KAFKA-15022 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Hao Li >Assignee: Hao Li >Priority: Major > Labels: kip, kip-925 > > For KIP-925: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15022) Support rack aware task assignment in Kafka streams
Hao Li created KAFKA-15022: -- Summary: Support rack aware task assignment in Kafka streams Key: KAFKA-15022 URL: https://issues.apache.org/jira/browse/KAFKA-15022 Project: Kafka Issue Type: Improvement Reporter: Hao Li For KIP-925: https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15022) Support rack aware task assignment in Kafka streams
[ https://issues.apache.org/jira/browse/KAFKA-15022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Li updated KAFKA-15022: --- Component/s: streams > Support rack aware task assignment in Kafka streams > > > Key: KAFKA-15022 > URL: https://issues.apache.org/jira/browse/KAFKA-15022 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Hao Li >Assignee: Hao Li >Priority: Major > > For KIP-925: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15022) Support rack aware task assignment in Kafka streams
[ https://issues.apache.org/jira/browse/KAFKA-15022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Li reassigned KAFKA-15022: -- Assignee: Hao Li > Support rack aware task assignment in Kafka streams > > > Key: KAFKA-15022 > URL: https://issues.apache.org/jira/browse/KAFKA-15022 > Project: Kafka > Issue Type: Improvement >Reporter: Hao Li >Assignee: Hao Li >Priority: Major > > For KIP-925: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] splett2 commented on a diff in pull request #13765: KAFKA-15021; Skip leader epoch bump on ISR shrink
splett2 commented on code in PR #13765: URL: https://github.com/apache/kafka/pull/13765#discussion_r1205862927 ## core/src/main/scala/kafka/cluster/Partition.scala: ## @@ -1087,12 +1087,14 @@ class Partition(val topicPartition: TopicPartition, // avoid unnecessary collection generation val leaderLogEndOffset = leaderLog.logEndOffsetMetadata var newHighWatermark = leaderLogEndOffset -remoteReplicasMap.values.foreach { replica => +remoteReplicasMap.foreachEntry { (replicaId, replica) => Review Comment: can we use `remoteReplicasMap.values` here and use the replica.brokerId similar to the maximalIsr.contains call? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #13334: MINOR: Move plugin path parsing from DelegatingClassLoader to PluginUtils
C0urante commented on code in PR #13334: URL: https://github.com/apache/kafka/pull/13334#discussion_r1205854894 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java: ## @@ -188,11 +191,34 @@ public static boolean isClassFile(Path path) { return path.toString().toLowerCase(Locale.ROOT).endsWith(".class"); } -public static List pluginLocations(Path topPath) throws IOException { +public static List pluginLocations(String pluginPath) { +if (pluginPath == null) { +return Collections.emptyList(); +} +String[] pluginPathElements = COMMA_WITH_WHITESPACE.split(pluginPath.trim(), -1); +List pluginLocations = new ArrayList<>(); +for (String path : pluginPathElements) { +try { +Path pluginPathElement = Paths.get(path).toAbsolutePath(); +// Currently 'plugin.paths' property is a list of top-level directories +// containing plugins +if (Files.isDirectory(pluginPathElement)) { +pluginLocations.addAll(pluginLocations(pluginPathElement)); +} else if (isArchive(pluginPathElement)) { +pluginLocations.add(pluginPathElement); +} Review Comment: Do we also want to include the catch block for `InvalidPathException` here, which matches the [logic on trunk](https://github.com/apache/kafka/blob/dc00832b965c29fbbb4148cc680fadfc3f28642a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java#L266-L267)? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Reopened] (KAFKA-8713) [Connect] JsonConverter NULL Values are replaced by default values even in NULLABLE fields
[ https://issues.apache.org/jira/browse/KAFKA-8713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris reopened KAFKA-8713: > [Connect] JsonConverter NULL Values are replaced by default values even in > NULLABLE fields > -- > > Key: KAFKA-8713 > URL: https://issues.apache.org/jira/browse/KAFKA-8713 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.3.0, 2.2.1 >Reporter: Cheng Pan >Assignee: Mickael Maison >Priority: Major > Labels: needs-kip > Fix For: 3.5.0 > > > Class JsonConverter line: 582 > {code:java} > private static JsonNode convertToJson(Schema schema, Object logicalValue) > { > if (logicalValue == null) { > if (schema == null) // Any schema is valid and we don't have a > default, so treat this as an optional schema > return null; > if (schema.defaultValue() != null) > return convertToJson(schema, schema.defaultValue()); > if (schema.isOptional()) > return JsonNodeFactory.instance.nullNode(); > throw new DataException("Conversion error: null value for field > that is required and has no default value"); > } > > } > {code} > h1.Expect: > Value `null` is valid for an optional field, even though the field has a > default value. > Only when the field is required should the converter fall back to the default > value when the value is `null`. > h1.Actual: > Always return default value if `null` was given. > h1. 
Example: > I'm not sure if the current behavior is the exactly expected, but at least on > MySQL, a table define as > {code:sql} > create table t1 { >name varchar(40) not null, >create_time datetime default '1999-01-01 11:11:11' null, >update_time datetime default '1999-01-01 11:11:11' null > } > {code} > Just insert a record: > {code:sql} > INSERT INTO `t1` (`name`, `update_time`) VALUES ('kafka', null); > {code} > The result is: > {code:json} > { > "name": "kafka", > "create_time": "1999-01-01 11:11:11", > "update_time": null > } > {code} > But when I use debezium pull binlog and send the record to Kafka with > JsonConverter, the result changed to: > {code:json} > { > "name": "kafka", > "create_time": "1999-01-01 11:11:11", > "update_time": "1999-01-01 11:11:11" > } > {code} > For more details, see: https://issues.jboss.org/browse/DBZ-1064 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] gharris1727 commented on pull request #13748: [BUGFIX] Bugfixed in KAFKA-8713, but it doesn't work properly.
gharris1727 commented on PR #13748: URL: https://github.com/apache/kafka/pull/13748#issuecomment-1563306818 Hey @krespo Thank you for clarifying your setup. I was able to reproduce this issue in a unit test. I'll bring it up to the release managers to see if we can get a fix into 3.5. Thanks so much for validating this feature pre-release. Unfortunately this patch can't move forward as-is, because people are certainly relying on the Struct::get returning defaults. However, we can alter the JsonConverter to use Struct::getWithoutDefault, and use the `replace.null.with.default` configuration to gate whether to insert the default to maintain backwards compatibility. If you can update this patch to call getWithoutDefault, and add some unit tests for the struct field case, I think that will be merge-able. cc @mimaison @C0urante -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on pull request #13611: MINOR: remove unused variable from QuorumMetaLogListener#handleCommit method
jsancio commented on PR #13611: URL: https://github.com/apache/kafka/pull/13611#issuecomment-1563305554 I'll take a look @machi1990 . In the meantime, can you please add a description? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio opened a new pull request, #13765: KAFKA-15021; Skip leader epoch bump
jsancio opened a new pull request, #13765: URL: https://github.com/apache/kafka/pull/13765 TODO: Write description ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13356: KAFKA-14789: Prevent mis-attributing classpath plugins, allow discovery of classpath RestExtension and ConfigProvider
gharris1727 commented on code in PR #13356: URL: https://github.com/apache/kafka/pull/13356#discussion_r1205800242 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java: ## @@ -431,15 +427,29 @@ private Collection> getServiceLoaderPluginDesc(Class klass, log.error("Failed to discover {}{}", klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t); continue; } -result.add(pluginDesc((Class) pluginImpl.getClass(), -versionFor(pluginImpl), loader)); +Class pluginKlass = (Class) pluginImpl.getClass(); +if (!isParentClassloader(pluginKlass.getClassLoader(), loader)) { +log.debug("Exclude {} that is from classloader {}", pluginKlass.getSimpleName(), pluginKlass.getClassLoader()); +continue; +} +result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), loader)); } } finally { Plugins.compareAndSwapLoaders(savedLoader); } return result; } +private static boolean isParentClassloader(ClassLoader loader, ClassLoader parent) { +while (loader != null) { +if (loader == parent) { +return true; +} +loader = loader.getParent(); +} +return false; Review Comment: I wrote this in case a plugin came from a classloader which was a child of the PluginClassLoader. Thinking about it again, I don't know how a plugin would manage that, since the PluginClassLoader doesn't have any child-first logic that would permit it to load classes from deeper in the classloading hierarchy. I'll simplify this logic. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
jolshan commented on code in PR #13608: URL: https://github.com/apache/kafka/pull/13608#discussion_r1205794685 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -980,6 +1007,15 @@ class UnifiedLog(@volatile var logStartOffset: Long, if (duplicateBatch.isPresent) { return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get())) } + + // Verify that if the record is transactional & the append origin is client, that we are in VERIFIED state. + // Also check that we are not appending a record with a higher sequence than one previously seen through verification. + if (batch.isTransactional && producerStateManager.producerStateManagerConfig().transactionVerificationEnabled()) { +if (verificationState(batch.producerId(), batch.producerEpoch()) != ProducerStateEntry.VerificationState.VERIFIED) { + throw new InvalidRecordException("Record was not part of an ongoing transaction") +} else if (maybeLastEntry.isPresent && maybeLastEntry.get.tentativeSequence.isPresent && maybeLastEntry.get.tentativeSequence.getAsInt < batch.baseSequence) Review Comment: One thing that is tricky is that the producer state entry used in the sequence check is actually not the -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
jolshan commented on code in PR #13608: URL: https://github.com/apache/kafka/pull/13608#discussion_r1205792946 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -980,6 +1007,15 @@ class UnifiedLog(@volatile var logStartOffset: Long, if (duplicateBatch.isPresent) { return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get())) } + + // Verify that if the record is transactional & the append origin is client, that we are in VERIFIED state. + // Also check that we are not appending a record with a higher sequence than one previously seen through verification. + if (batch.isTransactional && producerStateManager.producerStateManagerConfig().transactionVerificationEnabled()) { +if (verificationState(batch.producerId(), batch.producerEpoch()) != ProducerStateEntry.VerificationState.VERIFIED) { + throw new InvalidRecordException("Record was not part of an ongoing transaction") +} else if (maybeLastEntry.isPresent && maybeLastEntry.get.tentativeSequence.isPresent && maybeLastEntry.get.tentativeSequence.getAsInt < batch.baseSequence) Review Comment: Did we want to move the verification check there? Or just tentative sequence? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
jolshan commented on code in PR #13608: URL: https://github.com/apache/kafka/pull/13608#discussion_r1205777679 ## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java: ## @@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) { } } +public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) { +if (this.producerEpoch < producerEpoch) { +batchMetadata.clear(); +this.producerEpoch = producerEpoch; +return true; +} else { +return false; +} +} + +// We only set tentative sequence if no batches have been written to the log. It is used to avoid OutOfOrderSequenceExceptions +// when we saw a lower sequence during transaction verification. We will update the sequence when there is no batch metadata if: +// a) There is no tentative sequence yet +// b) A lower sequence for the same epoch is seen and should thereby block records after that +// c) A higher producer epoch is found that will reset the lowest seen sequence +public void maybeUpdateTentativeSequence(int sequence, short producerEpoch) { +if (batchMetadata.isEmpty() && Review Comment: I dug into this a bit more. We actually do check sequence on bumped epoch! ``` if (producerEpoch != updatedEntry.producerEpoch()) { if (appendFirstSeq != 0) { if (updatedEntry.producerEpoch() != RecordBatch.NO_PRODUCER_EPOCH) { throw new OutOfOrderSequenceException("Invalid sequence number for new epoch of producer " + producerId + "at offset " + offset + " in partition " + topicPartition + ": " + producerEpoch + " (request epoch), " + appendFirstSeq + " (seq. number), " + updatedEntry.producerEpoch() + " (current producer epoch)"); ``` Basically we check when the updated epoch is different than the epoch we want to use. The tricky part is that we update the epoch when creating the verification state, so we just need to do that check there somehow. I will look into it. 
## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java: ## @@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) { } } +public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) { +if (this.producerEpoch < producerEpoch) { +batchMetadata.clear(); +this.producerEpoch = producerEpoch; +return true; +} else { +return false; +} +} + +// We only set tentative sequence if no batches have been written to the log. It is used to avoid OutOfOrderSequenceExceptions +// when we saw a lower sequence during transaction verification. We will update the sequence when there is no batch metadata if: +// a) There is no tentative sequence yet +// b) A lower sequence for the same epoch is seen and should thereby block records after that +// c) A higher producer epoch is found that will reset the lowest seen sequence +public void maybeUpdateTentativeSequence(int sequence, short producerEpoch) { +if (batchMetadata.isEmpty() && Review Comment: I dug into this a bit more. We actually do check sequence on bumped epoch! ``` if (producerEpoch != updatedEntry.producerEpoch()) { if (appendFirstSeq != 0) { if (updatedEntry.producerEpoch() != RecordBatch.NO_PRODUCER_EPOCH) { throw new OutOfOrderSequenceException("Invalid sequence number for new epoch of producer " + producerId + "at offset " + offset + " in partition " + topicPartition + ": " + producerEpoch + " (request epoch), " + appendFirstSeq + " (seq. number), " + updatedEntry.producerEpoch() + " (current producer epoch)"); ``` Basically we check when the updated epoch is different than the epoch we want to use. The tricky part is that we update the epoch when creating the verification state, so we just need to do that check there somehow. I will look into it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14953) Add metrics for tiered storage
[ https://issues.apache.org/jira/browse/KAFKA-14953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17726301#comment-17726301 ] Abhijeet Kumar commented on KAFKA-14953: Yes, that is in the plan as mentioned by Satish in the comment: [https://github.com/apache/kafka/pull/13535#discussion_r1181518655] I plan to add all the metrics in scope. > Add metrics for tiered storage > -- > > Key: KAFKA-14953 > URL: https://issues.apache.org/jira/browse/KAFKA-14953 > Project: Kafka > Issue Type: Sub-task >Reporter: Divij Vaidya >Assignee: Abhijeet Kumar >Priority: Minor > > Not just for expired fetch. We also need to add all the metrics described in > KIP-405 > https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-NewMetrics > > ref: [https://github.com/apache/kafka/pull/13535#discussion_r1180286031] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-9800) [KIP-580] Client Exponential Backoff Implementation
[ https://issues.apache.org/jira/browse/KAFKA-9800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-9800: - Labels: KIP-580 client (was: KIP-580) > [KIP-580] Client Exponential Backoff Implementation > --- > > Key: KAFKA-9800 > URL: https://issues.apache.org/jira/browse/KAFKA-9800 > Project: Kafka > Issue Type: New Feature >Reporter: Cheng Tan >Assignee: Luke Chen >Priority: Major > Labels: KIP-580, client > > Design: > The main idea is to bookkeep the failed attempt. Currently, the retry backoff > has two main usage patterns: > # Synchronous retries and blocking loop. The thread will sleep in each > iteration for retry backoff ms. > # Async retries. In each polling, the retries that do not meet the backoff will > be filtered. The data class often maintains a 1:1 mapping to a set of > requests which are logically associated. (i.e. a set contains only one > initial request and only its retries.) > For type 1, we can utilize a local failure counter of a Java generic data > type. > For case 2, I already wrapped the exponential backoff/timeout util class in > my KIP-601 > [implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28] > which takes the number of attempts and returns the backoff/timeout value at > the corresponding level. Thus, we can add a new class property to those > classes containing retriable data in order to record the number of failed > attempts. > > Changes: > KafkaProducer: > # Produce request (ApiKeys.PRODUCE). Currently, the backoff applies to each > ProducerBatch in Accumulator, which already has an attribute attempts > recording the number of failed attempts. So we can let the Accumulator > calculate the new retry backoff for each batch when it enqueues them, to avoid > instantiating the util class multiple times. > # Transaction request (ApiKeys..*TXN). TxnRequestHandler will have a new > class property of type `Long` to record the number of attempts. 
> KafkaConsumer: > # Some synchronous retry use cases. Record the failed attempts in the > blocking loop. > # Partition request (ApiKeys.OFFSET_FOR_LEADER_EPOCH, ApiKeys.LIST_OFFSETS). > Though the actual requests are packed for each node, the current > implementation is applying backoff to each topic partition, where the backoff > value is kept by TopicPartitionState. Thus, TopicPartitionState will have the > new property recording the number of attempts. > Metadata: > # Metadata lives as a singleton in many clients. Add a new property > recording the number of attempts > AdminClient: > # AdminClient has its own request abstraction Call. The failed attempts are > already kept by the abstraction. So probably clean the Call class logic a bit. > Existing tests: > # If the tests are testing the retry backoff, add a delta to the assertion, > considering the existence of the jitter. > # If the tests are testing other functionality, we can specify the same > value for both `retry.backoff.ms` and `retry.backoff.max.ms` in order to make > the retry backoff static. We can use this trick to make the existing tests > compatible with the changes. > There're other common usages look like client.poll(timeout), where the > timeout passed in is the retry backoff value. We won't change these usages > since its underlying logic is nioSelector.select(timeout) and > nioSelector.selectNow(), which means if no interested op exists, the client > will block retry backoff milliseconds. This is an optimization when there's > no request that needs to be sent but the client is waiting for responses. > Specifically, if the client fails the inflight requests before the retry > backoff milliseconds passed, it still needs to wait until that amount of time > passed, unless there's a new request need to be sent. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] bbejeck commented on pull request #13751: KAFKA-14539: Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map
bbejeck commented on PR #13751: URL: https://github.com/apache/kafka/pull/13751#issuecomment-1563134216 ping @mjsax or @ableegoldman for a second review -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bbejeck commented on a diff in pull request #13751: KAFKA-14539: Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map
bbejeck commented on code in PR #13751: URL: https://github.com/apache/kafka/pull/13751#discussion_r1205696101 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java: ## @@ -558,7 +564,7 @@ private SourceTopicsInfo getSourceTopicsInfo(final String storeName, final Strin } private boolean isInitialized() { -return clusterMetadata != null && !clusterMetadata.topics().isEmpty() && localMetadata.get() != null; +return partitionsByTopic != null && !partitionsByTopic.keySet().isEmpty() && localMetadata.get() != null; Review Comment: Could be simplified to `partitionsByTopic.isEmpty()` ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java: ## @@ -308,12 +308,18 @@ public synchronized KeyQueryMetadata getKeyQueryMetadataForKey(final String * * @param activePartitionHostMap the current mapping of {@link HostInfo} -> {@link TopicPartition}s for active partitions * @param standbyPartitionHostMap the current mapping of {@link HostInfo} -> {@link TopicPartition}s for standby partitions - * @param clusterMetadata the current clusterMetadata {@link Cluster} + * @param topicPartitionInfo the current mapping of {@link TopicPartition} -> {@Link PartitionInfo} */ synchronized void onChange(final Map> activePartitionHostMap, final Map> standbyPartitionHostMap, - final Cluster clusterMetadata) { -this.clusterMetadata = clusterMetadata; + final Map topicPartitionInfo) { +this.partitionsByTopic = new HashMap<>(); +for (final Map.Entry entry: topicPartitionInfo.entrySet()) { Review Comment: nit: use `topicPartitionInfo.entrySet().foreach( entry -> {...} )` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Hangleton commented on a diff in pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker
Hangleton commented on code in PR #13558: URL: https://github.com/apache/kafka/pull/13558#discussion_r1205669500 ## checkstyle/suppressions.xml: ## @@ -41,6 +41,8 @@ + Review Comment: Added the packages to import-control-core in [this commit](https://github.com/apache/kafka/pull/13558/commits/03d444b94de2494548b61cd0458c842c05872e3f). I had to keep a suppression rule for the package `org.apache.zookeeper` because it is not part of any import control and Checkstyle would not accept it without a proper import control definition. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Hangleton commented on a diff in pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker
Hangleton commented on code in PR #13558: URL: https://github.com/apache/kafka/pull/13558#discussion_r1205669500 ## checkstyle/suppressions.xml: ## @@ -41,6 +41,8 @@ + Review Comment: Added the packages to import-control-core in [this commit](https://github.com/apache/kafka/pull/13558/commits/03d444b94de2494548b61cd0458c842c05872e3f). I had to keep a suppression rule for the package `org.apache.zookeeper` because it is not part of any import control and Checkstyle would not accept it without it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #13356: KAFKA-14789: Prevent mis-attributing classpath plugins, allow discovery of classpath RestExtension and ConfigProvider
C0urante commented on code in PR #13356: URL: https://github.com/apache/kafka/pull/13356#discussion_r1205631278 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java: ## @@ -431,15 +427,29 @@ private Collection> getServiceLoaderPluginDesc(Class klass, log.error("Failed to discover {}{}", klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t); continue; } -result.add(pluginDesc((Class) pluginImpl.getClass(), -versionFor(pluginImpl), loader)); +Class pluginKlass = (Class) pluginImpl.getClass(); +if (!isParentClassloader(pluginKlass.getClassLoader(), loader)) { +log.debug("Exclude {} that is from classloader {}", pluginKlass.getSimpleName(), pluginKlass.getClassLoader()); +continue; +} +result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), loader)); } } finally { Plugins.compareAndSwapLoaders(savedLoader); } return result; } +private static boolean isParentClassloader(ClassLoader loader, ClassLoader parent) { +while (loader != null) { +if (loader == parent) { +return true; +} +loader = loader.getParent(); +} +return false; Review Comment: I replaced this method body with a simple `return loader == parent` equality check and all unit tests passed. Can you shed some light on why the logic here is necessary? And possibly add a test case to demonstrate? 
## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java: ## @@ -398,15 +390,19 @@ private Collection> getPluginDesc( } Collection> result = new ArrayList<>(); -for (Class plugin : plugins) { -if (PluginUtils.isConcrete(plugin)) { -try { -result.add(pluginDesc(plugin, versionFor(plugin), loader)); -} catch (ReflectiveOperationException | LinkageError e) { -log.error("Failed to discover {}: Unable to instantiate {}{}", klass.getSimpleName(), plugin.getSimpleName(), reflectiveErrorDescription(e), e); -} -} else { -log.debug("Skipping {} as it is not concrete implementation", plugin); +for (Class pluginKlass : plugins) { +if (!PluginUtils.isConcrete(pluginKlass)) { +log.debug("Skipping {} as it is not concrete implementation", pluginKlass); +continue; +} +if (!isParentClassloader(pluginKlass.getClassLoader(), loader)) { +log.debug("Exclude {} that is from classloader {}", pluginKlass.getSimpleName(), pluginKlass.getClassLoader()); Review Comment: Should we also state why this classloader warrants exclusion of the scanned plugin? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
jeffkbkim commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1204874200 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java: ## @@ -0,0 +1,500 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.GroupMetadataManagerTest; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ConsumerGroupTest { + +private ConsumerGroup createConsumerGroup(String groupId) { +SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); +return new ConsumerGroup(snapshotRegistry, groupId); +} + +@Test +public void testGetOrCreateMember() { +ConsumerGroup consumerGroup = createConsumerGroup("foo"); +ConsumerGroupMember member; + +// Create a group. +member = consumerGroup.getOrMaybeCreateMember("member-id", true); +assertEquals("member-id", member.memberId()); + +// Get that group back. 
+member = consumerGroup.getOrMaybeCreateMember("member-id", false); +assertEquals("member-id", member.memberId()); + +assertThrows(UnknownMemberIdException.class, () -> +consumerGroup.getOrMaybeCreateMember("does-not-exist", false)); +} + +@Test +public void testUpdateMember() { +ConsumerGroup consumerGroup = createConsumerGroup("foo"); +ConsumerGroupMember member; + +member = consumerGroup.getOrMaybeCreateMember("member", true); + +member = new ConsumerGroupMember.Builder(member) +.setSubscribedTopicNames(Arrays.asList("foo", "bar")) +.build(); + +consumerGroup.updateMember(member); + +assertEquals(member, consumerGroup.getOrMaybeCreateMember("member", false)); +} + +@Test +public void testRemoveMember() { +ConsumerGroup consumerGroup = createConsumerGroup("foo"); + +consumerGroup.getOrMaybeCreateMember("member", true); +assertTrue(consumerGroup.hasMember("member")); + +consumerGroup.removeMember("member"); +assertFalse(consumerGroup.hasMember("member")); + +} + +@Test +public void testUpdatingMemberUpdatesPartitionEpoch() { +Uuid fooTopicId = Uuid.randomUuid(); +Uuid barTopicId = Uuid.randomUuid(); +Uuid zarTopicId = Uuid.randomUuid(); + +ConsumerGroup consumerGroup = createConsumerGroup("foo"); +ConsumerGroupMember member; + +member = new ConsumerGroupMember.Builder("member") +.setMemberEpoch(10) +.setAssignedPartitions(mkAssignment( +mkTopicAssignment(fooTopicId, 1, 2, 3))) +.setPartitionsPendingRevocation(mkAssignment( +mkTopicAssignment(barTopicId, 4, 5, 6))) +.setPartitionsPendingAssignment(mkAssignment( +mkTopicAssignment(zarTopicId, 7, 8, 9))) +.build(); + +consumerGroup.updateMember(member); + +assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 1)); +assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 2)); +assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 3)); +assertEquals(10, consumerGroup.currentPartitionEpoch(
[GitHub] [kafka] Hangleton commented on a diff in pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker
Hangleton commented on code in PR #13558: URL: https://github.com/apache/kafka/pull/13558#discussion_r1205648643 ## checkstyle/suppressions.xml: ## @@ -41,6 +41,8 @@ + Review Comment: Apologies if I am missing the obvious, but am I getting it correctly that it is not only an ordering problem, but that the packages of the imported classes need to be defined in [import-control-core.xml](https://github.com/apache/kafka/blob/trunk/checkstyle/import-control-core.xml)? I will modify that file and remove the suppressions rule. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15021) KRaft controller increases leader epoch when shrinking ISR
José Armando García Sancio created KAFKA-15021: -- Summary: KRaft controller increases leader epoch when shrinking ISR Key: KAFKA-15021 URL: https://issues.apache.org/jira/browse/KAFKA-15021 Project: Kafka Issue Type: Bug Components: controller, kraft Reporter: José Armando García Sancio Assignee: José Armando García Sancio When the KRaft controller shrinks the ISR it also forces the leader epoch to increase. This is unnecessary and causes all of the follower replica fetches to get invalidated. Here is an example trace of this behavior after replica 8 was shut down: {code:java} kafka-dump-log --cluster-metadata-decoder --files __cluster_metadata-0/38589501.log | grep Pd7wMb4lSkKI00--SrWNXw ... | offset: 38655592 CreateTime: 1683849857362 keySize: -1 valueSize: 41 sequence: -1 headerKeys: [] payload: {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":7,"topicId":"Pd7wMb4lSkKI00--SrWNXw","isr":[3,1],"leader":1}} | offset: 38655593 CreateTime: 1683849857362 keySize: -1 valueSize: 41 sequence: -1 headerKeys: [] payload: {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":5,"topicId":"Pd7wMb4lSkKI00--SrWNXw","isr":[0,4],"leader":4}} | offset: 38655594 CreateTime: 1683849857362 keySize: -1 valueSize: 41 sequence: -1 headerKeys: [] payload: {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":6,"topicId":"Pd7wMb4lSkKI00--SrWNXw","isr":[0,1],"leader":0}} | offset: 38656159 CreateTime: 1683849974945 keySize: -1 valueSize: 39 sequence: -1 headerKeys: [] payload: {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":7,"topicId":"Pd7wMb4lSkKI00--SrWNXw","isr":[3,1,8]}} | offset: 38656256 CreateTime: 1683849994297 keySize: -1 valueSize: 39 sequence: -1 headerKeys: [] payload: {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":5,"topicId":"Pd7wMb4lSkKI00--SrWNXw","isr":[0,4,8]}} | offset: 38656299 CreateTime: 1683849997139 keySize: -1 valueSize: 39 sequence: -1 headerKeys: [] payload: 
{"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":6,"topicId":"Pd7wMb4lSkKI00--SrWNXw","isr":[0,1,8]}} | offset: 38657003 CreateTime: 1683850157379 keySize: -1 valueSize: 30 sequence: -1 headerKeys: [] payload: {"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":7,"topicId":"Pd7wMb4lSkKI00--SrWNXw","leader":8}} {code} Also, notice how the leader epoch was not increased when the ISR was expanded. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] hudeqi commented on pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception
hudeqi commented on PR #13421: URL: https://github.com/apache/kafka/pull/13421#issuecomment-1563021307 > Heya @hudeqi, could you give more detailed explanation on what the problem you are trying to solve here because I do not understand? May I suggest you distinguish between a partition replica and a partition future replica in some way, because otherwise it is quite difficult to understand which replica you are referring to when it is just called "partition"? > > Let's say we have an original replica in log directory (backed by one disk) A, and let's say we have a future replica in log directory (backed by another disk) B on the same broker. I have confirmed that it is the case that **compaction** is paused on A when B is first created (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L910). It is only resumed (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L1099). I can think of two situations happening now. > > 1. A fails and B doesn't know what to do. >In this situation, if A has failed then cleaning should not be resumed on A - an operator intervention is required to understand what went wrong with the log directory. Cleaning should not be started on B either. Since A has failed the amount of data that B has does not grow because it doesn't have a source to keep copying from. > 2. B fails and A doesn't know what to do. >In this situation B's size cannot grow because it is no longer copying from A. A should resume cleaning as for all intents and purposes it acts as a normal replica to a topic partition. > > As far as I understand what you try to do is solve situation number 2 - am I correct? If I am correct, what is the reasoning behind marking A as a failed partition? Hi, clolov, thank you for your review! I'm sorry that I didn't express the issue clearly. Your scenario example and description are very correct. The problem I want to solve is indeed situation number 2. 
I will describe according to your scenario: Assuming that B fails due to a disk problem, then according to the current logic, the partition is directly marked as failed in the corresponding 'ReplicaAlterLogDirsThread', nothing more. However, as a replica that normally provides services, A does not know that B has failed and remains in a clean-paused state, which will cause the disk where A is located to be fully occupied. That is the problem I want to solve. And my solution is to resume cleaning the partition when B fails to ensure that A can continue to clean up. @clolov -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Hangleton commented on pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker
Hangleton commented on PR #13558: URL: https://github.com/apache/kafka/pull/13558#issuecomment-1563016533 Thanks Igor for the review, addressing these comments now - sorry I didn't do so earlier. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] soarez commented on a diff in pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker
soarez commented on code in PR #13558: URL: https://github.com/apache/kafka/pull/13558#discussion_r1205587123 ## checkstyle/suppressions.xml: ## @@ -41,6 +41,8 @@ + Review Comment: Is this intentional? Can't we sort the import statements to satisfy this Checkstyle rule? ## core/src/main/scala/kafka/zk/KafkaZkClient.scala: ## @@ -2113,7 +2113,12 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo } private class CheckedEphemeral(path: String, data: Array[Byte]) extends Logging { +private var attempt = 0 +private val maxAttempt = 5 +private val backoffMs = 1000 Review Comment: Should the backoff value be calculated here then? The session timeout is available in `apply()` where `KafkaZkClient` is created. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante merged pull request #13165: KAFKA-14654: Connector classes should statically initialize with plugin classloader
C0urante merged PR #13165: URL: https://github.com/apache/kafka/pull/13165 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hudeqi commented on a diff in pull request #13696: KAFKA-14979:Incorrect lag was calculated when markPartitionsForTruncation in ReplicaAlterLogDirsThread
hudeqi commented on code in PR #13696: URL: https://github.com/apache/kafka/pull/13696#discussion_r1205574820 ## core/src/main/scala/kafka/server/AbstractFetcherThread.scala: ## @@ -460,8 +460,16 @@ abstract class AbstractFetcherThread(name: String, partitionMapLock.lockInterruptibly() try { Option(partitionStates.stateValue(topicPartition)).foreach { state => -val newState = PartitionFetchState(state.topicId, math.min(truncationOffset, state.fetchOffset), - state.lag, state.currentLeaderEpoch, state.delay, state = Truncating, +var lag = state.lag Review Comment: > Isn't the lag recalculated on the next fetch iteration ([here](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L362))? Hi Hangleton, thanks for your review! As you said, under normal circumstances, the lag will be recalculated on the next fetch. However, if an exception occurs before the lag is updated (for example, processPartitionData throws an exception due to a disk error) or the return value of logAppendInfoOpt is None, then the lag value will stay at the previous, incorrect value either permanently or for a period of time (the partition has clearly been truncated, yet the displayed lag is still very large, which deviates greatly from what we would expect). Therefore, the modification here is a defensive measure that can effectively avoid such situations. @Hangleton -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #13465: KAFKA-14368: Connect offset write REST API
C0urante commented on PR #13465: URL: https://github.com/apache/kafka/pull/13465#issuecomment-1562965406 Ah, spoke too soon! I'd be open to bumping timeouts. If this does turn out to be a correctness issue (which is still possible since the timing on CI may be different and therefore more likely to unearth certain kinds of concurrency bugs), we can investigate further. Also worth noting that `WorkerTest::testAlterOffsetsSourceConnectorError` is also failing right now because `offsetStore::stop` hasn't been invoked by the time we check for it. I think you handle this kind of issue elsewhere by adding a second `timeout(1000)` argument when making calls to `Mockito::verify`; hopefully that's sufficient for this test as well? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #13465: KAFKA-14368: Connect offset write REST API
C0urante commented on PR #13465: URL: https://github.com/apache/kafka/pull/13465#issuecomment-1562958860 Ah, spoke too soon! Looks like there are some CI test failures. Can you look into these? org.apache.kafka.connect.runtime.WorkerTest.testAlterOffsetsSourceConnectorError: > Wanted but not invoked: connectorOffsetBackingStore.stop(); -> at org.apache.kafka.connect.storage.ConnectorOffsetBackingStore.stop(ConnectorOffsetBackingStore.java:188) However, there was exactly 1 interaction with this mock: connectorOffsetBackingStore.start(); -> at org.apache.kafka.connect.runtime.Worker.lambda$alterSourceConnectorOffsets$16(Worker.java:1438) at app//org.apache.kafka.connect.storage.ConnectorOffsetBackingStore.stop(ConnectorOffsetBackingStore.java:188) at app//org.apache.kafka.connect.runtime.WorkerTest.testAlterOffsetsSourceConnectorError(WorkerTest.java:2000) org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsDifferentKafkaClusterTargeted: > org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. 
Sink connector consumer group offsets should catch up to the topic end offsets ==> expected: <true> but was: <false> at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:211) at org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:331) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:328) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:312) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:285) at org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.waitForExpectedSinkConnectorOffsets(OffsetsApiIntegrationTest.java:602) at org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.alterAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:312) at org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsDifferentKafkaClusterTargeted(OffsetsApiIntegrationTest.java:292) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] yashmayya commented on pull request #13465: KAFKA-14368: Connect offset write REST API
yashmayya commented on PR #13465: URL: https://github.com/apache/kafka/pull/13465#issuecomment-1562952794 I just noticed that `testAlterSinkConnectorOffsetsDifferentKafkaClusterTargeted` is failing in the CI run. There's also https://issues.apache.org/jira/browse/KAFKA-14956 where `testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted` has been failing on CI for a while. Interestingly, neither of them has failed for me locally in over 100 runs under various loads. The point at which both of them are failing is also interesting: 1. We create an embedded Connect cluster with its own embedded backing Kafka cluster 2. We create a second embedded Kafka cluster 3. We configure a sink connector in the embedded Connect cluster which consumes from a topic on the second embedded Kafka cluster 4. We produce 10 messages each to 5 different partitions of a topic on the second Kafka cluster (which the connector is configured to consume from) 5. We use the offsets read REST API to get the consumer group offsets for the sink connector and wait until it "catches up" to the expected offsets. This operation is retried for up to 15 seconds and if the consumer group offsets (obtained via an admin client in the worker) don't match the expected offsets, the test fails. Both the tests are failing at this point. Since they consistently pass locally, it doesn't seem to be a correctness issue with connectors that target different Kafka clusters. I'm wondering if we need to increase the timeout, although 15 seconds should be enough to consume just 50 messages 😕 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API
C0urante commented on code in PR #13465: URL: https://github.com/apache/kafka/pull/13465#discussion_r1205535710 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java: ## @@ -4123,6 +4136,280 @@ public void testConnectorOffsets() throws Exception { PowerMock.verifyAll(); } +@Test +public void testAlterOffsetsConnectorNotFound() throws Exception { +// Get the initial assignment +EasyMock.expect(member.memberId()).andStubReturn("leader"); + EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0); +expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true); +expectConfigRefreshAndSnapshot(SNAPSHOT); +member.poll(EasyMock.anyInt()); +PowerMock.expectLastCall(); + +// Now handle the alter connector offsets request +member.wakeup(); +PowerMock.expectLastCall(); +member.ensureActive(); +PowerMock.expectLastCall(); +expectConfigRefreshAndSnapshot(SNAPSHOT); +member.poll(EasyMock.anyInt()); +PowerMock.expectLastCall(); + +PowerMock.replayAll(); + +herder.tick(); +FutureCallback callback = new FutureCallback<>(); +herder.alterConnectorOffsets("connector-does-not-exist", new HashMap<>(), callback); +herder.tick(); +ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS)); +assertTrue(e.getCause() instanceof NotFoundException); + +PowerMock.verifyAll(); +} + +@Test +public void testAlterOffsetsConnectorNotInStoppedState() throws Exception { +// Get the initial assignment +EasyMock.expect(member.memberId()).andStubReturn("leader"); + EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0); +expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true); +expectConfigRefreshAndSnapshot(SNAPSHOT); +member.poll(EasyMock.anyInt()); +PowerMock.expectLastCall(); + +// Now handle the alter connector offsets request +member.wakeup(); +PowerMock.expectLastCall(); +member.ensureActive(); 
+PowerMock.expectLastCall(); +expectConfigRefreshAndSnapshot(SNAPSHOT); +member.poll(EasyMock.anyInt()); +PowerMock.expectLastCall(); + +PowerMock.replayAll(); + +herder.tick(); +FutureCallback callback = new FutureCallback<>(); +herder.alterConnectorOffsets(CONN1, new HashMap<>(), callback); +herder.tick(); +ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS)); +assertTrue(e.getCause() instanceof BadRequestException); + +PowerMock.verifyAll(); +} + +@Test +public void testAlterOffsetsNotLeader() throws Exception { +// Get the initial assignment +EasyMock.expect(member.memberId()).andStubReturn("member"); + EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0); +expectRebalance(1, Collections.emptyList(), Collections.emptyList(), false); +expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1); +member.poll(EasyMock.anyInt()); +PowerMock.expectLastCall(); + +// Now handle the alter connector offsets request +member.wakeup(); +PowerMock.expectLastCall(); +member.ensureActive(); +member.poll(EasyMock.anyInt()); +PowerMock.expectLastCall(); + +PowerMock.replayAll(); + +herder.tick(); +FutureCallback callback = new FutureCallback<>(); +herder.alterConnectorOffsets(CONN1, new HashMap<>(), callback); +herder.tick(); +ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS)); +assertTrue(e.getCause() instanceof NotLeaderException); + +PowerMock.verifyAll(); +} + +@Test +public void testAlterOffsetsSourceConnectorExactlyOnceDisabled() throws Exception { +// Get the initial assignment +EasyMock.expect(member.memberId()).andStubReturn("leader"); + EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0); +expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true); +expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1); +member.poll(EasyMock.anyInt()); +PowerMock.expectLastCall(); + +// Now 
handle the alter connector offsets request +Map, Map> offsets = new HashMap<>(); +member.wakeup(); +PowerMock.expectLastCall(); +member.ensureActive(); +PowerMock.expectLastCall(); +member.poll(EasyMock.anyInt()); +PowerMock.expectLastCall(); +expectConfigRefreshA
[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API
C0urante commented on code in PR #13465: URL: https://github.com/apache/kafka/pull/13465#discussion_r1205533269 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java: ## @@ -4123,6 +4136,280 @@ public void testConnectorOffsets() throws Exception { PowerMock.verifyAll(); } +@Test +public void testAlterOffsetsConnectorNotFound() throws Exception { +// Get the initial assignment +EasyMock.expect(member.memberId()).andStubReturn("leader"); + EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0); +expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true); +expectConfigRefreshAndSnapshot(SNAPSHOT); +member.poll(EasyMock.anyInt()); +PowerMock.expectLastCall(); + +// Now handle the alter connector offsets request +member.wakeup(); +PowerMock.expectLastCall(); +member.ensureActive(); +PowerMock.expectLastCall(); +expectConfigRefreshAndSnapshot(SNAPSHOT); +member.poll(EasyMock.anyInt()); +PowerMock.expectLastCall(); + +PowerMock.replayAll(); + +herder.tick(); +FutureCallback callback = new FutureCallback<>(); +herder.alterConnectorOffsets("connector-does-not-exist", new HashMap<>(), callback); +herder.tick(); +ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS)); +assertTrue(e.getCause() instanceof NotFoundException); + +PowerMock.verifyAll(); +} + +@Test +public void testAlterOffsetsConnectorNotInStoppedState() throws Exception { +// Get the initial assignment +EasyMock.expect(member.memberId()).andStubReturn("leader"); + EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0); +expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true); +expectConfigRefreshAndSnapshot(SNAPSHOT); +member.poll(EasyMock.anyInt()); +PowerMock.expectLastCall(); + +// Now handle the alter connector offsets request +member.wakeup(); +PowerMock.expectLastCall(); +member.ensureActive(); 
+PowerMock.expectLastCall(); +expectConfigRefreshAndSnapshot(SNAPSHOT); +member.poll(EasyMock.anyInt()); +PowerMock.expectLastCall(); + +PowerMock.replayAll(); + +herder.tick(); +FutureCallback callback = new FutureCallback<>(); +herder.alterConnectorOffsets(CONN1, new HashMap<>(), callback); +herder.tick(); +ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS)); +assertTrue(e.getCause() instanceof BadRequestException); + +PowerMock.verifyAll(); +} + +@Test +public void testAlterOffsetsNotLeader() throws Exception { +// Get the initial assignment +EasyMock.expect(member.memberId()).andStubReturn("member"); + EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0); +expectRebalance(1, Collections.emptyList(), Collections.emptyList(), false); +expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1); +member.poll(EasyMock.anyInt()); +PowerMock.expectLastCall(); + +// Now handle the alter connector offsets request +member.wakeup(); +PowerMock.expectLastCall(); +member.ensureActive(); Review Comment: Huh, TIL! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary
C0urante commented on PR #13646: URL: https://github.com/apache/kafka/pull/13646#issuecomment-1562913728 @sambhav-jain-16 thanks for looking into this. I'm prioritizing a few other KIP-related PRs at the moment but will try to make time for this sometime in the next couple of weeks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] clolov commented on pull request #13721: KAFKA-14782: Implementation Details Different from Documentation (del…
clolov commented on PR #13721: URL: https://github.com/apache/kafka/pull/13721#issuecomment-1562897922 Heya @WhitEyesX and thank you for the contribution! At first glance this does appear to be an omission - I went through the KIP, related PRs and subsequent commits to that area of the code and I haven't found a discussion or a comment which mentions that retry.backoff.ms is no longer considered as part of that equation. However, quite a lot of people reviewed the PR when it was merged, which leads me to suspect I am lacking a piece of information which is taken into account somewhere else in the code. Tagging @ijuma and @hachikuji as some of the original PR's reviewers. Tagging @jolshan and @artemlivshits as contributors who have greater knowledge than me when it comes to the producer code paths. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] urbandan commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
urbandan commented on code in PR #13591: URL: https://github.com/apache/kafka/pull/13591#discussion_r1205410637 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ## @@ -609,14 +686,15 @@ public synchronized void handleCompletedBatch(ProducerBatch batch, ProduceRespon } public synchronized void transitionToUninitialized(RuntimeException exception) { -transitionTo(State.UNINITIALIZED); +transitionTo(State.UNINITIALIZED, exception, InvalidStateDetectionStrategy.BACKGROUND); Review Comment: What will happen with the exception we pass to transitionTo here? My understanding is that transitionTo only cares about the exception if the state is one of the error states. ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ## @@ -609,14 +686,15 @@ public synchronized void handleCompletedBatch(ProducerBatch batch, ProduceRespon } public synchronized void transitionToUninitialized(RuntimeException exception) { -transitionTo(State.UNINITIALIZED); +transitionTo(State.UNINITIALIZED, exception, InvalidStateDetectionStrategy.BACKGROUND); if (pendingTransition != null) { pendingTransition.result.fail(exception); } lastError = null; } -public synchronized void maybeTransitionToErrorState(RuntimeException exception) { +public synchronized void maybeTransitionToErrorState(RuntimeException exception, Review Comment: Just to double check: the transitionToFatalError call on line 702 does not use the invalidStateDetectionStrategy param because all of the exceptions in the condition come from a server response, thus making it a BACKGROUND type call? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
showuon commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1205425110 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -618,6 +629,193 @@ public void run() { } } +public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) { +logger.debug("Updating $topicPartition with remoteLogStartOffset: {}", remoteLogStartOffset); Review Comment: debug("Update {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset) ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -618,6 +629,193 @@ public void run() { } } +public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) { +logger.debug("Updating $topicPartition with remoteLogStartOffset: {}", remoteLogStartOffset); +updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset); +} + +class RemoteLogRetentionHandler { + +private long remainingBreachedSize = 0L; +private OptionalLong logStartOffset = OptionalLong.empty(); + +public RemoteLogRetentionHandler(long remainingBreachedSize) { +this.remainingBreachedSize = remainingBreachedSize; +} + +private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean checkSizeRetention) throws RemoteStorageException, ExecutionException, InterruptedException { +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> { +// Assumption that segments contain size > 0 +if (checkSizeRetention && remainingBreachedSize > 0) { +remainingBreachedSize -= x.segmentSizeInBytes(); +return remainingBreachedSize >= 0; +} else return false; +}); +if (isSegmentDeleted) { +logStartOffset = OptionalLong.of(metadata.endOffset() + 1); +logger.info("Deleted remote log segment ${metadata.remoteLogSegmentId()} due to retention size " + +"${log.config.retentionSize} breach. 
Log size after deletion will be " + +"${remainingBreachedSize + log.config.retentionSize}."); +} +return isSegmentDeleted; +} + +private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset) +throws RemoteStorageException, ExecutionException, InterruptedException { +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset()); +if (isSegmentDeleted) { +remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); +logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset); +} + +// No need to update the logStartOffset. +return isSegmentDeleted; +} + +// There are two cases: +// 1) When there are offline partitions and a new replica with empty disk is brought as leader, then the +//leader-epoch gets bumped but the log-start-offset gets truncated back to 0. +// 2) To remove the unreferenced segments. +private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException { +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> + x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch)); +if (isSegmentDeleted) { +logger.info("Deleted remote log segment ${} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}", Review Comment: additional `$` sign: remote log segment [$]{} ... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] fvaleri commented on pull request #13738: KAFKA-14982: Improve the kafka-metadata-quorum output
fvaleri commented on PR #13738: URL: https://github.com/apache/kafka/pull/13738#issuecomment-1562817801 @showuon all your comments should be addressed with the latest commit. I removed the `-hr` abbreviation and updated the KIP because argparse4j supports them by default. For example you can do the following: ```sh $ bin/kafka-metadata-quorum.sh --bootstrap-server :9092 describe --re --hu NodeId LogEndOffsetLag LastFetchTimestamp LastCaughtUpTimestamp Status 220320 3 ms ago3 ms ago Leader 320320 207 ms ago 207 ms ago Follower 420320 207 ms ago 207 ms ago Follower ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] clolov commented on pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception
clolov commented on PR #13421: URL: https://github.com/apache/kafka/pull/13421#issuecomment-1562816088 Heya @hudeqi, could you give a more detailed explanation of the problem you are trying to solve here, because I do not understand it? May I suggest you distinguish between a partition replica and a partition future replica in some way, because otherwise it is quite difficult to understand which replica you are referring to when it is just called "partition"? Let's say we have an original replica in log directory (backed by one disk) A, and let's say we have a future replica in log directory (backed by another disk) B on the same broker. I have confirmed that it is the case that **compaction** is paused on A when B is first created (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L910). It is only resumed (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L1099). I can think of two situations happening now. 1. A fails and B doesn't know what to do. In this situation, if A has failed then cleaning should not be resumed on A - an operator intervention is required to understand what went wrong with the log directory. Cleaning should not be started on B either. Since A has failed the amount of data that B has does not grow because it doesn't have a source to keep copying from. 2. B fails and A doesn't know what to do. In this situation B's size cannot grow because it is no longer copying from A. A should resume cleaning as for all intents and purposes it acts as a normal replica to a topic partition. As far as I understand, what you are trying to do is solve situation number 2 - am I correct? If I am correct, what is the reasoning behind marking A as a failed partition? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] fvaleri commented on a diff in pull request #13738: KAFKA-14982: Improve the kafka-metadata-quorum output
fvaleri commented on code in PR #13738: URL: https://github.com/apache/kafka/pull/13738#discussion_r1205438962 ## tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java: ## @@ -155,17 +169,35 @@ private static void handleDescribeReplication(Admin admin) throws ExecutionExcep ); } -private static List> quorumInfoToRows(QuorumInfo.ReplicaState leader, Stream infos, String status) { -return infos.map(info -> -Stream.of( +private static List> quorumInfoToRows(QuorumInfo.ReplicaState leader, + Stream infos, + String status, + boolean humanReadable) { +return infos.map(info -> { +String lastFetchTimestamp = !info.lastFetchTimestamp().isPresent() ? "-1" : +humanReadable ? format("%d ms ago", durationMs(info.lastFetchTimestamp().getAsLong())) : +valueOf(info.lastFetchTimestamp().getAsLong()); +String lastCaughtUpTimestamp = !info.lastCaughtUpTimestamp().isPresent() ? "-1" : +humanReadable ? format("%d ms ago", durationMs(info.lastCaughtUpTimestamp().getAsLong())) : +valueOf(info.lastCaughtUpTimestamp().getAsLong()); +return Stream.of( info.replicaId(), info.logEndOffset(), leader.logEndOffset() - info.logEndOffset(), -info.lastFetchTimestamp().orElse(-1), -info.lastCaughtUpTimestamp().orElse(-1), +lastFetchTimestamp, +lastCaughtUpTimestamp, status -).map(r -> r.toString()).collect(Collectors.toList()) -).collect(Collectors.toList()); +).map(r -> r.toString()).collect(Collectors.toList()); +}).collect(Collectors.toList()); +} + +private static long durationMs(long timestampMs) { Review Comment: Done. 
## tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java: ## @@ -131,22 +140,27 @@ private static void addDescribeParser(Subparsers subparsers) { .addArgument("--status") .help("A short summary of the quorum status and the other provides detailed information about the status of replication.") .action(Arguments.storeTrue()); + ArgumentGroup replicationArgs = describeParser.addArgumentGroup("Replication"); replicationArgs .addArgument("--replication") .help("Detailed information about the status of replication") .action(Arguments.storeTrue()); +replicationArgs +.addArgument("-hr", "--human-readable") +.help("Print human-readable timestamps") Review Comment: What about "Human-readable output"? It's short, clear and allows reuse. In the future and even in other tools, we could use this flag to make some fields readable or filter out less important details that make the output hard to read. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] fvaleri commented on a diff in pull request #13738: KAFKA-14982: Improve the kafka-metadata-quorum output
fvaleri commented on code in PR #13738: URL: https://github.com/apache/kafka/pull/13738#discussion_r1205438624 ## tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java: ## @@ -155,17 +169,35 @@ private static void handleDescribeReplication(Admin admin) throws ExecutionExcep ); } -private static List> quorumInfoToRows(QuorumInfo.ReplicaState leader, Stream infos, String status) { -return infos.map(info -> -Stream.of( +private static List> quorumInfoToRows(QuorumInfo.ReplicaState leader, + Stream infos, + String status, + boolean humanReadable) { +return infos.map(info -> { +String lastFetchTimestamp = !info.lastFetchTimestamp().isPresent() ? "-1" : +humanReadable ? format("%d ms ago", durationMs(info.lastFetchTimestamp().getAsLong())) : +valueOf(info.lastFetchTimestamp().getAsLong()); +String lastCaughtUpTimestamp = !info.lastCaughtUpTimestamp().isPresent() ? "-1" : +humanReadable ? format("%d ms ago", durationMs(info.lastCaughtUpTimestamp().getAsLong())) : +valueOf(info.lastCaughtUpTimestamp().getAsLong()); +return Stream.of( info.replicaId(), info.logEndOffset(), leader.logEndOffset() - info.logEndOffset(), -info.lastFetchTimestamp().orElse(-1), -info.lastCaughtUpTimestamp().orElse(-1), +lastFetchTimestamp, +lastCaughtUpTimestamp, status -).map(r -> r.toString()).collect(Collectors.toList()) -).collect(Collectors.toList()); +).map(r -> r.toString()).collect(Collectors.toList()); +}).collect(Collectors.toList()); +} + +private static long durationMs(long timestampMs) { +Instant instant = Instant.ofEpochMilli(timestampMs); Review Comment: Renamed to lastTimestamp. 
## tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java: ## @@ -155,17 +169,35 @@ private static void handleDescribeReplication(Admin admin) throws ExecutionExcep ); } -private static List> quorumInfoToRows(QuorumInfo.ReplicaState leader, Stream infos, String status) { -return infos.map(info -> -Stream.of( +private static List> quorumInfoToRows(QuorumInfo.ReplicaState leader, + Stream infos, + String status, + boolean humanReadable) { +return infos.map(info -> { +String lastFetchTimestamp = !info.lastFetchTimestamp().isPresent() ? "-1" : +humanReadable ? format("%d ms ago", durationMs(info.lastFetchTimestamp().getAsLong())) : +valueOf(info.lastFetchTimestamp().getAsLong()); +String lastCaughtUpTimestamp = !info.lastCaughtUpTimestamp().isPresent() ? "-1" : +humanReadable ? format("%d ms ago", durationMs(info.lastCaughtUpTimestamp().getAsLong())) : +valueOf(info.lastCaughtUpTimestamp().getAsLong()); +return Stream.of( info.replicaId(), info.logEndOffset(), leader.logEndOffset() - info.logEndOffset(), -info.lastFetchTimestamp().orElse(-1), -info.lastCaughtUpTimestamp().orElse(-1), +lastFetchTimestamp, +lastCaughtUpTimestamp, status -).map(r -> r.toString()).collect(Collectors.toList()) -).collect(Collectors.toList()); +).map(r -> r.toString()).collect(Collectors.toList()); +}).collect(Collectors.toList()); +} + +private static long durationMs(long timestampMs) { +Instant instant = Instant.ofEpochMilli(timestampMs); +Instant now = Instant.now(); +if (!(instant.isAfter(Instant.EPOCH) && instant.isBefore(now))) { +throw new KafkaException("Invalid timestamp, possible drift in system clock"); Review Comment: Sure, I reworked that code to include more details. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] krespo commented on pull request #13748: [BUGFIX] Bugfixed in KAFKA-8713, but it doesn't work properly.
krespo commented on PR #13748: URL: https://github.com/apache/kafka/pull/13748#issuecomment-1562728086 @gharris1727 First of all, thank you for your feedback. As you said, I know that feature is only available in kafka version 3.5, which hasn't been released yet. The open source versions I'm using are as follows: ``` debezium: 2.1 kafka connect: 2.7.1 mysql: 8 ``` When the column in mysql is "nullable" and "default value" is set, we found that the message that the kafka connector sends through the JsonConverter always contains the "default value", even when the column is updated to "null". So I checked the contents of "KAFKA-8713", built the source code of Kafka version 3.5 to create "connect-json-3.5.0.jar", and replaced the connect-json library in "${kafka_connect_home}/libs" with the new version. Afterwards, I set the "replace.null.with.default" setting to "false", but found that the same problem still occurs. Through debugging, I found that to solve the "KAFKA-8713" issue, not only the connect-json module but also the connect-api module had to be modified. So I made a pull request for my modified code. Currently, connect-json.jar and connect-api.jar have been replaced with version 3.5, and they are working properly. Please check the code I modified once again. Thank you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15020) integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor test is flaky
[ https://issues.apache.org/jira/browse/KAFKA-15020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17726147#comment-17726147 ] Atul Sharma commented on KAFKA-15020: - https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13753/2/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_8_and_Scala_2_12___testRackAwareRangeAssignor__/ > integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor > test is flaky > -- > > Key: KAFKA-15020 > URL: https://issues.apache.org/jira/browse/KAFKA-15020 > Project: Kafka > Issue Type: Test >Reporter: Atul Sharma >Priority: Major > > Sometimes the test fails with the following log: > {code:java} > Gradle Test Run :core:integrationTest > Gradle Test Executor 175 > > FetchFromFollowerIntegrationTest > testRackAwareRangeAssignor() FAILED > org.opentest4j.AssertionFailedError: Consumed 0 records before timeout > instead of the expected 2 records > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at org.junit.jupiter.api.Assertions.fail(Assertions.java:135) > at kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:1087) > at > integration.kafka.server.FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$11(FetchFromFollowerIntegrationTest.scala:216) > at > integration.kafka.server.FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$11$adapted(FetchFromFollowerIntegrationTest.scala:215) > at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) > at > integration.kafka.server.FetchFromFollowerIntegrationTest.verifyAssignments$1(FetchFromFollowerIntegrationTest.scala:215) > at > integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor(FetchFromFollowerIntegrationTest.scala:244) 
> {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] (KAFKA-14984) DynamicBrokerReconfigurationTest.testThreadPoolResize() test is flaky
[ https://issues.apache.org/jira/browse/KAFKA-14984 ] Atul Sharma deleted comment on KAFKA-14984: - was (Author: JIRAUSER299965): https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13753/2/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_8_and_Scala_2_12___testRackAwareRangeAssignor__/ > DynamicBrokerReconfigurationTest.testThreadPoolResize() test is flaky > -- > > Key: KAFKA-14984 > URL: https://issues.apache.org/jira/browse/KAFKA-14984 > Project: Kafka > Issue Type: Test >Reporter: Manyanda Chitimbo >Priority: Major > > The test sometimes fails with the below log > {code:java} > kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize() failed, > log available in > .../core/build/reports/testOutput/kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize().test.stdoutGradle > Test Run :core:test > Gradle Test Executor 6 > > DynamicBrokerReconfigurationTest > testThreadPoolResize() FAILED > org.opentest4j.AssertionFailedError: Invalid threads: expected 6, got 8: > List(data-plane-kafka-socket-acceptor-ListenerName(PLAINTEXT)-PLAINTEXT-0, > data-plane-kafka-socket-acceptor-ListenerName(PLAINTEXT)-PLAINTEXT-0, > data-plane-kafka-socket-acceptor-ListenerName(INTERNAL)-SSL-0, > data-plane-kafka-socket-acceptor-ListenerName(EXTERNAL)-SASL_SSL-0, > data-plane-kafka-socket-acceptor-ListenerName(INTERNAL)-SSL-0, > data-plane-kafka-socket-acceptor-ListenerName(INTERNAL)-SSL-0, > data-plane-kafka-socket-acceptor-ListenerName(EXTERNAL)-SASL_SSL-0, > data-plane-kafka-socket-acceptor-ListenerName(EXTERNAL)-SASL_SSL-0) ==> > expected: but was: > at > app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at > app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) > at > app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) > at > 
app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:211) > at > app//kafka.server.DynamicBrokerReconfigurationTest.verifyThreads(DynamicBrokerReconfigurationTest.scala:1634) > at > app//kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize(DynamicBrokerReconfigurationTest.scala:872) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14984) DynamicBrokerReconfigurationTest.testThreadPoolResize() test is flaky
[ https://issues.apache.org/jira/browse/KAFKA-14984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17726146#comment-17726146 ] Atul Sharma commented on KAFKA-14984: - https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13753/2/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_8_and_Scala_2_12___testRackAwareRangeAssignor__/ > DynamicBrokerReconfigurationTest.testThreadPoolResize() test is flaky > -- > > Key: KAFKA-14984 > URL: https://issues.apache.org/jira/browse/KAFKA-14984 > Project: Kafka > Issue Type: Test >Reporter: Manyanda Chitimbo >Priority: Major > > The test sometimes fails with the below log > {code:java} > kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize() failed, > log available in > .../core/build/reports/testOutput/kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize().test.stdoutGradle > Test Run :core:test > Gradle Test Executor 6 > > DynamicBrokerReconfigurationTest > testThreadPoolResize() FAILED > org.opentest4j.AssertionFailedError: Invalid threads: expected 6, got 8: > List(data-plane-kafka-socket-acceptor-ListenerName(PLAINTEXT)-PLAINTEXT-0, > data-plane-kafka-socket-acceptor-ListenerName(PLAINTEXT)-PLAINTEXT-0, > data-plane-kafka-socket-acceptor-ListenerName(INTERNAL)-SSL-0, > data-plane-kafka-socket-acceptor-ListenerName(EXTERNAL)-SASL_SSL-0, > data-plane-kafka-socket-acceptor-ListenerName(INTERNAL)-SSL-0, > data-plane-kafka-socket-acceptor-ListenerName(INTERNAL)-SSL-0, > data-plane-kafka-socket-acceptor-ListenerName(EXTERNAL)-SASL_SSL-0, > data-plane-kafka-socket-acceptor-ListenerName(EXTERNAL)-SASL_SSL-0) ==> > expected: but was: > at > app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at > app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) > at > 
app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) > at > app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:211) > at > app//kafka.server.DynamicBrokerReconfigurationTest.verifyThreads(DynamicBrokerReconfigurationTest.scala:1634) > at > app//kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize(DynamicBrokerReconfigurationTest.scala:872) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15020) integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor test is flaky
Atul Sharma created KAFKA-15020: --- Summary: integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor test is flaky Key: KAFKA-15020 URL: https://issues.apache.org/jira/browse/KAFKA-15020 Project: Kafka Issue Type: Test Reporter: Atul Sharma Sometimes the test fails with the following log: {code:java} Gradle Test Run :core:integrationTest > Gradle Test Executor 175 > FetchFromFollowerIntegrationTest > testRackAwareRangeAssignor() FAILED org.opentest4j.AssertionFailedError: Consumed 0 records before timeout instead of the expected 2 records at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) at org.junit.jupiter.api.Assertions.fail(Assertions.java:135) at kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:1087) at integration.kafka.server.FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$11(FetchFromFollowerIntegrationTest.scala:216) at integration.kafka.server.FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$11$adapted(FetchFromFollowerIntegrationTest.scala:215) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at integration.kafka.server.FetchFromFollowerIntegrationTest.verifyAssignments$1(FetchFromFollowerIntegrationTest.scala:215) at integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor(FetchFromFollowerIntegrationTest.scala:244) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] dajac commented on pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
dajac commented on PR #13639: URL: https://github.com/apache/kafka/pull/13639#issuecomment-1562650336 @jolshan @jeffkbkim Thanks for your comments. I have addressed them, I think. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon opened a new pull request, #13763: MINOR: use debug level to log handleCommit
showuon opened a new pull request, #13763: URL: https://github.com/apache/kafka/pull/13763 In this [PR](https://github.com/apache/kafka/pull/13462/files#diff-38a0aec01732a73b638b91b3d88dfb6a074a6e47fac7972d5a8e7dc97c258606R321), we improved the log message to make it clearer, and backported it to the 3.4 branch. But in the [backport commit](https://github.com/apache/kafka/pull/13462/files#diff-38a0aec01732a73b638b91b3d88dfb6a074a6e47fac7972d5a8e7dc97c258606R321), we accidentally changed the log level from debug to info, and prefixed the message with `handleSnapshot` instead of `handleCommit`. This PR fixes both to avoid log flooding. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
dajac commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1205278404 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -0,0 +1,2017 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.FencedMemberEpochException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.GroupMaxSizeReachedException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnsupportedAssignorException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.network.ClientInformation; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.consumer.Assignment; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.image.TopicImage; +import org.apache.kafka.image.TopicsDelta; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.junit.jupiter.api.Test; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.A
[GitHub] [kafka] dajac commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
dajac commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1205275881 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -0,0 +1,2017 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.FencedMemberEpochException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.GroupMaxSizeReachedException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnsupportedAssignorException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.network.ClientInformation; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.consumer.Assignment; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.image.TopicImage; +import org.apache.kafka.image.TopicsDelta; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.junit.jupiter.api.Test; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.A
[GitHub] [kafka] dajac commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
dajac commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1205273140 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -0,0 +1,2017 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.FencedMemberEpochException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.GroupMaxSizeReachedException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnsupportedAssignorException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.network.ClientInformation; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.consumer.Assignment; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.image.TopicImage; +import org.apache.kafka.image.TopicsDelta; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.junit.jupiter.api.Test; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.A
[GitHub] [kafka] Hangleton commented on a diff in pull request #13696: KAFKA-14979:Incorrect lag was calculated when markPartitionsForTruncation in ReplicaAlterLogDirsThread
Hangleton commented on code in PR #13696: URL: https://github.com/apache/kafka/pull/13696#discussion_r1205272856 ## core/src/main/scala/kafka/server/AbstractFetcherThread.scala: ## @@ -460,8 +460,16 @@ abstract class AbstractFetcherThread(name: String, partitionMapLock.lockInterruptibly() try { Option(partitionStates.stateValue(topicPartition)).foreach { state => -val newState = PartitionFetchState(state.topicId, math.min(truncationOffset, state.fetchOffset), - state.lag, state.currentLeaderEpoch, state.delay, state = Truncating, +var lag = state.lag Review Comment: Isn't the lag recalculated on the next fetch iteration ([here](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L362))? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
dajac commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1205271405 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -0,0 +1,2017 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.FencedMemberEpochException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.GroupMaxSizeReachedException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnsupportedAssignorException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.network.ClientInformation; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.consumer.Assignment; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.image.TopicImage; +import org.apache.kafka.image.TopicsDelta; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.junit.jupiter.api.Test; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.A
[GitHub] [kafka] dajac commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
dajac commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1205271089 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -0,0 +1,2017 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.FencedMemberEpochException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.GroupMaxSizeReachedException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnsupportedAssignorException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.network.ClientInformation; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.consumer.Assignment; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.image.TopicImage; +import org.apache.kafka.image.TopicsDelta; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.junit.jupiter.api.Test; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.A
[GitHub] [kafka] dajac commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
dajac commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1205266764 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -0,0 +1,2017 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.FencedMemberEpochException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.GroupMaxSizeReachedException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnsupportedAssignorException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.network.ClientInformation; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.consumer.Assignment; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.image.TopicImage; +import org.apache.kafka.image.TopicsDelta; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.junit.jupiter.api.Test; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.A
[GitHub] [kafka] dajac commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
dajac commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1205264648 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -0,0 +1,2017 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.FencedMemberEpochException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.GroupMaxSizeReachedException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnsupportedAssignorException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.network.ClientInformation; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.consumer.Assignment; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.image.TopicImage; +import org.apache.kafka.image.TopicsDelta; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.junit.jupiter.api.Test; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.A
[GitHub] [kafka] dajac commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
dajac commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1205264178 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -0,0 +1,2017 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.FencedMemberEpochException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.GroupMaxSizeReachedException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnsupportedAssignorException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.network.ClientInformation; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.consumer.Assignment; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.image.TopicImage; +import org.apache.kafka.image.TopicsDelta; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.junit.jupiter.api.Test; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.A
[GitHub] [kafka] dajac commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
dajac commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1205256385 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -0,0 +1,2017 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.FencedMemberEpochException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.GroupMaxSizeReachedException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnsupportedAssignorException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.network.ClientInformation; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.consumer.Assignment; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.image.TopicImage; +import org.apache.kafka.image.TopicsDelta; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.junit.jupiter.api.Test; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.A
[GitHub] [kafka] dajac commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
dajac commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1205254844 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -0,0 +1,2017 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.FencedMemberEpochException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.GroupMaxSizeReachedException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnsupportedAssignorException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.network.ClientInformation; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.consumer.Assignment; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.image.TopicImage; +import org.apache.kafka.image.TopicsDelta; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.junit.jupiter.api.Test; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.A
[GitHub] [kafka] dajac commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
dajac commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1205253148 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -0,0 +1,2017 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.FencedMemberEpochException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.GroupMaxSizeReachedException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnsupportedAssignorException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.network.ClientInformation; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.consumer.Assignment; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.image.TopicImage; +import org.apache.kafka.image.TopicsDelta; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.junit.jupiter.api.Test; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.A
[GitHub] [kafka] dajac commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
dajac commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1205252133 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -0,0 +1,2017 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.FencedMemberEpochException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.GroupMaxSizeReachedException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnsupportedAssignorException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.network.ClientInformation; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.consumer.Assignment; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.image.TopicImage; +import org.apache.kafka.image.TopicsDelta; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.junit.jupiter.api.Test; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.A
[jira] [Commented] (KAFKA-14934) KafkaClusterTestKit makes FaultHandler accessible
[ https://issues.apache.org/jira/browse/KAFKA-14934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17726127#comment-17726127 ] Owen C.H. Leung commented on KAFKA-14934: - Hi [~mumrah], can I give this a try? I'm new to contributing to Kafka and want to get my hands dirty with it. > KafkaClusterTestKit makes FaultHandler accessible > - > > Key: KAFKA-14934 > URL: https://issues.apache.org/jira/browse/KAFKA-14934 > Project: Kafka > Issue Type: Improvement > Components: unit tests > Reporter: David Arthur > Priority: Trivial > Labels: good-first-issue > > In KafkaClusterTestKit, we use a mock fault handler to avoid exiting the > process during tests. It would be useful to expose this fault handler so > tests could verify certain fault conditions (like a broker/controller failing > to start) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] dajac commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
dajac commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1205248818 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -0,0 +1,2017 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.FencedMemberEpochException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.GroupMaxSizeReachedException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnsupportedAssignorException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.network.ClientInformation; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.consumer.Assignment; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.image.TopicImage; +import org.apache.kafka.image.TopicsDelta; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.junit.jupiter.api.Test; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.A
[GitHub] [kafka] dajac commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
dajac commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1205247698 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -0,0 +1,2017 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.FencedMemberEpochException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.GroupMaxSizeReachedException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnsupportedAssignorException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.common.network.ClientInformation; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.consumer.Assignment; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.image.TopicImage; +import org.apache.kafka.image.TopicsDelta; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.junit.jupiter.api.Test; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.A
[GitHub] [kafka] dajac commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
dajac commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1205239532 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java: ## @@ -0,0 +1,500 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.GroupMetadataManagerTest; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ConsumerGroupTest { + +private ConsumerGroup createConsumerGroup(String groupId) { +SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); +return new ConsumerGroup(snapshotRegistry, groupId); +} + +@Test +public void testGetOrCreateMember() { +ConsumerGroup consumerGroup = createConsumerGroup("foo"); +ConsumerGroupMember member; + +// Create a member. +member = consumerGroup.getOrMaybeCreateMember("member-id", true); +assertEquals("member-id", member.memberId()); + +// Get that member back. 
+member = consumerGroup.getOrMaybeCreateMember("member-id", false); +assertEquals("member-id", member.memberId()); + +assertThrows(UnknownMemberIdException.class, () -> +consumerGroup.getOrMaybeCreateMember("does-not-exist", false)); +} + +@Test +public void testUpdateMember() { +ConsumerGroup consumerGroup = createConsumerGroup("foo"); +ConsumerGroupMember member; + +member = consumerGroup.getOrMaybeCreateMember("member", true); + +member = new ConsumerGroupMember.Builder(member) +.setSubscribedTopicNames(Arrays.asList("foo", "bar")) +.build(); + +consumerGroup.updateMember(member); + +assertEquals(member, consumerGroup.getOrMaybeCreateMember("member", false)); +} + +@Test +public void testRemoveMember() { +ConsumerGroup consumerGroup = createConsumerGroup("foo"); + +consumerGroup.getOrMaybeCreateMember("member", true); +assertTrue(consumerGroup.hasMember("member")); + +consumerGroup.removeMember("member"); +assertFalse(consumerGroup.hasMember("member")); + +} + +@Test +public void testUpdatingMemberUpdatesPartitionEpoch() { +Uuid fooTopicId = Uuid.randomUuid(); +Uuid barTopicId = Uuid.randomUuid(); +Uuid zarTopicId = Uuid.randomUuid(); + +ConsumerGroup consumerGroup = createConsumerGroup("foo"); +ConsumerGroupMember member; + +member = new ConsumerGroupMember.Builder("member") +.setMemberEpoch(10) +.setAssignedPartitions(mkAssignment( +mkTopicAssignment(fooTopicId, 1, 2, 3))) +.setPartitionsPendingRevocation(mkAssignment( +mkTopicAssignment(barTopicId, 4, 5, 6))) +.setPartitionsPendingAssignment(mkAssignment( +mkTopicAssignment(zarTopicId, 7, 8, 9))) +.build(); + +consumerGroup.updateMember(member); + +assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 1)); +assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 2)); +assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 3)); +assertEquals(10, consumerGroup.currentPartitionEpoch(barT
[GitHub] [kafka] dajac commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
dajac commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1205239034 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java: ## @@ -0,0 +1,500 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.GroupMetadataManagerTest; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ConsumerGroupTest { + +private ConsumerGroup createConsumerGroup(String groupId) { +SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); +return new ConsumerGroup(snapshotRegistry, groupId); +} + +@Test +public void testGetOrCreateMember() { +ConsumerGroup consumerGroup = createConsumerGroup("foo"); +ConsumerGroupMember member; + +// Create a member. +member = consumerGroup.getOrMaybeCreateMember("member-id", true); +assertEquals("member-id", member.memberId()); + +// Get that member back. 
+member = consumerGroup.getOrMaybeCreateMember("member-id", false); +assertEquals("member-id", member.memberId()); + +assertThrows(UnknownMemberIdException.class, () -> +consumerGroup.getOrMaybeCreateMember("does-not-exist", false)); +} + +@Test +public void testUpdateMember() { +ConsumerGroup consumerGroup = createConsumerGroup("foo"); +ConsumerGroupMember member; + +member = consumerGroup.getOrMaybeCreateMember("member", true); + +member = new ConsumerGroupMember.Builder(member) +.setSubscribedTopicNames(Arrays.asList("foo", "bar")) +.build(); + +consumerGroup.updateMember(member); + +assertEquals(member, consumerGroup.getOrMaybeCreateMember("member", false)); +} + +@Test +public void testRemoveMember() { +ConsumerGroup consumerGroup = createConsumerGroup("foo"); + +consumerGroup.getOrMaybeCreateMember("member", true); +assertTrue(consumerGroup.hasMember("member")); + +consumerGroup.removeMember("member"); +assertFalse(consumerGroup.hasMember("member")); + +} + +@Test +public void testUpdatingMemberUpdatesPartitionEpoch() { +Uuid fooTopicId = Uuid.randomUuid(); +Uuid barTopicId = Uuid.randomUuid(); +Uuid zarTopicId = Uuid.randomUuid(); + +ConsumerGroup consumerGroup = createConsumerGroup("foo"); +ConsumerGroupMember member; + +member = new ConsumerGroupMember.Builder("member") +.setMemberEpoch(10) +.setAssignedPartitions(mkAssignment( +mkTopicAssignment(fooTopicId, 1, 2, 3))) +.setPartitionsPendingRevocation(mkAssignment( +mkTopicAssignment(barTopicId, 4, 5, 6))) +.setPartitionsPendingAssignment(mkAssignment( +mkTopicAssignment(zarTopicId, 7, 8, 9))) +.build(); + +consumerGroup.updateMember(member); + +assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 1)); +assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 2)); +assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 3)); +assertEquals(10, consumerGroup.currentPartitionEpoch(barT