[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface
dajac commented on code in PR #12845: URL: https://github.com/apache/kafka/pull/12845#discussion_r1024835046 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1646,73 +1649,46 @@ class KafkaApis(val requestChannel: RequestChannel, } } + private def makeGroupCoordinatorRequestContextFrom( Review Comment: I can remove the `From` part. I am not too opinionated on this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #12748: KAFKA-13715: add generationId field in subscription
showuon commented on PR #12748: URL: https://github.com/apache/kafka/pull/12748#issuecomment-1318195383 @dajac , thanks for the comments. PR updated. Thanks.
[GitHub] [kafka] showuon commented on a diff in pull request #12748: KAFKA-13715: add generationId field in subscription
showuon commented on code in PR #12748: URL: https://github.com/apache/kafka/pull/12748#discussion_r1024795520 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java: ## @@ -162,6 +162,17 @@ private boolean allSubscriptionsEqual(Set allTopics, return isAllSubscriptionsEqual; } +// visible for testing +MemberData memberDataFromSubscription(Subscription subscription) { +if (!subscription.ownedPartitions().isEmpty() && subscription.generationId().isPresent()) { Review Comment: I know what you mean now. Yes, for `subscription v2` check, we should only check generationId. Will update it and add a test for it.
[jira] [Comment Edited] (KAFKA-13639) NotEnoughReplicasException for __consumer_offsets topic due to out of order offset
[ https://issues.apache.org/jira/browse/KAFKA-13639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17635089#comment-17635089 ] HaiyuanZhao edited comment on KAFKA-13639 at 11/17/22 6:23 AM: --- [~ijuma] Hi, we have also seen similar behavior with Java 1.8.0_45. In addition, we encountered this issue with very large values, and not just on the __consumer_offsets topic. The exception log is as follows. Hope this helps. {code:java}
[2022-11-16 22:54:52,147] ERROR [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=419] Unexpected error occurred while processing data for partition test-topic-test-topic500--2 at offset 7434 (kafka.server.ReplicaFetcherThread)
kafka.common.OffsetsOutOfOrderException: Out of order offsets found in append to test-topic-test-topic500--2: ArrayBuffer(4126996017, 4126996018, 4126996019, 4126996020, 4126996021, 4126996022, 4126996023, 4126996024, 4126996025, 4126996026, 4126996027, 4126996028, 4126996029, 4126996030, 4126996031, 4126996032, 4126996033, 4126996034, 4126996035, 4126996036, 4126996037, 4126996038, 4126996039, 4126996040, 4126996041, 4126996042, 4126996043, 4126996044, 4126996045, 4126996046, 4126996047, 4126996048, 4126996049, 4126996050, 4126996051, 4126996052, 4126996053, 4126996054, 4126996055, 4126996056, 4126996057, 4126996058, 4126996059, 4126996060, 4126996061, 4126996062, 4126996063, 4126996064, 4126996065, 4126996066, 4126996067, 4126996068, 4126996069, 4126996070, 4126996071, 4126996072, 4126996073, 4126996074, 4126996075, 4126996076, 4126996077, 4126996078, 4126996079, 4126996080, 4126996081, 4126996082, 4126996083, 4126996084, 4126996085, 4126996086, 4126996087, 4126996088, 4126996089, 4126996090, 4126996091, 4126996092, 4126996093, 4126996094, 4126996095, 4126996096, 4126996097, 4126996098, 4126996099, 4126996100, 4126996101, 4126996102, 4126996103, 4126996104, 4126996105, 4126996106, 4126996107, 4126996108, 4126996109, 4126996110, 4126996111, 4126996112, 4126996113, 4126996114, 4126996115, 4126996116, 4126996117, 4126996118, 4126996119, 4126996120, 4126996121, 4126996122, 4126996123, 4126996124, 4126996125, 4126996126, 4126996127, 4126996128, 4126996129, 4126996130, 4126996131, 4126996132, 4126996133, 4126996134, 4126996135, 4126996136, 4126996137, 4126996138, 4126996139, 4126996140, 4126996141, 4126996142, 4071685845, 4071685846, 4071685847, 4071685848, 4071685849, 4071685850, 4071685851, 4071685852, 4071685853, 4071685854, 4071685855, 4071685856, 4071685857, 4071685858, 4071685859, 4071685860, 4071685861, 4071685862, 4071685863, 4071685864, 4071685865, 4071685866, 4071685867, 4071685868, 4071685869, 4071685870, 4071685871, 4071685872, 4071685873, 4071685874, 4071685875, 4071685876, 4071685877, 4071685878, 4071685879, 4071685880, 4071685881, 4071685882, 4071685883, 4071685884, 4071685885, 4071685886, 4071685887, 4071685888, 4071685889, 4071685890, 4071685891, 4071685892, 4071685893, 4071685894, 4071685895, 4071685896, 4071685897, 4071685898, 4071685899, 4071685900, 4071685901, 4071685902, 4071685903, 4071685904, 4071685905, 4071685906, 4071685907, 4071685908, 4071685909, 4071685910, 4071685911, 4071685912, 4071685913, 4071685914, 4071685915, 4071685916, 4071685917, 4071685918, 4071685919, 4071685920, 4071685921, 4071685922, 4071685923, 4071685924, 4071685925, 4071685926, 4071685927, 4071685928, 4071685929, 4071685930, 4071685931, 4071685932, 4071685933, 4071685934, 4071685935, 4071685936, 4071685937, 4071685938, 4071685939, 4071685940, 4071685941, 4071685942, 4071685943, 4071685944, 4071685945, 4071685946, 4071685947, 4071685948, 4071685949, 4071685950, 4071685951, 4071685952, 4071685953, 4071685954, 4071685955, 4071685956, 4071685957, 4071685958, 4071685959, 4071685960, 4071685961, 4071685962, 4071685963, 4071685964, 4071685965, 4071685966, 4071685967, 4071685968, 4071685969, 4071685970)
at kafka.log.Log.$anonfun$append$2(Log.scala:1165)
at kafka.log.Log.append(Log.scala:2409)
at kafka.log.Log.appendAsFollower(Log.scala:1073)
at kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica(Partition.scala:1036)
at kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica(Partition.scala:1043)
at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:172)
at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:339)
at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:326)
at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:325)
at kafka.server.AbstractFetcherThread$$Lambda$1298/1889097333.apply(Unknown Source)
at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
at
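In the log above, the batch's offsets run up to 4126996142 and then jump back to 4071685845. That backward jump is the condition behind "Out of order offsets found in append". The sketch below is a simplified illustration of such a check. `firstOutOfOrderIndex` is a hypothetical helper, not Kafka's code: the real validation in `Log` works on record batches and may differ in detail.

```java
import java.util.List;

// Hypothetical helper, not Kafka's Log validation: it works on individual offsets,
// whereas the broker checks record batches.
final class OffsetOrderCheck {
    private OffsetOrderCheck() {}

    // Returns the index of the first offset that is not strictly greater than the
    // previous one, or -1 if the whole list increases monotonically.
    static int firstOutOfOrderIndex(List<Long> offsets) {
        for (int i = 1; i < offsets.size(); i++) {
            if (offsets.get(i) <= offsets.get(i - 1)) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Same shape as the logged batch: a run ending at 4126996142, then a jump back.
        List<Long> batch = List.of(4126996140L, 4126996141L, 4126996142L, 4071685845L, 4071685846L);
        int idx = firstOutOfOrderIndex(batch);
        // prints "first out-of-order index: 3 (4126996142 -> 4071685845)"
        System.out.println("first out-of-order index: " + idx
                + " (" + batch.get(idx - 1) + " -> " + batch.get(idx) + ")");
    }
}
```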
[GitHub] [kafka] showuon commented on a diff in pull request #12748: KAFKA-13715: add generationId field in subscription
showuon commented on code in PR #12748: URL: https://github.com/apache/kafka/pull/12748#discussion_r1024795520 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java: ## @@ -162,6 +162,17 @@ private boolean allSubscriptionsEqual(Set allTopics, return isAllSubscriptionsEqual; } +// visible for testing +MemberData memberDataFromSubscription(Subscription subscription) { +if (!subscription.ownedPartitions().isEmpty() && subscription.generationId().isPresent()) { Review Comment: I know what you mean now. Yes, for `subscription v3` check, we should only check generationId. Will update it and add a test for it.
[GitHub] [kafka] akhileshchg commented on a diff in pull request #12815: KIP-866 Part 1
akhileshchg commented on code in PR #12815: URL: https://github.com/apache/kafka/pull/12815#discussion_r1024674845 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -1571,6 +1638,10 @@ private void resetToEmptyState() { */ private final FeatureControlManager featureControl; +private final KRaftMetadataListener listener; + +public final MigrationListener migrationListener = new MigrationListener(); // TODO clean this up Review Comment: These two names get confusing quickly. In my understanding, `KRaftMetadataListener` is for replicating data consistently to ZooKeeper from the KRaft metadata log, and `MigrationListener` is for migrating ZooKeeper data to the KRaft metadata log. Can we rename `migrationListener` to `kraftToZkMigrationHandler` or something to that effect, and `listener` to `zkToKRaftMigrationHandler`? ## core/src/main/scala/kafka/zk/KafkaZkClient.scala: ## @@ -156,6 +157,76 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo tryCreateControllerZNodeAndIncrementEpoch() } + /** + * Registers a given KRaft controller in zookeeper as the active controller. Unlike the ZK equivalent of this method, + * this creates /controller as a persistent znode. This prevents ZK brokers from attempting to claim the controller + * leadership during a KRaft leadership failover. + * + * This method is called at the beginning of a KRaft migration and during subsequent KRaft leadership changes during + * the migration. + * + * To ensure that the KRaft controller epoch proceeds the current ZK controller epoch, this registration algorithm + * uses a conditional update on the /controller_epoch znode. If a new ZK controller is elected during this method, + * the multi request transaction will fail and this method will return None.
+ * + * @param kraftControllerId ID of the KRaft controller node + * @param kraftControllerEpoch Epoch of the KRaft controller node + * @return An optional of the new zkVersion of /controller_epoch. None if we could not register the KRaft controller. + */ + def tryRegisterKRaftControllerAsActiveController(kraftControllerId: Int, kraftControllerEpoch: Int): Option[Int] = { Review Comment: Just a design consideration, do you think this must be part of the MigrationClient instead? ## core/src/main/scala/kafka/migration/ZkMigrationClient.scala: ## @@ -0,0 +1,359 @@ +package kafka.migration + +import kafka.api.LeaderAndIsr +import kafka.cluster.{Broker, EndPoint} +import kafka.controller.{ControllerChannelManager, LeaderIsrAndControllerEpoch, ReplicaAssignment} +import kafka.migration.ZkMigrationClient.brokerToBrokerRegistration +import kafka.server.{ConfigEntityName, ConfigType, ZkAdminManager} +import kafka.utils.Logging +import kafka.zk.TopicZNode.TopicIdReplicaAssignment +import kafka.zk._ +import kafka.zookeeper._ +import org.apache.kafka.common.config.ConfigResource +import org.apache.kafka.common.errors.ControllerMovedException +import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData +import org.apache.kafka.common.metadata._ +import org.apache.kafka.common.requests.{AbstractControlRequest, AbstractResponse} +import org.apache.kafka.common.{Endpoint, TopicPartition, Uuid} +import org.apache.kafka.metadata.{BrokerRegistration, PartitionRegistration, VersionRange} +import org.apache.kafka.migration._ +import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion} +import org.apache.zookeeper.CreateMode + +import java.util +import java.util.function.Consumer +import java.util.{Collections, Optional} +import scala.collection.Seq +import scala.jdk.CollectionConverters._ +import scala.jdk.OptionConverters._ + +object ZkMigrationClient { + def brokerToBrokerRegistration(broker: Broker, epoch: Long): ZkBrokerRegistration = { + val registration 
= new BrokerRegistration(broker.id, epoch, Uuid.ZERO_UUID, Review Comment: Just wondering, do we use the incarnation id for anything? ## core/src/main/scala/kafka/migration/ZkMigrationClient.scala: ## @@ -0,0 +1,359 @@ +package kafka.migration + +import kafka.api.LeaderAndIsr +import kafka.cluster.{Broker, EndPoint} +import kafka.controller.{ControllerChannelManager, LeaderIsrAndControllerEpoch, ReplicaAssignment} +import kafka.migration.ZkMigrationClient.brokerToBrokerRegistration +import kafka.server.{ConfigEntityName, ConfigType, ZkAdminManager} +import kafka.utils.Logging +import kafka.zk.TopicZNode.TopicIdReplicaAssignment +import kafka.zk._ +import kafka.zookeeper._ +import org.apache.kafka.common.config.ConfigResource +import org.apache.kafka.common.errors.ControllerMovedException +import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData +import org.apache.kafka.common.metadata._ +import org.apache.kafka.common.requests.{AbstractControlRequest, AbstractResponse} +import
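The `tryRegisterKRaftControllerAsActiveController` doc comment above depends on a conditional, version-checked update of `/controller_epoch`. Below is a minimal Java sketch of that compare-and-set idea, with these assumptions:

- `VersionedNode` is a hypothetical in-memory, single-threaded stand-in for a znode. It is not the ZooKeeper client API.
- The new epoch value, `max(seenEpoch + 1, kraftControllerEpoch)`, is an illustrative choice, not necessarily what the PR writes.

```java
import java.util.Optional;

// Hypothetical sketch of a version-conditioned epoch update. It is not KafkaZkClient's
// implementation; VersionedNode below only models a znode's data and version.
final class KRaftControllerRegistrationSketch {
    private KRaftControllerRegistrationSketch() {}

    // Writes a new epoch only if the node still has the version we read earlier.
    // Returns the new version on success, or empty if someone else updated it first.
    static Optional<Integer> tryRegister(VersionedNode controllerEpoch, int seenEpoch,
                                         int seenVersion, int kraftControllerEpoch) {
        int newEpoch = Math.max(seenEpoch + 1, kraftControllerEpoch); // assumed policy
        return controllerEpoch.setIfVersion(newEpoch, seenVersion);
    }

    public static void main(String[] args) {
        VersionedNode epochNode = new VersionedNode(5, 7); // epoch 5 stored at version 7
        int seenEpoch = epochNode.data();
        int seenVersion = epochNode.version();

        // Simulate a ZK controller election that bumps the epoch after our read.
        epochNode.setIfVersion(6, seenVersion);

        System.out.println(tryRegister(epochNode, seenEpoch, seenVersion, 1)); // Optional.empty
        System.out.println(tryRegister(epochNode, epochNode.data(), epochNode.version(), 1)); // Optional[9]
    }
}

final class VersionedNode {
    private int data;
    private int version;

    VersionedNode(int data, int version) {
        this.data = data;
        this.version = version;
    }

    int data() { return data; }

    int version() { return version; }

    // Conditional write: succeeds only when expectedVersion matches, then bumps the version.
    Optional<Integer> setIfVersion(int newData, int expectedVersion) {
        if (version != expectedVersion) {
            return Optional.empty();
        }
        data = newData;
        version++;
        return Optional.of(version);
    }
}
```

According to the doc comment, the real method does this inside a ZooKeeper multi request. A concurrent ZK controller election therefore makes the whole transaction fail, and the method returns None rather than a partial result.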
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12847: KAFKA-14367; Add `SyncGroup` to the new `GroupCoordinator` interface
jeffkbkim commented on code in PR #12847: URL: https://github.com/apache/kafka/pull/12847#discussion_r1024721499 ## core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala: ## @@ -135,4 +133,71 @@ class GroupCoordinatorAdapterTest { assertEquals(expectedData, future.get()) } + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.SYNC_GROUP) + def testSyncGroup(version: Short): Unit = { +val groupCoordinator = mock(classOf[GroupCoordinator]) +val adapter = new GroupCoordinatorAdapter(groupCoordinator) + +val ctx = makeContext(version) +val data = new SyncGroupRequestData() + .setGroupId("group") + .setMemberId("member1") + .setGroupInstanceId("instance") + .setProtocolType("consumer") + .setProtocolName("range") + .setGenerationId(10) + .setAssignments(List( +new SyncGroupRequestData.SyncGroupRequestAssignment() + .setMemberId("member1") + .setAssignment("member1".getBytes()), +new SyncGroupRequestData.SyncGroupRequestAssignment() + .setMemberId("member2") + .setAssignment("member2".getBytes()) + ).asJava) + +val future = adapter.syncGroup(ctx, data) +assertFalse(future.isDone) + +val capturedAssignment: ArgumentCaptor[Map[String, Array[Byte]]] = + ArgumentCaptor.forClass(classOf[Map[String, Array[Byte]]]) +val capturedCallback: ArgumentCaptor[SyncGroupCallback] = + ArgumentCaptor.forClass(classOf[SyncGroupCallback]) + +verify(groupCoordinator).handleSyncGroup( + ArgumentMatchers.eq(data.groupId), + ArgumentMatchers.eq(data.generationId), + ArgumentMatchers.eq(data.memberId), + ArgumentMatchers.eq(Some(data.protocolType)), + ArgumentMatchers.eq(Some(data.protocolName)), + ArgumentMatchers.eq(Some(data.groupInstanceId)), + capturedAssignment.capture(), + capturedCallback.capture(), + ArgumentMatchers.eq(RequestLocal(ctx.bufferSupplier)) +) + +assertEquals(Map( + "member1" -> "member1", + "member2" -> "member2", +), capturedAssignment.getValue.map { case (member, metadata) => + (member, new String(metadata)) +}.toMap) Review Comment: 
do we need toMap? ## core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala: ## @@ -135,4 +133,71 @@ class GroupCoordinatorAdapterTest { assertEquals(expectedData, future.get()) } + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.SYNC_GROUP) + def testSyncGroup(version: Short): Unit = { +val groupCoordinator = mock(classOf[GroupCoordinator]) +val adapter = new GroupCoordinatorAdapter(groupCoordinator) + +val ctx = makeContext(version) +val data = new SyncGroupRequestData() + .setGroupId("group") + .setMemberId("member1") + .setGroupInstanceId("instance") + .setProtocolType("consumer") + .setProtocolName("range") + .setGenerationId(10) + .setAssignments(List( +new SyncGroupRequestData.SyncGroupRequestAssignment() + .setMemberId("member1") + .setAssignment("member1".getBytes()), +new SyncGroupRequestData.SyncGroupRequestAssignment() + .setMemberId("member2") + .setAssignment("member2".getBytes()) + ).asJava) + +val future = adapter.syncGroup(ctx, data) +assertFalse(future.isDone) + +val capturedAssignment: ArgumentCaptor[Map[String, Array[Byte]]] = + ArgumentCaptor.forClass(classOf[Map[String, Array[Byte]]]) +val capturedCallback: ArgumentCaptor[SyncGroupCallback] = + ArgumentCaptor.forClass(classOf[SyncGroupCallback]) + +verify(groupCoordinator).handleSyncGroup( + ArgumentMatchers.eq(data.groupId), + ArgumentMatchers.eq(data.generationId), + ArgumentMatchers.eq(data.memberId), + ArgumentMatchers.eq(Some(data.protocolType)), + ArgumentMatchers.eq(Some(data.protocolName)), + ArgumentMatchers.eq(Some(data.groupInstanceId)), + capturedAssignment.capture(), + capturedCallback.capture(), + ArgumentMatchers.eq(RequestLocal(ctx.bufferSupplier)) +) + +assertEquals(Map( + "member1" -> "member1", + "member2" -> "member2", +), capturedAssignment.getValue.map { case (member, metadata) => + (member, new String(metadata)) +}.toMap) + +capturedCallback.getValue.apply(SyncGroupResult( + error = Errors.NONE, + protocolType = Some("consumer"), + 
protocolName = Some("range"), + memberAssignment = "member1".getBytes() +)) + +val expectedResponseData = new SyncGroupResponseData() + .setErrorCode(Errors.NONE.code) + .setProtocolType("consumer") + .setProtocolName("range") Review Comment: generally i see a lot of literal strings being used across the tests in GroupCoordinatorAdapterTest. what's the reason for not re-using them, is it the convention?
[GitHub] [kafka] showuon commented on a diff in pull request #12748: KAFKA-13715: add generationId field in subscription
showuon commented on code in PR #12748: URL: https://github.com/apache/kafka/pull/12748#discussion_r1024726967 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java: ## @@ -162,6 +162,17 @@ private boolean allSubscriptionsEqual(Set allTopics, return isAllSubscriptionsEqual; } +// visible for testing +MemberData memberDataFromSubscription(Subscription subscription) { +if (!subscription.ownedPartitions().isEmpty() && subscription.generationId().isPresent()) { Review Comment: > For instance, this could happen if the group has more members than partitions. In that case, the `subscription.generationId()` should still contain a valid number. Is that right?
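This thread concludes that the subscription check should look only at `generationId`. The case raised is a group with more members than partitions, where a member owns nothing but still has a generation. The Java sketch below illustrates that decision, with these assumptions:

- The `Subscription` and `MemberData` records are hypothetical, simplified stand-ins, not the real Kafka types.
- `userDataGeneration` is a made-up placeholder for a generation decoded from user data in older subscription versions.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical sketch, not AbstractStickyAssignor's code. The records below are
// simplified stand-ins for the real Subscription and MemberData types.
final class MemberDataSketch {
    private MemberDataSketch() {}

    // userDataGeneration is a placeholder for a generation decoded from legacy user data.
    record Subscription(List<String> ownedPartitions,
                        Optional<Integer> generationId,
                        Optional<Integer> userDataGeneration) {}

    record MemberData(List<String> partitions, Optional<Integer> generation) {}

    static MemberData fromSubscription(Subscription s) {
        // Only the presence of generationId decides the path, so a member that owns
        // no partitions (e.g. more members than partitions) keeps its generation.
        if (s.generationId().isPresent()) {
            return new MemberData(s.ownedPartitions(), s.generationId());
        }
        // Older subscriptions without the field: fall back to the user-data generation.
        return new MemberData(s.ownedPartitions(), s.userDataGeneration());
    }

    public static void main(String[] args) {
        Subscription idle = new Subscription(List.of(), Optional.of(10), Optional.empty());
        System.out.println(fromSubscription(idle).generation()); // Optional[10]
    }
}
```

With the original condition, `!ownedPartitions().isEmpty() && generationId().isPresent()`, the idle member in `main` would skip the first branch even though its `generationId` is valid. That is the case the question above points at.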
[jira] [Updated] (KAFKA-14382) StreamThreads can miss rebalance events when processing records during a rebalance
[ https://issues.apache.org/jira/browse/KAFKA-14382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-14382: --- Fix Version/s: 3.4.0
> StreamThreads can miss rebalance events when processing records during a rebalance
> --
>
> Key: KAFKA-14382
> URL: https://issues.apache.org/jira/browse/KAFKA-14382
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: A. Sophie Blee-Goldman
> Priority: Major
> Fix For: 3.4.0
>
>
> One of the main improvements introduced by the cooperative protocol was the ability to continue processing records during a rebalance. In Streams, we take advantage of this by polling with a timeout of 0 when a rebalance is/has been in progress, so it can return immediately and continue on through the main loop to process new records. The main poll loop uses an algorithm based on the max.poll.interval.ms to ensure the StreamThread returns to call #poll in time to stay in the consumer group.
>
> Generally speaking, it should exit the processing loop and invoke poll within a few minutes at most based on the poll interval, though typically it will break out much sooner once it's used up all the records from the last poll (based on the max.poll.records config which Streams sets to 1,000 by default). However, if doing heavy processing or setting a higher max.poll.records, the thread may continue processing for more than a few seconds. If it had sent out a JoinGroup request before going on to process and was waiting for its JoinGroup response, then once it does return to invoke #poll it will process this response and send out a SyncGroup – but if the processing took too long, this SyncGroup may immediately fail with the REBALANCE_IN_PROGRESS error.
>
> Essentially, while the thread was processing, the group leader will itself be processing the JoinGroup subscriptions of all members and generating an assignment, then sending this back in its SyncGroup. This may take only a few seconds or less, and the group coordinator will not yet have noticed (or cared) that one of the consumers hasn't sent a SyncGroup – it will just return the assigned partitions in the SyncGroup responses of the members who have responded in time, and "complete" the rebalance in their eyes. But if the assignment involved moving any partitions from one consumer to another, then it will need to trigger a followup rebalance right away to finish assigning those partitions which were revoked in the previous rebalance. This is what causes a new rebalance to be kicked off just seconds after the first one began.
>
> If the consumer that was stuck processing was among those who needed to revoke partitions, this can lead to repeating rebalances – since it fails the SyncGroup of the 1st rebalance it never receives the assignment for it and never knows to revoke those partitions, meaning it will rejoin for the new rebalance still claiming them among its ownedPartitions. When the assignor generates the same assignment for the 2nd rebalance, it will again see that some partitions need to be revoked and will therefore trigger yet another new rebalance after finishing the 2nd. This can go on for as long as the StreamThreads are struggling to finish the JoinGroup phase in time due to processing.
>
> Note that the best workaround at the moment is probably to just set a lower max.poll.records to reduce the processing loop duration.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
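The issue blames the missed SyncGroup on one processing loop that outlasts the time the rest of the group needs to finish the rebalance. The back-of-the-envelope Java sketch below reproduces that arithmetic. Only `max.poll.records` and its 1,000 default in Streams come from the issue. The per-record cost and the rebalance window are hypothetical numbers, and the helper names are made up.

```java
// Back-of-the-envelope sketch. perRecordMs and rebalanceWindowMs are hypothetical inputs,
// not values measured in KAFKA-14382.
final class PollLoopBudget {
    private PollLoopBudget() {}

    // Worst-case time spent processing one poll's worth of records before returning to poll().
    static long processingLoopMs(int maxPollRecords, double perRecordMs) {
        return (long) Math.ceil(maxPollRecords * perRecordMs);
    }

    // True if the thread would only get back to poll() (and send its SyncGroup) after
    // the rest of the group has likely already completed the rebalance.
    static boolean likelyMissesSyncGroup(int maxPollRecords, double perRecordMs, long rebalanceWindowMs) {
        return processingLoopMs(maxPollRecords, perRecordMs) > rebalanceWindowMs;
    }

    public static void main(String[] args) {
        double perRecordMs = 5.0;        // hypothetical "heavy processing" cost per record
        long rebalanceWindowMs = 3_000;  // hypothetical: leader's SyncGroup lands within ~3 s
        for (int maxPollRecords : new int[] {1_000, 100}) {
            System.out.printf("max.poll.records=%d -> loop ~%d ms, misses SyncGroup: %b%n",
                    maxPollRecords,
                    processingLoopMs(maxPollRecords, perRecordMs),
                    likelyMissesSyncGroup(maxPollRecords, perRecordMs, rebalanceWindowMs));
        }
    }
}
```

Under these made-up numbers, 1,000 records take about 5,000 ms and overrun the window, while 100 records take about 500 ms and do not. This is why lowering `max.poll.records` is suggested as a workaround.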
[jira] [Assigned] (KAFKA-14382) StreamThreads can miss rebalance events when processing records during a rebalance
[ https://issues.apache.org/jira/browse/KAFKA-14382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman reassigned KAFKA-14382: -- Assignee: A. Sophie Blee-Goldman
[GitHub] [kafka] ableegoldman opened a new pull request, #12869: KAFKA-14382: wait for current rebalance to complete before triggering followup
ableegoldman opened a new pull request, #12869: URL: https://github.com/apache/kafka/pull/12869 Fix for the subtle bug described in KAFKA-14382 that was causing rebalancing loops. If we trigger a new rebalance while the current one is still ongoing, it may cause some members to fail the first rebalance if they weren't able to send the SyncGroup request in time (for example due to processing records during the rebalance). This means those consumers never receive their assignment from the original rebalance, and won't revoke any partitions they might have needed to. This can send the group into a loop as each rebalance schedules a new followup cooperative rebalance due to partitions that need to be revoked, and each followup rebalance causes some consumer(s) to miss the SyncGroup and never revoke those partitions.
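The PR title describes the fix as deferring a followup rebalance until the current one completes. That can be modeled as a tiny state machine. This is a hypothetical Java model for illustration only, not the code in PR #12869; the class and method names are made up.

```java
// Hypothetical model of "wait for current rebalance to complete before triggering
// followup". It is not the code from PR #12869.
final class FollowupRebalanceScheduler {
    private boolean rebalanceInProgress = false;
    private boolean followupPending = false;
    private int rebalancesTriggered = 0;

    void startRebalance() {
        rebalanceInProgress = true;
        rebalancesTriggered++;
    }

    // Called when an assignment requires a followup (e.g. partitions still to be revoked).
    void requestFollowup() {
        if (rebalanceInProgress) {
            // Defer, so members still finishing SyncGroup are not knocked out of this round.
            followupPending = true;
        } else {
            startRebalance();
        }
    }

    // Called when the current rebalance completes; starts any deferred followup.
    void onRebalanceComplete() {
        rebalanceInProgress = false;
        if (followupPending) {
            followupPending = false;
            startRebalance();
        }
    }

    int rebalancesTriggered() { return rebalancesTriggered; }

    boolean rebalanceInProgress() { return rebalanceInProgress; }

    public static void main(String[] args) {
        FollowupRebalanceScheduler s = new FollowupRebalanceScheduler();
        s.startRebalance();                       // rebalance #1
        s.requestFollowup();                      // needed, but deferred
        System.out.println(s.rebalancesTriggered()); // 1
        s.onRebalanceComplete();                  // the deferred followup starts now
        System.out.println(s.rebalancesTriggered()); // 2
    }
}
```

The design point is that the followup is still scheduled, just not mid-rebalance. A slow member therefore gets a chance to receive its assignment and revoke partitions before the next round starts.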
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface
jeffkbkim commented on code in PR #12845: URL: https://github.com/apache/kafka/pull/12845#discussion_r1024713679 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1646,73 +1649,46 @@ class KafkaApis(val requestChannel: RequestChannel, } } + private def makeGroupCoordinatorRequestContextFrom( Review Comment: the `From` part i guess. i don't think i've seen a method name like it, though it does make sense
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface
jeffkbkim commented on code in PR #12845: URL: https://github.com/apache/kafka/pull/12845#discussion_r1024712573 ## core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala: ## @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.coordinator.group + +import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.JoinGroupCallback +import kafka.server.RequestLocal + +import org.apache.kafka.common.message.{JoinGroupRequestData, JoinGroupResponseData} +import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol +import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.common.utils.BufferSupplier +import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource +import org.apache.kafka.coordinator.group.GroupCoordinatorRequestContext + +import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} +import org.junit.jupiter.params.ParameterizedTest +import org.mockito.{ArgumentCaptor, ArgumentMatchers} +import org.mockito.Mockito.{mock, verify} + +import java.net.InetAddress +import scala.jdk.CollectionConverters._ + +class GroupCoordinatorAdapterTest { + + private def makeContext( +apiVersion: Short + ): GroupCoordinatorRequestContext = { +new GroupCoordinatorRequestContext( + apiVersion, + "client", + InetAddress.getLocalHost, + BufferSupplier.create() +) + } + + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP) + def testJoinGroup(version: Short): Unit = { Review Comment: that makes sense. thanks!
[jira] [Commented] (KAFKA-13639) NotEnoughReplicasException for __consumer_offsets topic due to out of order offset
[ https://issues.apache.org/jira/browse/KAFKA-13639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17635089#comment-17635089 ] HaiyuanZhao commented on KAFKA-13639: - [~ijuma] Hi, We also have seen similar behavior with java 1.8.0_45. In addition, we encountered this issue very large values, and not just topic __consumer_offset. The exception log is as followed. Hope this can help. {code:java} [2022-11-16 22:54:52,147] ERROR [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=419] Unexpected error occurred while processing data for partition test-topic-test-topic500--2 at offset 7434 (kafka.server.ReplicaFetcherThread) kafka.common.OffsetsOutOfOrderException: Out of order offsets found in append to test-topic-test-topic500--2: ArrayBuffer(4126996017, 4126996018, 4126996019, 4126996020, 4126996021, 4126996022, 4126996023, 4126996024, 4126996025, 4126996026, 4126996027, 4126996028, 4126996029, 4126996030, 4126996031, 4126996032, 4126996033, 4126996034, 4126996035, 4126996036, 4126 996037, 4126996038, 4126996039, 4126996040, 4126996041, 4126996042, 4126996043, 4126996044, 4126996045, 4126996046, 4126996047, 4126996048, 4126996049, 4126996050, 4126996051, 412699605 2, 4126996053, 4126996054, 4126996055, 4126996056, 4126996057, 4126996058, 4126996059, 4126996060, 4126996061, 4126996062, 4126996063, 4126996064, 4126996065, 4126996066, 4126996067, 41 26996068, 4126996069, 4126996070, 4126996071, 4126996072, 4126996073, 4126996074, 4126996075, 4126996076, 4126996077, 4126996078, 4126996079, 4126996080, 4126996081, 4126996082, 4126996 083, 4126996084, 4126996085, 4126996086, 4126996087, 4126996088, 4126996089, 4126996090, 4126996091, 4126996092, 4126996093, 4126996094, 4126996095, 4126996096, 4126996097, 4126996098, 4126996099, 4126996100, 4126996101, 4126996102, 4126996103, 4126996104, 4126996105, 4126996106, 4126996107, 4126996108, 4126996109, 4126996110, 4126996111, 4126996112, 4126996113, 41269 96114, 4126996115, 4126996116, 4126996117, 4126996118, 
4126996119, 4126996120, 4126996121, 4126996122, 4126996123, 4126996124, 4126996125, 4126996126, 4126996127, 4126996128, 4126996129, 4126996130, 4126996131, 4126996132, 4126996133, 4126996134, 4126996135, 4126996136, 4126996137, 4126996138, 4126996139, 4126996140, 4126996141, 4126996142, 4071685845, 4071685846, 4071685847, 4071685848, 4071685849, 4071685850, 4071685851, 4071685852, 4071685853, 4071685854, 4071685855, 4071685856, 4071685857, 4071685858, 4071685859, 4071685860, 4071685861, 4071685862, 4071685863, 4071685864, 4071685865, 4071685866, 4071685867, 4071685868, 4071685869, 4071685870, 4071685871, 4071685872, 4071685873, 4071685874, 4071685875, 4071685876, 4071685877, 4071685878, 4071685879, 4071685880, 4071685881, 4071685882, 4071685883, 4071685884, 4071685885, 4071685886, 4071685887, 4071685888, 4071685889, 4071685890, 4071685891, 4071685892, 4071685893, 4071685894, 4071685895, 4071685896, 4071685897, 4071685898, 4071685899, 4071685900, 4071685901, 4071685902, 4071685903, 4071685904, 4071685905, 4071685906, 4071685907, 4071685908, 4071685909, 4071685910, 4071685911, 4071685912, 4071685913, 4071685914, 4071685915, 4071685916, 4071685917, 4071685918, 4071685919, 4071685920, 4071685921, 4071685922, 4071685923, 4071685924, 4071685925, 4071685926, 4071685927, 4071685928, 4071685929, 4071685930, 4071685931, 4071685932, 4071685933, 4071685934, 4071685935, 4071685936, 4071685937, 4071685938, 4071685939, 4071685940, 4071685941, 4071685942, 4071685943, 4071685944, 4071685945, 4071685946, 4071685947, 4071685948, 4071685949, 4071685950, 4071685951, 4071685952, 4071685953, 4071685954, 4071685955, 4071685956, 4071685957, 4071685958, 4071685959, 4071685960, 4071685961, 4071685962, 4071685963, 4071685964, 4071685965, 4071685966, 4071685967, 4071685968, 4071685969, 4071685970) at kafka.log.Log.$anonfun$append$2(Log.scala:1165) at kafka.log.Log.append(Log.scala:2409) at kafka.log.Log.appendAsFollower(Log.scala:1073) at
kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica(Partition.scala:1036) at kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica(Partition.scala:1043) at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:172) at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:339) at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:326) at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:325) at kafka.server.AbstractFetcherThread$$Lambda$1298/1889097333.apply(Unknown Source) at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62) at kafka.server.AbstractFetcherThread$$Lambda$1332/1703195329.apply(Unknown Source)
[GitHub] [kafka] pprovenzano commented on pull request #12843: KAFKA-14375: Remove use of "authorizer-properties" from EndToEndAuthorizerTest
pprovenzano commented on PR #12843: URL: https://github.com/apache/kafka/pull/12843#issuecomment-1317984289 I have manually tested all the failed tests from the checks and they all pass fine locally. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a diff in pull request #12803: KAFKA-13602: Adding ability to multicast records.
ableegoldman commented on code in PR #12803: URL: https://github.com/apache/kafka/pull/12803#discussion_r1024690215 ## streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java: ## @@ -58,5 +62,24 @@ * @param numPartitions the total number of partitions * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used */ +@Deprecated Integer partition(String topic, K key, V value, int numPartitions); + +/** + * Determine the partition numbers to which a record, with the given key and value and the current number + * of partitions, should be multi-casted to. + * @param topic the topic name this record is sent to + * @param key the key of the record + * @param value the value of the record + * @param numPartitions the total number of partitions + * @return an Optional of Set of integers between 0 and {@code numPartitions-1}, + * Empty optional means using default partitioner + * Optional of an empty set means the record won't be sent to any partitions i.e drop it. + * Optional of Set of integers means the partitions to which the record should be sent to. + * */ +default Optional<Set<Integer>> partitions(String topic, K key, V value, int numPartitions) { +final Integer partition = partition(topic, key, value, numPartitions); +return partition == null ? Optional.empty() : Optional.of(Collections.singleton(partition(topic, key, value, numPartitions))); Review Comment: ```suggestion return partition == null ?
Optional.empty() : Optional.of(Collections.singleton(partition)); ``` ## streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java: ## @@ -58,5 +62,24 @@ * @param numPartitions the total number of partitions * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used */ +@Deprecated Integer partition(String topic, K key, V value, int numPartitions); + +/** + * Determine the partition numbers to which a record, with the given key and value and the current number + * of partitions, should be multi-casted to. Review Comment: nit: wording here is a bit awkward, sounds like we're talking about the current number of partitions of the record, not the topic -- maybe something like this? ```suggestion * Determine the number(s) of the partition(s) to which a record with the given key and value should be sent, * for the given topic and current partition count ``` ## streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java: ## @@ -150,16 +151,29 @@ public void send(final String topic, ); } if (partitions.size() > 0) { -partition = partitioner.partition(topic, key, value, partitions.size()); +final Optional<Set<Integer>> maybeMulticastPartitions = partitioner.partitions(topic, key, value, partitions.size()); +if (!maybeMulticastPartitions.isPresent()) { +// New change. Use default partitioner Review Comment: ```suggestion // A null/empty partition indicates we should use the default partitioner ``` ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java: ## @@ -1059,6 +1071,7 @@ private KTable doJoinOnForeignKey(final KTable forei Objects.requireNonNull(tableJoined, "tableJoined can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); + Review Comment: nit: accidental extra line?
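The suggested version of the default method, together with the three outcomes its Javadoc lists, can be exercised in plain Java. This is a minimal sketch, not the `StreamPartitioner` API as merged: `SketchPartitioner` and `MulticastRouting.route` are hypothetical names, and a `null` return from `route` stands in for "fall back to the default partitioner".

```java
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for a partitioner that has both the single-partition
// method and the multicast default method discussed in this review.
interface SketchPartitioner<K, V> {
    // Returns a partition in [0, numPartitions), or null to use the default partitioner.
    Integer partition(String topic, K key, V value, int numPartitions);

    // Default multicast method, written as in the review suggestion:
    // partition() is called once and its result is reused.
    default Optional<Set<Integer>> partitions(String topic, K key, V value, int numPartitions) {
        final Integer partition = partition(topic, key, value, numPartitions);
        return partition == null ? Optional.empty() : Optional.of(Collections.singleton(partition));
    }
}

final class MulticastRouting {
    private MulticastRouting() { }

    // Hypothetical helper interpreting the result the way the Javadoc describes:
    //   empty Optional -> use the default partitioner (signalled here by returning null)
    //   empty set      -> drop the record
    //   non-empty set  -> send the record to every listed partition
    static <K, V> Set<Integer> route(SketchPartitioner<K, V> partitioner,
                                     String topic, K key, V value, int numPartitions) {
        final Optional<Set<Integer>> maybe = partitioner.partitions(topic, key, value, numPartitions);
        if (!maybe.isPresent()) {
            return null;
        }
        // Sorted copy so callers see partitions in a stable order.
        return new TreeSet<>(maybe.get());
    }
}
```

Keeping the result of the first `partition()` call in a local variable, as the suggestion does, avoids a second invocation that is redundant and could even return a different partition if an implementation is not deterministic.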
## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java: ## @@ -459,12 +461,23 @@ private List rebuildMetadataForSingleTopology(final Map>, Integer> getPartition = maybeMulticastPartitions -> { +if (!maybeMulticastPartitions.isPresent()) { +return null; +} +if (maybeMulticastPartitions.get().size() != 1) { +throw new IllegalArgumentException("The partitions returned by StreamPartitioner#partitions method when used for fetching KeyQueryMetadata for key should be a singleton set"); Review Comment: Hm...the IQ case seems to complicate things a bit, I think we may have missed this during the KIP discussion/design. Because there's no reason to enforce that the partitioner return only a single partition, right? In fact this would even be incorrect in some cases, if the partitioner had originally opted to send this record/key to multiple partitions. But obviously the current API doesn't allow for that. I think we need to make a small update to the KIP to account for the possibility of multiple partitions here. We can just modify the `KeyQueryMetadata` class to
[jira] [Updated] (KAFKA-14397) Idempotent producer may bump epoch and reset sequence numbers prematurely
[ https://issues.apache.org/jira/browse/KAFKA-14397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-14397: Description: Suppose that idempotence is enabled in the producer and we send the following single-record batches to a partition leader: * A: epoch=0, seq=0 * B: epoch=0, seq=1 * C: epoch=0, seq=2 The partition leader receives all 3 of these batches and commits them to the log. However, the connection is lost before the `Produce` responses are received by the client. Subsequent retries by the producer all fail to be delivered. It is possible in this scenario for the first batch `A` to reach the delivery timeout before the subsequent batches. This triggers the following check: [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L642]. Depending on whether retries are exhausted, we may adjust sequence numbers. The intuition behind this check is that if retries have not been exhausted, then we saw a fatal error and the batch could not have been written to the log. Hence we should bump the epoch and adjust the sequence numbers of the pending batches since they are presumed to be doomed to failure. So in this case, batches B and C might get reset with the bumped epoch: * B: epoch=1, seq=0 * C: epoch=1, seq=1 If the producer is able to reach the partition leader before these batches expire locally, then they may get written and committed to the log. This can result in duplicates. The root of the issue is that this logic does not account for expiration of the delivery timeout. When the delivery timeout is reached, the number of retries is still likely much lower than the max allowed number of retries (which is `Int.MaxValue` by default).
was: Suppose that idempotence is enabled in the producer and we send the following single-record batches to a partition leader: * A: epoch=0, seq=0 * B: epoch=0, seq=1 * C: epoch=0, seq=2 The partition leader receives all 3 of these batches and commits them to the log. However, the connection is lost before the `Produce` responses are received by the client. Subsequent retries by the producer all fail to be delivered. It is possible in this scenario for the first batch `A` to reach the delivery timeout before the subsequent batches. This triggers the following check: [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L642]. Depending on whether retries are exhausted, we may adjust sequence numbers. The intuition behind this check is that if retries have not been exhausted, then we saw a fatal error and the batch could not have been written to the log. Hence we should bump the epoch and adjust the sequence numbers of the pending batches since they are presumed to be doomed to failure. So in this case, batches B and C might get reset with the bumped epoch: * B: epoch=1, seq=0 * C: epoch=1, seq=1 This can result in duplicate records in the log. The root of the issue is that this logic does not account for expiration of the delivery timeout. When the delivery timeout is reached, the number of retries is still likely much lower than the max allowed number of retries (which is `Int.MaxValue` by default).
> Idempotent producer may bump epoch and reset sequence numbers prematurely > - > > Key: KAFKA-14397 > URL: https://issues.apache.org/jira/browse/KAFKA-14397 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > Suppose that idempotence is enabled in the producer and we send the following > single-record batches to a partition leader: > * A: epoch=0, seq=0 > * B: epoch=0, seq=1 > * C: epoch=0, seq=2 > The partition leader receives all 3 of these batches and commits them to the > log. However, the connection is lost before the `Produce` responses are > received by the client. Subsequent retries by the producer all fail to be > delivered. > It is possible in this scenario for the first batch `A` to reach the delivery > timeout before the subsequent batches. This triggers the following check: > [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L642]. > Depending on whether retries are exhausted, we may adjust sequence numbers. > The intuition behind this check is that if retries have not been exhausted, > then we saw a fatal error and the batch could not have been written to the > log. Hence we should bump the epoch and adjust the sequence numbers of the > pending batches since they are presumed to be doomed to failure. So in this > case, batches B and C might get reset with the bumped epoch: >
[jira] [Created] (KAFKA-14397) Idempotent producer may bump epoch and reset sequence numbers prematurely
Jason Gustafson created KAFKA-14397: --- Summary: Idempotent producer may bump epoch and reset sequence numbers prematurely Key: KAFKA-14397 URL: https://issues.apache.org/jira/browse/KAFKA-14397 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson Assignee: Jason Gustafson Suppose that idempotence is enabled in the producer and we send the following single-record batches to a partition leader: * A: epoch=0, seq=0 * B: epoch=0, seq=1 * C: epoch=0, seq=2 The partition leader receives all 3 of these batches and commits them to the log. However, the connection is lost before the `Produce` responses are received by the client. Subsequent retries by the producer all fail to be delivered. It is possible in this scenario for the first batch `A` to reach the delivery timeout before the subsequent batches. This triggers the following check: [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L642]. Depending on whether retries are exhausted, we may adjust sequence numbers. The intuition behind this check is that if retries have not been exhausted, then we saw a fatal error and the batch could not have been written to the log. Hence we should bump the epoch and adjust the sequence numbers of the pending batches since they are presumed to be doomed to failure. So in this case, batches B and C might get reset with the bumped epoch: * B: epoch=1, seq=0 * C: epoch=1, seq=1 This can result in duplicate records in the log. The root of the issue is that this logic does not account for expiration of the delivery timeout. When the delivery timeout is reached, the number of retries is still likely much lower than the max allowed number of retries (which is `Int.MaxValue` by default). -- This message was sent by Atlassian Jira (v8.20.10#820010)
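The sequence of events above can be replayed with a toy model to see where the duplicate comes from. This is a simplified sketch under a loud assumption: the hypothetical `Leader` class only remembers which `(epoch, seq)` pairs it has committed and appends anything it has not seen, whereas the real broker-side sequence validation is more involved. It contrasts retries that keep their original numbering with retries renumbered after a premature epoch bump.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of the KAFKA-14397 scenario. This is NOT Kafka's producer or broker
// logic: the leader here only remembers every (epoch, seq) pair it has committed.
final class EpochBumpSimulation {
    private EpochBumpSimulation() { }

    // A single-record batch: its payload plus the producer epoch and sequence number.
    static final class Batch {
        final String payload;
        final int epoch;
        final int seq;

        Batch(String payload, int epoch, int seq) {
            this.payload = payload;
            this.epoch = epoch;
            this.seq = seq;
        }
    }

    // Simplified partition leader: a batch whose (epoch, seq) was already committed
    // is treated as a duplicate retry and skipped; anything else is appended.
    static final class Leader {
        final List<String> log = new ArrayList<>();
        private final Set<String> committed = new HashSet<>();

        void append(Batch batch) {
            if (committed.add(batch.epoch + ":" + batch.seq)) {
                log.add(batch.payload);
            }
        }
    }

    // A, B and C are committed, but their Produce responses never reach the producer.
    private static Leader leaderWithCommittedBatches() {
        Leader leader = new Leader();
        leader.append(new Batch("A", 0, 0));
        leader.append(new Batch("B", 0, 1));
        leader.append(new Batch("C", 0, 2));
        return leader;
    }

    // Retries keep their original (epoch, seq), so the leader recognizes them.
    static List<String> retryWithoutEpochBump() {
        Leader leader = leaderWithCommittedBatches();
        leader.append(new Batch("B", 0, 1));
        leader.append(new Batch("C", 0, 2));
        return leader.log;
    }

    // A expires first; the producer bumps the epoch and renumbers B and C from 0,
    // so the leader sees them as new data and appends them a second time.
    static List<String> retryAfterPrematureEpochBump() {
        Leader leader = leaderWithCommittedBatches();
        leader.append(new Batch("B", 1, 0));
        leader.append(new Batch("C", 1, 1));
        return leader.log;
    }
}
```

In this model, `retryWithoutEpochBump()` yields `[A, B, C]`, while `retryAfterPrematureEpochBump()` yields `[A, B, C, B, C]`, which is the duplication the issue describes.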
[jira] [Updated] (KAFKA-14388) NPE When Retrieving StateStore with new Processor API
[ https://issues.apache.org/jira/browse/KAFKA-14388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-14388: Component/s: streams > NPE When Retrieving StateStore with new Processor API > - > > Key: KAFKA-14388 > URL: https://issues.apache.org/jira/browse/KAFKA-14388 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.3.0, 3.3.1 >Reporter: Bill Bejeck >Assignee: Bill Bejeck >Priority: Major > Fix For: 3.4.0, 3.3.2 > > > When using the new Processor API introduced with KIP-820 and adding a state store > to the Processor, calling `context().getStateStore("store-name")` always > returns `null` because the store is not in the `stores` `HashMap` in the > `ProcessorStateManager`. This occurs even when using the > `ConnectedStoreProvider.stores()` method. > I've confirmed the store is associated with the processor by viewing the > `Topology` description. > From some initial triage, it looks like the store is never registered.
[GitHub] [kafka] bbejeck commented on pull request #12861: KAFKA-14388 - Fixes the NPE when using the new Processor API with the DSL
bbejeck commented on PR #12861: URL: https://github.com/apache/kafka/pull/12861#issuecomment-1317781612 Cherry-picked to 3.3
[jira] [Commented] (KAFKA-14388) NPE When Retrieving StateStore with new Processor API
[ https://issues.apache.org/jira/browse/KAFKA-14388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17635035#comment-17635035 ] Bill Bejeck commented on KAFKA-14388: - Cherry-picked to 3.3 > NPE When Retrieving StateStore with new Processor API > - > > Key: KAFKA-14388 > URL: https://issues.apache.org/jira/browse/KAFKA-14388 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.3.0, 3.3.1 >Reporter: Bill Bejeck >Assignee: Bill Bejeck >Priority: Major > Fix For: 3.4.0, 3.3.2 > > > When using the new Processor API introduced with KIP-820 and adding a state store > to the Processor, calling `context().getStateStore("store-name")` always > returns `null` because the store is not in the `stores` `HashMap` in the > `ProcessorStateManager`. This occurs even when using the > `ConnectedStoreProvider.stores()` method. > I've confirmed the store is associated with the processor by viewing the > `Topology` description. > From some initial triage, it looks like the store is never registered.
[jira] [Updated] (KAFKA-14396) Flaky memory leak tests rely on System.gc for correctness
[ https://issues.apache.org/jira/browse/KAFKA-14396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-14396: Labels: flaky-test (was: ) > Flaky memory leak tests rely on System.gc for correctness > - > > Key: KAFKA-14396 > URL: https://issues.apache.org/jira/browse/KAFKA-14396 > Project: Kafka > Issue Type: Test >Reporter: Greg Harris >Priority: Minor > Labels: flaky-test > > There are a few tests which currently call System.gc to help verify that code > running during a test does not leak memory. These tests are: > * > org.apache.kafka.common.memory.GarbageCollectedMemoryPoolTest#testBuffersGarbageCollected > * > org.apache.kafka.common.record.MemoryRecordsBuilderTest#testBuffersDereferencedOnClose > * > org.apache.kafka.streams.state.internals.ThreadCacheTest#cacheOverheadsSmallValues > * > org.apache.kafka.streams.state.internals.ThreadCacheTest#cacheOverheadsLargeValues > Unfortunately, the System.gc call is only advisory to the JVM, as > documented: > > When control returns from the method call, the Java Virtual Machine has > > made a best effort to reclaim space from all discarded objects. > This means that the System.gc call may not have performed any garbage > collection at all, so tests which expect garbage collection to have > happened will not always succeed. For example, a no-op implementation > of the System.gc method would fulfill the method contract. > To reproduce this class of failures: > 1. Comment out the System.gc calls > 2. Run the test. > We should try to find an alternative method of verifying that these > components do not leak memory, one that does not rely on the > implementation-specific behavior of the containing JVM runtime. For example, > verifying that buffers have been closed may serve as a proxy for the underlying > memory being released and garbage collected.
[jira] [Created] (KAFKA-14396) Flaky memory leak tests rely on System.gc for correctness
Greg Harris created KAFKA-14396: --- Summary: Flaky memory leak tests rely on System.gc for correctness Key: KAFKA-14396 URL: https://issues.apache.org/jira/browse/KAFKA-14396 Project: Kafka Issue Type: Test Reporter: Greg Harris There are a few tests which currently call System.gc to help verify that code running during a test does not leak memory. These tests are: * org.apache.kafka.common.memory.GarbageCollectedMemoryPoolTest#testBuffersGarbageCollected * org.apache.kafka.common.record.MemoryRecordsBuilderTest#testBuffersDereferencedOnClose * org.apache.kafka.streams.state.internals.ThreadCacheTest#cacheOverheadsSmallValues * org.apache.kafka.streams.state.internals.ThreadCacheTest#cacheOverheadsLargeValues Unfortunately, the System.gc call is only advisory to the JVM, as documented: > When control returns from the method call, the Java Virtual Machine has made > a best effort to reclaim space from all discarded objects. This means that the System.gc call may not have performed any garbage collection at all, so tests which expect garbage collection to have happened will not always succeed. For example, a no-op implementation of the System.gc method would fulfill the method contract. To reproduce this class of failures: 1. Comment out the System.gc calls 2. Run the test. We should try to find an alternative method of verifying that these components do not leak memory, one that does not rely on the implementation-specific behavior of the containing JVM runtime. For example, verifying that buffers have been closed may serve as a proxy for the underlying memory being released and garbage collected.
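As one possible shape for the alternative suggested in the last paragraph, a test can observe buffer release directly instead of relying on `System.gc()`. The sketch below is hypothetical throughout (`TrackingBufferPool` and `BufferUser` are illustrative names, not Kafka's `MemoryPool` API): the pool tracks outstanding buffers by identity, and the test asserts that none remain after `close()`. This checks the proxy (buffers handed back to their owner), not that memory was actually reclaimed, but it is deterministic regardless of what the garbage collector does.

```java
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// Hypothetical pool that records which buffers are still outstanding, so a test
// can check for leaks without relying on System.gc().
final class TrackingBufferPool {
    // Identity-based set: ByteBuffer.equals compares contents, so two distinct
    // buffers holding the same bytes must still be tracked separately.
    private final Set<ByteBuffer> outstanding =
            Collections.newSetFromMap(new IdentityHashMap<ByteBuffer, Boolean>());

    ByteBuffer allocate(int size) {
        ByteBuffer buffer = ByteBuffer.allocate(size);
        outstanding.add(buffer);
        return buffer;
    }

    void release(ByteBuffer buffer) {
        if (!outstanding.remove(buffer)) {
            throw new IllegalStateException("Buffer was not allocated by this pool or was released twice");
        }
    }

    int outstandingCount() {
        return outstanding.size();
    }
}

// Hypothetical component under test: it must hand its buffer back on close().
final class BufferUser implements AutoCloseable {
    private final TrackingBufferPool pool;
    private final ByteBuffer buffer;

    BufferUser(TrackingBufferPool pool) {
        this.pool = pool;
        this.buffer = pool.allocate(1024);
    }

    @Override
    public void close() {
        pool.release(buffer);
    }
}
```

A leak then shows up as a non-zero `outstandingCount()` after the component is closed, with no dependence on when, or whether, the JVM collects garbage.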
[jira] [Updated] (KAFKA-14386) Change ReplicaPlacer place method to return a class instead of list of list of integers
[ https://issues.apache.org/jira/browse/KAFKA-14386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Grant updated KAFKA-14386: - Description: In KRaft mode, a new interface was introduced, [ReplicaPlacer|https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/metadata/placement/ReplicaPlacer.java], that is used by ReplicationControlManager to create partition assignments during both CreateTopics and CreatePartitions RPCs. Right now it has one implementation, StripedReplicaPlacer. Currently, it has a method called place that returns a list of lists of integers:
{code:java}
List<List<Integer>> place(
    PlacementSpec placement,
    ClusterDescriber cluster
) throws InvalidReplicationFactorException;
{code}
The index corresponds to the partition ID and the integers are the replicas of the assignment. The suggestion is to update the interface so that it models topics and partitions more explicitly. I'm thinking something like:
{code:java}
TopicAssignment place(
    PlacementSpec placement,
    ClusterDescriber cluster
) throws InvalidReplicationFactorException;
{code}
where we have
{code:java}
public class TopicAssignment {
    private List<PartitionAssignment> assignments;

    public TopicAssignment(List<PartitionAssignment> assignments) {
        this.assignments = assignments;
    }

    public List<PartitionAssignment> assignments() {
        return assignments;
    }
}
{code}
and
{code:java}
public class PartitionAssignment {
    private List<Integer> replicas;

    public PartitionAssignment(List<Integer> replicas) {
        this.replicas = replicas;
    }

    public List<Integer> replicas() {
        return replicas;
    }
}
{code}
There are two reasons for the suggestion. First, as mentioned above, it will make the interface, arguably, a bit more readable and understandable by explicitly modeling the idea of topic and partition. Second and more importantly, it makes the interface more extendable in the future. Right now it would be challenging to add more metadata to the response. By having classes, we can easily add fields to them without breaking/changing the interface.
For example, in the CreatePartitions RPC we are adding partitions to an existing topic, and we might want to add some metadata to the response making it clear which partition the assignment starts at. That could look something like:
{code:java}
public class TopicAssignment {
    private List<PartitionAssignment> assignments;
    private Integer firstPartitionId;

    public TopicAssignment(List<PartitionAssignment> assignments, Integer firstPartitionId) {
        this.assignments = assignments;
        this.firstPartitionId = firstPartitionId;
    }

    public List<PartitionAssignment> assignments() {
        return assignments;
    }
    ...
}
{code}
Curious to hear other folks' thoughts on this. was: In KRaft mode, a new interface was introduced, [ReplicaPlacer|https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/metadata/placement/ReplicaPlacer.java], that is used by ReplicationControlManager to create partition assignments during both CreateTopics and CreatePartitions RPCs. Right now it has one implementation, StripedReplicaPlacer. Currently, it has a method called place that returns a list of lists of integers:
{code:java}
List<List<Integer>> place(
    PlacementSpec placement,
    ClusterDescriber cluster
) throws InvalidReplicationFactorException;
{code}
The index corresponds to the partition ID and the integers are the replicas of the assignment. The suggestion is to update the interface so that it models topics and partitions more explicitly. I'm thinking something like:
{code:java}
TopicAssignment place(
    PlacementSpec placement,
    ClusterDescriber cluster
) throws InvalidReplicationFactorException;
{code}
where we have
{code:java}
public class TopicAssignment {
    private List<PartitionAssignment> assignments;

    public TopicAssignment(List<PartitionAssignment> assignments) {
        this.assignments = assignments;
    }

    public List<PartitionAssignment> assignments() {
        return assignments;
    }
}
{code}
and
{code:java}
public class PartitionAssignment {
    private List<Integer> replicas;

    public PartitionAssignment(List<Integer> replicas) {
        this.replicas = replicas;
    }

    public List<Integer> replicas() {
        return replicas;
    }
}
{code}
There are two reasons for the suggestion.
First, as mentioned above, it will make the interface, arguably, a bit more readable and understandable by explicitly modeling the idea of topic and partition. Second and more importantly, it makes the interface more extendable in the future. Right now it would be challenging to add more metadata to the response. By having classes, we can easily add fields to them without breaking/changing the interface. Curious to hear other folks' thoughts on this. > Change ReplicaPlacer place method to return a class instead of list of list > of integers > --- > > Key: KAFKA-14386 > URL: https://issues.apache.org/jira/browse/KAFKA-14386 > Project: Kafka > Issue Type: Improvement >
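To make the proposal concrete, the sketch below wraps a result in the current `List<List<Integer>>` shape into the two proposed classes. The class bodies follow the description, with the assumption that they are made immutable (defensive copies and unmodifiable views), which is a choice of this sketch; `fromLegacy` is a hypothetical adapter and not part of Kafka.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Mirrors the proposed PartitionAssignment: the replica broker IDs of one partition.
final class PartitionAssignment {
    private final List<Integer> replicas;

    PartitionAssignment(List<Integer> replicas) {
        // Defensive copy plus read-only view (an assumption of this sketch).
        this.replicas = Collections.unmodifiableList(new ArrayList<>(replicas));
    }

    List<Integer> replicas() {
        return replicas;
    }
}

// Mirrors the proposed TopicAssignment: element i holds the assignment of the
// i-th placed partition, just like index i of the current List<List<Integer>>.
final class TopicAssignment {
    private final List<PartitionAssignment> assignments;

    TopicAssignment(List<PartitionAssignment> assignments) {
        this.assignments = Collections.unmodifiableList(new ArrayList<>(assignments));
    }

    List<PartitionAssignment> assignments() {
        return assignments;
    }

    // Hypothetical adapter from the current return type to the proposed one.
    static TopicAssignment fromLegacy(List<List<Integer>> placement) {
        List<PartitionAssignment> result = new ArrayList<>();
        for (List<Integer> replicas : placement) {
            result.add(new PartitionAssignment(replicas));
        }
        return new TopicAssignment(result);
    }
}
```

Because callers only go through `assignments()` and `replicas()`, adding a field such as the `firstPartitionId` from the CreatePartitions example would change `TopicAssignment` without touching those call sites.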
[jira] [Resolved] (KAFKA-14388) NPE When Retrieving StateStore with new Processor API
[ https://issues.apache.org/jira/browse/KAFKA-14388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck resolved KAFKA-14388. - Resolution: Fixed Merged PR to trunk > NPE When Retrieving StateStore with new Processor API > - > > Key: KAFKA-14388 > URL: https://issues.apache.org/jira/browse/KAFKA-14388 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.3.0, 3.3.1 >Reporter: Bill Bejeck >Assignee: Bill Bejeck >Priority: Major > Fix For: 3.4.0, 3.3.2 > > > When using the new Processor API introduced with KIP-820 and adding a state store > to the Processor, calling `context().getStateStore("store-name")` always > returns `null` because the store is not in the `stores` `HashMap` in the > `ProcessorStateManager`. This occurs even when using the > `ConnectedStoreProvider.stores()` method. > I've confirmed the store is associated with the processor by viewing the > `Topology` description. > From some initial triage, it looks like the store is never registered.
[GitHub] [kafka] bbejeck commented on pull request #12861: KAFKA-14388 - Fixes the NPE when using the new Processor API with the DSL
bbejeck commented on PR #12861: URL: https://github.com/apache/kafka/pull/12861#issuecomment-1317733576 Merged #12861 into trunk
[GitHub] [kafka] bbejeck merged pull request #12861: KAFKA-14388 - Fixes the NPE when using the new Processor API with the DSL
bbejeck merged PR #12861: URL: https://github.com/apache/kafka/pull/12861
[GitHub] [kafka] bbejeck commented on pull request #12861: KAFKA-14388 - Fixes the NPE when using the new Processor API with the DSL
bbejeck commented on PR #12861: URL: https://github.com/apache/kafka/pull/12861#issuecomment-1317730593 Failures unrelated
[GitHub] [kafka] gharris1727 commented on pull request #12826: Using Timer class to track expiry in IncrementalCooperativeAssignor
gharris1727 commented on PR #12826: URL: https://github.com/apache/kafka/pull/12826#issuecomment-1317721387 @vamossagar12 Thanks for the further explanation. As proposed, I don't think this refactor improves readability and maintainability more than it costs to implement, review, and merge. I think there are other potential changes that deserve your valuable attention more, and that we can leave this `long` as-is for a little longer. Thanks!
[GitHub] [kafka] gharris1727 commented on a diff in pull request #12846: KAFKA-14293: Basic Auth filter should set the SecurityContext after a…
gharris1727 commented on code in PR #12846: URL: https://github.com/apache/kafka/pull/12846#discussion_r1024519136 ## connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java: ## @@ -174,4 +153,84 @@ public void handle(Callback[] callbacks) throws UnsupportedCallbackException { )); } } + +private void setSecurityContextForRequest(ContainerRequestContext requestContext, BasicAuthenticationCredential credential) { +requestContext.setSecurityContext(new SecurityContext() { +@Override +public Principal getUserPrincipal() { +return () -> credential.getUsername(); +} + +@Override +public boolean isUserInRole(String role) { +return false; +} + +@Override +public boolean isSecure() { +return requestContext.getUriInfo().getRequestUri().getScheme().equalsIgnoreCase("https"); +} + +@Override +public String getAuthenticationScheme() { +return BASIC_AUTH; +} +}); +} + + +public static class BasicAuthenticationCredential { +private String username; +private String password; + +public BasicAuthenticationCredential(String authorizationHeader) { +final char colon = ':'; +final char space = ' '; +final String authType = "basic"; + +initializeEmptyCredentials(); + +if (authorizationHeader == null) { +log.trace("No credentials were provided with the request"); +return; +} + +int spaceIndex = authorizationHeader.indexOf(space); +if (spaceIndex <= 0) { +log.trace("Request credentials were malformed; no space present in value for authorization header"); +return; +} + +String method = authorizationHeader.substring(0, spaceIndex); +if (!authType.equalsIgnoreCase(method)) { +log.trace("Request credentials used {} authentication, but only {} supported; ignoring", method, authType); +return; +} + +authorizationHeader = authorizationHeader.substring(spaceIndex + 1); +authorizationHeader = new String(Base64.getDecoder().decode(authorizationHeader), +StandardCharsets.UTF_8); +int i = authorizationHeader.indexOf(colon); +if (i <= 0) { 
+log.trace("Request credentials were malformed; no colon present between username and password"); +return; +} + +this.username = authorizationHeader.substring(0, i); +this.password = authorizationHeader.substring(i + 1); +} + +private void initializeEmptyCredentials() { +this.username = ""; +this.password = ""; +} Review Comment: This is different than the previous behavior. Previously, if the authorizationHeader was null, the username/password would be null as well. Now they are empty string. Is this an intentional change? ## connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java: ## @@ -174,4 +153,84 @@ public void handle(Callback[] callbacks) throws UnsupportedCallbackException { )); } } + +private void setSecurityContextForRequest(ContainerRequestContext requestContext, BasicAuthenticationCredential credential) { +requestContext.setSecurityContext(new SecurityContext() { +@Override +public Principal getUserPrincipal() { +return () -> credential.getUsername(); +} + +@Override +public boolean isUserInRole(String role) { +return false; +} + +@Override +public boolean isSecure() { +return requestContext.getUriInfo().getRequestUri().getScheme().equalsIgnoreCase("https"); +} + +@Override +public String getAuthenticationScheme() { +return BASIC_AUTH; +} +}); +} + + +public static class BasicAuthenticationCredential { +private String username; +private String password; + +public BasicAuthenticationCredential(String authorizationHeader) { +final char colon = ':'; +final char space = ' '; +final String authType = "basic"; Review Comment: nit: these can remain static constants. Maybe we can even pull from SecurityContext.BASIC_AUTH. ## connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java: ## @@ -174,4 +153,84 @@
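For illustration, here is a minimal sketch of the header parsing discussed in the two review comments above. It is not the code in the PR. It assumes a standalone, hypothetical class named `BasicAuthHeaderSketch`, hoists the delimiter and scheme constants to `static final` fields as the second comment suggests, and leaves `username`/`password` as `null` for a missing or malformed header, which matches the previous behavior described in the first comment. A local `"BASIC"` constant stands in for `SecurityContext.BASIC_AUTH` so the example has no dependencies. Catching `IllegalArgumentException` on invalid Base64 is an added assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical sketch; not the BasicAuthenticationCredential class from the PR.
final class BasicAuthHeaderSketch {
    // Constants hoisted out of the constructor, as suggested in review.
    private static final char SPACE = ' ';
    private static final char COLON = ':';
    // Stand-in for SecurityContext.BASIC_AUTH ("BASIC"); compared case-insensitively.
    private static final String BASIC_SCHEME = "BASIC";

    // Both stay null when the header is missing or malformed (the pre-refactor behavior).
    final String username;
    final String password;

    private BasicAuthHeaderSketch(String username, String password) {
        this.username = username;
        this.password = password;
    }

    static BasicAuthHeaderSketch parse(String authorizationHeader) {
        BasicAuthHeaderSketch none = new BasicAuthHeaderSketch(null, null);
        if (authorizationHeader == null) {
            return none; // no credentials provided
        }
        int spaceIndex = authorizationHeader.indexOf(SPACE);
        if (spaceIndex <= 0) {
            return none; // no "<scheme> <value>" separator
        }
        String scheme = authorizationHeader.substring(0, spaceIndex);
        if (!BASIC_SCHEME.equalsIgnoreCase(scheme)) {
            return none; // only Basic authentication is supported
        }
        String decoded;
        try {
            decoded = new String(
                    Base64.getDecoder().decode(authorizationHeader.substring(spaceIndex + 1)),
                    StandardCharsets.UTF_8);
        } catch (IllegalArgumentException e) {
            return none; // value was not valid Base64 (assumed handling)
        }
        int colonIndex = decoded.indexOf(COLON);
        if (colonIndex <= 0) {
            return none; // no "user:password" separator, or empty username
        }
        // Split on the first colon only, so passwords may themselves contain colons.
        return new BasicAuthHeaderSketch(decoded.substring(0, colonIndex), decoded.substring(colonIndex + 1));
    }
}
```

Keeping `null` for "no credentials" also keeps that case distinguishable from an explicitly empty password such as `user:`, which may be part of why the behavior change flagged in review matters.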
[jira] [Updated] (KAFKA-14388) NPE When Retrieving StateStore with new Processor API
[ https://issues.apache.org/jira/browse/KAFKA-14388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-14388: Fix Version/s: 3.3.2 > NPE When Retrieving StateStore with new Processor API > Key: KAFKA-14388 > URL: https://issues.apache.org/jira/browse/KAFKA-14388 > Project: Kafka > Issue Type: Bug > Affects Versions: 3.3.0, 3.3.1 > Reporter: Bill Bejeck > Assignee: Bill Bejeck > Priority: Major > Fix For: 3.4.0, 3.3.2 > When using the new Processor API introduced with KIP-820 and adding a state store to the Processor, calling `context().getStore("store-name")` always returns `null`, because the store is not in the `stores` `HashMap` in the `ProcessorStateManager`. This occurs even when using the `ConnectedStoreProvider.stores()` method. I've confirmed that the store is associated with the processor by viewing the `Topology` description. From some initial triage, it looks like the store is never registered. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] joel-hamill commented on pull request #12868: MINOR: fix syntax typo
joel-hamill commented on PR #12868: URL: https://github.com/apache/kafka/pull/12868#issuecomment-1317664755 cc @guozhangwang -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] joel-hamill opened a new pull request, #12868: MINOR: fix syntax typo
joel-hamill opened a new pull request, #12868: URL: https://github.com/apache/kafka/pull/12868 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* Replaces ``` with `'` *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] hachikuji merged pull request #12864: MINOR: Handle JoinGroupResponseData.protocolName backward compatibility in JoinGroupResponse
hachikuji merged PR #12864: URL: https://github.com/apache/kafka/pull/12864
[GitHub] [kafka] pprovenzano commented on a diff in pull request #12843: KAFKA-14375: Remove use of "authorizer-properties" from EndToEndAuthorizerTest
pprovenzano commented on code in PR #12843: URL: https://github.com/apache/kafka/pull/12843#discussion_r1024471383 ## core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala: ## @@ -37,8 +37,8 @@ object SslEndToEndAuthorizationTest { override def build(context: AuthenticationContext): KafkaPrincipal = { val peerPrincipal = context.asInstanceOf[SslAuthenticationContext].session.getPeerPrincipal.getName peerPrincipal match { -case Pattern(name, _) => - val principal = if (name == "server") name else peerPrincipal +case Pattern(name, extra) => Review Comment: Done
[GitHub] [kafka] pprovenzano commented on a diff in pull request #12843: KAFKA-14375: Remove use of "authorizer-properties" from EndToEndAuthorizerTest
pprovenzano commented on code in PR #12843: URL: https://github.com/apache/kafka/pull/12843#discussion_r1024471192 ## core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala: ## @@ -96,96 +90,52 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas def clientPrincipal: KafkaPrincipal def kafkaPrincipal: KafkaPrincipal - // Arguments to AclCommand to set ACLs. - def clusterActionArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--cluster", - s"--operation=ClusterAction", - s"--allow-principal=$kafkaPrincipal") - // necessary to create SCRAM credentials via the admin client using the broker's credentials - // without this we would need to create the SCRAM credentials via ZooKeeper - def clusterAlterArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--cluster", - s"--operation=Alter", - s"--allow-principal=$kafkaPrincipal") - def topicBrokerReadAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--topic=$wildcard", - s"--operation=Read", - s"--allow-principal=$kafkaPrincipal") - def produceAclArgs(topic: String): Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--topic=$topic", - s"--producer", - s"--allow-principal=$clientPrincipal") - def describeAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--topic=$topic", - s"--operation=Describe", - s"--allow-principal=$clientPrincipal") - def deleteDescribeAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--remove", - s"--force", - s"--topic=$topic", - s"--operation=Describe", - s"--allow-principal=$clientPrincipal") - def deleteWriteAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--remove", - s"--force", - 
s"--topic=$topic", - s"--operation=Write", - s"--allow-principal=$clientPrincipal") - def consumeAclArgs(topic: String): Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--topic=$topic", - s"--group=$group", - s"--consumer", - s"--allow-principal=$clientPrincipal") - def groupAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--group=$group", - s"--operation=Read", - s"--allow-principal=$clientPrincipal") - def produceConsumeWildcardAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--topic=$wildcard", - s"--group=$wildcard", - s"--consumer", - s"--producer", -
[GitHub] [kafka] gharris1727 commented on a diff in pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls
gharris1727 commented on code in PR #12828: URL: https://github.com/apache/kafka/pull/12828#discussion_r1024441937 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java: ## @@ -43,10 +43,38 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; -public class RestClient { +public class RestClient implements AutoCloseable { private static final Logger log = LoggerFactory.getLogger(RestClient.class); private static final ObjectMapper JSON_SERDE = new ObjectMapper(); +private final HttpClient client; +public RestClient(WorkerConfig config) { +client = new HttpClient(SSLUtils.createClientSideSslContextFactory(config)); Review Comment: It's even better now: it actually works! It also tests all phases of the roll from a non-SSL to an SSL cluster.
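The diff above turns `RestClient` from a holder of static helpers into an instance that owns its `HttpClient` and implements `AutoCloseable`. Below is a minimal, dependency-free sketch of that pattern. Every name in it (`FakeHttpClient`, `SketchRestClient`, `ForwardingSketch`) is hypothetical and stands in for Jetty's `HttpClient` and the Connect classes; only the `connectors/<name>/fence` path comes from the PR. Because the caller receives the client through its constructor instead of calling a static method, a test can pass in a stub or mock, which appears to be the motivation behind the PR title.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an HTTP client that is started once and stopped on shutdown.
final class FakeHttpClient {
    private boolean running = true;

    boolean isRunning() {
        return running;
    }

    void stop() {
        running = false;
    }
}

// Hypothetical sketch: one client per instance, released through AutoCloseable.
class SketchRestClient implements AutoCloseable {
    private final FakeHttpClient client = new FakeHttpClient();
    final List<String> sentRequests = new ArrayList<>();

    String httpRequest(String url, String method) {
        if (!client.isRunning()) {
            throw new IllegalStateException("client is closed");
        }
        sentRequests.add(method + " " + url); // a real client would send the request here
        return "ok";
    }

    boolean isOpen() {
        return client.isRunning();
    }

    @Override
    public void close() {
        client.stop(); // stopping twice is harmless
    }
}

// Hypothetical caller: the client is injected, so a test can substitute a stub or mock.
final class ForwardingSketch {
    private final SketchRestClient restClient;

    ForwardingSketch(SketchRestClient restClient) {
        this.restClient = restClient;
    }

    String forwardFencingRequest(String leaderUrl, String connector) {
        return restClient.httpRequest(leaderUrl + "connectors/" + connector + "/fence", "PUT");
    }
}
```

Using try-with-resources around the client ties its lifetime to a single scope, so the underlying connection pool is released even if a request throws.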
[GitHub] [kafka] jolshan commented on a diff in pull request #12847: KAFKA-14367; Add `SyncGroup` to the new `GroupCoordinator` interface
jolshan commented on code in PR #12847: URL: https://github.com/apache/kafka/pull/12847#discussion_r1024435836 ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -2690,110 +2689,150 @@ class KafkaApisTest { assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, response.error) } - @Test - def testSyncGroupProtocolTypeAndName(): Unit = { -for (version <- ApiKeys.SYNC_GROUP.oldestVersion to ApiKeys.SYNC_GROUP.latestVersion) { - testSyncGroupProtocolTypeAndName(version.asInstanceOf[Short]) -} - } + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.SYNC_GROUP) + def testHandleSyncGroupRequest(version: Short): Unit = { +val syncGroupRequest = new SyncGroupRequestData() + .setGroupId("group") + .setMemberId("member") + .setProtocolType("consumer") + .setProtocolName("range") - def testSyncGroupProtocolTypeAndName(version: Short): Unit = { -reset(groupCoordinator, clientRequestQuotaManager, requestChannel, replicaManager) +val requestChannelRequest = buildRequest(new SyncGroupRequest.Builder(syncGroupRequest).build(version)) -val groupId = "group" -val memberId = "member1" -val protocolType = "consumer" -val protocolName = "range" +val expectedRequestContext = new GroupCoordinatorRequestContext( + version, + requestChannelRequest.context.clientId, + requestChannelRequest.context.clientAddress, + RequestLocal.NoCaching.bufferSupplier +) -val capturedCallback: ArgumentCaptor[SyncGroupCallback] = ArgumentCaptor.forClass(classOf[SyncGroupCallback]) +val expectedSyncGroupRequest = new SyncGroupRequestData() + .setGroupId("group") + .setMemberId("member") + .setProtocolType(if (version >= 5) "consumer" else null) + .setProtocolName(if (version >= 5) "range" else null) -val requestLocal = RequestLocal.withThreadConfinedCaching -val syncGroupRequest = new SyncGroupRequest.Builder( - new SyncGroupRequestData() -.setGroupId(groupId) -.setGenerationId(0) -.setMemberId(memberId) -.setProtocolType(protocolType) -.setProtocolName(protocolName) -).build(version) +val 
future = new CompletableFuture[SyncGroupResponseData]() +when(newGroupCoordinator.syncGroup( + ArgumentMatchers.eq(expectedRequestContext), + ArgumentMatchers.eq(expectedSyncGroupRequest) +)).thenReturn(future) -val requestChannelRequest = buildRequest(syncGroupRequest) +createKafkaApis().handleSyncGroupRequest( + requestChannelRequest, + RequestLocal.NoCaching +) -createKafkaApis().handleSyncGroupRequest(requestChannelRequest, requestLocal) +val expectedSyncGroupResponse = new SyncGroupResponseData() + .setProtocolType("consumer") + .setProtocolName("range") -verify(groupCoordinator).handleSyncGroup( - ArgumentMatchers.eq(groupId), - ArgumentMatchers.eq(0), - ArgumentMatchers.eq(memberId), - ArgumentMatchers.eq(if (version >= 5) Some(protocolType) else None), - ArgumentMatchers.eq(if (version >= 5) Some(protocolName) else None), - ArgumentMatchers.eq(None), - ArgumentMatchers.eq(Map.empty), - capturedCallback.capture(), - ArgumentMatchers.eq(requestLocal) +future.complete(expectedSyncGroupResponse) +val capturedResponse = verifyNoThrottling(requestChannelRequest) +val response = capturedResponse.getValue.asInstanceOf[SyncGroupResponse] +assertEquals(expectedSyncGroupResponse, response.data) + } + + @Test + def testHandleSyncGroupRequestFutureFailed(): Unit = { Review Comment: Confirming my understanding, we added `testHandleSyncGroupRequestFutureFailed` and `testHandleSyncGroupRequestAuthenticationFailed`?
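The expected request in the new `testHandleSyncGroupRequest` sets `protocolType` and `protocolName` only when `version >= 5`. Presumably this is because a request built at an older version cannot carry those fields, so they read back as `null`. The sketch below isolates just that rule. `SyncGroupFieldsSketch` and its factory method are hypothetical and not part of the Kafka codebase.

```java
// Hypothetical model of the SyncGroup fields checked by the test; not Kafka's SyncGroupRequestData.
final class SyncGroupFieldsSketch {
    // Taken from the test's `if (version >= 5)` checks.
    static final short FIRST_VERSION_WITH_PROTOCOL_FIELDS = 5;

    final String groupId;
    final String memberId;
    final String protocolType; // null when the version cannot carry it
    final String protocolName; // null when the version cannot carry it

    private SyncGroupFieldsSketch(String groupId, String memberId, String protocolType, String protocolName) {
        this.groupId = groupId;
        this.memberId = memberId;
        this.protocolType = protocolType;
        this.protocolName = protocolName;
    }

    // Builds what the coordinator is expected to receive for a request sent at `version`.
    static SyncGroupFieldsSketch expectedAt(short version, String groupId, String memberId,
                                            String protocolType, String protocolName) {
        boolean carriesProtocolFields = version >= FIRST_VERSION_WITH_PROTOCOL_FIELDS;
        return new SyncGroupFieldsSketch(
                groupId,
                memberId,
                carriesProtocolFields ? protocolType : null,
                carriesProtocolFields ? protocolName : null);
    }
}
```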
[GitHub] [kafka] mjsax commented on pull request #12867: KAFKA-13785: docs for emit final
mjsax commented on PR #12867: URL: https://github.com/apache/kafka/pull/12867#issuecomment-1317561590 Thanks for the PR. Merged to `trunk`.
[GitHub] [kafka] mjsax merged pull request #12867: KAFKA-13785: docs for emit final
mjsax merged PR #12867: URL: https://github.com/apache/kafka/pull/12867
[GitHub] [kafka] mjsax commented on a diff in pull request #12867: KAFKA-13785: docs for emit final
mjsax commented on code in PR #12867: URL: https://github.com/apache/kafka/pull/12867#discussion_r1024420977 ## streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java: ## @@ -644,6 +644,16 @@ KTable, V> reduce(final Reducer reducer, final Named named, final Materialized> materialized); -// TODO: add javadoc +/** + * Configure when the aggregated result will be emitted for {@code SessionWindowedKStream}. + * + * For example, for {@link EmitStrategy#onWindowClose} strategy, the aggregated result for a + * window will only be emitted when the window closes. For {@link EmitStrategy#onWindowUpdate()} + * strategy, the aggregated result for a window will be emitted whenever there is an update to Review Comment: > emitted whenever there is an update Should we mention caching in addition?
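To make the difference between the two strategies in this Javadoc concrete, here is a toy simulation for a single key: a count over fixed-size windows, where a window closes once stream time reaches its end plus a grace period. It is only a model, not Kafka Streams code. `EmitStrategySketch` and its `Strategy` enum are hypothetical, it uses fixed windows instead of session windows for brevity, and it ignores record caching, which in a real topology can collapse several intermediate updates into one (the point raised in the review comment).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of emit-on-update vs emit-on-close; not Kafka Streams code.
final class EmitStrategySketch {
    enum Strategy { ON_WINDOW_UPDATE, ON_WINDOW_CLOSE }

    // Counts records per fixed window for one key; assumes non-negative timestamps.
    static List<String> run(long[] timestamps, long windowSize, long grace, Strategy strategy) {
        TreeMap<Long, Long> openWindows = new TreeMap<>(); // window start -> count
        List<String> emitted = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;

        for (long ts : timestamps) {
            streamTime = Math.max(streamTime, ts); // stream time only moves forward
            long start = ts - (ts % windowSize);
            if (start + windowSize + grace <= streamTime) {
                continue; // late record for an already-closed window: dropped
            }
            long count = openWindows.merge(start, 1L, Long::sum);
            if (strategy == Strategy.ON_WINDOW_UPDATE) {
                emitted.add(render(start, windowSize, count)); // every update is emitted
            }
            // Close (and evict) every window whose end plus grace is at or before stream time.
            while (!openWindows.isEmpty()) {
                Map.Entry<Long, Long> oldest = openWindows.firstEntry();
                if (oldest.getKey() + windowSize + grace > streamTime) {
                    break;
                }
                openWindows.pollFirstEntry();
                if (strategy == Strategy.ON_WINDOW_CLOSE) {
                    emitted.add(render(oldest.getKey(), windowSize, oldest.getValue())); // final result only
                }
            }
        }
        return emitted;
    }

    private static String render(long start, long windowSize, long count) {
        return "[" + start + "," + (start + windowSize) + ")=" + count;
    }
}
```

With timestamps `{1, 2, 5, 11}`, a window size of 10 and no grace, `ON_WINDOW_UPDATE` emits `[0,10)=1`, `[0,10)=2`, `[0,10)=3` and `[10,20)=1`, while `ON_WINDOW_CLOSE` emits only `[0,10)=3`, because only the first window has closed when the record at 11 arrives.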
[jira] [Assigned] (KAFKA-10532) Do not wipe state store under EOS when closing-dirty a RESTORING active or RUNNING standby task
[ https://issues.apache.org/jira/browse/KAFKA-10532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shay Lin reassigned KAFKA-10532: Assignee: Shay Lin > Do not wipe state store under EOS when closing-dirty a RESTORING active or RUNNING standby task > Key: KAFKA-10532 > URL: https://issues.apache.org/jira/browse/KAFKA-10532 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: Guozhang Wang > Assignee: Shay Lin > Priority: Major > Labels: new-streams-runtime-should-fix, newbie++ > Today, whenever we are closing-dirty a task, we always wipe out the state stores if we are under EOS. But when the closing task was a RESTORING active or a RUNNING standby, we may not actually need to wipe out the stores: we know that upon resuming, we would still continue restoring the task before ever transitioning to processing (assuming the LEO offset is not truncated). That is, when the task resumes, it does not matter if the same records get applied twice during the continued restoration.
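The proposal in this ticket boils down to a small decision rule. The minimal sketch below contrasts today's behavior with the proposed one. The names are hypothetical (`DirtyCloseSketch`, plus local `TaskType`/`TaskState` enums rather than the real Kafka Streams task classes), and the sketch encodes only what the description states, including its assumption that the log-end offset is not truncated while the task is closed.

```java
// Hypothetical sketch of the decision described in KAFKA-10532; not Kafka Streams code.
final class DirtyCloseSketch {
    enum TaskType { ACTIVE, STANDBY }

    // Local enum for illustration only.
    enum TaskState { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    // Current behavior per the ticket: under EOS, a dirty close always wipes the stores.
    static boolean wipeStoresToday(boolean eosEnabled) {
        return eosEnabled;
    }

    // Proposed behavior: a RESTORING active or RUNNING standby task only ever replays the
    // changelog, so re-applying the same records after resuming is harmless and the stores
    // can be kept (assuming the log-end offset is not truncated in the meantime).
    static boolean wipeStoresProposed(boolean eosEnabled, TaskType type, TaskState state) {
        if (!eosEnabled) {
            return false;
        }
        boolean onlyReplaysChangelog =
                (type == TaskType.ACTIVE && state == TaskState.RESTORING)
                        || (type == TaskType.STANDBY && state == TaskState.RUNNING);
        return !onlyReplaysChangelog;
    }
}
```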
[jira] [Comment Edited] (KAFKA-10532) Do not wipe state store under EOS when closing-dirty a RESTORING active or RUNNING standby task
[ https://issues.apache.org/jira/browse/KAFKA-10532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17634982#comment-17634982 ] Shay Lin edited comment on KAFKA-10532 at 11/16/22 7:16 PM: Hi [~guozhang] I'm new to KS contribution and would like to take a stab at this issue. Could you give me some pointers to the "closing task" in context?
[jira] [Commented] (KAFKA-10532) Do not wipe state store under EOS when closing-dirty a RESTORING active or RUNNING standby task
[ https://issues.apache.org/jira/browse/KAFKA-10532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17634982#comment-17634982 ] Shay Lin commented on KAFKA-10532: Hi [~guozhang] I'm new to KS contribution and would like to take a stab at this issue. Could you give me some pointers to the "closing task" in context?
[jira] [Assigned] (KAFKA-14395) Add config to configure client supplier for KafkaStreams
[ https://issues.apache.org/jira/browse/KAFKA-14395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Li reassigned KAFKA-14395: Assignee: Hao Li > Add config to configure client supplier for KafkaStreams > Key: KAFKA-14395 > URL: https://issues.apache.org/jira/browse/KAFKA-14395 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: Hao Li > Assignee: Hao Li > Priority: Major > Labels: kip > For KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-884%3A+Add+config+to+configure+KafkaClientSupplier+in+Kafka+Streams
[jira] [Resolved] (KAFKA-13785) Support emit final result for windowed aggregation
[ https://issues.apache.org/jira/browse/KAFKA-13785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Li resolved KAFKA-13785. Resolution: Done > Support emit final result for windowed aggregation > Key: KAFKA-13785 > URL: https://issues.apache.org/jira/browse/KAFKA-13785 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: Hao Li > Assignee: Hao Li > Priority: Major > Labels: kip > For KIP-825: https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced
[jira] [Closed] (KAFKA-13785) Support emit final result for windowed aggregation
[ https://issues.apache.org/jira/browse/KAFKA-13785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hao Li closed KAFKA-13785.
[GitHub] [kafka] lihaosky opened a new pull request, #12867: KAFKA-13785: docs for emit final
lihaosky opened a new pull request, #12867: URL: https://github.com/apache/kafka/pull/12867 Add Javadocs for emit final ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] jolshan commented on a diff in pull request #12847: KAFKA-14367; Add `SyncGroup` to the new `GroupCoordinator` interface
jolshan commented on code in PR #12847: URL: https://github.com/apache/kafka/pull/12847#discussion_r1024400644 ## core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala: ## @@ -135,4 +133,71 @@ class GroupCoordinatorAdapterTest { assertEquals(expectedData, future.get()) } + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.SYNC_GROUP) + def testSyncGroup(version: Short): Unit = { Review Comment: In the JoinGroup PR we suggested renaming, so maybe something similar can be done here.
[GitHub] [kafka] jolshan commented on a diff in pull request #12847: KAFKA-14367; Add `SyncGroup` to the new `GroupCoordinator` interface
jolshan commented on code in PR #12847: URL: https://github.com/apache/kafka/pull/12847#discussion_r1024397613 ## core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala: ## @@ -16,17 +16,15 @@ */ package kafka.coordinator.group -import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.JoinGroupCallback +import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallback, SyncGroupCallback} import kafka.server.RequestLocal - Review Comment: nit: more of these
[GitHub] [kafka] omkreddy commented on a diff in pull request #12843: KAFKA-14375: Remove use of "authorizer-properties" from EndToEndAuthorizerTest
omkreddy commented on code in PR #12843: URL: https://github.com/apache/kafka/pull/12843#discussion_r1024315000 ## core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala: ## @@ -96,96 +90,52 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas def clientPrincipal: KafkaPrincipal def kafkaPrincipal: KafkaPrincipal - // Arguments to AclCommand to set ACLs. - def clusterActionArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--cluster", - s"--operation=ClusterAction", - s"--allow-principal=$kafkaPrincipal") - // necessary to create SCRAM credentials via the admin client using the broker's credentials - // without this we would need to create the SCRAM credentials via ZooKeeper - def clusterAlterArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--cluster", - s"--operation=Alter", - s"--allow-principal=$kafkaPrincipal") - def topicBrokerReadAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--topic=$wildcard", - s"--operation=Read", - s"--allow-principal=$kafkaPrincipal") - def produceAclArgs(topic: String): Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--topic=$topic", - s"--producer", - s"--allow-principal=$clientPrincipal") - def describeAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--topic=$topic", - s"--operation=Describe", - s"--allow-principal=$clientPrincipal") - def deleteDescribeAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--remove", - s"--force", - s"--topic=$topic", - s"--operation=Describe", - s"--allow-principal=$clientPrincipal") - def deleteWriteAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--remove", - s"--force", - 
s"--topic=$topic", - s"--operation=Write", - s"--allow-principal=$clientPrincipal") - def consumeAclArgs(topic: String): Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--topic=$topic", - s"--group=$group", - s"--consumer", - s"--allow-principal=$clientPrincipal") - def groupAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--group=$group", - s"--operation=Read", - s"--allow-principal=$clientPrincipal") - def produceConsumeWildcardAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--topic=$wildcard", - s"--group=$wildcard", - s"--consumer", - s"--producer", -
[GitHub] [kafka] C0urante commented on a diff in pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls
C0urante commented on code in PR #12828: URL: https://github.com/apache/kafka/pull/12828#discussion_r1024255803 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ## @@ -1906,6 +1917,8 @@ private void reconfigureConnector(final String connName, final Callback cb if (isLeader()) { writeToConfigTopicAsLeader(() -> configBackingStore.putTaskConfigs(connName, rawTaskProps)); cb.onCompletion(null, null); +} else if (restClient == null) { +throw new NotLeaderException("Request forwarding disabled in distributed MirrorMaker2; reconfiguring tasks must be performed by the leader", leaderUrl()); Review Comment: Similar wording suggestions for this message: ```suggestion // TODO: Update this message if KIP-710 is accepted and merged // (https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters) throw new NotLeaderException("This worker is not able to communicate with the leader of the cluster, " + "which is required for dynamically-reconfiguring connectors. If running MirrorMaker 2 " + "in dedicated mode, consider deploying the connectors for MirrorMaker 2 directly onto a " + "distributed Kafka Connect cluster.", leaderUrl() ); ``` ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java: ## @@ -43,10 +43,38 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; -public class RestClient { +public class RestClient implements AutoCloseable { private static final Logger log = LoggerFactory.getLogger(RestClient.class); private static final ObjectMapper JSON_SERDE = new ObjectMapper(); +private final HttpClient client; +public RestClient(WorkerConfig config) { +client = new HttpClient(SSLUtils.createClientSideSslContextFactory(config)); Review Comment: This new integration test is fantastic, thanks! 
## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ## @@ -1157,16 +1163,21 @@ void fenceZombieSourceTasks(final ConnectorTaskId id, Callback callback) { if (error == null) { callback.onCompletion(null, null); } else if (error instanceof NotLeaderException) { -String forwardedUrl = ((NotLeaderException) error).forwardUrl() + "connectors/" + id.connector() + "/fence"; -log.trace("Forwarding zombie fencing request for connector {} to leader at {}", id.connector(), forwardedUrl); -forwardRequestExecutor.execute(() -> { -try { -RestClient.httpRequest(forwardedUrl, "PUT", null, null, null, config, sessionKey, requestSignatureAlgorithm); -callback.onCompletion(null, null); -} catch (Throwable t) { -callback.onCompletion(t, null); -} -}); +if (restClient != null) { +String forwardedUrl = ((NotLeaderException) error).forwardUrl() + "connectors/" + id.connector() + "/fence"; +log.trace("Forwarding zombie fencing request for connector {} to leader at {}", id.connector(), forwardedUrl); +forwardRequestExecutor.execute(() -> { +try { +restClient.httpRequest(forwardedUrl, "PUT", null, null, null, sessionKey, requestSignatureAlgorithm); +callback.onCompletion(null, null); +} catch (Throwable t) { +callback.onCompletion(t, null); +} +}); +} else { +error = ConnectUtils.maybeWrap(error, "Request forwarding disabled in distributed MirrorMaker2; fencing zombie source tasks must be performed by the leader"); Review Comment: We shouldn't use `maybeWrap` here since we know that the exception is a `NotLeaderException`, which extends `ConnectException`, so the error message will never actually be used. 
The message itself is also a little misleading since it implies that it'll be possible to bring up source tasks in exactly-once mode on multi-node clusters as long as the leader does the fencing, but that's not actually the case (it's impossible for a follower to start any source tasks with exactly-once enabled if it cannot issue a REST request to the leader's internal zombie fencing endpoint). We might consider
[GitHub] [kafka] akhileshchg commented on a diff in pull request #12860: Add RPC changes, records, and config from KIP-866
akhileshchg commented on code in PR #12860: URL: https://github.com/apache/kafka/pull/12860#discussion_r1024336463 ## metadata/src/main/java/org/apache/kafka/migration/ZkControllerState.java: ## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.migration; + +public class ZkControllerState { Review Comment: Can you add a comment explaining this state as well? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-14395) Add config to configure client supplier for KafkaStreams
Hao Li created KAFKA-14395: -- Summary: Add config to configure client supplier for KafkaStreams Key: KAFKA-14395 URL: https://issues.apache.org/jira/browse/KAFKA-14395 Project: Kafka Issue Type: Improvement Components: streams Reporter: Hao Li For KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-884%3A+Add+config+to+configure+KafkaClientSupplier+in+Kafka+Streams -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] gharris1727 commented on pull request #12839: KAFKA-14346: Replace static mocking of WorkerConfig::lookupKafkaClusterId
gharris1727 commented on PR #12839: URL: https://github.com/apache/kafka/pull/12839#issuecomment-1317398884 @mimaison I fixed the non-trivial merge conflicts for the KafkaOffsetBackingStoreTest; please re-review that file in particular. Thanks!
[GitHub] [kafka] pprovenzano commented on a diff in pull request #12843: KAFKA-14375: Remove use of "authorizer-properties" from EndToEndAuthorizerTest
pprovenzano commented on code in PR #12843: URL: https://github.com/apache/kafka/pull/12843#discussion_r1024275128 ## core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala: ## @@ -200,11 +150,23 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas */ @BeforeEach override def setUp(testInfo: TestInfo): Unit = { + +// The next two configuration parameters enable ZooKeeper secure ACLs +// and sets the Kafka authorizer, both necessary to enable security. +this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true") +this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, authorizerClass.getName) + +// Set the specific principal that can update ACLs. +this.serverConfig.setProperty(AclAuthorizer.SuperUsersProp, kafkaPrincipal.toString) + super.setUp(testInfo) + +val superuserAdminClient = createSuperuserAdminClient() Review Comment: I tested as we discussed and in fact it is not necessary. I removed the code at setUp().
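The `setUp()` override in this diff sets three broker properties before calling `super.setUp`. As a rough plain-Java illustration, the same overrides can be collected in a `java.util.Properties`. The literal keys are our understanding of what `KafkaConfig.ZkEnableSecureAclsProp`, `KafkaConfig.AuthorizerClassNameProp` and `AclAuthorizer.SuperUsersProp` resolve to and should be checked against the Kafka version in use; `SecureBrokerConfigSketch` and the `User:admin` principal are hypothetical.

```java
import java.util.Properties;

final class SecureBrokerConfigSketch {
    // Builds broker overrides analogous to those set in the test's setUp().
    static Properties secureOverrides(String authorizerClassName, String superUserPrincipal) {
        Properties props = new Properties();
        // Assumed key behind KafkaConfig.ZkEnableSecureAclsProp:
        // makes the broker set ACLs on the ZooKeeper znodes it creates.
        props.setProperty("zookeeper.set.acl", "true");
        // Assumed key behind KafkaConfig.AuthorizerClassNameProp: plugs in the authorizer.
        props.setProperty("authorizer.class.name", authorizerClassName);
        // Assumed key behind AclAuthorizer.SuperUsersProp: principal allowed to manage ACLs.
        props.setProperty("super.users", superUserPrincipal);
        return props;
    }
}
```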
[GitHub] [kafka] C0urante commented on pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls
C0urante commented on PR #12828: URL: https://github.com/apache/kafka/pull/12828#issuecomment-1317293132 @gharris1727 There appear to be test failures; can you take a look?
[GitHub] [kafka] C0urante commented on a diff in pull request #12233: MINOR: Clean up tmp files created by tests
C0urante commented on code in PR #12233: URL: https://github.com/apache/kafka/pull/12233#discussion_r1024220705 ## clients/src/test/java/org/apache/kafka/test/TestUtils.java: ## @@ -142,26 +142,40 @@ public static String randomString(final int len) { } /** - * Create an empty file in the default temporary-file directory, using `kafka` as the prefix and `tmp` as the - * suffix to generate its name. + * Create an empty file in the default temporary-file directory, using the given prefix and suffix + * to generate its name. + * @throws IOException */ -public static File tempFile() throws IOException { -final File file = File.createTempFile("kafka", ".tmp"); +public static File tempFile(final String prefix, final String suffix) throws IOException { +final File file = Files.createTempFile(prefix, suffix).toFile(); file.deleteOnExit(); +Exit.addShutdownHook("delete-temp-file-shutdown-hook", () -> { Review Comment: The idea (at least with calls to terminate the JVM) is that we can add a layer in between the components under test and the exit API provided by the SDK, and then alter that layer during testing to prevent the actual API from being accessed. One example is in the embedded Connect clusters we use for our integration tests: https://github.com/apache/kafka/blob/dbf5826cd545820652ed6d136727aa80a03f4f54/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java#L111-L120 With shutdown hooks, it's mostly hypothetical at this point since AFAICT there are no actual invocations of `Exit::setShutdownHookAdder` in the code base, but I believe the intention would be similar. You'd intercept attempts by components under test to add shutdown hooks, both to prevent those hooks from actually being added (in other words, to prevent `Runtime.getRuntime().addShutdownHook` from being called), and possibly to make assertions on those hooks. We don't want to allow for the shutdown hooks we added in this PR to be intercepted, right? 
If so, we might consider invoking `Runtime.getRuntime().addShutdownHook` directly here.
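Following that suggestion, here is a minimal sketch of the temp-file helper that registers its cleanup hook with `Runtime.getRuntime().addShutdownHook` directly, so a test-time hook adder cannot intercept it. `TempFiles` is a hypothetical name. Like the diff, it also keeps the `deleteOnExit()` call.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

final class TempFiles {
    // Creates an empty temp file and registers cleanup directly with the JVM,
    // bypassing any interceptable indirection layer such as an Exit-style wrapper.
    static File tempFile(String prefix, String suffix) throws IOException {
        final File file = Files.createTempFile(prefix, suffix).toFile();
        file.deleteOnExit();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                // Best effort: the file may already have been removed.
                Files.deleteIfExists(file.toPath());
            } catch (IOException e) {
                // Nothing useful can be done during JVM shutdown.
            }
        }, "delete-temp-file-shutdown-hook"));
        return file;
    }
}
```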
[GitHub] [kafka] mimaison commented on pull request #12839: KAFKA-14346: Replace static mocking of WorkerConfig::lookupKafkaClusterId
mimaison commented on PR #12839: URL: https://github.com/apache/kafka/pull/12839#issuecomment-1317243176 @gharris1727 Can you rebase on trunk?
[GitHub] [kafka] bbejeck commented on pull request #12861: KAFKA-14388 - Fixes the NPE when using the new Processor API with the DSL
bbejeck commented on PR #12861: URL: https://github.com/apache/kafka/pull/12861#issuecomment-1317228760 Updates per comments - will merge once build completes
[GitHub] [kafka] clolov commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests
clolov commented on code in PR #12821: URL: https://github.com/apache/kafka/pull/12821#discussion_r1024174183 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java: ## @@ -42,77 +39,54 @@ import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; -import static org.easymock.EasyMock.createStrictControl; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; -import static org.powermock.api.easymock.PowerMock.mockStatic; -import static org.powermock.api.easymock.PowerMock.replayAll; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; -@RunWith(PowerMockRunner.class) -@PrepareForTest(Utils.class) +@RunWith(MockitoJUnitRunner.StrictStubs.class) public class StateManagerUtilTest { -@Mock(type = MockType.NICE) +@Mock private ProcessorStateManager stateManager; -@Mock(type = MockType.NICE) +@Mock private StateDirectory stateDirectory; -@Mock(type = MockType.NICE) +@Mock private ProcessorTopology topology; -@Mock(type = MockType.NICE) +@Mock private InternalProcessorContext processorContext; -private IMocksControl ctrl; - private Logger logger = new LogContext("test").logger(AbstractTask.class); private final TaskId taskId = new TaskId(0, 0); -@Before -public void setup() { -ctrl = createStrictControl(); -topology = ctrl.createMock(ProcessorTopology.class); -processorContext = ctrl.createMock(InternalProcessorContext.class); - -stateManager = ctrl.createMock(ProcessorStateManager.class); -stateDirectory = ctrl.createMock(StateDirectory.class); -} - @Test public void testRegisterStateStoreWhenTopologyEmpty() { -expect(topology.stateStores()).andReturn(emptyList()); - -ctrl.checkOrder(true); Review 
Comment: I have hopefully addressed this in the next commit, but if I have missed or misunderstood something I will rework it.
[GitHub] [kafka] clolov commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests
clolov commented on code in PR #12821: URL: https://github.com/apache/kafka/pull/12821#discussion_r1024170827 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java: ## @@ -187,151 +139,92 @@ public void testCloseStateManagerThrowsExceptionWhenClean() { // Thrown stateMgr exception will not be wrapped. assertEquals("state manager failed to close", thrown.getMessage()); -ctrl.verify(); +// The unlock logic should still be executed. +verify(stateDirectory).unlock(taskId); } @Test public void testCloseStateManagerThrowsExceptionWhenDirty() { -expect(stateManager.taskId()).andReturn(taskId); +when(stateManager.taskId()).thenReturn(taskId); -expect(stateDirectory.lock(taskId)).andReturn(true); +when(stateDirectory.lock(taskId)).thenReturn(true); -stateManager.close(); -expectLastCall().andThrow(new ProcessorStateException("state manager failed to close")); - -stateDirectory.unlock(taskId); - -ctrl.checkOrder(true); -ctrl.replay(); +doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close(); assertThrows( ProcessorStateException.class, () -> StateManagerUtil.closeStateManager( logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE)); -ctrl.verify(); +verify(stateDirectory).unlock(taskId); } @Test public void testCloseStateManagerWithStateStoreWipeOut() { -expect(stateManager.taskId()).andReturn(taskId); -expect(stateDirectory.lock(taskId)).andReturn(true); - -stateManager.close(); -expectLastCall(); +when(stateManager.taskId()).thenReturn(taskId); +when(stateDirectory.lock(taskId)).thenReturn(true); // The `baseDir` will be accessed when attempting to delete the state store. 
- expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store")); - -stateDirectory.unlock(taskId); -expectLastCall(); - -ctrl.checkOrder(true); -ctrl.replay(); + when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store")); StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE); -ctrl.verify(); +verify(stateManager).close(); +verify(stateDirectory).unlock(taskId); } @Test public void shouldStillWipeStateStoresIfCloseThrowsException() throws IOException { final File randomFile = new File("/random/path"); -mockStatic(Utils.class); - -expect(stateManager.taskId()).andReturn(taskId); -expect(stateDirectory.lock(taskId)).andReturn(true); -stateManager.close(); -expectLastCall().andThrow(new ProcessorStateException("Close failed")); +when(stateManager.taskId()).thenReturn(taskId); +when(stateDirectory.lock(taskId)).thenReturn(true); -expect(stateManager.baseDir()).andReturn(randomFile); +doThrow(new ProcessorStateException("Close failed")).when(stateManager).close(); -Utils.delete(randomFile); +when(stateManager.baseDir()).thenReturn(randomFile); -stateDirectory.unlock(taskId); -expectLastCall(); +try (MockedStatic utils = mockStatic(Utils.class)) { +assertThrows(ProcessorStateException.class, () -> +StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE)); +} -ctrl.checkOrder(true); -ctrl.replay(); - -replayAll(); - -assertThrows(ProcessorStateException.class, () -> -StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE)); - -ctrl.verify(); +verify(stateDirectory).unlock(taskId); } @Test public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException { final File unknownFile = new File("/unknown/path"); -mockStatic(Utils.class); - -expect(stateManager.taskId()).andReturn(taskId); 
-expect(stateDirectory.lock(taskId)).andReturn(true); - -stateManager.close(); -expectLastCall(); - -expect(stateManager.baseDir()).andReturn(unknownFile); -Utils.delete(unknownFile); -expectLastCall().andThrow(new IOException("Deletion failed")); +when(stateManager.taskId()).thenReturn(taskId); +when(stateDirectory.lock(taskId)).thenReturn(true); +when(stateManager.baseDir()).thenReturn(unknownFile); -stateDirectory.unlock(taskId); -expectLastCall(); +try (MockedStatic utils = mockStatic(Utils.class)) { +
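The migrated tests assert that `stateDirectory.unlock(taskId)` is still called when closing the state manager throws. One common way to provide that guarantee is `try`/`finally`. The sketch below models it with hand-rolled fakes; `RecordingDirectory`, `Closer` and `CloseSketch` are hypothetical. It is far simpler than the real `StateManagerUtil.closeStateManager`, which also handles state wipe-out and exception wrapping.

```java
import java.util.ArrayList;
import java.util.List;

// Hand-rolled fake that records lock/unlock calls (not Mockito).
final class RecordingDirectory {
    final List<String> calls = new ArrayList<>();

    boolean lock(String taskId) {
        calls.add("lock " + taskId);
        return true;
    }

    void unlock(String taskId) {
        calls.add("unlock " + taskId);
    }
}

interface Closer {
    void close();
}

final class CloseSketch {
    // The lock is released even if close() throws; the exception still propagates.
    static void closeAndUnlock(String taskId, Closer stateManager, RecordingDirectory dir) {
        if (!dir.lock(taskId)) {
            return;
        }
        try {
            stateManager.close();
        } finally {
            dir.unlock(taskId);
        }
    }
}
```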
[GitHub] [kafka] clolov commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests
clolov commented on code in PR #12821: URL: https://github.com/apache/kafka/pull/12821#discussion_r1024166413 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java: ## @@ -187,151 +139,92 @@ public void testCloseStateManagerThrowsExceptionWhenClean() { // Thrown stateMgr exception will not be wrapped. assertEquals("state manager failed to close", thrown.getMessage()); -ctrl.verify(); +// The unlock logic should still be executed. +verify(stateDirectory).unlock(taskId); } @Test public void testCloseStateManagerThrowsExceptionWhenDirty() { -expect(stateManager.taskId()).andReturn(taskId); +when(stateManager.taskId()).thenReturn(taskId); -expect(stateDirectory.lock(taskId)).andReturn(true); +when(stateDirectory.lock(taskId)).thenReturn(true); -stateManager.close(); -expectLastCall().andThrow(new ProcessorStateException("state manager failed to close")); - -stateDirectory.unlock(taskId); - -ctrl.checkOrder(true); -ctrl.replay(); +doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close(); assertThrows( ProcessorStateException.class, () -> StateManagerUtil.closeStateManager( logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE)); -ctrl.verify(); +verify(stateDirectory).unlock(taskId); } @Test public void testCloseStateManagerWithStateStoreWipeOut() { -expect(stateManager.taskId()).andReturn(taskId); -expect(stateDirectory.lock(taskId)).andReturn(true); - -stateManager.close(); -expectLastCall(); +when(stateManager.taskId()).thenReturn(taskId); +when(stateDirectory.lock(taskId)).thenReturn(true); // The `baseDir` will be accessed when attempting to delete the state store. 
- expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store")); - -stateDirectory.unlock(taskId); -expectLastCall(); - -ctrl.checkOrder(true); -ctrl.replay(); + when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store")); StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE); -ctrl.verify(); +verify(stateManager).close(); +verify(stateDirectory).unlock(taskId); } @Test public void shouldStillWipeStateStoresIfCloseThrowsException() throws IOException { final File randomFile = new File("/random/path"); -mockStatic(Utils.class); - -expect(stateManager.taskId()).andReturn(taskId); -expect(stateDirectory.lock(taskId)).andReturn(true); -stateManager.close(); -expectLastCall().andThrow(new ProcessorStateException("Close failed")); +when(stateManager.taskId()).thenReturn(taskId); +when(stateDirectory.lock(taskId)).thenReturn(true); -expect(stateManager.baseDir()).andReturn(randomFile); +doThrow(new ProcessorStateException("Close failed")).when(stateManager).close(); -Utils.delete(randomFile); +when(stateManager.baseDir()).thenReturn(randomFile); -stateDirectory.unlock(taskId); -expectLastCall(); +try (MockedStatic utils = mockStatic(Utils.class)) { +assertThrows(ProcessorStateException.class, () -> +StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE)); +} -ctrl.checkOrder(true); -ctrl.replay(); - -replayAll(); - -assertThrows(ProcessorStateException.class, () -> -StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE)); - -ctrl.verify(); +verify(stateDirectory).unlock(taskId); } @Test public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException { final File unknownFile = new File("/unknown/path"); -mockStatic(Utils.class); - -expect(stateManager.taskId()).andReturn(taskId); 
-expect(stateDirectory.lock(taskId)).andReturn(true); - -stateManager.close(); -expectLastCall(); - -expect(stateManager.baseDir()).andReturn(unknownFile); -Utils.delete(unknownFile); -expectLastCall().andThrow(new IOException("Deletion failed")); +when(stateManager.taskId()).thenReturn(taskId); +when(stateDirectory.lock(taskId)).thenReturn(true); +when(stateManager.baseDir()).thenReturn(unknownFile); -stateDirectory.unlock(taskId); -expectLastCall(); +try (MockedStatic utils = mockStatic(Utils.class)) { +
[GitHub] [kafka] rondagostino commented on a diff in pull request #12856: KAFKA-14392: KRaft broker heartbeat timeout should not exceed broker.session.timeout.ms
rondagostino commented on code in PR #12856: URL: https://github.com/apache/kafka/pull/12856#discussion_r1024154049 ## core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala: ## @@ -121,16 +121,18 @@ object BrokerToControllerChannelManager { config: KafkaConfig, channelName: String, threadNamePrefix: Option[String], -retryTimeoutMs: Long - ): BrokerToControllerChannelManager = { +retryTimeoutMs: Long, +networkClientRetryTimeoutMs: Option[Int] = None Review Comment: The fact that there are actually two separate timeouts here is a signal that something may be wrong. To be clear, there were 2 separate timeouts prior to this PR -- this patch just makes their existence explicit so that we can set both of them in the KRaft case. I've opened https://issues.apache.org/jira/browse/KAFKA-14394 to track this. ## core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala: ## @@ -211,7 +214,7 @@ class BrokerToControllerChannelManagerImpl( 50, Selectable.USE_DEFAULT_BUFFER_SIZE, Selectable.USE_DEFAULT_BUFFER_SIZE, -config.requestTimeoutMs, + networkClientRetryTimeoutMs.getOrElse(config.controllerSocketTimeoutMs), Review Comment: Again, this is a cosmetic change.
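The second hunk resolves the network-client timeout with `networkClientRetryTimeoutMs.getOrElse(config.controllerSocketTimeoutMs)`. Scala's `Option.getOrElse` corresponds to `Optional.orElse` in Java. The sketch below shows only that resolution rule, not how the value reaches the network client; `TimeoutDefaults` and the millisecond values in the usage note are hypothetical.

```java
import java.util.Optional;

final class TimeoutDefaults {
    // Java analogue of networkClientRetryTimeoutMs.getOrElse(config.controllerSocketTimeoutMs):
    // an explicit override wins; otherwise the controller socket timeout is used.
    static int networkClientTimeoutMs(Optional<Integer> override, int controllerSocketTimeoutMs) {
        return override.orElse(controllerSocketTimeoutMs);
    }
}
```

With no override, `networkClientTimeoutMs(Optional.empty(), 30000)` falls back to 30000; with `Optional.of(4500)` the caller's value is used instead.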
[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12803: KAFKA-13602: Adding ability to multicast records.
YeonCheolGit commented on code in PR #12803: URL: https://github.com/apache/kafka/pull/12803#discussion_r1024151927 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java: ## @@ -1046,7 +1047,18 @@ public KTable leftJoin(final KTable other, return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, TableJoined.with(null, null), materialized, true); } -@SuppressWarnings("unchecked") +private final Function>, Integer> getPartition = maybeMulticastPartitions -> { Review Comment: @vamossagar12 Thanks for letting me know about your intention :) Your answer is very clear to me.
[GitHub] [kafka] rondagostino commented on pull request #12856: KAFKA-14392: KRaft broker heartbeat timeout should not exceed broker.session.timeout.ms
rondagostino commented on PR #12856: URL: https://github.com/apache/kafka/pull/12856#issuecomment-1317191981 > change is getting a bit too big, Thanks, @cmccabe. I've trimmed it down and opened the separate ticket as you mentioned.
[GitHub] [kafka] rondagostino commented on a diff in pull request #12856: KAFKA-14392: KRaft broker heartbeat timeout should not exceed broker.session.timeout.ms
rondagostino commented on code in PR #12856: URL: https://github.com/apache/kafka/pull/12856#discussion_r1024149966 ## core/src/main/scala/kafka/server/AlterPartitionManager.scala: ## @@ -91,7 +91,8 @@ object AlterPartitionManager { config = config, channelName = "alterPartition", threadNamePrefix = threadNamePrefix, - retryTimeoutMs = Long.MaxValue + networkClientRetryTimeoutMs = if (config.processRoles.isEmpty) config.controllerSocketTimeoutMs else config.brokerSessionTimeoutMs / 2, Review Comment: > let's leave this one alone for now and file a follow-up JIRA for it I've opened https://issues.apache.org/jira/browse/KAFKA-14394 to track this.
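The expression in this hunk chooses the network-client timeout by deployment mode. Below is a minimal Java rendering, assuming an empty `process.roles` means a ZooKeeper-mode broker and a non-empty one means KRaft. The class name and sample values are hypothetical. Halving `broker.session.timeout.ms` presumably leaves room for a retry before the broker's session expires; that reading is consistent with the PR title but is not stated in the comment.

```java
final class AlterPartitionTimeoutSketch {
    // Mirrors: if (config.processRoles.isEmpty) config.controllerSocketTimeoutMs
    //          else config.brokerSessionTimeoutMs / 2
    static int networkClientRetryTimeoutMs(boolean processRolesEmpty,
                                           int controllerSocketTimeoutMs,
                                           int brokerSessionTimeoutMs) {
        // ZooKeeper mode keeps the controller socket timeout; KRaft mode uses
        // half of the broker session timeout (integer division).
        return processRolesEmpty ? controllerSocketTimeoutMs : brokerSessionTimeoutMs / 2;
    }
}
```

For example, with a 30000 ms socket timeout and a 9000 ms session timeout, a ZooKeeper-mode broker gets 30000 ms and a KRaft broker gets 4500 ms.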
[jira] [Created] (KAFKA-14394) BrokerToControllerChannelManager has 2 separate timeouts
Ron Dagostino created KAFKA-14394: - Summary: BrokerToControllerChannelManager has 2 separate timeouts Key: KAFKA-14394 URL: https://issues.apache.org/jira/browse/KAFKA-14394 Project: Kafka Issue Type: Task Reporter: Ron Dagostino BrokerToControllerChannelManager uses `config.controllerSocketTimeoutMs` as its default `networkClientRetryTimeoutMs` in general, but it also accepts a second `retryTimeoutMs` value, and there is exactly one place where that second timeout is used: within BrokerToControllerRequestThread. Is this second, separate timeout actually necessary, or is it a bug (in which case the two timeouts should be the same)? Closely related to this is the case of AlterPartitionManager, which passes Long.MAX_VALUE as the retryTimeoutMs value when it instantiates its instance of BrokerToControllerChannelManager. Is this Long.MAX_VALUE correct, given that `config.controllerSocketTimeoutMs` is being used as the other timeout? This is related to https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-14392 and the associated PR, https://github.com/apache/kafka/pull/12856
[GitHub] [kafka] bbejeck commented on a diff in pull request #12861: KAFKA-14388 - Fixes the NPE when using the new Processor API with the DSL
bbejeck commented on code in PR #12861: URL: https://github.com/apache/kafka/pull/12861#discussion_r1024098926 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamNewProcessorApiTest.java: ## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.processor.api.ContextualFixedKeyProcessor; +import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext; +import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier; +import org.apache.kafka.streams.processor.api.FixedKeyRecord; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +import static java.util.Arrays.asList; + + +public class KStreamNewProcessorApiTest { + +@Test +@DisplayName("Test for using new Processor API and state stores with the DSL") +void shouldGetStateStoreWithNewProcessor() { +final StreamsBuilder builder = new StreamsBuilder(); +final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store"), Serdes.String(), Serdes.String()); + + +builder.stream("input", Consumed.with(Serdes.String(), Serdes.String())) +.processValues(new TransformerSupplier(storeBuilder), "store") +.to("output", Produced.with(Serdes.String(), Serdes.String())); + +final List> words = Arrays.asList(KeyValue.pair("a", "foo"), KeyValue.pair("b", "bar"), KeyValue.pair("c", "baz")); +try (TopologyTestDriver testDriver = new TopologyTestDriver(builder.build())) { +final 
TestInputTopic +testDriverInputTopic = +testDriver.createInputTopic("input", Serdes.String().serializer(), Serdes.String().serializer()); + +words.forEach(clk -> testDriverInputTopic.pipeInput(clk.key, clk.value)); + +final List expectedOutput = asList("fooUpdated", "barUpdated", "bazUpdated"); + +final Deserializer keyDeserializer = Serdes.String().deserializer(); +final List actualOutput = +new ArrayList<>(testDriver.createOutputTopic("output", keyDeserializer, Serdes.String().deserializer()).readValuesToList()); + +Assertions.assertEquals(expectedOutput, actualOutput); +} +} +private static class TransformerSupplier implements FixedKeyProcessorSupplier { +private final StoreBuilder storeBuilder; + +public TransformerSupplier(final StoreBuilder storeBuilder) { +this.storeBuilder = storeBuilder; +} + +@Override +public ContextualFixedKeyProcessor get() { +return new ContextualFixedKeyProcessor() { +KeyValueStore store; +FixedKeyProcessorContext context; + +@Override +public void init(final FixedKeyProcessorContext context) { +super.init(context); +store = context.getStateStore("store"); +this.context = context; +} + +@Override +public void process(final FixedKeyRecord record) { +final String updated = store.get(record.key()); +store.putIfAbsent(record.key(), record.value() + "Updated"); Review Comment: ack - will test the store content as well -- This is an automated message from the
[GitHub] [kafka] bbejeck commented on a diff in pull request #12861: KAFKA-14388 - Fixes the NPE when using the new Processor API with the DSL
bbejeck commented on code in PR #12861: URL: https://github.com/apache/kafka/pull/12861#discussion_r1024098085 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamNewProcessorApiTest.java: ## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.processor.api.ContextualFixedKeyProcessor; +import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext; +import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier; +import org.apache.kafka.streams.processor.api.FixedKeyRecord; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +import static java.util.Arrays.asList; + + +public class KStreamNewProcessorApiTest { + +@Test +@DisplayName("Test for using new Processor API and state stores with the DSL") +void shouldGetStateStoreWithNewProcessor() { +final StreamsBuilder builder = new StreamsBuilder(); +final StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store"), Serdes.String(), Serdes.String()); + + +builder.stream("input", Consumed.with(Serdes.String(), Serdes.String())) +.processValues(new TransformerSupplier(storeBuilder), "store") Review Comment: ack - I'll add a test that uses `addStateStore` instead of `ConnectedStoreProvider` as well -- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jonathan-albrecht-ibm commented on a diff in pull request #12343: MINOR: Update unit/integration tests to work with the IBM Semeru JDK
jonathan-albrecht-ibm commented on code in PR #12343: URL: https://github.com/apache/kafka/pull/12343#discussion_r1024093773 ## core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala: ## @@ -119,6 +120,18 @@ object JaasTestUtils { } } + private val isIbmSecurity = try { + Class.forName("sun.security.krb5.Config"); false +} catch { + case ex: Exception => { +if (Java.isIbmJdk) { + Class.forName("com.ibm.security.krb5.internal.Config"); true +} else { + throw ex +} + } +} Review Comment: See reply above.
[GitHub] [kafka] jonathan-albrecht-ibm commented on a diff in pull request #12343: MINOR: Update unit/integration tests to work with the IBM Semeru JDK
jonathan-albrecht-ibm commented on code in PR #12343: URL: https://github.com/apache/kafka/pull/12343#discussion_r1024093478 ## core/src/test/scala/kafka/security/minikdc/MiniKdc.scala: ## @@ -260,11 +260,17 @@ class MiniKdc(config: Properties, workDir: File) extends Logging { } private def refreshJvmKerberosConfig(): Unit = { -val klass = - if (Java.isIbmJdk) -Class.forName("com.ibm.security.krb5.internal.Config") - else -Class.forName("sun.security.krb5.Config") +// Newer IBM JDKs use the OpenJDK security providers so try that first +val klass = try { + Class.forName("sun.security.krb5.Config") +} catch { + case ex: Exception => { +if (Java.isIbmJdk) + Class.forName("com.ibm.security.krb5.internal.Config") +else + throw ex + } +} Review Comment: The idea was to just check for the classes directly, but you're definitely right that `Java.isIbmJdkSemeru` is more readable. I have a new commit that makes this change and the other one below, but I'll hold off on pushing it to see if there is any more feedback from @ijuma or @hachikuji.
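For readers comparing the two approaches, here is a minimal Java sketch of the "probe classes in order" idea from the Scala change. `ClassProbe.loadFirstAvailable` is a hypothetical helper, not part of Kafka. Unlike the Scala code, it does not gate the IBM fallback on `Java.isIbmJdk`, and it loads classes without running their static initializers.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical helper (not part of Kafka): returns the first class that can be
// loaded from the candidate names, tried in order.
final class ClassProbe {

    private ClassProbe() { }

    static Optional<Class<?>> loadFirstAvailable(List<String> candidates) {
        for (String name : candidates) {
            try {
                // initialize = false: only check that the class exists,
                // without running its static initializer.
                return Optional.of(Class.forName(name, false, ClassProbe.class.getClassLoader()));
            } catch (ClassNotFoundException e) {
                // Not present on this JDK; try the next candidate.
            }
        }
        return Optional.empty();
    }
}
```

Calling it with `Arrays.asList("sun.security.krb5.Config", "com.ibm.security.krb5.internal.Config")` returns whichever class the running JDK provides, or `Optional.empty()` if neither exists. An explicit check such as `Java.isIbmJdkSemeru` states the intent more directly, which is the readability point made above.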
[GitHub] [kafka] jonathan-albrecht-ibm commented on a diff in pull request #12343: MINOR: Update unit/integration tests to work with the IBM Semeru JDK
jonathan-albrecht-ibm commented on code in PR #12343: URL: https://github.com/apache/kafka/pull/12343#discussion_r1024086819 ## clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java: ## @@ -2263,7 +2263,10 @@ public void configure(Map configs) { private Sender sender = mock(Sender.class); private TransactionManager transactionManager = mock(TransactionManager.class); private Partitioner partitioner = mock(Partitioner.class); -private KafkaThread ioThread = mock(KafkaThread.class); +private KafkaThread ioThread = new KafkaThread("Fake Kafka Producer I/O Thread", new Runnable() { +@Override +public void run() {} +}, true); Review Comment: I have run the test again without this change, and it looks like it is no longer needed. I see that Mockito has been upgraded to newer releases a few times since this PR was created, so I'm guessing that has fixed this problem. I'll push an amended commit to remove this change, but let me know if the Kafka developers prefer adding a new commit to remove it instead.
[GitHub] [kafka] dajac commented on a diff in pull request #12748: KAFKA-13715: add generationId field in subscription
dajac commented on code in PR #12748: URL: https://github.com/apache/kafka/pull/12748#discussion_r1024071860 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java: ## @@ -73,6 +88,74 @@ public void setUp() { } } +@Test +public void testMemberDataFromSubscription() { +List topics = topics(topic); +List ownedPartitions = partitions(tp(topic1, 0), tp(topic2, 1)); +List subscriptions = new ArrayList<>(); +// add all subscription in ConsumerProtocolSubscription versions +subscriptions.add(buildSubscriptionV0(topics, ownedPartitions, generationId)); +subscriptions.add(buildSubscriptionV1(topics, ownedPartitions, generationId)); +subscriptions.add(buildSubscriptionV2Above(topics, ownedPartitions, generationId)); +for (Subscription subscription : subscriptions) { +if (subscription != null) { +AbstractStickyAssignor.MemberData memberData = assignor.memberDataFromSubscription(subscription); +assertEquals(ownedPartitions, memberData.partitions, "subscription: " + subscription + " doesn't have expected owned partition"); +assertEquals(generationId, memberData.generation.orElse(-1), "subscription: " + subscription + " doesn't have expected generation id"); +} +} +} + +@Test +public void testMemberDataFromSubscriptionWithInconsistentData() { +Map partitionsPerTopic = new HashMap<>(); +partitionsPerTopic.put(topic, 2); +List ownedPartitionsInUserdata = partitions(tp1); +List ownedPartitionsInSubscription = partitions(tp0); + +assignor.onAssignment(new ConsumerPartitionAssignor.Assignment(ownedPartitionsInUserdata), new ConsumerGroupMetadata(groupId, generationId, consumer1, Optional.empty())); +ByteBuffer userDataWithHigherGenerationId = assignor.subscriptionUserData(new HashSet<>(topics(topic))); +// The owned partitions and generation id are provided in user data and different owned partition is provided in subscription without generation id +// If subscription provides no generation id, we'll honor the generation id in userData 
and owned partitions in subscription +Subscription subscription = new Subscription(topics(topic), userDataWithHigherGenerationId, ownedPartitionsInSubscription); + +AbstractStickyAssignor.MemberData memberData = assignor.memberDataFromSubscription(subscription); +// in CooperativeStickyAssignor, we'll serialize owned partition in subscription into userData +// but in StickyAssignor, we'll serialize owned partition in assignment into userData +if (assignor instanceof CooperativeStickyAssignor) { Review Comment: This is not very elegant given that we have subclasses for each. Should we move this test to the respective subclasses? ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java: ## @@ -73,6 +88,74 @@ public void setUp() { } } +@Test +public void testMemberDataFromSubscription() { +List topics = topics(topic); +List ownedPartitions = partitions(tp(topic1, 0), tp(topic2, 1)); +List subscriptions = new ArrayList<>(); +// add all subscription in ConsumerProtocolSubscription versions +subscriptions.add(buildSubscriptionV0(topics, ownedPartitions, generationId)); +subscriptions.add(buildSubscriptionV1(topics, ownedPartitions, generationId)); +subscriptions.add(buildSubscriptionV2Above(topics, ownedPartitions, generationId)); +for (Subscription subscription : subscriptions) { +if (subscription != null) { +AbstractStickyAssignor.MemberData memberData = assignor.memberDataFromSubscription(subscription); +assertEquals(ownedPartitions, memberData.partitions, "subscription: " + subscription + " doesn't have expected owned partition"); +assertEquals(generationId, memberData.generation.orElse(-1), "subscription: " + subscription + " doesn't have expected generation id"); +} +} +} + +@Test +public void testMemberDataFromSubscriptionWithInconsistentData() { +Map partitionsPerTopic = new HashMap<>(); +partitionsPerTopic.put(topic, 2); +List ownedPartitionsInUserdata = partitions(tp1); +List ownedPartitionsInSubscription = 
partitions(tp0); + +assignor.onAssignment(new ConsumerPartitionAssignor.Assignment(ownedPartitionsInUserdata), new ConsumerGroupMetadata(groupId, generationId, consumer1, Optional.empty())); +ByteBuffer userDataWithHigherGenerationId = assignor.subscriptionUserData(new HashSet<>(topics(topic))); +// The owned partitions and generation id are provided in user data and different owned partition is provided in
[GitHub] [kafka] rigelbm closed pull request #12493: MINOR: Migrate from slf4j-log4j12 to slf4j-reload4j.
rigelbm closed pull request #12493: MINOR: Migrate from slf4j-log4j12 to slf4j-reload4j. URL: https://github.com/apache/kafka/pull/12493
[GitHub] [kafka] dajac commented on a diff in pull request #12748: KAFKA-13715: add generationId field in subscription
dajac commented on code in PR #12748: URL: https://github.com/apache/kafka/pull/12748#discussion_r1024047722 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java: ## @@ -162,6 +162,17 @@ private boolean allSubscriptionsEqual(Set allTopics, return isAllSubscriptionsEqual; } +// visible for testing +MemberData memberDataFromSubscription(Subscription subscription) { +if (!subscription.ownedPartitions().isEmpty() && subscription.generationId().isPresent()) { Review Comment: I wonder if this is 100% correct. Could we have a case where subscription v3 is used but no owned partitions are reported by the member? For instance, this could happen if the group has more members than partitions.
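To make the edge case concrete, here is a hypothetical, simplified Java model of the two predicates. It is not Kafka's `AbstractStickyAssignor`, and partitions are represented as plain strings. A member that owns no partitions but does report a generation id is rejected by the check as written in the diff, but accepted by a check keyed only on the generation id.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified model of the check under review (not Kafka code).
// Partitions are plain strings such as "t-0".
final class SubscriptionCheck {

    private SubscriptionCheck() { }

    /** Predicate as written in the diff: requires owned partitions AND a generation id. */
    static boolean trustSubscriptionAsWritten(List<String> ownedPartitions, Optional<Integer> generationId) {
        return !ownedPartitions.isEmpty() && generationId.isPresent();
    }

    /**
     * Predicate keyed only on the generation id: a member using the newer
     * subscription format may legitimately own zero partitions, for example
     * when the group has more members than partitions.
     */
    static boolean trustSubscriptionGenerationOnly(Optional<Integer> generationId) {
        return generationId.isPresent();
    }
}
```

The fallback taken when the subscription is not trusted (decoding user data) is assignor-specific and is deliberately not modeled here.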
[jira] [Resolved] (KAFKA-14334) DelayedFetch purgatory not completed when appending as follower
[ https://issues.apache.org/jira/browse/KAFKA-14334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-14334. - Fix Version/s: 3.4.0 3.3.2 Reviewer: David Jacot Resolution: Fixed > DelayedFetch purgatory not completed when appending as follower > --- > > Key: KAFKA-14334 > URL: https://issues.apache.org/jira/browse/KAFKA-14334 > Project: Kafka > Issue Type: Bug >Reporter: Jeff Kim >Assignee: Jeff Kim >Priority: Major > Fix For: 3.4.0, 3.3.2 > > > Currently, the ReplicaManager.delayedFetchPurgatory is only completed when > appending as leader. With > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica] > enabled, followers will also have to complete delayed fetch requests after > successfully replicating. Otherwise, consumer fetches to closest followers > will hit fetch.max.wait.ms -- This message was sent by Atlassian Jira (v8.20.10#820010)
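Here is a toy Java model of the bug described in the ticket. It assumes that a delayed fetch completes once enough bytes are available for its partition. `ToyFetchPurgatory` and its methods are illustrative names only, not Kafka's `DelayedOperationPurgatory` API. The `checkAfterAppend` flag stands in for whether an append path re-checks the fetches parked on that partition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Toy model (not Kafka code): delayed fetches wait per partition until
// at least minBytes are available, or until they time out elsewhere.
final class ToyFetchPurgatory {

    static final class DelayedFetch {
        final int minBytes;
        boolean completed;

        DelayedFetch(int minBytes) {
            this.minBytes = minBytes;
        }
    }

    private final Map<String, List<DelayedFetch>> waiting = new HashMap<>();
    private final Map<String, Integer> availableBytes = new HashMap<>();

    void watch(String partition, DelayedFetch fetch) {
        waiting.computeIfAbsent(partition, p -> new ArrayList<>()).add(fetch);
    }

    /** Appends bytes; parked fetches are re-checked only if checkAfterAppend is true. */
    void append(String partition, int bytes, boolean checkAfterAppend) {
        availableBytes.merge(partition, bytes, Integer::sum);
        if (checkAfterAppend) {
            checkAndComplete(partition);
        }
    }

    /** Completes every parked fetch on the partition whose minimum is now satisfied. */
    int checkAndComplete(String partition) {
        int available = availableBytes.getOrDefault(partition, 0);
        int completedNow = 0;
        Iterator<DelayedFetch> it = waiting.getOrDefault(partition, new ArrayList<>()).iterator();
        while (it.hasNext()) {
            DelayedFetch fetch = it.next();
            if (available >= fetch.minBytes) {
                fetch.completed = true;
                it.remove();
                completedNow++;
            }
        }
        return completedNow;
    }
}
```

With `checkAfterAppend` set to false (the follower path before the fix), a fetch whose minimum is already satisfied stays parked until its timeout, which corresponds to `fetch.max.wait.ms` in the ticket. With it set to true, the fetch completes right after the append.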
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12233: MINOR: Clean up tmp files created by tests
divijvaidya commented on code in PR #12233: URL: https://github.com/apache/kafka/pull/12233#discussion_r1024041485 ## clients/src/test/java/org/apache/kafka/test/TestUtils.java: ## @@ -142,26 +142,40 @@ public static String randomString(final int len) { } /** - * Create an empty file in the default temporary-file directory, using `kafka` as the prefix and `tmp` as the - * suffix to generate its name. + * Create an empty file in the default temporary-file directory, using the given prefix and suffix + * to generate its name. + * @throws IOException */ -public static File tempFile() throws IOException { -final File file = File.createTempFile("kafka", ".tmp"); +public static File tempFile(final String prefix, final String suffix) throws IOException { +final File file = Files.createTempFile(prefix, suffix).toFile(); file.deleteOnExit(); +Exit.addShutdownHook("delete-temp-file-shutdown-hook", () -> { Review Comment: From what I understand, we try to use `Exit.scala` instead of `Runtime.getRuntime()` so that tests may provide their own implementation of the shutdown hook adder. This is mentioned here: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/utils/Exit.scala#L22 I didn't understand the scenario where this hook would be disabled! Could you please explain with an example? If a test overrides the shutdown hook behaviour, I would assume it will add a new shutdown hook? Or perhaps add a new `ShutdownHookAdder` that may play with the order of the hooks? In all such scenarios, this cleanup hook will still be executed as long as it is registered correctly.
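For reference, here is a JDK-only sketch of the helper under review. It registers the cleanup through `Runtime.getRuntime().addShutdownHook` instead of Kafka's `Exit.addShutdownHook`. That is exactly the indirection discussed above, which lets tests substitute their own hook adder. The body of the hook is an assumption, because the diff is cut off at that point.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

// JDK-only sketch; Kafka's TestUtils routes the hook through Exit instead.
final class TempFiles {

    private TempFiles() { }

    static File tempFile(String prefix, String suffix) throws IOException {
        File file = Files.createTempFile(prefix, suffix).toFile();
        // Best-effort deletion on a normal JVM exit (kept from the diff).
        file.deleteOnExit();
        // Assumed cleanup hook: delete the file if it still exists at shutdown.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                Files.deleteIfExists(file.toPath());
            } catch (IOException e) {
                // Nothing useful can be done while the JVM is shutting down.
            }
        }, "delete-temp-file-shutdown-hook"));
        return file;
    }
}
```

Each call registers one hook, which is acceptable in tests but would be worth batching if many files were created.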
[GitHub] [kafka] dajac commented on pull request #12783: KAFKA-14334: complete delayed purgatory after replication
dajac commented on PR #12783: URL: https://github.com/apache/kafka/pull/12783#issuecomment-1317073961 Merged to trunk and 3.3.
[GitHub] [kafka] dajac merged pull request #12783: KAFKA-14334: complete delayed purgatory after replication
dajac merged PR #12783: URL: https://github.com/apache/kafka/pull/12783
[GitHub] [kafka] dajac commented on pull request #12783: KAFKA-14334: complete delayed purgatory after replication
dajac commented on PR #12783: URL: https://github.com/apache/kafka/pull/12783#issuecomment-1317056696 Failed tests are unrelated:
```
Build / JDK 17 and Scala 2.13 / kafka.api.TransactionsExpirationTest.testTransactionAfterProducerIdExpires(String).quorum=kraft
Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testCreatePartitionsUseProvidedForwardingAdmin()
Build / JDK 11 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testDelayedConfigurationOperations()
Build / JDK 11 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
Build / JDK 8 and Scala 2.12 / org.apache.kafka.controller.QuorumControllerTest.testEarlyControllerResults()
```
[GitHub] [kafka] cadonna commented on pull request #12809: KAFKA-14324: Upgrade RocksDB to 7.1.2
cadonna commented on PR #12809: URL: https://github.com/apache/kafka/pull/12809#issuecomment-1317015966 Backported to 3.3, 3.2, 3.1, and 3.0. For 3.1 and 3.0, I also needed to backport https://github.com/apache/kafka/pull/11690.
[GitHub] [kafka] jeqo commented on a diff in pull request #12859: KAFKA-14325: Fix NPE on Processor Parameters toString
jeqo commented on code in PR #12859: URL: https://github.com/apache/kafka/pull/12859#discussion_r1023922604 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java: ## @@ -130,7 +129,8 @@ public String processorName() { @Override public String toString() { return "ProcessorParameters{" + -"processor class=" + processorSupplier.get().getClass() + +"processor supplier class=" + (processorSupplier != null ? processorSupplier.getClass() : "null") + +", fixed key processor supplier class=" + (fixedKeyProcessorSupplier != null ? fixedKeyProcessorSupplier.getClass() : "null") + Review Comment: Avoid calling `get()`, as it could trigger side effects (it does in tests, see https://github.com/apache/kafka/blob/039a245a2314c63834e61989939738579bad/streams/src/test/java/org/apache/kafka/test/MockApiProcessorSupplier.java#L51-L54). ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java: ## @@ -114,13 +114,12 @@ KTableSource kTableSourceSupplier() { @SuppressWarnings("unchecked") KTableProcessorSupplier kTableProcessorSupplier() { -// This cast always works because KTableProcessorSupplier hasn't been converted yet. Review Comment: Now that the table processor supplier uses the latest API, I think this comment is worth updating. ## streams/src/test/resources/log4j.properties: ## @@ -27,7 +27,7 @@ log4j.logger.org.apache.kafka.clients=ERROR # These are the only logs we will likely ever find anything useful in to debug Streams test failures log4j.logger.org.apache.kafka.clients.consumer=INFO log4j.logger.org.apache.kafka.clients.producer=INFO -log4j.logger.org.apache.kafka.streams=INFO +log4j.logger.org.apache.kafka.streams=DEBUG Review Comment: This would have triggered the error in the first place.
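Here is a small Java sketch of the `toString()` pattern from the diff. It reports the supplier classes without calling `get()`, so building the string cannot instantiate a processor as a side effect. `ParamsSketch` is a hypothetical stand-in for `ProcessorParameters` that keeps only the two supplier fields, and it uses `java.util.function.Supplier` in place of the Streams supplier interfaces.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for ProcessorParameters: either supplier may be null.
final class ParamsSketch {
    private final Supplier<?> processorSupplier;
    private final Supplier<?> fixedKeyProcessorSupplier;

    ParamsSketch(Supplier<?> processorSupplier, Supplier<?> fixedKeyProcessorSupplier) {
        this.processorSupplier = processorSupplier;
        this.fixedKeyProcessorSupplier = fixedKeyProcessorSupplier;
    }

    @Override
    public String toString() {
        // Only inspect the supplier objects themselves; never call get() here,
        // and guard against whichever supplier is absent.
        return "ProcessorParameters{"
            + "processor supplier class="
            + (processorSupplier != null ? processorSupplier.getClass() : "null")
            + ", fixed key processor supplier class="
            + (fixedKeyProcessorSupplier != null ? fixedKeyProcessorSupplier.getClass() : "null")
            + "}";
    }
}
```

This matters once DEBUG logging prints the parameters: a `toString()` that calls `get()` would create processors whenever logging is enabled.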
[GitHub] [kafka] dajac opened a new pull request, #12865: MINOR: Serialize response in KafkaApisTest
dajac opened a new pull request, #12865: URL: https://github.com/apache/kafka/pull/12865 WIP ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] clolov commented on pull request #12818: KAFKA-14133: Replace EasyMock with Mockito in streams tests
clolov commented on PR #12818: URL: https://github.com/apache/kafka/pull/12818#issuecomment-1316803784 Tagging @divijvaidya for visibility as well :)
[GitHub] [kafka] clolov commented on pull request #12418: KAFKA-13414: Replace PowerMock/EasyMock with Mockito in connect.storage.KafkaOffsetBackingStoreTest
clolov commented on PR #12418: URL: https://github.com/apache/kafka/pull/12418#issuecomment-1316765701 Thank you for the quick turnaround on the review @C0urante! I will aim to open a couple more pull requests in the upcoming days for Connect with respect to the Mockito migration and tag you there.
[jira] [Updated] (KAFKA-14132) Remaining PowerMock to Mockito tests
[ https://issues.apache.org/jira/browse/KAFKA-14132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Christo Lolov updated KAFKA-14132: -- Description: {color:#de350b}Some of the tests below use EasyMock as well. For those migrate both PowerMock and EasyMock to Mockito.{color} Unless stated in brackets the tests are in the connect module. A list of tests which still require to be moved from PowerMock to Mockito as of 2nd of August 2022 which do not have a Jira issue and do not have pull requests I am aware of which are opened: {color:#ff8b00}InReview{color} {color:#00875a}Merged{color} # {color:#ff8b00}ErrorHandlingTaskTest{color} (owner: [~shekharrajak]) # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo) # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij) # WorkerSinkTaskTest (owner: Divij) *WIP* # WorkerSinkTaskThreadedTest (owner: Divij) *WIP* # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya]) # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya]) # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya]) # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya]) # {color:#00875a}ConnectorsResourceTest{color} (owner: [~mdedetrich-aiven]) # {color:#ff8b00}StandaloneHerderTest{color} (owner: [~mdedetrich-aiven]) ([https://github.com/apache/kafka/pull/12728]) # KafkaConfigBackingStoreTest (owner: [~mdedetrich-aiven]) # {color:#00875a}KafkaOffsetBackingStoreTest{color} (owner: Christo) ([https://github.com/apache/kafka/pull/12418]) # {color:#FF8B00}KafkaBasedLogTest{color} (owner: [~mdedetrich-aiven]) # RetryUtilTest (owner: [~mdedetrich-aiven] ) # {color:#FF8B00}RepartitionTopicTest{color} (streams) (owner: Christo) # {color:#FF8B00}StateManagerUtilTest{color} (streams) (owner: Christo) *The coverage report for the above tests after the change should be >= to what the coverage is now.* was: {color:#de350b}Some of the tests below use EasyMock as well. 
For those migrate both PowerMock and EasyMock to Mockito.{color} Unless stated in brackets the tests are in the connect module. A list of tests which still require to be moved from PowerMock to Mockito as of 2nd of August 2022 which do not have a Jira issue and do not have pull requests I am aware of which are opened: {color:#ff8b00}InReview{color} {color:#00875a}Merged{color} # {color:#ff8b00}ErrorHandlingTaskTest{color} (owner: [~shekharrajak]) # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo) # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij) # WorkerSinkTaskTest (owner: Divij) *WIP* # WorkerSinkTaskThreadedTest (owner: Divij) *WIP* # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya]) # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya]) # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya]) # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya]) # {color:#00875a}ConnectorsResourceTest{color} (owner: [~mdedetrich-aiven]) # {color:#ff8b00}StandaloneHerderTest{color} (owner: [~mdedetrich-aiven]) ([https://github.com/apache/kafka/pull/12728]) # KafkaConfigBackingStoreTest (owner: [~mdedetrich-aiven]) # {color:#ff8b00}KafkaOffsetBackingStoreTest{color} (owner: Christo) ([https://github.com/apache/kafka/pull/12418]) # {color:#FF8B00}KafkaBasedLogTest{color} (owner: [~mdedetrich-aiven]) # RetryUtilTest (owner: [~mdedetrich-aiven] ) # {color:#FF8B00}RepartitionTopicTest{color} (streams) (owner: Christo) # {color:#FF8B00}StateManagerUtilTest{color} (streams) (owner: Christo) *The coverage report for the above tests after the change should be >= to what the coverage is now.* > Remaining PowerMock to Mockito tests > > > Key: KAFKA-14132 > URL: https://issues.apache.org/jira/browse/KAFKA-14132 > Project: Kafka > Issue Type: Sub-task >Reporter: Christo Lolov >Assignee: Christo Lolov >Priority: Major > > {color:#de350b}Some of the tests below use EasyMock as well. 
For those > migrate both PowerMock and EasyMock to Mockito.{color} > Unless stated in brackets the tests are in the connect module. > A list of tests which still require to be moved from PowerMock to Mockito as > of 2nd of August 2022 which do not have a Jira issue and do not have pull > requests I am aware of which are opened: > {color:#ff8b00}InReview{color} > {color:#00875a}Merged{color} > # {color:#ff8b00}ErrorHandlingTaskTest{color} (owner: [~shekharrajak]) > # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo) > # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij) > # WorkerSinkTaskTest (owner: Divij) *WIP* > # WorkerSinkTaskThreadedTest (owner: Divij) *WIP* > # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya]) > #
[GitHub] [kafka] dajac commented on pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface
dajac commented on PR #12845: URL: https://github.com/apache/kafka/pull/12845#issuecomment-1316692147 @jeffkbkim @jolshan Thanks for your comments. I have addressed your feedback. Note that I have extracted the changes to the JoinGroupResponse version handling into a separate PR: https://github.com/apache/kafka/pull/12864. It is less risky this way.
[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface
dajac commented on code in PR #12845: URL: https://github.com/apache/kafka/pull/12845#discussion_r1023749684 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1646,73 +1649,46 @@ class KafkaApis(val requestChannel: RequestChannel, } } + private def makeGroupCoordinatorRequestContextFrom( Review Comment: Why do you think so? Is it because of the usage of `make`? The name looks quite reasonable to me.
[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface
dajac commented on code in PR #12845: URL: https://github.com/apache/kafka/pull/12845#discussion_r1023741182 ## core/src/main/scala/kafka/server/BrokerServer.scala: ## @@ -22,9 +22,8 @@ import java.util import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit, TimeoutException} - Review Comment: Nope. Reverted.
[GitHub] [kafka] dajac commented on a diff in pull request #12864: MINOR: Handle JoinGroupResponseData.protocolName backward compatibility in JoinGroupResponse
dajac commented on code in PR #12864: URL: https://github.com/apache/kafka/pull/12864#discussion_r1023727410
## clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java: ##
@@ -28,9 +28,15 @@ public class JoinGroupResponse extends AbstractResponse {
     private final JoinGroupResponseData data;
-    public JoinGroupResponse(JoinGroupResponseData data) {
+    public JoinGroupResponse(JoinGroupResponseData data, short version) {
         super(ApiKeys.JOIN_GROUP);
         this.data = data;
+
+        // All versions prior to version 7 do not support nullable
+        // string for the protocol name. Empty string should be used.
+        if (version < 7 && data.protocolName() == null) {
+            data.setProtocolName("");
+        }
Review Comment: This new logic replaces https://github.com/apache/kafka/pull/12864/files#diff-cc056b4960ededba37a438b1454f0f3c5ff5e8ad5e6d2ec9a08e813ca056ffebL1655.
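The constructor change can be tried out in isolation. The following is a minimal sketch that replaces the real Kafka types with hypothetical stand-ins (`ResponseData` for `JoinGroupResponseData`, `VersionedResponse` for `JoinGroupResponse`) and assumes only what the diff shows: `protocolName` may be `null`, and version 7 is the first version in which a null protocol name is allowed.

```java
public class ProtocolNameCompatDemo {
    public static void main(String[] args) {
        // v6: a null name is rewritten to "" for the old wire format.
        ResponseData v6 = new VersionedResponse(new ResponseData(), (short) 6).data();
        System.out.println("v6 -> \"" + v6.protocolName() + "\"");

        // v7: null is kept because the field is nullable from this version on.
        ResponseData v7 = new VersionedResponse(new ResponseData(), (short) 7).data();
        System.out.println("v7 -> " + v7.protocolName());
    }
}

// Hypothetical stand-in for JoinGroupResponseData: only the field the diff touches.
final class ResponseData {
    private String protocolName;

    String protocolName() {
        return protocolName;
    }

    ResponseData setProtocolName(String protocolName) {
        this.protocolName = protocolName;
        return this;
    }
}

// Hypothetical stand-in for JoinGroupResponse, mirroring the constructor in the diff.
final class VersionedResponse {
    // Assumption taken from the diff's comment: versions below 7 cannot carry a null protocol name.
    static final short FIRST_NULLABLE_PROTOCOL_NAME_VERSION = 7;

    private final ResponseData data;

    VersionedResponse(ResponseData data, short version) {
        this.data = data;
        // Older clients cannot decode a null string here, so substitute "".
        if (version < FIRST_NULLABLE_PROTOCOL_NAME_VERSION && data.protocolName() == null) {
            data.setProtocolName("");
        }
    }

    ResponseData data() {
        return data;
    }
}
```

Running it should print `v6 -> ""` followed by `v7 -> null`. As in the diff, the normalization mutates the data object passed to the constructor, so every code path that builds a response for an old version gets the empty string without having to apply the rule itself.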
[GitHub] [kafka] dajac opened a new pull request, #12864: MINOR: Handle JoinGroupResponseData.protocolName backward compatibility in JoinGroupResponse
dajac opened a new pull request, #12864: URL: https://github.com/apache/kafka/pull/12864
This is a small refactor extracted from https://github.com/apache/kafka/pull/12845. It moves the logic that handles the backward compatibility of `JoinGroupResponseData.protocolName` from `KafkaApis` to `JoinGroupResponse`. The patch adds a new unit test for `JoinGroupResponse` and also relies on existing tests.
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
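The description mentions a new unit test for `JoinGroupResponse` but does not include it. As a hedged illustration of what such a test could cover, the sketch below restates the rule as a pure function (`effectiveProtocolName`, a hypothetical helper, not Kafka API) and sweeps versions 0 through an assumed latest version of 9, checking that a null name becomes `""` only below version 7 and that a non-null name such as `"range"` is never altered. A real test would construct `JoinGroupResponse` directly and would likely use JUnit; plain `AssertionError`s keep this sketch dependency-free.

```java
import java.util.Objects;

public class JoinGroupResponseCompatCheck {
    // Assumed highest version to sweep; not taken from the PR.
    static final short ASSUMED_LATEST_VERSION = 9;

    // Hypothetical restatement of the constructor logic under test:
    // returns the protocol name that would end up in the response for `version`.
    static String effectiveProtocolName(String protocolName, short version) {
        return (version < 7 && protocolName == null) ? "" : protocolName;
    }

    // Runs the sweep and returns the number of (name, version) cases checked.
    static int checkAllVersions() {
        int checked = 0;
        for (short version = 0; version <= ASSUMED_LATEST_VERSION; version++) {
            // A null name must become "" before v7 and stay null afterwards.
            String expectedForNull = version < 7 ? "" : null;
            String actual = effectiveProtocolName(null, version);
            if (!Objects.equals(expectedForNull, actual)) {
                throw new AssertionError("null name, version " + version + ": got " + actual);
            }
            // A real protocol name must pass through unchanged at every version.
            if (!"range".equals(effectiveProtocolName("range", version))) {
                throw new AssertionError("non-null name altered at version " + version);
            }
            checked += 2;
        }
        return checked;
    }

    public static void main(String[] args) {
        System.out.println("cases checked: " + checkAllVersions());
    }
}
```

With the assumed range of versions 0 to 9, running `main` prints `cases checked: 20`; any regression in the version check surfaces as an `AssertionError` naming the offending version.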