[GitHub] [kafka] kamalcph commented on a diff in pull request #13747: MINOR: Fix ListOffsetsRequestTest.testResponseIncludesLeaderEpoch

2023-05-25 Thread via GitHub


kamalcph commented on code in PR #13747:
URL: https://github.com/apache/kafka/pull/13747#discussion_r1206227365


##
core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala:
##
@@ -192,7 +192,7 @@ class ListOffsetsRequestTest extends BaseRequestTest {
 killBroker(firstLeaderId)
 val secondLeaderId = TestUtils.awaitLeaderChange(servers, partition, firstLeaderId)
 // make sure high watermark of new leader has caught up
-TestUtils.waitUntilTrue(() => sendRequest(secondLeaderId, 0L, -1).errorCode() != Errors.OFFSET_NOT_AVAILABLE.code(),
+TestUtils.waitUntilTrue(() => sendRequest(secondLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, -1).errorCode != Errors.OFFSET_NOT_AVAILABLE.code,

Review Comment:
   Thanks for fixing this!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on a diff in pull request #13758: KAFKA-15010 ZK migration failover support

2023-05-25 Thread via GitHub


mumrah commented on code in PR #13758:
URL: https://github.com/apache/kafka/pull/13758#discussion_r1206199732


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java:
##
@@ -260,37 +293,37 @@ public void visitScramCredential(String userName, ScramMechanism scramMechanism,
         });
 
         changedNonUserEntities.forEach(entity -> {
-            Map<String, Double> quotaMap = clientQuotasImage.entities().get(entity).quotaMap();
-            operationConsumer.accept("Update client quotas for " + entity, migrationState ->
+            Map<String, Double> quotaMap = getClientQuotaMapForEntity(clientQuotasImage, entity);
+            opConsumer.accept(UPDATE_CLIENT_QUOTA, "Update client quotas for " + entity, migrationState ->
                 migrationClient.configClient().writeClientQuotas(entity.entries(), quotaMap, Collections.emptyMap(), migrationState));
         });
 
         changedUsers.forEach(userName -> {
             ClientQuotaEntity entity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, userName));
-            Map<String, Double> quotaMap = clientQuotasImage.entities().get(entity).quotaMap();
+            Map<String, Double> quotaMap = getClientQuotaMapForEntity(clientQuotasImage, entity);

Review Comment:
   There was an NPE here prior to this patch: if the SCRAM credentials changed but the ClientQuotas did not, the `get(entity)` call would return null.
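   A minimal sketch of the null-safe lookup that `getClientQuotaMapForEntity` presumably performs; the helper's body is an assumption, while its name and the failing call chain come from the diff above:
   ```
   import java.util.Collections;
   import java.util.Map;
   import org.apache.kafka.common.quota.ClientQuotaEntity;
   import org.apache.kafka.image.ClientQuotaImage;
   import org.apache.kafka.image.ClientQuotasImage;

   // Hypothetical helper: returns an empty map instead of dereferencing the null
   // image that entities().get(entity) yields when only SCRAM credentials changed.
   static Map<String, Double> getClientQuotaMapForEntity(ClientQuotasImage image, ClientQuotaEntity entity) {
       ClientQuotaImage quotaImage = image.entities().get(entity);
       return quotaImage == null ? Collections.emptyMap() : quotaImage.quotaMap();
   }
   ```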






[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

2023-05-25 Thread via GitHub


jeffkbkim commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1206145989


##
core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala:
##
@@ -113,38 +142,113 @@ class ProducerIdManagerTest {
   }
 
   @ParameterizedTest
-  @ValueSource(ints = Array(1, 2, 10))
-  def testContiguousIds(idBlockLen: Int): Unit = {
+  @ValueSource(ints = Array(1, 2, 10, 100))
+  def testConcurrentGeneratePidRequests(idBlockLen: Int): Unit = {
+    // Send concurrent generateProducerId requests. Ensure that the generated producer id is unique.
+    // For each block (total 3 blocks), only "idBlockLen" number of requests should go through.
+    // All other requests should fail immediately.
+
+    val numThreads = 5
+    val latch = new CountDownLatch(idBlockLen * 3)
     val manager = new MockProducerIdManager(0, 0, idBlockLen)
-
-    IntStream.range(0, idBlockLen * 3).forEach { i =>
-      assertEquals(i, manager.generateProducerId())
+    val pidMap = mutable.Map[Long, Int]()
+    val requestHandlerThreadPool = Executors.newFixedThreadPool(numThreads)
+
+    for (_ <- 0 until numThreads) {
+      requestHandlerThreadPool.submit(() => {
+        while (latch.getCount > 0) {
+          val result = manager.generateProducerId()
+          result match {
+            case Success(pid) =>
+              pidMap synchronized {
+                if (latch.getCount != 0) {
+                  val counter = pidMap.getOrElse(pid, 0)
+                  pidMap += pid -> (counter + 1)
+                  latch.countDown()
+                }
+              }
+
+            case Failure(exception) =>
+              assertEquals(classOf[CoordinatorLoadInProgressException], exception.getClass)
+          }
+          Thread.sleep(100)
+        }
+      }, 0)
+    }
+    assertTrue(latch.await(15000, TimeUnit.MILLISECONDS))

Review Comment:
   Roughly 6 seconds. I have lowered this to 10s.






[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

2023-05-25 Thread via GitHub


jeffkbkim commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1206145853


##
core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala:
##
@@ -61,27 +63,59 @@ class ProducerIdsIntegrationTest {
     clusterInstance.stop()
   }
 
+  @ClusterTest(clusterType = Type.ZK, brokers = 1, autoStart = AutoStart.NO)
+  @Timeout(20)
+  def testHandleAllocateProducerIdsSingleRequestHandlerThread(clusterInstance: ClusterInstance): Unit = {
+    clusterInstance.config().serverProperties().put(KafkaConfig.NumIoThreadsProp, "1")
+    clusterInstance.start()
+    verifyUniqueIds(clusterInstance)
+    clusterInstance.stop()
+  }
+
+  @Disabled // TODO: Enable once producer id block size is configurable

Review Comment:
   I have appended the JIRA at the end






[GitHub] [kafka] showuon merged pull request #13763: MINOR: use debug level to log handleCommit

2023-05-25 Thread via GitHub


showuon merged PR #13763:
URL: https://github.com/apache/kafka/pull/13763





[GitHub] [kafka] showuon commented on pull request #13763: MINOR: use debug level to log handleCommit

2023-05-25 Thread via GitHub


showuon commented on PR #13763:
URL: https://github.com/apache/kafka/pull/13763#issuecomment-1563700586

   Failed tests are unrelated
   ```
   Build / JDK 11 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testTrustStoreAlter(String).quorum=kraft
   Build / JDK 8 and Scala 2.12 / kafka.admin.TopicCommandIntegrationTest.testDescribeAndListTopicsWithoutInternalTopics(String).quorum=zk
   Build / JDK 17 and Scala 2.13 / kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsToZonedDateTime()
   Build / JDK 17 and Scala 2.13 / org.apache.kafka.tools.MetadataQuorumCommandTest.[1] Type=Raft-CoReside, Name=testDescribeQuorumStatusSuccessful, MetadataVersion=3.4-IV0, Security=PLAINTEXT
   Build / JDK 17 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
   ```





[jira] [Updated] (KAFKA-15029) Make ProducerIdBlock size configurable

2023-05-25 Thread Jeff Kim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Kim updated KAFKA-15029:
-
Description: 
The producer id block size is currently fixed at 1000. Increasing the size 
allows the pid manager to process more init pid requests before allocating 
more. The current 1000 is quite small and given that clusters can have 
thousands of producers, we should make this configurable to adapt in the 
future. We should also consider updating to a global block size of 5000.

 

ProducerIdsIntegrationTest.scala has a disabled test (from KAFKA-14694) that we 
should also enable once we have configurable block sizes.

  was:
The producer id block size is currently fixed at 1000. Increasing the size 
allows the pid manager to process more init pid requests before allocating 
more. The current 1000 is quite small and given that clusters can have 
thousands of producers, we should make this configurable to adapt in the 
future. We should also consider updating to a global block size of 5000.

 

ProducerIdsIntegrationTest.scala has a disabled test that we should also enable 
once we have configurable block sizes.


> Make ProducerIdBlock size configurable
> --
>
> Key: KAFKA-15029
> URL: https://issues.apache.org/jira/browse/KAFKA-15029
> Project: Kafka
>  Issue Type: Task
>Reporter: Jeff Kim
>Priority: Major
>
> The producer id block size is currently fixed at 1000. Increasing the size 
> allows the pid manager to process more init pid requests before allocating 
> more. The current 1000 is quite small and given that clusters can have 
> thousands of producers, we should make this configurable to adapt in the 
> future. We should also consider updating to a global block size of 5000.
>  
> ProducerIdsIntegrationTest.scala has a disabled test (from KAFKA-14694) that 
> we should also enable once we have configurable block sizes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15029) Make ProducerIdBlock size configurable

2023-05-25 Thread Jeff Kim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Kim updated KAFKA-15029:
-
Description: 
The producer id block size is currently fixed at 1000. Increasing the size 
allows the pid manager to process more init pid requests before allocating 
more. The current 1000 is quite small and given that clusters can have 
thousands of producers, we should make this configurable to adapt in the 
future. We should also consider updating to a global block size of 5000.

 

ProducerIdsIntegrationTest.scala has a disabled test that we should also enable 
once we have configurable block sizes.

  was:The producer id block size is currently fixed at 1000. Increasing the 
size allows the pid manager to process more init pid requests before allocating 
more. The current 1000 is quite small and given that clusters can have 
thousands of producers, we should make this configurable to adapt in the 
future. We should also consider updating to a global block size of 5000.


> Make ProducerIdBlock size configurable
> --
>
> Key: KAFKA-15029
> URL: https://issues.apache.org/jira/browse/KAFKA-15029
> Project: Kafka
>  Issue Type: Task
>Reporter: Jeff Kim
>Priority: Major
>
> The producer id block size is currently fixed at 1000. Increasing the size 
> allows the pid manager to process more init pid requests before allocating 
> more. The current 1000 is quite small and given that clusters can have 
> thousands of producers, we should make this configurable to adapt in the 
> future. We should also consider updating to a global block size of 5000.
>  
> ProducerIdsIntegrationTest.scala has a disabled test that we should also 
> enable once we have configurable block sizes.





[jira] [Created] (KAFKA-15029) Make ProducerIdBlock size configurable

2023-05-25 Thread Jeff Kim (Jira)
Jeff Kim created KAFKA-15029:


 Summary: Make ProducerIdBlock size configurable
 Key: KAFKA-15029
 URL: https://issues.apache.org/jira/browse/KAFKA-15029
 Project: Kafka
  Issue Type: Task
Reporter: Jeff Kim


The producer id block size is currently fixed at 1000. Increasing the size 
allows the pid manager to process more init pid requests before allocating 
more. The current 1000 is quite small and given that clusters can have 
thousands of producers, we should make this configurable to adapt in the 
future. We should also consider updating to a global block size of 5000.





[jira] [Created] (KAFKA-15028) AddPartitionsToTxnManager metrics

2023-05-25 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-15028:
--

 Summary: AddPartitionsToTxnManager metrics
 Key: KAFKA-15028
 URL: https://issues.apache.org/jira/browse/KAFKA-15028
 Project: Kafka
  Issue Type: Sub-task
Reporter: Justine Olshan


KIP-890 added metrics for the AddPartitionsToTxnManager:

VerificationTimeMs – number of milliseconds from adding partition info to the 
manager to the time the response is sent. This will include the round trip to 
the transaction coordinator if it is called. This will also account for 
verifications that fail before the coordinator is called.

VerificationFailureRate – rate of verifications that returned in failure either 
from the AddPartitionsToTxn response or through errors in the manager.
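A hedged sketch of how these two metrics might be registered with the Yammer metrics library Kafka brokers use; the metric names come from the description above, while the enclosing class and registration style are assumptions, not the actual KIP-890 code:

```
import java.util.concurrent.TimeUnit;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.core.Meter;

// Illustrative sketch only.
public class AddPartitionsToTxnManagerMetricsSketch {
    // Milliseconds from adding partition info to the manager until the response is sent.
    private final Histogram verificationTimeMs =
        Metrics.newHistogram(AddPartitionsToTxnManagerMetricsSketch.class, "VerificationTimeMs", true);
    // Rate of verifications that returned in failure.
    private final Meter verificationFailureRate =
        Metrics.newMeter(AddPartitionsToTxnManagerMetricsSketch.class, "VerificationFailureRate", "failures", TimeUnit.SECONDS);

    public void recordVerification(long elapsedMs, boolean failed) {
        verificationTimeMs.update(elapsedMs);
        if (failed) {
            verificationFailureRate.mark();
        }
    }
}
```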





[GitHub] [kafka] hachikuji commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

2023-05-25 Thread via GitHub


hachikuji commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1206088849


##
core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala:
##
@@ -61,27 +63,59 @@ class ProducerIdsIntegrationTest {
     clusterInstance.stop()
   }
 
+  @ClusterTest(clusterType = Type.ZK, brokers = 1, autoStart = AutoStart.NO)
+  @Timeout(20)
+  def testHandleAllocateProducerIdsSingleRequestHandlerThread(clusterInstance: ClusterInstance): Unit = {
+    clusterInstance.config().serverProperties().put(KafkaConfig.NumIoThreadsProp, "1")
+    clusterInstance.start()
+    verifyUniqueIds(clusterInstance)
+    clusterInstance.stop()
+  }
+
+  @Disabled // TODO: Enable once producer id block size is configurable

Review Comment:
   Can we replace the TODO with a jira? 



##
core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala:
##
@@ -113,38 +142,113 @@ class ProducerIdManagerTest {
   }
 
   @ParameterizedTest
-  @ValueSource(ints = Array(1, 2, 10))
-  def testContiguousIds(idBlockLen: Int): Unit = {
+  @ValueSource(ints = Array(1, 2, 10, 100))
+  def testConcurrentGeneratePidRequests(idBlockLen: Int): Unit = {
+    // Send concurrent generateProducerId requests. Ensure that the generated producer id is unique.
+    // For each block (total 3 blocks), only "idBlockLen" number of requests should go through.
+    // All other requests should fail immediately.
+
+    val numThreads = 5
+    val latch = new CountDownLatch(idBlockLen * 3)
     val manager = new MockProducerIdManager(0, 0, idBlockLen)
-
-    IntStream.range(0, idBlockLen * 3).forEach { i =>
-      assertEquals(i, manager.generateProducerId())
+    val pidMap = mutable.Map[Long, Int]()
+    val requestHandlerThreadPool = Executors.newFixedThreadPool(numThreads)
+
+    for (_ <- 0 until numThreads) {
+      requestHandlerThreadPool.submit(() => {
+        while (latch.getCount > 0) {
+          val result = manager.generateProducerId()
+          result match {
+            case Success(pid) =>
+              pidMap synchronized {
+                if (latch.getCount != 0) {
+                  val counter = pidMap.getOrElse(pid, 0)
+                  pidMap += pid -> (counter + 1)
+                  latch.countDown()
+                }
+              }
+
+            case Failure(exception) =>
+              assertEquals(classOf[CoordinatorLoadInProgressException], exception.getClass)
+          }
+          Thread.sleep(100)
+        }
+      }, 0)
+    }
+    assertTrue(latch.await(15000, TimeUnit.MILLISECONDS))

Review Comment:
   How long does this test take?



##
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##
@@ -126,17 +132,22 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
     } else {
       val coordinatorEpochAndMetadata = txnManager.getTransactionState(transactionalId).flatMap {
         case None =>
-          val producerId = producerIdManager.generateProducerId()
-          val createdMetadata = new TransactionMetadata(transactionalId = transactionalId,
-            producerId = producerId,
-            lastProducerId = RecordBatch.NO_PRODUCER_ID,
-            producerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
-            lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
-            txnTimeoutMs = transactionTimeoutMs,
-            state = Empty,
-            topicPartitions = collection.mutable.Set.empty[TopicPartition],
-            txnLastUpdateTimestamp = time.milliseconds())
-          txnManager.putTransactionStateIfNotExists(createdMetadata)
+          val result = producerIdManager.generateProducerId()

Review Comment:
   nit: maybe we don't need `result`. Perhaps a little more concise to match on 
`producerIdManager.generateProducerId()`? Same comment below.



##
server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java:
##
@@ -32,11 +34,25 @@ public class ProducerIdsBlock {
     private final int assignedBrokerId;
     private final long firstProducerId;
     private final int blockSize;
+    private final AtomicLong producerIdCounter;
 
     public ProducerIdsBlock(int assignedBrokerId, long firstProducerId, int blockSize) {
         this.assignedBrokerId = assignedBrokerId;
         this.firstProducerId = firstProducerId;
         this.blockSize = blockSize;
+        producerIdCounter = new AtomicLong(firstProducerId);
+    }
+
+    /**
+     * Claim the next available producer id from the block.
+     * Returns an empty result if there are no more available producer ids in the block.
+     */
+    public Optional<Long> claimNextId() {
+        long nextId = producerIdCounter.getAndIncrement();
+        if (nextId > lastProducerId()) {

Review Comment:
   This first check is duplicated below.
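   A minimal sketch of a single-check variant of the `claimNextId()` shown above (an assumption about how the duplication could be resolved, not the PR's final code):
   ```
   // Hypothetical: compute the candidate id once and bounds-check it in one place.
   public Optional<Long> claimNextId() {
       long nextId = producerIdCounter.getAndIncrement();
       return nextId > lastProducerId() ? Optional.empty() : Optional.of(nextId);
   }
   ```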




[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-25 Thread via GitHub


jeffkbkim commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1206060783


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -0,0 +1,876 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of methods:
+ * 1) The request handlers which handle the requests and generate a respo

[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-25 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1206064411


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.GroupMetadataManagerTest;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsumerGroupTest {
+
+private ConsumerGroup createConsumerGroup(String groupId) {
+SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
+return new ConsumerGroup(snapshotRegistry, groupId);
+}
+
+@Test
+public void testGetOrCreateMember() {
+ConsumerGroup consumerGroup = createConsumerGroup("foo");
+ConsumerGroupMember member;
+
+// Create a group.
+member = consumerGroup.getOrMaybeCreateMember("member-id", true);
+assertEquals("member-id", member.memberId());
+
+// Get that group back.
+member = consumerGroup.getOrMaybeCreateMember("member-id", false);
+assertEquals("member-id", member.memberId());
+
+assertThrows(UnknownMemberIdException.class, () ->
+consumerGroup.getOrMaybeCreateMember("does-not-exist", false));
+}
+
+@Test
+public void testUpdateMember() {
+ConsumerGroup consumerGroup = createConsumerGroup("foo");
+ConsumerGroupMember member;
+
+member = consumerGroup.getOrMaybeCreateMember("member", true);
+
+member = new ConsumerGroupMember.Builder(member)
+.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+.build();
+
+consumerGroup.updateMember(member);
+
+assertEquals(member, consumerGroup.getOrMaybeCreateMember("member", false));
+}
+
+@Test
+public void testRemoveMember() {
+ConsumerGroup consumerGroup = createConsumerGroup("foo");
+
+consumerGroup.getOrMaybeCreateMember("member", true);
+assertTrue(consumerGroup.hasMember("member"));
+
+consumerGroup.removeMember("member");
+assertFalse(consumerGroup.hasMember("member"));
+
+}
+
+@Test
+public void testUpdatingMemberUpdatesPartitionEpoch() {
+Uuid fooTopicId = Uuid.randomUuid();
+Uuid barTopicId = Uuid.randomUuid();
+Uuid zarTopicId = Uuid.randomUuid();
+
+ConsumerGroup consumerGroup = createConsumerGroup("foo");
+ConsumerGroupMember member;
+
+member = new ConsumerGroupMember.Builder("member")
+.setMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(fooTopicId, 1, 2, 3)))
+.setPartitionsPendingRevocation(mkAssignment(
+mkTopicAssignment(barTopicId, 4, 5, 6)))
+.setPartitionsPendingAssignment(mkAssignment(
+mkTopicAssignment(zarTopicId, 7, 8, 9)))
+.build();
+
+consumerGroup.updateMember(member);
+
+assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 1));
+assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 2));
+assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 3));
+assertEquals(10, consumerGroup.currentPartitionEpoch(ba

[GitHub] [kafka] vcrfxia commented on pull request #13756: KAFKA-14936: Add On Disk Time Ordered Buffer (1/N)

2023-05-25 Thread via GitHub


vcrfxia commented on PR #13756:
URL: https://github.com/apache/kafka/pull/13756#issuecomment-1563584820

   Yep, this week's looking a bit dicey but I'll take a look early next week! 





[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-25 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1206053946


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.GroupMetadataManagerTest;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsumerGroupTest {
+
+private ConsumerGroup createConsumerGroup(String groupId) {
+SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
+return new ConsumerGroup(snapshotRegistry, groupId);
+}
+
+@Test
+public void testGetOrCreateMember() {
+ConsumerGroup consumerGroup = createConsumerGroup("foo");
+ConsumerGroupMember member;
+
+// Create a group.
+member = consumerGroup.getOrMaybeCreateMember("member-id", true);
+assertEquals("member-id", member.memberId());
+
+// Get that group back.
+member = consumerGroup.getOrMaybeCreateMember("member-id", false);
+assertEquals("member-id", member.memberId());
+
+assertThrows(UnknownMemberIdException.class, () ->
+consumerGroup.getOrMaybeCreateMember("does-not-exist", false));
+}
+
+@Test
+public void testUpdateMember() {
+ConsumerGroup consumerGroup = createConsumerGroup("foo");
+ConsumerGroupMember member;
+
+member = consumerGroup.getOrMaybeCreateMember("member", true);
+
+member = new ConsumerGroupMember.Builder(member)
+.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+.build();
+
+consumerGroup.updateMember(member);
+
+assertEquals(member, consumerGroup.getOrMaybeCreateMember("member", false));
+}
+
+@Test
+public void testRemoveMember() {
+ConsumerGroup consumerGroup = createConsumerGroup("foo");
+
+consumerGroup.getOrMaybeCreateMember("member", true);
+assertTrue(consumerGroup.hasMember("member"));
+
+consumerGroup.removeMember("member");
+assertFalse(consumerGroup.hasMember("member"));
+
+}
+
+@Test
+public void testUpdatingMemberUpdatesPartitionEpoch() {
+Uuid fooTopicId = Uuid.randomUuid();
+Uuid barTopicId = Uuid.randomUuid();
+Uuid zarTopicId = Uuid.randomUuid();
+
+ConsumerGroup consumerGroup = createConsumerGroup("foo");
+ConsumerGroupMember member;
+
+member = new ConsumerGroupMember.Builder("member")
+.setMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(fooTopicId, 1, 2, 3)))
+.setPartitionsPendingRevocation(mkAssignment(
+mkTopicAssignment(barTopicId, 4, 5, 6)))
+.setPartitionsPendingAssignment(mkAssignment(
+mkTopicAssignment(zarTopicId, 7, 8, 9)))
+.build();
+
+consumerGroup.updateMember(member);
+
+assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 1));
+assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 2));
+assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 3));
+assertEquals(10, consumerGroup.currentPartitionEpoch(ba

[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-25 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1206050414


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -0,0 +1,859 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of methods:
+ * 1) The request handlers which handle the requests and generate a respons

[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-25 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1206048854


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineInteger;
+import org.apache.kafka.timeline.TimelineObject;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
+
+/**
+ * A Consumer Group. All the metadata in this class are backed by
+ * records in the __consumer_offsets partitions.
+ */
+public class ConsumerGroup implements Group {
+
+public enum ConsumerGroupState {
+EMPTY("empty"),
+ASSIGNING("assigning"),
+RECONCILING("reconciling"),
+STABLE("stable"),
+DEAD("dead");
+
+private final String name;
+
+ConsumerGroupState(String name) {
+this.name = name;
+}
+
+@Override
+public String toString() {
+return name;
+}
+}
+
+/**
+ * The snapshot registry.
+ */
+private final SnapshotRegistry snapshotRegistry;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The group state.
+ */
+private final TimelineObject<ConsumerGroupState> state;
+
+/**
+ * The group epoch. The epoch is incremented whenever the subscriptions
+ * are updated and it will trigger the computation of a new assignment
+ * for the group.
+ */
+private final TimelineInteger groupEpoch;
+
+/**
+ * The group members.
+ */
+private final TimelineHashMap<String, ConsumerGroupMember> members;
+
+/**
+ * The metadata of the subscribed topics.
+ */
+private final TimelineHashMap<String, TopicMetadata> subscribedTopicMetadata;
+
+/**
+ * The assignment epoch. An assignment epoch smaller than the group epoch means
+ * that a new assignment is required. The assignment epoch is updated when a new
+ * assignment is installed.
+ */
+private final TimelineInteger assignmentEpoch;
+
+/**
+ * The target assignment.
+ */
+private final TimelineHashMap<String, Assignment> assignments;
+
+/**
+ * The current partition epoch maps each topic-partitions to their current epoch where
+ * the epoch is the epoch of their owners. When a member revokes a partition, it removes
+ * itself from this map. When a member gets a partition, it adds itself to this map.
+ */
+private final TimelineHashMap<Uuid, TimelineHashMap<Integer, Integer>> currentPartitionEpoch;
+
+public ConsumerGroup(
+SnapshotRegistry snapshotRegistry,
+String groupId
+) {
+this.snapshotRegistry = Objects.requireNonNull(snapshotRegistry);
+this.groupId = Objects.requireNonNull(groupId);
+this.state = new TimelineObject<>(snapshotRegistry, ConsumerGroupState.EMPTY);
+this.groupEpoch = new TimelineInteger(snapshotRegistry);
+this.members = new TimelineHashMap<>(snapshotRegistry, 0);
+this.subscribedTopicMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
+this.assignmentEpoch = new TimelineInteger(snapshotRegistry);
+this.assignments = new TimelineHashMap<>(snapshotRegistry, 0);
+this.currentPartitionEpoch = new TimelineHashMap<>(snapshotRegistry, 0);
+}
+
+/**
+ * The type of this group.
+ *
+ * @return The group type (Consumer).
+ */
+@Override
+public GroupType type() {
+return GroupType.CONSUMER;
+}
+
+/**
+ * The state of this group.
+ *
+ * @return The current state as a String.
+ */
+@Override
+public String stateAsString() {
+return state.get()

[GitHub] [kafka] rondagostino commented on a diff in pull request #13759: KAFKA-15019: Improve handling of overload situations in the kcontroller

2023-05-25 Thread via GitHub


rondagostino commented on code in PR #13759:
URL: https://github.com/apache/kafka/pull/13759#discussion_r1206028552


##
metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java:
##
@@ -223,6 +283,21 @@ public BrokerHeartbeatState next() {
 }
 }
 
+/**
+ * The maximum number of timed out heartbeats to count.
+ */
+static final int DEFAULT_TIMED_OUT_HEARTBEAT_COUNT_MAX = 1000;
+
+/**
+ * The time period over which to track timed out heartbeats.
+ */
+static final long DEFAULT_TIMED_OUT_HEARTBEAT_COUNT_WINDOW_NS = TimeUnit.MINUTES.toNanos(5);
+
+/**
+ * The number of heartbeats to notice missing before we go into overload.
+ */
+static final int DEFAULT_TIMED_OUT_HEARTBEAT_OVERLOAD_THRESHOLD = 3;
+

Review Comment:
   Whatever we set this to (and currently we always take this default value) 
will determine how long it would take for a controller to leave the overloaded 
state, right?  So if we see a series of missed heartbeats that puts us into the 
overloaded state, and then we don't see any more missed heartbeats, it would 
take this amount of time to leave that overloaded state?
   
   Consider the following case.  We are overloaded.  And then some broker 
crashes.  How long would we want it to take before we move leadership away for 
any partitions led by that suddenly-dead broker?  Ideally the session timeout, 
of course, but that won't happen if we are overloaded -- will it take 5 minutes 
as currently coded? That seems like a long time.
   
   I think the only thing we can say is that we should move leadership away as 
soon as we have a correct view of the cluster.  The question then becomes: how 
do we know when we have good information?
   
   A fixed amount of time like this with no missed heartbeats -- that seems too 
simple.  We likely have a reasonable view of the cluster if we see enough 
heartbeats without missing any such that it is likely that all brokers have had 
a chance to communicate with us.  So it seems like maybe the broker heartbeat 
interval should factor into it somehow?
   
   Let's say the broker heartbeat interval is 2 seconds.  Right now we consider 
a heartbeat missed if it is more than half the heartbeat interval (i.e. 1 
second) old.  Let's say there are 60 brokers in the cluster.  If we see 
something on the order of 60 heartbeats in a row without having missed any, then 
we probably have a reasonable view.
   
   So maybe it isn't having missed no more than N heartbeats in some fixed time 
window.  Maybe we define "not overloaded" as having seen some string of 
heartbeats uninterrupted without seeing a missed one?  So every time we miss a 
heartbeat we go into the "overloaded" state and we reset a counter of 
contiguous successfully processed heartbeats to 0.  Whenever we see a heartbeat 
and process it in time we increase that counter.  We are no longer overloaded 
when the `contiguousHeartbeatsProcessedInTime` counter is equal to or exceeds 
the broker count.
   
   I'm not sure I have it right, but I think this is a discussion worth 
pursuing.
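   A minimal sketch of the alternative floated above, assuming the names and reset-on-miss behavior described in this comment (the reviewer's idea, not the PR's code):
   ```
   // Hypothetical tracker: overload ends only after `brokerCount` contiguous
   // heartbeats have been processed in time, rather than after a fixed quiet window.
   class HeartbeatOverloadTracker {
       private int contiguousHeartbeatsProcessedInTime = 0;
       private boolean overloaded = false;

       void onHeartbeatTimedOut() {
           // Every missed heartbeat (re-)enters the overloaded state and resets the streak.
           overloaded = true;
           contiguousHeartbeatsProcessedInTime = 0;
       }

       void onHeartbeatProcessedInTime(int brokerCount) {
           contiguousHeartbeatsProcessedInTime++;
           // Once every broker has likely had a chance to check in without a miss,
           // the controller's view of the cluster is probably current again.
           if (contiguousHeartbeatsProcessedInTime >= brokerCount) {
               overloaded = false;
           }
       }

       boolean overloaded() {
           return overloaded;
       }
   }
   ```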
   
   






[GitHub] [kafka] rondagostino commented on a diff in pull request #13759: KAFKA-15019: Improve handling of overload situations in the kcontroller

2023-05-25 Thread via GitHub


rondagostino commented on code in PR #13759:
URL: https://github.com/apache/kafka/pull/13759#discussion_r1205924251


##
metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java:
##
@@ -613,4 +733,32 @@ BrokerControlStates calculateNextBrokerState(int brokerId,
 return new BrokerControlStates(currentState, SHUTDOWN_NOW);
 }
 }
+
+int timedOutHeartbeats(long nowNs) {

Review Comment:
   `private`?  It seems all methods are currently package-private.  Should any 
of them be `private` instead?



##
metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java:
##
@@ -55,6 +59,62 @@
 * active controller does not know when the last heartbeats were received from each.
  */
 public class BrokerHeartbeatManager {
+static class Builder {
+private LogContext logContext = null;
+private Time time = Time.SYSTEM;
+private long sessionTimeoutNs = 8000;

Review Comment:
   Should probably default to 
`ClusterControlManager.DEFAULT_SESSION_TIMEOUT_NS` (which is 18_000_000_000, it 
appears you put in a milliseconds value, and the incorrect one at that).
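   A one-line sketch of the suggested default, assuming the constant exists as named in the comment:
   ```
   // Hypothetical builder default per the review comment: the controller session
   // timeout in nanoseconds (18 seconds), not a hand-written millisecond value.
   private long sessionTimeoutNs = ClusterControlManager.DEFAULT_SESSION_TIMEOUT_NS; // 18_000_000_000L
   ```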



##
metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java:
##
@@ -613,4 +733,32 @@ BrokerControlStates calculateNextBrokerState(int brokerId,
 return new BrokerControlStates(currentState, SHUTDOWN_NOW);
 }
 }
+
+int timedOutHeartbeats(long nowNs) {
+return timedOutHeartbeatCounter.count(nowNs);
+}
+
+boolean overloaded(long nowNs) {
+return timedOutHeartbeats(nowNs) > timedOutHeartbeatOverloadThreshold;
+}
+
+void handleBrokerHeartbeatTimeout() {
+    long nowNs = time.nanoseconds();
+    boolean alreadyOverloaded = overloaded(nowNs);
+    timedOutHeartbeatCounter.addEvent(nowNs);
+    if ((!alreadyOverloaded) && overloaded(nowNs)) {
+        nonFatalFaultHandler.handleFault(String.format("handleBrokerHeartbeatTimeout: the " +
+            "active controller missed %d heartbeats in the last %d ms and is now " +
+            "overloaded.", timedOutHeartbeatOverloadThreshold,
+            NANOSECONDS.toMillis(timedOutHeartbeatCounter.window())));
+    }
+}
+
+boolean maybeLogOverloadExit(long nowNs) {
+if (!needToLogOverloadExit) return false;
+if (overloaded(time.nanoseconds())) return false;
+log.error("The active controller is no longer overloaded.");

Review Comment:
   Should this be at the `ERROR` level?



##
metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java:
##
@@ -613,4 +733,32 @@ BrokerControlStates calculateNextBrokerState(int brokerId,
 return new BrokerControlStates(currentState, SHUTDOWN_NOW);
 }
 }
+
+int timedOutHeartbeats(long nowNs) {
+return timedOutHeartbeatCounter.count(nowNs);
+}
+
+boolean overloaded(long nowNs) {
+return timedOutHeartbeats(nowNs) > timedOutHeartbeatOverloadThreshold;
+}
+
+void handleBrokerHeartbeatTimeout() {
+long nowNs = time.nanoseconds();
+boolean alreadyOverloaded = overloaded(nowNs);
+timedOutHeartbeatCounter.addEvent(nowNs);
+if ((!alreadyOverloaded) && overloaded(nowNs)) {
+        nonFatalFaultHandler.handleFault(String.format("handleBrokerHeartbeatTimeout: the " +

Review Comment:
   Looks like we need to set `needToLogOverloadExit = true` here.



##
metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java:
##
@@ -223,6 +283,21 @@ public BrokerHeartbeatState next() {
 }
 }
 
+/**
+ * The maximum number of timed out heartbeats to count.
+ */
+static final int DEFAULT_TIMED_OUT_HEARTBEAT_COUNT_MAX = 1000;
+
+/**
+ * The time period over which to track timed out heartbeats.
+ */
+static final long DEFAULT_TIMED_OUT_HEARTBEAT_COUNT_WINDOW_NS = TimeUnit.MINUTES.toNanos(5);
+
+/**
+ * The number of heartbeats to notice missing before we go into overload.
+ */
+static final int DEFAULT_TIMED_OUT_HEARTBEAT_OVERLOAD_THRESHOLD = 3;
+

Review Comment:
   Whatever we set this to (and currently we always take this default value) 
will determine how long it would take for a controller to leave the overloaded 
state, right?  So if we see a series of missed heartbeats that puts us into the 
overloaded state, and then we don't see any more missed heartbeats, it would 
take this amount of time to leave that overloaded state?
   
   Consider the following case.  We are overloaded.  And then some broker 
crashes.  How long would we want it to take before we move leadership away for 
any partitions led by that suddenly-dead broker?  Ideally the session timeout, 
of course, but that won't happen if we are overloaded -- will it take 5 minutes 
as currently coded? That seems like a long

[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-25 Thread via GitHub


jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1205982602


##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##
@@ -41,18 +42,35 @@ public class ProducerStateEntry {
 private int coordinatorEpoch;
 private long lastTimestamp;
 private OptionalLong currentTxnFirstOffset;
+
+private VerificationState verificationState;
+
+// Before any batches are associated with the entry, the tentative sequence represents the lowest sequence seen.
+private OptionalInt tentativeSequence;
+
+public enum VerificationState {
+EMPTY,
+VERIFYING,
+VERIFIED
+}

Review Comment:
   I've addressed this issue by creating a verification object that is created 
on first attempt to verify and removed when a marker is written. 
   
   When verification is needed, we pass this object through and check under the log lock in the append path.
   In the steps above, step 6 will clear this object, step 7 will set a new one, so step 8 will not succeed and will error out.
   
   When verification is not needed (already verified), we rely on 
firstTxnOffset being present before appending to the log.
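   A rough sketch of the object-identity scheme described above; the names (e.g. `VerificationGuard`) and structure are illustrative assumptions, not necessarily the PR's API:
   ```
   // Hypothetical marker object: a new instance is created when verification starts
   // and dropped once the transaction marker is written, so a stale request can be
   // detected by reference equality under the log lock.
   final class VerificationGuard {
   }

   final class ProducerVerificationState {
       private volatile VerificationGuard guard = null;

       VerificationGuard startVerification() {
           guard = new VerificationGuard();
           return guard;
       }

       void onMarkerWritten() {
           guard = null;
       }

       // Checked under the log lock right before append: a request carrying a guard
       // from an earlier verification no longer matches and must error out.
       boolean stillVerified(VerificationGuard requestGuard) {
           return requestGuard != null && requestGuard == guard;
       }
   }
   ```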






[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-25 Thread via GitHub


jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1205980757


##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##
@@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short 
producerEpoch) {
 }
 }
 
+public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) {
+if (this.producerEpoch < producerEpoch) {
+batchMetadata.clear();
+this.producerEpoch = producerEpoch;
+return true;
+} else {
+return false;
+}
+}
+
+// We only set tentative sequence if no batches have been written to the 
log. It is used to avoid OutOfOrderSequenceExceptions
+// when we saw a lower sequence during transaction verification. We will 
update the sequence when there is no batch metadata if:
+//  a) There is no tentative sequence yet
+//  b) A lower sequence for the same epoch is seen and should thereby 
block records after that
+//  c) A higher producer epoch is found that will reset the lowest seen 
sequence
+public void maybeUpdateTentativeSequence(int sequence, short 
producerEpoch) {
+if (batchMetadata.isEmpty() && 

Review Comment:
   I dug in further and interestingly we do a different check on bumped epochs 
after markers. We do have a second check here:
   ```
   if (!(currentEntry.producerEpoch() == 
RecordBatch.NO_PRODUCER_EPOCH || inSequence(currentLastSeq, appendFirstSeq) || 
currentEntry.tentativeSequence().isPresent())) {
   throw new OutOfOrderSequenceException("Out of order sequence 
number for producer " + producerId + " at " +
   "offset " + offset + " in partition " + 
topicPartition + ": " + appendFirstSeq +
   " (incoming seq. number), " + currentLastSeq + " 
(current end sequence number)");
   }
   ```
   
   I've added the tentative sequence change because this PR actually changes 
this behavior on first verification. We put the producer epoch in the entry, so 
that first case no longer applies.
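
   To make the a)/b)/c) rules from the diff's comment concrete, a sketch reconstructed from that comment (an illustration, not the PR's final code):

   ```java
   import java.util.OptionalInt;

   // Illustration of the a/b/c rules: only meaningful while no batches are
   // associated with the entry (batchMetadata is empty).
   final class TentativeSequence {
       private OptionalInt tentativeSequence = OptionalInt.empty();
       private short producerEpoch = -1;

       void maybeUpdate(int sequence, short epoch, boolean batchMetadataEmpty) {
           if (!batchMetadataEmpty) {
               return;
           }
           boolean noTentativeYet = !tentativeSequence.isPresent();          // a)
           boolean lowerSeqSameEpoch = epoch == producerEpoch
               && tentativeSequence.isPresent()
               && sequence < tentativeSequence.getAsInt();                   // b)
           boolean higherEpoch = epoch > producerEpoch;                      // c)
           if (noTentativeYet || lowerSeqSameEpoch || higherEpoch) {
               tentativeSequence = OptionalInt.of(sequence);
               producerEpoch = epoch;
           }
       }
   }
   ```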



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #13764: KAFKA-14691; [1/N] Add new fields to OffsetFetchRequest and OffsetFetchResponse

2023-05-25 Thread via GitHub


jolshan commented on PR #13764:
URL: https://github.com/apache/kafka/pull/13764#issuecomment-1563462503

   Since this is still under development, can we set the flag to indicate the 
latest version is unstable?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bbejeck commented on pull request #13751: KAFKA-14539: Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map

2023-05-25 Thread via GitHub


bbejeck commented on PR #13751:
URL: https://github.com/apache/kafka/pull/13751#issuecomment-1563454591

   Test failures are unrelated


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] machi1990 commented on pull request #13611: MINOR: remove unused variable from QuorumMetaLogListener#handleCommit method

2023-05-25 Thread via GitHub


machi1990 commented on PR #13611:
URL: https://github.com/apache/kafka/pull/13611#issuecomment-1563412353

   Thanks @jsancio, I've added a description to the PR. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rondagostino commented on pull request #13724: MINOR: more KRaft Metadata Image tests

2023-05-25 Thread via GitHub


rondagostino commented on PR #13724:
URL: https://github.com/apache/kafka/pull/13724#issuecomment-1563388747

   Lots of test failures, but all of the tests touched here succeeded, plus 
there are no changes to non-test code.  So the test failures are unrelated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #13334: MINOR: Move plugin path parsing from DelegatingClassLoader to PluginUtils

2023-05-25 Thread via GitHub


gharris1727 commented on code in PR #13334:
URL: https://github.com/apache/kafka/pull/13334#discussion_r1205901798


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java:
##
@@ -188,11 +191,34 @@ public static boolean isClassFile(Path path) {
 return path.toString().toLowerCase(Locale.ROOT).endsWith(".class");
 }
 
-public static List<Path> pluginLocations(Path topPath) throws IOException {
+public static List<Path> pluginLocations(String pluginPath) {
+if (pluginPath == null) {
+return Collections.emptyList();
+}
+String[] pluginPathElements = 
COMMA_WITH_WHITESPACE.split(pluginPath.trim(), -1);
+List<Path> pluginLocations = new ArrayList<>();
+for (String path : pluginPathElements) {
+try {
+Path pluginPathElement = Paths.get(path).toAbsolutePath();
+// Currently 'plugin.paths' property is a list of top-level 
directories
+// containing plugins
+if (Files.isDirectory(pluginPathElement)) {
+pluginLocations.addAll(pluginLocations(pluginPathElement));
+} else if (isArchive(pluginPathElement)) {
+pluginLocations.add(pluginPathElement);
+}

Review Comment:
   Thanks for catching that; Paths.get() throws this exception, but it's 
unchecked.
   
   I'm not sure if the catch in initLoaders still needs the 
InvalidPathException, but I left it in just to be safe.
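
   For illustration, a self-contained sketch of the defensive parsing being discussed, with the `InvalidPathException` catch included; the class and method names are assumptions, and the log message is illustrative:

   ```java
   import java.nio.file.InvalidPathException;
   import java.nio.file.Path;
   import java.nio.file.Paths;
   import java.util.ArrayList;
   import java.util.List;

   final class PluginPathParsing {
       // Illustration: skip malformed plugin.path entries instead of failing startup.
       static List<Path> absolutePluginPaths(String[] pluginPathElements) {
           List<Path> paths = new ArrayList<>();
           for (String element : pluginPathElements) {
               try {
                   paths.add(Paths.get(element).toAbsolutePath());
               } catch (InvalidPathException e) {
                   // Mirrors the defensive behavior discussed above: log and skip.
                   System.err.printf("Invalid path in plugin.path: %s%n", element);
               }
           }
           return paths;
       }
   }
   ```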



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Hangleton commented on a diff in pull request #13764: KAFKA-14691; [1/N] Add new fields to OffsetFetchRequest and OffsetFetchResponse

2023-05-25 Thread via GitHub


Hangleton commented on code in PR #13764:
URL: https://github.com/apache/kafka/pull/13764#discussion_r1205888922


##
clients/src/main/resources/common/message/OffsetFetchRequest.json:
##
@@ -51,8 +57,9 @@
 "about": "The group ID."},
   { "name": "Topics", "type": "[]OffsetFetchRequestTopics", "versions": 
"8+", "nullableVersions": "8+",
 "about": "Each topic we would like to fetch offsets for, or null to 
fetch offsets for all topics.", "fields": [
-{ "name": "Name", "type": "string", "versions": "8+", "entityType": 
"topicName",
+{ "name": "Name", "type": "string", "versions": "0-8", "ignorable": 
true, "entityType": "topicName",
   "about": "The topic name."},
+{ "name": "TopicId", "type": "uuid", "versions": "9+", "ignorable": 
true, "about": "The unique topic ID" },

Review Comment:
   nit: The `"about"` field could be on a new line.



##
clients/src/main/resources/common/message/OffsetFetchRequest.json:
##
@@ -33,11 +33,17 @@
   // Version 7 is adding the require stable flag.
   //
   // Version 8 is adding support for fetching offsets for multiple groups at a 
time
-  "validVersions": "0-8",
+  //
+  // Version 9 adds GenerationIdOrMemberEpoch, MemberId and TopicId fields 
(KIP-848).
+  "validVersions": "0-9",
   "flexibleVersions": "6+",
   "fields": [
 { "name": "GroupId", "type": "string", "versions": "0-7", "entityType": 
"groupId",
   "about": "The group to fetch offsets for." },
+{ "name": "GenerationIdOrMemberEpoch", "type": "int32", "versions": "9+", 
"default": "-1", "ignorable": true,

Review Comment:
   Do we want to add these fields in this PR?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15025) Implement min-cost flow without balancing tasks for same subtopology

2023-05-25 Thread Hao Li (Jira)
Hao Li created KAFKA-15025:
--

 Summary: Implement min-cost flow without balancing tasks for same 
subtopology
 Key: KAFKA-15025
 URL: https://issues.apache.org/jira/browse/KAFKA-15025
 Project: Kafka
  Issue Type: Sub-task
Reporter: Hao Li






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15027) Implement rack aware assignment for standby tasks

2023-05-25 Thread Hao Li (Jira)
Hao Li created KAFKA-15027:
--

 Summary: Implement rack aware assignment for standby tasks
 Key: KAFKA-15027
 URL: https://issues.apache.org/jira/browse/KAFKA-15027
 Project: Kafka
  Issue Type: Sub-task
Reporter: Hao Li






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15026) Implement min-cost flow balancing tasks for same subtopology

2023-05-25 Thread Hao Li (Jira)
Hao Li created KAFKA-15026:
--

 Summary: Implement min-cost flow balancing tasks for same 
subtopology
 Key: KAFKA-15026
 URL: https://issues.apache.org/jira/browse/KAFKA-15026
 Project: Kafka
  Issue Type: Sub-task
Reporter: Hao Li






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15024) Add cost function for task/client

2023-05-25 Thread Hao Li (Jira)
Hao Li created KAFKA-15024:
--

 Summary: Add cost function for task/client
 Key: KAFKA-15024
 URL: https://issues.apache.org/jira/browse/KAFKA-15024
 Project: Kafka
  Issue Type: Sub-task
Reporter: Hao Li






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15023) Get rack information for source topic partitions for a task

2023-05-25 Thread Hao Li (Jira)
Hao Li created KAFKA-15023:
--

 Summary: Get rack information for source topic partitions for a 
task
 Key: KAFKA-15023
 URL: https://issues.apache.org/jira/browse/KAFKA-15023
 Project: Kafka
  Issue Type: Sub-task
Reporter: Hao Li






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15022) Support rack aware task assignment in Kafka streams

2023-05-25 Thread Hao Li (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hao Li updated KAFKA-15022:
---
Labels: kip kip-925  (was: )

> Support rack aware task assignment in Kafka streams 
> 
>
> Key: KAFKA-15022
> URL: https://issues.apache.org/jira/browse/KAFKA-15022
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hao Li
>Assignee: Hao Li
>Priority: Major
>  Labels: kip, kip-925
>
> For KIP-925: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15022) Support rack aware task assignment in Kafka streams

2023-05-25 Thread Hao Li (Jira)
Hao Li created KAFKA-15022:
--

 Summary: Support rack aware task assignment in Kafka streams 
 Key: KAFKA-15022
 URL: https://issues.apache.org/jira/browse/KAFKA-15022
 Project: Kafka
  Issue Type: Improvement
Reporter: Hao Li


For KIP-925: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15022) Support rack aware task assignment in Kafka streams

2023-05-25 Thread Hao Li (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hao Li updated KAFKA-15022:
---
Component/s: streams

> Support rack aware task assignment in Kafka streams 
> 
>
> Key: KAFKA-15022
> URL: https://issues.apache.org/jira/browse/KAFKA-15022
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hao Li
>Assignee: Hao Li
>Priority: Major
>
> For KIP-925: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15022) Support rack aware task assignment in Kafka streams

2023-05-25 Thread Hao Li (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hao Li reassigned KAFKA-15022:
--

Assignee: Hao Li

> Support rack aware task assignment in Kafka streams 
> 
>
> Key: KAFKA-15022
> URL: https://issues.apache.org/jira/browse/KAFKA-15022
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Hao Li
>Assignee: Hao Li
>Priority: Major
>
> For KIP-925: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] splett2 commented on a diff in pull request #13765: KAFKA-15021; Skip leader epoch bump on ISR shrink

2023-05-25 Thread via GitHub


splett2 commented on code in PR #13765:
URL: https://github.com/apache/kafka/pull/13765#discussion_r1205862927


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -1087,12 +1087,14 @@ class Partition(val topicPartition: TopicPartition,
 // avoid unnecessary collection generation
 val leaderLogEndOffset = leaderLog.logEndOffsetMetadata
 var newHighWatermark = leaderLogEndOffset
-remoteReplicasMap.values.foreach { replica =>
+remoteReplicasMap.foreachEntry { (replicaId, replica) =>

Review Comment:
   can we use `remoteReplicasMap.values` here and use `replica.brokerId`, 
similar to the `maximalIsr.contains` call?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13334: MINOR: Move plugin path parsing from DelegatingClassLoader to PluginUtils

2023-05-25 Thread via GitHub


C0urante commented on code in PR #13334:
URL: https://github.com/apache/kafka/pull/13334#discussion_r1205854894


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java:
##
@@ -188,11 +191,34 @@ public static boolean isClassFile(Path path) {
 return path.toString().toLowerCase(Locale.ROOT).endsWith(".class");
 }
 
-public static List<Path> pluginLocations(Path topPath) throws IOException {
+public static List<Path> pluginLocations(String pluginPath) {
+if (pluginPath == null) {
+return Collections.emptyList();
+}
+String[] pluginPathElements = 
COMMA_WITH_WHITESPACE.split(pluginPath.trim(), -1);
+List<Path> pluginLocations = new ArrayList<>();
+for (String path : pluginPathElements) {
+try {
+Path pluginPathElement = Paths.get(path).toAbsolutePath();
+// Currently 'plugin.paths' property is a list of top-level 
directories
+// containing plugins
+if (Files.isDirectory(pluginPathElement)) {
+pluginLocations.addAll(pluginLocations(pluginPathElement));
+} else if (isArchive(pluginPathElement)) {
+pluginLocations.add(pluginPathElement);
+}

Review Comment:
   Do we also want to include the catch block for `InvalidPathException` here, 
which matches the [logic on 
trunk](https://github.com/apache/kafka/blob/dc00832b965c29fbbb4148cc680fadfc3f28642a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java#L266-L267)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Reopened] (KAFKA-8713) [Connect] JsonConverter NULL Values are replaced by default values even in NULLABLE fields

2023-05-25 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris reopened KAFKA-8713:


> [Connect] JsonConverter NULL Values are replaced by default values even in 
> NULLABLE fields
> --
>
> Key: KAFKA-8713
> URL: https://issues.apache.org/jira/browse/KAFKA-8713
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.3.0, 2.2.1
>Reporter: Cheng Pan
>Assignee: Mickael Maison
>Priority: Major
>  Labels: needs-kip
> Fix For: 3.5.0
>
>
> Class JsonConverter line: 582
> {code:java}
> private static JsonNode convertToJson(Schema schema, Object logicalValue) 
> {
> if (logicalValue == null) {
> if (schema == null) // Any schema is valid and we don't have a 
> default, so treat this as an optional schema
> return null;
> if (schema.defaultValue() != null)
> return convertToJson(schema, schema.defaultValue());
> if (schema.isOptional())
> return JsonNodeFactory.instance.nullNode();
> throw new DataException("Conversion error: null value for field 
> that is required and has no default value");
> }
> 
> }
> {code}
> h1.Expect:
> Value `null` is valid for an optional field, even though the field has a 
> default value.
>  Only when the field is required should the converter fall back to the 
> default value when the value is `null`.
> h1.Actual:
> Always returns the default value if `null` was given.
> h1. Example:
> I'm not sure if the current behavior is exactly what is expected, but at least on 
> MySQL, a table defined as 
> {code:sql}
> create table t1 (
>    name varchar(40) not null,
>    create_time datetime default '1999-01-01 11:11:11' null,
>    update_time datetime default '1999-01-01 11:11:11' null
> );
> {code}
> Just insert a record:
> {code:sql}
> INSERT INTO `t1` (`name`,  `update_time`) VALUES ('kafka', null);
> {code}
> The result is:
> {code:json}
> {
> "name": "kafka",
> "create_time": "1999-01-01 11:11:11",
> "update_time": null
> }
> {code}
> But when I use Debezium to pull the binlog and send the record to Kafka with 
> JsonConverter, the result changes to:
> {code:json}
> {
> "name": "kafka",
> "create_time": "1999-01-01 11:11:11",
> "update_time": "1999-01-01 11:11:11"
> }
> {code}
> For more details, see: https://issues.jboss.org/browse/DBZ-1064



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] gharris1727 commented on pull request #13748: [BUGFIX] Bugfixed in KAFKA-8713, but it doesn't work properly.

2023-05-25 Thread via GitHub


gharris1727 commented on PR #13748:
URL: https://github.com/apache/kafka/pull/13748#issuecomment-1563306818

   Hey @krespo
   
   Thank you for clarifying your setup. I was able to reproduce this issue in a 
unit test. I'll bring it up to the release managers to see if we can get a fix 
into 3.5. Thanks so much for validating this feature pre-release.
   
   Unfortunately this patch can't move forward as-is, because people are 
certainly relying on Struct::get returning defaults. However, we can alter 
the JsonConverter to use Struct::getWithoutDefault, and use the 
`replace.null.with.default` configuration to gate whether to insert the default 
to maintain backwards compatibility.
   
   If you can update this patch to call getWithoutDefault, and add some unit 
tests for the struct field case, I think that will be merge-able.
   
   cc @mimaison @C0urante
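
   A sketch of the gating described above; `StructFieldReader` and `replaceNullWithDefault` are hypothetical stand-ins for however the `replace.null.with.default` value gets plumbed through:

   ```java
   import org.apache.kafka.connect.data.Field;
   import org.apache.kafka.connect.data.Struct;

   // Illustration: honor explicit nulls unless the legacy behavior is configured.
   final class StructFieldReader {
       private final boolean replaceNullWithDefault; // from replace.null.with.default

       StructFieldReader(boolean replaceNullWithDefault) {
           this.replaceNullWithDefault = replaceNullWithDefault;
       }

       Object read(Struct struct, Field field) {
           // Struct::get substitutes the schema default for null values;
           // Struct::getWithoutDefault returns the value exactly as stored.
           return replaceNullWithDefault
               ? struct.get(field)
               : struct.getWithoutDefault(field.name());
       }
   }
   ```

   Defaulting the new config to true would preserve today's behavior while letting users opt in to honoring explicit nulls.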


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio commented on pull request #13611: MINOR: remove unused variable from QuorumMetaLogListener#handleCommit method

2023-05-25 Thread via GitHub


jsancio commented on PR #13611:
URL: https://github.com/apache/kafka/pull/13611#issuecomment-1563305554

   I'll take a look @machi1990. In the meantime, can you please add a 
description?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio opened a new pull request, #13765: KAFKA-15021; Skip leader epoch bump

2023-05-25 Thread via GitHub


jsancio opened a new pull request, #13765:
URL: https://github.com/apache/kafka/pull/13765

   TODO: Write description
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #13356: KAFKA-14789: Prevent mis-attributing classpath plugins, allow discovery of classpath RestExtension and ConfigProvider

2023-05-25 Thread via GitHub


gharris1727 commented on code in PR #13356:
URL: https://github.com/apache/kafka/pull/13356#discussion_r1205800242


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##
@@ -431,15 +427,29 @@ private <T> Collection<PluginDesc<T>> 
getServiceLoaderPluginDesc(Class<T> klass,
 log.error("Failed to discover {}{}", 
klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t);
 continue;
 }
-result.add(pluginDesc((Class<? extends T>) 
pluginImpl.getClass(),
-versionFor(pluginImpl), loader));
+Class<? extends T> pluginKlass = (Class<? extends T>) 
pluginImpl.getClass();
+if (!isParentClassloader(pluginKlass.getClassLoader(), 
loader)) {
+log.debug("Exclude {} that is from classloader {}", 
pluginKlass.getSimpleName(), pluginKlass.getClassLoader());
+continue;
+}
+result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), 
loader));
 }
 } finally {
 Plugins.compareAndSwapLoaders(savedLoader);
 }
 return result;
 }
 
+private static boolean isParentClassloader(ClassLoader loader, ClassLoader 
parent) {
+while (loader != null) {
+if (loader == parent) {
+return true;
+}
+loader = loader.getParent();
+}
+return false;

Review Comment:
   I wrote this in case a plugin came from a classloader which was a child of 
the PluginClassLoader. Thinking about it again, I don't know how a plugin would 
manage that, since the PluginClassLoader doesn't have any child-first logic 
that would permit it to load classes from deeper in the classloading hierarchy.
   
   I'll simplify this logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-25 Thread via GitHub


jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1205794685


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -980,6 +1007,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   if (duplicateBatch.isPresent) {
 return (updatedProducers, completedTxns.toList, 
Some(duplicateBatch.get()))
   }
+
+  // Verify that if the record is transactional & the append origin is 
client, that we are in VERIFIED state.
+  // Also check that we are not appending a record with a higher 
sequence than one previously seen through verification.
+  if (batch.isTransactional && 
producerStateManager.producerStateManagerConfig().transactionVerificationEnabled())
 {
+if (verificationState(batch.producerId(), batch.producerEpoch()) 
!= ProducerStateEntry.VerificationState.VERIFIED) {
+  throw new InvalidRecordException("Record was not part of an 
ongoing transaction")
+} else if (maybeLastEntry.isPresent && 
maybeLastEntry.get.tentativeSequence.isPresent && 
maybeLastEntry.get.tentativeSequence.getAsInt < batch.baseSequence)

Review Comment:
   One thing that is tricky is that the producer state entry used in the 
sequence check is actually not the 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-25 Thread via GitHub


jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1205792946


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -980,6 +1007,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   if (duplicateBatch.isPresent) {
 return (updatedProducers, completedTxns.toList, 
Some(duplicateBatch.get()))
   }
+
+  // Verify that if the record is transactional & the append origin is 
client, that we are in VERIFIED state.
+  // Also check that we are not appending a record with a higher 
sequence than one previously seen through verification.
+  if (batch.isTransactional && 
producerStateManager.producerStateManagerConfig().transactionVerificationEnabled())
 {
+if (verificationState(batch.producerId(), batch.producerEpoch()) 
!= ProducerStateEntry.VerificationState.VERIFIED) {
+  throw new InvalidRecordException("Record was not part of an 
ongoing transaction")
+} else if (maybeLastEntry.isPresent && 
maybeLastEntry.get.tentativeSequence.isPresent && 
maybeLastEntry.get.tentativeSequence.getAsInt < batch.baseSequence)

Review Comment:
   Did we want to move the verification check there? Or just tentative sequence?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append

2023-05-25 Thread via GitHub


jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1205777679


##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##
@@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short 
producerEpoch) {
 }
 }
 
+public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) {
+if (this.producerEpoch < producerEpoch) {
+batchMetadata.clear();
+this.producerEpoch = producerEpoch;
+return true;
+} else {
+return false;
+}
+}
+
+// We only set tentative sequence if no batches have been written to the 
log. It is used to avoid OutOfOrderSequenceExceptions
+// when we saw a lower sequence during transaction verification. We will 
update the sequence when there is no batch metadata if:
+//  a) There is no tentative sequence yet
+//  b) A lower sequence for the same epoch is seen and should thereby 
block records after that
+//  c) A higher producer epoch is found that will reset the lowest seen 
sequence
+public void maybeUpdateTentativeSequence(int sequence, short 
producerEpoch) {
+if (batchMetadata.isEmpty() && 

Review Comment:
   I dug into this a bit more. We actually do check sequence on bumped epoch! 
   
   ```
if (producerEpoch != updatedEntry.producerEpoch()) {
   if (appendFirstSeq != 0) {
   if (updatedEntry.producerEpoch() != 
RecordBatch.NO_PRODUCER_EPOCH) {
   throw new OutOfOrderSequenceException("Invalid sequence 
number for new epoch of producer " + producerId +
   "at offset " + offset + " in partition " + 
topicPartition + ": " + producerEpoch + " (request epoch), "
   + appendFirstSeq + " (seq. number), " + 
updatedEntry.producerEpoch() + " (current producer epoch)");
   ```
   
   Basically we check when the updated epoch is different than the epoch we 
want to use. The tricky part is that we update the epoch when creating the 
verification state, so we just need to do that check there somehow. I will look 
into it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14953) Add metrics for tiered storage

2023-05-25 Thread Abhijeet Kumar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17726301#comment-17726301
 ] 

Abhijeet Kumar commented on KAFKA-14953:


Yes, that is in the plan as mentioned by Satish in the comment: 
[https://github.com/apache/kafka/pull/13535#discussion_r1181518655]

I plan to add all the metrics in scope.

> Add metrics for tiered storage
> --
>
> Key: KAFKA-14953
> URL: https://issues.apache.org/jira/browse/KAFKA-14953
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Divij Vaidya
>Assignee: Abhijeet Kumar
>Priority: Minor
>
> Not just for expired fetch. We also need to add all the metrics described in 
> KIP-405
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-NewMetrics
>  
> ref: [https://github.com/apache/kafka/pull/13535#discussion_r1180286031] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-9800) [KIP-580] Client Exponential Backoff Implementation

2023-05-25 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-9800:
-
Labels: KIP-580 client  (was: KIP-580)

> [KIP-580] Client Exponential Backoff Implementation
> ---
>
> Key: KAFKA-9800
> URL: https://issues.apache.org/jira/browse/KAFKA-9800
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Cheng Tan
>Assignee: Luke Chen
>Priority: Major
>  Labels: KIP-580, client
>
> Design:
> The main idea is to bookkeep the failed attempts. Currently, the retry backoff 
> has two main usage patterns:
>  # Synchronous retries in a blocking loop. The thread will sleep in each 
> iteration for the retry backoff ms.
>  # Async retries. In each poll, retries that have not yet met the backoff will 
> be filtered out. The data class often maintains a 1:1 mapping to a set of 
> requests which are logically associated. (i.e. a set contains only one 
> initial request and only its retries.)
> For type 1, we can utilize a local failure counter of a Java generic data 
> type.
> For case 2, I already wrapped the exponential backoff/timeout util class in 
> my KIP-601 
> [implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28]
>  which takes the number of attempts and returns the backoff/timeout value at 
> the corresponding level. Thus, we can add a new class property to those 
> classes containing retriable data in order to record the number of failed 
> attempts.
>  
> Changes:
> KafkaProducer:
>  # Produce request (ApiKeys.PRODUCE). Currently, the backoff applies to each 
> ProducerBatch in Accumulator, which already has an attribute attempts 
> recording the number of failed attempts. So we can let the Accumulator 
> calculate the new retry backoff for each batch when it enqueues them, to avoid 
> instantiating the util class multiple times.
>  # Transaction request (ApiKeys..*TXN). TxnRequestHandler will have a new 
> class property of type `Long` to record the number of attempts.
> KafkaConsumer:
>  # Some synchronous retry use cases. Record the failed attempts in the 
> blocking loop.
>  # Partition request (ApiKeys.OFFSET_FOR_LEADER_EPOCH, ApiKeys.LIST_OFFSETS). 
> Though the actual requests are packed for each node, the current 
> implementation is applying backoff to each topic partition, where the backoff 
> value is kept by TopicPartitionState. Thus, TopicPartitionState will have the 
> new property recording the number of attempts.
> Metadata:
>  #  Metadata lives as a singleton in many clients. Add a new property 
> recording the number of attempts
>  AdminClient:
>  # AdminClient has its own request abstraction Call. The failed attempts are 
> already kept by the abstraction. So probably clean the Call class logic a bit.
> Existing tests:
>  # If the tests are testing the retry backoff, add a delta to the assertion, 
> considering the existence of the jitter.
>  # If the tests are testing other functionality, we can specify the same 
> value for both `retry.backoff.ms` and `retry.backoff.max.ms` in order to make 
> the retry backoff static. We can use this trick to make the existing tests 
> compatible with the changes.
> There are other common usages that look like client.poll(timeout), where the 
> timeout passed in is the retry backoff value. We won't change these usages 
> since its underlying logic is nioSelector.select(timeout) and 
> nioSelector.selectNow(), which means if no interested op exists, the client 
> will block retry backoff milliseconds. This is an optimization when there's 
> no request that needs to be sent but the client is waiting for responses. 
> Specifically, if the client fails the inflight requests before the retry 
> backoff milliseconds passed, it still needs to wait until that amount of time 
> passed, unless there's a new request that needs to be sent.
>  
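
For context, a minimal sketch of exponential backoff with jitter as described above; the constants and the jitter range are illustrative assumptions, not the KIP's final utility:

{code:java}
import java.util.concurrent.ThreadLocalRandom;

// Illustration: backoff grows as base * 2^attempts, is capped at a maximum,
// and a random jitter factor keeps client retries from synchronizing.
final class RetryBackoff {
    private final long baseMs;   // e.g. retry.backoff.ms
    private final long maxMs;    // e.g. retry.backoff.max.ms
    private final double jitter; // e.g. 0.2 for +/-20%

    RetryBackoff(long baseMs, long maxMs, double jitter) {
        this.baseMs = baseMs;
        this.maxMs = maxMs;
        this.jitter = jitter;
    }

    long backoffMs(int failedAttempts) {
        double exp = Math.min((double) maxMs, baseMs * Math.pow(2, failedAttempts));
        double rand = 1 + jitter * (2 * ThreadLocalRandom.current().nextDouble() - 1);
        return (long) (exp * rand);
    }
}
{code}

Note that configuring retry.backoff.ms equal to retry.backoff.max.ms collapses this to a static backoff, which is the trick the description proposes for keeping existing tests deterministic.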



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] bbejeck commented on pull request #13751: KAFKA-14539: Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map

2023-05-25 Thread via GitHub


bbejeck commented on PR #13751:
URL: https://github.com/apache/kafka/pull/13751#issuecomment-1563134216

   ping @mjsax or @ableegoldman for a second review


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bbejeck commented on a diff in pull request #13751: KAFKA-14539: Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map

2023-05-25 Thread via GitHub


bbejeck commented on code in PR #13751:
URL: https://github.com/apache/kafka/pull/13751#discussion_r1205696101


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java:
##
@@ -558,7 +564,7 @@ private SourceTopicsInfo getSourceTopicsInfo(final String 
storeName, final Strin
 }
 
 private boolean isInitialized() {
-return clusterMetadata != null && !clusterMetadata.topics().isEmpty() 
&& localMetadata.get() != null;
+return partitionsByTopic != null && 
!partitionsByTopic.keySet().isEmpty() && localMetadata.get() != null;

Review Comment:
   Could be simplified to `!partitionsByTopic.isEmpty()`



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java:
##
@@ -308,12 +308,18 @@ public synchronized  KeyQueryMetadata 
getKeyQueryMetadataForKey(final String
  *
  * @param activePartitionHostMap  the current mapping of {@link HostInfo} 
-> {@link TopicPartition}s for active partitions
  * @param standbyPartitionHostMap the current mapping of {@link HostInfo} 
-> {@link TopicPartition}s for standby partitions
- * @param clusterMetadata the current clusterMetadata {@link 
Cluster}
+ * @param topicPartitionInfo  the current mapping of {@link 
TopicPartition} -> {@Link PartitionInfo}
  */
 synchronized void onChange(final Map<HostInfo, Set<TopicPartition>> 
activePartitionHostMap,
final Map<HostInfo, Set<TopicPartition>> 
standbyPartitionHostMap,
-   final Cluster clusterMetadata) {
-this.clusterMetadata = clusterMetadata;
+   final Map<TopicPartition, PartitionInfo> 
topicPartitionInfo) {
+this.partitionsByTopic = new HashMap<>();
+for (final Map.Entry<TopicPartition, PartitionInfo> entry: 
topicPartitionInfo.entrySet()) {

Review Comment:
   nit: use `topicPartitionInfo.entrySet().forEach(entry -> {...})` 
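
   For illustration, a hypothetical helper with that shape, assuming the new field maps a topic name to its `PartitionInfo`s:

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   import org.apache.kafka.common.PartitionInfo;
   import org.apache.kafka.common.TopicPartition;

   final class PartitionGrouping {
       // Illustration: build the topic -> partitions view from the new
       // TopicPartition -> PartitionInfo input without an explicit entrySet loop.
       static Map<String, List<PartitionInfo>> partitionsByTopic(
               Map<TopicPartition, PartitionInfo> topicPartitionInfo) {
           Map<String, List<PartitionInfo>> result = new HashMap<>();
           topicPartitionInfo.forEach((tp, info) ->
               result.computeIfAbsent(tp.topic(), t -> new ArrayList<>()).add(info));
           return result;
       }
   }
   ```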



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Hangleton commented on a diff in pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker

2023-05-25 Thread via GitHub


Hangleton commented on code in PR #13558:
URL: https://github.com/apache/kafka/pull/13558#discussion_r1205669500


##
checkstyle/suppressions.xml:
##
@@ -41,6 +41,8 @@
 
 
+

Review Comment:
   Added the packages to import-control-core in [this 
commit](https://github.com/apache/kafka/pull/13558/commits/03d444b94de2494548b61cd0458c842c05872e3f).
 I had to keep a suppression rule for the package `org.apache.zookeeper` 
because it is not part of any import control and Checkstyle would not accept it 
without a proper import control definition.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Hangleton commented on a diff in pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker

2023-05-25 Thread via GitHub


Hangleton commented on code in PR #13558:
URL: https://github.com/apache/kafka/pull/13558#discussion_r1205669500


##
checkstyle/suppressions.xml:
##
@@ -41,6 +41,8 @@
 
 
+

Review Comment:
   Added the packages to import-control-core in [this 
commit](https://github.com/apache/kafka/pull/13558/commits/03d444b94de2494548b61cd0458c842c05872e3f).
 I had to keep a suppression rule for the package `org.apache.zookeeper` 
because it is not part of any import control and Checkstyle would not accept it 
without it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13356: KAFKA-14789: Prevent mis-attributing classpath plugins, allow discovery of classpath RestExtension and ConfigProvider

2023-05-25 Thread via GitHub


C0urante commented on code in PR #13356:
URL: https://github.com/apache/kafka/pull/13356#discussion_r1205631278


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##
@@ -431,15 +427,29 @@ private <T> Collection<PluginDesc<T>> 
getServiceLoaderPluginDesc(Class<T> klass,
 log.error("Failed to discover {}{}", 
klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t);
 continue;
 }
-result.add(pluginDesc((Class<? extends T>) 
pluginImpl.getClass(),
-versionFor(pluginImpl), loader));
+Class<? extends T> pluginKlass = (Class<? extends T>) 
pluginImpl.getClass();
+if (!isParentClassloader(pluginKlass.getClassLoader(), 
loader)) {
+log.debug("Exclude {} that is from classloader {}", 
pluginKlass.getSimpleName(), pluginKlass.getClassLoader());
+continue;
+}
+result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), 
loader));
 }
 } finally {
 Plugins.compareAndSwapLoaders(savedLoader);
 }
 return result;
 }
 
+private static boolean isParentClassloader(ClassLoader loader, ClassLoader 
parent) {
+while (loader != null) {
+if (loader == parent) {
+return true;
+}
+loader = loader.getParent();
+}
+return false;

Review Comment:
   I replaced this method body with a simple `return loader == parent` equality 
check and all unit tests passed.
   
   Can you shed some light on why the logic here is necessary? And possibly add 
a test case to demonstrate?



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##
@@ -398,15 +390,19 @@ private <T> Collection<PluginDesc<T>> getPluginDesc(
 }
 
 Collection<PluginDesc<T>> result = new ArrayList<>();
-for (Class<? extends T> plugin : plugins) {
-if (PluginUtils.isConcrete(plugin)) {
-try {
-result.add(pluginDesc(plugin, versionFor(plugin), loader));
-} catch (ReflectiveOperationException | LinkageError e) {
-log.error("Failed to discover {}: Unable to instantiate 
{}{}", klass.getSimpleName(), plugin.getSimpleName(), 
reflectiveErrorDescription(e), e);
-}
-} else {
-log.debug("Skipping {} as it is not concrete implementation", 
plugin);
+for (Class<? extends T> pluginKlass : plugins) {
+if (!PluginUtils.isConcrete(pluginKlass)) {
+log.debug("Skipping {} as it is not concrete implementation", 
pluginKlass);
+continue;
+}
+if (!isParentClassloader(pluginKlass.getClassLoader(), loader)) {
+log.debug("Exclude {} that is from classloader {}", 
pluginKlass.getSimpleName(), pluginKlass.getClassLoader());

Review Comment:
   Should we also state why this classloader warrants exclusion of the scanned 
plugin?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-25 Thread via GitHub


jeffkbkim commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1204874200


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.GroupMetadataManagerTest;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
+import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsumerGroupTest {
+
+private ConsumerGroup createConsumerGroup(String groupId) {
+SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());
+return new ConsumerGroup(snapshotRegistry, groupId);
+}
+
+@Test
+public void testGetOrCreateMember() {
+ConsumerGroup consumerGroup = createConsumerGroup("foo");
+ConsumerGroupMember member;
+
+// Create a group.
+member = consumerGroup.getOrMaybeCreateMember("member-id", true);
+assertEquals("member-id", member.memberId());
+
+// Get that group back.
+member = consumerGroup.getOrMaybeCreateMember("member-id", false);
+assertEquals("member-id", member.memberId());
+
+assertThrows(UnknownMemberIdException.class, () ->
+consumerGroup.getOrMaybeCreateMember("does-not-exist", false));
+}
+
+@Test
+public void testUpdateMember() {
+ConsumerGroup consumerGroup = createConsumerGroup("foo");
+ConsumerGroupMember member;
+
+member = consumerGroup.getOrMaybeCreateMember("member", true);
+
+member = new ConsumerGroupMember.Builder(member)
+.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+.build();
+
+consumerGroup.updateMember(member);
+
+assertEquals(member, consumerGroup.getOrMaybeCreateMember("member", 
false));
+}
+
+@Test
+public void testRemoveMember() {
+ConsumerGroup consumerGroup = createConsumerGroup("foo");
+
+consumerGroup.getOrMaybeCreateMember("member", true);
+assertTrue(consumerGroup.hasMember("member"));
+
+consumerGroup.removeMember("member");
+assertFalse(consumerGroup.hasMember("member"));
+
+}
+
+@Test
+public void testUpdatingMemberUpdatesPartitionEpoch() {
+Uuid fooTopicId = Uuid.randomUuid();
+Uuid barTopicId = Uuid.randomUuid();
+Uuid zarTopicId = Uuid.randomUuid();
+
+ConsumerGroup consumerGroup = createConsumerGroup("foo");
+ConsumerGroupMember member;
+
+member = new ConsumerGroupMember.Builder("member")
+.setMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(fooTopicId, 1, 2, 3)))
+.setPartitionsPendingRevocation(mkAssignment(
+mkTopicAssignment(barTopicId, 4, 5, 6)))
+.setPartitionsPendingAssignment(mkAssignment(
+mkTopicAssignment(zarTopicId, 7, 8, 9)))
+.build();
+
+consumerGroup.updateMember(member);
+
+assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 1));
+assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 2));
+assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 3));
+assertEquals(10, consumerGroup.currentPartitionEpoch(

[GitHub] [kafka] Hangleton commented on a diff in pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker

2023-05-25 Thread via GitHub


Hangleton commented on code in PR #13558:
URL: https://github.com/apache/kafka/pull/13558#discussion_r1205648643


##
checkstyle/suppressions.xml:
##
@@ -41,6 +41,8 @@
 
 
+

Review Comment:
   Apologies if I am missing the obvious, but am I understanding correctly that it 
is not only an ordering problem, but that the packages of the imported classes 
need to be defined in 
[import-control-core.xml](https://github.com/apache/kafka/blob/trunk/checkstyle/import-control-core.xml)?
 I will modify that file and remove the suppressions rule.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15021) KRaft controller increases leader epoch when shrinking ISR

2023-05-25 Thread Jira
José Armando García Sancio created KAFKA-15021:
--

 Summary: KRaft controller increases leader epoch when shrinking ISR
 Key: KAFKA-15021
 URL: https://issues.apache.org/jira/browse/KAFKA-15021
 Project: Kafka
  Issue Type: Bug
  Components: controller, kraft
Reporter: José Armando García Sancio
Assignee: José Armando García Sancio


When the KRaft controller shrinks the ISR it also forces the leader epoch to 
increase. This is unnecessary and causes all of the follower replica fetches to 
get invalidated.

Here is an example trace of this behavior after replica 8 was shutdown:
{code:java}
kafka-dump-log --cluster-metadata-decoder --files 
__cluster_metadata-0/38589501.log | grep Pd7wMb4lSkKI00--SrWNXw
...
| offset: 38655592 CreateTime: 1683849857362 keySize: -1 valueSize: 41 
sequence: -1 headerKeys: [] payload: 
{"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":7,"topicId":"Pd7wMb4lSkKI00--SrWNXw","isr":[3,1],"leader":1}}
| offset: 38655593 CreateTime: 1683849857362 keySize: -1 valueSize: 41 
sequence: -1 headerKeys: [] payload: 
{"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":5,"topicId":"Pd7wMb4lSkKI00--SrWNXw","isr":[0,4],"leader":4}}
| offset: 38655594 CreateTime: 1683849857362 keySize: -1 valueSize: 41 
sequence: -1 headerKeys: [] payload: 
{"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":6,"topicId":"Pd7wMb4lSkKI00--SrWNXw","isr":[0,1],"leader":0}}
| offset: 38656159 CreateTime: 1683849974945 keySize: -1 valueSize: 39 
sequence: -1 headerKeys: [] payload: 
{"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":7,"topicId":"Pd7wMb4lSkKI00--SrWNXw","isr":[3,1,8]}}
| offset: 38656256 CreateTime: 1683849994297 keySize: -1 valueSize: 39 
sequence: -1 headerKeys: [] payload: 
{"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":5,"topicId":"Pd7wMb4lSkKI00--SrWNXw","isr":[0,4,8]}}
| offset: 38656299 CreateTime: 1683849997139 keySize: -1 valueSize: 39 
sequence: -1 headerKeys: [] payload: 
{"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":6,"topicId":"Pd7wMb4lSkKI00--SrWNXw","isr":[0,1,8]}}
| offset: 38657003 CreateTime: 1683850157379 keySize: -1 valueSize: 30 
sequence: -1 headerKeys: [] payload: 
{"type":"PARTITION_CHANGE_RECORD","version":0,"data":{"partitionId":7,"topicId":"Pd7wMb4lSkKI00--SrWNXw","leader":8}}
 {code}
Also, notice how the leader epoch was not increased when the ISR was expanded.
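
To state the expected behavior concretely, a tiny hypothetical sketch of the rule this bug is about (an illustration, not the controller's actual code):

{code:java}
// Desired rule: bump the leader epoch only when leadership actually changes.
// An ISR shrink or expansion that keeps the same leader should leave follower
// fetch state (and thus in-flight fetches) valid.
final class PartitionChangeSketch {
    static boolean shouldBumpLeaderEpoch(int oldLeader, int newLeader) {
        return oldLeader != newLeader;
    }
}
{code}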



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] hudeqi commented on pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception

2023-05-25 Thread via GitHub


hudeqi commented on PR #13421:
URL: https://github.com/apache/kafka/pull/13421#issuecomment-1563021307

   > Heya @hudeqi, could you give a more detailed explanation of the problem 
you are trying to solve here, because I do not understand? May I suggest you 
distinguish between a partition replica and a partition future replica in some 
way, because otherwise it is quite difficult to understand which replica you 
are referring to when it is just called "partition"?
   > 
   > Let's say we have an original replica in log directory (backed by one 
disk) A, and let's say we have a future replica in log directory (backed by 
another disk) B on the same broker. I have confirmed that it is the case that 
**compaction** is paused on A when B is first created 
(https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L910).
 It is only resumed 
(https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L1099).
 I can think of two situations happening now.
   > 
   > 1. A fails and B doesn't know what to do.
   >In this situation, if A has failed then cleaning should not be resumed 
on A - an operator intervention is required to understand what went wrong with 
the log directory. Cleaning should not be started on B either. Since A has 
failed the amount of data that B has does not grow because it doesn't have a 
source to keep copying from.
   > 2. B fails and A doesn't know what to do.
   >In this situation B's size cannot grow because it is no longer copying 
from A. A should resume cleaning as for all intents and purposes it acts as a 
normal replica to a topic partition.
   > 
   > As far as I understand what you try to do is solve situation number 2 - am 
I correct? If I am correct, what is the reasoning behind marking A as a failed 
partition?
   
   Hi clolov, thank you for your review! I'm sorry that I didn't express the 
issue clearly. Your scenario example and description are correct. The 
problem I want to solve is indeed situation number 2. I will describe it in 
terms of your scenario:
   
   Assume that B fails due to a disk problem. According to the current 
logic, the partition is simply marked as failed in the corresponding 
`ReplicaAlterLogDirsThread`, and nothing more happens. However, as the replica 
that normally serves traffic, A does not know that B has failed and remains in 
a cleaning-paused state, which will eventually fill up the disk where A is 
located. That is the problem I want to solve. My solution is to resume 
cleaning for the partition when B fails, so that A can continue to clean up 
(see the sketch below). 
   @clolov 
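
   A sketch of the proposed remedy, with hypothetical types standing in for the actual ReplicaManager/LogManager calls:

   ```java
   // Hypothetical interface standing in for Kafka's ReplicaManager/LogManager;
   // names are illustrative only.
   interface PartitionMove {
       void markFutureReplicaFailed();      // existing behavior on failure
       void resumeCleaningSourceReplica();  // the proposed addition
   }

   final class FutureReplicaFailureHandler {
       // When the future replica (dir B) fails, also resume compaction on the
       // source replica (dir A) so A's disk usage stays bounded.
       void onFutureReplicaFailure(PartitionMove move) {
           move.markFutureReplicaFailed();
           move.resumeCleaningSourceReplica();
       }
   }
   ```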


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Hangleton commented on pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker

2023-05-25 Thread via GitHub


Hangleton commented on PR #13558:
URL: https://github.com/apache/kafka/pull/13558#issuecomment-1563016533

   Thanks Igor for the review, addressing these comments now - sorry I didn't 
get to them earlier.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] soarez commented on a diff in pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker

2023-05-25 Thread via GitHub


soarez commented on code in PR #13558:
URL: https://github.com/apache/kafka/pull/13558#discussion_r1205587123


##
checkstyle/suppressions.xml:
##
@@ -41,6 +41,8 @@
 
 
+

Review Comment:
   Is this intentional? Can't we sort the import statements to satisfy this 
Checkstyle rule?



##
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##
@@ -2113,7 +2113,12 @@ class KafkaZkClient private[zk] (zooKeeperClient: 
ZooKeeperClient, isSecure: Boo
   }
 
   private class CheckedEphemeral(path: String, data: Array[Byte]) extends 
Logging {
+private var attempt = 0
+private val maxAttempt = 5
+private val backoffMs = 1000

Review Comment:
   Should the backoff value be calculated here then? The session timeout is 
available in `apply()` where `KafkaZkClient` is created. 
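   
   For illustration, a minimal retry-with-backoff sketch in the shape under 
discussion, deriving the backoff from the session timeout rather than 
hard-coding it (all names and the exact derivation are assumptions, not the 
actual patch):
   
   ```java
   public class CheckedEphemeralRetrySketch {
       private static final int MAX_ATTEMPTS = 5;
   
       public static void createWithRetry(long sessionTimeoutMs) throws InterruptedException {
           // Derive the backoff from the session timeout (one possible choice),
           // so the total retry window roughly matches one session timeout.
           long backoffMs = Math.max(1L, sessionTimeoutMs / MAX_ATTEMPTS);
           for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
               if (tryCreateEphemeralNode()) {
                   return;
               }
               Thread.sleep(backoffMs);
           }
           throw new IllegalStateException("Could not create ephemeral znode after " + MAX_ATTEMPTS + " attempts");
       }
   
       // Hypothetical stand-in for the actual ZooKeeper create call.
       private static boolean tryCreateEphemeralNode() {
           return false;
       }
   }
   ```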



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante merged pull request #13165: KAFKA-14654: Connector classes should statically initialize with plugin classloader

2023-05-25 Thread via GitHub


C0urante merged PR #13165:
URL: https://github.com/apache/kafka/pull/13165


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hudeqi commented on a diff in pull request #13696: KAFKA-14979:Incorrect lag was calculated when markPartitionsForTruncation in ReplicaAlterLogDirsThread

2023-05-25 Thread via GitHub


hudeqi commented on code in PR #13696:
URL: https://github.com/apache/kafka/pull/13696#discussion_r1205574820


##
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##
@@ -460,8 +460,16 @@ abstract class AbstractFetcherThread(name: String,
 partitionMapLock.lockInterruptibly()
 try {
   Option(partitionStates.stateValue(topicPartition)).foreach { state =>
-val newState = PartitionFetchState(state.topicId, 
math.min(truncationOffset, state.fetchOffset),
-  state.lag, state.currentLeaderEpoch, state.delay, state = Truncating,
+var lag = state.lag

Review Comment:
   > Isn't the lag recalculated on the next fetch iteration 
([here](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L362))?
   
   Hi Hangleton, thanks for your review! As you said, under normal 
circumstances the lag will be recalculated on the next fetch. However, if an 
exception occurs before the lag is updated (for example, processPartitionData 
throws an exception due to a disk error), or logAppendInfoOpt returns None, 
then the lag will keep its previous, incorrect value, either indefinitely or 
for some period of time (the partition is clearly truncating, yet the 
displayed lag is still very large, which deviates greatly from what one would 
expect). Therefore, the modification here is a defensive measure that 
effectively avoids such situations. @Hangleton 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #13465: KAFKA-14368: Connect offset write REST API

2023-05-25 Thread via GitHub


C0urante commented on PR #13465:
URL: https://github.com/apache/kafka/pull/13465#issuecomment-1562965406

   Ah, spoke too soon!
   
   I'd be open to bumping timeouts. If this does turn out to be a correctness 
issue (which is still possible since the timing on CI may be different and 
therefore more likely to unearth certain kinds of concurrency bugs), we can 
investigate further.
   
   Also worth noting that `WorkerTest::testAlterOffsetsSourceConnectorError` is 
also failing right now because `offsetStore::stop` hasn't been invoked by the 
time we check for it. I think you handle this kind of issue elsewhere by adding 
a second `timeout(1000)` argument when making calls to `Mockito::verify`; 
hopefully that's sufficient for this test as well?
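   
   For what it's worth, a minimal sketch of that pattern with Mockito's 
`timeout` (the setup around the mock is illustrative, not the actual test 
code):
   
   ```java
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.timeout;
   import static org.mockito.Mockito.verify;
   
   import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
   
   public class VerifyWithTimeoutSketch {
       public static void main(String[] args) {
           ConnectorOffsetBackingStore offsetStore = mock(ConnectorOffsetBackingStore.class);
   
           // Simulate the store being stopped asynchronously by another thread.
           new Thread(offsetStore::stop).start();
   
           // Waits up to 1000 ms for stop() to be invoked instead of asserting
           // immediately, which avoids the race seen in the failing test.
           verify(offsetStore, timeout(1000)).stop();
       }
   }
   ```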


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #13465: KAFKA-14368: Connect offset write REST API

2023-05-25 Thread via GitHub


C0urante commented on PR #13465:
URL: https://github.com/apache/kafka/pull/13465#issuecomment-1562958860

   Ah, spoke too soon! Looks like there are some CI test failures. Can you look 
into these?
   
   
org.apache.kafka.connect.runtime.WorkerTest.testAlterOffsetsSourceConnectorError:
   
   > Wanted but not invoked:
   connectorOffsetBackingStore.stop();
   -> at 
org.apache.kafka.connect.storage.ConnectorOffsetBackingStore.stop(ConnectorOffsetBackingStore.java:188)
   However, there was exactly 1 interaction with this mock:
   connectorOffsetBackingStore.start();
   -> at 
org.apache.kafka.connect.runtime.Worker.lambda$alterSourceConnectorOffsets$16(Worker.java:1438)
   at 
app//org.apache.kafka.connect.storage.ConnectorOffsetBackingStore.stop(ConnectorOffsetBackingStore.java:188)
   at 
app//org.apache.kafka.connect.runtime.WorkerTest.testAlterOffsetsSourceConnectorError(WorkerTest.java:2000)
   
   
   
org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsDifferentKafkaClusterTargeted:
   
   > org.opentest4j.AssertionFailedError: Condition not met within timeout 
15000. Sink connector consumer group offsets should catch up to the topic end 
offsets ==> expected: <true> but was: <false>
at 
org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
at 
org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:211)
at 
org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:331)
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379)
at 
org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:328)
at 
org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:312)
at 
org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:285)
at 
org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.waitForExpectedSinkConnectorOffsets(OffsetsApiIntegrationTest.java:602)
at 
org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.alterAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:312)
at 
org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsDifferentKafkaClusterTargeted(OffsetsApiIntegrationTest.java:292)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on pull request #13465: KAFKA-14368: Connect offset write REST API

2023-05-25 Thread via GitHub


yashmayya commented on PR #13465:
URL: https://github.com/apache/kafka/pull/13465#issuecomment-1562952794

   I just noticed that 
`testAlterSinkConnectorOffsetsDifferentKafkaClusterTargeted` is failing in the 
CI run. There's also https://issues.apache.org/jira/browse/KAFKA-14956 where 
`testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted` has been failing on 
CI for a while. Interestingly, neither of them has failed for me locally in 
over 100 runs under various loads. The point at which both of them are 
failing is also interesting:
   
   1. We create an embedded Connect cluster with its own embedded backing Kafka 
cluster
   2. We create a second embedded Kafka cluster
   3. We configure a sink connector in the embedded Connect cluster which 
consumes from a topic on the second embedded Kafka cluster
   4. We produce 10 messages each to 5 different partitions of a topic on the 
second Kafka cluster (which the connector is configured to consume from)
   5. We use the offsets read REST API to get the consumer group offsets for 
the sink connector and wait until it "catches up" to the expected offsets. This 
operation is retried up to 15 seconds and if the consumer group offsets 
(obtained via an admin client in the worker) don't match the expected offsets, 
the test fails. 
   
   Both the tests are failing at this point. Since they consistently pass 
locally, it doesn't seem to be a correctness issue with connectors that target 
different Kafka clusters. I'm wondering if we need to up the timeout although 
15 seconds should be enough to consume just 50 messages 😕 
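   
   If it does come down to the timeout, a minimal sketch of bumping it via 
`TestUtils.waitForCondition` (the 30-second value and the condition body are 
illustrative assumptions, not the actual test code):
   
   ```java
   import java.util.concurrent.TimeUnit;
   import org.apache.kafka.test.TestUtils;
   
   public class WaitForOffsetsSketch {
       public static void main(String[] args) throws InterruptedException {
           long timeoutMs = TimeUnit.SECONDS.toMillis(30); // bumped from 15s
   
           // TestUtils.waitForCondition retries the condition until it returns
           // true or the timeout elapses, then fails with the given message.
           TestUtils.waitForCondition(
               () -> consumerGroupOffsetsMatch(),
               timeoutMs,
               "Sink connector consumer group offsets should catch up to the topic end offsets"
           );
       }
   
       // Hypothetical stand-in for the admin-client offset comparison in the test.
       private static boolean consumerGroupOffsetsMatch() {
           return true;
       }
   }
   ```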


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API

2023-05-25 Thread via GitHub


C0urante commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1205535710


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##
@@ -4123,6 +4136,280 @@ public void testConnectorOffsets() throws Exception {
 PowerMock.verifyAll();
 }
 
+@Test
+public void testAlterOffsetsConnectorNotFound() throws Exception {
+// Get the initial assignment
+EasyMock.expect(member.memberId()).andStubReturn("leader");
+
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+expectRebalance(1, Collections.emptyList(), Collections.emptyList(), 
true);
+expectConfigRefreshAndSnapshot(SNAPSHOT);
+member.poll(EasyMock.anyInt());
+PowerMock.expectLastCall();
+
+// Now handle the alter connector offsets request
+member.wakeup();
+PowerMock.expectLastCall();
+member.ensureActive();
+PowerMock.expectLastCall();
+expectConfigRefreshAndSnapshot(SNAPSHOT);
+member.poll(EasyMock.anyInt());
+PowerMock.expectLastCall();
+
+PowerMock.replayAll();
+
+herder.tick();
+FutureCallback callback = new FutureCallback<>();
+herder.alterConnectorOffsets("connector-does-not-exist", new 
HashMap<>(), callback);
+herder.tick();
+ExecutionException e = assertThrows(ExecutionException.class, () -> 
callback.get(1000L, TimeUnit.MILLISECONDS));
+assertTrue(e.getCause() instanceof NotFoundException);
+
+PowerMock.verifyAll();
+}
+
+@Test
+public void testAlterOffsetsConnectorNotInStoppedState() throws Exception {
+// Get the initial assignment
+EasyMock.expect(member.memberId()).andStubReturn("leader");
+
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+expectRebalance(1, Collections.emptyList(), Collections.emptyList(), 
true);
+expectConfigRefreshAndSnapshot(SNAPSHOT);
+member.poll(EasyMock.anyInt());
+PowerMock.expectLastCall();
+
+// Now handle the alter connector offsets request
+member.wakeup();
+PowerMock.expectLastCall();
+member.ensureActive();
+PowerMock.expectLastCall();
+expectConfigRefreshAndSnapshot(SNAPSHOT);
+member.poll(EasyMock.anyInt());
+PowerMock.expectLastCall();
+
+PowerMock.replayAll();
+
+herder.tick();
+FutureCallback callback = new FutureCallback<>();
+herder.alterConnectorOffsets(CONN1, new HashMap<>(), callback);
+herder.tick();
+ExecutionException e = assertThrows(ExecutionException.class, () -> 
callback.get(1000L, TimeUnit.MILLISECONDS));
+assertTrue(e.getCause() instanceof BadRequestException);
+
+PowerMock.verifyAll();
+}
+
+@Test
+public void testAlterOffsetsNotLeader() throws Exception {
+// Get the initial assignment
+EasyMock.expect(member.memberId()).andStubReturn("member");
+
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+expectRebalance(1, Collections.emptyList(), Collections.emptyList(), 
false);
+expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
+member.poll(EasyMock.anyInt());
+PowerMock.expectLastCall();
+
+// Now handle the alter connector offsets request
+member.wakeup();
+PowerMock.expectLastCall();
+member.ensureActive();
+member.poll(EasyMock.anyInt());
+PowerMock.expectLastCall();
+
+PowerMock.replayAll();
+
+herder.tick();
+FutureCallback callback = new FutureCallback<>();
+herder.alterConnectorOffsets(CONN1, new HashMap<>(), callback);
+herder.tick();
+ExecutionException e = assertThrows(ExecutionException.class, () -> 
callback.get(1000L, TimeUnit.MILLISECONDS));
+assertTrue(e.getCause() instanceof NotLeaderException);
+
+PowerMock.verifyAll();
+}
+
+@Test
+public void testAlterOffsetsSourceConnectorExactlyOnceDisabled() throws 
Exception {
+// Get the initial assignment
+EasyMock.expect(member.memberId()).andStubReturn("leader");
+
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+expectRebalance(1, Collections.emptyList(), Collections.emptyList(), 
true);
+expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
+member.poll(EasyMock.anyInt());
+PowerMock.expectLastCall();
+
+// Now handle the alter connector offsets request
+Map, Map> offsets = new HashMap<>();
+member.wakeup();
+PowerMock.expectLastCall();
+member.ensureActive();
+PowerMock.expectLastCall();
+member.poll(EasyMock.anyInt());
+PowerMock.expectLastCall();
+expectConfigRefreshA

[GitHub] [kafka] C0urante commented on a diff in pull request #13465: KAFKA-14368: Connect offset write REST API

2023-05-25 Thread via GitHub


C0urante commented on code in PR #13465:
URL: https://github.com/apache/kafka/pull/13465#discussion_r1205533269


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##
@@ -4123,6 +4136,280 @@ public void testConnectorOffsets() throws Exception {
 PowerMock.verifyAll();
 }
 
+@Test
+public void testAlterOffsetsConnectorNotFound() throws Exception {
+// Get the initial assignment
+EasyMock.expect(member.memberId()).andStubReturn("leader");
+
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+expectRebalance(1, Collections.emptyList(), Collections.emptyList(), 
true);
+expectConfigRefreshAndSnapshot(SNAPSHOT);
+member.poll(EasyMock.anyInt());
+PowerMock.expectLastCall();
+
+// Now handle the alter connector offsets request
+member.wakeup();
+PowerMock.expectLastCall();
+member.ensureActive();
+PowerMock.expectLastCall();
+expectConfigRefreshAndSnapshot(SNAPSHOT);
+member.poll(EasyMock.anyInt());
+PowerMock.expectLastCall();
+
+PowerMock.replayAll();
+
+herder.tick();
+FutureCallback callback = new FutureCallback<>();
+herder.alterConnectorOffsets("connector-does-not-exist", new 
HashMap<>(), callback);
+herder.tick();
+ExecutionException e = assertThrows(ExecutionException.class, () -> 
callback.get(1000L, TimeUnit.MILLISECONDS));
+assertTrue(e.getCause() instanceof NotFoundException);
+
+PowerMock.verifyAll();
+}
+
+@Test
+public void testAlterOffsetsConnectorNotInStoppedState() throws Exception {
+// Get the initial assignment
+EasyMock.expect(member.memberId()).andStubReturn("leader");
+
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+expectRebalance(1, Collections.emptyList(), Collections.emptyList(), 
true);
+expectConfigRefreshAndSnapshot(SNAPSHOT);
+member.poll(EasyMock.anyInt());
+PowerMock.expectLastCall();
+
+// Now handle the alter connector offsets request
+member.wakeup();
+PowerMock.expectLastCall();
+member.ensureActive();
+PowerMock.expectLastCall();
+expectConfigRefreshAndSnapshot(SNAPSHOT);
+member.poll(EasyMock.anyInt());
+PowerMock.expectLastCall();
+
+PowerMock.replayAll();
+
+herder.tick();
+FutureCallback callback = new FutureCallback<>();
+herder.alterConnectorOffsets(CONN1, new HashMap<>(), callback);
+herder.tick();
+ExecutionException e = assertThrows(ExecutionException.class, () -> 
callback.get(1000L, TimeUnit.MILLISECONDS));
+assertTrue(e.getCause() instanceof BadRequestException);
+
+PowerMock.verifyAll();
+}
+
+@Test
+public void testAlterOffsetsNotLeader() throws Exception {
+// Get the initial assignment
+EasyMock.expect(member.memberId()).andStubReturn("member");
+
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+expectRebalance(1, Collections.emptyList(), Collections.emptyList(), 
false);
+expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1);
+member.poll(EasyMock.anyInt());
+PowerMock.expectLastCall();
+
+// Now handle the alter connector offsets request
+member.wakeup();
+PowerMock.expectLastCall();
+member.ensureActive();

Review Comment:
   Huh, TIL!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary

2023-05-25 Thread via GitHub


C0urante commented on PR #13646:
URL: https://github.com/apache/kafka/pull/13646#issuecomment-1562913728

   @sambhav-jain-16 thanks for looking into this. I'm prioritizing a few other 
KIP-related PRs at the moment but will try to make time for this sometime in 
the next couple of weeks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on pull request #13721: KAFKA-14782: Implementation Details Different from Documentation (del…

2023-05-25 Thread via GitHub


clolov commented on PR #13721:
URL: https://github.com/apache/kafka/pull/13721#issuecomment-1562897922

   Heya @WhitEyesX and thank you for the contribution! On a first look this 
does appear to be an omission - I went through the KIP, related PRs and 
subsequent commits to that area of the code and I haven't found a discussion or 
a comment which mentions that retry.backoff.ms is no longer considered as part 
of that equation. However, quite a lot of people reviewed the PR when it was 
merged, which leads me to suspect I am lacking a piece of information which is 
taken into account somewhere else in the code.
   
   Tagging @ijuma and @hachikuji as some of the original PR's reviewers.
   Tagging @jolshan and @artemlivshits as contributors who have greater 
knowledge than me when it comes to the producer code paths.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] urbandan commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-05-25 Thread via GitHub


urbandan commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1205410637


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -609,14 +686,15 @@ public synchronized void 
handleCompletedBatch(ProducerBatch batch, ProduceRespon
 }
 
 public synchronized void transitionToUninitialized(RuntimeException 
exception) {
-transitionTo(State.UNINITIALIZED);
+transitionTo(State.UNINITIALIZED, exception, 
InvalidStateDetectionStrategy.BACKGROUND);

Review Comment:
   What will happen with the exception we pass to transitionTo here?
   My understanding is that transitionTo only cares about the exception if the 
state is one of the error states.



##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -609,14 +686,15 @@ public synchronized void 
handleCompletedBatch(ProducerBatch batch, ProduceRespon
 }
 
 public synchronized void transitionToUninitialized(RuntimeException 
exception) {
-transitionTo(State.UNINITIALIZED);
+transitionTo(State.UNINITIALIZED, exception, 
InvalidStateDetectionStrategy.BACKGROUND);
 if (pendingTransition != null) {
 pendingTransition.result.fail(exception);
 }
 lastError = null;
 }
 
-public synchronized void maybeTransitionToErrorState(RuntimeException 
exception) {
+public synchronized void maybeTransitionToErrorState(RuntimeException 
exception,

Review Comment:
   Just to double check: the transitionToFatalError call on line 702 does not 
use the invalidStateDetectionStrategy param because all of the exceptions in 
the condition come from a server response, thus making it a BACKGROUND type 
call?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-05-25 Thread via GitHub


showuon commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1205425110


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -618,6 +629,193 @@ public void run() {
 }
 }
 
+public void handleLogStartOffsetUpdate(TopicPartition topicPartition, 
long remoteLogStartOffset) {
+logger.debug("Updating $topicPartition with remoteLogStartOffset: 
{}", remoteLogStartOffset);

Review Comment:
   debug("Update {} with remoteLogStartOffset: {}", topicPartition, 
remoteLogStartOffset)



##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -618,6 +629,193 @@ public void run() {
 }
 }
 
+public void handleLogStartOffsetUpdate(TopicPartition topicPartition, 
long remoteLogStartOffset) {
+logger.debug("Updating $topicPartition with remoteLogStartOffset: 
{}", remoteLogStartOffset);
+updateRemoteLogStartOffset.accept(topicPartition, 
remoteLogStartOffset);
+}
+
+class RemoteLogRetentionHandler {
+
+private long remainingBreachedSize = 0L;
+private OptionalLong logStartOffset = OptionalLong.empty();
+
+public RemoteLogRetentionHandler(long remainingBreachedSize) {
+this.remainingBreachedSize = remainingBreachedSize;
+}
+
+private boolean 
deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean 
checkSizeRetention) throws RemoteStorageException, ExecutionException, 
InterruptedException {
+boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> {
+// Assumption that segments contain size > 0
+if (checkSizeRetention && remainingBreachedSize > 0) {
+remainingBreachedSize -= x.segmentSizeInBytes();
+return remainingBreachedSize >= 0;
+} else return false;
+});
+if (isSegmentDeleted) {
+logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+logger.info("Deleted remote log segment 
${metadata.remoteLogSegmentId()} due to retention size " +
+"${log.config.retentionSize} breach. Log size 
after deletion will be " +
+"${remainingBreachedSize + 
log.config.retentionSize}.");
+}
+return isSegmentDeleted;
+}
+
+private boolean 
deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long 
startOffset)
+throws RemoteStorageException, ExecutionException, 
InterruptedException {
+boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> startOffset > x.endOffset());
+if (isSegmentDeleted) {
+remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+logger.info("Deleted remote log segment {} due to log 
start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+}
+
+// No need to update the logStartOffset.
+return isSegmentDeleted;
+}
+
+// There are two cases:
+// 1) When there are offline partitions and a new replica with 
empty disk is brought as leader, then the
+//leader-epoch gets bumped but the log-start-offset gets 
truncated back to 0.
+// 2) To remove the unreferenced segments.
+private boolean 
deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, 
RemoteLogSegmentMetadata metadata) throws RemoteStorageException, 
ExecutionException, InterruptedException {
+boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
->
+
x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < 
earliestEpochEntry.epoch));
+if (isSegmentDeleted) {
+logger.info("Deleted remote log segment ${} due to leader 
epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and 
segmentEpochs: {}",

Review Comment:
   additional `$` sign: remote log segment [$]{} ...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on pull request #13738: KAFKA-14982: Improve the kafka-metadata-quorum output

2023-05-25 Thread via GitHub


fvaleri commented on PR #13738:
URL: https://github.com/apache/kafka/pull/13738#issuecomment-1562817801

   @showuon all your comments should be addressed with the latest commit. 
   
   I removed the `-hr` short flag and updated the KIP, because argparse4j 
supports abbreviated long options by default. For example you can do the following:
   
   ```sh
   $ bin/kafka-metadata-quorum.sh --bootstrap-server :9092 describe --re --hu
   NodeId  LogEndOffset  Lag  LastFetchTimestamp  LastCaughtUpTimestamp  Status
   2       2032          0    3 ms ago            3 ms ago               Leader
   3       2032          0    207 ms ago          207 ms ago             Follower
   4       2032          0    207 ms ago          207 ms ago             Follower
   ```
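   
   For reference, a minimal argparse4j sketch of this prefix matching (the 
class and argument names are illustrative):
   
   ```java
   import net.sourceforge.argparse4j.ArgumentParsers;
   import net.sourceforge.argparse4j.impl.Arguments;
   import net.sourceforge.argparse4j.inf.ArgumentParser;
   import net.sourceforge.argparse4j.inf.ArgumentParserException;
   import net.sourceforge.argparse4j.inf.Namespace;
   
   public class AbbrevDemo {
       public static void main(String[] args) throws ArgumentParserException {
           ArgumentParser parser = ArgumentParsers.newFor("demo").build();
           parser.addArgument("--human-readable").action(Arguments.storeTrue());
   
           // argparse4j resolves unambiguous prefixes of long options by default,
           // so "--hu" is accepted as "--human-readable".
           Namespace ns = parser.parseArgs(new String[]{"--hu"});
           System.out.println(ns.getBoolean("human_readable")); // prints: true
       }
   }
   ```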


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception

2023-05-25 Thread via GitHub


clolov commented on PR #13421:
URL: https://github.com/apache/kafka/pull/13421#issuecomment-1562816088

   Heya @hudeqi, could you give a more detailed explanation of the problem 
you are trying to solve here, because I do not understand? May I suggest you 
distinguish between a partition replica and a partition future replica in some 
way, because otherwise it is quite difficult to understand which replica you 
are referring to when it is just called "partition"?
   
   Let's say we have an original replica in log directory (backed by one disk) 
A, and let's say we have a future replica in log directory (backed by another 
disk) B on the same broker. I have confirmed that it is the case that 
**compaction** is paused on A when B is first created 
(https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L910).
 It is only resumed 
(https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L1099).
 I can think of two situations happening now.
   
   1. A fails and B doesn't know what to do.
   In this situation, if A has failed then cleaning should not be resumed on A 
- an operator intervention is required to understand what went wrong with the 
log directory. Cleaning should not be started on B either. Since A has failed 
the amount of data that B has does not grow because it doesn't have a source to 
keep copying from.
   
   2. B fails and A doesn't know what to do.
   In this situation B's size cannot grow because it is no longer copying from 
A. A should resume cleaning as for all intents and purposes it acts as a normal 
replica to a topic partition.
   
   As far as I understand what you try to do is solve situation number 2 - am I 
correct? If I am correct, what is the reasoning behind marking A as a failed 
partition?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on a diff in pull request #13738: KAFKA-14982: Improve the kafka-metadata-quorum output

2023-05-25 Thread via GitHub


fvaleri commented on code in PR #13738:
URL: https://github.com/apache/kafka/pull/13738#discussion_r1205438962


##
tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java:
##
@@ -155,17 +169,35 @@ private static void handleDescribeReplication(Admin 
admin) throws ExecutionExcep
 );
 }
 
-private static List> quorumInfoToRows(QuorumInfo.ReplicaState 
leader, Stream infos, String status) {
-return infos.map(info ->
-Stream.of(
+private static List> quorumInfoToRows(QuorumInfo.ReplicaState 
leader,
+   
Stream infos,
+   String status,
+   boolean humanReadable) {
+return infos.map(info -> {
+String lastFetchTimestamp = !info.lastFetchTimestamp().isPresent() 
? "-1" :
+humanReadable ? format("%d ms ago", 
durationMs(info.lastFetchTimestamp().getAsLong())) :
+valueOf(info.lastFetchTimestamp().getAsLong());
+String lastCaughtUpTimestamp = 
!info.lastCaughtUpTimestamp().isPresent() ? "-1" :
+humanReadable ? format("%d ms ago", 
durationMs(info.lastCaughtUpTimestamp().getAsLong())) :
+valueOf(info.lastCaughtUpTimestamp().getAsLong());
+return Stream.of(
 info.replicaId(),
 info.logEndOffset(),
 leader.logEndOffset() - info.logEndOffset(),
-info.lastFetchTimestamp().orElse(-1),
-info.lastCaughtUpTimestamp().orElse(-1),
+lastFetchTimestamp,
+lastCaughtUpTimestamp,
 status
-).map(r -> r.toString()).collect(Collectors.toList())
-).collect(Collectors.toList());
+).map(r -> r.toString()).collect(Collectors.toList());
+}).collect(Collectors.toList());
+}
+
+private static long durationMs(long timestampMs) {

Review Comment:
   Done.



##
tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java:
##
@@ -131,22 +140,27 @@ private static void addDescribeParser(Subparsers 
subparsers) {
 .addArgument("--status")
 .help("A short summary of the quorum status and the other provides 
detailed information about the status of replication.")
 .action(Arguments.storeTrue());
+
 ArgumentGroup replicationArgs = 
describeParser.addArgumentGroup("Replication");
 replicationArgs
 .addArgument("--replication")
 .help("Detailed information about the status of replication")
 .action(Arguments.storeTrue());
+replicationArgs
+.addArgument("-hr", "--human-readable")
+.help("Print human-readable timestamps")

Review Comment:
   What about "Human-readable output"? It's short, clear and allows reuse. In 
the future and even in other tools, we could use this flag to make some fields 
readable or filter out less important details that make the output hard to read.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on a diff in pull request #13738: KAFKA-14982: Improve the kafka-metadata-quorum output

2023-05-25 Thread via GitHub


fvaleri commented on code in PR #13738:
URL: https://github.com/apache/kafka/pull/13738#discussion_r1205438624


##
tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java:
##
@@ -155,17 +169,35 @@ private static void handleDescribeReplication(Admin 
admin) throws ExecutionExcep
 );
 }
 
-private static List> quorumInfoToRows(QuorumInfo.ReplicaState 
leader, Stream infos, String status) {
-return infos.map(info ->
-Stream.of(
+private static List> quorumInfoToRows(QuorumInfo.ReplicaState 
leader,
+   
Stream infos,
+   String status,
+   boolean humanReadable) {
+return infos.map(info -> {
+String lastFetchTimestamp = !info.lastFetchTimestamp().isPresent() 
? "-1" :
+humanReadable ? format("%d ms ago", 
durationMs(info.lastFetchTimestamp().getAsLong())) :
+valueOf(info.lastFetchTimestamp().getAsLong());
+String lastCaughtUpTimestamp = 
!info.lastCaughtUpTimestamp().isPresent() ? "-1" :
+humanReadable ? format("%d ms ago", 
durationMs(info.lastCaughtUpTimestamp().getAsLong())) :
+valueOf(info.lastCaughtUpTimestamp().getAsLong());
+return Stream.of(
 info.replicaId(),
 info.logEndOffset(),
 leader.logEndOffset() - info.logEndOffset(),
-info.lastFetchTimestamp().orElse(-1),
-info.lastCaughtUpTimestamp().orElse(-1),
+lastFetchTimestamp,
+lastCaughtUpTimestamp,
 status
-).map(r -> r.toString()).collect(Collectors.toList())
-).collect(Collectors.toList());
+).map(r -> r.toString()).collect(Collectors.toList());
+}).collect(Collectors.toList());
+}
+
+private static long durationMs(long timestampMs) {
+Instant instant = Instant.ofEpochMilli(timestampMs);

Review Comment:
   Renamed to lastTimestamp.



##
tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java:
##
@@ -155,17 +169,35 @@ private static void handleDescribeReplication(Admin 
admin) throws ExecutionExcep
 );
 }
 
-private static List> quorumInfoToRows(QuorumInfo.ReplicaState 
leader, Stream infos, String status) {
-return infos.map(info ->
-Stream.of(
+private static List> quorumInfoToRows(QuorumInfo.ReplicaState 
leader,
+   
Stream infos,
+   String status,
+   boolean humanReadable) {
+return infos.map(info -> {
+String lastFetchTimestamp = !info.lastFetchTimestamp().isPresent() 
? "-1" :
+humanReadable ? format("%d ms ago", 
durationMs(info.lastFetchTimestamp().getAsLong())) :
+valueOf(info.lastFetchTimestamp().getAsLong());
+String lastCaughtUpTimestamp = 
!info.lastCaughtUpTimestamp().isPresent() ? "-1" :
+humanReadable ? format("%d ms ago", 
durationMs(info.lastCaughtUpTimestamp().getAsLong())) :
+valueOf(info.lastCaughtUpTimestamp().getAsLong());
+return Stream.of(
 info.replicaId(),
 info.logEndOffset(),
 leader.logEndOffset() - info.logEndOffset(),
-info.lastFetchTimestamp().orElse(-1),
-info.lastCaughtUpTimestamp().orElse(-1),
+lastFetchTimestamp,
+lastCaughtUpTimestamp,
 status
-).map(r -> r.toString()).collect(Collectors.toList())
-).collect(Collectors.toList());
+).map(r -> r.toString()).collect(Collectors.toList());
+}).collect(Collectors.toList());
+}
+
+private static long durationMs(long timestampMs) {
+Instant instant = Instant.ofEpochMilli(timestampMs);
+Instant now = Instant.now();
+if (!(instant.isAfter(Instant.EPOCH) && instant.isBefore(now))) {
+throw new KafkaException("Invalid timestamp, possible drift in 
system clock");

Review Comment:
   Sure, I reworked that code to include more details.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] krespo commented on pull request #13748: [BUGFIX] Bugfixed in KAFKA-8713, but it doesn't work properly.

2023-05-25 Thread via GitHub


krespo commented on PR #13748:
URL: https://github.com/apache/kafka/pull/13748#issuecomment-1562728086

   @gharris1727 
   First of all, thank you for your feedback.
   As you said, I know that feature is only available in Kafka 3.5, which 
hasn't been released yet.
   
   The open source versions I'm using are as follows:
   ```
   debezium: 2.1
   kafka connect: 2.7.1
   mysql: 8
   ```
   
   When a column in MySQL is nullable and has a default value set, we found 
that the message the Kafka connector sends through the JsonConverter always 
carries the default value, even after the column is updated to null.
   
   So I checked the contents of KAFKA-8713, built the Kafka 3.5 source code to 
produce "connect-json-3.5.0.jar", and replaced the connect-json library in 
"${kafka_connect_home}/libs" with the new version.
   
   Afterwards, I set the "replace.null.with.default" setting to "false", but 
found that the same problem still occurred.
   Through debugging, I found that solving the KAFKA-8713 issue requires 
modifying not only the connect-json module but also the connect-api module.
   So I made a pull request with my modified code.
   Currently, connect-json.jar and connect-api.jar have been replaced with 
version 3.5, and they are working properly.
   
   Please check the code I modified once again. Thank you.
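   
   For anyone following along, a minimal sketch of how the new setting is 
wired up once both jars are in place (assuming the 3.5 config key from 
KAFKA-8713; the surrounding code is illustrative):
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import org.apache.kafka.connect.json.JsonConverter;
   
   public class NullWithDefaultSketch {
       public static void main(String[] args) {
           Map<String, Object> config = new HashMap<>();
           config.put("schemas.enable", "true");
           // Added by KAFKA-8713 (Kafka 3.5): when false, a null field value is
           // serialized as null instead of being replaced by the schema default.
           config.put("replace.null.with.default", "false");
   
           JsonConverter converter = new JsonConverter();
           converter.configure(config, false); // false => value converter
       }
   }
   ```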


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15020) integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor test is flaky

2023-05-25 Thread Atul Sharma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17726147#comment-17726147
 ] 

Atul Sharma commented on KAFKA-15020:
-

https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13753/2/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_8_and_Scala_2_12___testRackAwareRangeAssignor__/

> integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor
>  test is flaky
> --
>
> Key: KAFKA-15020
> URL: https://issues.apache.org/jira/browse/KAFKA-15020
> Project: Kafka
>  Issue Type: Test
>Reporter: Atul Sharma
>Priority: Major
>
> Sometimes the test fails with the following log:
> {code:java}
> Gradle Test Run :core:integrationTest > Gradle Test Executor 175 > 
> FetchFromFollowerIntegrationTest > testRackAwareRangeAssignor() FAILED
> org.opentest4j.AssertionFailedError: Consumed 0 records before timeout 
> instead of the expected 2 records
> at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
> at org.junit.jupiter.api.Assertions.fail(Assertions.java:135)
> at kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:1087)
> at 
> integration.kafka.server.FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$11(FetchFromFollowerIntegrationTest.scala:216)
> at 
> integration.kafka.server.FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$11$adapted(FetchFromFollowerIntegrationTest.scala:215)
> at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
> at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
> at 
> integration.kafka.server.FetchFromFollowerIntegrationTest.verifyAssignments$1(FetchFromFollowerIntegrationTest.scala:215)
>  at 
> integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor(FetchFromFollowerIntegrationTest.scala:244)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (KAFKA-14984) DynamicBrokerReconfigurationTest.testThreadPoolResize() test is flaky

2023-05-25 Thread Atul Sharma (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-14984 ]


Atul Sharma deleted comment on KAFKA-14984:
-

was (Author: JIRAUSER299965):
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13753/2/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_8_and_Scala_2_12___testRackAwareRangeAssignor__/

> DynamicBrokerReconfigurationTest.testThreadPoolResize() test is flaky 
> --
>
> Key: KAFKA-14984
> URL: https://issues.apache.org/jira/browse/KAFKA-14984
> Project: Kafka
>  Issue Type: Test
>Reporter: Manyanda Chitimbo
>Priority: Major
>
> The test sometimes fails with the below log 
> {code:java}
> kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize() failed, 
> log available in 
> .../core/build/reports/testOutput/kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize().test.stdoutGradle
>  Test Run :core:test > Gradle Test Executor 6 > 
> DynamicBrokerReconfigurationTest > testThreadPoolResize() FAILED
>     org.opentest4j.AssertionFailedError: Invalid threads: expected 6, got 8: 
> List(data-plane-kafka-socket-acceptor-ListenerName(PLAINTEXT)-PLAINTEXT-0, 
> data-plane-kafka-socket-acceptor-ListenerName(PLAINTEXT)-PLAINTEXT-0, 
> data-plane-kafka-socket-acceptor-ListenerName(INTERNAL)-SSL-0, 
> data-plane-kafka-socket-acceptor-ListenerName(EXTERNAL)-SASL_SSL-0, 
> data-plane-kafka-socket-acceptor-ListenerName(INTERNAL)-SSL-0, 
> data-plane-kafka-socket-acceptor-ListenerName(INTERNAL)-SSL-0, 
> data-plane-kafka-socket-acceptor-ListenerName(EXTERNAL)-SASL_SSL-0, 
> data-plane-kafka-socket-acceptor-ListenerName(EXTERNAL)-SASL_SSL-0) ==> 
> expected: <true> but was: <false> 
>         at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>         at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>         at 
> app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>         at 
> app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>         at 
> app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:211)
>         at 
> app//kafka.server.DynamicBrokerReconfigurationTest.verifyThreads(DynamicBrokerReconfigurationTest.scala:1634)
>         at 
> app//kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize(DynamicBrokerReconfigurationTest.scala:872)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14984) DynamicBrokerReconfigurationTest.testThreadPoolResize() test is flaky

2023-05-25 Thread Atul Sharma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17726146#comment-17726146
 ] 

Atul Sharma commented on KAFKA-14984:
-

https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13753/2/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_8_and_Scala_2_12___testRackAwareRangeAssignor__/

> DynamicBrokerReconfigurationTest.testThreadPoolResize() test is flaky 
> --
>
> Key: KAFKA-14984
> URL: https://issues.apache.org/jira/browse/KAFKA-14984
> Project: Kafka
>  Issue Type: Test
>Reporter: Manyanda Chitimbo
>Priority: Major
>
> The test sometimes fails with the below log 
> {code:java}
> kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize() failed, 
> log available in 
> .../core/build/reports/testOutput/kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize().test.stdoutGradle
>  Test Run :core:test > Gradle Test Executor 6 > 
> DynamicBrokerReconfigurationTest > testThreadPoolResize() FAILED
>     org.opentest4j.AssertionFailedError: Invalid threads: expected 6, got 8: 
> List(data-plane-kafka-socket-acceptor-ListenerName(PLAINTEXT)-PLAINTEXT-0, 
> data-plane-kafka-socket-acceptor-ListenerName(PLAINTEXT)-PLAINTEXT-0, 
> data-plane-kafka-socket-acceptor-ListenerName(INTERNAL)-SSL-0, 
> data-plane-kafka-socket-acceptor-ListenerName(EXTERNAL)-SASL_SSL-0, 
> data-plane-kafka-socket-acceptor-ListenerName(INTERNAL)-SSL-0, 
> data-plane-kafka-socket-acceptor-ListenerName(INTERNAL)-SSL-0, 
> data-plane-kafka-socket-acceptor-ListenerName(EXTERNAL)-SASL_SSL-0, 
> data-plane-kafka-socket-acceptor-ListenerName(EXTERNAL)-SASL_SSL-0) ==> 
> expected: <true> but was: <false> 
>         at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>         at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>         at 
> app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>         at 
> app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>         at 
> app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:211)
>         at 
> app//kafka.server.DynamicBrokerReconfigurationTest.verifyThreads(DynamicBrokerReconfigurationTest.scala:1634)
>         at 
> app//kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize(DynamicBrokerReconfigurationTest.scala:872)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15020) integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor test is flaky

2023-05-25 Thread Atul Sharma (Jira)
Atul Sharma created KAFKA-15020:
---

 Summary: 
integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor
 test is flaky
 Key: KAFKA-15020
 URL: https://issues.apache.org/jira/browse/KAFKA-15020
 Project: Kafka
  Issue Type: Test
Reporter: Atul Sharma


Sometimes the test fails with the following log:

{code:java}

Gradle Test Run :core:integrationTest > Gradle Test Executor 175 > 
FetchFromFollowerIntegrationTest > testRackAwareRangeAssignor() FAILED
org.opentest4j.AssertionFailedError: Consumed 0 records before timeout instead 
of the expected 2 records
at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
at org.junit.jupiter.api.Assertions.fail(Assertions.java:135)
at kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:1087)
at 
integration.kafka.server.FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$11(FetchFromFollowerIntegrationTest.scala:216)
at 
integration.kafka.server.FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$11$adapted(FetchFromFollowerIntegrationTest.scala:215)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at 
integration.kafka.server.FetchFromFollowerIntegrationTest.verifyAssignments$1(FetchFromFollowerIntegrationTest.scala:215)
 at 
integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor(FetchFromFollowerIntegrationTest.scala:244)

{code}





--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac commented on pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-25 Thread via GitHub


dajac commented on PR #13639:
URL: https://github.com/apache/kafka/pull/13639#issuecomment-1562650336

   @jolshan @jeffkbkim Thanks for your comments. I have addressed them, I think.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon opened a new pull request, #13763: MINOR: use debug level to log handleCommit

2023-05-25 Thread via GitHub


showuon opened a new pull request, #13763:
URL: https://github.com/apache/kafka/pull/13763

   In this 
[PR](https://github.com/apache/kafka/pull/13462/files#diff-38a0aec01732a73b638b91b3d88dfb6a074a6e47fac7972d5a8e7dc97c258606R321),
 we improved the log to make it clearer, and backported it to the 3.4 branch. But in the 
[backport 
commit](https://github.com/apache/kafka/pull/13462/files#diff-38a0aec01732a73b638b91b3d88dfb6a074a6e47fac7972d5a8e7dc97c258606R321),
 we accidentally changed the log level from debug to info and prefixed the message as 
`handleSnapshot` instead of `handleCommit`. Fix it to avoid log flooding.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-25 Thread via GitHub


dajac commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1205278404


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -0,0 +1,2017 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsDelta;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.junit.jupiter.api.Test;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
+import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.A

[GitHub] [kafka] dajac commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-25 Thread via GitHub


dajac commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1205275881


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -0,0 +1,2017 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsDelta;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.junit.jupiter.api.Test;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.A

[GitHub] [kafka] dajac commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-25 Thread via GitHub


dajac commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1205273140


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##

[GitHub] [kafka] Hangleton commented on a diff in pull request #13696: KAFKA-14979:Incorrect lag was calculated when markPartitionsForTruncation in ReplicaAlterLogDirsThread

2023-05-25 Thread via GitHub


Hangleton commented on code in PR #13696:
URL: https://github.com/apache/kafka/pull/13696#discussion_r1205272856


##
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##
@@ -460,8 +460,16 @@ abstract class AbstractFetcherThread(name: String,
 partitionMapLock.lockInterruptibly()
 try {
   Option(partitionStates.stateValue(topicPartition)).foreach { state =>
-val newState = PartitionFetchState(state.topicId, math.min(truncationOffset, state.fetchOffset),
-  state.lag, state.currentLeaderEpoch, state.delay, state = Truncating,
+var lag = state.lag

Review Comment:
   Isn't the lag recalculated on the next fetch iteration ([here](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L362))?
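
   For illustration, here is a minimal Java sketch (hypothetical, simplified types, not the actual `PartitionFetchState` or `AbstractFetcherThread` code) of the behaviour in question: a lag computed for an older fetch offset goes stale once truncation moves the offset backwards, and the next fetch response recomputes it from the leader's log end offset.

```java
import java.util.Optional;

// Hypothetical, simplified fetch state. Lag is Optional so that "unknown,
// recompute on the next fetch" is representable.
final class FetchState {
    final long fetchOffset;
    final Optional<Long> lag;

    FetchState(long fetchOffset, Optional<Long> lag) {
        this.fetchOffset = fetchOffset;
        this.lag = lag;
    }

    // Truncation may move the fetch offset backwards; a lag computed for the
    // old offset would then understate how far behind we are, so clear it.
    FetchState truncateTo(long truncationOffset) {
        long newOffset = Math.min(truncationOffset, fetchOffset);
        Optional<Long> newLag = newOffset < fetchOffset ? Optional.empty() : lag;
        return new FetchState(newOffset, newLag);
    }

    // Each fetch response recalculates lag from the leader's log end offset,
    // which is why a cleared lag heals on the next fetch iteration.
    FetchState onFetchResponse(long leaderLogEndOffset, long nextFetchOffset) {
        long newLag = Math.max(0L, leaderLogEndOffset - nextFetchOffset);
        return new FetchState(nextFetchOffset, Optional.of(newLag));
    }
}
```

   Under that model, clearing the lag at truncation time only matters for anything that reads the state between the truncation and the next completed fetch, which is the crux of the question above.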



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-25 Thread via GitHub


dajac commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1205271405


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##

[GitHub] [kafka] dajac commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-25 Thread via GitHub


dajac commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1205271089


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##

[GitHub] [kafka] dajac commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-25 Thread via GitHub


dajac commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1205266764


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##

[GitHub] [kafka] dajac commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-25 Thread via GitHub


dajac commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1205264648


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##

[GitHub] [kafka] dajac commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-25 Thread via GitHub


dajac commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1205264178


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##

[GitHub] [kafka] dajac commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-25 Thread via GitHub


dajac commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1205256385


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##

[GitHub] [kafka] dajac commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-25 Thread via GitHub


dajac commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1205254844


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##

[GitHub] [kafka] dajac commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-25 Thread via GitHub


dajac commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1205253148


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##

[GitHub] [kafka] dajac commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-25 Thread via GitHub


dajac commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1205252133


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##

[jira] [Commented] (KAFKA-14934) KafkaClusterTestKit makes FaultHandler accessible

2023-05-25 Thread Owen C.H. Leung (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-14934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17726127#comment-17726127 ]

Owen C.H. Leung commented on KAFKA-14934:
------------------------------------------

Hi [~mumrah], can I give this a try? I'm new to contributing to Kafka and want to get my hands dirty with it.

> KafkaClusterTestKit makes FaultHandler accessible
> --------------------------------------------------
>
>                 Key: KAFKA-14934
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14934
>             Project: Kafka
>          Issue Type: Improvement
>          Components: unit tests
>            Reporter: David Arthur
>            Priority: Trivial
>              Labels: good-first-issue
>
> In KafkaClusterTestKit, we use a mock fault handler to avoid exiting the 
> process during tests. It would be useful to expose this fault handler so 
> tests could verify certain fault conditions (like a broker/controller failing 
> to start)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
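
For context, a minimal self-contained sketch of what exposing the fault handler could enable. The MockFaultHandler below is a simplified stand-in for Kafka's test-time fault handler (only fault recording is modeled), and the accessor name fatalFaultHandler() is an assumption for illustration, not the existing KafkaClusterTestKit API:

    import java.util.concurrent.atomic.AtomicReference;

    // Simplified stand-in: records the first fault instead of exiting the
    // process, so a test can inspect it after the cluster is shut down.
    class MockFaultHandler {
        private final AtomicReference<RuntimeException> firstException =
            new AtomicReference<>();

        public RuntimeException handleFault(String failureMessage, Throwable cause) {
            RuntimeException e = new RuntimeException(failureMessage, cause);
            firstException.compareAndSet(null, e); // keep only the first fault
            return e;
        }

        public RuntimeException firstException() {
            return firstException.get();
        }
    }

    // Hypothetical accessor on the test kit (name assumed for illustration):
    class KafkaClusterTestKitSketch {
        private final MockFaultHandler fatalFaultHandler = new MockFaultHandler();

        public MockFaultHandler fatalFaultHandler() {
            return fatalFaultHandler;
        }
    }

A test verifying that a broker or controller failed to start could then assert that firstException() is non-null, rather than relying on the process exiting.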


[GitHub] [kafka] dajac commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-25 Thread via GitHub


dajac commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1205248818


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -0,0 +1,2017 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsDelta;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.junit.jupiter.api.Test;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.A

[GitHub] [kafka] dajac commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-25 Thread via GitHub


dajac commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1205247698


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -0,0 +1,2017 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsDelta;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.junit.jupiter.api.Test;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.A

[GitHub] [kafka] dajac commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-25 Thread via GitHub


dajac commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1205239532


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.GroupMetadataManagerTest;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsumerGroupTest {
+
+private ConsumerGroup createConsumerGroup(String groupId) {
+SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
+return new ConsumerGroup(snapshotRegistry, groupId);
+}
+
+@Test
+public void testGetOrCreateMember() {
+ConsumerGroup consumerGroup = createConsumerGroup("foo");
+ConsumerGroupMember member;
+
+// Create a group.
+member = consumerGroup.getOrMaybeCreateMember("member-id", true);
+assertEquals("member-id", member.memberId());
+
+// Get that group back.
+member = consumerGroup.getOrMaybeCreateMember("member-id", false);
+assertEquals("member-id", member.memberId());
+
+assertThrows(UnknownMemberIdException.class, () ->
+consumerGroup.getOrMaybeCreateMember("does-not-exist", false));
+}
+
+@Test
+public void testUpdateMember() {
+ConsumerGroup consumerGroup = createConsumerGroup("foo");
+ConsumerGroupMember member;
+
+member = consumerGroup.getOrMaybeCreateMember("member", true);
+
+member = new ConsumerGroupMember.Builder(member)
+.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+.build();
+
+consumerGroup.updateMember(member);
+
+assertEquals(member, consumerGroup.getOrMaybeCreateMember("member", false));
+}
+
+@Test
+public void testRemoveMember() {
+ConsumerGroup consumerGroup = createConsumerGroup("foo");
+
+consumerGroup.getOrMaybeCreateMember("member", true);
+assertTrue(consumerGroup.hasMember("member"));
+
+consumerGroup.removeMember("member");
+assertFalse(consumerGroup.hasMember("member"));
+
+}
+
+@Test
+public void testUpdatingMemberUpdatesPartitionEpoch() {
+Uuid fooTopicId = Uuid.randomUuid();
+Uuid barTopicId = Uuid.randomUuid();
+Uuid zarTopicId = Uuid.randomUuid();
+
+ConsumerGroup consumerGroup = createConsumerGroup("foo");
+ConsumerGroupMember member;
+
+member = new ConsumerGroupMember.Builder("member")
+.setMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(fooTopicId, 1, 2, 3)))
+.setPartitionsPendingRevocation(mkAssignment(
+mkTopicAssignment(barTopicId, 4, 5, 6)))
+.setPartitionsPendingAssignment(mkAssignment(
+mkTopicAssignment(zarTopicId, 7, 8, 9)))
+.build();
+
+consumerGroup.updateMember(member);
+
+assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 1));
+assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 2));
+assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 3));
+assertEquals(10, consumerGroup.currentPartitionEpoch(barT
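
The truncated test above checks that updating a member propagates its epoch to every partition it owns, is revoking, or is pending assignment. A minimal sketch of that bookkeeping, independent of the real ConsumerGroup internals (the map layout and the use of String in place of Uuid are simplifying assumptions for illustration):

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative index: topicId -> partition -> epoch of the member that
    // currently holds (or is receiving/releasing) that partition.
    class PartitionEpochIndex {
        private final Map<String, Map<Integer, Integer>> epochs = new HashMap<>();

        void track(String topicId, int memberEpoch, int... partitions) {
            Map<Integer, Integer> byPartition =
                epochs.computeIfAbsent(topicId, t -> new HashMap<>());
            for (int p : partitions) {
                byPartition.put(p, memberEpoch);
            }
        }

        int currentPartitionEpoch(String topicId, int partition) {
            // -1 signals "no member associated with this partition".
            return epochs.getOrDefault(topicId, Map.of())
                .getOrDefault(partition, -1);
        }
    }

With a member at epoch 10 tracked over its assigned, revoking, and pending partitions, currentPartitionEpoch returns 10 for each of them, which is what the assertions in the test verify.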

[GitHub] [kafka] dajac commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-25 Thread via GitHub


dajac commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1205239034


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.GroupMetadataManagerTest;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
+import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsumerGroupTest {
+
+private ConsumerGroup createConsumerGroup(String groupId) {
+SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
+return new ConsumerGroup(snapshotRegistry, groupId);
+}
+
+@Test
+public void testGetOrCreateMember() {
+ConsumerGroup consumerGroup = createConsumerGroup("foo");
+ConsumerGroupMember member;
+
+// Create a group.
+member = consumerGroup.getOrMaybeCreateMember("member-id", true);
+assertEquals("member-id", member.memberId());
+
+// Get that group back.
+member = consumerGroup.getOrMaybeCreateMember("member-id", false);
+assertEquals("member-id", member.memberId());
+
+assertThrows(UnknownMemberIdException.class, () ->
+consumerGroup.getOrMaybeCreateMember("does-not-exist", false));
+}
+
+@Test
+public void testUpdateMember() {
+ConsumerGroup consumerGroup = createConsumerGroup("foo");
+ConsumerGroupMember member;
+
+member = consumerGroup.getOrMaybeCreateMember("member", true);
+
+member = new ConsumerGroupMember.Builder(member)
+.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+.build();
+
+consumerGroup.updateMember(member);
+
+assertEquals(member, consumerGroup.getOrMaybeCreateMember("member", false));
+}
+
+@Test
+public void testRemoveMember() {
+ConsumerGroup consumerGroup = createConsumerGroup("foo");
+
+consumerGroup.getOrMaybeCreateMember("member", true);
+assertTrue(consumerGroup.hasMember("member"));
+
+consumerGroup.removeMember("member");
+assertFalse(consumerGroup.hasMember("member"));
+
+}
+
+@Test
+public void testUpdatingMemberUpdatesPartitionEpoch() {
+Uuid fooTopicId = Uuid.randomUuid();
+Uuid barTopicId = Uuid.randomUuid();
+Uuid zarTopicId = Uuid.randomUuid();
+
+ConsumerGroup consumerGroup = createConsumerGroup("foo");
+ConsumerGroupMember member;
+
+member = new ConsumerGroupMember.Builder("member")
+.setMemberEpoch(10)
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(fooTopicId, 1, 2, 3)))
+.setPartitionsPendingRevocation(mkAssignment(
+mkTopicAssignment(barTopicId, 4, 5, 6)))
+.setPartitionsPendingAssignment(mkAssignment(
+mkTopicAssignment(zarTopicId, 7, 8, 9)))
+.build();
+
+consumerGroup.updateMember(member);
+
+assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 1));
+assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 2));
+assertEquals(10, consumerGroup.currentPartitionEpoch(fooTopicId, 3));
+assertEquals(10, consumerGroup.currentPartitionEpoch(barT
