[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

2022-11-16 Thread GitBox


dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1024835046


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1646,73 +1649,46 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  private def makeGroupCoordinatorRequestContextFrom(

Review Comment:
   I can remove the `From` part. I am not too opinionated on this.






[GitHub] [kafka] showuon commented on pull request #12748: KAFKA-13715: add generationId field in subscription

2022-11-16 Thread GitBox


showuon commented on PR #12748:
URL: https://github.com/apache/kafka/pull/12748#issuecomment-1318195383

   @dajac, thanks for the comments. PR updated. Thanks.





[GitHub] [kafka] showuon commented on a diff in pull request #12748: KAFKA-13715: add generationId field in subscription

2022-11-16 Thread GitBox


showuon commented on code in PR #12748:
URL: https://github.com/apache/kafka/pull/12748#discussion_r1024795520


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##
@@ -162,6 +162,17 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
 return isAllSubscriptionsEqual;
 }
 
+// visible for testing
+MemberData memberDataFromSubscription(Subscription subscription) {
+if (!subscription.ownedPartitions().isEmpty() && subscription.generationId().isPresent()) {

Review Comment:
   I know what you mean now. Yes, for the `subscription v2` check, we should only 
check generationId. Will update it and add a test for it.






[jira] [Comment Edited] (KAFKA-13639) NotEnoughReplicasException for __consumer_offsets topic due to out of order offset

2022-11-16 Thread HaiyuanZhao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17635089#comment-17635089
 ] 

HaiyuanZhao edited comment on KAFKA-13639 at 11/17/22 6:23 AM:
---

[~ijuma] Hi, we have also seen similar behavior with Java 1.8.0_45.

In addition, we encountered this issue with very large offset values, and not 
just on the __consumer_offsets topic. The exception log is as follows; hope this 
helps.
{code:java}
[2022-11-16 22:54:52,147] ERROR [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=419] Unexpected error occurred while processing data for partition test-topic-test-topic500--2 at offset 7434 (kafka.server.ReplicaFetcherThread)
kafka.common.OffsetsOutOfOrderException: Out of order offsets found in append to test-topic-test-topic500--2: ArrayBuffer(4126996017, 4126996018, 4126996019, ..., 4126996141, 4126996142, 4071685845, 4071685846, ..., 4071685969, 4071685970)
        at kafka.log.Log.$anonfun$append$2(Log.scala:1165)
        at kafka.log.Log.append(Log.scala:2409)
        at kafka.log.Log.appendAsFollower(Log.scala:1073)
        at kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica(Partition.scala:1036)
        at kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica(Partition.scala:1043)
        at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:172)
        at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:339)
        at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:326)
        at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:325)
        at kafka.server.AbstractFetcherThread$$Lambda$1298/1889097333.apply(Unknown Source)
        at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
        at 




[GitHub] [kafka] akhileshchg commented on a diff in pull request #12815: KIP-866 Part 1

2022-11-16 Thread GitBox


akhileshchg commented on code in PR #12815:
URL: https://github.com/apache/kafka/pull/12815#discussion_r1024674845


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -1571,6 +1638,10 @@ private void resetToEmptyState() {
  */
 private final FeatureControlManager featureControl;
 
+private final KRaftMetadataListener listener;
+
+public final MigrationListener migrationListener = new 
MigrationListener(); // TODO clean this up

Review Comment:
   These two names get confusing quickly. In my understanding, 
`KRaftMetadataListener` is for replicating data consistently to Zookeeper from 
the KRaft metadata log, and `MigrationListener` is for migrating Zookeeper data 
to the KRaft metadata log.
   
   Can we rename `migrationListener` to `kraftToZkMigrationHandler` or 
something to that effect, and `listener` to `zkToKRaftMigrationHandler`?



##
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##
@@ -156,6 +157,76 @@ class KafkaZkClient private[zk] (zooKeeperClient: 
ZooKeeperClient, isSecure: Boo
 tryCreateControllerZNodeAndIncrementEpoch()
   }
 
+  /**
+   * Registers a given KRaft controller in zookeeper as the active controller. 
Unlike the ZK equivalent of this method,
+   * this creates /controller as a persistent znode. This prevents ZK brokers 
from attempting to claim the controller
+   * leadership during a KRaft leadership failover.
+   *
+   * This method is called at the beginning of a KRaft migration and during 
subsequent KRaft leadership changes during
+   * the migration.
+   *
+   * To ensure that the KRaft controller epoch exceeds the current ZK 
controller epoch, this registration algorithm
+   * uses a conditional update on the /controller_epoch znode. If a new ZK 
controller is elected during this method,
+   * the multi request transaction will fail and this method will return None.
+   *
+   * @param kraftControllerId ID of the KRaft controller node
+   * @param kraftControllerEpoch Epoch of the KRaft controller node
+   * @return An optional of the new zkVersion of /controller_epoch. None if we 
could not register the KRaft controller.
+   */
+  def tryRegisterKRaftControllerAsActiveController(kraftControllerId: Int, 
kraftControllerEpoch: Int): Option[Int] = {

Review Comment:
   Just a design consideration, do you think this must be part of the 
MigrationClient instead?
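
   For readers following along, a rough sketch of the conditional registration 
the javadoc above describes, written against raw ZooKeeper multi-ops. Paths and 
payloads are simplified assumptions; the real implementation lives behind 
`KafkaZkClient` and differs in detail.

```java
import java.util.Arrays;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

final class KRaftControllerRegistrationSketch {
    // Bundle the persistent /controller creation with a versioned setData on
    // /controller_epoch: if a concurrent ZK controller election bumped the
    // epoch (zkVersion mismatch), the whole transaction fails atomically.
    static boolean tryRegister(ZooKeeper zk, byte[] controllerData,
                               byte[] epochData, int expectedEpochZkVersion) {
        try {
            zk.multi(Arrays.asList(
                Op.create("/controller", controllerData,
                          ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
                Op.setData("/controller_epoch", epochData, expectedEpochZkVersion)));
            return true;
        } catch (KeeperException | InterruptedException e) {
            return false; // the caller maps a failed txn to None
        }
    }
}
```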



##
core/src/main/scala/kafka/migration/ZkMigrationClient.scala:
##
@@ -0,0 +1,359 @@
+package kafka.migration
+
+import kafka.api.LeaderAndIsr
+import kafka.cluster.{Broker, EndPoint}
+import kafka.controller.{ControllerChannelManager, 
LeaderIsrAndControllerEpoch, ReplicaAssignment}
+import kafka.migration.ZkMigrationClient.brokerToBrokerRegistration
+import kafka.server.{ConfigEntityName, ConfigType, ZkAdminManager}
+import kafka.utils.Logging
+import kafka.zk.TopicZNode.TopicIdReplicaAssignment
+import kafka.zk._
+import kafka.zookeeper._
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.errors.ControllerMovedException
+import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData
+import org.apache.kafka.common.metadata._
+import org.apache.kafka.common.requests.{AbstractControlRequest, 
AbstractResponse}
+import org.apache.kafka.common.{Endpoint, TopicPartition, Uuid}
+import org.apache.kafka.metadata.{BrokerRegistration, PartitionRegistration, 
VersionRange}
+import org.apache.kafka.migration._
+import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
+import org.apache.zookeeper.CreateMode
+
+import java.util
+import java.util.function.Consumer
+import java.util.{Collections, Optional}
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+import scala.jdk.OptionConverters._
+
+object ZkMigrationClient {
+  def brokerToBrokerRegistration(broker: Broker, epoch: Long): 
ZkBrokerRegistration = {
+  val registration = new BrokerRegistration(broker.id, epoch, 
Uuid.ZERO_UUID,

Review Comment:
   Just wondering, do we use the incarnation id for anything?




[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12847: KAFKA-14367; Add `SyncGroup` to the new `GroupCoordinator` interface

2022-11-16 Thread GitBox


jeffkbkim commented on code in PR #12847:
URL: https://github.com/apache/kafka/pull/12847#discussion_r1024721499


##
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala:
##
@@ -135,4 +133,71 @@ class GroupCoordinatorAdapterTest {
 assertEquals(expectedData, future.get())
   }
 
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.SYNC_GROUP)
+  def testSyncGroup(version: Short): Unit = {
+val groupCoordinator = mock(classOf[GroupCoordinator])
+val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+
+val ctx = makeContext(version)
+val data = new SyncGroupRequestData()
+  .setGroupId("group")
+  .setMemberId("member1")
+  .setGroupInstanceId("instance")
+  .setProtocolType("consumer")
+  .setProtocolName("range")
+  .setGenerationId(10)
+  .setAssignments(List(
+new SyncGroupRequestData.SyncGroupRequestAssignment()
+  .setMemberId("member1")
+  .setAssignment("member1".getBytes()),
+new SyncGroupRequestData.SyncGroupRequestAssignment()
+  .setMemberId("member2")
+  .setAssignment("member2".getBytes())
+  ).asJava)
+
+val future = adapter.syncGroup(ctx, data)
+assertFalse(future.isDone)
+
+val capturedAssignment: ArgumentCaptor[Map[String, Array[Byte]]] =
+  ArgumentCaptor.forClass(classOf[Map[String, Array[Byte]]])
+val capturedCallback: ArgumentCaptor[SyncGroupCallback] =
+  ArgumentCaptor.forClass(classOf[SyncGroupCallback])
+
+verify(groupCoordinator).handleSyncGroup(
+  ArgumentMatchers.eq(data.groupId),
+  ArgumentMatchers.eq(data.generationId),
+  ArgumentMatchers.eq(data.memberId),
+  ArgumentMatchers.eq(Some(data.protocolType)),
+  ArgumentMatchers.eq(Some(data.protocolName)),
+  ArgumentMatchers.eq(Some(data.groupInstanceId)),
+  capturedAssignment.capture(),
+  capturedCallback.capture(),
+  ArgumentMatchers.eq(RequestLocal(ctx.bufferSupplier))
+)
+
+assertEquals(Map(
+  "member1" -> "member1",
+  "member2" -> "member2",
+), capturedAssignment.getValue.map { case (member, metadata) =>
+  (member, new String(metadata))
+}.toMap)

Review Comment:
   do we need toMap?



##
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala:
##
@@ -135,4 +133,71 @@ class GroupCoordinatorAdapterTest {
 assertEquals(expectedData, future.get())
   }
 
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.SYNC_GROUP)
+  def testSyncGroup(version: Short): Unit = {
+val groupCoordinator = mock(classOf[GroupCoordinator])
+val adapter = new GroupCoordinatorAdapter(groupCoordinator)
+
+val ctx = makeContext(version)
+val data = new SyncGroupRequestData()
+  .setGroupId("group")
+  .setMemberId("member1")
+  .setGroupInstanceId("instance")
+  .setProtocolType("consumer")
+  .setProtocolName("range")
+  .setGenerationId(10)
+  .setAssignments(List(
+new SyncGroupRequestData.SyncGroupRequestAssignment()
+  .setMemberId("member1")
+  .setAssignment("member1".getBytes()),
+new SyncGroupRequestData.SyncGroupRequestAssignment()
+  .setMemberId("member2")
+  .setAssignment("member2".getBytes())
+  ).asJava)
+
+val future = adapter.syncGroup(ctx, data)
+assertFalse(future.isDone)
+
+val capturedAssignment: ArgumentCaptor[Map[String, Array[Byte]]] =
+  ArgumentCaptor.forClass(classOf[Map[String, Array[Byte]]])
+val capturedCallback: ArgumentCaptor[SyncGroupCallback] =
+  ArgumentCaptor.forClass(classOf[SyncGroupCallback])
+
+verify(groupCoordinator).handleSyncGroup(
+  ArgumentMatchers.eq(data.groupId),
+  ArgumentMatchers.eq(data.generationId),
+  ArgumentMatchers.eq(data.memberId),
+  ArgumentMatchers.eq(Some(data.protocolType)),
+  ArgumentMatchers.eq(Some(data.protocolName)),
+  ArgumentMatchers.eq(Some(data.groupInstanceId)),
+  capturedAssignment.capture(),
+  capturedCallback.capture(),
+  ArgumentMatchers.eq(RequestLocal(ctx.bufferSupplier))
+)
+
+assertEquals(Map(
+  "member1" -> "member1",
+  "member2" -> "member2",
+), capturedAssignment.getValue.map { case (member, metadata) =>
+  (member, new String(metadata))
+}.toMap)
+
+capturedCallback.getValue.apply(SyncGroupResult(
+  error = Errors.NONE,
+  protocolType = Some("consumer"),
+  protocolName = Some("range"),
+  memberAssignment = "member1".getBytes()
+))
+
+val expectedResponseData = new SyncGroupResponseData()
+  .setErrorCode(Errors.NONE.code)
+  .setProtocolType("consumer")
+  .setProtocolName("range")

Review Comment:
   Generally I see a lot of literal strings being used across the tests in 
GroupCoordinatorAdapterTest. What's the reason for not re-using them? Is it the 
convention?




[GitHub] [kafka] showuon commented on a diff in pull request #12748: KAFKA-13715: add generationId field in subscription

2022-11-16 Thread GitBox


showuon commented on code in PR #12748:
URL: https://github.com/apache/kafka/pull/12748#discussion_r1024726967


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##
@@ -162,6 +162,17 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
 return isAllSubscriptionsEqual;
 }
 
+// visible for testing
+MemberData memberDataFromSubscription(Subscription subscription) {
+if (!subscription.ownedPartitions().isEmpty() && subscription.generationId().isPresent()) {

Review Comment:
   > For instance, this could happen if the group has more members than 
partitions.
   
   In that case, the `subscription.generationId()` should still contain a valid 
number. Is that right?






[jira] [Updated] (KAFKA-14382) StreamThreads can miss rebalance events when processing records during a rebalance

2022-11-16 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-14382:
---
Fix Version/s: 3.4.0

> StreamThreads can miss rebalance events when processing records during a 
> rebalance
> --
>
> Key: KAFKA-14382
> URL: https://issues.apache.org/jira/browse/KAFKA-14382
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
> Fix For: 3.4.0
>
>
> One of the main improvements introduced by the cooperative protocol was the 
> ability to continue processing records during a rebalance. In Streams, we 
> take advantage of this by polling with a timeout of 0 when a rebalance is/has 
> been in progress, so it can return immediately and continue on through the 
> main loop to process new records. The main poll loop uses an algorithm based 
> on the max.poll.interval.ms to ensure the StreamThread returns to call #poll 
> in time to stay in the consumer group.
>  
> Generally speaking, it should exit the processing loop and invoke poll within 
> a few minutes at most based on the poll interval, though typically it will 
> break out much sooner once it's used up all the records from the last poll 
> (based on the max.poll.records config which Streams sets to 1,000 by 
> default). However, if doing heavy processing or setting a higher 
> max.poll.records, the thread may continue processing for more than a few 
> seconds. If it had sent out a JoinGroup request before going on to process 
> and was waiting for its JoinGroup response, then once it does return to 
> invoke #poll it will process this response and send out a SyncGroup – but if 
> the processing took too long, this SyncGroup may immediately fail with the 
> REBALANCE_IN_PROGRESS error.
>  
> Essentially, while the thread was processing the group leader will itself be 
> processing the JoinGroup subscriptions of all members and generating an 
> assignment, then sending this back in its SyncGroup. This may take only a few 
> seconds or less, and the group coordinator will not yet have noticed (or 
> care) that one of the consumers hasn't sent a SyncGroup – it will just return 
> the assigned partitions in the SyncGroup request of the members who have 
> responded in time, and "complete" the rebalance in their eyes. But if the 
> assignment involved moving any partitions from one consumer to another, then 
> it will need to trigger a followup rebalance right away to finish assigning 
> those partitions which were revoked in the previous rebalance. This is what 
> causes a new rebalance to be kicked off just seconds after the first one 
> began.
>  
> If the consumer that was stuck processing was among those who needed to 
> revoke partitions, this can lead to repeating rebalances – since it fails the 
> SyncGroup of the 1st rebalance it never receives the assignment for it and 
> never knows to revoke those partitions, meaning it will rejoin for the new 
> rebalance still claiming them among its ownedPartitions. When the assignor 
> generates the same assignment for the 2nd rebalance, it will again see that 
> some partitions need to be revoked and will therefore trigger yet another new 
> rebalance after finishing the 2nd. This can go on for as long as the 
> StreamThreads are struggling to finish the JoinGroup phase in time due to 
> processing.
>  
> Note that the best workaround at the moment is probably to just set a lower 
> max.poll.records to reduce the processing loop duration
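
As an illustration of that workaround, a hedged sketch of the relevant Streams 
setting (application id and bootstrap servers are placeholders):
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class LowMaxPollRecordsConfig {
    public static Properties props() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // Lower max.poll.records (Streams default: 1000) so the processing
        // loop returns to poll() sooner and the thread sees rebalance events.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 100);
        return props;
    }
}
{code}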





[jira] [Assigned] (KAFKA-14382) StreamThreads can miss rebalance events when processing records during a rebalance

2022-11-16 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reassigned KAFKA-14382:
--

Assignee: A. Sophie Blee-Goldman

> StreamThreads can miss rebalance events when processing records during a 
> rebalance
> --
>
> Key: KAFKA-14382
> URL: https://issues.apache.org/jira/browse/KAFKA-14382
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Major
> Fix For: 3.4.0
>
>
> One of the main improvements introduced by the cooperative protocol was the 
> ability to continue processing records during a rebalance. In Streams, we 
> take advantage of this by polling with a timeout of 0 when a rebalance is/has 
> been in progress, so it can return immediately and continue on through the 
> main loop to process new records. The main poll loop uses an algorithm based 
> on the max.poll.interval.ms to ensure the StreamThread returns to call #poll 
> in time to stay in the consumer group.
>  
> Generally speaking, it should exit the processing loop and invoke poll within 
> a few minutes at most based on the poll interval, though typically it will 
> break out much sooner once it's used up all the records from the last poll 
> (based on the max.poll.records config which Streams sets to 1,000 by 
> default). However, if doing heavy processing or setting a higher 
> max.poll.records, the thread may continue processing for more than a few 
> seconds. If it had sent out a JoinGroup request before going on to process 
> and was waiting for its JoinGroup response, then once it does return to 
> invoke #poll it will process this response and send out a SyncGroup – but if 
> the processing took too long, this SyncGroup may immediately fail with the 
> REBALANCE_IN_PROGRESS error.
>  
> Essentially, while the thread was processing the group leader will itself be 
> processing the JoinGroup subscriptions of all members and generating an 
> assignment, then sending this back in its SyncGroup. This may take only a few 
> seconds or less, and the group coordinator will not yet have noticed (or 
> care) that one of the consumers hasn't sent a SyncGroup – it will just return 
> the assigned partitions in the SyncGroup request of the members who have 
> responded in time, and "complete" the rebalance in their eyes. But if the 
> assignment involved moving any partitions from one consumer to another, then 
> it will need to trigger a followup rebalance right away to finish assigning 
> those partitions which were revoked in the previous rebalance. This is what 
> causes a new rebalance to be kicked off just seconds after the first one 
> began.
>  
> If the consumer that was stuck processing was among those who needed to 
> revoke partitions, this can lead to repeating rebalances – since it fails the 
> SyncGroup of the 1st rebalance it never receives the assignment for it and 
> never knows to revoke those partitions, meaning it will rejoin for the new 
> rebalance still claiming them among its ownedPartitions. When the assignor 
> generates the same assignment for the 2nd rebalance, it will again see that 
> some partitions need to be revoked and will therefore trigger yet another new 
> rebalance after finishing the 2nd. This can go on for as long as the 
> StreamThreads are struggling to finish the JoinGroup phase in time due to 
> processing.
>  
> Note that the best workaround at the moment is probably to just set a lower 
> max.poll.records to reduce the processing loop duration





[GitHub] [kafka] ableegoldman opened a new pull request, #12869: KAFKA-14382: wait for current rebalance to complete before triggering followup

2022-11-16 Thread GitBox


ableegoldman opened a new pull request, #12869:
URL: https://github.com/apache/kafka/pull/12869

   Fix for the subtle bug described in KAFKA-14382 that was causing rebalancing 
loops. If we trigger a new rebalance while the current one is still ongoing, it 
may cause some members to fail the first rebalance if they weren't able to send 
the SyncGroup request in time (for example due to processing records during the 
rebalance). This means those consumers never receive their assignment from the 
original rebalance, and won't revoke any partitions they might have needed to. 
This can send the group into a loop as each rebalance schedules a new followup 
cooperative rebalance due to partitions that need to be revoked, and each 
followup rebalance causes some consumer(s) to miss the SyncGroup and never 
revoke those partitions.





[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

2022-11-16 Thread GitBox


jeffkbkim commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1024713679


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1646,73 +1649,46 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  private def makeGroupCoordinatorRequestContextFrom(

Review Comment:
   The `From` part, I guess. I don't think I've seen a method name like it, 
though it does make sense.






[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

2022-11-16 Thread GitBox


jeffkbkim commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1024712573


##
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala:
##
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import 
kafka.coordinator.group.GroupCoordinatorConcurrencyTest.JoinGroupCallback
+import kafka.server.RequestLocal
+
+import org.apache.kafka.common.message.{JoinGroupRequestData, 
JoinGroupResponseData}
+import 
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
+import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.utils.BufferSupplier
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
+import org.apache.kafka.coordinator.group.GroupCoordinatorRequestContext
+
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
+import org.junit.jupiter.params.ParameterizedTest
+import org.mockito.{ArgumentCaptor, ArgumentMatchers}
+import org.mockito.Mockito.{mock, verify}
+
+import java.net.InetAddress
+import scala.jdk.CollectionConverters._
+
+class GroupCoordinatorAdapterTest {
+
+  private def makeContext(
+apiVersion: Short
+  ): GroupCoordinatorRequestContext = {
+new GroupCoordinatorRequestContext(
+  apiVersion,
+  "client",
+  InetAddress.getLocalHost,
+  BufferSupplier.create()
+)
+  }
+
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
+  def testJoinGroup(version: Short): Unit = {

Review Comment:
   that makes sense. thanks!






[jira] [Commented] (KAFKA-13639) NotEnoughReplicasException for __consumer_offsets topic due to out of order offset

2022-11-16 Thread HaiyuanZhao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17635089#comment-17635089
 ] 

HaiyuanZhao commented on KAFKA-13639:
-

[~ijuma] Hi, we have also seen similar behavior with Java 1.8.0_45.

In addition, we encountered this issue with very large offset values, and not 
just on the __consumer_offsets topic. The exception log is as follows; hope this 
helps.
{code:java}
[2022-11-16 22:54:52,147] ERROR [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=419] Unexpected error occurred while processing data for partition test-topic-test-topic500--2 at offset 7434 (kafka.server.ReplicaFetcherThread)
kafka.common.OffsetsOutOfOrderException: Out of order offsets found in append to test-topic-test-topic500--2: ArrayBuffer(4126996017, 4126996018, 4126996019, ..., 4126996141, 4126996142, 4071685845, 4071685846, ..., 4071685969, 4071685970)
        at kafka.log.Log.$anonfun$append$2(Log.scala:1165)
        at kafka.log.Log.append(Log.scala:2409)
        at kafka.log.Log.appendAsFollower(Log.scala:1073)
        at kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica(Partition.scala:1036)
        at kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica(Partition.scala:1043)
        at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:172)
        at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:339)
        at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:326)
        at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:325)
        at kafka.server.AbstractFetcherThread$$Lambda$1298/1889097333.apply(Unknown Source)
        at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
        at kafka.server.AbstractFetcherThread$$Lambda$1332/1703195329.apply(Unknown Source)
        at 

[GitHub] [kafka] pprovenzano commented on pull request #12843: KAFKA-14375: Remove use of "authorizer-properties" from EndToEndAuthorizerTest

2022-11-16 Thread GitBox


pprovenzano commented on PR #12843:
URL: https://github.com/apache/kafka/pull/12843#issuecomment-1317984289

   I have manually tested all the failed tests from the checks and they all 
pass fine locally.





[GitHub] [kafka] ableegoldman commented on a diff in pull request #12803: KAFKA-13602: Adding ability to multicast records.

2022-11-16 Thread GitBox


ableegoldman commented on code in PR #12803:
URL: https://github.com/apache/kafka/pull/12803#discussion_r1024690215


##
streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java:
##
@@ -58,5 +62,24 @@
  * @param numPartitions the total number of partitions
 * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
  */
+@Deprecated
 Integer partition(String topic, K key, V value, int numPartitions);
+
+/**
+ * Determine the partition numbers to which a record, with the given key and value and the current number
+ * of partitions, should be multi-casted to.
+ * @param topic the topic name this record is sent to
+ * @param key the key of the record
+ * @param value the value of the record
+ * @param numPartitions the total number of partitions
+ * @return an Optional of Set of integers between 0 and {@code numPartitions-1},
+ * Empty optional means using default partitioner
+ * Optional of an empty set means the record won't be sent to any partitions i.e drop it.
+ * Optional of Set of integers means the partitions to which the record should be sent to.
+ * */
+default Optional<Set<Integer>> partitions(String topic, K key, V value, int numPartitions) {
+final Integer partition = partition(topic, key, value, numPartitions);
+return partition == null ? Optional.empty() : Optional.of(Collections.singleton(partition(topic, key, value, numPartitions)));

Review Comment:
   ```suggestion
   return partition == null ? Optional.empty() : Optional.of(Collections.singleton(partition));
   ```
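
   To make the new contract concrete, here is a hypothetical implementation of 
the `partitions()` method shown in the diff, broadcasting every record to all 
partitions of the sink topic (generics reconstructed from the excerpt; not the 
final merged API):

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.streams.processor.StreamPartitioner;

// Broadcast each record to every partition of the sink topic.
public class BroadcastPartitioner<K, V> implements StreamPartitioner<K, V> {

    @Override
    @Deprecated
    public Integer partition(final String topic, final K key, final V value, final int numPartitions) {
        return null; // unused once partitions() is overridden
    }

    @Override
    public Optional<Set<Integer>> partitions(final String topic, final K key, final V value, final int numPartitions) {
        final Set<Integer> all = new HashSet<>();
        for (int p = 0; p < numPartitions; p++) {
            all.add(p);
        }
        return Optional.of(all); // an empty set would mean: drop the record
    }
}
```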



##
streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java:
##
@@ -58,5 +62,24 @@
  * @param numPartitions the total number of partitions
 * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
  */
+@Deprecated
 Integer partition(String topic, K key, V value, int numPartitions);
+
+/**
+ * Determine the partition numbers to which a record, with the given key and value and the current number
+ * of partitions, should be multi-casted to.

Review Comment:
   nit: wording here is a bit awkward, sounds like we're talking about the 
current number of partitions of the record, not the topic -- maybe something 
like this?
   ```suggestion
* Determine the number(s) of the partition(s) to which a record with the given key and value should be sent,
* for the given topic and current partition count
   ```



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##
@@ -150,16 +151,29 @@ public <K, V> void send(final String topic,
 );
 }
 if (partitions.size() > 0) {
-partition = partitioner.partition(topic, key, value, partitions.size());
+final Optional<Set<Integer>> maybeMulticastPartitions = partitioner.partitions(topic, key, value, partitions.size());
+if (!maybeMulticastPartitions.isPresent()) {
+// New change. Use default partitioner

Review Comment:
   ```suggestion
   // A null/empty partition indicates we should use the default partitioner
   ```



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java:
##
@@ -1059,6 +1071,7 @@ private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(final KTable<KO, VO> forei
 Objects.requireNonNull(tableJoined, "tableJoined can't be null");
 Objects.requireNonNull(materialized, "materialized can't be null");
 
+

Review Comment:
   nit: accidental extra line?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java:
##
@@ -459,12 +461,23 @@ private List<StreamsMetadata> rebuildMetadataForSingleTopology(final Map
+final Function<Optional<Set<Integer>>, Integer> getPartition = maybeMulticastPartitions -> {
+if (!maybeMulticastPartitions.isPresent()) {
+return null;
+}
+if (maybeMulticastPartitions.get().size() != 1) {
+throw new IllegalArgumentException("The partitions returned by 
StreamPartitioner#partitions method when used for fetching KeyQueryMetadata for 
key should be a singleton set");

Review Comment:
   Hm...the IQ case seems to complicate things a bit, I think we may have 
missed this during the KIP discussion/design. Because there's no reason to 
enforce that the partitioner return only a single partition, right? In fact 
this would even be incorrect in some cases, if the partitioner had originally 
opted to send this record/key to multiple partitions.
   
   But obviously the current API doesn't allow for that. I think we need to 
make a small update to the KIP to account for the possibility of multiple 
partitions here. We can just modify the `KeyQueryMetadata` class to 

[jira] [Updated] (KAFKA-14397) Idempotent producer may bump epoch and reset sequence numbers prematurely

2022-11-16 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-14397:

Description: 
Suppose that idempotence is enabled in the producer and we send the following 
single-record batches to a partition leader:
 * A: epoch=0, seq=0
 * B: epoch=0, seq=1
 * C: epoch=0, seq=2

The partition leader receives all 3 of these batches and commits them to the 
log. However, the connection is lost before the `Produce` responses are 
received by the client. Subsequent retries by the producer all fail to be 
delivered.

It is possible in this scenario for the first batch `A` to reach the delivery 
timeout before the subsequent batches. This triggers the following check: 
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L642].
Depending on whether retries are exhausted, we may adjust sequence numbers.

The intuition behind this check is that if retries have not been exhausted, 
then we saw a fatal error and the batch could not have been written to the log. 
Hence we should bump the epoch and adjust the sequence numbers of the pending 
batches since they are presumed to be doomed to failure. So in this case, 
batches B and C might get reset with the bumped epoch:
 * B: epoch=1, seq=0
 * C: epoch=1, seq=1

If the producer is able to reach the partition leader before these batches are 
expired locally, then they may get written and committed to the log. This can 
result in duplicates.

The root of the issue is that this logic does not account for expiration of the 
delivery timeout. When the delivery timeout is reached, the number of retries 
is still likely much lower than the max allowed number of retries (which is 
`Int.MaxValue` by default).

  was:
Suppose that idempotence is enabled in the producer and we send the following 
single-record batches to a partition leader:
 * A: epoch=0, seq=0
 * B: epoch=0, seq=1
 * C: epoch=0, seq=2

The partition leader receives all 3 of these batches and commits them to the 
log. However, the connection is lost before the `Produce` responses are 
received by the client. Subsequent retries by the producer all fail to be 
delivered.

It is possible in this scenario for the first batch `A` to reach the delivery 
timeout before the subsequent batches. This triggers the following check: 
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L642].
Depending on whether retries are exhausted, we may adjust sequence numbers.

The intuition behind this check is that if retries have not been exhausted, 
then we saw a fatal error and the batch could not have been written to the log. 
Hence we should bump the epoch and adjust the sequence numbers of the pending 
batches since they are presumed to be doomed to failure. So in this case, 
batches B and C might get reset with the bumped epoch:
 * B: epoch=1, seq=0
 * C: epoch=1, seq=1

This can result in duplicate records in the log.

The root of the issue is that this logic does not account for expiration of the 
delivery timeout. When the delivery timeout is reached, the number of retries 
is still likely much lower than the max allowed number of retries (which is 
`Int.MaxValue` by default).


> Idempotent producer may bump epoch and reset sequence numbers prematurely
> -
>
> Key: KAFKA-14397
> URL: https://issues.apache.org/jira/browse/KAFKA-14397
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> Suppose that idempotence is enabled in the producer and we send the following 
> single-record batches to a partition leader:
>  * A: epoch=0, seq=0
>  * B: epoch=0, seq=1
>  * C: epoch=0, seq=2
> The partition leader receives all 3 of these batches and commits them to the 
> log. However, the connection is lost before the `Produce` responses are 
> received by the client. Subsequent retries by the producer all fail to be 
> delivered.
> It is possible in this scenario for the first batch `A` to reach the delivery 
> timeout before the subsequent batches. This triggers the following check: 
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L642].
> Depending on whether retries are exhausted, we may adjust sequence numbers.
> The intuition behind this check is that if retries have not been exhausted, 
> then we saw a fatal error and the batch could not have been written to the 
> log. Hence we should bump the epoch and adjust the sequence numbers of the 
> pending batches since they are presumed to be doomed to failure. So in this 
> case, batches B and C might get reset with the bumped epoch:
>  

[jira] [Created] (KAFKA-14397) Idempotent producer may bump epoch and reset sequence numbers prematurely

2022-11-16 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-14397:
---

 Summary: Idempotent producer may bump epoch and reset sequence 
numbers prematurely
 Key: KAFKA-14397
 URL: https://issues.apache.org/jira/browse/KAFKA-14397
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson


Suppose that idempotence is enabled in the producer and we send the following 
single-record batches to a partition leader:
 * A: epoch=0, seq=0
 * B: epoch=0, seq=1
 * C: epoch=0, seq=2

The partition leader receives all 3 of these batches and commits them to the 
log. However, the connection is lost before the `Produce` responses are 
received by the client. Subsequent retries by the producer all fail to be 
delivered.

It is possible in this scenario for the first batch `A` to reach the delivery 
timeout before the subsequent batches. This triggers the following check: 
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L642].
Depending on whether retries are exhausted, we may adjust sequence numbers.

The intuition behind this check is that if retries have not been exhausted, 
then we saw a fatal error and the batch could not have been written to the log. 
Hence we should bump the epoch and adjust the sequence numbers of the pending 
batches since they are presumed to be doomed to failure. So in this case, 
batches B and C might get reset with the bumped epoch:
 * B: epoch=1, seq=0
 * C: epoch=1, seq=1

This can result in duplicate records in the log.

The root of the issue is that this logic does not account for expiration of the 
delivery timeout. When the delivery timeout is reached, the number of retries 
is still likely much lower than the max allowed number of retries (which is 
`Int.MaxValue` by default).
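
For context, an illustrative snippet of the producer settings at play here. The 
values below are simply the effective defaults made explicit once idempotence is 
enabled, which is why the delivery timeout, not the retry counter, is what 
usually expires first:
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class IdempotentProducerDefaults {
    public static Properties props() {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // Defaults made explicit: retries is effectively unbounded, so a batch
        // almost always hits delivery.timeout.ms before exhausting retries.
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120_000);
        return props;
    }
}
{code}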





[jira] [Updated] (KAFKA-14388) NPE When Retrieving StateStore with new Processor API

2022-11-16 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-14388:

Component/s: streams

> NPE When Retrieving StateStore with new Processor API
> -
>
> Key: KAFKA-14388
> URL: https://issues.apache.org/jira/browse/KAFKA-14388
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.0, 3.3.1
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 3.4.0, 3.3.2
>
>
> Using the new Processor API introduced with KIP-820, when adding a state store 
> to the Processor, executing `context().getStore("store-name")` always returns 
> `null`, as the store is not in the `stores` `HashMap` in the 
> `ProcessorStateManager`. This occurs even when using the 
> `ConnectedStoreProvider.stores()` method.
> I've confirmed the store is associated with the processor by viewing the 
> `Topology` description.
> From some initial triage, it looks like the store is never registered.
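
A minimal reproduction shape for the report, assuming the new `processor.api` 
interfaces; the store name and generics are illustrative:
{code:java}
import java.util.Collections;
import java.util.Set;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// The supplier attaches its store via ConnectedStoreProvider#stores(), then the
// processor looks it up in init(); per this ticket the lookup returned null on
// 3.3.0/3.3.1 because the store was never registered.
public class CountingSupplier implements ProcessorSupplier<String, String, String, Long> {

    @Override
    public Set<StoreBuilder<?>> stores() {
        final StoreBuilder<?> builder = Stores.keyValueStoreBuilder(
            Stores.inMemoryKeyValueStore("store-name"), Serdes.String(), Serdes.Long());
        return Collections.singleton(builder);
    }

    @Override
    public Processor<String, String, String, Long> get() {
        return new Processor<>() {
            private KeyValueStore<String, Long> store;

            @Override
            public void init(final ProcessorContext<String, Long> context) {
                store = context.getStateStore("store-name"); // null here per KAFKA-14388
            }

            @Override
            public void process(final Record<String, String> record) {
                // processing logic omitted
            }
        };
    }
}
{code}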





[GitHub] [kafka] bbejeck commented on pull request #12861: KAFKA-14388 - Fixes the NPE when using the new Processor API with the DSL

2022-11-16 Thread GitBox


bbejeck commented on PR #12861:
URL: https://github.com/apache/kafka/pull/12861#issuecomment-1317781612

   Cherry-picked to 3.3





[jira] [Commented] (KAFKA-14388) NPE When Retrieving StateStore with new Processor API

2022-11-16 Thread Bill Bejeck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17635035#comment-17635035
 ] 

Bill Bejeck commented on KAFKA-14388:
-

Cherry-picked to 3.3

> NPE When Retrieving StateStore with new Processor API
> -
>
> Key: KAFKA-14388
> URL: https://issues.apache.org/jira/browse/KAFKA-14388
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.3.0, 3.3.1
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 3.4.0, 3.3.2
>
>
> Using the new Processor API introduced with KIP-820, when adding a state store 
> to the Processor, executing `context().getStore("store-name")` always returns 
> `null`, as the store is not in the `stores` `HashMap` in the 
> `ProcessorStateManager`. This occurs even when using the 
> `ConnectedStoreProvider.stores()` method.
> I've confirmed the store is associated with the processor by viewing the 
> `Topology` description.
> From some initial triage, it looks like the store is never registered.





[jira] [Updated] (KAFKA-14396) Flaky memory leak tests rely on System.gc for correctness

2022-11-16 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-14396:

Labels: flaky-test  (was: )

> Flaky memory leak tests rely on System.gc for correctness
> -
>
> Key: KAFKA-14396
> URL: https://issues.apache.org/jira/browse/KAFKA-14396
> Project: Kafka
>  Issue Type: Test
>Reporter: Greg Harris
>Priority: Minor
>  Labels: flaky-test
>
> There are a few tests which currently call System.gc to help verify that code 
> running during a test does not leak memory. These tests are:
> * 
> org.apache.kafka.common.memory.GarbageCollectedMemoryPoolTest#testBuffersGarbageCollected
> * 
> org.apache.kafka.common.record.MemoryRecordsBuilderTest#testBuffersDereferencedOnClose
> * 
> org.apache.kafka.streams.state.internals.ThreadCacheTest#cacheOverheadsSmallValues
> * 
> org.apache.kafka.streams.state.internals.ThreadCacheTest#cacheOverheadsLargeValues
> Unfortunately the System.gc call is only an advisory call to the JVM, as 
> documented:
> > When control returns from the method call, the Java Virtual Machine has 
> > made a best effort to reclaim space from all discarded objects.
> This means that the System.gc call may not have performed any garbage 
> collection at all, and so tests which expect garbage collection to have 
> happened will not always succeed. For example, a no-op is an implementation 
> of the System.gc method which would fulfill the method contract.
> To reproduce this class of failures:
> 1. Comment out the System.gc calls
> 2. Run the test
> We should try to find an alternative way to verify that these components do 
> not leak memory, one that does not rely on the implementation-specific 
> behavior of the containing JVM runtime. For example, verifying that buffers 
> have been closed may be a proxy for the literal memory references being 
> released and garbage collected.





[jira] [Created] (KAFKA-14396) Flaky memory leak tests rely on System.gc for correctness

2022-11-16 Thread Greg Harris (Jira)
Greg Harris created KAFKA-14396:
---

 Summary: Flaky memory leak tests rely on System.gc for correctness
 Key: KAFKA-14396
 URL: https://issues.apache.org/jira/browse/KAFKA-14396
 Project: Kafka
  Issue Type: Test
Reporter: Greg Harris


There are a few tests which currently call System.gc to help verify that code 
running during a test does not leak memory. These tests are:

* 
org.apache.kafka.common.memory.GarbageCollectedMemoryPoolTest#testBuffersGarbageCollected
* 
org.apache.kafka.common.record.MemoryRecordsBuilderTest#testBuffersDereferencedOnClose
* 
org.apache.kafka.streams.state.internals.ThreadCacheTest#cacheOverheadsSmallValues
* 
org.apache.kafka.streams.state.internals.ThreadCacheTest#cacheOverheadsLargeValues

Unfortunately the System.gc call is only an advisory call to the JVM, as 
documented:

> When control returns from the method call, the Java Virtual Machine has made 
> a best effort to reclaim space from all discarded objects.

This means that the System.gc call may not have performed any garbage 
collection at all, and so tests which expect garbage collection to have 
happened will not always succeed. For example, a no-op is an implementation of 
the System.gc method which would fulfill the method contract.

To reproduce this class of failures:
1. Comment out the System.gc calls
2. Run the test

We should try to find an alternative method of verifying that these components 
do not have memory leaks that does not rely on the implementation-specific 
behavior of the containing JVM runtime. For example, verifying that buffers 
have been closed may be a proxy for the literal memory references being 
released and garbage collected.
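One GC-independent alternative, as a minimal sketch: make the pool itself track 
outstanding buffers, so the test asserts a deterministic counter instead of 
sampling heap usage after System.gc. TrackingPool below is hypothetical, not an 
existing Kafka class.
{code:java}
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical pool that records every buffer it hands out, so a test can
// assert deterministically that all buffers were released on close.
final class TrackingPool {
    private final Set<ByteBuffer> outstanding =
        Collections.newSetFromMap(new ConcurrentHashMap<>());

    ByteBuffer allocate(int size) {
        ByteBuffer buffer = ByteBuffer.allocate(size);
        outstanding.add(buffer);
        return buffer;
    }

    void release(ByteBuffer buffer) {
        outstanding.remove(buffer);
    }

    // Tests assert outstandingCount() == 0 instead of relying on System.gc.
    int outstandingCount() {
        return outstanding.size();
    }
}{code}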



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14386) Change ReplicaPlacer place method to return a class instead of list of list of integers

2022-11-16 Thread Andrew Grant (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Grant updated KAFKA-14386:
-
Description: 
In KRaft mode, a new interface was introduced, 
[ReplicaPlacer|https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/metadata/placement/ReplicaPlacer.java],
 that is used by ReplicationControlManager to create partition assignments 
during both CreateTopics and CreatePartitions RPCs. Right now it has one 
implementation, StripedReplicaPlacer.

Currently, it has a method called place that returns a list of list of integers:
{code:java}
List<List<Integer>> place(
PlacementSpec placement,
ClusterDescriber cluster
) throws InvalidReplicationFactorException;{code}
The index corresponds to the partition ID and the integers are the replicas of 
the assignment. The suggestion is to update the interface so that it models 
topic and partitions more explicitly. I'm thinking something like:
{code:java}
TopicAssignment place(
PlacementSpec placement,
ClusterDescriber cluster
) throws InvalidReplicationFactorException;{code}
where we have
{code:java}
public class TopicAssignment {
    private List<PartitionAssignment> assignments;

    public TopicAssignment(List<PartitionAssignment> assignments) {
        this.assignments = assignments;
    }

    public List<PartitionAssignment> assignments() {
        return assignments;
    }
}{code}
and
{code:java}
public class PartitionAssignment {
    private List<Integer> replicas;

    public PartitionAssignment(List<Integer> replicas) {
        this.replicas = replicas;
    }

    public List<Integer> replicas() {
        return replicas;
    }
}{code}
There are two reasons for the suggestion. First, as mentioned above, it will 
make the interface, arguably, a bit more readable and understandable by 
explicitly modeling the idea of topic and partition. Second and more 
importantly, it makes the interface more extendable in the future. Right now it 
would be challenging to add more metadata to the response. By having classes, 
we can easily add fields to them without breaking/changing the interface. For 
example, in the CreatePartitions RPC we are adding partitions to an existing 
topic, and we might want to add some metadata to the response making it clear 
at which partition the assignment starts. That could look something like:
{code:java}
public class TopicAssignment {
    private List<PartitionAssignment> assignments;
    private Integer firstPartitionId;

    public TopicAssignment(List<PartitionAssignment> assignments, Integer firstPartitionId) {
        this.assignments = assignments;
        this.firstPartitionId = firstPartitionId;
    }

    public List<PartitionAssignment> assignments() {
        return assignments;
    }
    ...
}{code}

Curious to hear other folks' thoughts on this.
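
For illustration, a minimal sketch of an implementation returning the proposed 
type. RoundRobinPlacer is hypothetical and the signature is simplified 
(PlacementSpec and ClusterDescriber are elided); TopicAssignment and 
PartitionAssignment are the classes proposed above.
{code:java}
import java.util.ArrayList;
import java.util.List;

public class RoundRobinPlacer {
    // Assigns replicas round-robin over broker ids 0..numBrokers-1.
    public TopicAssignment place(int numPartitions, int replicationFactor, int numBrokers) {
        List<PartitionAssignment> assignments = new ArrayList<>();
        for (int partition = 0; partition < numPartitions; partition++) {
            List<Integer> replicas = new ArrayList<>();
            for (int r = 0; r < replicationFactor; r++) {
                replicas.add((partition + r) % numBrokers);
            }
            assignments.add(new PartitionAssignment(replicas));
        }
        return new TopicAssignment(assignments);
    }
}{code}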

  was:
In KRaft mode, a new interface was introduced, 
[ReplicaPlacer|https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/metadata/placement/ReplicaPlacer.java],
 that is used by ReplicationControlManager to create partition assignments 
during both CreateTopics and CreatePartitions RPCs. Right now it has one 
implementation, StripedReplicaPlacer.

Currently, it has a method called place that returns a list of list of integers:
{code:java}
List<List<Integer>> place(
PlacementSpec placement,
ClusterDescriber cluster
) throws InvalidReplicationFactorException;{code}
The index corresponds to the partition ID and the integers are the replicas of 
the assignment. The suggestion is to update the interface so that it models 
topic and partitions more explicitly. I'm thinking something like:
{code:java}
TopicAssignment place(
PlacementSpec placement,
ClusterDescriber cluster
) throws InvalidReplicationFactorException;{code}
where we have
{code:java}
public class TopicAssignment {
    private List<PartitionAssignment> assignments;

    public TopicAssignment(List<PartitionAssignment> assignments) {
        this.assignments = assignments;
    }

    public List<PartitionAssignment> assignments() {
        return assignments;
    }
}{code}
and
{code:java}
public class PartitionAssignment {
    private List<Integer> replicas;

    public PartitionAssignment(List<Integer> replicas) {
        this.replicas = replicas;
    }

    public List<Integer> replicas() {
        return replicas;
    }
}{code}
There are two reasons for the suggestion. First, as mentioned above, it will 
make the interface, arguably, a bit more readable and understandable by 
explicitly modeling the idea of topic and partition. Second and more 
importantly, it makes the interface more extendable in the future. Right now it 
would be challenging to add more metadata to the response. By having classes, 
we can easily add fields to them without breaking/changing the interface.

Curious to hear other folks' thoughts on this.


> Change ReplicaPlacer place method to return a class instead of list of list 
> of integers
> ---
>
> Key: KAFKA-14386
> URL: https://issues.apache.org/jira/browse/KAFKA-14386
> Project: Kafka
>  Issue Type: Improvement
> 

[jira] [Resolved] (KAFKA-14388) NPE When Retrieving StateStore with new Processor API

2022-11-16 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck resolved KAFKA-14388.
-
Resolution: Fixed

Merged PR to trunk

> NPE When Retrieving StateStore with new Processor API
> -
>
> Key: KAFKA-14388
> URL: https://issues.apache.org/jira/browse/KAFKA-14388
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.3.0, 3.3.1
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 3.4.0, 3.3.2
>
>
> Using the new Processor API introduced with KIP-820, calling 
> `context().getStore("store-name")` for a state store added to the Processor 
> always returns `null`, as the store is not in the `stores` `HashMap` in the 
> `ProcessorStateManager`. This occurs even when using the 
> `ConnectedStoreProvider.stores()` method.
> I've confirmed the store is associated with the processor by viewing the 
> `Topology` description.   
> From some initial triage, it looks like the store is never registered.
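
For context, a minimal sketch of the pattern the report describes. All names 
are illustrative; the lookup method in the api package is getStateStore.
{code:java}
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

// Illustrative KIP-820 processor: the store lookup in init() is the call
// that returned null before this fix.
public class CountingProcessor implements Processor<String, String, String, Long> {
    private KeyValueStore<String, Long> store;

    @Override
    public void init(final ProcessorContext<String, Long> context) {
        store = context.getStateStore("store-name");
    }

    @Override
    public void process(final Record<String, String> record) {
        final Long count = store.get(record.key());
        store.put(record.key(), count == null ? 1L : count + 1L);
    }
}{code}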



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] bbejeck commented on pull request #12861: KAFKA-14388 - Fixes the NPE when using the new Processor API with the DSL

2022-11-16 Thread GitBox


bbejeck commented on PR #12861:
URL: https://github.com/apache/kafka/pull/12861#issuecomment-1317733576

   Merged #12861 into trunk


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bbejeck merged pull request #12861: KAFKA-14388 - Fixes the NPE when using the new Processor API with the DSL

2022-11-16 Thread GitBox


bbejeck merged PR #12861:
URL: https://github.com/apache/kafka/pull/12861


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bbejeck commented on pull request #12861: KAFKA-14388 - Fixes the NPE when using the new Processor API with the DSL

2022-11-16 Thread GitBox


bbejeck commented on PR #12861:
URL: https://github.com/apache/kafka/pull/12861#issuecomment-1317730593

   Failures unrelated


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #12826: Using Timer class to track expiry in IncrementalCooperativeAssignor

2022-11-16 Thread GitBox


gharris1727 commented on PR #12826:
URL: https://github.com/apache/kafka/pull/12826#issuecomment-1317721387

   @vamossagar12 Thanks for the further explanation.
   
   As proposed, I don't think this refactor improves readability and 
maintainability more than it costs to implement, review, and merge.
   I think there are other potential changes that deserve your valuable 
attention more, and that we can leave this `long` as-is for a little longer.
   
   Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #12846: KAFKA-14293: Basic Auth filter should set the SecurityContext after a…

2022-11-16 Thread GitBox


gharris1727 commented on code in PR #12846:
URL: https://github.com/apache/kafka/pull/12846#discussion_r1024519136


##
connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java:
##
@@ -174,4 +153,84 @@ public void handle(Callback[] callbacks) throws 
UnsupportedCallbackException {
 ));
 }
 }
+
+private void setSecurityContextForRequest(ContainerRequestContext 
requestContext, BasicAuthenticationCredential credential) {
+requestContext.setSecurityContext(new SecurityContext() {
+@Override
+public Principal getUserPrincipal() {
+return () -> credential.getUsername();
+}
+
+@Override
+public boolean isUserInRole(String role) {
+return false;
+}
+
+@Override
+public boolean isSecure() {
+return 
requestContext.getUriInfo().getRequestUri().getScheme().equalsIgnoreCase("https");
+}
+
+@Override
+public String getAuthenticationScheme() {
+return BASIC_AUTH;
+}
+});
+}
+
+
+public static class BasicAuthenticationCredential {
+private String username;
+private String password;
+
+public BasicAuthenticationCredential(String authorizationHeader) {
+final char colon = ':';
+final char space = ' ';
+final String authType = "basic";
+
+initializeEmptyCredentials();
+
+if (authorizationHeader == null) {
+log.trace("No credentials were provided with the request");
+return;
+}
+
+int spaceIndex = authorizationHeader.indexOf(space);
+if (spaceIndex <= 0) {
+log.trace("Request credentials were malformed; no space 
present in value for authorization header");
+return;
+}
+
+String method = authorizationHeader.substring(0, spaceIndex);
+if (!authType.equalsIgnoreCase(method)) {
+log.trace("Request credentials used {} authentication, but 
only {} supported; ignoring", method, authType);
+return;
+}
+
+authorizationHeader = authorizationHeader.substring(spaceIndex + 
1);
+authorizationHeader = new 
String(Base64.getDecoder().decode(authorizationHeader),
+StandardCharsets.UTF_8);
+int i = authorizationHeader.indexOf(colon);
+if (i <= 0) {
+log.trace("Request credentials were malformed; no colon 
present between username and password");
+return;
+}
+
+this.username = authorizationHeader.substring(0, i);
+this.password = authorizationHeader.substring(i + 1);
+}
+
+private void initializeEmptyCredentials() {
+this.username = "";
+this.password = "";
+}

Review Comment:
   This is different than the previous behavior. Previously, if the 
authorizationHeader was null, the username/password would be null as well. Now 
they are empty string.
   
   Is this an intentional change?
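
   For reference, a standalone sketch of the decode step this constructor 
performs (the header value is illustrative):
   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.Base64;

   public class BasicAuthDecodeDemo {
       public static void main(String[] args) {
           // "user:pass" encoded as it would appear in an Authorization header.
           String header = "Basic " + Base64.getEncoder()
                   .encodeToString("user:pass".getBytes(StandardCharsets.UTF_8));
           String decoded = new String(
                   Base64.getDecoder().decode(header.substring(header.indexOf(' ') + 1)),
                   StandardCharsets.UTF_8);
           // Split on the first ':' only, so passwords may themselves contain colons.
           int colon = decoded.indexOf(':');
           System.out.println(decoded.substring(0, colon) + " / " + decoded.substring(colon + 1));
       }
   }
   ```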



##
connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java:
##
@@ -174,4 +153,84 @@ public void handle(Callback[] callbacks) throws 
UnsupportedCallbackException {
 ));
 }
 }
+
+private void setSecurityContextForRequest(ContainerRequestContext 
requestContext, BasicAuthenticationCredential credential) {
+requestContext.setSecurityContext(new SecurityContext() {
+@Override
+public Principal getUserPrincipal() {
+return () -> credential.getUsername();
+}
+
+@Override
+public boolean isUserInRole(String role) {
+return false;
+}
+
+@Override
+public boolean isSecure() {
+return 
requestContext.getUriInfo().getRequestUri().getScheme().equalsIgnoreCase("https");
+}
+
+@Override
+public String getAuthenticationScheme() {
+return BASIC_AUTH;
+}
+});
+}
+
+
+public static class BasicAuthenticationCredential {
+private String username;
+private String password;
+
+public BasicAuthenticationCredential(String authorizationHeader) {
+final char colon = ':';
+final char space = ' ';
+final String authType = "basic";

Review Comment:
   nit: these can remain static constants. Maybe we can even pull from 
SecurityContext.BASIC_AUTH.



##
connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java:
##
@@ -174,4 +153,84 @@ 

[jira] [Updated] (KAFKA-14388) NPE When Retrieving StateStore with new Processor API

2022-11-16 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-14388:

Fix Version/s: 3.3.2

> NPE When Retrieving StateStore with new Processor API
> -
>
> Key: KAFKA-14388
> URL: https://issues.apache.org/jira/browse/KAFKA-14388
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.3.0, 3.3.1
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 3.4.0, 3.3.2
>
>
> Using the new Processor API introduced with KIP-820, calling 
> `context().getStore("store-name")` for a state store added to the Processor 
> always returns `null`, as the store is not in the `stores` `HashMap` in the 
> `ProcessorStateManager`. This occurs even when using the 
> `ConnectedStoreProvider.stores()` method.
> I've confirmed the store is associated with the processor by viewing the 
> `Topology` description.   
> From some initial triage, it looks like the store is never registered.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] joel-hamill commented on pull request #12868: MINOR: fix syntax typo

2022-11-16 Thread GitBox


joel-hamill commented on PR #12868:
URL: https://github.com/apache/kafka/pull/12868#issuecomment-1317664755

   cc @guozhangwang 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] joel-hamill opened a new pull request, #12868: MINOR: fix syntax typo

2022-11-16 Thread GitBox


joel-hamill opened a new pull request, #12868:
URL: https://github.com/apache/kafka/pull/12868

   Replaces ``` with `'`
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji merged pull request #12864: MINOR: Handle JoinGroupResponseData.protocolName backward compatibility in JoinGroupResponse

2022-11-16 Thread GitBox


hachikuji merged PR #12864:
URL: https://github.com/apache/kafka/pull/12864


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] pprovenzano commented on a diff in pull request #12843: KAFKA-14375: Remove use of "authorizer-properties" from EndToEndAuthorizerTest

2022-11-16 Thread GitBox


pprovenzano commented on code in PR #12843:
URL: https://github.com/apache/kafka/pull/12843#discussion_r1024471383


##
core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala:
##
@@ -37,8 +37,8 @@ object SslEndToEndAuthorizationTest {
 override def build(context: AuthenticationContext): KafkaPrincipal = {
   val peerPrincipal = 
context.asInstanceOf[SslAuthenticationContext].session.getPeerPrincipal.getName
   peerPrincipal match {
-case Pattern(name, _) =>
-  val principal = if (name == "server") name else peerPrincipal
+case Pattern(name, extra) =>

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] pprovenzano commented on a diff in pull request #12843: KAFKA-14375: Remove use of "authorizer-properties" from EndToEndAuthorizerTest

2022-11-16 Thread GitBox


pprovenzano commented on code in PR #12843:
URL: https://github.com/apache/kafka/pull/12843#discussion_r1024471192


##
core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala:
##
@@ -96,96 +90,52 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
   def clientPrincipal: KafkaPrincipal
   def kafkaPrincipal: KafkaPrincipal
 
-  // Arguments to AclCommand to set ACLs.
-  def clusterActionArgs: Array[String] = Array("--authorizer-properties",
-  s"zookeeper.connect=$zkConnect",
-  s"--add",
-  s"--cluster",
-  s"--operation=ClusterAction",
-  s"--allow-principal=$kafkaPrincipal")
-  // necessary to create SCRAM credentials via the admin client using the 
broker's credentials
-  // without this we would need to create the SCRAM credentials via ZooKeeper
-  def clusterAlterArgs: Array[String] = Array("--authorizer-properties",
-  s"zookeeper.connect=$zkConnect",
-  s"--add",
-  s"--cluster",
-  s"--operation=Alter",
-  s"--allow-principal=$kafkaPrincipal")
-  def topicBrokerReadAclArgs: Array[String] = Array("--authorizer-properties",
-  s"zookeeper.connect=$zkConnect",
-  s"--add",
-  s"--topic=$wildcard",
-  s"--operation=Read",
-  s"--allow-principal=$kafkaPrincipal")
-  def produceAclArgs(topic: String): Array[String] = 
Array("--authorizer-properties",
-  s"zookeeper.connect=$zkConnect",
-  s"--add",
-  s"--topic=$topic",
-  s"--producer",
-  
s"--allow-principal=$clientPrincipal")
-  def describeAclArgs: Array[String] = Array("--authorizer-properties",
-  s"zookeeper.connect=$zkConnect",
-  s"--add",
-  s"--topic=$topic",
-  s"--operation=Describe",
-  
s"--allow-principal=$clientPrincipal")
-  def deleteDescribeAclArgs: Array[String] = Array("--authorizer-properties",
-  s"zookeeper.connect=$zkConnect",
-  s"--remove",
-  s"--force",
-  s"--topic=$topic",
-  s"--operation=Describe",
-  
s"--allow-principal=$clientPrincipal")
-  def deleteWriteAclArgs: Array[String] = Array("--authorizer-properties",
-  s"zookeeper.connect=$zkConnect",
-  s"--remove",
-  s"--force",
-  s"--topic=$topic",
-  s"--operation=Write",
-  
s"--allow-principal=$clientPrincipal")
-  def consumeAclArgs(topic: String): Array[String] = 
Array("--authorizer-properties",
-  s"zookeeper.connect=$zkConnect",
-  s"--add",
-  s"--topic=$topic",
-  s"--group=$group",
-  s"--consumer",
-  
s"--allow-principal=$clientPrincipal")
-  def groupAclArgs: Array[String] = Array("--authorizer-properties",
-  s"zookeeper.connect=$zkConnect",
-  s"--add",
-  s"--group=$group",
-  s"--operation=Read",
-  
s"--allow-principal=$clientPrincipal")
-  def produceConsumeWildcardAclArgs: Array[String] = 
Array("--authorizer-properties",
-  s"zookeeper.connect=$zkConnect",
-  s"--add",
-  s"--topic=$wildcard",
-  s"--group=$wildcard",
-  s"--consumer",
-  s"--producer",
-  

[GitHub] [kafka] gharris1727 commented on a diff in pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls

2022-11-16 Thread GitBox


gharris1727 commented on code in PR #12828:
URL: https://github.com/apache/kafka/pull/12828#discussion_r1024441937


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java:
##
@@ -43,10 +43,38 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
-public class RestClient {
+public class RestClient implements AutoCloseable {
 private static final Logger log = 
LoggerFactory.getLogger(RestClient.class);
 private static final ObjectMapper JSON_SERDE = new ObjectMapper();
 
+private final HttpClient client;
+public RestClient(WorkerConfig config) {
+client = new 
HttpClient(SSLUtils.createClientSideSslContextFactory(config));

Review Comment:
   It's even better now: It actually works!
   
   Also it tests all phases of the roll from a non-SSL to SSL cluster.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #12847: KAFKA-14367; Add `SyncGroup` to the new `GroupCoordinator` interface

2022-11-16 Thread GitBox


jolshan commented on code in PR #12847:
URL: https://github.com/apache/kafka/pull/12847#discussion_r1024435836


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -2690,110 +2689,150 @@ class KafkaApisTest {
 assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, response.error)
   }
 
-  @Test
-  def testSyncGroupProtocolTypeAndName(): Unit = {
-for (version <- ApiKeys.SYNC_GROUP.oldestVersion to 
ApiKeys.SYNC_GROUP.latestVersion) {
-  testSyncGroupProtocolTypeAndName(version.asInstanceOf[Short])
-}
-  }
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.SYNC_GROUP)
+  def testHandleSyncGroupRequest(version: Short): Unit = {
+val syncGroupRequest = new SyncGroupRequestData()
+  .setGroupId("group")
+  .setMemberId("member")
+  .setProtocolType("consumer")
+  .setProtocolName("range")
 
-  def testSyncGroupProtocolTypeAndName(version: Short): Unit = {
-reset(groupCoordinator, clientRequestQuotaManager, requestChannel, 
replicaManager)
+val requestChannelRequest = buildRequest(new 
SyncGroupRequest.Builder(syncGroupRequest).build(version))
 
-val groupId = "group"
-val memberId = "member1"
-val protocolType = "consumer"
-val protocolName = "range"
+val expectedRequestContext = new GroupCoordinatorRequestContext(
+  version,
+  requestChannelRequest.context.clientId,
+  requestChannelRequest.context.clientAddress,
+  RequestLocal.NoCaching.bufferSupplier
+)
 
-val capturedCallback: ArgumentCaptor[SyncGroupCallback] = 
ArgumentCaptor.forClass(classOf[SyncGroupCallback])
+val expectedSyncGroupRequest = new SyncGroupRequestData()
+  .setGroupId("group")
+  .setMemberId("member")
+  .setProtocolType(if (version >= 5) "consumer" else null)
+  .setProtocolName(if (version >= 5) "range" else null)
 
-val requestLocal = RequestLocal.withThreadConfinedCaching
-val syncGroupRequest = new SyncGroupRequest.Builder(
-  new SyncGroupRequestData()
-.setGroupId(groupId)
-.setGenerationId(0)
-.setMemberId(memberId)
-.setProtocolType(protocolType)
-.setProtocolName(protocolName)
-).build(version)
+val future = new CompletableFuture[SyncGroupResponseData]()
+when(newGroupCoordinator.syncGroup(
+  ArgumentMatchers.eq(expectedRequestContext),
+  ArgumentMatchers.eq(expectedSyncGroupRequest)
+)).thenReturn(future)
 
-val requestChannelRequest = buildRequest(syncGroupRequest)
+createKafkaApis().handleSyncGroupRequest(
+  requestChannelRequest,
+  RequestLocal.NoCaching
+)
 
-createKafkaApis().handleSyncGroupRequest(requestChannelRequest, 
requestLocal)
+val expectedSyncGroupResponse = new SyncGroupResponseData()
+  .setProtocolType("consumer")
+  .setProtocolName("range")
 
-verify(groupCoordinator).handleSyncGroup(
-  ArgumentMatchers.eq(groupId),
-  ArgumentMatchers.eq(0),
-  ArgumentMatchers.eq(memberId),
-  ArgumentMatchers.eq(if (version >= 5) Some(protocolType) else None),
-  ArgumentMatchers.eq(if (version >= 5) Some(protocolName) else None),
-  ArgumentMatchers.eq(None),
-  ArgumentMatchers.eq(Map.empty),
-  capturedCallback.capture(),
-  ArgumentMatchers.eq(requestLocal)
+future.complete(expectedSyncGroupResponse)
+val capturedResponse = verifyNoThrottling(requestChannelRequest)
+val response = capturedResponse.getValue.asInstanceOf[SyncGroupResponse]
+assertEquals(expectedSyncGroupResponse, response.data)
+  }
+
+  @Test
+  def testHandleSyncGroupRequestFutureFailed(): Unit = {

Review Comment:
   Confirming my understanding, we added 
`testHandleSyncGroupRequestFutureFailed` and 
`testHandleSyncGroupRequestAuthenticationFailed`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #12867: KAFKA-13785: docs for emit final

2022-11-16 Thread GitBox


mjsax commented on PR #12867:
URL: https://github.com/apache/kafka/pull/12867#issuecomment-1317561590

   Thanks for the PR. Merged to `trunk`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax merged pull request #12867: KAFKA-13785: docs for emit final

2022-11-16 Thread GitBox


mjsax merged PR #12867:
URL: https://github.com/apache/kafka/pull/12867


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #12867: KAFKA-13785: docs for emit final

2022-11-16 Thread GitBox


mjsax commented on code in PR #12867:
URL: https://github.com/apache/kafka/pull/12867#discussion_r1024420977


##
streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java:
##
@@ -644,6 +644,16 @@ KTable, V> reduce(final Reducer reducer,
   final Named named,
   final Materialized> materialized);
 
-// TODO: add javadoc
+/**
+ * Configure when the aggregated result will be emitted for {@code 
SessionWindowedKStream}.
+ * 
+ * For example, for {@link EmitStrategy#onWindowClose} strategy, the 
aggregated result for a
+ * window will only be emitted when the window closes. For {@link 
EmitStrategy#onWindowUpdate()}
+ * strategy, the aggregated result for a window will be emitted whenever 
there is an update to

Review Comment:
   > emitted whenever there is an update
   
   Should we mention caching in addition?
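
   For context, a hedged usage sketch of the operator this javadoc documents 
(KIP-825; `input-topic` and the window size are illustrative):
   ```java
   import java.time.Duration;
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.kstream.EmitStrategy;
   import org.apache.kafka.streams.kstream.KStream;
   import org.apache.kafka.streams.kstream.KTable;
   import org.apache.kafka.streams.kstream.SessionWindows;
   import org.apache.kafka.streams.kstream.Windowed;

   public class EmitFinalSketch {
       public static void main(String[] args) {
           StreamsBuilder builder = new StreamsBuilder();
           KStream<String, String> input = builder.stream("input-topic");
           // Emit only the final result when each session window closes.
           KTable<Windowed<String>, Long> counts = input
                   .groupByKey()
                   .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(5)))
                   .emitStrategy(EmitStrategy.onWindowClose())
                   .count();
       }
   }
   ```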



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-10532) Do not wipe state store under EOS when closing-dirty a RESTORING active or RUNNING standby task

2022-11-16 Thread Shay Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shay Lin reassigned KAFKA-10532:


Assignee: Shay Lin

> Do not wipe state store under EOS when closing-dirty a RESTORING active or 
> RUNNING standby task
> ---
>
> Key: KAFKA-10532
> URL: https://issues.apache.org/jira/browse/KAFKA-10532
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Shay Lin
>Priority: Major
>  Labels: new-streams-runtime-should-fix, newbie++
>
> Today whenever we are closing-dirty a task, we always wipe out the state 
> stores if we are under EOS. But when the closing task was a RESTORING active, 
> or a RUNNING standby, we may actually not need to wipe out the stores since 
> we know that upon resuming, we would still continue restoring the task before 
> it ever transitions to processing (assuming the LEO offset would not be truncated), 
> i.e. when it resumes it does not matter if the same records get applied 
> twice during the continued restoration.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-10532) Do not wipe state store under EOS when closing-dirty a RESTORING active or RUNNING standby task

2022-11-16 Thread Shay Lin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17634982#comment-17634982
 ] 

Shay Lin edited comment on KAFKA-10532 at 11/16/22 7:16 PM:


Hi [~guozhang] I'm new to KS contribution and would like to take a stab at this 
issue. Could you give me some pointers to the "closing task" in context?


was (Author: JIRAUSER285919):
Hi [~guozhang] I'm new to KS contribution and would like to take a stab at this 
issue. Could you give me some pointers to the "closing task" in context?

 

 

> Do not wipe state store under EOS when closing-dirty a RESTORING active or 
> RUNNING standby task
> ---
>
> Key: KAFKA-10532
> URL: https://issues.apache.org/jira/browse/KAFKA-10532
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: new-streams-runtime-should-fix, newbie++
>
> Today whenever we are closing-dirty a task, we always wipe out the state 
> stores if we are under EOS. But when the closing task was a RESTORING active, 
> or a RUNNING standby, we may actually not need to wipe out the stores since 
> we know that upon resuming, we would still continue restoring the task before 
> it ever transitions to processing (assuming the LEO offset would not be truncated), 
> i.e. when it resumes it does not matter if the same records get applied 
> twice during the continued restoration.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-10532) Do not wipe state store under EOS when closing-dirty a RESTORING active or RUNNING standby task

2022-11-16 Thread Shay Lin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17634982#comment-17634982
 ] 

Shay Lin commented on KAFKA-10532:
--

Hi [~guozhang] I'm new to KS contribution and would like to take a stab at this 
issue. Could you give me some pointers to the "closing task" in context?

 

 

> Do not wipe state store under EOS when closing-dirty a RESTORING active or 
> RUNNING standby task
> ---
>
> Key: KAFKA-10532
> URL: https://issues.apache.org/jira/browse/KAFKA-10532
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: new-streams-runtime-should-fix, newbie++
>
> Today whenever we are closing-dirty a task, we always wipe out the state 
> stores if we are under EOS. But when the closing task was a RESTORING active, 
> or a RUNNING standby, we may actually not need to wipe out the stores since 
> we know that upon resuming, we would still continue restoring the task before 
> it ever transitions to processing (assuming the LEO offset would not be truncated), 
> i.e. when it resumes it does not matter if the same records get applied 
> twice during the continued restoration.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14395) Add config to configure client supplier for KafkaStreams

2022-11-16 Thread Hao Li (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hao Li reassigned KAFKA-14395:
--

Assignee: Hao Li

> Add config to configure client supplier for KafkaStreams
> 
>
> Key: KAFKA-14395
> URL: https://issues.apache.org/jira/browse/KAFKA-14395
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hao Li
>Assignee: Hao Li
>Priority: Major
>  Labels: kip
>
> For KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-884%3A+Add+config+to+configure+KafkaClientSupplier+in+Kafka+Streams



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-13785) Support emit final result for windowed aggregation

2022-11-16 Thread Hao Li (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hao Li resolved KAFKA-13785.

Resolution: Done

> Support emit final result for windowed aggregation
> --
>
> Key: KAFKA-13785
> URL: https://issues.apache.org/jira/browse/KAFKA-13785
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hao Li
>Assignee: Hao Li
>Priority: Major
>  Labels: kip
>
> For KIP-825: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (KAFKA-13785) Support emit final result for windowed aggregation

2022-11-16 Thread Hao Li (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hao Li closed KAFKA-13785.
--

> Support emit final result for windowed aggregation
> --
>
> Key: KAFKA-13785
> URL: https://issues.apache.org/jira/browse/KAFKA-13785
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hao Li
>Assignee: Hao Li
>Priority: Major
>  Labels: kip
>
> For KIP-825: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] lihaosky opened a new pull request, #12867: KAFKA-13785: docs for emit final

2022-11-16 Thread GitBox


lihaosky opened a new pull request, #12867:
URL: https://github.com/apache/kafka/pull/12867

   Add Java docs for emit final
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #12847: KAFKA-14367; Add `SyncGroup` to the new `GroupCoordinator` interface

2022-11-16 Thread GitBox


jolshan commented on code in PR #12847:
URL: https://github.com/apache/kafka/pull/12847#discussion_r1024400644


##
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala:
##
@@ -135,4 +133,71 @@ class GroupCoordinatorAdapterTest {
 assertEquals(expectedData, future.get())
   }
 
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.SYNC_GROUP)
+  def testSyncGroup(version: Short): Unit = {

Review Comment:
   in the JoinGroup PR we suggested renaming, so maybe something similar can be 
done here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #12847: KAFKA-14367; Add `SyncGroup` to the new `GroupCoordinator` interface

2022-11-16 Thread GitBox


jolshan commented on code in PR #12847:
URL: https://github.com/apache/kafka/pull/12847#discussion_r1024397613


##
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala:
##
@@ -16,17 +16,15 @@
  */
 package kafka.coordinator.group
 
-import 
kafka.coordinator.group.GroupCoordinatorConcurrencyTest.JoinGroupCallback
+import 
kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallback, 
SyncGroupCallback}
 import kafka.server.RequestLocal
-

Review Comment:
   nit: more of these  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] omkreddy commented on a diff in pull request #12843: KAFKA-14375: Remove use of "authorizer-properties" from EndToEndAuthorizerTest

2022-11-16 Thread GitBox


omkreddy commented on code in PR #12843:
URL: https://github.com/apache/kafka/pull/12843#discussion_r1024315000


##
core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala:
##
@@ -96,96 +90,52 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
   def clientPrincipal: KafkaPrincipal
   def kafkaPrincipal: KafkaPrincipal
 
-  // Arguments to AclCommand to set ACLs.
-  def clusterActionArgs: Array[String] = Array("--authorizer-properties",
-  s"zookeeper.connect=$zkConnect",
-  s"--add",
-  s"--cluster",
-  s"--operation=ClusterAction",
-  s"--allow-principal=$kafkaPrincipal")
-  // necessary to create SCRAM credentials via the admin client using the 
broker's credentials
-  // without this we would need to create the SCRAM credentials via ZooKeeper
-  def clusterAlterArgs: Array[String] = Array("--authorizer-properties",
-  s"zookeeper.connect=$zkConnect",
-  s"--add",
-  s"--cluster",
-  s"--operation=Alter",
-  s"--allow-principal=$kafkaPrincipal")
-  def topicBrokerReadAclArgs: Array[String] = Array("--authorizer-properties",
-  s"zookeeper.connect=$zkConnect",
-  s"--add",
-  s"--topic=$wildcard",
-  s"--operation=Read",
-  s"--allow-principal=$kafkaPrincipal")
-  def produceAclArgs(topic: String): Array[String] = 
Array("--authorizer-properties",
-  s"zookeeper.connect=$zkConnect",
-  s"--add",
-  s"--topic=$topic",
-  s"--producer",
-  
s"--allow-principal=$clientPrincipal")
-  def describeAclArgs: Array[String] = Array("--authorizer-properties",
-  s"zookeeper.connect=$zkConnect",
-  s"--add",
-  s"--topic=$topic",
-  s"--operation=Describe",
-  
s"--allow-principal=$clientPrincipal")
-  def deleteDescribeAclArgs: Array[String] = Array("--authorizer-properties",
-  s"zookeeper.connect=$zkConnect",
-  s"--remove",
-  s"--force",
-  s"--topic=$topic",
-  s"--operation=Describe",
-  
s"--allow-principal=$clientPrincipal")
-  def deleteWriteAclArgs: Array[String] = Array("--authorizer-properties",
-  s"zookeeper.connect=$zkConnect",
-  s"--remove",
-  s"--force",
-  s"--topic=$topic",
-  s"--operation=Write",
-  
s"--allow-principal=$clientPrincipal")
-  def consumeAclArgs(topic: String): Array[String] = 
Array("--authorizer-properties",
-  s"zookeeper.connect=$zkConnect",
-  s"--add",
-  s"--topic=$topic",
-  s"--group=$group",
-  s"--consumer",
-  
s"--allow-principal=$clientPrincipal")
-  def groupAclArgs: Array[String] = Array("--authorizer-properties",
-  s"zookeeper.connect=$zkConnect",
-  s"--add",
-  s"--group=$group",
-  s"--operation=Read",
-  
s"--allow-principal=$clientPrincipal")
-  def produceConsumeWildcardAclArgs: Array[String] = 
Array("--authorizer-properties",
-  s"zookeeper.connect=$zkConnect",
-  s"--add",
-  s"--topic=$wildcard",
-  s"--group=$wildcard",
-  s"--consumer",
-  s"--producer",
- 

[GitHub] [kafka] C0urante commented on a diff in pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls

2022-11-16 Thread GitBox


C0urante commented on code in PR #12828:
URL: https://github.com/apache/kafka/pull/12828#discussion_r1024255803


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -1906,6 +1917,8 @@ private void reconfigureConnector(final String connName, 
final Callback cb
 if (isLeader()) {
 writeToConfigTopicAsLeader(() -> 
configBackingStore.putTaskConfigs(connName, rawTaskProps));
 cb.onCompletion(null, null);
+} else if (restClient == null) {
+throw new NotLeaderException("Request forwarding disabled 
in distributed MirrorMaker2; reconfiguring tasks must be performed by the 
leader", leaderUrl());

Review Comment:
   Similar wording suggestions for this message:
   ```suggestion
   // TODO: Update this message if KIP-710 is accepted and 
merged
   //   
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters)
   throw new NotLeaderException("This worker is not able to 
communicate with the leader of the cluster, "
   + "which is required for 
dynamically-reconfiguring connectors. If running MirrorMaker 2 "
   + "in dedicated mode, consider deploying 
the connectors for MirrorMaker 2 directly onto a "
   + "distributed Kafka Connect cluster.",
   leaderUrl()
   );
   ```



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java:
##
@@ -43,10 +43,38 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
-public class RestClient {
+public class RestClient implements AutoCloseable {
 private static final Logger log = 
LoggerFactory.getLogger(RestClient.class);
 private static final ObjectMapper JSON_SERDE = new ObjectMapper();
 
+private final HttpClient client;
+public RestClient(WorkerConfig config) {
+client = new 
HttpClient(SSLUtils.createClientSideSslContextFactory(config));

Review Comment:
   This new integration test is fantastic, thanks!



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -1157,16 +1163,21 @@ void fenceZombieSourceTasks(final ConnectorTaskId id, 
Callback callback) {
 if (error == null) {
 callback.onCompletion(null, null);
 } else if (error instanceof NotLeaderException) {
-String forwardedUrl = ((NotLeaderException) 
error).forwardUrl() + "connectors/" + id.connector() + "/fence";
-log.trace("Forwarding zombie fencing request for connector {} 
to leader at {}", id.connector(), forwardedUrl);
-forwardRequestExecutor.execute(() -> {
-try {
-RestClient.httpRequest(forwardedUrl, "PUT", null, 
null, null, config, sessionKey, requestSignatureAlgorithm);
-callback.onCompletion(null, null);
-} catch (Throwable t) {
-callback.onCompletion(t, null);
-}
-});
+if (restClient != null) {
+String forwardedUrl = ((NotLeaderException) 
error).forwardUrl() + "connectors/" + id.connector() + "/fence";
+log.trace("Forwarding zombie fencing request for connector 
{} to leader at {}", id.connector(), forwardedUrl);
+forwardRequestExecutor.execute(() -> {
+try {
+restClient.httpRequest(forwardedUrl, "PUT", null, 
null, null, sessionKey, requestSignatureAlgorithm);
+callback.onCompletion(null, null);
+} catch (Throwable t) {
+callback.onCompletion(t, null);
+}
+});
+} else {
+error = ConnectUtils.maybeWrap(error, "Request forwarding 
disabled in distributed MirrorMaker2; fencing zombie source tasks must be 
performed by the leader");

Review Comment:
   We shouldn't use `maybeWrap` here since we know that the exception is a 
`NotLeaderException`, which extends `ConnectException`, so the error message 
will never actually be used.
   
   The message itself is also a little misleading since it implies that it'll 
be possible to bring up source tasks in exactly-once mode on multi-node 
clusters as long as the leader does the fencing, but that's not actually the 
case (it's impossible for a follower to start any source tasks with 
exactly-once enabled if it cannot issue a REST request to the leader's internal 
zombie fencing endpoint). We might consider 

[GitHub] [kafka] akhileshchg commented on a diff in pull request #12860: Add RPC changes, records, and config from KIP-866

2022-11-16 Thread GitBox


akhileshchg commented on code in PR #12860:
URL: https://github.com/apache/kafka/pull/12860#discussion_r1024336463


##
metadata/src/main/java/org/apache/kafka/migration/ZkControllerState.java:
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.migration;
+
+public class ZkControllerState {

Review Comment:
   Can you add a comment explaining this state as well?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14395) Add config to configure client supplier for KafkaStreams

2022-11-16 Thread Hao Li (Jira)
Hao Li created KAFKA-14395:
--

 Summary: Add config to configure client supplier for KafkaStreams
 Key: KAFKA-14395
 URL: https://issues.apache.org/jira/browse/KAFKA-14395
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Hao Li


For KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-884%3A+Add+config+to+configure+KafkaClientSupplier+in+Kafka+Streams
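
As a rough sketch of what the proposed configuration could look like; the 
config key is taken from the KIP draft and may change, and the supplier class 
name is illustrative:
{code:java}
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class ClientSupplierConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "client-supplier-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Proposed in KIP-884 (key per the KIP draft, subject to change):
        props.put("default.client.supplier", "com.example.MyKafkaClientSupplier");

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.close();
    }
}{code}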



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] gharris1727 commented on pull request #12839: KAFKA-14346: Replace static mocking of WorkerConfig::lookupKafkaClusterId

2022-11-16 Thread GitBox


gharris1727 commented on PR #12839:
URL: https://github.com/apache/kafka/pull/12839#issuecomment-1317398884

   @mimaison I fixed the non-trivial merge conflicts for the 
KafkaOffsetBackingStoreTest, please re-review that file in particular.
   
   Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] pprovenzano commented on a diff in pull request #12843: KAFKA-14375: Remove use of "authorizer-properties" from EndToEndAuthorizerTest

2022-11-16 Thread GitBox


pprovenzano commented on code in PR #12843:
URL: https://github.com/apache/kafka/pull/12843#discussion_r1024275128


##
core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala:
##
@@ -200,11 +150,23 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
 */
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
+
+// The next two configuration parameters enable ZooKeeper secure ACLs
+// and sets the Kafka authorizer, both necessary to enable security.
+this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
+this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, 
authorizerClass.getName)
+
+// Set the specific principal that can update ACLs.
+this.serverConfig.setProperty(AclAuthorizer.SuperUsersProp, 
kafkaPrincipal.toString)
+
 super.setUp(testInfo)
+
+val superuserAdminClient = createSuperuserAdminClient()

Review Comment:
   I tested as we discussed and in fact it is not necessary. I removed the code 
at setUp().
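
   For reference, a hedged sketch of granting an ACL through the admin client 
rather than AclCommand's --authorizer-properties path (principal, topic, and 
bootstrap address are illustrative):
   ```java
   import java.util.Collections;
   import java.util.Properties;
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AdminClientConfig;
   import org.apache.kafka.common.acl.AccessControlEntry;
   import org.apache.kafka.common.acl.AclBinding;
   import org.apache.kafka.common.acl.AclOperation;
   import org.apache.kafka.common.acl.AclPermissionType;
   import org.apache.kafka.common.resource.PatternType;
   import org.apache.kafka.common.resource.ResourcePattern;
   import org.apache.kafka.common.resource.ResourceType;

   public class AdminAclSketch {
       public static void main(String[] args) throws Exception {
           Properties props = new Properties();
           props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           try (Admin admin = Admin.create(props)) {
               // Allow a client principal to write to a topic.
               AclBinding binding = new AclBinding(
                       new ResourcePattern(ResourceType.TOPIC, "e2etopic", PatternType.LITERAL),
                       new AccessControlEntry("User:client", "*",
                               AclOperation.WRITE, AclPermissionType.ALLOW));
               admin.createAcls(Collections.singleton(binding)).all().get();
           }
       }
   }
   ```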



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #12828: KAFKA-14346: Remove hard-to-mock RestClient calls

2022-11-16 Thread GitBox


C0urante commented on PR #12828:
URL: https://github.com/apache/kafka/pull/12828#issuecomment-1317293132

   @gharris1727 There appear to be test failures; can you take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #12233: MINOR: Clean up tmp files created by tests

2022-11-16 Thread GitBox


C0urante commented on code in PR #12233:
URL: https://github.com/apache/kafka/pull/12233#discussion_r1024220705


##
clients/src/test/java/org/apache/kafka/test/TestUtils.java:
##
@@ -142,26 +142,40 @@ public static String randomString(final int len) {
 }
 
 /**
- * Create an empty file in the default temporary-file directory, using 
`kafka` as the prefix and `tmp` as the
- * suffix to generate its name.
+ * Create an empty file in the default temporary-file directory, using the 
given prefix and suffix
+ * to generate its name.
+ * @throws IOException
  */
-public static File tempFile() throws IOException {
-final File file = File.createTempFile("kafka", ".tmp");
+public static File tempFile(final String prefix, final String suffix) 
throws IOException {
+final File file = Files.createTempFile(prefix, suffix).toFile();
 file.deleteOnExit();
 
+Exit.addShutdownHook("delete-temp-file-shutdown-hook", () -> {

Review Comment:
   The idea (at least with calls to terminate the JVM) is that we can add a 
layer in between the components under test and the exit API provided by the 
SDK, and then alter that layer during testing to prevent the actual API from 
being accessed.
   
   One example is in the embedded Connect clusters we use for our integration 
tests: 
https://github.com/apache/kafka/blob/dbf5826cd545820652ed6d136727aa80a03f4f54/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java#L111-L120
   
   With shutdown hooks, it's mostly hypothetical at this point since AFAICT 
there are no actual invocations of `Exit::setShutdownHookAdder` in the code 
base, but I believe the intention would be similar. You'd intercept attempts by 
components under test to add shutdown hooks, both to prevent those hooks from 
actually being added (in other words, to prevent 
`Runtime.getRuntime().addShutdownHook` from being called), and possibly to make 
assertions on those hooks.
   
   We don't want to allow for the shutdown hooks we added in this PR to be 
intercepted, right? If so, we might consider invoking 
`Runtime.getRuntime().addShutdownHook` directly here.
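
   A minimal sketch of that interception seam, assuming Exit exposes a 
setShutdownHookAdder/resetShutdownHookAdder pair analogous to its exit/halt 
procedure setters:
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import org.apache.kafka.common.utils.Exit;

   public class ShutdownHookInterceptionSketch {
       public static void main(String[] args) {
           List<String> captured = new ArrayList<>();
           // Intercept registrations so nothing reaches Runtime.getRuntime().
           Exit.setShutdownHookAdder((name, runnable) -> captured.add(name));
           try {
               Exit.addShutdownHook("delete-temp-file-shutdown-hook", () -> { });
               System.out.println("captured hooks: " + captured);
           } finally {
               Exit.resetShutdownHookAdder();
           }
       }
   }
   ```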



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #12839: KAFKA-14346: Replace static mocking of WorkerConfig::lookupKafkaClusterId

2022-11-16 Thread GitBox


mimaison commented on PR #12839:
URL: https://github.com/apache/kafka/pull/12839#issuecomment-1317243176

   @gharris1727 Can you rebase on trunk?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bbejeck commented on pull request #12861: KAFKA-14388 - Fixes the NPE when using the new Processor API with the DSL

2022-11-16 Thread GitBox


bbejeck commented on PR #12861:
URL: https://github.com/apache/kafka/pull/12861#issuecomment-1317228760

   Updates per comments - will merge once build completes


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests

2022-11-16 Thread GitBox


clolov commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1024174183


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##
@@ -42,77 +39,54 @@
 
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
-import static org.easymock.EasyMock.createStrictControl;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replayAll;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(Utils.class)
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class StateManagerUtilTest {
 
-@Mock(type = MockType.NICE)
+@Mock
 private ProcessorStateManager stateManager;
 
-@Mock(type = MockType.NICE)
+@Mock
 private StateDirectory stateDirectory;
 
-@Mock(type = MockType.NICE)
+@Mock
 private ProcessorTopology topology;
 
-@Mock(type = MockType.NICE)
+@Mock
 private InternalProcessorContext processorContext;
 
-private IMocksControl ctrl;
-
 private Logger logger = new LogContext("test").logger(AbstractTask.class);
 
 private final TaskId taskId = new TaskId(0, 0);
 
-@Before
-public void setup() {
-ctrl = createStrictControl();
-topology = ctrl.createMock(ProcessorTopology.class);
-processorContext = ctrl.createMock(InternalProcessorContext.class);
-
-stateManager = ctrl.createMock(ProcessorStateManager.class);
-stateDirectory = ctrl.createMock(StateDirectory.class);
-}
-
 @Test
 public void testRegisterStateStoreWhenTopologyEmpty() {
-expect(topology.stateStores()).andReturn(emptyList());
-
-ctrl.checkOrder(true);

Review Comment:
   I have hopefully addressed this in the next commit, but if I have missed or 
misunderstood something I will rework it.
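
   For reference, a small sketch of Mockito's ordered-verification analogue of 
EasyMock's ctrl.checkOrder(true); the mocked type is illustrative:
   ```java
   import java.util.List;
   import org.mockito.InOrder;
   import static org.mockito.Mockito.inOrder;
   import static org.mockito.Mockito.mock;

   public class InOrderSketch {
       @SuppressWarnings("unchecked")
       public static void main(String[] args) {
           List<String> mockList = mock(List.class);
           mockList.add("first");
           mockList.add("second");
           // Verifies call order, as EasyMock's strict control did.
           InOrder inOrder = inOrder(mockList);
           inOrder.verify(mockList).add("first");
           inOrder.verify(mockList).add("second");
       }
   }
   ```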



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests

2022-11-16 Thread GitBox


clolov commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1024170827


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##
@@ -187,151 +139,92 @@ public void 
testCloseStateManagerThrowsExceptionWhenClean() {
 // Thrown stateMgr exception will not be wrapped.
 assertEquals("state manager failed to close", thrown.getMessage());
 
-ctrl.verify();
+// The unlock logic should still be executed.
+verify(stateDirectory).unlock(taskId);
 }
 
 @Test
 public void testCloseStateManagerThrowsExceptionWhenDirty() {
-expect(stateManager.taskId()).andReturn(taskId);
+when(stateManager.taskId()).thenReturn(taskId);
 
-expect(stateDirectory.lock(taskId)).andReturn(true);
+when(stateDirectory.lock(taskId)).thenReturn(true);
 
-stateManager.close();
-expectLastCall().andThrow(new ProcessorStateException("state manager 
failed to close"));
-
-stateDirectory.unlock(taskId);
-
-ctrl.checkOrder(true);
-ctrl.replay();
+doThrow(new ProcessorStateException("state manager failed to 
close")).when(stateManager).close();
 
 assertThrows(
 ProcessorStateException.class,
 () -> StateManagerUtil.closeStateManager(
 logger, "logPrefix:", false, false, stateManager, 
stateDirectory, TaskType.ACTIVE));
 
-ctrl.verify();
+verify(stateDirectory).unlock(taskId);
 }
 
 @Test
 public void testCloseStateManagerWithStateStoreWipeOut() {
-expect(stateManager.taskId()).andReturn(taskId);
-expect(stateDirectory.lock(taskId)).andReturn(true);
-
-stateManager.close();
-expectLastCall();
+when(stateManager.taskId()).thenReturn(taskId);
+when(stateDirectory.lock(taskId)).thenReturn(true);
 
 // The `baseDir` will be accessed when attempting to delete the state 
store.
-
expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
-stateDirectory.unlock(taskId);
-expectLastCall();
-
-ctrl.checkOrder(true);
-ctrl.replay();
+
when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
 StateManagerUtil.closeStateManager(logger,
 "logPrefix:", false, true, stateManager, stateDirectory, 
TaskType.ACTIVE);
 
-ctrl.verify();
+verify(stateManager).close();
+verify(stateDirectory).unlock(taskId);
 }
 
 @Test
 public void  shouldStillWipeStateStoresIfCloseThrowsException() throws 
IOException {
 final File randomFile = new File("/random/path");
-mockStatic(Utils.class);
-
-expect(stateManager.taskId()).andReturn(taskId);
-expect(stateDirectory.lock(taskId)).andReturn(true);
 
-stateManager.close();
-expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+when(stateManager.taskId()).thenReturn(taskId);
+when(stateDirectory.lock(taskId)).thenReturn(true);
 
-expect(stateManager.baseDir()).andReturn(randomFile);
+doThrow(new ProcessorStateException("Close 
failed")).when(stateManager).close();
 
-Utils.delete(randomFile);
+when(stateManager.baseDir()).thenReturn(randomFile);
 
-stateDirectory.unlock(taskId);
-expectLastCall();
+try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+assertThrows(ProcessorStateException.class, () ->
+StateManagerUtil.closeStateManager(logger, "logPrefix:", 
false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+}
 
-ctrl.checkOrder(true);
-ctrl.replay();
-
-replayAll();
-
-assertThrows(ProcessorStateException.class, () ->
-StateManagerUtil.closeStateManager(logger, "logPrefix:", false, 
true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-ctrl.verify();
+verify(stateDirectory).unlock(taskId);
 }
 
 @Test
 public void 
testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws 
IOException {
 final File unknownFile = new File("/unknown/path");
-mockStatic(Utils.class);
-
-expect(stateManager.taskId()).andReturn(taskId);
-expect(stateDirectory.lock(taskId)).andReturn(true);
-
-stateManager.close();
-expectLastCall();
-
-expect(stateManager.baseDir()).andReturn(unknownFile);
 
-Utils.delete(unknownFile);
-expectLastCall().andThrow(new IOException("Deletion failed"));
+when(stateManager.taskId()).thenReturn(taskId);
+when(stateDirectory.lock(taskId)).thenReturn(true);
+when(stateManager.baseDir()).thenReturn(unknownFile);
 
-stateDirectory.unlock(taskId);
-expectLastCall();
+try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+


[GitHub] [kafka] clolov commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests

2022-11-16 Thread GitBox


clolov commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1024166413


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##
@@ -187,151 +139,92 @@ public void 
testCloseStateManagerThrowsExceptionWhenClean() {
 // Thrown stateMgr exception will not be wrapped.
 assertEquals("state manager failed to close", thrown.getMessage());
 
-ctrl.verify();
+// The unlock logic should still be executed.
+verify(stateDirectory).unlock(taskId);
 }
 
 @Test
 public void testCloseStateManagerThrowsExceptionWhenDirty() {
-expect(stateManager.taskId()).andReturn(taskId);
+when(stateManager.taskId()).thenReturn(taskId);
 
-expect(stateDirectory.lock(taskId)).andReturn(true);
+when(stateDirectory.lock(taskId)).thenReturn(true);
 
-stateManager.close();
-expectLastCall().andThrow(new ProcessorStateException("state manager 
failed to close"));
-
-stateDirectory.unlock(taskId);
-
-ctrl.checkOrder(true);
-ctrl.replay();
+doThrow(new ProcessorStateException("state manager failed to 
close")).when(stateManager).close();
 
 assertThrows(
 ProcessorStateException.class,
 () -> StateManagerUtil.closeStateManager(
 logger, "logPrefix:", false, false, stateManager, 
stateDirectory, TaskType.ACTIVE));
 
-ctrl.verify();
+verify(stateDirectory).unlock(taskId);
 }
 
 @Test
 public void testCloseStateManagerWithStateStoreWipeOut() {
-expect(stateManager.taskId()).andReturn(taskId);
-expect(stateDirectory.lock(taskId)).andReturn(true);
-
-stateManager.close();
-expectLastCall();
+when(stateManager.taskId()).thenReturn(taskId);
+when(stateDirectory.lock(taskId)).thenReturn(true);
 
 // The `baseDir` will be accessed when attempting to delete the state 
store.
-
expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
-stateDirectory.unlock(taskId);
-expectLastCall();
-
-ctrl.checkOrder(true);
-ctrl.replay();
+
when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
 StateManagerUtil.closeStateManager(logger,
 "logPrefix:", false, true, stateManager, stateDirectory, 
TaskType.ACTIVE);
 
-ctrl.verify();
+verify(stateManager).close();
+verify(stateDirectory).unlock(taskId);
 }
 
 @Test
 public void  shouldStillWipeStateStoresIfCloseThrowsException() throws 
IOException {
 final File randomFile = new File("/random/path");
-mockStatic(Utils.class);
-
-expect(stateManager.taskId()).andReturn(taskId);
-expect(stateDirectory.lock(taskId)).andReturn(true);
 
-stateManager.close();
-expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+when(stateManager.taskId()).thenReturn(taskId);
+when(stateDirectory.lock(taskId)).thenReturn(true);
 
-expect(stateManager.baseDir()).andReturn(randomFile);
+doThrow(new ProcessorStateException("Close 
failed")).when(stateManager).close();
 
-Utils.delete(randomFile);
+when(stateManager.baseDir()).thenReturn(randomFile);
 
-stateDirectory.unlock(taskId);
-expectLastCall();
+try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+assertThrows(ProcessorStateException.class, () ->
+StateManagerUtil.closeStateManager(logger, "logPrefix:", 
false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+}
 
-ctrl.checkOrder(true);
-ctrl.replay();
-
-replayAll();
-
-assertThrows(ProcessorStateException.class, () ->
-StateManagerUtil.closeStateManager(logger, "logPrefix:", false, 
true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-ctrl.verify();
+verify(stateDirectory).unlock(taskId);
 }
 
 @Test
 public void 
testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws 
IOException {
 final File unknownFile = new File("/unknown/path");
-mockStatic(Utils.class);
-
-expect(stateManager.taskId()).andReturn(taskId);
-expect(stateDirectory.lock(taskId)).andReturn(true);
-
-stateManager.close();
-expectLastCall();
-
-expect(stateManager.baseDir()).andReturn(unknownFile);
 
-Utils.delete(unknownFile);
-expectLastCall().andThrow(new IOException("Deletion failed"));
+when(stateManager.taskId()).thenReturn(taskId);
+when(stateDirectory.lock(taskId)).thenReturn(true);
+when(stateManager.baseDir()).thenReturn(unknownFile);
 
-stateDirectory.unlock(taskId);
-expectLastCall();
+try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+

[GitHub] [kafka] rondagostino commented on a diff in pull request #12856: KAFKA-14392: KRaft broker heartbeat timeout should not exceed broker.session.timeout.ms

2022-11-16 Thread GitBox


rondagostino commented on code in PR #12856:
URL: https://github.com/apache/kafka/pull/12856#discussion_r1024154049


##
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala:
##
@@ -121,16 +121,18 @@ object BrokerToControllerChannelManager {
 config: KafkaConfig,
 channelName: String,
 threadNamePrefix: Option[String],
-retryTimeoutMs: Long
-  ): BrokerToControllerChannelManager = {
+retryTimeoutMs: Long,
+networkClientRetryTimeoutMs: Option[Int] = None

Review Comment:
   The fact that there are actually two separate timeouts here is a signal that 
something may be wrong.  To be clear, there were 2 separate timeouts prior to 
this PR -- this patch just makes their existence explicit so that we can set 
both of them in the KRaft case.   I've opened 
https://issues.apache.org/jira/browse/KAFKA-14394 to track this.



##
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala:
##
@@ -211,7 +214,7 @@ class BrokerToControllerChannelManagerImpl(
 50,
 Selectable.USE_DEFAULT_BUFFER_SIZE,
 Selectable.USE_DEFAULT_BUFFER_SIZE,
-config.requestTimeoutMs,
+
networkClientRetryTimeoutMs.getOrElse(config.controllerSocketTimeoutMs),

Review Comment:
   Again, this is a cosmetic change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] YeonCheolGit commented on a diff in pull request #12803: KAFKA-13602: Adding ability to multicast records.

2022-11-16 Thread GitBox


YeonCheolGit commented on code in PR #12803:
URL: https://github.com/apache/kafka/pull/12803#discussion_r1024151927


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java:
##
@@ -1046,7 +1047,18 @@ public <VR, KO, VO> KTable<K, VR> leftJoin(final 
KTable<KO, VO> other,
 return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, 
TableJoined.with(null, null), materialized, true);
 }
 
-@SuppressWarnings("unchecked")
+private final Function<Optional<Set<Integer>>, Integer> getPartition = 
maybeMulticastPartitions -> {

Review Comment:
   @vamossagar12 Thanks for letting me know about your intention :) Your answer 
is very clear to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rondagostino commented on pull request #12856: KAFKA-14392: KRaft broker heartbeat timeout should not exceed broker.session.timeout.ms

2022-11-16 Thread GitBox


rondagostino commented on PR #12856:
URL: https://github.com/apache/kafka/pull/12856#issuecomment-1317191981

   > change is getting a bit too big,
   
   Thanks, @cmccabe.  I've trimmed it down and opened the separate ticket as 
you mentioned.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rondagostino commented on a diff in pull request #12856: KAFKA-14392: KRaft broker heartbeat timeout should not exceed broker.session.timeout.ms

2022-11-16 Thread GitBox


rondagostino commented on code in PR #12856:
URL: https://github.com/apache/kafka/pull/12856#discussion_r1024149966


##
core/src/main/scala/kafka/server/AlterPartitionManager.scala:
##
@@ -91,7 +91,8 @@ object AlterPartitionManager {
   config = config,
   channelName = "alterPartition",
   threadNamePrefix = threadNamePrefix,
-  retryTimeoutMs = Long.MaxValue
+  networkClientRetryTimeoutMs = if (config.processRoles.isEmpty) 
config.controllerSocketTimeoutMs else config.brokerSessionTimeoutMs / 2,

Review Comment:
   > let's leave this one alone for now and file a follow-up JIRA for it
   
   I've opened https://issues.apache.org/jira/browse/KAFKA-14394 to track this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14394) BrokerToControllerChannelManager has 2 separate timeouts

2022-11-16 Thread Ron Dagostino (Jira)
Ron Dagostino created KAFKA-14394:
-

 Summary: BrokerToControllerChannelManager has 2 separate timeouts
 Key: KAFKA-14394
 URL: https://issues.apache.org/jira/browse/KAFKA-14394
 Project: Kafka
  Issue Type: Task
Reporter: Ron Dagostino


BrokerToControllerChannelManager uses `config.controllerSocketTimeoutMs` as its 
default `networkClientRetryTimeoutMs` in general, but it also accepts a second 
`retryTimeoutMs` value -- and there is exactly one place where this second 
timeout is used: within BrokerToControllerRequestThread. Is this second, 
separate timeout actually necessary, or is it a bug (in which case the two 
timeouts should be the same)? Closely related to this is the case of 
AlterPartitionManager, which passes Long.MAX_VALUE as the retryTimeoutMs value 
when it instantiates its BrokerToControllerChannelManager. Is this 
Long.MAX_VALUE correct, when in fact `config.controllerSocketTimeoutMs` is 
being used as the other timeout?

This is related to 
https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-14392 and the 
associated PR, https://github.com/apache/kafka/pull/12856



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] bbejeck commented on a diff in pull request #12861: KAFKA-14388 - Fixes the NPE when using the new Processor API with the DSL

2022-11-16 Thread GitBox


bbejeck commented on code in PR #12861:
URL: https://github.com/apache/kafka/pull/12861#discussion_r1024098926


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamNewProcessorApiTest.java:
##
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.processor.api.ContextualFixedKeyProcessor;
+import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
+import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
+import org.apache.kafka.streams.processor.api.FixedKeyRecord;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import static java.util.Arrays.asList;
+
+
+public class KStreamNewProcessorApiTest {
+
+@Test
+@DisplayName("Test for using new Processor API and state stores with the 
DSL")
+void shouldGetStateStoreWithNewProcessor() {
+final StreamsBuilder builder = new StreamsBuilder();
+final StoreBuilder<KeyValueStore<String, String>> storeBuilder = 
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store"), 
Serdes.String(), Serdes.String());
+
+
+builder.stream("input", Consumed.with(Serdes.String(), 
Serdes.String()))
+.processValues(new TransformerSupplier(storeBuilder), "store")
+.to("output", Produced.with(Serdes.String(), Serdes.String()));
+
+final List<KeyValue<String, String>> words = 
Arrays.asList(KeyValue.pair("a", "foo"), KeyValue.pair("b", "bar"), 
KeyValue.pair("c", "baz"));
+try (TopologyTestDriver testDriver = new 
TopologyTestDriver(builder.build())) {
+final TestInputTopic<String, String>
+testDriverInputTopic =
+testDriver.createInputTopic("input", 
Serdes.String().serializer(), Serdes.String().serializer());
+
+words.forEach(clk -> testDriverInputTopic.pipeInput(clk.key, 
clk.value));
+
+final List<String> expectedOutput = asList("fooUpdated", 
"barUpdated", "bazUpdated");
+
+final Deserializer<String> keyDeserializer = 
Serdes.String().deserializer();
+final List<String> actualOutput =
+new ArrayList<>(testDriver.createOutputTopic("output", 
keyDeserializer, Serdes.String().deserializer()).readValuesToList());
+
+Assertions.assertEquals(expectedOutput, actualOutput);
+}
+}
+private static class TransformerSupplier implements 
FixedKeyProcessorSupplier<String, String, String> {
+private final StoreBuilder<KeyValueStore<String, String>> storeBuilder;
+
+public TransformerSupplier(final StoreBuilder<KeyValueStore<String, String>> storeBuilder) {
+this.storeBuilder = storeBuilder;
+}
+
+@Override
+public ContextualFixedKeyProcessor<String, String, String> get() {
+return new ContextualFixedKeyProcessor<String, String, String>() {
+KeyValueStore<String, String> store;
+FixedKeyProcessorContext<String, String> context;
+
+@Override
+public void init(final FixedKeyProcessorContext<String, String> context) {
+super.init(context);
+store = context.getStateStore("store");
+this.context = context;
+}
+
+@Override
+public void process(final FixedKeyRecord<String, String> 
record) {
+final String updated = store.get(record.key());
+store.putIfAbsent(record.key(), record.value() + 
"Updated");

Review Comment:
   ack - will test the store content as well
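   
   For illustration, a minimal sketch of what such an assertion could look like
   (reusing the names from the test above; `getKeyValueStore` is the
   TopologyTestDriver accessor for the store registered under "store"):
   
   ```
   // The processor does store.putIfAbsent(key, value + "Updated"), so after
   // piping the input through, the store should hold the updated values.
   final KeyValueStore<String, String> store = testDriver.getKeyValueStore("store");
   Assertions.assertEquals("fooUpdated", store.get("a"));
   Assertions.assertEquals("barUpdated", store.get("b"));
   Assertions.assertEquals("bazUpdated", store.get("c"));
   ```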



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

[GitHub] [kafka] bbejeck commented on a diff in pull request #12861: KAFKA-14388 - Fixes the NPE when using the new Processor API with the DSL

2022-11-16 Thread GitBox


bbejeck commented on code in PR #12861:
URL: https://github.com/apache/kafka/pull/12861#discussion_r1024098085


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamNewProcessorApiTest.java:
##
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.processor.api.ContextualFixedKeyProcessor;
+import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
+import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
+import org.apache.kafka.streams.processor.api.FixedKeyRecord;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import static java.util.Arrays.asList;
+
+
+public class KStreamNewProcessorApiTest {
+
+@Test
+@DisplayName("Test for using new Processor API and state stores with the 
DSL")
+void shouldGetStateStoreWithNewProcessor() {
+final StreamsBuilder builder = new StreamsBuilder();
+final StoreBuilder<KeyValueStore<String, String>> storeBuilder = 
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store"), 
Serdes.String(), Serdes.String());
+
+
+builder.stream("input", Consumed.with(Serdes.String(), 
Serdes.String()))
+.processValues(new TransformerSupplier(storeBuilder), "store")

Review Comment:
   ack - I'll add a test for using `addStateStore` instead of 
`ConnectedStoreProvider` as well
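   
   As a sketch (not the committed test), the `addStateStore` variant would wire
   the same topology like this, registering the store on the builder and
   connecting it by name instead of via `ConnectedStoreProvider`:
   
   ```
   // Register the store on the builder; the supplier must then NOT also
   // return it from stores(), or the store would be registered twice.
   builder.addStateStore(storeBuilder);
   builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
          .processValues(new TransformerSupplier(storeBuilder), "store")
          .to("output", Produced.with(Serdes.String(), Serdes.String()));
   ```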



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jonathan-albrecht-ibm commented on a diff in pull request #12343: MINOR: Update unit/integration tests to work with the IBM Semeru JDK

2022-11-16 Thread GitBox


jonathan-albrecht-ibm commented on code in PR #12343:
URL: https://github.com/apache/kafka/pull/12343#discussion_r1024093773


##
core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala:
##
@@ -119,6 +120,18 @@ object JaasTestUtils {
 }
   }
 
+  private val isIbmSecurity = try {
+  Class.forName("sun.security.krb5.Config"); false
+} catch {
+  case ex: Exception => {
+if (Java.isIbmJdk) {
+  Class.forName("com.ibm.security.krb5.internal.Config"); true
+} else {
+  throw ex
+}
+  }
+}

Review Comment:
   See reply above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jonathan-albrecht-ibm commented on a diff in pull request #12343: MINOR: Update unit/integration tests to work with the IBM Semeru JDK

2022-11-16 Thread GitBox


jonathan-albrecht-ibm commented on code in PR #12343:
URL: https://github.com/apache/kafka/pull/12343#discussion_r1024093478


##
core/src/test/scala/kafka/security/minikdc/MiniKdc.scala:
##
@@ -260,11 +260,17 @@ class MiniKdc(config: Properties, workDir: File) extends 
Logging {
   }
 
   private def refreshJvmKerberosConfig(): Unit = {
-val klass =
-  if (Java.isIbmJdk)
-Class.forName("com.ibm.security.krb5.internal.Config")
-  else
-Class.forName("sun.security.krb5.Config")
+// Newer IBM JDKs use the OpenJDK security providers so try that first
+val klass = try {
+  Class.forName("sun.security.krb5.Config")
+} catch {
+  case ex: Exception => {
+if (Java.isIbmJdk)
+  Class.forName("com.ibm.security.krb5.internal.Config")
+else
+  throw ex
+  }
+}

Review Comment:
   The idea was to just check for the classes directly, but you're definitely 
right that `Java.isIbmJdkSemeru` is more readable. I have a new commit to make 
this change and the other one below, but I'll hold off on pushing it to see if 
there's any more feedback from @ijuma or @hachikuji.
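   
   For readers following the thread, the probing idea boils down to the
   following fallback (a minimal Java sketch of the same logic; the
   `Java.isIbmJdkSemeru` helper mentioned above is a proposal, not something
   shown in this diff):
   
   ```
   // Prefer the OpenJDK krb5 provider (also shipped by newer IBM Semeru JDKs)
   // and fall back to the legacy IBM provider class if it is absent.
   private static Class<?> krb5ConfigClass() throws ClassNotFoundException {
       try {
           return Class.forName("sun.security.krb5.Config");
       } catch (final ClassNotFoundException e) {
           return Class.forName("com.ibm.security.krb5.internal.Config");
       }
   }
   ```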



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jonathan-albrecht-ibm commented on a diff in pull request #12343: MINOR: Update unit/integration tests to work with the IBM Semeru JDK

2022-11-16 Thread GitBox


jonathan-albrecht-ibm commented on code in PR #12343:
URL: https://github.com/apache/kafka/pull/12343#discussion_r1024086819


##
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java:
##
@@ -2263,7 +2263,10 @@ public void configure(Map configs) {
 private Sender sender = mock(Sender.class);
 private TransactionManager transactionManager = 
mock(TransactionManager.class);
 private Partitioner partitioner = mock(Partitioner.class);
-private KafkaThread ioThread = mock(KafkaThread.class);
+private KafkaThread ioThread = new KafkaThread("Fake Kafka Producer 
I/O Thread", new Runnable() {
+@Override
+public void run() {}
+}, true);

Review Comment:
   I have run the test again without this change and it looks like it is no 
longer needed. I see that Mockito has been upgraded to newer releases a few 
times since this PR was created, so I'm guessing that has fixed this problem.
   
   I'll push an amended commit to remove this change but let me know if the 
kafka developers prefer adding a new commit to remove it instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12748: KAFKA-13715: add generationId field in subscription

2022-11-16 Thread GitBox


dajac commented on code in PR #12748:
URL: https://github.com/apache/kafka/pull/12748#discussion_r1024071860


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java:
##
@@ -73,6 +88,74 @@ public void setUp() {
 }
 }
 
+@Test
+public void testMemberDataFromSubscription() {
+List<String> topics = topics(topic);
+List<TopicPartition> ownedPartitions = partitions(tp(topic1, 0), 
tp(topic2, 1));
+List<Subscription> subscriptions = new ArrayList<>();
+// add all subscription in ConsumerProtocolSubscription versions
+subscriptions.add(buildSubscriptionV0(topics, ownedPartitions, 
generationId));
+subscriptions.add(buildSubscriptionV1(topics, ownedPartitions, 
generationId));
+subscriptions.add(buildSubscriptionV2Above(topics, ownedPartitions, 
generationId));
+for (Subscription subscription : subscriptions) {
+if (subscription != null) {
+AbstractStickyAssignor.MemberData memberData = 
assignor.memberDataFromSubscription(subscription);
+assertEquals(ownedPartitions, memberData.partitions, 
"subscription: " + subscription + " doesn't have expected owned partition");
+assertEquals(generationId, memberData.generation.orElse(-1), 
"subscription: " + subscription + " doesn't have expected generation id");
+}
+}
+}
+
+@Test
+public void testMemberDataFromSubscriptionWithInconsistentData() {
+Map<String, Integer> partitionsPerTopic = new HashMap<>();
+partitionsPerTopic.put(topic, 2);
+List<TopicPartition> ownedPartitionsInUserdata = partitions(tp1);
+List<TopicPartition> ownedPartitionsInSubscription = partitions(tp0);
+
+assignor.onAssignment(new 
ConsumerPartitionAssignor.Assignment(ownedPartitionsInUserdata), new 
ConsumerGroupMetadata(groupId, generationId, consumer1, Optional.empty()));
+ByteBuffer userDataWithHigherGenerationId = 
assignor.subscriptionUserData(new HashSet<>(topics(topic)));
+// The owned partitions and generation id are provided in user data 
and different owned partition is provided in subscription without generation id
+// If subscription provides no generation id, we'll honor the 
generation id in userData and owned partitions in subscription
+Subscription subscription = new Subscription(topics(topic), 
userDataWithHigherGenerationId, ownedPartitionsInSubscription);
+
+AbstractStickyAssignor.MemberData memberData = 
assignor.memberDataFromSubscription(subscription);
+// in CooperativeStickyAssignor, we'll serialize owned partition in 
subscription into userData
+// but in StickyAssignor, we'll serialize owned partition in 
assignment into userData
+if (assignor instanceof CooperativeStickyAssignor) {

Review Comment:
   This is not very elegant given that we have subclasses for each. Should we 
move this test to the respective subclasses?



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java:
##
@@ -73,6 +88,74 @@ public void setUp() {
 }
 }
 
+@Test
+public void testMemberDataFromSubscription() {
+List<String> topics = topics(topic);
+List<TopicPartition> ownedPartitions = partitions(tp(topic1, 0), 
tp(topic2, 1));
+List<Subscription> subscriptions = new ArrayList<>();
+// add all subscription in ConsumerProtocolSubscription versions
+subscriptions.add(buildSubscriptionV0(topics, ownedPartitions, 
generationId));
+subscriptions.add(buildSubscriptionV1(topics, ownedPartitions, 
generationId));
+subscriptions.add(buildSubscriptionV2Above(topics, ownedPartitions, 
generationId));
+for (Subscription subscription : subscriptions) {
+if (subscription != null) {
+AbstractStickyAssignor.MemberData memberData = 
assignor.memberDataFromSubscription(subscription);
+assertEquals(ownedPartitions, memberData.partitions, 
"subscription: " + subscription + " doesn't have expected owned partition");
+assertEquals(generationId, memberData.generation.orElse(-1), 
"subscription: " + subscription + " doesn't have expected generation id");
+}
+}
+}
+
+@Test
+public void testMemberDataFromSubscriptionWithInconsistentData() {
+Map<String, Integer> partitionsPerTopic = new HashMap<>();
+partitionsPerTopic.put(topic, 2);
+List<TopicPartition> ownedPartitionsInUserdata = partitions(tp1);
+List<TopicPartition> ownedPartitionsInSubscription = partitions(tp0);
+
+assignor.onAssignment(new 
ConsumerPartitionAssignor.Assignment(ownedPartitionsInUserdata), new 
ConsumerGroupMetadata(groupId, generationId, consumer1, Optional.empty()));
+ByteBuffer userDataWithHigherGenerationId = 
assignor.subscriptionUserData(new HashSet<>(topics(topic)));
+// The owned partitions and generation id are provided in user data 
and different owned partition is provided in 

[GitHub] [kafka] rigelbm closed pull request #12493: MINOR: Migrate from slf4j-log4j12 to slf4j-reload4j.

2022-11-16 Thread GitBox


rigelbm closed pull request #12493: MINOR: Migrate from slf4j-log4j12 to 
slf4j-reload4j.
URL: https://github.com/apache/kafka/pull/12493


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12748: KAFKA-13715: add generationId field in subscription

2022-11-16 Thread GitBox


dajac commented on code in PR #12748:
URL: https://github.com/apache/kafka/pull/12748#discussion_r1024047722


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java:
##
@@ -162,6 +162,17 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
 return isAllSubscriptionsEqual;
 }
 
+// visible for testing
+MemberData memberDataFromSubscription(Subscription subscription) {
+if (!subscription.ownedPartitions().isEmpty() && 
subscription.generationId().isPresent()) {

Review Comment:
   I wonder if this is 100% correct. Could we have a case where subscription v3 
is used but no owned partitions are reported by the member? For instance, this 
could happen if the group has more members than partitions.
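   
   A sketch of the alternative check this points at (an illustration of the
   review comment, not the committed fix):
   
   ```
   // A subscription version that carries a generation id always sends it, even
   // when the member currently owns no partitions (e.g. more members than
   // partitions), so keying the check on ownedPartitions being non-empty can
   // misclassify such members.
   MemberData memberDataFromSubscription(Subscription subscription) {
       if (subscription.generationId().isPresent()) {
           return new MemberData(subscription.ownedPartitions(), subscription.generationId());
       }
       return memberData(subscription); // legacy path: deserialize userData
   }
   ```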



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-14334) DelayedFetch purgatory not completed when appending as follower

2022-11-16 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-14334.
-
Fix Version/s: 3.4.0
   3.3.2
 Reviewer: David Jacot
   Resolution: Fixed

> DelayedFetch purgatory not completed when appending as follower
> ---
>
> Key: KAFKA-14334
> URL: https://issues.apache.org/jira/browse/KAFKA-14334
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jeff Kim
>Assignee: Jeff Kim
>Priority: Major
> Fix For: 3.4.0, 3.3.2
>
>
> Currently, the ReplicaManager.delayedFetchPurgatory is only completed when 
> appending as leader. With 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica]
>  enabled, followers will also have to complete delayed fetch requests after 
> successfully replicating. Otherwise, consumer fetches to closest followers 
> will hit fetch.max.wait.ms



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] divijvaidya commented on a diff in pull request #12233: MINOR: Clean up tmp files created by tests

2022-11-16 Thread GitBox


divijvaidya commented on code in PR #12233:
URL: https://github.com/apache/kafka/pull/12233#discussion_r1024041485


##
clients/src/test/java/org/apache/kafka/test/TestUtils.java:
##
@@ -142,26 +142,40 @@ public static String randomString(final int len) {
 }
 
 /**
- * Create an empty file in the default temporary-file directory, using 
`kafka` as the prefix and `tmp` as the
- * suffix to generate its name.
+ * Create an empty file in the default temporary-file directory, using the 
given prefix and suffix
+ * to generate its name.
+ * @throws IOException
  */
-public static File tempFile() throws IOException {
-final File file = File.createTempFile("kafka", ".tmp");
+public static File tempFile(final String prefix, final String suffix) 
throws IOException {
+final File file = Files.createTempFile(prefix, suffix).toFile();
 file.deleteOnExit();
 
+Exit.addShutdownHook("delete-temp-file-shutdown-hook", () -> {

Review Comment:
   From what I understand, we try to use `Exit.scala` instead of 
`Runtime.getRuntime()` so that tests may provide their own implementation of 
the shutdown hook adder. This is mentioned here: 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/utils/Exit.scala#L22
   
   I didn't understand the scenario where this hook would be disabled. Could you 
please explain with an example?
   
   If a test overrides the shutdown hook behaviour, I would assume it will add a 
new shutdown hook, or perhaps add a new `ShutdownHookAdder` that plays with the 
order of the hooks. In all such scenarios, this cleanup hook will still be 
executed as long as it is registered correctly.
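   
   For what it's worth, a small sketch of the interception this indirection
   enables (method names follow `org.apache.kafka.common.utils.Exit`; treat the
   exact signatures as an assumption):
   
   ```
   // A test can swap the hook adder to capture hooks instead of registering
   // them on the JVM, and restore the default afterwards.
   Exit.setShutdownHookAdder((name, runnable) ->
       System.out.println("captured shutdown hook: " + name));
   Exit.addShutdownHook("delete-temp-file-shutdown-hook", () -> { /* cleanup */ });
   Exit.resetShutdownHookAdder();
   ```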
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #12783: KAFKA-14334: complete delayed purgatory after replication

2022-11-16 Thread GitBox


dajac commented on PR #12783:
URL: https://github.com/apache/kafka/pull/12783#issuecomment-1317073961

   Merged to trunk and 3.3.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac merged pull request #12783: KAFKA-14334: complete delayed purgatory after replication

2022-11-16 Thread GitBox


dajac merged PR #12783:
URL: https://github.com/apache/kafka/pull/12783


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #12783: KAFKA-14334: complete delayed purgatory after replication

2022-11-16 Thread GitBox


dajac commented on PR #12783:
URL: https://github.com/apache/kafka/pull/12783#issuecomment-1317056696

   Failed tests are unrelated:
   ```
   Build / JDK 17 and Scala 2.13 / 
kafka.api.TransactionsExpirationTest.testTransactionAfterProducerIdExpires(String).quorum=kraft
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testCreatePartitionsUseProvidedForwardingAdmin()
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.controller.QuorumControllerTest.testDelayedConfigurationOperations()
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.controller.QuorumControllerTest.testEarlyControllerResults()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on pull request #12809: KAFKA-14324: Upgrade RocksDB to 7.1.2

2022-11-16 Thread GitBox


cadonna commented on PR #12809:
URL: https://github.com/apache/kafka/pull/12809#issuecomment-1317015966

   Backported to 3.3, 3.2, 3.1, and 3.0. For 3.1 and 3.0, I also needed to 
backport https://github.com/apache/kafka/pull/11690.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeqo commented on a diff in pull request #12859: KAFKA-14325: Fix NPE on Processor Parameters toString

2022-11-16 Thread GitBox


jeqo commented on code in PR #12859:
URL: https://github.com/apache/kafka/pull/12859#discussion_r1023922604


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java:
##
@@ -130,7 +129,8 @@ public String processorName() {
 @Override
 public String toString() {
 return "ProcessorParameters{" +
-"processor class=" + processorSupplier.get().getClass() +
+"processor supplier class=" + (processorSupplier != null ? 
processorSupplier.getClass() : "null") +
+", fixed key processor supplier class=" + 
(fixedKeyProcessorSupplier != null ? fixedKeyProcessorSupplier.getClass() : 
"null") +

Review Comment:
   Avoid calling get() as it could trigger side effects (it does in tests, see 
https://github.com/apache/kafka/blob/039a245a2314c63834e61989939738579bad/streams/src/test/java/org/apache/kafka/test/MockApiProcessorSupplier.java#L51-L54)
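   
   A tiny sketch of the failure mode (a hypothetical supplier, not the mock
   linked above):
   
   ```
   // Every get() builds a new processor and is observable from the outside,
   // so a toString() that calls get() would have a visible side effect.
   class CountingSupplier implements ProcessorSupplier<String, String, String, String> {
       int instancesCreated = 0;
   
       @Override
       public Processor<String, String, String, String> get() {
           instancesCreated++; // this is what toString() must not trigger
           return new Processor<String, String, String, String>() {
               @Override
               public void process(final Record<String, String> record) { }
           };
       }
   }
   ```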



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java:
##
@@ -114,13 +114,12 @@ KTableSource kTableSourceSupplier() {
 
 @SuppressWarnings("unchecked")
  KTableProcessorSupplier 
kTableProcessorSupplier() {
-// This cast always works because KTableProcessorSupplier hasn't been 
converted yet.

Review Comment:
   Now that the KTable processor supplier is using the latest API, I think this 
is worth updating.



##
streams/src/test/resources/log4j.properties:
##
@@ -27,7 +27,7 @@ log4j.logger.org.apache.kafka.clients=ERROR
 # These are the only logs we will likely ever find anything useful in to debug 
Streams test failures
 log4j.logger.org.apache.kafka.clients.consumer=INFO
 log4j.logger.org.apache.kafka.clients.producer=INFO
-log4j.logger.org.apache.kafka.streams=INFO
+log4j.logger.org.apache.kafka.streams=DEBUG

Review Comment:
   This would have triggered the error in the first place



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac opened a new pull request, #12865: MINOR: Serialize response in KafkaApisTest

2022-11-16 Thread GitBox


dajac opened a new pull request, #12865:
URL: https://github.com/apache/kafka/pull/12865

   WIP
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on pull request #12818: KAFKA-14133: Replace EasyMock with Mockito in streams tests

2022-11-16 Thread GitBox


clolov commented on PR #12818:
URL: https://github.com/apache/kafka/pull/12818#issuecomment-1316803784

   Tagging @divijvaidya for visibility as well :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on pull request #12418: KAFKA-13414: Replace PowerMock/EasyMock with Mockito in connect.storage.KafkaOffsetBackingStoreTest

2022-11-16 Thread GitBox


clolov commented on PR #12418:
URL: https://github.com/apache/kafka/pull/12418#issuecomment-1316765701

   Thank you for the quick turnaround on the review @C0urante! I will aim to 
open a couple more pull requests in the upcoming days for Connect with respect 
to the Mockito migration and tag you there.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14132) Remaining PowerMock to Mockito tests

2022-11-16 Thread Christo Lolov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christo Lolov updated KAFKA-14132:
--
Description: 
{color:#de350b}Some of the tests below use EasyMock as well. For those migrate 
both PowerMock and EasyMock to Mockito.{color}

Unless stated in brackets the tests are in the connect module.

A list of tests which still require to be moved from PowerMock to Mockito as of 
2nd of August 2022 which do not have a Jira issue and do not have pull requests 
I am aware of which are opened:

{color:#ff8b00}InReview{color}
{color:#00875a}Merged{color}
 # {color:#ff8b00}ErrorHandlingTaskTest{color} (owner: [~shekharrajak])
 # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo)
 # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij)
 # WorkerSinkTaskTest (owner: Divij) *WIP* 
 # WorkerSinkTaskThreadedTest (owner: Divij) *WIP*
 # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya])
 # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya])
 # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya])
 # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya])
 # {color:#00875a}ConnectorsResourceTest{color} (owner: [~mdedetrich-aiven])
 # {color:#ff8b00}StandaloneHerderTest{color} (owner: [~mdedetrich-aiven]) 
([https://github.com/apache/kafka/pull/12728])
 # KafkaConfigBackingStoreTest (owner: [~mdedetrich-aiven])
 # {color:#00875a}KafkaOffsetBackingStoreTest{color} (owner: Christo) 
([https://github.com/apache/kafka/pull/12418])
 # {color:#FF8B00}KafkaBasedLogTest{color} (owner: [~mdedetrich-aiven])
 # RetryUtilTest (owner: [~mdedetrich-aiven] )
 # {color:#FF8B00}RepartitionTopicTest{color} (streams) (owner: Christo)
 # {color:#FF8B00}StateManagerUtilTest{color} (streams) (owner: Christo)

*The coverage report for the above tests after the change should be >= to what 
the coverage is now.*

  was:
{color:#de350b}Some of the tests below use EasyMock as well. For those migrate 
both PowerMock and EasyMock to Mockito.{color}

Unless stated in brackets the tests are in the connect module.

A list of tests which still require to be moved from PowerMock to Mockito as of 
2nd of August 2022 which do not have a Jira issue and do not have pull requests 
I am aware of which are opened:

{color:#ff8b00}InReview{color}
{color:#00875a}Merged{color}
 # {color:#ff8b00}ErrorHandlingTaskTest{color} (owner: [~shekharrajak])
 # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo)
 # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij)
 # WorkerSinkTaskTest (owner: Divij) *WIP* 
 # WorkerSinkTaskThreadedTest (owner: Divij) *WIP*
 # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya])
 # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya])
 # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya])
 # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya])
 # {color:#00875a}ConnectorsResourceTest{color} (owner: [~mdedetrich-aiven])
 # {color:#ff8b00}StandaloneHerderTest{color} (owner: [~mdedetrich-aiven]) 
([https://github.com/apache/kafka/pull/12728])
 # KafkaConfigBackingStoreTest (owner: [~mdedetrich-aiven])
 # {color:#ff8b00}KafkaOffsetBackingStoreTest{color} (owner: Christo) 
([https://github.com/apache/kafka/pull/12418])
 # {color:#FF8B00}KafkaBasedLogTest{color} (owner: [~mdedetrich-aiven])
 # RetryUtilTest (owner: [~mdedetrich-aiven] )
 # {color:#FF8B00}RepartitionTopicTest{color} (streams) (owner: Christo)
 # {color:#FF8B00}StateManagerUtilTest{color} (streams) (owner: Christo)

*The coverage report for the above tests after the change should be >= to what 
the coverage is now.*


> Remaining PowerMock to Mockito tests
> 
>
> Key: KAFKA-14132
> URL: https://issues.apache.org/jira/browse/KAFKA-14132
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
>
> {color:#de350b}Some of the tests below use EasyMock as well. For those 
> migrate both PowerMock and EasyMock to Mockito.{color}
> Unless stated in brackets the tests are in the connect module.
> A list of tests which still require to be moved from PowerMock to Mockito as 
> of 2nd of August 2022 which do not have a Jira issue and do not have pull 
> requests I am aware of which are opened:
> {color:#ff8b00}InReview{color}
> {color:#00875a}Merged{color}
>  # {color:#ff8b00}ErrorHandlingTaskTest{color} (owner: [~shekharrajak])
>  # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo)
>  # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij)
>  # WorkerSinkTaskTest (owner: Divij) *WIP* 
>  # WorkerSinkTaskThreadedTest (owner: Divij) *WIP*
>  # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya])
>  # 

[GitHub] [kafka] dajac commented on pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

2022-11-16 Thread GitBox


dajac commented on PR #12845:
URL: https://github.com/apache/kafka/pull/12845#issuecomment-1316692147

   @jeffkbkim @jolshan Thanks for your comments. I have addressed your 
feedback. Note that I have extracted the changes about the JoinGroupResponse 
version handling in a separate PR: https://github.com/apache/kafka/pull/12864. 
It is less risky this way.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

2022-11-16 Thread GitBox


dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1023749684


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1646,73 +1649,46 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  private def makeGroupCoordinatorRequestContextFrom(

Review Comment:
   Why do you think so? Is it because of the usage of `make`? The name looks 
quite reasonable to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12845: KAFKA-14367; Add `JoinGroup` to the new `GroupCoordinator` interface

2022-11-16 Thread GitBox


dajac commented on code in PR #12845:
URL: https://github.com/apache/kafka/pull/12845#discussion_r1023741182


##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -22,9 +22,8 @@ import java.util
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.ReentrantLock
 import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit, 
TimeoutException}
-

Review Comment:
   Nope. Reverted.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12864: MINOR: Handle JoinGroupResponseData.protocolName backward compatibility in JoinGroupResponse

2022-11-16 Thread GitBox


dajac commented on code in PR #12864:
URL: https://github.com/apache/kafka/pull/12864#discussion_r1023727410


##
clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java:
##
@@ -28,9 +28,15 @@ public class JoinGroupResponse extends AbstractResponse {
 
 private final JoinGroupResponseData data;
 
-public JoinGroupResponse(JoinGroupResponseData data) {
+public JoinGroupResponse(JoinGroupResponseData data, short version) {
 super(ApiKeys.JOIN_GROUP);
 this.data = data;
+
+// All versions prior to version 7 do not support nullable
+// string for the protocol name. Empty string should be used.
+if (version < 7 && data.protocolName() == null) {
+data.setProtocolName("");
+}

Review Comment:
   This new logic replaces 
https://github.com/apache/kafka/pull/12864/files#diff-cc056b4960ededba37a438b1454f0f3c5ff5e8ad5e6d2ec9a08e813ca056ffebL1655.
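   
   In test form, the moved behaviour reads roughly like this (a sketch against
   the constructor shown in the diff):
   
   ```
   // Pre-v7 wire formats cannot carry a null protocol name, so the
   // constructor coerces it to the empty string...
   JoinGroupResponse v6 = new JoinGroupResponse(
       new JoinGroupResponseData().setProtocolName(null), (short) 6);
   assertEquals("", v6.data().protocolName());
   
   // ...while from version 7 onwards the null is preserved.
   JoinGroupResponse v7 = new JoinGroupResponse(
       new JoinGroupResponseData().setProtocolName(null), (short) 7);
   assertNull(v7.data().protocolName());
   ```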



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac opened a new pull request, #12864: MINOR: Handle JoinGroupResponseData.protocolName backward compatibility in JoinGroupResponse

2022-11-16 Thread GitBox


dajac opened a new pull request, #12864:
URL: https://github.com/apache/kafka/pull/12864

   This is a small refactor extracted from 
https://github.com/apache/kafka/pull/12845. It basically moves the logic to 
handle the backward compatibility of `JoinGroupResponseData.protocolName` from 
`KafkaApis` to `JoinGroupResponse`.
   
   The patch adds a new unit test for `JoinGroupResponse` and relies on 
existing tests as well.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org