Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-12-05 Thread via GitHub


cadonna merged PR #14879:
URL: https://github.com/apache/kafka/pull/14879


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-15751) KRaft support in BaseAdminIntegrationTest

2023-12-05 Thread Gantigmaa Selenge (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-15751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gantigmaa Selenge reassigned KAFKA-15751:
-

Assignee: Gantigmaa Selenge

> KRaft support in BaseAdminIntegrationTest
> -
>
> Key: KAFKA-15751
> URL: https://issues.apache.org/jira/browse/KAFKA-15751
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Sameer Tejani
>Assignee: Gantigmaa Selenge
>Priority: Minor
>  Labels: kraft, kraft-test, newbie
>
> The following tests in BaseAdminIntegrationTest in core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala need to be updated to support KRaft
> 70 : def testCreateDeleteTopics(): Unit = {
> 163 : def testAuthorizedOperations(): Unit = {
> Scanned 259 lines. Found 0 KRaft tests out of 2 tests



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Allow local-log segment deletion when log-start-offset incremented [kafka]

2023-12-05 Thread via GitHub


kamalcph commented on PR #14905:
URL: https://github.com/apache/kafka/pull/14905#issuecomment-1842245405

   
   > Does that mean that currently, when the log start offset is incremented past the candidate segments, we won't delete them until the segments are uploaded to remote storage?
   
   The local-log segments won't be eligible for deletion, and they are not eligible for upload to remote storage either, since the log-start-offset might already have moved past them. Once the next set of segments gets uploaded, the `highest-copied-remote-offset` will be higher than the not-yet-uploaded local-log segments, and they then become eligible for deletion. This patch addresses edge cases where the topic becomes stale/deprecated after the log-start-offset update.
   
   > If so, does that mean those uploaded segments will be deleted the next time we enter `cleanupExpiredRemoteLogSegments`, because of `isSegmentBreachByLogStartOffset`?
   
   Yes, correct. The uploaded remote segments will be cleaned up by `cleanupExpiredRemoteLogSegments` due to `isSegmentBreachByLogStartOffset`.
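   A minimal sketch of the eligibility rule described above (the method and parameter names are illustrative, not the actual `RemoteLogManager` API):
   
   ```java
   // Sketch only: a local segment is safe to delete once it is either fully copied
   // to remote storage or lies entirely below the log-start-offset.
   static boolean isLocalSegmentEligibleForDeletion(long segmentEndOffset,
                                                    long highestCopiedRemoteOffset,
                                                    long logStartOffset) {
       // Covered by remote storage: everything up to highestCopiedRemoteOffset was uploaded.
       boolean coveredByRemote = segmentEndOffset <= highestCopiedRemoteOffset;
       // The edge case this patch targets: the log-start-offset moved past the segment,
       // so it will never be uploaded and can be deleted locally right away.
       boolean belowLogStartOffset = segmentEndOffset < logStartOffset;
       return coveredByRemote || belowLogStartOffset;
   }
   ```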
   
   





Re: [PR] KAFKA-15971: Re-enable consumer integration tests for new consumer [kafka]

2023-12-05 Thread via GitHub


dajac commented on PR #14925:
URL: https://github.com/apache/kafka/pull/14925#issuecomment-1842219297

   We've got two clean builds. I just re-scheduled a new one.





[jira] [Assigned] (KAFKA-15911) KRaft quorum leader should make sure the follower fetch is making progress

2023-12-05 Thread Luke Chen (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-15911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen reassigned KAFKA-15911:
-

Assignee: Luke Chen

> KRaft quorum leader should make sure the follower fetch is making progress
> --
>
> Key: KAFKA-15911
> URL: https://issues.apache.org/jira/browse/KAFKA-15911
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.7.0
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> Just because the leader returned a successful response to FETCH and FETCH_SNAPSHOT doesn't mean that the followers were able to handle the response correctly.
> For example, imagine the case where the log end offset (LEO) is at 1000 and all of the followers are continuously fetching at offset 0 without ever increasing their fetch offset. This can happen if the followers encounter an error when processing the FETCH or FETCH_SNAPSHOT response.
> In this scenario the leader will never be able to increase the HWM. I think this scenario is specific to KRaft and doesn't exist in Raft, because KRaft is pull-based whereas Raft is push-based.
> https://github.com/apache/kafka/pull/14428#pullrequestreview-1751408695





Re: [PR] KAFKA-14585: Move StorageTool to tools [kafka]

2023-12-05 Thread via GitHub


showuon commented on PR #14847:
URL: https://github.com/apache/kafka/pull/14847#issuecomment-1842216032

   > The tool calls KafkaConfig.validateValues which runs the full set of configuration validations.
   
   @fvaleri, sorry, I didn't see where we invoke `KafkaConfig.validateValues` in StorageTool. Could you point me to where it is? Thanks.





Re: [PR] KAFKA-15696: Refactor AsyncConsumer close procedure [kafka]

2023-12-05 Thread via GitHub


philipnee commented on code in PR #14920:
URL: https://github.com/apache/kafka/pull/14920#discussion_r1416732684


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -957,6 +966,57 @@ private void close(Duration timeout, boolean swallowException) {
         }
     }
 
+    /**
+     * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence:
+     * 1. autocommit offsets
+     * 2. revoke all partitions
+     */
+    private void prepareShutdown(final Timer timer) {
+        if (!groupMetadata.isPresent())
+            return;
+
+        maybeAutoCommitSync(timer);
+        timer.update();
+        if (!subscriptions.hasAutoAssignedPartitions() || subscriptions.assignedPartitions().isEmpty())
+            return;
+
+        try {
+            // If the consumer is in a group, we will pause and revoke all assigned partitions
+            onLeavePrepare().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
+            timer.update();
+        } catch (Exception e) {
+            Exception exception = e;
+            if (e instanceof ExecutionException)
+                exception = (Exception) e.getCause();
+            throw new KafkaException("User rebalance callback throws an error", exception);
+        } finally {
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+        }
+    }
+
+    private void maybeAutoCommitSync(final Timer timer) {
+        if (autoCommitEnabled) {
+            Map<TopicPartition, OffsetAndMetadata> allConsumed = subscriptions.allConsumed();
+            try {
+                log.debug("Sending synchronous auto-commit of offsets {} on closing", allConsumed);
+                commitSync(allConsumed, Duration.ofMillis(timer.remainingMs()));
+            } catch (Exception e) {
+                // consistent with async auto-commit failures, we do not propagate the exception
+                log.warn("Synchronous auto-commit of offsets {} failed: {}", allConsumed, e.getMessage());
+            }
+        }
+    }
+
+    private CompletableFuture<Void> onLeavePrepare() {
+        SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR);
+        droppedPartitions.addAll(subscriptions.assignedPartitions());
+        if (!subscriptions.hasAutoAssignedPartitions() || droppedPartitions.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+        // TODO: Invoke rebalanceListener via KAFKA-15276
+        return CompletableFuture.completedFuture(null);
+    }
+

Review Comment:
   Thanks. I mostly agree with your idea. I think simply firing the callback revocation from `close()` should be enough, but sending leave-group and closing events as you suggested is a good idea.






Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-05 Thread via GitHub


philipnee commented on PR #14873:
URL: https://github.com/apache/kafka/pull/14873#issuecomment-1842164667

   Hi @lianetm - Thanks for taking the time to review the PR. Instead of using boolean flags, I added a _STALED_ state to address your concern. I think we do need this state to stay consistent with the state logic; i.e., I agree that unsubscribed is probably not the right state to be in, as it hints to the user that it needs to subscribe. Let me know what you think.
   
   For the static member - I think the right thing to do is to let the membershipManager decide the right epoch to send.
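   As an illustration of why an explicit state beats scattered boolean flags here, a hedged sketch (the real `MemberState` enum in the consumer internals has more states and different transition rules than shown):
   
   ```java
   // Sketch: state names and transitions are illustrative, not the actual enum.
   enum SketchMemberState {
       UNSUBSCRIBED, JOINING, STABLE, STALE;
   
       // Keeping the legal transitions in one place is the main win over boolean flags.
       boolean canTransitionTo(SketchMemberState next) {
           switch (this) {
               case STABLE:       return next == STALE || next == UNSUBSCRIBED;
               case STALE:        return next == JOINING;      // rejoin on the next poll
               case JOINING:      return next == STABLE || next == UNSUBSCRIBED;
               case UNSUBSCRIBED: return next == JOINING;
               default:           return false;
           }
       }
   }
   ```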





Re: [PR] KAFKA-15022: introduce interface to control graph constructor [kafka]

2023-12-05 Thread via GitHub


mjsax merged PR #14714:
URL: https://github.com/apache/kafka/pull/14714





Re: [PR] KAFKA-15022: introduce interface to control graph constructor [kafka]

2023-12-05 Thread via GitHub


mjsax commented on PR #14714:
URL: https://github.com/apache/kafka/pull/14714#issuecomment-1842147167

   Jenkins issues:
   - build 7: `JDK 11 and Scala 2.13` failed
   - build 6: `JDK 8 and Scala 2.12` failed
   
   Test failures seem to be unrelated to this PR. Merging.





Re: [PR] KAFKA-15684: Support to describe all client metrics resources (KIP-714) [kafka]

2023-12-05 Thread via GitHub


apoorvmittal10 commented on PR #14933:
URL: https://github.com/apache/kafka/pull/14933#issuecomment-1842077768

   @junrao Thanks for reviewing the PR, I have addressed the comments.





Re: [PR] KAFKA-15684: Support to describe all client metrics resources (KIP-714) [kafka]

2023-12-05 Thread via GitHub


apoorvmittal10 commented on code in PR #14933:
URL: https://github.com/apache/kafka/pull/14933#discussion_r1416635870


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -6922,4 +6923,58 @@ class KafkaApisTest {
     val expectedResponse = new PushTelemetryResponseData().setErrorCode(Errors.INVALID_REQUEST.code)
     assertEquals(expectedResponse, response.data)
   }
+
+  @Test
+  def testListClientMetricsResourcesNotAllowedForZkClusters(): Unit = {
+    val request = buildRequest(new ListClientMetricsResourcesRequest.Builder(new ListClientMetricsResourcesRequestData()).build())
+    createKafkaApis(enableForwarding = true).handle(request, RequestLocal.NoCaching)
+
+    val response = verifyNoThrottling[ListClientMetricsResourcesResponse](request)
+    assertEquals(Errors.UNKNOWN_SERVER_ERROR, Errors.forCode(response.data.errorCode))

Review Comment:
   This test covers the case where KafkaApis is configured in ZK mode: the request gets rejected by the common `handle` method, which throws an `IllegalStateException` that translates to `UNKNOWN_SERVER_ERROR`.
   
   It then also makes sense that no handling is needed in `handleListClientMetricsResources`, as the method should always have a `clientMetricsManager` if the call reaches it. But I completed the request logically there if `clientMetricsManager` is None, to avoid any error case.
   
   I have added a comment in the code as well.
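   For context, the translation described above can be seen with the public `Errors` helper from the Java commons (a minimal illustration; the broker-side dispatch itself lives in the Scala `handle` method):
   
   ```java
   import org.apache.kafka.common.protocol.Errors;
   
   public class ErrorTranslationExample {
       public static void main(String[] args) {
           // Exceptions without a dedicated error code map to UNKNOWN_SERVER_ERROR.
           Errors error = Errors.forException(new IllegalStateException("API not supported in ZK mode"));
           System.out.println(error); // UNKNOWN_SERVER_ERROR
       }
   }
   ```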






Re: [PR] KAFKA-15684: Support to describe all client metrics resources (KIP-714) [kafka]

2023-12-05 Thread via GitHub


apoorvmittal10 commented on code in PR #14933:
URL: https://github.com/apache/kafka/pull/14933#discussion_r1416644082


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -3810,16 +3811,27 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  // Just a placeholder for now.
   def handleListClientMetricsResources(request: RequestChannel.Request): Unit = {
     val listClientMetricsResourcesRequest = request.body[ListClientMetricsResourcesRequest]
 
     if (!authHelper.authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)) {
       requestHelper.sendMaybeThrottle(request, listClientMetricsResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
     } else {
-      // Just return an empty list in the placeholder
-      val data = new ListClientMetricsResourcesResponseData()
-      requestHelper.sendMaybeThrottle(request, new ListClientMetricsResourcesResponse(data))
+      clientMetricsManager match {
+        case Some(metricsManager) =>
+          try {
+            val data = new ListClientMetricsResourcesResponseData().setClientMetricsResources(
+              metricsManager.listClientMetricsResources.asScala.map(
+                name => new ClientMetricsResource().setName(name)).toList.asJava)
+            requestHelper.sendMaybeThrottle(request, new ListClientMetricsResourcesResponse(data))
+          } catch {
+            case _: Exception =>

Review Comment:
   Done.






Re: [PR] KAFKA-15684: Support to describe all client metrics resources (KIP-714) [kafka]

2023-12-05 Thread via GitHub


apoorvmittal10 commented on code in PR #14933:
URL: https://github.com/apache/kafka/pull/14933#discussion_r1416642552


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -3810,16 +3811,27 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  // Just a placeholder for now.
   def handleListClientMetricsResources(request: RequestChannel.Request): Unit = {
     val listClientMetricsResourcesRequest = request.body[ListClientMetricsResourcesRequest]
 
     if (!authHelper.authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)) {
       requestHelper.sendMaybeThrottle(request, listClientMetricsResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
     } else {
-      // Just return an empty list in the placeholder
-      val data = new ListClientMetricsResourcesResponseData()
-      requestHelper.sendMaybeThrottle(request, new ListClientMetricsResourcesResponse(data))
+      clientMetricsManager match {
+        case Some(metricsManager) =>
+          try {
+            val data = new ListClientMetricsResourcesResponseData().setClientMetricsResources(
+              metricsManager.listClientMetricsResources.asScala.map(
+                name => new ClientMetricsResource().setName(name)).toList.asJava)
+            requestHelper.sendMaybeThrottle(request, new ListClientMetricsResourcesResponse(data))
+          } catch {
+            case _: Exception =>

Review Comment:
   Yeah, this is not needed in this API. I have removed it and validated in a test that the API returns `UNKNOWN_SERVER_ERROR` now.







Re: [PR] KAFKA-15684: Support to describe all client metrics resources (KIP-714) [kafka]

2023-12-05 Thread via GitHub


apoorvmittal10 commented on code in PR #14933:
URL: https://github.com/apache/kafka/pull/14933#discussion_r1416631582


##
core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala:
##
@@ -1690,13 +1690,33 @@ class ConfigCommandTest extends Logging {
   }
 
   @Test
-  def shouldNotDescribeClientMetricsConfigWithoutEntityName(): Unit = {
+  def shouldDescribeClientMetricsConfigWithoutEntityName(): Unit = {
     val describeOpts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092",
       "--entity-type", "client-metrics",
       "--describe"))
 
-    val exception = assertThrows(classOf[IllegalArgumentException], () => describeOpts.checkArgs())
-    assertEquals("an entity name must be specified with --describe of client-metrics", exception.getMessage)
+    val resourceCustom = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, "1")
+    val configEntry = new ConfigEntry("metrics", "*")
+    val future = new KafkaFutureImpl[util.Map[ConfigResource, Config]]
+    val describeResult: DescribeConfigsResult = mock(classOf[DescribeConfigsResult])
+    when(describeResult.all()).thenReturn(future)
+
+    val node = new Node(1, "localhost", 9092)
+    val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) {
+      override def describeConfigs(resources: util.Collection[ConfigResource], options: DescribeConfigsOptions): DescribeConfigsResult = {

Review Comment:
   When the entity name is not provided, all resource names are first fetched by calling `listClientMetricsResources`, and then `describeConfigs` is called on every entity to get the details. I have implemented `listClientMetricsResources` in `MockAdminClient` and call `mockAdminClient.incrementalAlterConfigs` to set the client metrics resource in `MockAdminClient`.
   
   The test calls `listClientMetricsResources` from ConfigCommand.scala, the result of which is passed to `describeConfigs` for validation. Later I also verify that `describeConfigs` was called.
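   A sketch of that list-then-describe flow as a client would drive it, assuming the KIP-714 `Admin#listClientMetricsResources` API; error handling is omitted:
   
   ```java
   import java.util.List;
   import java.util.stream.Collectors;
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.common.config.ConfigResource;
   
   // Sketch only: first list all client-metrics resource names, then describe each one.
   class DescribeAllClientMetrics {
       static void describeAll(Admin admin) throws Exception {
           List<ConfigResource> resources = admin.listClientMetricsResources().all().get().stream()
               .map(listing -> new ConfigResource(ConfigResource.Type.CLIENT_METRICS, listing.name()))
               .collect(Collectors.toList());
           admin.describeConfigs(resources).all().get()
               .forEach((resource, config) -> System.out.println(resource.name() + " -> " + config.entries()));
       }
   }
   ```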






Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2023-12-05 Thread via GitHub


C0urante commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-1842053883

   Removed the stale label.
   
   @vamossagar12 thanks for your patience on this one, I plan on making another 
pass this week!





Re: [PR] KAFKA-14598: Fix flaky ConnectRestApiTest [kafka]

2023-12-05 Thread via GitHub


ashwinpankaj closed pull request #13084: KAFKA-14598: Fix flaky 
ConnectRestApiTest
URL: https://github.com/apache/kafka/pull/13084





Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-05 Thread via GitHub


lianetm commented on PR #14873:
URL: https://github.com/apache/kafka/pull/14873#issuecomment-1842032321

   Thanks for the changes @philipnee, left answers for the static membership 
and some comments.





Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-05 Thread via GitHub


lianetm commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1416614219


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -399,14 +400,45 @@ public void testHeartbeatState() {
             new ConsumerGroupHeartbeatResponseData.Assignment();
         assignmentTopic1.setTopicPartitions(Collections.singletonList(tpTopic1));
         ConsumerGroupHeartbeatResponse rs1 = new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData()
-            .setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
-            .setMemberId(memberId)
-            .setMemberEpoch(1)
-            .setAssignment(assignmentTopic1));
+                .setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
+                .setMemberId(memberId)
+                .setMemberEpoch(1)
+                .setAssignment(assignmentTopic1));
         membershipManager.onHeartbeatResponseReceived(rs1.data());
         assertEquals(MemberState.RECONCILING, membershipManager.state());
     }
 
+    @Test
+    public void testEnsureLeaveGroupWhenPollTimerExpires() {
+        membershipManager.transitionToJoining();

Review Comment:
   I know we've all been writing this kind of unit test that drifts into integration-test land, but if we want to start correcting/simplifying that, this unit test could be simplified. As it is a test for the HB manager, it should just `verify` the call to the membershipManager func when the timer expires (and then another test in the MembershipMgr for that onStale func, which is needed anyway, I would say).
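   A sketch of the narrower unit test being suggested, using Mockito's `verify` (the membership-manager method name is illustrative, and this assumes the manager is injected as a mock):
   
   ```java
   // Sketch only: verify the HB manager delegates to the membership manager on expiry,
   // instead of asserting on the whole heartbeat/leave-group flow.
   @Test
   public void testPollTimerExpiryNotifiesMembershipManager() {
       time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS);
       heartbeatRequestManager.poll(time.milliseconds());
       verify(membershipManager).transitionToSendingLeaveGroup(); // illustrative method name
   }
   ```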






Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2023-12-05 Thread via GitHub


github-actions[bot] commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-1842029141

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.





Re: [PR] KAFKA-15119:Support incremental syncTopicAcls in MirrorSourceConnector [kafka]

2023-12-05 Thread via GitHub


github-actions[bot] commented on PR #13913:
URL: https://github.com/apache/kafka/pull/13913#issuecomment-1842029086

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.





Re: [PR] KAFKA-15296: Allow offsets to be committed for filtered records when Exactly Once support is disabled [kafka]

2023-12-05 Thread via GitHub


github-actions[bot] commented on PR #14158:
URL: https://github.com/apache/kafka/pull/14158#issuecomment-1842028971

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.





Re: [PR] IncrementalCooperativeAssignor#handleLostAssignments invokes logic for lost Assignments even when there are no Lost assignments [kafka]

2023-12-05 Thread via GitHub


github-actions[bot] commented on PR #14000:
URL: https://github.com/apache/kafka/pull/14000#issuecomment-1842029030

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.





Re: [PR] MINOR: Add missing versions to STs [kafka]

2023-12-05 Thread via GitHub


github-actions[bot] commented on PR #14334:
URL: https://github.com/apache/kafka/pull/14334#issuecomment-1842028892

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.





Re: [PR] [MINOR] fix: Align LogContext setup for core components [kafka]

2023-12-05 Thread via GitHub


github-actions[bot] commented on PR #14348:
URL: https://github.com/apache/kafka/pull/14348#issuecomment-1842028870

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.





Re: [PR] KAFKA-15629: Update upgrade-guide.html for TimestampedKeyQuery and TimestampedRangeQuery [kafka]

2023-12-05 Thread via GitHub


hanyuzheng7 commented on PR #14898:
URL: https://github.com/apache/kafka/pull/14898#issuecomment-1842028444

   > This PR shows a merge conflict. Can you rebase it on `trunk`?
   
   ok
   





Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-05 Thread via GitHub


lianetm commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1416609139


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -399,14 +400,51 @@ public void testHeartbeatState() {
             new ConsumerGroupHeartbeatResponseData.Assignment();
         assignmentTopic1.setTopicPartitions(Collections.singletonList(tpTopic1));
         ConsumerGroupHeartbeatResponse rs1 = new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData()
-            .setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
-            .setMemberId(memberId)
-            .setMemberEpoch(1)
-            .setAssignment(assignmentTopic1));
+                .setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
+                .setMemberId(memberId)
+                .setMemberEpoch(1)
+                .setAssignment(assignmentTopic1));
         membershipManager.onHeartbeatResponseReceived(rs1.data());
         assertEquals(MemberState.RECONCILING, membershipManager.state());
     }
 
+    @Test
+    public void testEnsureLeaveGroupWhenPollTimerExpires() {
+        membershipManager.transitionToJoining();
+        time.sleep(1);
+        // Sending first heartbeat and transitioning to stable
+        assertHeartbeat(heartbeatRequestManager);
+        assertFalse(heartbeatRequestManager.pollTimer().isExpired());
+        // Expires the poll timer, ensure sending a leave group
+        time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS);
+        assertLeaveGroup(heartbeatRequestManager);
+        assertTrue(heartbeatRequestManager.pollTimer().isExpired());
+        // Poll again, ensure we heartbeat again.
+        time.sleep(1);
+        heartbeatRequestManager.resetPollTimer();
+        assertHeartbeat(heartbeatRequestManager);
+        assertFalse(heartbeatRequestManager.pollTimer().isExpired());
+    }
+
+    private void assertHeartbeat(HeartbeatRequestManager hrm) {
+        System.out.println("assertHeartbeat");
+        NetworkClientDelegate.PollResult pollResult = hrm.poll(time.milliseconds());
+        assertEquals(1, pollResult.unsentRequests.size());
+        assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, pollResult.timeUntilNextPollMs);
+        pollResult.unsentRequests.get(0).handler().onComplete(createHeartbeatResponse(pollResult.unsentRequests.get(0), Errors.NONE));
+    }
+
+    private void assertLeaveGroup(HeartbeatRequestManager hrm) {
+        NetworkClientDelegate.PollResult pollResult = hrm.poll(time.milliseconds());
+        assertEquals(1, pollResult.unsentRequests.size());
+        ConsumerGroupHeartbeatRequestData data = (ConsumerGroupHeartbeatRequestData) pollResult.unsentRequests.get(0).requestBuilder().build().data();
+        assertEquals(ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH, data.memberEpoch());
+        assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());

Review Comment:
   Is this really the right end state when the poll timer expires? This means 
that the user will have to call `subscribe` to join the group again. Is that 
what the old coordinator requires? (I expected that the old code just requires 
the consumer to be polled again to join the group. If my understanding is right 
then we're missing logic after sending the last HB to leave: on timer 
expiration we should send the last HB and transitionToJoining so that the 
member re-joins the group on the next poll) 






Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-05 Thread via GitHub


lianetm commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1416605360


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -103,6 +103,12 @@ public class HeartbeatRequestManager implements RequestManager {
      */
     private final BackgroundEventHandler backgroundEventHandler;
 
+    /**
+     * Timer for tracking the time since the last consumer poll. If the timer expires, the consumer will stop
+     * sending heartbeats until the next poll.
+     */

Review Comment:
   My take is that we should always send the leave group when the poll timer expires (group.instance.id null or not). Dynamic members should send -1 and static ones -2, as described in the KIP. The [documentation of max.poll.interval.ms](https://kafka.apache.org/documentation/#consumerconfigs_max.poll.interval.ms) just describes what will happen on the broker side (if a static member is leaving, the broker won't re-assign the partitions until the session timeout expires; if a dynamic member, it will re-assign them right away).
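   To make the epoch convention concrete, a small sketch assuming the `ConsumerGroupHeartbeatRequest` constants from KIP-848 (the actual decision lives in the membership manager, not in a helper like this):
   
   ```java
   import java.util.Optional;
   import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
   
   // Sketch only: -1 signals a dynamic member leaving, -2 a static member leaving.
   class LeaveGroupEpoch {
       static int forMember(Optional<String> groupInstanceId) {
           return groupInstanceId.isPresent()
               ? ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH  // static member: -2
               : ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;        // dynamic member: -1
       }
   }
   ```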






Re: [PR] KAFKA-15932: Wait for responses in consumer operations [kafka]

2023-12-05 Thread via GitHub


lianetm commented on PR #14912:
URL: https://github.com/apache/kafka/pull/14912#issuecomment-1842006427

   Thanks for the changes @AndrewJSchofield. I took a pass over it all and it looks good to me, just one minor comment above.
   
   It's helpful for all of us to keep this fix in mind when reviewing other async operations that may not be waiting for responses before continuously enqueuing requests.





Re: [PR] KAFKA-15932: Wait for responses in consumer operations [kafka]

2023-12-05 Thread via GitHub


lianetm commented on code in PR #14912:
URL: https://github.com/apache/kafka/pull/14912#discussion_r1416580989


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##
@@ -388,11 +385,13 @@ private CompletableFuture buildListOffsetRequestToNode(
      * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}
      * that can be polled to obtain the corresponding timestamps and offsets.
      */
-    private List buildListOffsetsRequestsAndResetPositions(
+    private CompletableFuture sendListOffsetsRequestsAndResetPositions(

Review Comment:
   Let's update the description to reflect the changed return value.






[PR] Kafka 15662 kip 714 restore [kafka]

2023-12-05 Thread via GitHub


mjsax opened a new pull request, #14936:
URL: https://github.com/apache/kafka/pull/14936

   This PR is stacked on https://github.com/apache/kafka/pull/14922
   
   Only the second commit needs a review.
   
   +++
   Part of KIP-714.
   
   Add support to collect client instance id of the restore consumer.





Re: [PR] KAFKA-15866:Refactor OffsetFetchRequestState Error handling [kafka]

2023-12-05 Thread via GitHub


DL1231 commented on PR #14923:
URL: https://github.com/apache/kafka/pull/14923#issuecomment-1841980081

   @philipnee PTAL





Re: [PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]

2023-12-05 Thread via GitHub


dengziming commented on PR #14846:
URL: https://github.com/apache/kafka/pull/14846#issuecomment-1841979753

   Hello @tinaselenge, there are some flaky tests related to this; we should take some time to investigate them, and trigger the CI a few more times to reduce flakiness.
   
   
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14846/2//#showFailuresLink
   
   [Build / JDK 17 and Scala 2.13 / kafka.admin.DeleteTopicTest.testIncreasePartitionCountDuringDeleteTopic()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14846/2//testReport/junit/kafka.admin/DeleteTopicTest/Build___JDK_17_and_Scala_2_13___testIncreasePartitionCountDuringDeleteTopic__/)
   [Build / JDK 17 and Scala 2.13 / kafka.admin.DeleteTopicTest.testDisableDeleteTopic(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14846/2//testReport/junit/kafka.admin/DeleteTopicTest/Build___JDK_17_and_Scala_2_13___testDisableDeleteTopic_String__quorum_kraft/)
   [Build / JDK 17 and Scala 2.13 / kafka.admin.DeleteTopicTest.executionError](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14846/2//testReport/junit/kafka.admin/DeleteTopicTest/Build___JDK_17_and_Scala_2_13___executionError/)





Re: [PR] KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest [kafka]

2023-12-05 Thread via GitHub


mdedetrich commented on PR #12728:
URL: https://github.com/apache/kafka/pull/12728#issuecomment-1841940640

   @C0urante @ijuma I have addressed the final comments, let me know if I have 
missed anything





Re: [PR] KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest [kafka]

2023-12-05 Thread via GitHub


mdedetrich commented on code in PR #12728:
URL: https://github.com/apache/kafka/pull/12728#discussion_r1416539486


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##
@@ -908,75 +799,58 @@ public void testCorruptConfig() throws Throwable {
         config.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
         config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, BogusSinkConnector.class.getName());
         config.put(SinkConnectorConfig.TOPICS_CONFIG, TOPICS_LIST_STR);
-        Connector connectorMock = PowerMock.createMock(SinkConnector.class);
+        Connector connectorMock = mock(SinkConnector.class);
         String error = "This is an error in your config!";
         List<String> errors = new ArrayList<>(singletonList(error));
         String key = "foo.invalid.key";
-        EasyMock.expect(connectorMock.validate(config)).andReturn(
+        when(connectorMock.validate(config)).thenReturn(
             new Config(
                 Arrays.asList(new ConfigValue(key, null, Collections.emptyList(), errors))
             )
         );
         ConfigDef configDef = new ConfigDef();
         configDef.define(key, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "");
-        EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-        final Capture<Map<String, String>> configCapture = EasyMock.newCapture();
-        EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
-        EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
-        EasyMock.expect(plugins.connectorLoader(EasyMock.anyString())).andReturn(pluginLoader);
-        EasyMock.expect(plugins.withClassLoader(pluginLoader)).andReturn(loaderSwap);
-        EasyMock.expect(worker.getPlugins()).andStubReturn(plugins);
-        EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
-        EasyMock.expect(connectorMock.config()).andStubReturn(configDef);
-        loaderSwap.close();

Review Comment:
   Committed and pushed






Re: [PR] KAFKA-15760: Disable flaky test testTaskRequestWithOldStartMsGetsUpdated [kafka]

2023-12-05 Thread via GitHub


gharris1727 commented on PR #14917:
URL: https://github.com/apache/kafka/pull/14917#issuecomment-1841939038

   Okay, I'll let it keep running, but it appears that the 4-minute timeout has an ~85% fail rate at 20% CPU, and a ~0% fail rate at 30% CPU.
   
   The 2-minute timeout has a 100% fail rate at 20% and 30% CPU, and a 50% fail rate at 40% CPU. The additional timeout is helping, but still isn't tolerant of low CPU percentages like the BeforeAll strategy.
   





Re: [PR] KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest [kafka]

2023-12-05 Thread via GitHub


mdedetrich commented on code in PR #12728:
URL: https://github.com/apache/kafka/pull/12728#discussion_r1416535578


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##
@@ -887,19 +783,14 @@ public void testPutConnectorConfig() throws Exception {
 
         assertEquals("bar", capturedConfig.getValue().get("foo"));
         herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb);

Review Comment:
   Fixed and committed.
   
   > It'd probably be better to do away with mocked callbacks and switch to using a `FutureCallback` that we invoke `get` on (like we do with many other tests), but it's up to you if you'd like to implement that or not.
   
   I think it's better to tackle this in a separate PR/issue. This PR has been ongoing for long enough, and that just seems like a nice improvement to me.






Re: [PR] KAFKA-15629: Update upgrade-guide.html for TimestampedKeyQuery and TimestampedRangeQuery [kafka]

2023-12-05 Thread via GitHub


mjsax commented on PR #14898:
URL: https://github.com/apache/kafka/pull/14898#issuecomment-1841932981

   This PR shows a merge conflict. Can you rebase it on `trunk`?





[PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]

2023-12-05 Thread via GitHub


mjsax opened a new pull request, #14935:
URL: https://github.com/apache/kafka/pull/14935

   Part of KIP-714.
   
   Add support to collect client instance id of the global consumer.
   
   
   +++
   This PR is on top of https://github.com/apache/kafka/pull/14922
   
   Only the second commit needs a review.





Re: [PR] KAFKA-15629: Support ResultOrder to TimestampedRangeQuery. [kafka]

2023-12-05 Thread via GitHub


hanyuzheng7 commented on code in PR #14907:
URL: https://github.com/apache/kafka/pull/14907#discussion_r1416485410


##
streams/src/main/java/org/apache/kafka/streams/query/RangeQuery.java:
##
@@ -59,25 +58,25 @@ private RangeQuery(final Optional<K> lower, final Optional<K> upper, final boole
      * @param <V> The value type
      */
     public static <K, V> RangeQuery<K, V> withRange(final K lower, final K upper) {
-        return new RangeQuery<>(Optional.ofNullable(lower), Optional.ofNullable(upper), true);
+        return new RangeQuery<>(Optional.ofNullable(lower), Optional.ofNullable(upper), ResultOrder.ANY);
     }
 
     /**
-     * Determines if the serialized byte[] of the keys in ascending order.
+     * Determines if the serialized byte[] of the keys in ascending or any order.
      * Order is based on the serialized byte[] of the keys, not the 'logical' key order.
-     * @return true if ascending, false otherwise.
+     * @return return the order of returned records based on the serialized byte[] of the keys(can be unordered, or in ascending or in descending order).

Review Comment:
   But I couldn't find where the space is missing.






Re: [PR] KAFKA-15629: Support ResultOrder to TimestampedRangeQuery. [kafka]

2023-12-05 Thread via GitHub


hanyuzheng7 commented on code in PR #14907:
URL: https://github.com/apache/kafka/pull/14907#discussion_r1416484369


##
streams/src/main/java/org/apache/kafka/streams/query/RangeQuery.java:
##
@@ -59,25 +58,25 @@ private RangeQuery(final Optional<K> lower, final Optional<K> upper, final boole
      * @param <V> The value type
      */
     public static <K, V> RangeQuery<K, V> withRange(final K lower, final K upper) {
-        return new RangeQuery<>(Optional.ofNullable(lower), Optional.ofNullable(upper), true);
+        return new RangeQuery<>(Optional.ofNullable(lower), Optional.ofNullable(upper), ResultOrder.ANY);
     }
 
     /**
-     * Determines if the serialized byte[] of the keys in ascending order.
+     * Determines if the serialized byte[] of the keys in ascending or any order.
      * Order is based on the serialized byte[] of the keys, not the 'logical' key order.
-     * @return true if ascending, false otherwise.
+     * @return return the order of returned records based on the serialized byte[] of the keys(can be unordered, or in ascending or in descending order).
      */
-    public boolean isKeyAscending() {
-        return isKeyAscending;
+    public ResultOrder resultOrder() {
+        return order;
     }
 
     /**
      * Set the query to return the serialized byte[] of the keys in descending order.
      * Order is based on the serialized byte[] of the keys, not the 'logical' key order.
-     * @return a new RangeQuery instance with descending flag set.
+     * @return a new RangeQuery instance with DESCENDING set.
      */
     public RangeQuery<K, V> withDescendingKeys() {

Review Comment:
   ok






Re: [PR] KAFKA-15629: Support ResultOrder to TimestampedRangeQuery. [kafka]

2023-12-05 Thread via GitHub


hanyuzheng7 commented on code in PR #14907:
URL: https://github.com/apache/kafka/pull/14907#discussion_r1416484212


##
streams/src/main/java/org/apache/kafka/streams/query/RangeQuery.java:
##
@@ -59,25 +58,25 @@ private RangeQuery(final Optional<K> lower, final Optional<K> upper, final boole
      * @param <V> The value type
      */
     public static <K, V> RangeQuery<K, V> withRange(final K lower, final K upper) {
-        return new RangeQuery<>(Optional.ofNullable(lower), Optional.ofNullable(upper), true);
+        return new RangeQuery<>(Optional.ofNullable(lower), Optional.ofNullable(upper), ResultOrder.ANY);
     }
 
     /**
-     * Determines if the serialized byte[] of the keys in ascending order.
+     * Determines if the serialized byte[] of the keys in ascending or any order.
      * Order is based on the serialized byte[] of the keys, not the 'logical' key order.
-     * @return true if ascending, false otherwise.
+     * @return return the order of returned records based on the serialized byte[] of the keys(can be unordered, or in ascending or in descending order).
      */
-    public boolean isKeyAscending() {
-        return isKeyAscending;
+    public ResultOrder resultOrder() {
+        return order;
     }
 
     /**
      * Set the query to return the serialized byte[] of the keys in descending order.
      * Order is based on the serialized byte[] of the keys, not the 'logical' key order.
-     * @return a new RangeQuery instance with descending flag set.
+     * @return a new RangeQuery instance with DESCENDING set.

Review Comment:
   ok



##
streams/src/main/java/org/apache/kafka/streams/query/RangeQuery.java:
##
@@ -59,25 +58,25 @@ private RangeQuery(final Optional<K> lower, final Optional<K> upper, final boole
      * @param <V> The value type
      */
     public static <K, V> RangeQuery<K, V> withRange(final K lower, final K upper) {
-        return new RangeQuery<>(Optional.ofNullable(lower), Optional.ofNullable(upper), true);
+        return new RangeQuery<>(Optional.ofNullable(lower), Optional.ofNullable(upper), ResultOrder.ANY);
     }
 
     /**
-     * Determines if the serialized byte[] of the keys in ascending order.
+     * Determines if the serialized byte[] of the keys in ascending or any order.
      * Order is based on the serialized byte[] of the keys, not the 'logical' key order.
-     * @return true if ascending, false otherwise.
+     * @return return the order of returned records based on the serialized byte[] of the keys(can be unordered, or in ascending or in descending order).

Review Comment:
   ok






Re: [PR] KAFKA-15629: Support ResultOrder to TimestampedRangeQuery. [kafka]

2023-12-05 Thread via GitHub


hanyuzheng7 commented on code in PR #14907:
URL: https://github.com/apache/kafka/pull/14907#discussion_r1416481880


##
streams/src/main/java/org/apache/kafka/streams/query/TimestampedRangeQuery.java:
##
@@ -82,25 +81,25 @@ public static <K, V> TimestampedRangeQuery<K, V> withUpperBound(final K upper) {
      * @param <V> The value type
      */
     public static <K, V> TimestampedRangeQuery<K, V> withLowerBound(final K lower) {
-        return new TimestampedRangeQuery<>(Optional.of(lower), Optional.empty(), true);
+        return new TimestampedRangeQuery<>(Optional.of(lower), Optional.empty(), ResultOrder.ANY);
     }
 
     /**
      * Determines if the serialized byte[] of the keys in ascending order.
      * Order is based on the serialized byte[] of the keys, not the 'logical' key order.
-     * @return true if ascending, false otherwise.
+     * @return return the order of return records base on the serialized byte[] of the keys(can be unordered, or in ascending, or in descending order).
      */
-    public boolean isKeyAscending() {
-        return isKeyAscending;
+    public ResultOrder resultOrder() {
+        return order;
     }
 
     /**
      * Set the query to return the serialized byte[] of the keys in descending order.
      * Order is based on the serialized byte[] of the keys, not the 'logical' key order.
-     * @return a new RangeQuery instance with descending flag set.
+     * @return a new RangeQuery instance with DESCENDING set.
      */
     public TimestampedRangeQuery<K, V> withDescendingKeys() {

Review Comment:
   ok



##
streams/src/main/java/org/apache/kafka/streams/query/TimestampedRangeQuery.java:
##
@@ -82,25 +81,25 @@ public static <K, V> TimestampedRangeQuery<K, V> withUpperBound(final K upper) {
      * @param <V> The value type
      */
     public static <K, V> TimestampedRangeQuery<K, V> withLowerBound(final K lower) {
-        return new TimestampedRangeQuery<>(Optional.of(lower), Optional.empty(), true);
+        return new TimestampedRangeQuery<>(Optional.of(lower), Optional.empty(), ResultOrder.ANY);
     }
 
     /**
      * Determines if the serialized byte[] of the keys in ascending order.
      * Order is based on the serialized byte[] of the keys, not the 'logical' key order.
-     * @return true if ascending, false otherwise.
+     * @return return the order of return records base on the serialized byte[] of the keys(can be unordered, or in ascending, or in descending order).
      */
-    public boolean isKeyAscending() {
-        return isKeyAscending;
+    public ResultOrder resultOrder() {
+        return order;
     }
 
     /**
      * Set the query to return the serialized byte[] of the keys in descending order.
      * Order is based on the serialized byte[] of the keys, not the 'logical' key order.
-     * @return a new RangeQuery instance with descending flag set.
+     * @return a new RangeQuery instance with DESCENDING set.
Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15629: Support ResultOrder to TimestampedRangeQuery. [kafka]

2023-12-05 Thread via GitHub


hanyuzheng7 commented on code in PR #14907:
URL: https://github.com/apache/kafka/pull/14907#discussion_r1416481604


##
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java:
##
@@ -205,12 +206,12 @@ private  QueryResult runTimestampedRangeQuery(final 
Query query,
 final QueryResult result;
 final TimestampedRangeQuery typedQuery = 
(TimestampedRangeQuery) query;
 RangeQuery rawRangeQuery;
-final boolean isKeyAscending = typedQuery.isKeyAscending();
+ResultOrder order = typedQuery.resultOrder();
 rawRangeQuery = RangeQuery.withRange(
 keyBytes(typedQuery.lowerBound().orElse(null)),
 keyBytes(typedQuery.upperBound().orElse(null))
 );
-if (!isKeyAscending) {
+if (order.equals(ResultOrder.DESCENDING)) {

Review Comment:
   https://github.com/apache/kafka/pull/14907#discussion_r1416481443



##
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java:
##
@@ -268,12 +269,12 @@ private   QueryResult runRangeQuery(final Query 
query,
 final QueryResult result;
 final RangeQuery typedQuery = (RangeQuery) query;
 RangeQuery rawRangeQuery;
-final boolean isKeyAscending = typedQuery.isKeyAscending();
+ResultOrder order = typedQuery.resultOrder();
 rawRangeQuery = RangeQuery.withRange(
 keyBytes(typedQuery.getLowerBound().orElse(null)),
 keyBytes(typedQuery.getUpperBound().orElse(null))
 );
-if (!isKeyAscending) {
+if (order.equals(ResultOrder.DESCENDING)) {

Review Comment:
   https://github.com/apache/kafka/pull/14907#discussion_r1416481443



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15629: Support ResultOrder to TimestampedRangeQuery. [kafka]

2023-12-05 Thread via GitHub


hanyuzheng7 commented on code in PR #14907:
URL: https://github.com/apache/kafka/pull/14907#discussion_r1416481443


##
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java:
##
@@ -254,12 +255,12 @@ private  QueryResult runRangeQuery(final Query 
query,
 final QueryResult result;
 final RangeQuery typedQuery = (RangeQuery) query;
 RangeQuery rawRangeQuery;
-final boolean isKeyAscending = typedQuery.isKeyAscending();
+final ResultOrder order = typedQuery.resultOrder();
 rawRangeQuery = RangeQuery.withRange(
 keyBytes(typedQuery.getLowerBound().orElse(null)),
 keyBytes(typedQuery.getUpperBound().orElse(null))
 );
-if (!isKeyAscending) {
+if (order.equals(ResultOrder.DESCENDING)) {

Review Comment:
   Given that 'ANY' and 'ASCENDING' are treated the same, except for 
'ResultOrder.DESCENDING', should a separate statement be used to handle the 
'ASCENDING' case?
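   
   For illustration, a minimal sketch of an explicit three-way handling (assuming `withDescendingKeys()` on the raw `RangeQuery` flips the iteration order, as in the current two-way version):
   
   ```java
   // Sketch only: DESCENDING is the sole case that changes the raw query;
   // ASCENDING and ANY both keep the store's natural ascending order of
   // serialized keys, so they can share the fall-through.
   switch (order) {
       case DESCENDING:
           rawRangeQuery = rawRangeQuery.withDescendingKeys();
           break;
       case ASCENDING:
       case ANY:
       default:
           break;
   }
   ```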



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15426: Process and persist directory assignments [kafka]

2023-12-05 Thread via GitHub


soarez commented on code in PR #14863:
URL: https://github.com/apache/kafka/pull/14863#discussion_r1416477136


##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -2019,6 +2021,70 @@ ListPartitionReassignmentsResponseData 
listPartitionReassignments(
 return response;
 }
 
+ControllerResult 
handleAssignReplicasToDirs(AssignReplicasToDirsRequestData request) {
+int brokerId = request.brokerId();
+clusterControl.checkBrokerEpoch(brokerId, request.brokerEpoch());
+BrokerRegistration brokerRegistration = 
clusterControl.brokerRegistrations().get(brokerId);
+List records = new ArrayList<>();
+AssignReplicasToDirsResponseData response = new 
AssignReplicasToDirsResponseData();
+Set leaderAndIsrUpdates = new HashSet<>();
+for (AssignReplicasToDirsRequestData.DirectoryData reqDir : 
request.directories()) {
+Uuid dirId = reqDir.id();
+AssignReplicasToDirsResponseData.DirectoryData resDir = new 
AssignReplicasToDirsResponseData.DirectoryData().setId(dirId);
+for (AssignReplicasToDirsRequestData.TopicData reqTopic : 
reqDir.topics()) {
+Uuid topicId = reqTopic.topicId();
+Errors topicError = Errors.NONE;
+TopicControlInfo topicControl = this.topics.get(topicId);
+if (topicControl == null) {
+log.warn("AssignReplicasToDirsRequest from broker {} 
references unknown topic ID {}", brokerId, topicId);
+topicError = Errors.UNKNOWN_TOPIC_ID;
+}
+AssignReplicasToDirsResponseData.TopicData resTopic = new 
AssignReplicasToDirsResponseData.TopicData().setTopicId(topicId);
+for (AssignReplicasToDirsRequestData.PartitionData 
reqPartition : reqTopic.partitions()) {
+int partitionIndex = reqPartition.partitionIndex();
+Errors partitionError = topicError;
+if (topicError == Errors.NONE) {
+String topicName = topicControl.name;
+PartitionRegistration partitionRegistration = 
topicControl.parts.get(partitionIndex);
+if (partitionRegistration == null) {
+log.warn("AssignReplicasToDirsRequest from broker 
{} references unknown partition {}-{}", brokerId, topicName, partitionIndex);
+partitionError = Errors.UNKNOWN_TOPIC_OR_PARTITION;
+} else if 
(!Replicas.contains(partitionRegistration.replicas, brokerId)) {
+log.warn("AssignReplicasToDirsRequest from broker 
{} references non assigned partition {}-{}", brokerId, topicName, 
partitionIndex);
+partitionError = Errors.NOT_LEADER_OR_FOLLOWER;
+} else {
+Optional 
partitionChangeRecord = new PartitionChangeBuilder(
+partitionRegistration,
+topicId,
+partitionIndex,
+new LeaderAcceptor(clusterControl, 
partitionRegistration),
+featureControl.metadataVersion(),
+getTopicEffectiveMinIsr(topicName)
+)
+.setDirectory(brokerId, dirId)
+.setDefaultDirProvider(clusterDescriber)
+.build();
+partitionChangeRecord.ifPresent(records::add);
+
+if (!brokerRegistration.hasOnlineDir(dirId)) {

Review Comment:
   Good point



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]

2023-12-05 Thread via GitHub


soarez commented on PR #14838:
URL: https://github.com/apache/kafka/pull/14838#issuecomment-1841864233

   @cmccabe:
   
   > I didn't understand the purpose of the HeartbeatManager changes (now that 
we've agreed to keep Directory out of HBM). Seems like we should be able to 
keep this file the same now? Or if not, can we change it in a separate PR where 
we have some rationale?
   
   The changes aren't necessary, but they are an improvement. I've moved them 
to a separate PR: #14934


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: register before touch in BrokerHeartbeatManager [kafka]

2023-12-05 Thread via GitHub


soarez opened a new pull request, #14934:
URL: https://github.com/apache/kafka/pull/14934

   BrokerHeartbeatManager has a confusing API where it tolerates 
`touch(brokerId, fenced, metadataOffset)` without a previous call to 
`register(brokerId, fenced)`.
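   
   A minimal sketch of the tightened contract (the guard shown is hypothetical; `BrokerHeartbeatState` and the `brokers` map are the existing fields):
   
   ```java
   void touch(int brokerId, boolean fenced, long metadataOffset) {
       BrokerHeartbeatState broker = brokers.get(brokerId);
       if (broker == null) {
           // Fail fast instead of tolerating an unregistered broker.
           throw new IllegalStateException("Broker " + brokerId +
               " must be registered before touch() is called.");
       }
       // ... existing fencing / metadata-offset update logic ...
   }
   ```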
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]

2023-12-05 Thread via GitHub


soarez commented on code in PR #14838:
URL: https://github.com/apache/kafka/pull/14838#discussion_r1416458768


##
metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java:
##
@@ -272,8 +274,17 @@ public void testRegistrationWithIncorrectClusterId() {
 (short) 1));
 }
 
+private static Stream metadataVersions() {
+return Stream.of(
+MetadataVersion.IBP_3_3_IV2,
+MetadataVersion.IBP_3_3_IV3,
+MetadataVersion.LATEST_PRODUCTION,

Review Comment:
   Replaced `LATEST_PRODUCTION` with `IBP_3_7_IV2`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]

2023-12-05 Thread via GitHub


soarez commented on code in PR #14838:
URL: https://github.com/apache/kafka/pull/14838#discussion_r1416458244


##
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##
@@ -354,6 +357,25 @@ public ControllerResult 
registerBroker(
 throw new BrokerIdNotRegisteredException("Controller is in 
pre-migration mode and cannot register KRaft " +
 "brokers until the metadata migration is complete.");
 }
+
+if (featureControl.metadataVersion().isDirectoryAssignmentSupported()) 
{
+Set set = new HashSet<>(request.logDirs());
+if (set.stream().anyMatch(DirectoryId::reserved)) {
+throw new InvalidRequestException("Reserved directory ID in 
request");
+}
+if (set.size() != request.logDirs().size()) {
+throw new InvalidRequestException("Duplicate directory ID in 
request");
+}
+for (BrokerRegistration registration : 
brokerRegistrations().values()) {

Review Comment:
   @cmccabe:
   
   >  InvalidRequestException seems too generic
   
   I've introduced InvalidRegistrationException to handle these.
   
   > Also, we shouldn't be checking the previous registration for this specific 
broker ID.
   
   I forgot that. Thanks for pointing this out!
   
   > we should just have a TimelineHashMap that maps log dir 
UUID to the broker that has it
   
   Great suggestion. Added.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15871: kafka-client-metrics.sh [kafka]

2023-12-05 Thread via GitHub


junrao commented on code in PR #14926:
URL: https://github.com/apache/kafka/pull/14926#discussion_r1416416581


##
tools/src/main/java/org/apache/kafka/tools/ClientMetricsCommand.java:
##
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.AlterConfigsOptions;
+import org.apache.kafka.clients.admin.ClientMetricsResourceListing;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class ClientMetricsCommand {
+private static final Logger LOG = 
LoggerFactory.getLogger(ClientMetricsCommand.class);
+
+public static void main(String... args) {
+Exit.exit(mainNoExit(args));
+}
+
+static int mainNoExit(String... args) {
+try {
+execute(args);
+return 0;
+} catch (Throwable e) {
+System.err.println(e.getMessage());
+System.err.println(Utils.stackTrace(e));
+return 1;
+}
+}
+
+static void execute(String... args) throws Exception {
+ClientMetricsCommandOptions opts = new 
ClientMetricsCommandOptions(args);
+
+Properties config = opts.commandConfig();
+config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
opts.bootstrapServer());
+
+ClientMetricsService service = new ClientMetricsService(config);
+int exitCode = 0;
+try {
+if (opts.hasAlterOption()) {
+service.alterClientMetrics(opts);
+} else if (opts.hasDescribeOption()) {
+service.describeClientMetrics(opts);
+} else if (opts.hasDeleteOption()) {
+service.deleteClientMetrics(opts);
+} else if (opts.hasListOption()) {
+service.listClientMetrics(opts);
+}
+} catch (ExecutionException e) {
+Throwable cause = e.getCause();
+if (cause != null) {
+printException(cause);
+} else {
+printException(e);
+}
+exitCode = 1;
+} catch (Throwable t) {
+printException(t);
+exitCode = 1;
+} finally {
+service.close();
+Exit.exit(exitCode);
+}
+}
+
+public static class ClientMetricsService implements AutoCloseable {
+private Admin adminClient;
+
+public ClientMetricsService(Properties config) {
+this.adminClient = Admin.create(config);
+}
+
+// Visible for testing
+ClientMetricsService(Admin adminClient) {
+this.adminClient = adminClient;
+}
+
+public void alterClientMetrics(ClientMetricsCommandOptions opts) 
throws Exception {
+String entityName = opts.hasGenerateNameOption() ? 
Uuid.randomUuid().toString() : opts.name().get();
+
+Map configsToBeSet = new HashMap<>();
+opts.interval().map(intervalVal -> 
configsToBeSet.put("interval.ms", intervalVal.toString()));
+opts.metrics().map(metricslist -> configsToBeSet.put("metrics", 
metricslist.stream().collect(Collectors.joining(",";
+

Re: [PR] [MINOR-PR] Enable telemetry APIs and logs suppression (KIP-714) [kafka]

2023-12-05 Thread via GitHub


wcarlson5 merged PR #14928:
URL: https://github.com/apache/kafka/pull/14928


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-14748: Relax non-null FK left-join requirement [kafka]

2023-12-05 Thread via GitHub


wcarlson5 merged PR #14107:
URL: https://github.com/apache/kafka/pull/14107


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-14748: Relax non-null FK left-join requirement [kafka]

2023-12-05 Thread via GitHub


wcarlson5 commented on PR #14107:
URL: https://github.com/apache/kafka/pull/14107#issuecomment-1841851229

   I'm not sure; either option for the optimization is fine with me as long as it's well documented.
   
   I'm good with how the PR is for now. I'm going to merge it to get it in before feature freeze for 3.7.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15696: Refactor AsyncConsumer close procedure [kafka]

2023-12-05 Thread via GitHub


kirktrue commented on code in PR #14920:
URL: https://github.com/apache/kafka/pull/14920#discussion_r1416404354


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -957,6 +966,57 @@ private void close(Duration timeout, boolean 
swallowException) {
 }
 }
 
+/**
+ * Prior to closing the network thread, we need to make sure the following 
operations happen in the right sequence:
+ * 1. autocommit offsets
+ * 2. revoke all partitions
+ */
+private void prepareShutdown(final Timer timer) {
+if (!groupMetadata.isPresent())
+return;
+
+maybeAutoCommitSync(timer);
+timer.update();
+if (!subscriptions.hasAutoAssignedPartitions() || 
subscriptions.assignedPartitions().isEmpty())
+return;
+
+try {
+// If the consumer is in a group, we will pause and revoke all 
assigned partitions
+onLeavePrepare().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
+timer.update();
+} catch (Exception e) {
+Exception exception = e;
+if (e instanceof ExecutionException)
+exception = (Exception) e.getCause();
+throw new KafkaException("User rebalance callback throws an 
error", exception);
+} finally {
+subscriptions.assignFromSubscribed(Collections.emptySet());
+}
+}
+
+private void maybeAutoCommitSync(final Timer timer) {
+if (autoCommitEnabled) {
+Map allConsumed = 
subscriptions.allConsumed();
+try {
+log.debug("Sending synchronous auto-commit of offsets {} on 
closing", allConsumed);
+commitSync(allConsumed, 
Duration.ofMillis(timer.remainingMs()));
+} catch (Exception e) {
+// consistent with async auto-commit failures, we do not 
propagate the exception
+log.warn("Synchronous auto-commit of offsets {} failed: {}", 
allConsumed, e.getMessage());
+}
+}
+}
+
+private CompletableFuture onLeavePrepare() {
+SortedSet droppedPartitions = new 
TreeSet<>(MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR);
+droppedPartitions.addAll(subscriptions.assignedPartitions());
+if (!subscriptions.hasAutoAssignedPartitions() || 
droppedPartitions.isEmpty()) {
+return CompletableFuture.completedFuture(null);
+}
+// TODO: Invoke rebalanceListener via KAFKA-15276
+return CompletableFuture.completedFuture(null);
+}
+

Review Comment:
   I would imagine that the leave group process could be mostly performed using 
an event and a callback execution, like in #14931.
   
   We'd need to submit an event to the background thread (e.g. 
`PrepareCloseEvent`) so that the member manager can orchestrate the callback 
request and the heartbeat request.
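   
   A rough sketch of that flow (`PrepareCloseEvent` and its completion future are hypothetical):
   
   ```java
   // Application thread: enqueue the event, then block until the background
   // thread has run the revocation callback and sent the leave heartbeat.
   PrepareCloseEvent event = new PrepareCloseEvent();
   applicationEventHandler.add(event);
   event.future().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
   ```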



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -931,15 +941,14 @@ private void close(Duration timeout, boolean 
swallowException) {
 final Timer closeTimer = time.timer(timeout);
 clientTelemetryReporter.ifPresent(reporter -> 
reporter.initiateClose(timeout.toMillis()));
 closeTimer.update();
-
+// Prepare shutting down the network thread
+swallow(log, Level.ERROR, "Unexpected exception when preparing for 
shutdown", () -> prepareShutdown(closeTimer), firstException);
+closeTimer.update();
 if (applicationEventHandler != null)
-closeQuietly(() -> 
applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), 
"Failed to close application event handler with a timeout(ms)=" + 
closeTimer.remainingMs(), firstException);
-
-// Invoke all callbacks after the background thread exists in case if 
there are unsent async
-// commits
-maybeInvokeCommitCallbacks();
-
-closeQuietly(fetchBuffer, "Failed to close the fetch buffer", 
firstException);
+closeQuietly(() -> 
applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), 
"Failed shutting down network thread", firstException);
+closeTimer.update();
+// Ensure all async commit callbacks are invoked
+swallow(log, Level.ERROR, "Failed invoking asynchronous commit 
callback", this::maybeInvokeCommitCallbacks, firstException);

Review Comment:
   These should be done before we shut down the network, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15760: Disable flaky test testTaskRequestWithOldStartMsGetsUpdated [kafka]

2023-12-05 Thread via GitHub


gharris1727 commented on PR #14917:
URL: https://github.com/apache/kafka/pull/14917#issuecomment-1841810754

   I did some stress-testing on my "BeforeAll" fix overnight and found a new 
failure mode
   
   ```
   org.opentest4j.AssertionFailedError: expected: 
   
<{"state":"RUNNING","spec":{"class":"org.apache.kafka.trogdor.task.NoOpTaskSpec","startMs":552,"durationMs":500},"startedMs":552,"status":"receiving"}>
   but was: 
   
<{"state":"RUNNING","spec":{"class":"org.apache.kafka.trogdor.task.NoOpTaskSpec","startMs":552,"durationMs":500},"startedMs":552,"status":"active"}>
   ```
   
   This occurred in 29 of 7531 runs (0.38% chance) at 20% CPU.
   
   I'm going to re-run the test with your branch to see how effective the 
4-minute timeout is.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15426: Process and persist directory assignments [kafka]

2023-12-05 Thread via GitHub


rondagostino commented on code in PR #14863:
URL: https://github.com/apache/kafka/pull/14863#discussion_r1416193447


##
core/src/test/scala/unit/kafka/server/ControllerApisTest.scala:
##
@@ -1139,31 +1139,24 @@ class ControllerApisTest {
   }
 
   @Test
-  def testAssignReplicasToDirsReturnsUnsupportedVersion(): Unit = {
+  def testAssignReplicasToDirs(): Unit = {
 val controller = mock(classOf[Controller])
-val controllerApis = createControllerApis(None, controller)
+val authorizer = mock(classOf[Authorizer])
+val controllerApis = createControllerApis(Some(authorizer), controller)
+
+val request = new AssignReplicasToDirsRequest.Builder(new 
AssignReplicasToDirsRequestData()).build()
+
+when(authorizer.authorize(any[RequestContext], 
ArgumentMatchers.eq(Collections.singletonList(new Action(
+  AclOperation.CLUSTER_ACTION,
+  new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, 
PatternType.LITERAL),
+  1, true, true
+)
+  .thenReturn(Collections.singletonList(AuthorizationResult.ALLOWED))
+when(controller.assignReplicasToDirs(any[ControllerRequestContext], 
ArgumentMatchers.eq(request.data)))
+  .thenThrow(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception())

Review Comment:
   Need to return something like `completableFuture.completeExceptionally()` 
here as opposed to throwing an exception directly.
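   
   For example (Java-style sketch; the real test is Scala):
   
   ```java
   // Assumes Mockito's when/any/eq are statically imported.
   CompletableFuture<AssignReplicasToDirsResponseData> failed = new CompletableFuture<>();
   failed.completeExceptionally(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception());
   when(controller.assignReplicasToDirs(any(), eq(request.data()))).thenReturn(failed);
   ```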



##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -2019,6 +2021,70 @@ ListPartitionReassignmentsResponseData 
listPartitionReassignments(
 return response;
 }
 
+ControllerResult 
handleAssignReplicasToDirs(AssignReplicasToDirsRequestData request) {
+int brokerId = request.brokerId();
+clusterControl.checkBrokerEpoch(brokerId, request.brokerEpoch());
+BrokerRegistration brokerRegistration = 
clusterControl.brokerRegistrations().get(brokerId);
+List records = new ArrayList<>();
+AssignReplicasToDirsResponseData response = new 
AssignReplicasToDirsResponseData();
+Set leaderAndIsrUpdates = new HashSet<>();
+for (AssignReplicasToDirsRequestData.DirectoryData reqDir : 
request.directories()) {
+Uuid dirId = reqDir.id();
+AssignReplicasToDirsResponseData.DirectoryData resDir = new 
AssignReplicasToDirsResponseData.DirectoryData().setId(dirId);
+for (AssignReplicasToDirsRequestData.TopicData reqTopic : 
reqDir.topics()) {
+Uuid topicId = reqTopic.topicId();
+Errors topicError = Errors.NONE;
+TopicControlInfo topicControl = this.topics.get(topicId);

Review Comment:
   nit: `topicControlInfo` or `topicInfo` might be a better name than 
`topicControl` -- get the "Info" in there somehow.



##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -2019,6 +2021,70 @@ ListPartitionReassignmentsResponseData 
listPartitionReassignments(
 return response;
 }
 
+ControllerResult 
handleAssignReplicasToDirs(AssignReplicasToDirsRequestData request) {
+int brokerId = request.brokerId();
+clusterControl.checkBrokerEpoch(brokerId, request.brokerEpoch());
+BrokerRegistration brokerRegistration = 
clusterControl.brokerRegistrations().get(brokerId);
+List records = new ArrayList<>();
+AssignReplicasToDirsResponseData response = new 
AssignReplicasToDirsResponseData();
+Set leaderAndIsrUpdates = new HashSet<>();
+for (AssignReplicasToDirsRequestData.DirectoryData reqDir : 
request.directories()) {
+Uuid dirId = reqDir.id();
+AssignReplicasToDirsResponseData.DirectoryData resDir = new 
AssignReplicasToDirsResponseData.DirectoryData().setId(dirId);
+for (AssignReplicasToDirsRequestData.TopicData reqTopic : 
reqDir.topics()) {
+Uuid topicId = reqTopic.topicId();
+Errors topicError = Errors.NONE;
+TopicControlInfo topicControl = this.topics.get(topicId);
+if (topicControl == null) {
+log.warn("AssignReplicasToDirsRequest from broker {} 
references unknown topic ID {}", brokerId, topicId);
+topicError = Errors.UNKNOWN_TOPIC_ID;
+}
+AssignReplicasToDirsResponseData.TopicData resTopic = new 
AssignReplicasToDirsResponseData.TopicData().setTopicId(topicId);
+for (AssignReplicasToDirsRequestData.PartitionData 
reqPartition : reqTopic.partitions()) {
+int partitionIndex = reqPartition.partitionIndex();
+Errors partitionError = topicError;
+if (topicError == Errors.NONE) {
+String topicName = topicControl.name;
+PartitionRegistration partitionRegistration = 
topicControl.parts.get(partitionIndex);
+if 

Re: [PR] MINOR: ConsumerRebalanceListenerInvoker class and method visibility [kafka]

2023-12-05 Thread via GitHub


kirktrue commented on PR #14931:
URL: https://github.com/apache/kafka/pull/14931#issuecomment-1841792685

   Four greys. Restarting build again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: ConsumerRebalanceListenerInvoker class and method visibility [kafka]

2023-12-05 Thread via GitHub


kirktrue closed pull request #14931: MINOR: ConsumerRebalanceListenerInvoker 
class and method visibility
URL: https://github.com/apache/kafka/pull/14931


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-05 Thread via GitHub


philipnee commented on PR #14873:
URL: https://github.com/apache/kafka/pull/14873#issuecomment-1841786386

   Hi @lianetm @cadonna @lucasbru - Addressed your comments. I've left a question around sending a leave group request for static members. From reading the KIP, it seems like we should do it, but I think we need further clarification.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-05 Thread via GitHub


philipnee commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1416388581


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -103,6 +103,12 @@ public class HeartbeatRequestManager implements 
RequestManager {
  */
 private final BackgroundEventHandler backgroundEventHandler;
 
+/**
+ * Timer for tracking the time since the last consumer poll.  If the timer 
expires, the consumer will stop
+ * sending heartbeat until the next poll.
+ */

Review Comment:
   I think you meant: only when `group.instance.id` is not set, i.e. a dynamic member?
   
   This is the current implementation:
   ```
// Starting from 2.3, only dynamic members will send LeaveGroupRequest to 
the broker,
   // consumer with valid group.instance.id is viewed as static member 
that never sends LeaveGroup,
   // and the membership expiration is only controlled by session 
timeout.
   if (isDynamicMember() && !coordinatorUnknown() &&
   state != MemberState.UNJOINED && generation.hasMemberId()) {
   // this is a minimal effort attempt to leave the group. we do not
   // attempt any resending if the request fails or times out.
   log.info("Member {} sending LeaveGroup request to coordinator {} 
due to {}",
   generation.memberId, coordinator, leaveReason);
   LeaveGroupRequest.Builder request = new 
LeaveGroupRequest.Builder(
   rebalanceConfig.groupId,
   Collections.singletonList(new 
MemberIdentity().setMemberId(generation.memberId).setReason(JoinGroupRequest.maybeTruncateReason(leaveReason)))
   );
   
   future = client.send(coordinator, request).compose(new 
LeaveGroupResponseHandler(generation));
   client.pollNoWakeup();
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-05 Thread via GitHub


philipnee commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1416383477


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -103,6 +103,12 @@ public class HeartbeatRequestManager implements 
RequestManager {
  */
 private final BackgroundEventHandler backgroundEventHandler;
 
+/**
+ * Timer for tracking the time since the last consumer poll.  If the timer 
expires, the consumer will stop
+ * sending heartbeat until the next poll.
+ */

Review Comment:
   well, i'm not sure actually.  According to KIP-848: `Static membership, 
introduced in KIP-345, is still supported by this new rebalance protocol. When 
a member wants to leave temporary – e.g. while being bounced – it should send 
an heartbeat with a member epoch equals to -2. This signals to the group 
coordinator that the member left but will rejoin within the session timeout. 
When the member rejoins with the same instance ID, the group coordinator 
replaces the old member by the new member and gives back its current 
assignment.`
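   
   For reference, that temporary-leave signal is just a heartbeat carrying epoch -2 (sketch using the `ConsumerGroupHeartbeatRequestData` setters, not the PR's code):
   
   ```java
   ConsumerGroupHeartbeatRequestData data = new ConsumerGroupHeartbeatRequestData()
       .setGroupId(groupId)
       .setMemberId(memberId)
       .setInstanceId(groupInstanceId) // set only for static members
       .setMemberEpoch(-2);            // "leaving temporarily" per KIP-848
   ```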
   
   






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-05 Thread via GitHub


philipnee commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1416380771


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -399,14 +400,45 @@ public void testHeartbeatState() {
 new ConsumerGroupHeartbeatResponseData.Assignment();
 
assignmentTopic1.setTopicPartitions(Collections.singletonList(tpTopic1));
 ConsumerGroupHeartbeatResponse rs1 = new 
ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData()
-.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
-.setMemberId(memberId)
-.setMemberEpoch(1)
-.setAssignment(assignmentTopic1));
+.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
+.setMemberId(memberId)
+.setMemberEpoch(1)
+.setAssignment(assignmentTopic1));
 membershipManager.onHeartbeatResponseReceived(rs1.data());
 assertEquals(MemberState.RECONCILING, membershipManager.state());
 }
 
+@Test
+public void testEnsureLeaveGroupWhenPollTimerExpires() {
+membershipManager.transitionToJoining();

Review Comment:
   It is not possible to transition to fenced from unsubscribed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]

2023-12-05 Thread via GitHub


philipnee commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1416378676


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -198,6 +219,17 @@ public long maximumTimeToWait(long currentTimeMs) {
 return heartbeatNow ? 0L : 
heartbeatRequestState.nextHeartbeatMs(currentTimeMs);
 }
 
+/**
+ * When consumer polls, we need to reset the pollTimer.  If member is 
already leaving the group
+ */
+public void ack() {

Review Comment:
   The code is updated - but the pollTimer is updated on a regular cadence by `HeartbeatRequestManager.poll()`. This function is used to check whether the timer has expired. If it has, we reset the timer every time we poll the consumer.
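   
   Roughly (a sketch with hypothetical field names; `Timer` is the common utils class):
   
   ```java
   public void ack(final long currentTimeMs) {
       if (pollTimer.isExpired()) {
           // The member stopped heartbeating when the timer expired;
           // a new consumer.poll() revives it by resetting the timer.
           pollTimer.update(currentTimeMs);
           pollTimer.reset(maxPollIntervalMs);
       }
   }
   ```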



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15684: Support to describe all client metrics resources (KIP-714) [kafka]

2023-12-05 Thread via GitHub


junrao commented on code in PR #14933:
URL: https://github.com/apache/kafka/pull/14933#discussion_r1416359770


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -3810,16 +3811,27 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
-  // Just a placeholder for now.
   def handleListClientMetricsResources(request: RequestChannel.Request): Unit 
= {
 val listClientMetricsResourcesRequest = 
request.body[ListClientMetricsResourcesRequest]
 
 if (!authHelper.authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, 
CLUSTER_NAME)) {
   requestHelper.sendMaybeThrottle(request, 
listClientMetricsResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
 } else {
-  // Just return an empty list in the placeholder
-  val data = new ListClientMetricsResourcesResponseData()
-  requestHelper.sendMaybeThrottle(request, new 
ListClientMetricsResourcesResponse(data))
+  clientMetricsManager match {
+case Some(metricsManager) =>
+  try {
+val data = new 
ListClientMetricsResourcesResponseData().setClientMetricsResources(
+  metricsManager.listClientMetricsResources.asScala.map(
+name => new 
ClientMetricsResource().setName(name)).toList.asJava)
+requestHelper.sendMaybeThrottle(request, new 
ListClientMetricsResourcesResponse(data))
+  } catch {
+case _: Exception =>

Review Comment:
   Do we need this? `KafkaApis` has a generic way to handle unexpected errors 
through `handleError`.



##
core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala:
##
@@ -1690,13 +1690,33 @@ class ConfigCommandTest extends Logging {
   }
 
   @Test
-  def shouldNotDescribeClientMetricsConfigWithoutEntityName(): Unit = {
+  def shouldDescribeClientMetricsConfigWithoutEntityName(): Unit = {
 val describeOpts = new ConfigCommandOptions(Array("--bootstrap-server", 
"localhost:9092",
   "--entity-type", "client-metrics",
   "--describe"))
 
-val exception = assertThrows(classOf[IllegalArgumentException], () => 
describeOpts.checkArgs())
-assertEquals("an entity name must be specified with --describe of 
client-metrics", exception.getMessage)
+val resourceCustom = new 
ConfigResource(ConfigResource.Type.CLIENT_METRICS, "1")
+val configEntry = new ConfigEntry("metrics", "*")
+val future = new KafkaFutureImpl[util.Map[ConfigResource, Config]]
+val describeResult: DescribeConfigsResult = 
mock(classOf[DescribeConfigsResult])
+when(describeResult.all()).thenReturn(future)
+
+val node = new Node(1, "localhost", 9092)
+val mockAdminClient = new 
MockAdminClient(util.Collections.singletonList(node), node) {
+  override def describeConfigs(resources: util.Collection[ConfigResource], 
options: DescribeConfigsOptions): DescribeConfigsResult = {

Review Comment:
   Hmm, this should call `listClientMetricsResources`, not `describeConfigs`, 
right?



##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -6922,4 +6923,58 @@ class KafkaApisTest {
 val expectedResponse = new 
PushTelemetryResponseData().setErrorCode(Errors.INVALID_REQUEST.code)
 assertEquals(expectedResponse, response.data)
   }
+
+  @Test
+  def testListClientMetricsResourcesNotAllowedForZkClusters(): Unit = {
+val request = buildRequest(new 
ListClientMetricsResourcesRequest.Builder(new 
ListClientMetricsResourcesRequestData()).build())
+createKafkaApis(enableForwarding = true).handle(request, 
RequestLocal.NoCaching)
+
+val response = 
verifyNoThrottling[ListClientMetricsResourcesResponse](request)
+assertEquals(Errors.UNKNOWN_SERVER_ERROR, 
Errors.forCode(response.data.errorCode))

Review Comment:
   Hmm, the code seems to set the error to `UNSUPPORTED_VERSION`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15816) Typos in tests leak network sockets

2023-12-05 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-15816:

Description: 
There are a few tests which leak network sockets due to small typos in the 
tests themselves.

Clients: [https://github.com/apache/kafka/pull/14750] (DONE)
 * NioEchoServer
 * KafkaConsumerTest
 * KafkaProducerTest
 * SelectorTest
 * SslTransportLayerTest
 * SslTransportTls12Tls13Test
 * SslVersionsTransportLayerTest
 * SaslAuthenticatorTest

Core: [https://github.com/apache/kafka/pull/14754] 
 * MiniKdc
 * GssapiAuthenticationTest
 * MirrorMakerIntegrationTest
 * SocketServerTest
 * EpochDrivenReplicationProtocolAcceptanceTest
 * LeaderEpochIntegrationTest

Trogdor: [https://github.com/apache/kafka/pull/14771] 
 * AgentTest

Mirror: [https://github.com/apache/kafka/pull/14761] 
 * DedicatedMirrorIntegrationTest
 * MirrorConnectorsIntegrationTest
 * MirrorConnectorsWithCustomForwardingAdminIntegrationTest

Runtime: [https://github.com/apache/kafka/pull/14764] 
 * ConnectorTopicsIntegrationTest
 * ExactlyOnceSourceIntegrationTest
 * WorkerTest
 * WorkerGroupMemberTest

Streams: [https://github.com/apache/kafka/pull/14769] (DONE)
 * IQv2IntegrationTest
 * MetricsReporterIntegrationTest
 * NamedTopologyIntegrationTest
 * PurgeRepartitionTopicIntegrationTest

These can be addressed by just fixing the tests.

  was:
There are a few tests which leak network sockets due to small typos in the 
tests themselves.

Clients: [https://github.com/apache/kafka/pull/14750]
 * NioEchoServer
 * KafkaConsumerTest
 * KafkaProducerTest
 * SelectorTest
 * SslTransportLayerTest
 * SslTransportTls12Tls13Test
 * SslVersionsTransportLayerTest
 * SaslAuthenticatorTest

Core: [https://github.com/apache/kafka/pull/14754] 
 * MiniKdc
 * GssapiAuthenticationTest
 * MirrorMakerIntegrationTest
 * SocketServerTest
 * EpochDrivenReplicationProtocolAcceptanceTest
 * LeaderEpochIntegrationTest

Trogdor: [https://github.com/apache/kafka/pull/14771] 
 * AgentTest

Mirror: [https://github.com/apache/kafka/pull/14761] 
 * DedicatedMirrorIntegrationTest
 * MirrorConnectorsIntegrationTest
 * MirrorConnectorsWithCustomForwardingAdminIntegrationTest

Runtime: [https://github.com/apache/kafka/pull/14764] 
 * ConnectorTopicsIntegrationTest
 * ExactlyOnceSourceIntegrationTest
 * WorkerTest
 * WorkerGroupMemberTest

Streams: [https://github.com/apache/kafka/pull/14769] (DONE)
 * IQv2IntegrationTest
 * MetricsReporterIntegrationTest
 * NamedTopologyIntegrationTest
 * PurgeRepartitionTopicIntegrationTest

These can be addressed by just fixing the tests.


> Typos in tests leak network sockets
> ---
>
> Key: KAFKA-15816
> URL: https://issues.apache.org/jira/browse/KAFKA-15816
> Project: Kafka
>  Issue Type: Bug
>  Components: unit tests
>Affects Versions: 3.6.0
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Minor
>
> There are a few tests which leak network sockets due to small typos in the 
> tests themselves.
> Clients: [https://github.com/apache/kafka/pull/14750] (DONE)
>  * NioEchoServer
>  * KafkaConsumerTest
>  * KafkaProducerTest
>  * SelectorTest
>  * SslTransportLayerTest
>  * SslTransportTls12Tls13Test
>  * SslVersionsTransportLayerTest
>  * SaslAuthenticatorTest
> Core: [https://github.com/apache/kafka/pull/14754] 
>  * MiniKdc
>  * GssapiAuthenticationTest
>  * MirrorMakerIntegrationTest
>  * SocketServerTest
>  * EpochDrivenReplicationProtocolAcceptanceTest
>  * LeaderEpochIntegrationTest
> Trogdor: [https://github.com/apache/kafka/pull/14771] 
>  * AgentTest
> Mirror: [https://github.com/apache/kafka/pull/14761] 
>  * DedicatedMirrorIntegrationTest
>  * MirrorConnectorsIntegrationTest
>  * MirrorConnectorsWithCustomForwardingAdminIntegrationTest
> Runtime: [https://github.com/apache/kafka/pull/14764] 
>  * ConnectorTopicsIntegrationTest
>  * ExactlyOnceSourceIntegrationTest
>  * WorkerTest
>  * WorkerGroupMemberTest
> Streams: [https://github.com/apache/kafka/pull/14769] (DONE)
>  * IQv2IntegrationTest
>  * MetricsReporterIntegrationTest
>  * NamedTopologyIntegrationTest
>  * PurgeRepartitionTopicIntegrationTest
> These can be addressed by just fixing the tests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15816: Fix leaked sockets in clients tests [kafka]

2023-12-05 Thread via GitHub


gharris1727 merged PR #14750:
URL: https://github.com/apache/kafka/pull/14750


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15816: Fix leaked sockets in clients tests [kafka]

2023-12-05 Thread via GitHub


gharris1727 commented on PR #14750:
URL: https://github.com/apache/kafka/pull/14750#issuecomment-1841735675

   Test failures appear unrelated, and the clients tests pass locally.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]

2023-12-05 Thread via GitHub


cmccabe commented on code in PR #14838:
URL: https://github.com/apache/kafka/pull/14838#discussion_r1416350081


##
metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java:
##
@@ -192,7 +192,7 @@ private BrokerRegistration(
 this.isMigratingZkBroker = isMigratingZkBroker;
 directories = new ArrayList<>(directories);
 directories.sort(Uuid::compareTo);
-this.directories = Collections.unmodifiableList(directories);
+this.sortedDirectories = Collections.unmodifiableList(directories);

Review Comment:
   ah, sorry, that is my bad. Please ignore this then :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: ConsumerRebalanceListenerInvoker class and method visibility [kafka]

2023-12-05 Thread via GitHub


kirktrue commented on PR #14931:
URL: https://github.com/apache/kafka/pull/14931#issuecomment-1841727258

   Three yellows, one red. Restarting.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]

2023-12-05 Thread via GitHub


cmccabe commented on PR #14838:
URL: https://github.com/apache/kafka/pull/14838#issuecomment-1841725640

   Sorry, one more thing: I didn't understand the purpose of the HeartbeatManager changes. Seems like we should be able to keep this file the same now? Or if not, can we do them in a separate PR where we have some rationale?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: ConsumerRebalanceListenerInvoker class and method visibility [kafka]

2023-12-05 Thread via GitHub


kirktrue closed pull request #14931: MINOR: ConsumerRebalanceListenerInvoker 
class and method visibility
URL: https://github.com/apache/kafka/pull/14931


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]

2023-12-05 Thread via GitHub


cmccabe commented on code in PR #14838:
URL: https://github.com/apache/kafka/pull/14838#discussion_r1416342392


##
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##
@@ -354,6 +357,25 @@ public ControllerResult 
registerBroker(
 throw new BrokerIdNotRegisteredException("Controller is in 
pre-migration mode and cannot register KRaft " +
 "brokers until the metadata migration is complete.");
 }
+
+if (featureControl.metadataVersion().isDirectoryAssignmentSupported()) 
{
+Set set = new HashSet<>(request.logDirs());
+if (set.stream().anyMatch(DirectoryId::reserved)) {
+throw new InvalidRequestException("Reserved directory ID in 
request");
+}
+if (set.size() != request.logDirs().size()) {
+throw new InvalidRequestException("Duplicate directory ID in 
request");
+}
+for (BrokerRegistration registration : 
brokerRegistrations().values()) {

Review Comment:
   > Also, we shouldn't be checking the previous registration for this specific 
broker ID... it would be quite normal to re-register broker ID X with the same 
set of directories it had last time we registered it
   
   This might be what's causing the test failures btw



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]

2023-12-05 Thread via GitHub


rondagostino commented on code in PR #14838:
URL: https://github.com/apache/kafka/pull/14838#discussion_r1416337073


##
metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java:
##
@@ -192,7 +192,7 @@ private BrokerRegistration(
 this.isMigratingZkBroker = isMigratingZkBroker;
 directories = new ArrayList<>(directories);
 directories.sort(Uuid::compareTo);
-this.directories = Collections.unmodifiableList(directories);
+this.sortedDirectories = Collections.unmodifiableList(directories);

Review Comment:
   We do that in the prior 2 lines



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Change test logging capture to per-test, reducing jenkins truncation [kafka]

2023-12-05 Thread via GitHub


gharris1727 commented on PR #14795:
URL: https://github.com/apache/kafka/pull/14795#issuecomment-1841693899

   @mimaison I've raised a ticket to ask the Infra team what they think about 
this change: https://issues.apache.org/jira/browse/INFRA-25245


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15970 Copy over KIP-951 tests in FetcherTest.java to FetchRequestManagerTest.java [kafka]

2023-12-05 Thread via GitHub


wcarlson5 commented on PR #14916:
URL: https://github.com/apache/kafka/pull/14916#issuecomment-1841690681

   I'll rerun it and see if it gets any further :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]

2023-12-05 Thread via GitHub


cmccabe commented on code in PR #14838:
URL: https://github.com/apache/kafka/pull/14838#discussion_r1416327866


##
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##
@@ -354,6 +357,25 @@ public ControllerResult 
registerBroker(
 throw new BrokerIdNotRegisteredException("Controller is in 
pre-migration mode and cannot register KRaft " +
 "brokers until the metadata migration is complete.");
 }
+
+if (featureControl.metadataVersion().isDirectoryAssignmentSupported()) 
{
+Set set = new HashSet<>(request.logDirs());
+if (set.stream().anyMatch(DirectoryId::reserved)) {
+throw new InvalidRequestException("Reserved directory ID in 
request");
+}
+if (set.size() != request.logDirs().size()) {
+throw new InvalidRequestException("Duplicate directory ID in 
request");
+}
+for (BrokerRegistration registration : 
brokerRegistrations().values()) {

Review Comment:
   Another thing. This is `O(num_brokers * num_directories)` which isn't good for larger deployments. I think we should just have a `TimelineHashMap` that maps log dir UUID to the broker that has it. The main hassle is removing the mappings during broker unregistration, but that is rare, of course.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]

2023-12-05 Thread via GitHub


cmccabe commented on code in PR #14838:
URL: https://github.com/apache/kafka/pull/14838#discussion_r1416317197


##
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##
@@ -354,6 +357,25 @@ public ControllerResult 
registerBroker(
 throw new BrokerIdNotRegisteredException("Controller is in 
pre-migration mode and cannot register KRaft " +
 "brokers until the metadata migration is complete.");
 }
+
+if (featureControl.metadataVersion().isDirectoryAssignmentSupported()) 
{

Review Comment:
   It should not be legal to send an empty `logDirs` array when metadataVersion >= 3.7-IV2, since setting that MV on the controller requires that all the brokers support it.
   
   It is, of course, OK to send logDirs when MV < 3.7-IV2; they will be ignored.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15760: Disable flaky test testTaskRequestWithOldStartMsGetsUpdated [kafka]

2023-12-05 Thread via GitHub


splett2 commented on PR #14917:
URL: https://github.com/apache/kafka/pull/14917#issuecomment-1841683355

   @gharris1727 pretty odd that class loading takes so long, but the analysis 
seems reasonable.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]

2023-12-05 Thread via GitHub


cmccabe commented on code in PR #14838:
URL: https://github.com/apache/kafka/pull/14838#discussion_r1416324308


##
metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java:
##
@@ -272,8 +274,17 @@ public void testRegistrationWithIncorrectClusterId() {
                 (short) 1));
     }
 
+    private static Stream<MetadataVersion> metadataVersions() {
+        return Stream.of(
+            MetadataVersion.IBP_3_3_IV2,
+            MetadataVersion.IBP_3_3_IV3,
+            MetadataVersion.LATEST_PRODUCTION,

Review Comment:
   I feel like we should explicitly include `IBP_3_7_IV2` here rather than 
`LATEST_PRODUCTION`. After all, that is the first JBOD version so it's worth 
testing specifically, even after `LATEST_PRODUCTION` marches on.
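   i.e., something along these lines (sketch):
   ```java
   private static Stream<MetadataVersion> metadataVersions() {
       return Stream.of(
           MetadataVersion.IBP_3_3_IV2,
           MetadataVersion.IBP_3_3_IV3,
           // First version with directory assignment; keep it covered even after
           // LATEST_PRODUCTION moves past it.
           MetadataVersion.IBP_3_7_IV2
       );
   }
   ```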



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]

2023-12-05 Thread via GitHub


jeffkbkim commented on code in PR #14849:
URL: https://github.com/apache/kafka/pull/14849#discussion_r1416324140


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -906,8 +831,8 @@ public void run() {
 withActiveContextOrThrow(tp, context -> {
 // Execute the read operation.
 response = op.generateResponse(
-context.coordinator,
-context.lastCommittedOffset
+context.stateMachine.coordinator(),

Review Comment:
   The lock you're referring to is the SnapshottableCoordinator, right? I can't think of a way to resolve this without making the code messier. This seems to require a big change in our code structure.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]

2023-12-05 Thread via GitHub


cmccabe commented on code in PR #14838:
URL: https://github.com/apache/kafka/pull/14838#discussion_r1416321816


##
metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java:
##
@@ -337,7 +341,7 @@ public boolean equals(Object o) {
 other.fenced == fenced &&
 other.inControlledShutdown == inControlledShutdown &&
 other.isMigratingZkBroker == isMigratingZkBroker &&
-other.directories.equals(directories);
+other.sortedDirectories.equals(sortedDirectories);

Review Comment:
   This is another reason to go with sorted -- `list.equals` won't do the right 
thing otherwise.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]

2023-12-05 Thread via GitHub


cmccabe commented on code in PR #14838:
URL: https://github.com/apache/kafka/pull/14838#discussion_r1416321397


##
metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java:
##
@@ -192,7 +192,7 @@ private BrokerRegistration(
 this.isMigratingZkBroker = isMigratingZkBroker;
 directories = new ArrayList<>(directories);
 directories.sort(Uuid::compareTo);
-this.directories = Collections.unmodifiableList(directories);
+this.sortedDirectories = Collections.unmodifiableList(directories);

Review Comment:
   Seems like we should copy the list and sort it here to make sure that it 
actually is sorted. It should be a short list (a dozen or so max) so not a big 
performance issue.
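   Something like (sketch):
   ```java
   // Copy the caller's list and sort the copy, so the invariant holds no matter
   // what order the input arrived in.
   List<Uuid> sorted = new ArrayList<>(directories);
   sorted.sort(Uuid::compareTo);
   this.sortedDirectories = Collections.unmodifiableList(sorted);
   ```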



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]

2023-12-05 Thread via GitHub


cmccabe commented on code in PR #14838:
URL: https://github.com/apache/kafka/pull/14838#discussion_r1416319067


##
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##
@@ -354,6 +357,25 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(
             throw new BrokerIdNotRegisteredException("Controller is in pre-migration mode and cannot register KRaft " +
                 "brokers until the metadata migration is complete.");
         }
+
+        if (featureControl.metadataVersion().isDirectoryAssignmentSupported()) {
+            Set<Uuid> set = new HashSet<>(request.logDirs());
+            if (set.stream().anyMatch(DirectoryId::reserved)) {
+                throw new InvalidRequestException("Reserved directory ID in request");
+            }
+            if (set.size() != request.logDirs().size()) {
+                throw new InvalidRequestException("Duplicate directory ID in request");
+            }
+            for (BrokerRegistration registration : brokerRegistrations().values()) {

Review Comment:
   I think we should have a custom error code for this situation. 
InvalidRequestException seems too generic.
   
   Also, we shouldn't be checking the previous registration for this specific 
broker ID... it would be quite normal to re-register broker ID X with the same 
set of directories it had last time we registered it.
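   A rough sketch of both points together (the exception type is hypothetical, and the `directories()` accessor is assumed from this PR's `BrokerRegistration` changes):
   ```java
   for (BrokerRegistration existing : brokerRegistrations().values()) {
       if (existing.id() == request.brokerId()) {
           // Re-registering the same broker with the same directories is normal.
           continue;
       }
       for (Uuid dir : request.logDirs()) {
           if (existing.directories().contains(dir)) {
               // Dedicated error code instead of the generic InvalidRequestException.
               throw new DuplicateDirectoryIdException("Directory " + dir +
                   " was already registered by broker " + existing.id());
           }
       }
   }
   ```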



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]

2023-12-05 Thread via GitHub


cmccabe commented on code in PR #14838:
URL: https://github.com/apache/kafka/pull/14838#discussion_r1416317197


##
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##
@@ -354,6 +357,25 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(
             throw new BrokerIdNotRegisteredException("Controller is in pre-migration mode and cannot register KRaft " +
                 "brokers until the metadata migration is complete.");
         }
+
+        if (featureControl.metadataVersion().isDirectoryAssignmentSupported()) {

Review Comment:
   If the metadata version does not support directory assignment then it 
shouldn't be sent by the brokers, right?
   
   And if it does support directory assignment but the brokers are not sending 
directories, we need to fail the heartbeat.
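   Roughly (sketch):
   ```java
   if (featureControl.metadataVersion().isDirectoryAssignmentSupported()
           && request.logDirs().isEmpty()) {
       // MV >= 3.7-IV2 implies every broker supports directory assignment,
       // so an empty logDirs list here means the broker must not register.
       throw new InvalidRequestException("No log directories specified in registration request");
   }
   ```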



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15932: Wait for responses in consumer operations [kafka]

2023-12-05 Thread via GitHub


AndrewJSchofield commented on PR #14912:
URL: https://github.com/apache/kafka/pull/14912#issuecomment-1841665124

   Closing PR and re-opening to restart build.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15932: Wait for responses in consumer operations [kafka]

2023-12-05 Thread via GitHub


AndrewJSchofield closed pull request #14912: KAFKA-15932: Wait for responses in 
consumer operations
URL: https://github.com/apache/kafka/pull/14912


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15347: add support for 'single key multi timestamp' IQs with versioned state stores (KIP-968) [kafka]

2023-12-05 Thread via GitHub


mjsax commented on code in PR #14626:
URL: https://github.com/apache/kafka/pull/14626#discussion_r1416297822


##
streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java:
##
@@ -88,7 +87,7 @@ public class IQv2VersionedStoreIntegrationTest {
 
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, Utils.mkProperties(Collections.singletonMap("auto.create.topics.enable", "true")));
 
-    private KafkaStreams kafkaStreams;
+    private static KafkaStreams kafkaStreams;

Review Comment:
   Not sure if it's ok to make this one `static`? It's set up inside `@Before`, which is not static, and we run tests in parallel...
   
   I think we need to rather pass it as a parameter into the newly added `static` methods that use it.
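   e.g., keeping the field non-static and passing it explicitly (a sketch; the helper name mirrors the `// send request and receive results` step in the diff and is hypothetical):
   ```java
   private KafkaStreams kafkaStreams; // stays an instance field, set up in @Before

   // The extracted static helper takes the instance as a parameter instead:
   private static StateQueryResult<VersionedRecord<Integer>> sendRequestAndReceiveResults(
           final KafkaStreams streams,
           final StateQueryRequest<VersionedRecord<Integer>> request) {
       return IntegrationTestUtils.iqv2WaitForResult(streams, request);
   }
   ```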



##
streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java:
##
@@ -168,16 +167,13 @@ private void shouldHandleVersionedKeyQuery(final Optional<Long> queryTimestamp,
                                                final Long expectedTimestamp,
                                                final Optional<Long> expectedValidToTime) {
 
-        VersionedKeyQuery<Integer, Integer> query = VersionedKeyQuery.withKey(RECORD_KEY);
-        if (queryTimestamp.isPresent()) {
-            query = query.asOf(queryTimestamp.get());
-        }
-
-        final StateQueryRequest<VersionedRecord<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION));
-        final StateQueryResult<VersionedRecord<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+        // define query

Review Comment:
   Comment can be removed. We call `defineQuery()` -- very obvious what is happening (which also means the code is well structured and easy to understand).



##
streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java:
##
@@ -57,7 +57,7 @@ private MultiVersionedKeyQuery(final K key, final Optional<Instant> fromTime, fi
    * @param key The specified key by the query
    * @param <K> The type of the key
    * @param <V> The type of the value that will be retrieved
-   * @throws NullPointerException if @param key is null
+   * @throws NullPointerException if @code key is null

Review Comment:
   This needs to be in `{}` -> `{@code key}`



##
streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java:
##
@@ -168,16 +167,13 @@ private void shouldHandleVersionedKeyQuery(final Optional<Long> queryTimestamp,
                                                final Long expectedTimestamp,
                                                final Optional<Long> expectedValidToTime) {
 
-        VersionedKeyQuery<Integer, Integer> query = VersionedKeyQuery.withKey(RECORD_KEY);
-        if (queryTimestamp.isPresent()) {
-            query = query.asOf(queryTimestamp.get());
-        }
-
-        final StateQueryRequest<VersionedRecord<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION));
-        final StateQueryResult<VersionedRecord<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+        // define query
+        final VersionedKeyQuery<Integer, Integer> query = defineQuery(RECORD_KEY, queryTimestamp);
 
-        final QueryResult<VersionedRecord<Integer>> queryResult = result.getOnlyPartitionResult();
+        // send request and receive results

Review Comment:
   as above (more of the same below)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15975) Update kafka quickstart guide to no longer list ZK start first

2023-12-05 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-15975:
--

 Summary: Update kafka quickstart guide to no longer list ZK start 
first
 Key: KAFKA-15975
 URL: https://issues.apache.org/jira/browse/KAFKA-15975
 Project: Kafka
  Issue Type: Task
  Components: docs
Affects Versions: 4.0.0
Reporter: Justine Olshan


Given we are deprecating ZooKeeper, I think we should update our quickstart 
guide to not list the ZooKeeper instructions first.

With 4.0, we may want to remove it entirely.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: fully encapsulate user restore listener in the DelegatingRestoreListener [kafka]

2023-12-05 Thread via GitHub


ableegoldman commented on PR #14886:
URL: https://github.com/apache/kafka/pull/14886#issuecomment-1841644618

   Lots of test failures, but all are unrelated


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]

2023-12-05 Thread via GitHub


rondagostino commented on PR #14838:
URL: https://github.com/apache/kafka/pull/14838#issuecomment-1841623911

   
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14838/13/tests
   ```
   208 tests have failed
   There are 20 new tests failing, 188 existing failing and 272 skipped
   ```
   @soarez it seems that something is not correct.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAKFA-15629: Update upgrade-guide.html for TimestampedKeyQuery and TimestampedRangeQuery [kafka]

2023-12-05 Thread via GitHub


hanyuzheng7 commented on code in PR #14898:
URL: https://github.com/apache/kafka/pull/14898#discussion_r1416268185


##
docs/streams/upgrade-guide.html:
##
@@ -136,7 +136,12 @@ <
 Streams API changes in 3.7.0
 
     IQv2 supports RangeQuery that allows to specify unbounded, bounded, or half-open key-ranges, which return data in ascending (byte[]-lexicographical) order (per partition).
-    <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-985%3A+Add+reverseRange+and+reverseAll+query+over+kv-store+in+IQv2">KIP-985</a> extends this functionality by adding .withDescendingKeys() to allow user to receive data in descending order.
+    <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-985%3A+Add+reverseRange+and+reverseAll+query+over+kv-store+in+IQv2">KIP-985</a> extends this functionality by adding .withDescendingKeys() to allow user to receive data in descending order.
+
+    KIP-992 add two new query types, namely TimestampedKeyQuery and TimestampedRangeQuery. Both should be used to query a timestamped key-value store, to retrieve a ValueAndTimestamp result. The existing KeyQuery and RangeQuery are changes to always return the value only, also for timestamped key-value stores.

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14412: Generalise over RocksDB WriteBatch [kafka]

2023-12-05 Thread via GitHub


ableegoldman commented on PR #14853:
URL: https://github.com/apache/kafka/pull/14853#issuecomment-1841596989

   Strangely one of the builds failed to compile due to some issue in the upgrade test/smoke client -- seems unrelated but we can't merge with a broken build, so let's retry and hope it goes away


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15364: Handle log directory failure in the Controller [kafka]

2023-12-05 Thread via GitHub


soarez commented on code in PR #14902:
URL: https://github.com/apache/kafka/pull/14902#discussion_r1416264410


##
metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java:
##
@@ -107,6 +125,13 @@ int id() {
             return id;
         }
 
+        /**
+         * Returns the sorted directories which the broker currently has available
+         */
+        List<Uuid> directories() {

Review Comment:
   This was part of 
[KAFKA-15361](https://issues.apache.org/jira/browse/KAFKA-15361) 
(https://github.com/apache/kafka/pull/14902) and is now gone since we removed 
dirs from the heartbeat manager.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15364: Handle log directory failure in the Controller [kafka]

2023-12-05 Thread via GitHub


soarez commented on code in PR #14902:
URL: https://github.com/apache/kafka/pull/14902#discussion_r1416263442


##
core/src/main/scala/kafka/server/ControllerApis.scala:
##
@@ -1070,11 +1070,27 @@ class ControllerApis(
   }
 
   def handleAssignReplicasToDirs(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
+authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
 val assignReplicasToDirsRequest = request.body[AssignReplicasToDirsRequest]
-
-// TODO KAFKA-15426
-requestHelper.sendMaybeThrottle(request,
-  
assignReplicasToDirsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-CompletableFuture.completedFuture[Unit](())
+val context = new ControllerRequestContext(request.context.header.data, 
request.context.principal,
+  OptionalLong.empty())
+controller.assignReplicasToDirs(context, 
assignReplicasToDirsRequest.data).handle[Unit] { (reply, e) =>

Review Comment:
   Good point.
   
   This change isn't really part of this PR. This PR builds on KAFKA-15361 
(#14902) and KAFKA-15426 (#14863). This change is part of #14863, so this was 
addressed in 
[2874e11](https://github.com/apache/kafka/pull/14863/commits/2874e11f53c3a3fc73d480614f8d1cf695754bb8).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]

2023-12-05 Thread via GitHub


soarez commented on PR #14838:
URL: https://github.com/apache/kafka/pull/14838#issuecomment-1841564307

   Thanks for the review.
   
   @rondagostino AFAICT, the tests are passing on all JDK versions. Did I miss 
something?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-05 Thread via GitHub


soarez commented on PR #14881:
URL: https://github.com/apache/kafka/pull/14881#issuecomment-1841561078

   @rondagostino @cmccabe @pprovenzano PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-05 Thread via GitHub


soarez commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1416239117


##
core/src/test/scala/unit/kafka/log/LogManagerTest.scala:
##
@@ -93,6 +93,54 @@ class LogManagerTest {
 log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), 
leaderEpoch = 0)
   }
 
+  /**
+   * Test that getOrCreateLog on a non-existent log creates a new log in given 
logDirectory using directory id and that we can append to the new log.
+   */
+  @Test
+  def testCreateLogOnTargetedLogDirectory(): Unit = {
+val targetedLogDirectoryId = DirectoryId.random()
+
+val dirs: Seq[File] = Seq.fill(5)(TestUtils.tempDir())
+writeMetaProperties(dirs(0))
+writeMetaProperties(dirs(1), Optional.of(targetedLogDirectoryId))
+writeMetaProperties(dirs(3), Optional.of(DirectoryId.random()))
+writeMetaProperties(dirs(4))
+
+logManager = createLogManager(dirs)
+
+val log = logManager.getOrCreateLog(new TopicPartition(name, 0), topicId = 
None, targetLogDirectoryId = Some(targetedLogDirectoryId))
+assertEquals(5, logManager.liveLogDirs.size)
+
+val logFile = new File(dirs(1), name + "-0")
+assertTrue(logFile.exists)
+assertEquals(dirs(1).getAbsolutePath, logFile.getParent)
+log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), 
leaderEpoch = 0)
+  }
+
+  /**
+   * Test that getOrCreateLog on a non-existent log creates a new log in 
random logDirectory if the given directory id is DirectoryId.UNASSIGNED.

Review Comment:
   Not random



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]

2023-12-05 Thread via GitHub


soarez commented on code in PR #14881:
URL: https://github.com/apache/kafka/pull/14881#discussion_r1416236550


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -991,18 +997,29 @@ class LogManager(logDirs: Seq[File],
* @param isNew Whether the replica should have existed on the broker or not
* @param isFuture True if the future log of the specified partition should 
be returned or created
* @param topicId The topic ID of the partition's topic
+   * @param targetLogDirectoryId The directory Id that should host the the 
partition's topic.
+   * The next selected directory will be picked up 
if it None or equal {@link DirectoryId.UNASSIGNED}.
+   * The method assumes provided Id belong to 
online directory.
* @throws KafkaStorageException if isNew=false, log is not found in the 
cache and there is offline log directory on the broker
* @throws InconsistentTopicIdException if the topic ID in the log does not 
match the topic ID provided
*/
-  def getOrCreateLog(topicPartition: TopicPartition, isNew: Boolean = false, 
isFuture: Boolean = false, topicId: Option[Uuid]): UnifiedLog = {
+  def getOrCreateLog(topicPartition: TopicPartition, isNew: Boolean = false, 
isFuture: Boolean = false,
+ topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid] 
= Option.empty): UnifiedLog = {
 logCreationOrDeletionLock synchronized {
   val log = getLog(topicPartition, isFuture).getOrElse {
 // create the log if it has not already been created in another thread
 if (!isNew && offlineLogDirs.nonEmpty)
   throw new KafkaStorageException(s"Can not create log for 
$topicPartition because log directories ${offlineLogDirs.mkString(",")} are 
offline")
 
 val logDirs: List[File] = {
-  val preferredLogDir = preferredLogDirs.get(topicPartition)
+  val preferredLogDir = targetLogDirectoryId.filterNot(_ == 
DirectoryId.UNASSIGNED) match {
+case Some(targetId) if 
!preferredLogDirs.containsKey(topicPartition) =>
+  // If partition is configured with both targetLogDirectoryId and 
preferredLogDirs, then
+  // preferredLogDirs will be respected, otherwise 
targetLogDirectoryId will be respected
+  directoryIds.find(_._2 == targetId).map(_._1).getOrElse(null)

Review Comment:
   This makes sense. The preferredLogDir is set by an admin, and if that exists 
it should take precedence.  




-- 
This is an automated message from the Apache Git Service.
To 
