[jira] [Updated] (KAFKA-15867) Should ConsumerNetworkThread wrap the exception and notify the polling thread?

2023-11-20 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-15867:
---
Parent: KAFKA-14246
Issue Type: Sub-task  (was: Bug)

> Should ConsumerNetworkThread wrap the exception and notify the polling thread?
> --
>
> Key: KAFKA-15867
> URL: https://issues.apache.org/jira/browse/KAFKA-15867
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Philip Nee
>Priority: Minor
>
> The ConsumerNetworkThread runs a tight loop indefinitely.  However, when it 
> encounters an unexpected exception, it logs an error and continues.
>  
> I think this might not be ideal because users can run blind for a long time 
> before discovering there's something wrong with the code, so I believe we 
> should propagate the throwable back to the polling thread.
>  
> cc [~lucasbru] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15867) Should ConsumerNetworkThread wrap the exception and notify the polling thread?

2023-11-20 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15867:
--

 Summary: Should ConsumerNetworkThread wrap the exception and 
notify the polling thread?
 Key: KAFKA-15867
 URL: https://issues.apache.org/jira/browse/KAFKA-15867
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Philip Nee


The ConsumerNetworkThread runs a tight loop indefinitely.  However, when it 
encounters an unexpected exception, it logs an error and continues.

 

I think this might not be ideal because users can run blind for a long time 
before discovering there's something wrong with the code, so I believe we 
should propagate the throwable back to the polling thread.

 

cc [~lucasbru] 
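
A minimal sketch of what propagating the failure could look like, assuming a hypothetical error queue shared with the polling thread (the class and field names below are illustrative, not the actual consumer internals):

```java
// Hypothetical sketch: wrap unexpected errors and hand them to the polling thread.
import java.util.concurrent.BlockingQueue;
import org.apache.kafka.common.KafkaException;

final class NetworkThreadLoopSketch {
    private final BlockingQueue<RuntimeException> errorQueue; // assumed handoff channel
    private volatile boolean running = true;

    NetworkThreadLoopSketch(BlockingQueue<RuntimeException> errorQueue) {
        this.errorQueue = errorQueue;
    }

    void run(Runnable runOnce) {
        while (running) {
            try {
                runOnce.run();
            } catch (Throwable t) {
                // Instead of only logging, surface the failure so the application
                // thread can see it on its next poll().
                errorQueue.offer(new KafkaException("Background thread failed", t));
            }
        }
    }
}
```

The application thread could then drain this queue at the top of consumer.poll() and rethrow the first error it finds.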



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15327: ensure the commit manager commit on close [kafka]

2023-11-20 Thread via GitHub


philipnee commented on code in PR #14710:
URL: https://github.com/apache/kafka/pull/14710#discussion_r1400134639


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -353,7 +380,8 @@ private void testRetriable(final CommitRequestManager 
commitRequestManger,
final 
List>> futures) {
 futures.forEach(f -> assertFalse(f.isDone()));
 
-time.sleep(500);
+// The manager should backoff for 100ms
+time.sleep(100);

Review Comment:
   changed to 100 because we are now using jitter = 0, so the backoff during 
testing is deterministic and should always be 100ms
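
For context, a simplified model of the exponential backoff being discussed, under the assumption of a 100 ms initial backoff and a multiplier of 2 (the actual constants live in the manager and test setup); it shows why jitter = 0 makes the sleep deterministic:

```java
// Simplified backoff model, not the actual Kafka utility class.
final class BackoffModel {
    static long backoffMs(long initialMs, double multiplier, long maxMs, double jitter, int attempts) {
        double base = Math.min(maxMs, initialMs * Math.pow(multiplier, attempts));
        // jitter = 0 makes the random factor exactly 1.0, so the interval is deterministic
        double randomFactor = 1.0 + jitter * (Math.random() * 2 - 1);
        return (long) (base * randomFactor);
    }
}
// BackoffModel.backoffMs(100, 2.0, 1000, 0.0, 0) == 100  -> the test can sleep exactly 100 ms
// BackoffModel.backoffMs(100, 2.0, 1000, 0.5, 0) lands anywhere in roughly [50, 150)
```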



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15327: ensure the commit manager commit on close [kafka]

2023-11-20 Thread via GitHub


philipnee commented on PR #14710:
URL: https://github.com/apache/kafka/pull/14710#issuecomment-1820383565

   Hey @lucasbru - Thanks again for taking the time to review the PR.  I've 
made some cleanup and changes according to your comments. Let me know if you 
have further questions.
   
   I've rebased onto trunk, so I'm a little worried about having messed up the commits a bit.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15327: ensure the commit manager commit on close [kafka]

2023-11-20 Thread via GitHub


philipnee commented on code in PR #14710:
URL: https://github.com/apache/kafka/pull/14710#discussion_r1400134639


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -353,7 +380,8 @@ private void testRetriable(final CommitRequestManager 
commitRequestManger,
final 
List>> futures) {
 futures.forEach(f -> assertFalse(f.isDone()));
 
-time.sleep(500);
+// The manager should backoff for 100ms
+time.sleep(100);

Review Comment:
   changed to 100 because we are now using jitter = 0, so the next backoff 
should always be 100ms



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -373,7 +401,6 @@ private static Stream 
offsetCommitExceptionSupplier() {
 Arguments.of(Errors.INVALID_COMMIT_OFFSET_SIZE, false),
 Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION, true),
 Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE, true),
-Arguments.of(Errors.NOT_COORDINATOR, true),

Review Comment:
   Both are removed due to duplication, similar to the error below.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15866) Refactor OffsetFetchRequestState Error handling to be more consistent with OffsetCommitRequestState

2023-11-20 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15866:
--

 Summary: Refactor OffsetFetchRequestState Error handling to be 
more consistent with OffsetCommitRequestState
 Key: KAFKA-15866
 URL: https://issues.apache.org/jira/browse/KAFKA-15866
 Project: Kafka
  Issue Type: Improvement
Reporter: Philip Nee


The current OffsetFetchRequestState error handling uses nested if-else, which 
is quite different, stylistically, from OffsetCommitRequestState, which uses a 
switch statement.  The latter is a bit more readable, so we should refactor the 
error handling to use the same style and improve readability.

 

A minor point: some of the error handling seems inconsistent with the commit 
path. The logic was carried over from the current implementation, so we should 
also review all of the error handling.  For example, the current logic doesn't 
mark the coordinator unavailable when receiving COORDINATOR_NOT_AVAILABLE.
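
As a purely illustrative sketch of the switch-based style being proposed (hypothetical handler methods, not the actual CommitRequestManager or OffsetFetchRequestState code):

```java
// Illustrative only: switch-style error handling similar to the commit path.
import org.apache.kafka.common.protocol.Errors;

final class FetchErrorHandlerSketch {
    void onError(Errors error) {
        switch (error) {
            case COORDINATOR_NOT_AVAILABLE:
            case NOT_COORDINATOR:
                // assumption: also mark the coordinator unknown so it gets rediscovered
                markCoordinatorUnknownAndRetry(error);
                break;
            case COORDINATOR_LOAD_IN_PROGRESS:
                retryLater(error);
                break;
            case GROUP_AUTHORIZATION_FAILED:
                failRequest(error);
                break;
            default:
                failRequest(error);
        }
    }

    private void markCoordinatorUnknownAndRetry(Errors error) { /* hypothetical */ }
    private void retryLater(Errors error) { /* hypothetical */ }
    private void failRequest(Errors error) { /* hypothetical */ }
}
```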



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15866) Refactor OffsetFetchRequestState Error handling to be more consistent with OffsetCommitRequestState

2023-11-20 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-15866:
---
Parent: KAFKA-14246
Issue Type: Sub-task  (was: Improvement)

> Refactor OffsetFetchRequestState Error handling to be more consistent with 
> OffsetCommitRequestState
> ---
>
> Key: KAFKA-15866
> URL: https://issues.apache.org/jira/browse/KAFKA-15866
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Philip Nee
>Priority: Minor
>
> The current OffsetFetchRequestState error handling uses nested if-else, which 
> is quite different, stylistically, from OffsetCommitRequestState, which uses a 
> switch statement.  The latter is a bit more readable, so we should refactor the 
> error handling to use the same style and improve readability.
>  
> A minor point: some of the error handling seems inconsistent with the commit 
> path. The logic was carried over from the current implementation, so we should 
> also review all of the error handling.  For example, the current logic doesn't 
> mark the coordinator unavailable when receiving COORDINATOR_NOT_AVAILABLE.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15865) Ensure consumer.poll() execute autocommit callback

2023-11-20 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17788298#comment-17788298
 ] 

Philip Nee commented on KAFKA-15865:


[~lucasbru] [~cadonna] - Would either of you be interested in taking this on? 
This is a pretty simple task.

> Ensure consumer.poll() execute autocommit callback
> --
>
> Key: KAFKA-15865
> URL: https://issues.apache.org/jira/browse/KAFKA-15865
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-preview
>
> When the network thread completes autocommits, we need to send a 
> message/event to the application thread to notify it to execute the 
> callback.  In KAFKA-15327, the network thread sends an 
> AutoCommitCompletionBackgroundEvent to the polling thread.  The polling 
> thread should trigger the OffsetCommitCallback upon receiving it.
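
A minimal sketch of the handoff described above, assuming a hypothetical completion event type and queue (the real event and handler names may differ):

```java
// Hypothetical sketch of the handoff: the network thread enqueues a completion
// event; the polling thread drains the queue inside poll() and runs the callback.
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

final class AutoCommitCompletion {               // assumed event type
    final Map<TopicPartition, OffsetAndMetadata> offsets;
    final Exception error;                       // null on success
    AutoCommitCompletion(Map<TopicPartition, OffsetAndMetadata> offsets, Exception error) {
        this.offsets = offsets;
        this.error = error;
    }
}

final class PollingThreadSide {
    private final BlockingQueue<AutoCommitCompletion> events;  // filled by the network thread
    private final OffsetCommitCallback callback;

    PollingThreadSide(BlockingQueue<AutoCommitCompletion> events, OffsetCommitCallback callback) {
        this.events = events;
        this.callback = callback;
    }

    // Called from poll(): run callbacks on the application thread, as the contract requires.
    void drainAndInvokeCallbacks() {
        AutoCommitCompletion event;
        while ((event = events.poll()) != null) {
            callback.onComplete(event.offsets, event.error);
        }
    }
}
```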



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15327: ensure the commit manager commit on close [kafka]

2023-11-20 Thread via GitHub


philipnee commented on code in PR #14710:
URL: https://github.com/apache/kafka/pull/14710#discussion_r1400074015


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -257,10 +257,77 @@ private void closeInternal(final Duration timeout) {
 void cleanup() {
 log.trace("Closing the consumer network thread");
 Timer timer = time.timer(closeTimeout);
+waitForClosingTasks(timer);
 runAtClose(requestManagers.entries(), networkClientDelegate, timer);
 closeQuietly(requestManagers, "request managers");
 closeQuietly(networkClientDelegate, "network client delegate");
 closeQuietly(applicationEventProcessor, "application event processor");
 log.debug("Closed the consumer network thread");
 }
+
+/**
+ * We need to autocommit before shutting down the consumer. The method 
needs to first connect to the coordinator
+ * node to construct the closing requests.  Then wait for all closing 
requests to finish before returning.  The
+ * method is bounded by a closing timer.  We will continue closing down 
the consumer if the requests cannot be
+ * completed in time.
+ */
+// Visible for testing
+void waitForClosingTasks(final Timer timer) {

Review Comment:
   You are right.  I think the original code is actually less confusing because 
sending auto commits needs to happen before closing down the 
`ConsumerCoordinator`. Now these tasks (closingTasks here) are the operations 
that need to happen before closing the network thread (because we are shutting 
down the consumer).
   
   I wonder if I could make it less confusing by renaming the method. See the 
changes there. 
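
A rough outline of the close sequence described in the javadoc above, with hypothetical helper names (ensureCoordinatorReady, sendClosingRequests); it is only meant to show the "bounded by a timer, continue on timeout" structure:

```java
// Illustrative structure only; the helpers are hypothetical.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.utils.Timer;

final class CloseSequenceSketch {
    void waitForClosingTasks(Timer timer) {
        ensureCoordinatorReady(timer);                            // connect so requests can be built
        CompletableFuture<Void> closing = sendClosingRequests();  // e.g. the final auto-commit
        try {
            // Bounded wait: never block longer than the remaining close timeout.
            closing.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            // Timed out or failed: continue shutting down anyway.
        }
    }

    private void ensureCoordinatorReady(Timer timer) { /* hypothetical */ }
    private CompletableFuture<Void> sendClosingRequests() { return CompletableFuture.completedFuture(null); }
}
```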



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15327: ensure the commit manager commit on close [kafka]

2023-11-20 Thread via GitHub


philipnee commented on code in PR #14710:
URL: https://github.com/apache/kafka/pull/14710#discussion_r1400115792


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -454,6 +491,17 @@ public OffsetFetchRequestState(final Set 
partitions,
 this.future = new CompletableFuture<>();
 }
 
+public OffsetFetchRequestState(final Set partitions,
+   final GroupState.Generation generation,
+   final long retryBackoffMs,
+   final long retryBackoffMaxMs,
+   final double jitter) {
+super(logContext, CommitRequestManager.class.getSimpleName(), 
retryBackoffMs, 2, retryBackoffMaxMs, jitter);

Review Comment:
   @lucasbru - Just FYI: I added this so that exponential backoff during 
testing doesn't have jitter. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15327: ensure the commit manager commit on close [kafka]

2023-11-20 Thread via GitHub


philipnee commented on code in PR #14710:
URL: https://github.com/apache/kafka/pull/14710#discussion_r1400064245


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##
@@ -219,4 +219,14 @@ private void onResponse(
 public Optional coordinator() {
 return Optional.ofNullable(this.coordinator);
 }
+
+@Override
+public NetworkClientDelegate.PollResult pollOnClose() {

Review Comment:
   It was a hack to make sure poll can always send a FindCoordinator request. 
On second thought, I think just calling poll is fine: if there 
is an in-flight request, we should poll the network client and wait.  If we just 
sent one that failed, we should back off and then resend.
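
To make that reasoning concrete, a simplified sketch (hypothetical fields, not the actual CoordinatorRequestManager code) of poll-style gating that covers both cases, an in-flight request and a post-failure backoff:

```java
// Hypothetical sketch of poll()-style gating: respect in-flight requests and backoff.
final class FindCoordinatorPollSketch {
    private boolean requestInFlight;      // assumed bookkeeping
    private long nextAllowedSendMs;       // set after a failure using the backoff

    boolean shouldSendFindCoordinator(long currentTimeMs) {
        if (requestInFlight) {
            return false;                 // wait for the outstanding request
        }
        return currentTimeMs >= nextAllowedSendMs;  // otherwise wait out the backoff
    }

    void onFailure(long currentTimeMs, long backoffMs) {
        requestInFlight = false;
        nextAllowedSendMs = currentTimeMs + backoffMs;  // back off, then resend
    }
}
```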



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-15556) Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, and tryConnect

2023-11-20 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17788233#comment-17788233
 ] 

Phuc Hong Tran edited comment on KAFKA-15556 at 11/21/23 6:23 AM:
--

Hi [~kirktrue], just to clarify, with this Jira we want to check if we can 
replace {{networkClientDelegate.isUnavailable}} and 
{{networkClientDelegate.maybeThrowAuthFailure}} in {{FetchRequestManager}}, and 
{{networkClientDelegate.tryConnect}} in {{OffsetsRequestManager}}, with direct 
calls to the NetworkClientUtils versions of those functions instead?


was (Author: JIRAUSER301295):
Hi [~kirktrue], just to clarify, with this Jira we want to remove 
"networkClientDelegate.isUnavailable", 
"networkClientDelegate.maybeThrowAuthFailure" in FetchRequestManager and 
"networkClientDelegate.tryConnect" in {{OffsetsRequestManager}} with direct 
call to NetworkClientUtils version of those functions instead?

> Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, 
> and tryConnect
> -
>
> Key: KAFKA-15556
> URL: https://issues.apache.org/jira/browse/KAFKA-15556
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: consumer-threading-refactor
>
> The "new consumer" (i.e. {{{}PrototypeAsyncConsumer{}}}) was designed to 
> handle networking details in a more centralized way. However, in order to 
> reuse code between the existing {{KafkaConsumer}} and the new 
> {{{}PrototypeAsyncConsumer{}}}, that design goal was "relaxed" when the 
> {{NetworkClientDelegate}} capitulated and -stole- copied three methods from 
> {{ConsumerNetworkClient}} related to detecting node status:
>  # {{isUnavailable}}
>  # {{maybeThrowAuthFailure}}
>  # {{tryConnect}}
> Unfortunately, these have found their way into the {{FetchRequestManager}} 
> and {{OffsetsRequestManager}} implementations. We should review if we can 
> clean up—or even remove—this leaky abstraction.
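
For reference, a rough sketch of what the three methods might look like if a request manager talked to the underlying KafkaClient / NetworkClientUtils directly instead of going through the delegate; the calls used below (connectionFailed, connectionDelay, authenticationException, awaitReady) are assumptions based on the public client interfaces, not a committed design:

```java
// Rough sketch: the three delegate methods expressed against KafkaClient directly.
import java.io.IOException;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.NetworkClientUtils;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.utils.Time;

final class NodeStatusSketch {
    private final KafkaClient client;
    private final Time time;

    NodeStatusSketch(KafkaClient client, Time time) {
        this.client = client;
        this.time = time;
    }

    boolean isUnavailable(Node node) {
        // Unavailable = the last connection attempt failed and we are still in its backoff window.
        return client.connectionFailed(node) && client.connectionDelay(node, time.milliseconds()) > 0;
    }

    void maybeThrowAuthFailure(Node node) {
        AuthenticationException exception = client.authenticationException(node);
        if (exception != null)
            throw exception;
    }

    boolean tryConnect(Node node, long timeoutMs) throws IOException {
        return NetworkClientUtils.awaitReady(client, node, time, timeoutMs);
    }
}
```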



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15327: ensure the commit manager commit on close [kafka]

2023-11-20 Thread via GitHub


philipnee commented on code in PR #14710:
URL: https://github.com/apache/kafka/pull/14710#discussion_r1400074015


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -257,10 +257,77 @@ private void closeInternal(final Duration timeout) {
 void cleanup() {
 log.trace("Closing the consumer network thread");
 Timer timer = time.timer(closeTimeout);
+waitForClosingTasks(timer);
 runAtClose(requestManagers.entries(), networkClientDelegate, timer);
 closeQuietly(requestManagers, "request managers");
 closeQuietly(networkClientDelegate, "network client delegate");
 closeQuietly(applicationEventProcessor, "application event processor");
 log.debug("Closed the consumer network thread");
 }
+
+/**
+ * We need to autocommit before shutting down the consumer. The method 
needs to first connect to the coordinator
+ * node to construct the closing requests.  Then wait for all closing 
requests to finish before returning.  The
+ * method is bounded by a closing timer.  We will continue closing down 
the consumer if the requests cannot be
+ * completed in time.
+ */
+// Visible for testing
+void waitForClosingTasks(final Timer timer) {

Review Comment:
   You are right.  I think the original code is actually less confusing because 
sending auto commits needs to happen before closing down the 
`ConsumerCoordinator`. Now these tasks (closingTasks here) are the operations 
that need to happen before closing the network thread (because we are shutting 
down the consumer).
   
   I wonder if structuring runOnClose this way would make the code more 
readable:
   ```
   void runOnClose(Timer timer) {
       maybeAutoCommitAndLeaveGroup(timer);
       ...
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15865) Ensure consumer.poll() execute autocommit callback

2023-11-20 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-15865:
---
Labels: consumer-threading-refactor kip-848-preview  (was: )

> Ensure consumer.poll() execute autocommit callback
> --
>
> Key: KAFKA-15865
> URL: https://issues.apache.org/jira/browse/KAFKA-15865
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-preview
>
> When the network thread completes autocommits, we need to send a 
> message/event to the application thread to notify it to execute the 
> callback.  In KAFKA-15327, the network thread sends an 
> AutoCommitCompletionBackgroundEvent to the polling thread.  The polling 
> thread should trigger the OffsetCommitCallback upon receiving it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15865) Ensure consumer.poll() execute autocommit callback

2023-11-20 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-15865:
---
Parent: KAFKA-14246
Issue Type: Sub-task  (was: Bug)

> Ensure consumer.poll() execute autocommit callback
> --
>
> Key: KAFKA-15865
> URL: https://issues.apache.org/jira/browse/KAFKA-15865
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>
> When the network thread completes autocommits, we need to send a 
> message/event to the application thread to notify it to execute the 
> callback.  In KAFKA-15327, the network thread sends an 
> AutoCommitCompletionBackgroundEvent to the polling thread.  The polling 
> thread should trigger the OffsetCommitCallback upon receiving it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15327: ensure the commit manager commit on close [kafka]

2023-11-20 Thread via GitHub


philipnee commented on code in PR #14710:
URL: https://github.com/apache/kafka/pull/14710#discussion_r1400070106


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AutoCommitCompletionBackgroundEvent.java:
##
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+public class AutoCommitCompletionBackgroundEvent extends BackgroundEvent {

Review Comment:
   Thanks for pointing this out.  Would it be possible to follow up with a 
separate PR? I filed a Jira here: 
https://issues.apache.org/jira/browse/KAFKA-15865



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15865) Ensure consumer.poll() execute autocommit callback

2023-11-20 Thread Philip Nee (Jira)
Philip Nee created KAFKA-15865:
--

 Summary: Ensure consumer.poll() execute autocommit callback
 Key: KAFKA-15865
 URL: https://issues.apache.org/jira/browse/KAFKA-15865
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


When the network thread completes autocommits, we need to send a message/event 
to the application thread to notify it to execute the callback.  In 
KAFKA-15327, the network thread sends an AutoCommitCompletionBackgroundEvent to 
the polling thread.  The polling thread should trigger the OffsetCommitCallback 
upon receiving it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15327: ensure the commit manager commit on close [kafka]

2023-11-20 Thread via GitHub


philipnee commented on code in PR #14710:
URL: https://github.com/apache/kafka/pull/14710#discussion_r1400064652


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -180,12 +188,47 @@ public void maybeAutoCommit(final Map offsets
 autocommit.setInflightCommitStatus(true);
 }
 
+/**
+ * The consumer needs to send an auto commit during the shutdown if 
autocommit is enabled.
+ */
+Optional maybeAutoCommit() {
+if (!autoCommitState.isPresent()) {
+return Optional.empty();
+}
+
+OffsetCommitRequestState request = 
pendingRequests.createOffsetCommitRequest(subscriptions.allConsumed(), jitter);
+
request.future.whenComplete(autoCommitCallback(subscriptions.allConsumed()));
+return Optional.of(request.toUnsentRequest());
+}
+
+   // Visible for testing
+CompletableFuture sendAutoCommit(final Map allConsumedOffsets) {

Review Comment:
   changed to `private`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15327: ensure the commit manager commit on close [kafka]

2023-11-20 Thread via GitHub


philipnee commented on code in PR #14710:
URL: https://github.com/apache/kafka/pull/14710#discussion_r1400064245


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##
@@ -219,4 +219,14 @@ private void onResponse(
 public Optional coordinator() {
 return Optional.ofNullable(this.coordinator);
 }
+
+@Override
+public NetworkClientDelegate.PollResult pollOnClose() {

Review Comment:
   Deleted. I forgot to remove it when removing the autocommit in 
runOnClose... (sorry)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15047: Roll active segment when it breaches the retention policy [kafka]

2023-11-20 Thread via GitHub


satishd commented on code in PR #14766:
URL: https://github.com/apache/kafka/pull/14766#discussion_r1399008945


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,25 +1425,71 @@ class UnifiedLog(@volatile var logStartOffset: Long,
*/
   private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => 
Boolean,
 reason: SegmentDeletionReason): Int = {
-def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): 
Boolean = {
-  val upperBoundOffset = 
nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset)
-
-  // Check not to delete segments which are not yet copied to tiered 
storage if remote log is enabled.
-  (!remoteLogEnabled() || (upperBoundOffset > 0 && upperBoundOffset - 1 <= 
highestOffsetInRemoteStorage)) &&
-// We don't delete segments with offsets at or beyond the high 
watermark to ensure that the log start
-// offset can never exceed it.
-highWatermark >= upperBoundOffset &&
-predicate(segment, nextSegmentOpt)
-}
 lock synchronized {
-  val deletable = localLog.deletableSegments(shouldDelete)
+  val deletable = deletableSegments(predicate)
   if (deletable.nonEmpty)
 deleteSegments(deletable, reason)
   else
 0
 }
   }
 
+  /**
+   * Find segments starting from the oldest until the user-supplied predicate 
is false.
+   * A final segment that is empty will never be returned.
+   *
+   * @param predicate A function that takes in a candidate log segment, the 
next higher segment
+   *  (if there is one). It returns true iff the segment is 
deletable.
+   * @return the segments ready to be deleted
+   */
+  private[log] def deletableSegments(predicate: (LogSegment, 
Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
+def isSegmentEligibleForDeletion(upperBoundOffset: Long): Boolean = {
+  // Segments are eligible for deletion when:
+  //1. they are uploaded to the remote storage
+  if (remoteLogEnabled()) {
+upperBoundOffset > 0 && upperBoundOffset - 1 <= 
highestOffsetInRemoteStorage
+  } else {
+true
+  }
+}
+
+if (localLog.segments.isEmpty) {
+  Seq.empty
+} else {
+  val deletable = ArrayBuffer.empty[LogSegment]
+  val segmentsIterator = localLog.segments.values.iterator
+  var segmentOpt = nextOption(segmentsIterator)
+  var shouldRoll = false
+  while (segmentOpt.isDefined) {
+val segment = segmentOpt.get
+val nextSegmentOpt = nextOption(segmentsIterator)
+val isLastSegmentAndEmpty = nextSegmentOpt.isEmpty && segment.size == 0
+val upperBoundOffset = if (nextSegmentOpt.nonEmpty) 
nextSegmentOpt.get.baseOffset() else logEndOffset
+// We don't delete segments with offsets at or beyond the high 
watermark to ensure that the log start
+// offset can never exceed it.
+val predicateResult = highWatermark >= upperBoundOffset && 
predicate(segment, nextSegmentOpt)
+
+// Roll the active segment when it breaches the configured retention 
policy. The rolled segment will be
+// eligible for deletion and gets removed in the next iteration.
+if (predicateResult && !isLastSegmentAndEmpty && remoteLogEnabled() && 
nextSegmentOpt.isEmpty) {

Review Comment:
   Can we have the below condition, which is easier to understand, or something 
better?
   ```
   predicateResult && remoteLogEnabled() &&
   nextSegmentOpt.isEmpty  && segment.size > 0 // active segment is not empty
   ```
   
   Another way is to keep the last segment's predicate result and its size and 
do the check after the while loop. 



##
storage/src/test/java/org/apache/kafka/tiered/storage/integration/RollAndOffloadActiveSegmentTest.java:
##
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+im

[jira] [Updated] (KAFKA-15864) Add more tests asserting the log-start-offset, local-log-start-offset, and HW/LSO/LEO in rolling over segments with tiered storage.

2023-11-20 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-15864:
---
Description: Followup on the 
[comment|https://github.com/apache/kafka/pull/14766/files#r1395389551]

> Add more tests asserting the log-start-offset, local-log-start-offset, and 
> HW/LSO/LEO in rolling over segments with tiered storage.
> ---
>
> Key: KAFKA-15864
> URL: https://issues.apache.org/jira/browse/KAFKA-15864
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Satish Duggana
>Priority: Major
>  Labels: tiered-storage
> Fix For: 3.7.0
>
>
> Followup on the 
> [comment|https://github.com/apache/kafka/pull/14766/files#r1395389551]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15327: ensure the commit manager commit on close [kafka]

2023-11-20 Thread via GitHub


philipnee commented on code in PR #14710:
URL: https://github.com/apache/kafka/pull/14710#discussion_r1400055966


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -90,14 +93,13 @@ public void run() {
 while (running) {
 try {
 runOnce();
-} catch (final WakeupException e) {
-log.debug("WakeupException caught, consumer network thread 
won't be interrupted");
+} catch (final Throwable e) {
+log.error("Unexpected error caught in consumer network 
thread", e);
 // swallow the wakeup exception to prevent killing the 
thread.

Review Comment:
   Thanks, a question here: should we actually swallow the exception or 
propagate it back to the application thread (wrapping it in a 
KafkaException via the BackgroundEventHandler)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-20 Thread via GitHub


apoorvmittal10 commented on PR #14699:
URL: https://github.com/apache/kafka/pull/14699#issuecomment-1820280356

   Thanks @junrao for explaining the details. I have updated the PR and removed 
throttleMs from ClientMetricsManager. I have added a Jira to add respective 
throttling changes in QuotaManager.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15864) Add more tests asserting the log-start-offset, local-log-start-offset, and HW/LSO/LEO in rolling over segments with tiered storage.

2023-11-20 Thread Satish Duggana (Jira)
Satish Duggana created KAFKA-15864:
--

 Summary: Add more tests asserting the log-start-offset, 
local-log-start-offset, and HW/LSO/LEO in rolling over segments with tiered 
storage.
 Key: KAFKA-15864
 URL: https://issues.apache.org/jira/browse/KAFKA-15864
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Satish Duggana
 Fix For: 3.7.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14519; [2/N] New coordinator metrics [kafka]

2023-11-20 Thread via GitHub


jolshan merged PR #14387:
URL: https://github.com/apache/kafka/pull/14387


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14519; [2/N] New coordinator metrics [kafka]

2023-11-20 Thread via GitHub


jolshan commented on PR #14387:
URL: https://github.com/apache/kafka/pull/14387#issuecomment-1820273369

   Build looks good now. I will merge.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-20 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1400049044


##
core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala:
##
@@ -103,9 +103,16 @@ class DynamicConfigPublisher(
   )
 case CLIENT_METRICS =>
   // Apply changes to client metrics subscription.
-  info(s"Updating client metrics subscription ${resource.name()} 
with new configuration : " +
-toLoggableProps(resource, props).mkString(","))
-  
dynamicConfigHandlers(ConfigType.ClientMetrics).processConfigChanges(resource.name(),
 props)
+  
dynamicConfigHandlers.get(ConfigType.ClientMetrics).foreach(metricsConfigHandler
 =>
+try {
+  info(s"Updating client metrics ${resource.name()} with new 
configuration : " +
+toLoggableProps(resource, props).mkString(","))

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-20 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1400048884


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -16,31 +16,421 @@
  */
 package kafka.server;
 
+import kafka.metrics.ClientMetricsConfigs;
+import kafka.metrics.ClientMetricsInstance;
+import kafka.metrics.ClientMetricsInstanceMetadata;
+import kafka.metrics.ClientMetricsReceiverPlugin;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.TelemetryTooLargeException;
+import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
+import org.apache.kafka.common.errors.UnknownSubscriptionIdException;
+import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.Crc32C;
+import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
 
 /**
  * Handles client telemetry metrics requests/responses, subscriptions and 
instance information.
  */
 public class ClientMetricsManager implements Closeable {
 
 private static final Logger log = 
LoggerFactory.getLogger(ClientMetricsManager.class);
-private static final ClientMetricsManager INSTANCE = new 
ClientMetricsManager();
+private static final List SUPPORTED_COMPRESSION_TYPES = 
Collections.unmodifiableList(
+Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, 
CompressionType.GZIP.id, CompressionType.SNAPPY.id));
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+
+private final Cache clientInstanceCache;
+private final Map subscriptionMap;
+private final KafkaConfig config;
+private final Time time;
+
+// The latest subscription version is used to determine if subscription 
has changed and needs
+// to re-evaluate the client instance subscription id as per changed 
subscriptions.
+private final AtomicInteger subscriptionUpdateVersion;
 
-public static ClientMetricsManager instance() {
-return INSTANCE;
+public ClientMetricsManager(KafkaConfig config, Time time) {
+this.subscriptionMap = new ConcurrentHashMap<>();
+this.subscriptionUpdateVersion = new AtomicInteger(0);
+this.clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+this.config = config;
+this.time = time;
 }
 
 public void updateSubscription(String subscriptionName, Properties 
properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// Validate the subscription properties.
+ClientMetricsConfigs.validate(subscriptionName, properties);
+// IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+// for respective subscription. In that case, we need to remove the 
subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore 
the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+this.subscriptionUpdateVersion.incrementAndGet();
+}
+return;
+}
+
+updateClientSubscription(subscriptionName, new 
ClientMetricsConfigs(properties));
+}
+
+public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+GetTelemetrySubscriptionsReq

[jira] [Created] (KAFKA-15863) Handle push telemetry throttling with quota manager

2023-11-20 Thread Apoorv Mittal (Jira)
Apoorv Mittal created KAFKA-15863:
-

 Summary: Handle push telemetry throttling with quota manager
 Key: KAFKA-15863
 URL: https://issues.apache.org/jira/browse/KAFKA-15863
 Project: Kafka
  Issue Type: Sub-task
Reporter: Apoorv Mittal
Assignee: Apoorv Mittal


Details: https://github.com/apache/kafka/pull/14699#discussion_r1399714279



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15484: General Rack Aware Assignor [kafka]

2023-11-20 Thread via GitHub


jeffkbkim commented on code in PR #14481:
URL: https://github.com/apache/kafka/pull/14481#discussion_r1400044251


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java:
##
@@ -14,17 +14,891 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.coordinator.group.assignor;
 
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.common.TopicIdPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+/**
+ * The general uniform assignment builder is used to generate the target 
assignment for a consumer group with
+ * at least one of its members subscribed to a different set of topics.
+ *
+ * Assignments are done according to the following principles:
+ *
+ *  Balance:  Ensure partitions are distributed equally among all 
members.
+ *The difference in assignments sizes between any two 
members
+ *should not exceed one partition. 
+ *  Rack Matching:When feasible, aim to assign partitions to members
+ *located on the same rack thus avoiding cross-zone 
traffic. 
+ *  Stickiness:   Minimize partition movements among members by 
retaining
+ *as much of the existing assignment as possible. 
+ *
+ * This assignment builder prioritizes the above properties in the following 
order:
+ *  Balance > Rack Matching > Stickiness.
+ */
 public class GeneralUniformAssignmentBuilder extends 
AbstractUniformAssignmentBuilder {
 private static final Logger LOG = 
LoggerFactory.getLogger(GeneralUniformAssignmentBuilder.class);
 
+/**
+ * The member metadata obtained from the assignment specification.
+ */
+private final Map members;
+
+/**
+ * The topic and partition metadata describer.
+ */
+private final SubscribedTopicDescriber subscribedTopicDescriber;
+
+/**
+ * The list of all the topic Ids that the consumer group is subscribed to.
+ */
+private final Set subscriptionIds;
+
+/**
+ * Rack information.
+ */
+private final RackInfo rackInfo;
+
+/**
+ * List of subscribed members for each topic.
+ */
+private final Map> membersPerTopic;
+
+/**
+ * The partitions that still need to be assigned.
+ */
+private final Set unassignedPartitions;
+
+/**
+ * All the partitions that have been retained from the existing assignment.
+ */
+private final Set assignedStickyPartitions;
+
+/**
+ * Manages assignments to members based on their current assignment size 
and maximum allowed assignment size.
+ */
+private final AssignmentManager assignmentManager;
+
+/**
+ * List of all the members sorted by their respective assignment sizes.
+ */
+private final TreeSet sortedMembersByAssignmentSize;
+
+/**
+ * Tracks the owner of each partition in the existing assignment on the 
client.
+ *
+ * Only populated when rack aware strategy is used.
+ * Contains partitions that weren't retained due to a rack mismatch.
+ */
+private final Map currentPartitionOwners;
+
+/**
+ * Tracks the owner of each partition in the target assignment.
+ */
+private final Map 
partitionOwnerInTargetAssignment;
+
+/**
+ * Handles all operations related to partition movements during a 
reassignment for balancing the target assignment.
+ */
+private PartitionMovements partitionMovements;
+
+/**
+ * The new assignment that will be returned.
+ */
+private final Map targetAssignment;
+
+public GeneralUniformAssignmentBuilder(AssignmentSpec assignmentSpec, 
SubscribedTopicDescriber subscribedTopicDescriber) {
+this.members = assignmentSpec.members();
+this.subscribedTopicDescriber = subscribedTopicDescriber;
+this.subscriptionIds = assignmentSpec.members().values().stream()
+.flatMap(memberSpec -> memberSpec.subscribedTopicIds().stream())
+.peek(topicId -> {
+int partitionCount = 
subscribedTopicDescriber.numPartitions(topicId);
+if (partitionCount == -1) {
+throw new PartitionAssignorException(
+"Members are subscribed to topic " + topicId + " which 
doesn't exist in the topic metadata."
+);
+}
+})
+.collect(Collectors.toSet());
+this.rackInfo = new RackInfo(assignmentSpec, subscribedTopicDescriber, 
subscriptionIds);
+   

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-20 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1400045702


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -16,31 +16,420 @@
  */
 package kafka.server;
 
+import kafka.metrics.ClientMetricsConfigs;

Review Comment:
   As I received merge conflict in the current PR hence merged the upstream 
changes which required to move the classes to write package. I have done this 
in current PR itself.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15484: General Rack Aware Assignor [kafka]

2023-11-20 Thread via GitHub


jeffkbkim commented on code in PR #14481:
URL: https://github.com/apache/kafka/pull/14481#discussion_r1400045359


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java:
##
@@ -14,17 +14,891 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.coordinator.group.assignor;
 
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.common.TopicIdPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+/**
+ * The general uniform assignment builder is used to generate the target 
assignment for a consumer group with
+ * at least one of its members subscribed to a different set of topics.
+ *
+ * Assignments are done according to the following principles:
+ *
+ *  Balance:  Ensure partitions are distributed equally among all 
members.
+ *The difference in assignments sizes between any two 
members
+ *should not exceed one partition. 
+ *  Rack Matching:When feasible, aim to assign partitions to members
+ *located on the same rack thus avoiding cross-zone 
traffic. 
+ *  Stickiness:   Minimize partition movements among members by 
retaining
+ *as much of the existing assignment as possible. 
+ *
+ * This assignment builder prioritizes the above properties in the following 
order:
+ *  Balance > Rack Matching > Stickiness.
+ */
 public class GeneralUniformAssignmentBuilder extends 
AbstractUniformAssignmentBuilder {
 private static final Logger LOG = 
LoggerFactory.getLogger(GeneralUniformAssignmentBuilder.class);
 
+/**
+ * The member metadata obtained from the assignment specification.
+ */
+private final Map members;
+
+/**
+ * The topic and partition metadata describer.
+ */
+private final SubscribedTopicDescriber subscribedTopicDescriber;
+
+/**
+ * The list of all the topic Ids that the consumer group is subscribed to.
+ */
+private final Set subscriptionIds;
+
+/**
+ * Rack information.
+ */
+private final RackInfo rackInfo;
+
+/**
+ * List of subscribed members for each topic.
+ */
+private final Map> membersPerTopic;
+
+/**
+ * The partitions that still need to be assigned.
+ */
+private final Set unassignedPartitions;
+
+/**
+ * All the partitions that have been retained from the existing assignment.
+ */
+private final Set assignedStickyPartitions;
+
+/**
+ * Manages assignments to members based on their current assignment size 
and maximum allowed assignment size.
+ */
+private final AssignmentManager assignmentManager;
+
+/**
+ * List of all the members sorted by their respective assignment sizes.
+ */
+private final TreeSet sortedMembersByAssignmentSize;
+
+/**
+ * Tracks the owner of each partition in the existing assignment on the 
client.
+ *
+ * Only populated when rack aware strategy is used.
+ * Contains partitions that weren't retained due to a rack mismatch.
+ */
+private final Map currentPartitionOwners;
+
+/**
+ * Tracks the owner of each partition in the target assignment.
+ */
+private final Map 
partitionOwnerInTargetAssignment;
+
+/**
+ * Handles all operations related to partition movements during a 
reassignment for balancing the target assignment.
+ */
+private PartitionMovements partitionMovements;
+
+/**
+ * The new assignment that will be returned.
+ */
+private final Map targetAssignment;
+
+public GeneralUniformAssignmentBuilder(AssignmentSpec assignmentSpec, 
SubscribedTopicDescriber subscribedTopicDescriber) {
+this.members = assignmentSpec.members();
+this.subscribedTopicDescriber = subscribedTopicDescriber;
+this.subscriptionIds = assignmentSpec.members().values().stream()
+.flatMap(memberSpec -> memberSpec.subscribedTopicIds().stream())
+.peek(topicId -> {
+int partitionCount = 
subscribedTopicDescriber.numPartitions(topicId);
+if (partitionCount == -1) {
+throw new PartitionAssignorException(
+"Members are subscribed to topic " + topicId + " which 
doesn't exist in the topic metadata."
+);
+}
+})
+.collect(Collectors.toSet());
+this.rackInfo = new RackInfo(assignmentSpec, subscribedTopicDescriber, 
subscriptionIds);
+   


Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]

2023-11-20 Thread via GitHub


CalvinConfluent commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1400042116


##
metadata/src/main/java/org/apache/kafka/controller/BrokersToElrs.java:
##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;

Review Comment:
   Updated the comment in the ReplicationControlManager. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]

2023-11-20 Thread via GitHub


CalvinConfluent commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1400041537


##
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##
@@ -327,10 +333,12 @@ public ControllerResult 
registerBroker(
 ", but got cluster ID " + request.clusterId());
 }
 int brokerId = request.brokerId();
+List records = new ArrayList<>();
 BrokerRegistration existing = brokerRegistrations.get(brokerId);
-if (version < 2 || existing == null || request.previousBrokerEpoch() 
!= existing.epoch()) {
-// TODO(KIP-966): Update the ELR if the broker has an unclean 
shutdown.
-log.debug("Received an unclean shutdown request");
+if (version >= 2
+&& (existing == null || request.previousBrokerEpoch() != 
existing.epoch())
+&& replicationControlManager != null) {

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]

2023-11-20 Thread via GitHub


CalvinConfluent commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1400041854


##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -1772,7 +1814,8 @@ void generateLeaderAndIsrUpdates(String context,
 // where there is an unclean leader election which chooses a leader 
from outside
 // the ISR.
 IntPredicate isAcceptableLeader =
-r -> (r != brokerToRemove) && (r == brokerToAdd || 
clusterControl.isActive(r));
+r -> (r != brokerToRemove && r != brokerWithUncleanShutdown)

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]

2023-11-20 Thread via GitHub


CalvinConfluent commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1400041780


##
metadata/src/main/java/org/apache/kafka/controller/BrokersToElrs.java:
##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+
+import static org.apache.kafka.metadata.Replicas.NONE;
+
+public class BrokersToElrs {
+private final SnapshotRegistry snapshotRegistry;
+
+private final TimelineHashMap> 
elrMembers;

Review Comment:
   Done. It is broker id -> TopicIdPartitions



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15860: ControllerRegistration must be written out to the metadata image [kafka]

2023-11-20 Thread via GitHub


cmccabe opened a new pull request, #14807:
URL: https://github.com/apache/kafka/pull/14807

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]

2023-11-20 Thread via GitHub


CalvinConfluent commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1400036881


##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -466,6 +466,10 @@ private void 
maybeUpdateLastKnownLeader(PartitionChangeRecord record) {
 partition.lastKnownElr[0] != partition.leader)) {
 // Only update the last known leader when the first time the 
partition becomes leaderless.
 record.setLastKnownELR(Arrays.asList(partition.leader));
+} else if ((recordHasLeader(record) || partition.leader != NO_LEADER)

Review Comment:
   Good catch! To improve the readability, I have revised it to the following.
   `record.leader() > 0 || (partition.leader != NO_LEADER && record.leader() != 
NO_LEADER)`
   `(Will have a new leader) || (will not become leaderless)`
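   For illustration only (not taken from the patch; the sentinel values and comparisons are 
assumptions for this sketch), the intent "(will have a new leader) || (will not become 
leaderless)" could be written as a standalone predicate:

   ```java
   // Illustrative sketch, not Kafka code. Sentinel values are assumed for the example.
   final class LeaderTransition {
       static final int NO_LEADER = -1;        // assumed: partition currently has no leader
       static final int NO_LEADER_CHANGE = -2; // assumed: record leaves the leader untouched

       // currentLeader: partition.leader before the record is applied
       // recordLeader:  the leader the change record would install
       static boolean willStillHaveLeader(int currentLeader, int recordLeader) {
           // record installs a concrete leader (broker ids are non-negative)
           boolean willHaveNewLeader = recordLeader >= 0;
           // record does not touch an already valid leader
           boolean keepsExistingLeader = currentLeader != NO_LEADER
                   && recordLeader == NO_LEADER_CHANGE;
           return willHaveNewLeader || keepsExistingLeader;
       }
   }
   ```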



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15681: Add support of client-metrics in kafka-configs.sh (KIP-714) [kafka]

2023-11-20 Thread via GitHub


apoorvmittal10 commented on PR #14632:
URL: https://github.com/apache/kafka/pull/14632#issuecomment-1820231138

   > Seems there is a compilation error on the last run:
   > 
   > ```
   > 
   > [Error] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-14632/core/src/main/scala/kafka/server/DynamicConfig.scala:116:47:
 object ClientMetricsConfigs is not a member of package kafka.metrics
   > 
   > [Error] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-14632/core/src/main/scala/kafka/server/DynamicConfig.scala:116:17:
 private val clientConfigs in object ClientMetrics is never used
   > ```
   
   The change in dependent classes broke the PR build. I have resolved the merge 
issue and triggered a build.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2023-11-20 Thread via GitHub


CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1400016403


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1356,6 +1373,78 @@ class KafkaApis(val requestChannel: RequestChannel,
   ))
   }
 
+  def handleDescribeTopicPartitionsRequest(request: RequestChannel.Request): 
Unit = {
+metadataCache match {
+  case _: ZkMetadataCache =>
+throw new InvalidRequestException("ZK cluster does not handle 
DescribeTopicPartitions request")
+  case _ =>
+}
+val kRaftMetadataCache = metadataCache.asInstanceOf[KRaftMetadataCache]
+
+val describeTopicPartitionsRequest = 
request.body[DescribeTopicPartitionsRequest].data()
+val topicSet = mutable.SortedSet[String]()
+describeTopicPartitionsRequest.topics().forEach(topic => 
topicSet.add(topic.name()))
+val topics = ListBuffer[String]().addAll(topicSet)
+
+val cursor = describeTopicPartitionsRequest.cursor()
+val fetchAllTopics = topics.isEmpty
+if (fetchAllTopics) {
+  kRaftMetadataCache.getAllTopics().foreach(topic => topics.append(topic))

Review Comment:
   But if you ask whether it is worth the effort to create the full set of 
underlying structures to get an ordered list, when we can just sort the topic 
list instead, I am not sure.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2023-11-20 Thread via GitHub


CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1400016403


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1356,6 +1373,78 @@ class KafkaApis(val requestChannel: RequestChannel,
   ))
   }
 
+  def handleDescribeTopicPartitionsRequest(request: RequestChannel.Request): 
Unit = {
+metadataCache match {
+  case _: ZkMetadataCache =>
+throw new InvalidRequestException("ZK cluster does not handle 
DescribeTopicPartitions request")
+  case _ =>
+}
+val kRaftMetadataCache = metadataCache.asInstanceOf[KRaftMetadataCache]
+
+val describeTopicPartitionsRequest = 
request.body[DescribeTopicPartitionsRequest].data()
+val topicSet = mutable.SortedSet[String]()
+describeTopicPartitionsRequest.topics().forEach(topic => 
topicSet.add(topic.name()))
+val topics = ListBuffer[String]().addAll(topicSet)
+
+val cursor = describeTopicPartitionsRequest.cursor()
+val fetchAllTopics = topics.isEmpty
+if (fetchAllTopics) {
+  kRaftMetadataCache.getAllTopics().foreach(topic => topics.append(topic))

Review Comment:
   But if you ask whether it is worth the effort to create the full set of 
underlying structures to get an ordered list, when we can just sort the list 
instead, I am not sure.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2023-11-20 Thread via GitHub


CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1400015238


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1356,6 +1373,78 @@ class KafkaApis(val requestChannel: RequestChannel,
   ))
   }
 
+  def handleDescribeTopicPartitionsRequest(request: RequestChannel.Request): 
Unit = {
+metadataCache match {
+  case _: ZkMetadataCache =>
+throw new InvalidRequestException("ZK cluster does not handle 
DescribeTopicPartitions request")
+  case _ =>
+}
+val kRaftMetadataCache = metadataCache.asInstanceOf[KRaftMetadataCache]
+
+val describeTopicPartitionsRequest = 
request.body[DescribeTopicPartitionsRequest].data()
+val topicSet = mutable.SortedSet[String]()
+describeTopicPartitionsRequest.topics().forEach(topic => 
topicSet.add(topic.name()))
+val topics = ListBuffer[String]().addAll(topicSet)
+
+val cursor = describeTopicPartitionsRequest.cursor()
+val fetchAllTopics = topics.isEmpty
+if (fetchAllTopics) {
+  kRaftMetadataCache.getAllTopics().foreach(topic => topics.append(topic))

Review Comment:
   In the fetch all path, no additional sort is required. I did not see a good 
way to convert a Java list to a Scala mutable list, so I did the copy.
   I use a mutable list for two reasons:
   1. It is easier to filter out the topics that sort alphabetically ahead of the cursor 
topic (see the sketch below).
   2. In the fetch-all case, I think we should still include the cursor topic 
in the response even if it does not exist. A mutable list makes that easier.
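   For illustration, a minimal Java sketch of the cursor filtering idea (the real handler 
is Scala in KafkaApis; the names here are assumptions, not the actual code):

   ```java
   // Given an alphabetically sorted set of topic names and a cursor topic, drop
   // everything strictly before the cursor and keep the cursor topic itself, even
   // if it no longer exists in the metadata.
   import java.util.ArrayList;
   import java.util.List;
   import java.util.TreeSet;

   final class CursorFiltering {
       static List<String> topicsFromCursor(TreeSet<String> sortedTopics, String cursorTopic) {
           // tailSet(cursorTopic, true) keeps names >= cursorTopic, in alphabetical order
           List<String> result = new ArrayList<>(sortedTopics.tailSet(cursorTopic, true));
           if (!result.contains(cursorTopic)) {
               result.add(0, cursorTopic); // retain the cursor topic so the response can report it
           }
           return result;
       }
   }
   ```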



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2023-11-20 Thread via GitHub


CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1400011898


##
clients/src/main/resources/common/message/DescribeTopicPartitionsResponse.json:
##
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 74,
+  "type": "response",
+  "name": "DescribeTopicPartitionsResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", 
"ignorable": true,
+  "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
+{ "name": "Topics", "type": "[]DescribeTopicPartitionsResponseTopic", 
"versions": "0+",
+  "about": "Each topic in the response.", "fields": [
+  { "name": "ErrorCode", "type": "int16", "versions": "0+",
+"about": "The topic error, or 0 if there was no error." },
+  { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, 
"entityType": "topicName", "nullableVersions": "0+",
+"about": "The topic name." },
+  { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": 
true, "about": "The topic id." },
+  { "name": "IsInternal", "type": "bool", "versions": "0+", "default": 
"false", "ignorable": true,
+"about": "True if the topic is internal." },
+  { "name": "Partitions", "type": 
"[]DescribeTopicPartitionsResponsePartition", "versions": "0+",
+"about": "Each partition in the topic.", "fields": [
+{ "name": "ErrorCode", "type": "int16", "versions": "0+",
+  "about": "The partition error, or 0 if there was no error." },
+{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
+  "about": "The partition index." },
+{ "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
+  "about": "The ID of the leader broker." },
+{ "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": 
"-1", "ignorable": true,
+  "about": "The leader epoch of this partition." },
+{ "name": "ReplicaNodes", "type": "[]int32", "versions": "0+", 
"entityType": "brokerId",
+  "about": "The set of all nodes that host this partition." },
+{ "name": "IsrNodes", "type": "[]int32", "versions": "0+", 
"entityType": "brokerId",
+  "about": "The set of nodes that are in sync with the leader for this 
partition." },
+{ "name": "EligibleLeaderReplicas", "type": "[]int32", "default": 
"null", "entityType": "brokerId",
+  "versions": "0+", "nullableVersions": "0+",
+  "about": "The new eligible leader replicas otherwise." },
+{ "name": "LastKnownELR", "type": "[]int32", "default": "null", 
"entityType": "brokerId",
+  "versions": "0+", "nullableVersions": "0+",
+  "about": "The last known ELR." },
+{ "name": "OfflineReplicas", "type": "[]int32", "versions": "0+", 
"ignorable": true, "entityType": "brokerId",
+  "about": "The set of offline replicas of this partition." }]},
+  { "name": "TopicAuthorizedOperations", "type": "int32", "versions": 
"0+", "default": "-2147483648",
+"about": "32-bit bitfield to represent authorized operations for this 
topic." }]
+},
+{ "name": "NextTopicPartition", "type": "Cursor", "versions": "0+", 
"nullableVersions": "0+", "default": "null",

Review Comment:
   Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15484: General Rack Aware Assignor [kafka]

2023-11-20 Thread via GitHub


rreddy-22 commented on code in PR #14481:
URL: https://github.com/apache/kafka/pull/14481#discussion_r1399989044


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java:
##
@@ -14,17 +14,891 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.coordinator.group.assignor;
 
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.common.TopicIdPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+/**
+ * The general uniform assignment builder is used to generate the target 
assignment for a consumer group with
+ * at least one of its members subscribed to a different set of topics.
+ *
+ * Assignments are done according to the following principles:
+ *
+ *  Balance:  Ensure partitions are distributed equally among all 
members.
+ *The difference in assignments sizes between any two 
members
+ *should not exceed one partition. 
+ *  Rack Matching:When feasible, aim to assign partitions to members
+ *located on the same rack thus avoiding cross-zone 
traffic. 
+ *  Stickiness:   Minimize partition movements among members by 
retaining
+ *as much of the existing assignment as possible. 
+ *
+ * This assignment builder prioritizes the above properties in the following 
order:
+ *  Balance > Rack Matching > Stickiness.
+ */
 public class GeneralUniformAssignmentBuilder extends 
AbstractUniformAssignmentBuilder {
 private static final Logger LOG = 
LoggerFactory.getLogger(GeneralUniformAssignmentBuilder.class);
 
+/**
+ * The member metadata obtained from the assignment specification.
+ */
+private final Map members;
+
+/**
+ * The topic and partition metadata describer.
+ */
+private final SubscribedTopicDescriber subscribedTopicDescriber;
+
+/**
+ * The list of all the topic Ids that the consumer group is subscribed to.
+ */
+private final Set subscriptionIds;
+
+/**
+ * Rack information.
+ */
+private final RackInfo rackInfo;
+
+/**
+ * List of subscribed members for each topic.
+ */
+private final Map> membersPerTopic;
+
+/**
+ * The partitions that still need to be assigned.
+ */
+private final Set unassignedPartitions;
+
+/**
+ * All the partitions that have been retained from the existing assignment.
+ */
+private final Set assignedStickyPartitions;
+
+/**
+ * Manages assignments to members based on their current assignment size 
and maximum allowed assignment size.
+ */
+private final AssignmentManager assignmentManager;
+
+/**
+ * List of all the members sorted by their respective assignment sizes.
+ */
+private final TreeSet sortedMembersByAssignmentSize;
+
+/**
+ * Tracks the owner of each partition in the existing assignment on the 
client.
+ *
+ * Only populated when rack aware strategy is used.
+ * Contains partitions that weren't retained due to a rack mismatch.
+ */
+private final Map currentPartitionOwners;
+
+/**
+ * Tracks the owner of each partition in the target assignment.
+ */
+private final Map 
partitionOwnerInTargetAssignment;
+
+/**
+ * Handles all operations related to partition movements during a 
reassignment for balancing the target assignment.
+ */
+private PartitionMovements partitionMovements;
+
+/**
+ * The new assignment that will be returned.
+ */
+private final Map targetAssignment;
+
+public GeneralUniformAssignmentBuilder(AssignmentSpec assignmentSpec, 
SubscribedTopicDescriber subscribedTopicDescriber) {
+this.members = assignmentSpec.members();
+this.subscribedTopicDescriber = subscribedTopicDescriber;
+this.subscriptionIds = assignmentSpec.members().values().stream()
+.flatMap(memberSpec -> memberSpec.subscribedTopicIds().stream())
+.peek(topicId -> {
+int partitionCount = 
subscribedTopicDescriber.numPartitions(topicId);
+if (partitionCount == -1) {
+throw new PartitionAssignorException(
+"Members are subscribed to topic " + topicId + " which 
doesn't exist in the topic metadata."
+);
+}
+})
+.collect(Collectors.toSet());
+this.rackInfo = new RackInfo(assignmentSpec, subscribedTopicDescriber, 
subscriptionIds);
+   

Re: [PR] KAFKA-15484: General Rack Aware Assignor [kafka]

2023-11-20 Thread via GitHub


rreddy-22 commented on code in PR #14481:
URL: https://github.com/apache/kafka/pull/14481#discussion_r1399988448


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java:
##
@@ -14,17 +14,891 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.coordinator.group.assignor;
 
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.common.TopicIdPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+/**
+ * The general uniform assignment builder is used to generate the target 
assignment for a consumer group with
+ * at least one of its members subscribed to a different set of topics.
+ *
+ * Assignments are done according to the following principles:
+ *
+ *  Balance:  Ensure partitions are distributed equally among all 
members.
+ *The difference in assignments sizes between any two 
members
+ *should not exceed one partition. 
+ *  Rack Matching:When feasible, aim to assign partitions to members
+ *located on the same rack thus avoiding cross-zone 
traffic. 
+ *  Stickiness:   Minimize partition movements among members by 
retaining
+ *as much of the existing assignment as possible. 
+ *
+ * This assignment builder prioritizes the above properties in the following 
order:
+ *  Balance > Rack Matching > Stickiness.
+ */
 public class GeneralUniformAssignmentBuilder extends 
AbstractUniformAssignmentBuilder {
 private static final Logger LOG = 
LoggerFactory.getLogger(GeneralUniformAssignmentBuilder.class);
 
+/**
+ * The member metadata obtained from the assignment specification.
+ */
+private final Map members;
+
+/**
+ * The topic and partition metadata describer.
+ */
+private final SubscribedTopicDescriber subscribedTopicDescriber;
+
+/**
+ * The list of all the topic Ids that the consumer group is subscribed to.
+ */
+private final Set subscriptionIds;
+
+/**
+ * Rack information.
+ */
+private final RackInfo rackInfo;
+
+/**
+ * List of subscribed members for each topic.
+ */
+private final Map> membersPerTopic;
+
+/**
+ * The partitions that still need to be assigned.
+ */
+private final Set unassignedPartitions;
+
+/**
+ * All the partitions that have been retained from the existing assignment.
+ */
+private final Set assignedStickyPartitions;
+
+/**
+ * Manages assignments to members based on their current assignment size 
and maximum allowed assignment size.
+ */
+private final AssignmentManager assignmentManager;
+
+/**
+ * List of all the members sorted by their respective assignment sizes.
+ */
+private final TreeSet sortedMembersByAssignmentSize;
+
+/**
+ * Tracks the owner of each partition in the existing assignment on the 
client.
+ *
+ * Only populated when rack aware strategy is used.
+ * Contains partitions that weren't retained due to a rack mismatch.
+ */
+private final Map currentPartitionOwners;
+
+/**
+ * Tracks the owner of each partition in the target assignment.
+ */
+private final Map 
partitionOwnerInTargetAssignment;
+
+/**
+ * Handles all operations related to partition movements during a 
reassignment for balancing the target assignment.
+ */
+private PartitionMovements partitionMovements;
+
+/**
+ * The new assignment that will be returned.
+ */
+private final Map targetAssignment;
+
+public GeneralUniformAssignmentBuilder(AssignmentSpec assignmentSpec, 
SubscribedTopicDescriber subscribedTopicDescriber) {
+this.members = assignmentSpec.members();
+this.subscribedTopicDescriber = subscribedTopicDescriber;
+this.subscriptionIds = assignmentSpec.members().values().stream()
+.flatMap(memberSpec -> memberSpec.subscribedTopicIds().stream())
+.peek(topicId -> {
+int partitionCount = 
subscribedTopicDescriber.numPartitions(topicId);
+if (partitionCount == -1) {
+throw new PartitionAssignorException(
+"Members are subscribed to topic " + topicId + " which 
doesn't exist in the topic metadata."
+);
+}
+})
+.collect(Collectors.toSet());
+this.rackInfo = new RackInfo(assignmentSpec, subscribedTopicDescriber, 
subscriptionIds);
+   

Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]

2023-11-20 Thread via GitHub


ableegoldman commented on code in PR #14648:
URL: https://github.com/apache/kafka/pull/14648#discussion_r1399973308


##
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java:
##
@@ -91,11 +91,13 @@ private KeyValueStore maybeWrapLogging(final 
KeyValueStore, Bytes, byte[]>

Review Comment:
   ah, ok, that makes sense. In that case yeah, can you just make a quick pass 
and update the calls in this? thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15556) Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, and tryConnect

2023-11-20 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17788233#comment-17788233
 ] 

Phuc Hong Tran commented on KAFKA-15556:


Hi [~kirktrue], just to clarify: with this Jira we want to remove 
"networkClientDelegate.isUnavailable" and 
"networkClientDelegate.maybeThrowAuthFailure" in FetchRequestManager and 
"networkClientDelegate.tryConnect" in {{OffsetsRequestManager}}, and replace them with 
direct calls to the NetworkClientUtils versions of those functions instead?

> Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, 
> and tryConnect
> -
>
> Key: KAFKA-15556
> URL: https://issues.apache.org/jira/browse/KAFKA-15556
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: consumer-threading-refactor
>
> The "new consumer" (i.e. {{{}PrototypeAsyncConsumer{}}}) was designed to 
> handle networking details in a more centralized way. However, in order to 
> reuse code between the existing {{KafkaConsumer}} and the new 
> {{{}PrototypeAsyncConsumer{}}}, that design goal was "relaxed" when the 
> {{NetworkClientDelegate}} capitulated and -stole- copied three methods from 
> {{ConsumerNetworkClient}} related to detecting node status:
>  # {{isUnavailable}}
>  # {{maybeThrowAuthFailure}}
>  # {{tryConnect}}
> Unfortunately, these have found their way into the {{FetchRequestManager}} 
> and {{OffsetsRequestManager}} implementations. We should review if we can 
> clean up—or even remove—this leaky abstraction.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]

2023-11-20 Thread via GitHub


agavra commented on code in PR #14648:
URL: https://github.com/apache/kafka/pull/14648#discussion_r1399939069


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamJoinedInternal.java:
##
@@ -18,16 +18,30 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.TopologyConfig;
 import org.apache.kafka.streams.kstream.StreamJoined;
+import org.apache.kafka.streams.state.DslStoreSuppliers;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 
 import java.util.Map;
 
 public class StreamJoinedInternal extends StreamJoined {
 
+private final InternalStreamsBuilder builder;
+
 //Needs to be public for testing
-public StreamJoinedInternal(final StreamJoined streamJoined) {
+public StreamJoinedInternal(
+final StreamJoined streamJoined,
+final InternalStreamsBuilder builder
+) {
 super(streamJoined);
+this.builder = builder;
+if (dslStoreSuppliers == null) {
+final TopologyConfig topologyConfig = 
builder.internalTopologyBuilder.topologyConfigs();
+if (topologyConfig != null) {
+dslStoreSuppliers = 
topologyConfig.resolveDslStoreSuppliers().orElse(null);
+}

Review Comment:
   nice catch! added a test and fixed it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2023-11-20 Thread via GitHub


artemlivshits commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1399872558


##
clients/src/main/resources/common/message/DescribeTopicPartitionsResponse.json:
##
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 74,
+  "type": "response",
+  "name": "DescribeTopicPartitionsResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", 
"ignorable": true,
+  "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
+{ "name": "Topics", "type": "[]DescribeTopicPartitionsResponseTopic", 
"versions": "0+",
+  "about": "Each topic in the response.", "fields": [
+  { "name": "ErrorCode", "type": "int16", "versions": "0+",
+"about": "The topic error, or 0 if there was no error." },
+  { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, 
"entityType": "topicName", "nullableVersions": "0+",
+"about": "The topic name." },
+  { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": 
true, "about": "The topic id." },
+  { "name": "IsInternal", "type": "bool", "versions": "0+", "default": 
"false", "ignorable": true,
+"about": "True if the topic is internal." },
+  { "name": "Partitions", "type": 
"[]DescribeTopicPartitionsResponsePartition", "versions": "0+",
+"about": "Each partition in the topic.", "fields": [
+{ "name": "ErrorCode", "type": "int16", "versions": "0+",
+  "about": "The partition error, or 0 if there was no error." },
+{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
+  "about": "The partition index." },
+{ "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
+  "about": "The ID of the leader broker." },
+{ "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": 
"-1", "ignorable": true,
+  "about": "The leader epoch of this partition." },
+{ "name": "ReplicaNodes", "type": "[]int32", "versions": "0+", 
"entityType": "brokerId",
+  "about": "The set of all nodes that host this partition." },
+{ "name": "IsrNodes", "type": "[]int32", "versions": "0+", 
"entityType": "brokerId",
+  "about": "The set of nodes that are in sync with the leader for this 
partition." },
+{ "name": "EligibleLeaderReplicas", "type": "[]int32", "default": 
"null", "entityType": "brokerId",
+  "versions": "0+", "nullableVersions": "0+",
+  "about": "The new eligible leader replicas otherwise." },
+{ "name": "LastKnownELR", "type": "[]int32", "default": "null", 
"entityType": "brokerId",
+  "versions": "0+", "nullableVersions": "0+",
+  "about": "The last known ELR." },
+{ "name": "OfflineReplicas", "type": "[]int32", "versions": "0+", 
"ignorable": true, "entityType": "brokerId",
+  "about": "The set of offline replicas of this partition." }]},
+  { "name": "TopicAuthorizedOperations", "type": "int32", "versions": 
"0+", "default": "-2147483648",
+"about": "32-bit bitfield to represent authorized operations for this 
topic." }]
+},
+{ "name": "NextTopicPartition", "type": "Cursor", "versions": "0+", 
"nullableVersions": "0+", "default": "null",

Review Comment:
   If we call it Cursor in the request, I think we should call it NextCursor 
here.



##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1356,6 +1373,78 @@ class KafkaApis(val requestChannel: RequestChannel,
   ))
   }
 
+  def handleDescribeTopicPartitionsRequest(request: RequestChannel.Request): 
Unit = {
+metadataCache match {
+  case _: ZkMetadataCache =>
+throw new InvalidRequestException("ZK cluster does not handle 
DescribeTopicPartitions request")
+  case _ =>
+}
+val kRaftMetadataCache = metadataCache.asInstanceOf[KRaftMetadataCache]
+
+val describeTopicPartitionsRequest = 
request.body[DescribeTopicPartitionsRequest].data()
+val topicSet = mutable.SortedSet

Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]

2023-11-20 Thread via GitHub


agavra commented on code in PR #14648:
URL: https://github.com/apache/kafka/pull/14648#discussion_r1399930568


##
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java:
##
@@ -91,11 +91,13 @@ private KeyValueStore maybeWrapLogging(final 
KeyValueStore, Bytes, byte[]>

Review Comment:
   yes it was, we needed it in order to pass the tests that I added which check 
that `InMemoryXStore` is in the store hierarchy - otherwise it would stop at 
this change



##
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java:
##
@@ -91,11 +91,13 @@ private KeyValueStore maybeWrapLogging(final 
KeyValueStore, Bytes, byte[]>

Review Comment:
   yes it was, we needed it in order to pass the tests that I added which check 
that `InMemoryXStore` is in the store hierarchy - otherwise it would stop at 
this level



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15681: Add support of client-metrics in kafka-configs.sh (KIP-714) [kafka]

2023-11-20 Thread via GitHub


mjsax commented on PR #14632:
URL: https://github.com/apache/kafka/pull/14632#issuecomment-1820056764

   Seems there is a compilation error on the last run:
    ```
    > Task :core:compileScala
   
   [Error] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-14632/core/src/main/scala/kafka/server/DynamicConfig.scala:116:47:
 object ClientMetricsConfigs is not a member of package kafka.metrics
   
   [Error] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-14632/core/src/main/scala/kafka/server/DynamicConfig.scala:116:17:
 private val clientConfigs in object ClientMetrics is never used
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15862) Remove SecurityManager Support

2023-11-20 Thread Greg Harris (Jira)
Greg Harris created KAFKA-15862:
---

 Summary: Remove SecurityManager Support
 Key: KAFKA-15862
 URL: https://issues.apache.org/jira/browse/KAFKA-15862
 Project: Kafka
  Issue Type: New Feature
  Components: clients, KafkaConnect, Tiered-Storage
Reporter: Greg Harris
Assignee: Greg Harris


https://cwiki.apache.org/confluence/display/KAFKA/KIP-1006%3A+Remove+SecurityManager+Support



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15362: Resolve offline replicas in metadata cache [kafka]

2023-11-20 Thread via GitHub


cmccabe commented on PR #14737:
URL: https://github.com/apache/kafka/pull/14737#issuecomment-1820033474

   @soarez : This PR is causing failures in 
`KRaftClusterTest.testCreateClusterAndPerformReassignment`
   
   ```
   Gradle Test Run :core:test > Gradle Test Executor 2 > KRaftClusterTest > 
testCreateClusterAndPerformReassignment() FAILED
   java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.UnknownServerException: The server experienced 
an unexpected error when processing the request.
   at 
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
   at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
   at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
   at 
kafka.server.KRaftClusterTest.testCreateClusterAndPerformReassignment(KRaftClusterTest.scala:479)
   
   Caused by:
   org.apache.kafka.common.errors.UnknownServerException: The server 
experienced an unexpected error when processing the request.```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15861) In Kraft mode, "ssl.keystore.key" private keys are accessible to all the controllers and brokers

2023-11-20 Thread Jira
Jesús Cea created KAFKA-15861:
-

 Summary: In Kraft mode, "ssl.keystore.key" private keys are 
accessible to all the controllers and brokers
 Key: KAFKA-15861
 URL: https://issues.apache.org/jira/browse/KAFKA-15861
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 3.6.0
Reporter: Jesús Cea


Kafka allows dynamic updates of the TLS keys using "ssl.keystore.key" and 
"ssl.keystore.certificate.chain". In a KRaft cluster, that data is distributed 
to the entire cluster, so the private keys of the X.509 certificates are widely 
shared.

To test this, you could propagate an X.509 certificate update via 
"kafka-configs" for a particular server and then use 
"kafka-metadata-shell.sh" to verify that the new certificate is 
openly shared with all the cluster servers (controllers and brokers) (under 
"image/configs/BROKER:X/listener.name.X.ssl.keystore.key").

You can also verify this by running "strings" on the "__cluster_metadata-0" topic 
log files and grepping for the PEM private key.

Expected result: I understand the need for replicated metadata in KRaft 
mode, but the X.509 private key should be shared encrypted with 
"password.encoder.secret", so that only the relevant broker is able to decrypt the 
certificate private key, even though the whole cluster has access to the opaque 
encrypted data. If each broker has a (different) high-quality 
"password.encoder.secret", the encrypted private key should be safe to 
replicate.
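
As a rough illustration of the proposal (not Kafka code; the class name and parameter 
choices such as PBKDF2 iteration count and GCM tag length are assumptions for the sketch), 
each broker could derive an AES key from its local "password.encoder.secret" and store 
only the resulting ciphertext in the replicated metadata:

```java
// Sketch only: envelope-encrypt the PEM private key with a key derived from the
// broker-local password.encoder.secret, so replicated metadata carries only ciphertext.
import javax.crypto.Cipher;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.PBEKeySpec;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;

final class KeystoreKeyEnvelope {
    static byte[] encryptPem(String pemPrivateKey, char[] encoderSecret) throws Exception {
        SecureRandom random = new SecureRandom();
        byte[] salt = new byte[16];
        byte[] iv = new byte[12];
        random.nextBytes(salt);
        random.nextBytes(iv);

        // Derive a per-broker AES key from password.encoder.secret
        PBEKeySpec spec = new PBEKeySpec(encoderSecret, salt, 65_536, 256);
        byte[] keyBytes = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256")
                .generateSecret(spec).getEncoded();

        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(keyBytes, "AES"),
                new GCMParameterSpec(128, iv));
        byte[] ciphertext = cipher.doFinal(pemPrivateKey.getBytes(StandardCharsets.UTF_8));

        // A real implementation would persist salt and iv alongside the ciphertext;
        // here they are simply concatenated for illustration.
        byte[] out = new byte[salt.length + iv.length + ciphertext.length];
        System.arraycopy(salt, 0, out, 0, salt.length);
        System.arraycopy(iv, 0, out, salt.length, iv.length);
        System.arraycopy(ciphertext, 0, out, salt.length + iv.length, ciphertext.length);
        return out;
    }
}
```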

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Always send cumulative failed dirs in HB request [kafka]

2023-11-20 Thread via GitHub


cmccabe merged PR #14770:
URL: https://github.com/apache/kafka/pull/14770


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15860) ControllerRegistration must be written out to the metadata image

2023-11-20 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-15860:


 Summary: ControllerRegistration must be written out to the 
metadata image
 Key: KAFKA-15860
 URL: https://issues.apache.org/jira/browse/KAFKA-15860
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.7.0
Reporter: Colin McCabe
Assignee: Colin McCabe






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-20 Thread via GitHub


junrao commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1399714279


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -16,31 +16,421 @@
  */
 package kafka.server;
 
+import kafka.metrics.ClientMetricsConfigs;
+import kafka.metrics.ClientMetricsInstance;
+import kafka.metrics.ClientMetricsInstanceMetadata;
+import kafka.metrics.ClientMetricsReceiverPlugin;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.TelemetryTooLargeException;
+import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
+import org.apache.kafka.common.errors.UnknownSubscriptionIdException;
+import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.Crc32C;
+import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
 
 /**
  * Handles client telemetry metrics requests/responses, subscriptions and 
instance information.
  */
 public class ClientMetricsManager implements Closeable {
 
 private static final Logger log = 
LoggerFactory.getLogger(ClientMetricsManager.class);
-private static final ClientMetricsManager INSTANCE = new 
ClientMetricsManager();
+private static final List SUPPORTED_COMPRESSION_TYPES = 
Collections.unmodifiableList(
+Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, 
CompressionType.GZIP.id, CompressionType.SNAPPY.id));
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+
+private final Cache clientInstanceCache;
+private final Map subscriptionMap;
+private final KafkaConfig config;
+private final Time time;
+
+// The latest subscription version is used to determine if subscription 
has changed and needs
+// to re-evaluate the client instance subscription id as per changed 
subscriptions.
+private final AtomicInteger subscriptionUpdateVersion;
 
-public static ClientMetricsManager instance() {
-return INSTANCE;
+public ClientMetricsManager(KafkaConfig config, Time time) {
+this.subscriptionMap = new ConcurrentHashMap<>();
+this.subscriptionUpdateVersion = new AtomicInteger(0);
+this.clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+this.config = config;
+this.time = time;
 }
 
 public void updateSubscription(String subscriptionName, Properties 
properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// Validate the subscription properties.
+ClientMetricsConfigs.validate(subscriptionName, properties);
+// IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+// for respective subscription. In that case, we need to remove the 
subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore 
the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+this.subscriptionUpdateVersion.incrementAndGet();
+}
+return;
+}
+
+updateClientSubscription(subscriptionName, new 
ClientMetricsConfigs(properties));
+}
+
+public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+GetTelemetrySubscriptionsRequest req

Re: [PR] MINOR: Mark ConsumerGroupHeartbeat API (v1), OffsetCommit API (v9) and OffsetFetch API (v9) as stable (KIP-848) [kafka]

2023-11-20 Thread via GitHub


jolshan commented on PR #14801:
URL: https://github.com/apache/kafka/pull/14801#issuecomment-1819987867

   Screenshot: https://github.com/apache/kafka/assets/25566826/3f0c8239-2002-4d66-8455-41f299419738
   Somehow the tests are fixed, with the existing failures on Java 21 🤔 (Again, I don't 
think it is your change, but something probably should be done with the CI output, 
as we discussed offline today)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Mark ConsumerGroupHeartbeat API (v1), OffsetCommit API (v9) and OffsetFetch API (v9) as stable (KIP-848) [kafka]

2023-11-20 Thread via GitHub


jolshan commented on PR #14801:
URL: https://github.com/apache/kafka/pull/14801#issuecomment-1819985912

   The failures here are out of control. I don't think it is related to your 
change. I will rebuild.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]

2023-11-20 Thread via GitHub


artemlivshits commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1399798771


##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -466,6 +466,10 @@ private void 
maybeUpdateLastKnownLeader(PartitionChangeRecord record) {
 partition.lastKnownElr[0] != partition.leader)) {
 // Only update the last known leader when the first time the 
partition becomes leaderless.
 record.setLastKnownELR(Arrays.asList(partition.leader));
+} else if ((recordHasLeader(record) || partition.leader != NO_LEADER)

Review Comment:
   Can we have a case when `record.leader() == NO_LEADER && partition.leader 
!= NO_LEADER` (i.e. the partition has a valid leader now but will not have a valid 
leader)?
   
   Would a more proper condition be `(recordHasLeader(record) || 
(partition.leader != NO_LEADER && record.leader() == NO_LEADER_CHANGE))`



##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -1772,7 +1814,8 @@ void generateLeaderAndIsrUpdates(String context,
 // where there is an unclean leader election which chooses a leader 
from outside
 // the ISR.
 IntPredicate isAcceptableLeader =
-r -> (r != brokerToRemove) && (r == brokerToAdd || 
clusterControl.isActive(r));
+r -> (r != brokerToRemove && r != brokerWithUncleanShutdown)

Review Comment:
   Can we update the comments to reflect the new logic?



##
metadata/src/main/java/org/apache/kafka/controller/BrokersToElrs.java:
##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;

Review Comment:
   It looks like that in order to work correctly, we need to make sure that ISR 
and ELR are always disjoint sets (otherwise we'd generate multiple records for 
the same partition).  Can we add a comment about this?
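
   Illustrative only (the names are made up for this sketch, not taken from the patch): 
the invariant could be stated in code roughly as follows, since a broker id present in 
both sets would otherwise produce two change records for the same partition:

   ```java
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.List;
   import java.util.stream.Collectors;

   final class IsrElrInvariant {
       // Throws if a replica appears in both the ISR and the ELR of a partition.
       static void checkDisjoint(int[] isr, int[] elr) {
           List<Integer> isrList = Arrays.stream(isr).boxed().collect(Collectors.toList());
           List<Integer> elrList = Arrays.stream(elr).boxed().collect(Collectors.toList());
           if (!Collections.disjoint(isrList, elrList)) {
               throw new IllegalStateException(
                   "ISR and ELR must not share replicas: isr=" + isrList + ", elr=" + elrList);
           }
       }
   }
   ```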



##
metadata/src/main/java/org/apache/kafka/controller/BrokersToElrs.java:
##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+
+import static org.apache.kafka.metadata.Replicas.NONE;
+
+public class BrokersToElrs {
+private final SnapshotRegistry snapshotRegistry;
+
+private final TimelineHashMap> 
elrMembers;

Review Comment:
   Can we add a comment what's mapped to what here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [WIP] KAFKA-8677: Flaky test testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl [kafka]

2023-11-20 Thread via GitHub


anatasiavela closed pull request #7267: [WIP] KAFKA-8677: Flaky test 
testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl
URL: https://github.com/apache/kafka/pull/7267


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]

2023-11-20 Thread via GitHub


ableegoldman commented on code in PR #14648:
URL: https://github.com/apache/kafka/pull/14648#discussion_r1399822846


##
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java:
##
@@ -91,11 +91,13 @@ private KeyValueStore maybeWrapLogging(final 
KeyValueStore, Bytes, byte[]>

Review Comment:
   was this related to the changes in this PR? if not, let's leave it out for 
now, especially as we'd probably want to clean it up a bit further and change 
the `wrapped` invocations to `super` throughout the class
   
   but feel free to do a followup PR with this!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]

2023-11-20 Thread via GitHub


ableegoldman commented on code in PR #14648:
URL: https://github.com/apache/kafka/pull/14648#discussion_r1399821160


##
streams/src/main/java/org/apache/kafka/streams/state/DslStoreSuppliers.java:
##
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import java.util.Map;
+import org.apache.kafka.common.Configurable;
+
+/**
+ * {@code DslStoreSuppliers} defines a grouping of factories to construct
+ * stores for each of the types of state store implementations in Kafka
+ * Streams. This allows configuration of a default store supplier beyond
+ * the builtin defaults of RocksDB and In-Memory.

Review Comment:
   This looks great, thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Increase the delta for a flaky quota test [kafka]

2023-11-20 Thread via GitHub


harinirajendran opened a new pull request, #14806:
URL: https://github.com/apache/kafka/pull/14806

   The flaky test fails with an assertion error just outside the defined threshold, 
so this bumps up the quota delta by 1.
   
   java.util.concurrent.ExecutionException: 
org.opentest4j.AssertionFailedError: Expected rate (30 +- 7), but got 
37.50937734433608 (600 connections / 15.996 sec) ==> expected: <30.0> but was: 
<37.50937734433608>
   
   Fixed in `trunk` via https://github.com/apache/kafka/pull/14805
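   A quick sanity check of the numbers in the quoted failure, assuming the test 
asserts `|observed - expected| <= delta` (the class and variable names below are 
illustrative, not the actual test code):
   
   ```java
   public class QuotaDeltaCheck {
       public static void main(String[] args) {
           // Values taken from the quoted failure message.
           double observed = 600 / 15.996;              // ~37.51 connections/sec
           double expected = 30.0;
           double diff = Math.abs(observed - expected); // ~7.51
           System.out.println(diff <= 7.0);             // false -> the old delta of 7 fails
           System.out.println(diff <= 8.0);             // true  -> bumping the delta by 1 passes
       }
   }
   ```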


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]

2023-11-20 Thread via GitHub


ableegoldman commented on code in PR #14648:
URL: https://github.com/apache/kafka/pull/14648#discussion_r1399820405


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamJoinedInternal.java:
##
@@ -18,16 +18,30 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.TopologyConfig;
 import org.apache.kafka.streams.kstream.StreamJoined;
+import org.apache.kafka.streams.state.DslStoreSuppliers;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 
 import java.util.Map;
 
 public class StreamJoinedInternal extends StreamJoined {
 
+private final InternalStreamsBuilder builder;
+
 //Needs to be public for testing
-public StreamJoinedInternal(final StreamJoined streamJoined) {
+public StreamJoinedInternal(
+final StreamJoined streamJoined,
+final InternalStreamsBuilder builder
+) {
 super(streamJoined);
+this.builder = builder;
+if (dslStoreSuppliers == null) {
+final TopologyConfig topologyConfig = 
builder.internalTopologyBuilder.topologyConfigs();
+if (topologyConfig != null) {
+dslStoreSuppliers = 
topologyConfig.resolveDslStoreSuppliers().orElse(null);
+}

Review Comment:
   Hm...is this logic correct in the context of the initialization logic in 
[OuterStreamJoinedStoreFactory](https://github.com/apache/kafka/pull/14648/files#r1399812871)
   
   My read of that code, specifically case 1, was that we interpret 
`streamJoined#dslStoreSuppliers` as what the user passed in to the 
StreamJoined, so it should be null (and stay null) if the user didn't 
explicitly configure a dslStoreSuppliers.
   
   Then in case 3 (or the else branch of case 2), we fall back to the config 
value if neither the `dslStoreSuppliers` or the `thisStoreSupplier` of the 
StreamJoined was set/non-null



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamJoinedInternal.java:
##
@@ -18,16 +18,30 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.TopologyConfig;
 import org.apache.kafka.streams.kstream.StreamJoined;
+import org.apache.kafka.streams.state.DslStoreSuppliers;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 
 import java.util.Map;
 
 public class StreamJoinedInternal extends StreamJoined {
 
+private final InternalStreamsBuilder builder;
+
 //Needs to be public for testing
-public StreamJoinedInternal(final StreamJoined streamJoined) {
+public StreamJoinedInternal(
+final StreamJoined streamJoined,
+final InternalStreamsBuilder builder
+) {
 super(streamJoined);
+this.builder = builder;
+if (dslStoreSuppliers == null) {
+final TopologyConfig topologyConfig = 
builder.internalTopologyBuilder.topologyConfigs();
+if (topologyConfig != null) {
+dslStoreSuppliers = 
topologyConfig.resolveDslStoreSuppliers().orElse(null);
+}

Review Comment:
   Hm...is this logic correct in the context of the initialization logic in 
[OuterStreamJoinedStoreFactory](https://github.com/apache/kafka/pull/14648/files#r1399812871)?
   
   My read of that code, specifically case 1, was that we interpret 
`streamJoined#dslStoreSuppliers` as what the user passed in to the 
StreamJoined, so it should be null (and stay null) if the user didn't 
explicitly configure a dslStoreSuppliers.
   
   Then in case 3 (or the else branch of case 2), we fall back to the config 
value if neither the `dslStoreSuppliers` or the `thisStoreSupplier` of the 
StreamJoined was set/non-null
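   A rough sketch of the fallback order described above, assuming the user-supplied 
value stays null on the `StreamJoined` itself and is only resolved against the 
topology config when the store is built (the helper below is hypothetical, not the 
actual implementation):
   
   ```java
   // Hypothetical resolution helper; method and parameter names are illustrative.
   private DslStoreSuppliers resolveSuppliers(final StreamJoinedInternal<?, ?, ?> streamJoined,
                                              final TopologyConfig topologyConfig,
                                              final DslStoreSuppliers configuredDefault) {
       // Case 1: the user explicitly set suppliers on the StreamJoined -- keep them.
       if (streamJoined.dslStoreSuppliers() != null) {
           return streamJoined.dslStoreSuppliers();
       }
       // Case 2: the user passed an explicit store supplier instead -- nothing to resolve.
       if (streamJoined.thisStoreSupplier() != null) {
           return null;
       }
       // Case 3: neither was set -- fall back to the config value.
       if (topologyConfig != null) {
           return topologyConfig.resolveDslStoreSuppliers().orElse(configuredDefault);
       }
       return configuredDefault;
   }
   ```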



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]

2023-11-20 Thread via GitHub


ableegoldman commented on code in PR #14648:
URL: https://github.com/apache/kafka/pull/14648#discussion_r1399812871


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java:
##
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import static 
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+import static 
org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
+
+import java.time.Duration;
+import java.util.Map;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
+import org.apache.kafka.streams.state.DslKeyValueParams;
+import org.apache.kafka.streams.state.DslStoreSuppliers;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import 
org.apache.kafka.streams.state.internals.InMemoryWindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.internals.LeftOrRightValue;
+import org.apache.kafka.streams.state.internals.LeftOrRightValueSerde;
+import org.apache.kafka.streams.state.internals.ListValueStoreBuilder;
+import 
org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
+import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSideSerde;
+
+public class OuterStreamJoinStoreFactory extends 
AbstractConfigurableStoreFactory {
+
+private final String name;
+private final StreamJoinedInternal streamJoined;
+private final JoinWindows windows;
+private final DslStoreSuppliers streamJoinedDslStoreSuppliers;
+
+private boolean loggingEnabled;
+
+public enum Type {
+RIGHT,
+LEFT
+}
+
+public OuterStreamJoinStoreFactory(
+final String name,
+final StreamJoinedInternal streamJoined,
+final JoinWindows windows,
+final Type type
+) {
+super(streamJoined.dslStoreSuppliers());
+
+// we store this one manually instead of relying on 
super#dslStoreSuppliers()
+// so that we can differentiate between one that was explicitly passed 
in and
+// one that was configured via super#configure()
+this.streamJoinedDslStoreSuppliers = streamJoined.dslStoreSuppliers();
+this.name = buildOuterJoinWindowStoreName(streamJoined, name, type) + 
"-store";
+this.streamJoined = streamJoined;
+this.windows = windows;
+this.loggingEnabled = streamJoined.loggingEnabled();
+}
+
+@Override
+public StateStore build() {
+final Duration retentionPeriod = Duration.ofMillis(retentionPeriod());
+final Duration windowSize = Duration.ofMillis(windows.size());
+final String rpMsgPrefix = 
prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
+final long retentionMs = validateMillisecondDuration(retentionPeriod, 
rpMsgPrefix);
+final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, 
"windowSize");
+final long windowSizeMs = validateMillisecondDuration(windowSize, 
wsMsgPrefix);
+
+if (retentionMs < 0L) {
+throw new IllegalArgumentException("retentionPeriod cannot be 
negative");
+}
+if (windowSizeMs < 0L) {
+throw new IllegalArgumentException("windowSize cannot be 
negative");
+}
+if (windowSizeMs > retentionMs) {
+throw new IllegalArgumentException("The retention period of the 
window store "
++ name + " must be no smaller than its window size. Got 
size=["
++ windowSizeMs + "], retention=[" + retentionMs + "]");
+}
+
+final TimestampedKeyAndJoinSideSerde timestampedKeyAndJoinSideSerde 
= new TimestampedKeyAndJoinSideSerde<>(streamJoined.keySerde());
+final LeftOrRightValueSerde leftOrR

[PR] MINOR: Increase the delta for a flaky quota test [kafka]

2023-11-20 Thread via GitHub


harinirajendran opened a new pull request, #14805:
URL: https://github.com/apache/kafka/pull/14805

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]

2023-11-20 Thread via GitHub


ableegoldman commented on PR #14648:
URL: https://github.com/apache/kafka/pull/14648#issuecomment-1819878453

   Haha yeah it's certainly grown quite a lot over the past few updates -- but 
for that reason I'm actually fine with keeping it in one PR. The first time I 
reviewed it there were significantly fewer LOC, and I've just been looking at the 
updates since then. So no worries
   
   (If you just threw this PR at me right out of the gate, that'd be a 
different story lol)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]

2023-11-20 Thread via GitHub


mjsax commented on code in PR #14570:
URL: https://github.com/apache/kafka/pull/14570#discussion_r1399779288


##
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java:
##
@@ -1632,10 +1729,65 @@ public  void shouldHandleRangeQuery(
 IllegalArgumentException.class,
 queryResult.get(partition)::getFailureMessage
 );
-
 try (final KeyValueIterator iterator = 
queryResult.get(partition).getResult()) {
 while (iterator.hasNext()) {
-
actualValues.add(valueExtactor.apply(iterator.next().value));
+actualValues.add((Integer) iterator.next().value);
+}
+}
+assertThat(queryResult.get(partition).getExecutionInfo(), 
is(empty()));
+}
+assertThat("Result:" + result, actualValues, is(expectedValues));
+assertThat("Result:" + result, result.getPosition(), 
is(INPUT_POSITION));
+}
+}
+
+public  void shouldHandleTimestampedRangeQuery(
+final Optional lower,
+final Optional upper,
+final boolean isKeyAscending,
+final List expectedValues) {

Review Comment:
   Cf my comment above -- you need to set the right timestamps for each result. 
But it should be deterministic because the used ts are set by the test when 
input data is written.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]

2023-11-20 Thread via GitHub


mjsax commented on code in PR #14570:
URL: https://github.com/apache/kafka/pull/14570#discussion_r1399777340


##
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java:
##
@@ -1584,7 +1646,43 @@ public  void shouldHandleKeyQuery(
 );
 
 final V result1 = queryResult.getResult();
-final Integer integer = valueExtactor.apply(result1);
+final Integer integer = (Integer) result1;
+assertThat(integer, is(expectedValue));
+assertThat(queryResult.getExecutionInfo(), is(empty()));
+assertThat(queryResult.getPosition(), is(POSITION_0));
+}
+
+public  void shouldHandleTimestampedKeyQuery(
+final Integer key,
+final Integer expectedValue) {

Review Comment:
   We should compare both (what happens automatically if you compare 
`ValueAndTimestamp` type). Thus, you need to set the right timestamp on 
expected `ValueAndTimestamp`.
   
   The ts comes from the data, and is set in `before()` to `WINDOW_START + 
Duration.ofMinutes(2).toMillis() * i` when input data is written.
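   A minimal sketch of how the expected result could carry that timestamp, assuming 
the record for key `i` is written at `WINDOW_START + Duration.ofMinutes(2).toMillis() * i` 
as described (the helpers and `RECORD_VALUES` below are illustrative, not the actual 
test code):
   
   ```java
   // Illustrative helpers for building the expected ValueAndTimestamp in the test;
   // assumes java.time.Duration and WINDOW_START are already available in the class.
   private long writeTimestampFor(final int i) {
       return WINDOW_START + Duration.ofMinutes(2).toMillis() * i;   // matches before()
   }
   
   private ValueAndTimestamp<Integer> expectedFor(final int key) {
       // RECORD_VALUES is a stand-in for however the test derives the expected value.
       return ValueAndTimestamp.make(RECORD_VALUES.get(key), writeTimestampFor(key));
   }
   
   // Then value and timestamp can be checked in one assertion:
   // assertThat(queryResult.getResult(), is(expectedFor(key)));
   ```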



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15801: WIP Add hostname and port in NetworkClient logging and increase connection issues logging severity. [kafka]

2023-11-20 Thread via GitHub


GianlucaPrincipini commented on PR #14804:
URL: https://github.com/apache/kafka/pull/14804#issuecomment-1819828622

   Any suggestions on the best way to retrieve the port? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [KAFKA-15801][WIP] Add hostname and port in NetworkClient logging and increase connection issues severity [kafka]

2023-11-20 Thread via GitHub


GianlucaPrincipini opened a new pull request, #14804:
URL: https://github.com/apache/kafka/pull/14804

   ## Description
   When a component of the Kafka broker tries to reach another broker within 
the cluster, the logging should be more informative: it should include the IP/hostname 
and port it tries to connect to, and use a higher severity (WARN rather than 
INFO).
   
   Furthermore, the hostname has been added to all logging in NetworkClient. 
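   A hedged sketch of the kind of log line this aims for, assuming slf4j-style 
parameterized logging; the message text and variable names are illustrative rather 
than the exact change:
   
   ```java
   // Before (illustrative): node id only, INFO level.
   log.info("Connection to node {} could not be established. Broker may not be available.", nodeId);
   
   // After (illustrative): include hostname and port, raise severity to WARN.
   log.warn("Connection to node {} ({}:{}) could not be established. Broker may not be available.",
           nodeId, host, port);
   ```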
   
   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation 
   - [x] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15143: Adding in MockFixedKeyProcessorContext and Test [kafka]

2023-11-20 Thread via GitHub


lihaosky commented on code in PR #14605:
URL: https://github.com/apache/kafka/pull/14605#discussion_r1399755568


##
streams/test-utils/src/test/java/org/apache/kafka/streams/test/MockFixedKeyProcessorContextTest.java:
##
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.test;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
+import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
+import org.apache.kafka.streams.processor.api.FixedKeyRecord;
+import org.apache.kafka.streams.processor.api.InternalFixedKeyRecordFactory;
+import org.apache.kafka.streams.processor.api.MockFixedKeyProcessorContext;
+import 
org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForward;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+
+public class MockFixedKeyProcessorContextTest {

Review Comment:
   Can you parameterize `MockProcessorContextAPITest`? Most of the code here 
seems to be the same as what's in `MockProcessorContextAPITest`?
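   One possible shape for that, using JUnit 5 parameterization to run the shared 
assertions against both mock contexts (a sketch only; the constructors and the 
shared-assertion body are illustrative):
   
   ```java
   import java.util.stream.Stream;
   import org.apache.kafka.streams.processor.api.MockFixedKeyProcessorContext;
   import org.apache.kafka.streams.processor.api.MockProcessorContext;
   import org.junit.jupiter.params.ParameterizedTest;
   import org.junit.jupiter.params.provider.MethodSource;
   
   class MockProcessorContextApiParameterizedTest {
   
       static Stream<Object> contexts() {
           // Both mocks are expected to behave the same for the shared API surface.
           return Stream.of(new MockProcessorContext<>(), new MockFixedKeyProcessorContext<>());
       }
   
       @ParameterizedTest
       @MethodSource("contexts")
       void shouldExposeApplicationAndRecordMetadata(final Object context) {
           // The shared assertions from MockProcessorContextAPITest would go here,
           // dispatching on the concrete context type where the APIs differ.
       }
   }
   ```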



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Update Streams API broker compat table [kafka]

2023-11-20 Thread via GitHub


JimGalasyn opened a new pull request, #14803:
URL: https://github.com/apache/kafka/pull/14803

   Update the table for v3.6. Also needs to be cherry-picked to the 3.6 branch.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-11-20 Thread via GitHub


xvrl commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1399702579


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java:
##
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricValueProvider;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeCount;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+import 
org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * All metrics implement the {@link MetricValueProvider} interface. They are 
divided into
+ * two base types:
+ *
+ * 
+ * {@link Gauge}
+ * {@link Measurable}
+ * 
+ *
+ * {@link Gauge Gauges} can have any value, but we only collect metrics with 
number values.
+ * {@link Measurable Measurables} are divided into simple types with single 
values
+ * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link 
Rate},
+ * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link 
Frequencies},
+ * {@link Meter}, and {@link Percentiles}).
+ *
+ * 
+ *
+ * We can safely assume that a {@link CumulativeCount count} always increases 
in steady state. It
+ * should be a bug if a count metric decreases.
+ *
+ * 
+ *
+ * Total and Sum are treated as a monotonically increasing counter. The 
javadocs for Total metric type
+ * say "An un-windowed cumulative total maintained over all time.". The 
standalone Total metrics in
+ * the codebase seem to be cumulative metrics that will always increase. The 
Total metric underlying
+ * Meter type is mostly a Total of a Count metric.
+ * We can assume that a Total metric always increases (but it is not 
guaranteed as the sample values might be both
+ * negative or positive).
+ * For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid 
counter-example.
+ *
+ * 
+ *
+ * The Sum as it is a sample sum which is not a cumulative metric. It is 
converted to GAUGE_DOUBLE.
+ *
+ * 
+ *
+ * The compound metrics are virtual metrics. They are composed of simple types 
or anonymous measurable types
+ * which are reported. A compound metric is never reported as-is.
+ *
+ * 
+ *
+ * A Meter metric is always created with and reported as 2 metrics: a rate and 
a count. For eg:
+ * org.apache.kafka.common.network.Selector has Meter metric for 
"connection-close" but it has to be
+ * created with a "connection-close-rate" metric of type rate and a 
"connection-close-total"
+ * metric of type total.
+ *
+ * 
+ *
+ * Frequencies is created with an array of Frequency objects. When a 
Frequencies metric is registered, each
+ * member Frequency object is converted into an anonymous Measurable and 
registered. So, a Frequencies metric
+ * is reported with a set of measurables with name = Frequency.name(). As 
there is no way to figure out the
+ * compound type, each component measurables is converted to a GAUGE_DOUBLE.
+ *
+ * 
+ *
+ * Percentiles work the same way as Frequencies. The only difference is that 
it is compose

Re: [PR] KAFKA-15363: Broker log directory failure changes [kafka]

2023-11-20 Thread via GitHub


OmniaGM commented on code in PR #14790:
URL: https://github.com/apache/kafka/pull/14790#discussion_r1399670915


##
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:
##
@@ -76,13 +82,49 @@ class ReplicaAlterLogDirsThread(name: String,
 futureLog.updateHighWatermark(partitionData.highWatermark)
 futureLog.maybeIncrementLogStartOffset(partitionData.logStartOffset, 
LogStartOffsetIncrementReason.LeaderOffsetIncremented)
 
-if (partition.maybeReplaceCurrentWithFutureReplica())
-  removePartitions(Set(topicPartition))
+directoryEventHandler match {
+  case DirectoryEventHandler.NOOP =>
+if (partition.maybeReplaceCurrentWithFutureReplica())
+  removePartitions(Set(topicPartition))
+  case _ =>
+maybePromoteFutureReplica(topicPartition, partition)
+}
 
 quota.record(records.sizeInBytes)
 logAppendInfo
   }
 
+  // Visible for testing
+  def updatedAssignmentRequestStat(topicPartition: TopicPartition)(state: 
DirectoryEventRequestState): Unit = {
+assignmentRequestStates.put(topicPartition, state)
+  }
+  private def maybePromoteFutureReplica(topicPartition: TopicPartition, 
partition: Partition) = {
+val partitionRequestState = 
Option(assignmentRequestStates.get(topicPartition))
+val topicId = partition.topicId
+if (topicId.isEmpty)
+  throw new IllegalStateException(s"Topic ${topicPartition.topic()} exists 
but its ID doesn't exist.")

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15363: Broker log directory failure changes [kafka]

2023-11-20 Thread via GitHub


OmniaGM commented on code in PR #14790:
URL: https://github.com/apache/kafka/pull/14790#discussion_r1399669761


##
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:
##
@@ -76,13 +82,49 @@ class ReplicaAlterLogDirsThread(name: String,
 futureLog.updateHighWatermark(partitionData.highWatermark)
 futureLog.maybeIncrementLogStartOffset(partitionData.logStartOffset, 
LogStartOffsetIncrementReason.LeaderOffsetIncremented)
 
-if (partition.maybeReplaceCurrentWithFutureReplica())
-  removePartitions(Set(topicPartition))
+directoryEventHandler match {
+  case DirectoryEventHandler.NOOP =>
+if (partition.maybeReplaceCurrentWithFutureReplica())
+  removePartitions(Set(topicPartition))
+  case _ =>
+maybePromoteFutureReplica(topicPartition, partition)
+}
 
 quota.record(records.sizeInBytes)
 logAppendInfo
   }
 
+  // Visible for testing
+  def updatedAssignmentRequestStat(topicPartition: TopicPartition)(state: 
DirectoryEventRequestState): Unit = {
+assignmentRequestStates.put(topicPartition, state)
+  }
+  private def maybePromoteFutureReplica(topicPartition: TopicPartition, 
partition: Partition) = {
+val partitionRequestState = 
Option(assignmentRequestStates.get(topicPartition))
+val topicId = partition.topicId
+if (topicId.isEmpty)
+  throw new IllegalStateException(s"Topic ${topicPartition.topic()} exists 
but its ID doesn't exist.")
+
+partitionRequestState match {
+  case None =>
+// Schedule assignment request and don't promote the future replica 
yet until the controller accepted the request.
+partition.maybeFutureReplicaCaughtUp(_ => {
+  partition.futureReplicaDirectoryId()
+.map {
+  directoryEventHandler.handleAssignment(new 
TopicIdPartition(topicId.get, topicPartition.partition()), _,
+updatedAssignmentRequestStat(topicPartition)(_))
+}
+})
+  case Some(DirectoryEventRequestState.COMPLETED) =>
+// Promote future replica if controller accepted the request and the 
replica caught-up with the original log.
+if (partition.maybeReplaceCurrentWithFutureReplica()) {
+  removePartitions(Set(topicPartition))
+  assignmentRequestStates.remove(topicPartition)
+}
+  case _ =>
+log.info("Waiting for AssignmentRequest to succeed before promoting 
the future replica.")

Review Comment:
   changed the log level to trace



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15363: Broker log directory failure changes [kafka]

2023-11-20 Thread via GitHub


OmniaGM commented on code in PR #14790:
URL: https://github.com/apache/kafka/pull/14790#discussion_r1399667766


##
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:
##
@@ -76,13 +82,49 @@ class ReplicaAlterLogDirsThread(name: String,
 futureLog.updateHighWatermark(partitionData.highWatermark)
 futureLog.maybeIncrementLogStartOffset(partitionData.logStartOffset, 
LogStartOffsetIncrementReason.LeaderOffsetIncremented)
 
-if (partition.maybeReplaceCurrentWithFutureReplica())
-  removePartitions(Set(topicPartition))
+directoryEventHandler match {
+  case DirectoryEventHandler.NOOP =>
+if (partition.maybeReplaceCurrentWithFutureReplica())
+  removePartitions(Set(topicPartition))
+  case _ =>
+maybePromoteFutureReplica(topicPartition, partition)
+}
 
 quota.record(records.sizeInBytes)
 logAppendInfo
   }
 
+  // Visible for testing
+  def updatedAssignmentRequestStat(topicPartition: TopicPartition)(state: 
DirectoryEventRequestState): Unit = {
+assignmentRequestStates.put(topicPartition, state)
+  }
+  private def maybePromoteFutureReplica(topicPartition: TopicPartition, 
partition: Partition) = {
+val partitionRequestState = 
Option(assignmentRequestStates.get(topicPartition))
+val topicId = partition.topicId
+if (topicId.isEmpty)
+  throw new IllegalStateException(s"Topic ${topicPartition.topic()} exists 
but its ID doesn't exist.")
+
+partitionRequestState match {
+  case None =>
+// Schedule assignment request and don't promote the future replica 
yet until the controller accepted the request.
+partition.maybeFutureReplicaCaughtUp(_ => {
+  partition.futureReplicaDirectoryId()
+.map {
+  directoryEventHandler.handleAssignment(new 
TopicIdPartition(topicId.get, topicPartition.partition()), _,
+updatedAssignmentRequestStat(topicPartition)(_))

Review Comment:
   on second thought, I changed the signature and structure of these callbacks 
and did some refactoring. Now AssignmentManager just runs the callback as 
runnable when the assignment is completed without explicitly sending the state 
`COMPLETE` to the callback method. 
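   A Java-flavored sketch of that shape, where the handler only takes a plain 
completion callback and the caller manages its own request state (illustrative; the 
real code is in the Scala `core` module and may differ):
   
   ```java
   import org.apache.kafka.common.TopicIdPartition;
   import org.apache.kafka.common.Uuid;
   
   // Illustrative interface; the actual DirectoryEventHandler signature may differ.
   interface AssignmentHandlerSketch {
       // The caller records QUEUED itself; the handler runs onComplete only once
       // the controller has accepted the assignment.
       void handleAssignment(TopicIdPartition partition, Uuid directoryId, Runnable onComplete);
   }
   
   // Caller side (sketch):
   // assignmentRequestStates.put(topicPartition, DirectoryEventRequestState.QUEUED);
   // handler.handleAssignment(topicIdPartition, directoryId,
   //     () -> assignmentRequestStates.put(topicPartition, DirectoryEventRequestState.COMPLETED));
   ```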



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15816: Fix leaked sockets in streams tests [kafka]

2023-11-20 Thread via GitHub


gharris1727 commented on code in PR #14769:
URL: https://github.com/apache/kafka/pull/14769#discussion_r1399656534


##
streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java:
##
@@ -418,6 +418,8 @@ public String metricsScope() {
 })
 );
 
+// do not use the harness streams

Review Comment:
   I changed this, let me know if the new comment makes more sense or if you'd 
like it removed.
   
   I was indicating that the KafkaStreams that the test harness set up for the 
test won't be used, and a different KafkaStreams was being substituted.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]

2023-11-20 Thread via GitHub


hanyuzheng7 commented on code in PR #14570:
URL: https://github.com/apache/kafka/pull/14570#discussion_r1399656268


##
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java:
##
@@ -1632,10 +1729,65 @@ public  void shouldHandleRangeQuery(
 IllegalArgumentException.class,
 queryResult.get(partition)::getFailureMessage
 );
-
 try (final KeyValueIterator iterator = 
queryResult.get(partition).getResult()) {
 while (iterator.hasNext()) {
-
actualValues.add(valueExtactor.apply(iterator.next().value));
+actualValues.add((Integer) iterator.next().value);
+}
+}
+assertThat(queryResult.get(partition).getExecutionInfo(), 
is(empty()));
+}
+assertThat("Result:" + result, actualValues, is(expectedValues));
+assertThat("Result:" + result, result.getPosition(), 
is(INPUT_POSITION));
+}
+}
+
+public  void shouldHandleTimestampedRangeQuery(
+final Optional lower,
+final Optional upper,
+final boolean isKeyAscending,
+final List expectedValues) {

Review Comment:
   It seems the timestamp of `ValueAndTimestamp` changes every time, so we 
cannot compare the `ValueAndTimestamp` objects directly.



##
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java:
##
@@ -1632,10 +1729,65 @@ public  void shouldHandleRangeQuery(
 IllegalArgumentException.class,
 queryResult.get(partition)::getFailureMessage
 );
-
 try (final KeyValueIterator iterator = 
queryResult.get(partition).getResult()) {
 while (iterator.hasNext()) {
-
actualValues.add(valueExtactor.apply(iterator.next().value));
+actualValues.add((Integer) iterator.next().value);
+}
+}
+assertThat(queryResult.get(partition).getExecutionInfo(), 
is(empty()));
+}
+assertThat("Result:" + result, actualValues, is(expectedValues));
+assertThat("Result:" + result, result.getPosition(), 
is(INPUT_POSITION));
+}
+}
+
+public  void shouldHandleTimestampedRangeQuery(
+final Optional lower,
+final Optional upper,
+final boolean isKeyAscending,
+final List expectedValues) {

Review Comment:
   It seems the timestamp of `ValueAndTimestamp` changes every time, so we 
cannot compare the `ValueAndTimestamp` objects directly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]

2023-11-20 Thread via GitHub


hanyuzheng7 commented on code in PR #14570:
URL: https://github.com/apache/kafka/pull/14570#discussion_r1399652671


##
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java:
##
@@ -1584,7 +1646,43 @@ public  void shouldHandleKeyQuery(
 );
 
 final V result1 = queryResult.getResult();
-final Integer integer = valueExtactor.apply(result1);
+final Integer integer = (Integer) result1;
+assertThat(integer, is(expectedValue));
+assertThat(queryResult.getExecutionInfo(), is(empty()));
+assertThat(queryResult.getPosition(), is(POSITION_0));
+}
+
+public  void shouldHandleTimestampedKeyQuery(
+final Integer key,
+final Integer expectedValue) {
+
+final TimestampedKeyQuery query = 
TimestampedKeyQuery.withKey(key);
+final StateQueryRequest> request =
+inStore(STORE_NAME)
+.withQuery(query)
+.withPartitions(mkSet(0, 1))
+.withPositionBound(PositionBound.at(INPUT_POSITION));
+
+final StateQueryResult> result =
+IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+final QueryResult> queryResult = 
result.getOnlyPartitionResult();
+if (queryResult == null) {
+throw new AssertionError("cannot use this query type to query 
result");
+}
+final boolean failure = queryResult.isFailure();
+if (failure) {
+throw new AssertionError(queryResult.toString());
+}
+assertThat(queryResult.isSuccess(), is(true));
+
+assertThrows(IllegalArgumentException.class, 
queryResult::getFailureReason);
+assertThrows(
+IllegalArgumentException.class,
+queryResult::getFailureMessage
+);
+
+final ValueAndTimestamp result1 = queryResult.getResult();

Review Comment:
   ok



##
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java:
##
@@ -1584,7 +1646,43 @@ public  void shouldHandleKeyQuery(
 );
 
 final V result1 = queryResult.getResult();
-final Integer integer = valueExtactor.apply(result1);
+final Integer integer = (Integer) result1;
+assertThat(integer, is(expectedValue));
+assertThat(queryResult.getExecutionInfo(), is(empty()));
+assertThat(queryResult.getPosition(), is(POSITION_0));
+}
+
+public  void shouldHandleTimestampedKeyQuery(
+final Integer key,
+final Integer expectedValue) {
+
+final TimestampedKeyQuery query = 
TimestampedKeyQuery.withKey(key);
+final StateQueryRequest> request =
+inStore(STORE_NAME)
+.withQuery(query)
+.withPartitions(mkSet(0, 1))
+.withPositionBound(PositionBound.at(INPUT_POSITION));
+
+final StateQueryResult> result =
+IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+final QueryResult> queryResult = 
result.getOnlyPartitionResult();
+if (queryResult == null) {
+throw new AssertionError("cannot use this query type to query 
result");
+}
+final boolean failure = queryResult.isFailure();
+if (failure) {
+throw new AssertionError(queryResult.toString());
+}
+assertThat(queryResult.isSuccess(), is(true));
+
+assertThrows(IllegalArgumentException.class, 
queryResult::getFailureReason);
+assertThrows(
+IllegalArgumentException.class,
+queryResult::getFailureMessage
+);
+
+final ValueAndTimestamp result1 = queryResult.getResult();
+final Integer integer = (Integer) result1.value();
 assertThat(integer, is(expectedValue));

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]

2023-11-20 Thread via GitHub


hanyuzheng7 commented on code in PR #14570:
URL: https://github.com/apache/kafka/pull/14570#discussion_r1399632055


##
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java:
##
@@ -1584,7 +1646,43 @@ public  void shouldHandleKeyQuery(
 );
 
 final V result1 = queryResult.getResult();
-final Integer integer = valueExtactor.apply(result1);
+final Integer integer = (Integer) result1;
+assertThat(integer, is(expectedValue));
+assertThat(queryResult.getExecutionInfo(), is(empty()));
+assertThat(queryResult.getPosition(), is(POSITION_0));
+}
+
+public  void shouldHandleTimestampedKeyQuery(
+final Integer key,
+final Integer expectedValue) {

Review Comment:
   But in the end we should compare the value in `ValueAndTimestamp`? 
   So can we set the timestamp of the `ValueAndTimestamp` to -1?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]

2023-11-20 Thread via GitHub


hanyuzheng7 commented on code in PR #14570:
URL: https://github.com/apache/kafka/pull/14570#discussion_r1399632055


##
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java:
##
@@ -1584,7 +1646,43 @@ public  void shouldHandleKeyQuery(
 );
 
 final V result1 = queryResult.getResult();
-final Integer integer = valueExtactor.apply(result1);
+final Integer integer = (Integer) result1;
+assertThat(integer, is(expectedValue));
+assertThat(queryResult.getExecutionInfo(), is(empty()));
+assertThat(queryResult.getPosition(), is(POSITION_0));
+}
+
+public  void shouldHandleTimestampedKeyQuery(
+final Integer key,
+final Integer expectedValue) {

Review Comment:
   But in the end we should compare the value in `ValueAndTimestamp`? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15363: Broker log directory failure changes [kafka]

2023-11-20 Thread via GitHub


OmniaGM commented on code in PR #14790:
URL: https://github.com/apache/kafka/pull/14790#discussion_r1399627999


##
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:
##
@@ -76,13 +82,49 @@ class ReplicaAlterLogDirsThread(name: String,
 futureLog.updateHighWatermark(partitionData.highWatermark)
 futureLog.maybeIncrementLogStartOffset(partitionData.logStartOffset, 
LogStartOffsetIncrementReason.LeaderOffsetIncremented)
 
-if (partition.maybeReplaceCurrentWithFutureReplica())
-  removePartitions(Set(topicPartition))
+directoryEventHandler match {
+  case DirectoryEventHandler.NOOP =>
+if (partition.maybeReplaceCurrentWithFutureReplica())
+  removePartitions(Set(topicPartition))
+  case _ =>
+maybePromoteFutureReplica(topicPartition, partition)
+}
 
 quota.record(records.sizeInBytes)
 logAppendInfo
   }
 
+  // Visible for testing
+  def updatedAssignmentRequestStat(topicPartition: TopicPartition)(state: 
DirectoryEventRequestState): Unit = {
+assignmentRequestStates.put(topicPartition, state)
+  }
+  private def maybePromoteFutureReplica(topicPartition: TopicPartition, 
partition: Partition) = {
+val partitionRequestState = 
Option(assignmentRequestStates.get(topicPartition))
+val topicId = partition.topicId
+if (topicId.isEmpty)
+  throw new IllegalStateException(s"Topic ${topicPartition.topic()} exists 
but its ID doesn't exist.")
+
+partitionRequestState match {
+  case None =>
+// Schedule assignment request and don't promote the future replica 
yet until the controller accepted the request.
+partition.maybeFutureReplicaCaughtUp(_ => {
+  partition.futureReplicaDirectoryId()
+.map {
+  directoryEventHandler.handleAssignment(new 
TopicIdPartition(topicId.get, topicPartition.partition()), _,
+updatedAssignmentRequestStat(topicPartition)(_))

Review Comment:
   Updated this; now `AssignmentManager` will only update the state to 
`COMPLETED`, and `QUEUED` will be set immediately here. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15363: Broker log directory failure changes [kafka]

2023-11-20 Thread via GitHub


OmniaGM commented on code in PR #14790:
URL: https://github.com/apache/kafka/pull/14790#discussion_r1399627137


##
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:
##
@@ -76,13 +82,49 @@ class ReplicaAlterLogDirsThread(name: String,
 futureLog.updateHighWatermark(partitionData.highWatermark)
 futureLog.maybeIncrementLogStartOffset(partitionData.logStartOffset, 
LogStartOffsetIncrementReason.LeaderOffsetIncremented)
 
-if (partition.maybeReplaceCurrentWithFutureReplica())
-  removePartitions(Set(topicPartition))
+directoryEventHandler match {
+  case DirectoryEventHandler.NOOP =>
+if (partition.maybeReplaceCurrentWithFutureReplica())
+  removePartitions(Set(topicPartition))
+  case _ =>
+maybePromoteFutureReplica(topicPartition, partition)
+}
 
 quota.record(records.sizeInBytes)
 logAppendInfo
   }
 
+  // Visible for testing
+  def updatedAssignmentRequestStat(topicPartition: TopicPartition)(state: 
DirectoryEventRequestState): Unit = {
+assignmentRequestStates.put(topicPartition, state)
+  }
+  private def maybePromoteFutureReplica(topicPartition: TopicPartition, 
partition: Partition) = {
+val partitionRequestState = 
Option(assignmentRequestStates.get(topicPartition))
+val topicId = partition.topicId
+if (topicId.isEmpty)
+  throw new IllegalStateException(s"Topic ${topicPartition.topic()} exists 
but its ID doesn't exist.")
+
+partitionRequestState match {
+  case None =>
+// Schedule assignment request and don't promote the future replica 
yet until the controller accepted the request.
+partition.maybeFutureReplicaCaughtUp(_ => {
+  partition.futureReplicaDirectoryId()
+.map {
+  directoryEventHandler.handleAssignment(new 
TopicIdPartition(topicId.get, topicPartition.partition()), _,
+updatedAssignmentRequestStat(topicPartition)(_))
+}
+})
+  case Some(DirectoryEventRequestState.COMPLETED) =>
+// Promote future replica if controller accepted the request and the 
replica caught-up with the original log.
+if (partition.maybeReplaceCurrentWithFutureReplica()) {
+  removePartitions(Set(topicPartition))
+  assignmentRequestStates.remove(topicPartition)
+}
+  case _ =>

Review Comment:
   I updated the PR to only deal with `QUEUED` and `COMPLETED`, and dropped 
`DISPATCHED`.  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]

2023-11-20 Thread via GitHub


mjsax commented on code in PR #14570:
URL: https://github.com/apache/kafka/pull/14570#discussion_r1399617212


##
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java:
##
@@ -1584,7 +1646,43 @@ public  void shouldHandleKeyQuery(
 );
 
 final V result1 = queryResult.getResult();
-final Integer integer = valueExtactor.apply(result1);
+final Integer integer = (Integer) result1;
+assertThat(integer, is(expectedValue));
+assertThat(queryResult.getExecutionInfo(), is(empty()));
+assertThat(queryResult.getPosition(), is(POSITION_0));
+}
+
+public  void shouldHandleTimestampedKeyQuery(
+final Integer key,
+final Integer expectedValue) {

Review Comment:
   I think we should pass in `ValueAndTimestamp` as expected result and also 
verify that we get the correct timestamp back. (Also rename `expectedValue` to 
`expectedValueAndTimestamp`.)



##
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java:
##
@@ -1584,7 +1646,43 @@ public  void shouldHandleKeyQuery(
 );
 
 final V result1 = queryResult.getResult();
-final Integer integer = valueExtactor.apply(result1);
+final Integer integer = (Integer) result1;
+assertThat(integer, is(expectedValue));
+assertThat(queryResult.getExecutionInfo(), is(empty()));
+assertThat(queryResult.getPosition(), is(POSITION_0));
+}
+
+public  void shouldHandleTimestampedKeyQuery(
+final Integer key,
+final Integer expectedValue) {
+
+final TimestampedKeyQuery query = 
TimestampedKeyQuery.withKey(key);
+final StateQueryRequest> request =
+inStore(STORE_NAME)
+.withQuery(query)
+.withPartitions(mkSet(0, 1))
+.withPositionBound(PositionBound.at(INPUT_POSITION));
+
+final StateQueryResult> result =
+IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+final QueryResult> queryResult = 
result.getOnlyPartitionResult();
+if (queryResult == null) {
+throw new AssertionError("cannot use this query type to query 
result");
+}
+final boolean failure = queryResult.isFailure();
+if (failure) {
+throw new AssertionError(queryResult.toString());
+}
+assertThat(queryResult.isSuccess(), is(true));
+
+assertThrows(IllegalArgumentException.class, 
queryResult::getFailureReason);
+assertThrows(
+IllegalArgumentException.class,
+queryResult::getFailureMessage
+);
+
+final ValueAndTimestamp result1 = queryResult.getResult();

Review Comment:
   `result1` is a very bad name. Rename to `valueAndTimestamp`.



##
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java:
##
@@ -1584,7 +1646,43 @@ public  void shouldHandleKeyQuery(
 );
 
 final V result1 = queryResult.getResult();
-final Integer integer = valueExtactor.apply(result1);
+final Integer integer = (Integer) result1;
+assertThat(integer, is(expectedValue));
+assertThat(queryResult.getExecutionInfo(), is(empty()));
+assertThat(queryResult.getPosition(), is(POSITION_0));
+}
+
+public  void shouldHandleTimestampedKeyQuery(
+final Integer key,
+final Integer expectedValue) {
+
+final TimestampedKeyQuery query = 
TimestampedKeyQuery.withKey(key);
+final StateQueryRequest> request =
+inStore(STORE_NAME)
+.withQuery(query)
+.withPartitions(mkSet(0, 1))
+.withPositionBound(PositionBound.at(INPUT_POSITION));
+
+final StateQueryResult> result =
+IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+final QueryResult> queryResult = 
result.getOnlyPartitionResult();
+if (queryResult == null) {
+throw new AssertionError("cannot use this query type to query 
result");
+}
+final boolean failure = queryResult.isFailure();
+if (failure) {
+throw new AssertionError(queryResult.toString());
+}
+assertThat(queryResult.isSuccess(), is(true));
+
+assertThrows(IllegalArgumentException.class, 
queryResult::getFailureReason);
+assertThrows(
+IllegalArgumentException.class,
+queryResult::getFailureMessage
+);
+
+final ValueAndTimestamp result1 = queryResult.getResult();
+final Integer integer = (Integer) result1.value();
 assertThat(integer, is(expectedValue));

Review Comment:
   Cf my comments above: I think this can just be `assertThat(valueAndTimestamp, 
is(expectedValueAndTimestamp));`



###

Re: [PR] KAFKA-15363: Broker log directory failure changes [kafka]

2023-11-20 Thread via GitHub


OmniaGM commented on code in PR #14790:
URL: https://github.com/apache/kafka/pull/14790#discussion_r1399605671


##
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:
##
@@ -76,13 +82,49 @@ class ReplicaAlterLogDirsThread(name: String,
 futureLog.updateHighWatermark(partitionData.highWatermark)
 futureLog.maybeIncrementLogStartOffset(partitionData.logStartOffset, 
LogStartOffsetIncrementReason.LeaderOffsetIncremented)
 
-if (partition.maybeReplaceCurrentWithFutureReplica())
-  removePartitions(Set(topicPartition))
+directoryEventHandler match {
+  case DirectoryEventHandler.NOOP =>
+if (partition.maybeReplaceCurrentWithFutureReplica())
+  removePartitions(Set(topicPartition))
+  case _ =>
+maybePromoteFutureReplica(topicPartition, partition)
+}

Review Comment:
   Which callback are you referring to here? The order of checking the partition's 
future replica is different between `DirectoryEventHandler.NOOP` and when we have 
a handler. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Small LogValidator clean ups [kafka]

2023-11-20 Thread via GitHub


hachikuji merged PR #14697:
URL: https://github.com/apache/kafka/pull/14697


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Small LogValidator clean ups [kafka]

2023-11-20 Thread via GitHub


hachikuji commented on PR #14697:
URL: https://github.com/apache/kafka/pull/14697#issuecomment-1819609057

   @junrao I triggered a few builds and didn't see much consistency in the test 
failures. I tried some of the tests locally and they passed. I will go ahead 
and merge to trunk.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2023-11-20 Thread via GitHub


lihaosky commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1397782005


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java:
##
@@ -609,17 +842,18 @@ public void testOrdering() {
 inputTopic1.pipeInput(1, "A1", 100L);
 processor.checkAndClearProcessResult();
 
-// push one item to the other window that has a join; this should 
produce non-joined records with a closed window first, then
-// the joined records
+// push one item to the other window that has a join; 
+// this should produce the joined record first;
+// then non-joined record with a closed window
 // by the time they were produced before
 // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
 // w2 = { }
 // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
 // --> w2 = { 1:a1 (ts: 110) }
 inputTopic2.pipeInput(1, "a1", 110L);
 processor.checkAndClearProcessResult(
-new KeyValueTimestamp<>(0, "A0+null", 0L),
-new KeyValueTimestamp<>(1, "A1+a1", 110L)
+new KeyValueTimestamp<>(1, "A1+a1", 110L),
+new KeyValueTimestamp<>(0, "A0+null", 0L)

Review Comment:
   This is because previously we looked at the outer store and then joined, but this 
change makes us join first and then look at the outer store. The ts in the outer 
store and the other store is hard to reason about. If we change the ts of 0 to be 100 and 
the ts of 1 to be 50, the original test would still produce 0 first even though it has the 
larger ts... So unless we compare the ts of the joined and outer records at the same time 
when we output, we can't guarantee the output is ordered by ts.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]

2023-11-20 Thread via GitHub


CalvinConfluent commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1399579994


##
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##
@@ -327,10 +333,12 @@ public ControllerResult 
registerBroker(
 ", but got cluster ID " + request.clusterId());
 }
 int brokerId = request.brokerId();
+List records = new ArrayList<>();
 BrokerRegistration existing = brokerRegistrations.get(brokerId);
-if (version < 2 || existing == null || request.previousBrokerEpoch() 
!= existing.epoch()) {
-// TODO(KIP-966): Update the ELR if the broker has an unclean 
shutdown.
-log.debug("Received an unclean shutdown request");

Review Comment:
   Actually, the server may receive old registration requests due to poor 
network conditions or during rolling upgrades. I think we should also treat the 
old version requests as unclean shutdowns.
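   A minimal sketch of what that could look like, mirroring the quoted condition and 
treating the old-version case the same as an epoch mismatch (the ELR-update helper 
is hypothetical, not the actual controller code):
   
   ```java
   // Illustrative sketch; handleBrokerUncleanShutdown is a hypothetical helper.
   if (version < 2 || existing == null || request.previousBrokerEpoch() != existing.epoch()) {
       // An old request version cannot prove a clean shutdown, and a stale previous
       // epoch means the broker lost its state -- treat both as unclean shutdowns.
       if (existing != null) {
           handleBrokerUncleanShutdown(brokerId, records);   // KIP-966: update ELRs
       }
   }
   ```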



##
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##
@@ -327,10 +333,12 @@ public ControllerResult 
registerBroker(
 ", but got cluster ID " + request.clusterId());
 }
 int brokerId = request.brokerId();
+List records = new ArrayList<>();
 BrokerRegistration existing = brokerRegistrations.get(brokerId);
-if (version < 2 || existing == null || request.previousBrokerEpoch() 
!= existing.epoch()) {
-// TODO(KIP-966): Update the ELR if the broker has an unclean 
shutdown.
-log.debug("Received an unclean shutdown request");

Review Comment:
   Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]

2023-11-20 Thread via GitHub


CalvinConfluent commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1399575912


##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -1294,8 +1315,11 @@ void handleBrokerFenced(int brokerId, 
List records) {
  */
 void handleBrokerUnregistered(int brokerId, long brokerEpoch,
   List records) {
-generateLeaderAndIsrUpdates("handleBrokerUnregistered", brokerId, 
NO_LEADER, records,
-brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
+generateLeaderAndIsrUpdates("handleBrokerUnregistered", brokerId, 
NO_LEADER, NO_LEADER, records,
+new PartitionsOnReplicaIteratorChain(Arrays.asList(

Review Comment:
   Removed.



##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -466,6 +466,10 @@ private void 
maybeUpdateLastKnownLeader(PartitionChangeRecord record) {
 partition.lastKnownElr[0] != partition.leader)) {
 // Only update the last known leader when the first time the 
partition becomes leaderless.
 record.setLastKnownELR(Arrays.asList(partition.leader));
+} else if ((record.leader() != NO_LEADER && record.leader() != 
NO_LEADER_CHANGE || partition.leader != NO_LEADER)

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15022: Detect negative cycle from one source [kafka]

2023-11-20 Thread via GitHub


lihaosky commented on code in PR #14696:
URL: https://github.com/apache/kafka/pull/14696#discussion_r1399571144


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/Graph.java:
##
@@ -117,6 +131,8 @@ private Graph(final boolean isResidualGraph) {
 }
 
 public void addEdge(final V u, final V v, final int capacity, final int 
cost, final int flow) {
+Objects.requireNonNull(u);
+Objects.requireNonNull(v);

Review Comment:
   `null` is a special node that we introduced into the graph as a dummy source 
for detecting negative cycles, so users can use it.
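
   For context, a minimal sketch of why a dummy source node helps here (plain 
Java, not the Kafka Streams Graph class): initialising every node's distance to 
0, as if a null dummy source had a zero-weight edge to each of them, lets a 
single Bellman-Ford run detect a negative cycle anywhere in the graph, which is 
why null has to be a legal key in the node ordering below.

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class NegativeCycleSketch {
    // Returns true if the weighted graph contains a negative-weight cycle.
    static boolean hasNegativeCycle(Set<String> nodes, Map<String, Map<String, Integer>> edges) {
        // Equivalent to adding a null dummy source with a zero-weight edge to every node:
        // all distances start at 0, so every node is reachable from the dummy.
        Map<String, Long> dist = new HashMap<>();
        nodes.forEach(n -> dist.put(n, 0L));

        // With the implicit dummy source there are |V| + 1 vertices, so run |V| relaxation
        // passes plus one extra; an improvement on the extra pass proves a negative cycle.
        for (int pass = 0; pass <= nodes.size(); pass++) {
            boolean improved = false;
            for (Map.Entry<String, Map<String, Integer>> from : edges.entrySet()) {
                for (Map.Entry<String, Integer> edge : from.getValue().entrySet()) {
                    long candidate = dist.get(from.getKey()) + edge.getValue();
                    if (candidate < dist.get(edge.getKey())) {
                        dist.put(edge.getKey(), candidate);
                        improved = true;
                    }
                }
            }
            if (!improved) {
                return false; // distances settled early: no negative cycle
            }
        }
        return true; // still improving on the extra pass: negative cycle
    }

    public static void main(String[] args) {
        Map<String, Map<String, Integer>> edges = Map.of(
                "a", Map.of("b", 1),
                "b", Map.of("c", -2),
                "c", Map.of("a", -1));
        System.out.println(hasNegativeCycle(Set.of("a", "b", "c"), edges)); // prints true
    }
}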



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/Graph.java:
##
@@ -103,8 +104,21 @@ public String toString() {
 }
 }
 
-private final SortedMap> adjList = new TreeMap<>();
-private final SortedSet nodes = new TreeSet<>();
+private class KeyComparator implements Comparator {
+
+@Override
+public int compare(final V o1, final V o2) {
+if (o1 == null || o2 == null) {
+return -1;

Review Comment:
   Good point. If both are `null`, we can return 0.
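
   A small sketch of the comparator with that fix applied (a hypothetical shape, 
not the exact KeyComparator in the PR): returning 0 for two nulls and putting a 
single null first keeps the ordering consistent, which is what the 
TreeMap/TreeSet backing the graph need to look up the dummy source node 
reliably. It behaves like Comparator.nullsFirst(Comparator.naturalOrder()).

import java.util.Comparator;

final class NullFirstComparator<V extends Comparable<V>> implements Comparator<V> {
    @Override
    public int compare(final V o1, final V o2) {
        if (o1 == null && o2 == null) {
            return 0;   // two dummy source nodes compare equal
        }
        if (o1 == null) {
            return -1;  // the dummy source sorts before every real node
        }
        if (o2 == null) {
            return 1;   // mirror case keeps the comparator antisymmetric
        }
        return o1.compareTo(o2);
    }
}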



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15705: Add integration tests for Heartbeat API and GroupLeave API [kafka]

2023-11-20 Thread via GitHub


dongnuo123 commented on code in PR #14656:
URL: https://github.com/apache/kafka/pull/14656#discussion_r1399570354


##
core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala:
##
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
+import org.apache.kafka.common.message.SyncGroupRequestData
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.coordinator.group.generic.GenericGroupState
+import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.extension.ExtendWith
+
+import java.util.Collections
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class HeartbeatRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+  @ClusterTest(serverProperties = Array(
+new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testHeartbeatWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit 
= {
+testHeartbeat()
+  }
+
+  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"false"),
+new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testHeartbeatWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit 
= {
+testHeartbeat()
+  }
+
+  private def testHeartbeat(): Unit = {
+// Creates the __consumer_offsets topics because it won't be created 
automatically
+// in this test because it does not use FindCoordinator API.
+createOffsetsTopic()
+
+// Create the topic.
+createTopic(
+  topic = "foo",
+  numPartitions = 3
+)
+
+for (version <- ApiKeys.HEARTBEAT.oldestVersion() to 
ApiKeys.HEARTBEAT.latestVersion(isUnstableApiEnabled)) {
+  val metadata = ConsumerProtocol.serializeSubscription(
+new 
ConsumerPartitionAssignor.Subscription(Collections.singletonList("foo"))
+  ).array
+
+  val (leaderMemberId, leaderEpoch) = 
joinDynamicConsumerGroupWithOldProtocol(
+groupId = "grp",
+metadata = metadata,
+completeRebalance = false
+  )
+
+  // Heartbeat with unknown group id and unknown member id.
+  heartbeat(
+groupId = "grp-unknown",
+memberId = "member-id-unknown",
+generationId = -1,
+expectedError = Errors.UNKNOWN_MEMBER_ID,
+version = version.toShort
+  )
+
+  // Heartbeat with unknown group id.
+  heartbeat(
+groupId = "grp-unknown",
+memberId = leaderMemberId,
+generationId = -1,
+expectedError = Errors.UNKNOWN_MEMBER_ID,
+version = version.toShort
+  )
+
+  // Heartbeat with unknown member id.
+  heartbeat(
+groupId = "grp",
+memberId = "member-id-unknown",
+generationId = -1,
+expectedError = Errors.UNKNOWN_MEMBER_ID,
+version = version.toShort
+  )
+
+  // Heartbeat with unmatched generation id.
+  heartbeat(
+groupId = "grp",
+memberId = leaderMemberId,
+generationId = -1,
+expectedError = Errors.ILLEGAL_GENERATION,
+version = version.toShort
+  )
+
+  // Heartbeat COMPLETING_REBALANCE group.
+  heartbeat(
+groupId = "grp",
+memberId = leaderMemberId,
+generationId = leaderEpoch,
+version = version.toShort
+  )
+
+

Re: [PR] KAFKA-15363: Broker log directory failure changes [kafka]

2023-11-20 Thread via GitHub


OmniaGM commented on code in PR #14790:
URL: https://github.com/apache/kafka/pull/14790#discussion_r1399564930


##
core/src/main/java/kafka/server/AssignmentsManager.java:
##
@@ -210,6 +220,9 @@ public void run() throws Exception {
 channelManager.sendRequest(new AssignReplicasToDirsRequest.Builder(
 buildRequestData(brokerId, brokerEpochSupplier.get(), 
assignment)),
 new AssignReplicasToDirsRequestCompletionHandler());
+inflight.values().stream()
+.filter(assignmentEvent -> assignmentEvent.callback != 
null)
+.forEach(assignmentEvent -> 
assignmentEvent.callback.accept(DirectoryEventRequestState.DISPATCHED));

Review Comment:
   For DISPATCHED this is true, but for QUEUED I think we still need this so we 
don't send the reassignment to the AssignmentsManager multiple times, especially 
since, as far as I can see, the manager isn't idempotent.
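
   A hedged sketch of the caller-side bookkeeping being discussed (all names are 
illustrative, not the real AssignmentsManager or ReplicaManager API): 
putIfAbsent keeps a partition from being handed to the manager twice while its 
event is still QUEUED, and the completion callback clears it once the controller 
acknowledges the assignment, so a non-idempotent manager only ever sees each 
reassignment once.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Illustrative only: a minimal guard around a non-idempotent assignments manager.
class DirectoryEventGuard {
    enum State { QUEUED, COMPLETED }

    private final Map<String, State> inflight = new ConcurrentHashMap<>();

    // sendToManager receives the completion callback it should invoke once the
    // controller has persisted the new assignment.
    void submitOnce(String topicPartition, Consumer<Consumer<State>> sendToManager) {
        if (inflight.putIfAbsent(topicPartition, State.QUEUED) == null) {
            sendToManager.accept(state -> {
                if (state == State.COMPLETED) {
                    inflight.remove(topicPartition);
                }
            });
        }
    }
}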



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2023-11-20 Thread via GitHub


lihaosky commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1399561823


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java:
##
@@ -791,7 +791,7 @@ public void 
shouldNotEmitLeftJoinResultForAsymmetricBeforeWindow() {
 inputTopic2.pipeInput(expectedKeys[2], "a" + expectedKeys[2], 
time);
 
 processor.checkAndClearProcessResult(
-new KeyValueTimestamp<>(1, "null+a1", 102L)
+new KeyValueTimestamp<>(1, "null+a1", 102L)

Review Comment:
   revert this?



##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java:
##
@@ -878,16 +880,18 @@ public void 
shouldNotEmitLeftJoinResultForAsymmetricAfterWindow() {
 
 processor.checkAndClearProcessResult();
 
-// push one item to the first stream; this should produce one 
full-join item
+// push one item to the first stream;
+// this should produce one inner-join item;
+// and a right-joined item for a3
 // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
 // w2 = { 1:a1 (ts: 1), 2:a2 (ts: 101), 3:a3 (ts: 101) }
 // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1), 2:A2 (ts: 201) }
-// --> w2 = { 1:a1 (ts: 1), 2:a2 (ts: 101), 3:a3 (ts: 101 }
+// --> w2 = { 1:a1 (ts: 1), 2:a2 (ts: 101), 3:a3 (ts: 101) }
 time += 100;
 inputTopic1.pipeInput(expectedKeys[2], "A" + expectedKeys[2], 
time);
 
 processor.checkAndClearProcessResult(
-new KeyValueTimestamp<>(2, "A2+a2", 201L)
+new KeyValueTimestamp<>(2, "A2+a2", 201L)

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15363: Broker log directory failure changes [kafka]

2023-11-20 Thread via GitHub


OmniaGM commented on code in PR #14790:
URL: https://github.com/apache/kafka/pull/14790#discussion_r1399559787


##
core/src/main/java/kafka/server/AssignmentsManager.java:
##
@@ -146,6 +153,9 @@ public void run() throws Exception {
 log.debug("Received new assignment {}", this);
 }
 pending.put(partition, this);
+if (callback != null) {
+callback.accept(DirectoryEventRequestState.QUEUED);
+}

Review Comment:
   I think the DISPATCHED state is the questionable one. We may not need it and 
could keep the reassignment in QUEUED until it is completed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15363: Broker log directory failure changes [kafka]

2023-11-20 Thread via GitHub


OmniaGM commented on code in PR #14790:
URL: https://github.com/apache/kafka/pull/14790#discussion_r1399558750


##
core/src/main/java/kafka/server/AssignmentsManager.java:
##
@@ -146,6 +153,9 @@ public void run() throws Exception {
 log.debug("Received new assignment {}", this);
 }
 pending.put(partition, this);
+if (callback != null) {
+callback.accept(DirectoryEventRequestState.QUEUED);
+}

Review Comment:
   We need to mark that the request has been sent so we don't send it again and 
instead just wait for the state to become `COMPLETED`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15681: Add support of client-metrics in kafka-configs.sh (KIP-714) [kafka]

2023-11-20 Thread via GitHub


junrao commented on PR #14632:
URL: https://github.com/apache/kafka/pull/14632#issuecomment-1819538489

   @apoorvmittal10 : Thanks for triaging the failed tests. There is still no 
green build though.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


