[jira] [Updated] (KAFKA-14952) Publish metrics when source connector fails to poll data

2023-04-28 Thread Ravindranath Kakarla (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ravindranath Kakarla updated KAFKA-14952:
-
Description: 
Currently, there is no metric in Kafka Connect that tracks when a source connector 
fails to poll data from the source. This information would help operators and 
developers visualize, monitor, and alert when a connector fails to poll records 
from the source.

Existing metrics like *kafka_producer_producer_metrics_record_error_total* and 
*kafka_connect_task_error_metrics_total_record_failures* only cover failures that 
occur while producing data to the Kafka cluster, not failures where the source 
task itself throws a retriable exception or a ConnectException.

Polling from the source can fail because the source system is unavailable or the 
connector is misconfigured. Today this cannot be monitored directly through 
metrics; operators instead have to dig through logs, which is inconsistent with 
how other failures are monitored.

I propose adding two new metrics to Kafka Connect, 
"{_}source-record-poll-error-total{_}" and 
"{_}source-record-poll-error-rate{_}", that can be used to monitor failures 
during polling.

*source-record-poll-error-total* - The total number of times a source connector 
failed to poll data from the source. This includes both retriable and 
non-retriable exceptions.

*source-record-poll-error-rate* - The rate of the above failures per unit of time.

These metrics would be tracked at the connector level and exposed through JMX 
along with the other connector metrics.

I am willing to submit a PR if this looks good; sample implementation code below:
{code:java}
// AbstractWorkerSourceTask.java

protected List<SourceRecord> poll() throws InterruptedException {
    try {
        return task.poll();
    } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
        log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);

        // Record the retriable failure, then do nothing else and let the
        // framework poll again whenever it's ready.
        sourceTaskMetricsGroup.recordPollError();
        return null;
    } catch (Throwable e) {
        // Record the non-retriable failure and rethrow so the task still fails.
        sourceTaskMetricsGroup.recordPollError();
        throw e;
    }
}
{code}
[Reference|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L460]
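For context, here is a minimal sketch of how the sensor behind `recordPollError()` could be registered using Kafka's common metrics API. This is an illustration only; the class name, constructor shape, and metric descriptions are assumptions, not existing Connect code.
{code:java}
// Hypothetical sketch (not Connect source): registering the proposed sensor
// with Kafka's common metrics API. Names below are illustrative assumptions.
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Rate;

public class SourceTaskPollErrorMetrics {
    private final Sensor pollErrorSensor;

    public SourceTaskPollErrorMetrics(Metrics metrics, String groupName) {
        pollErrorSensor = metrics.sensor("source-record-poll-error");
        // Monotonic count of failed polls -> source-record-poll-error-total
        pollErrorSensor.add(
                metrics.metricName("source-record-poll-error-total", groupName,
                        "Total number of failed polls from the source."),
                new CumulativeSum());
        // Per-second rate of failed polls -> source-record-poll-error-rate
        pollErrorSensor.add(
                metrics.metricName("source-record-poll-error-rate", groupName,
                        "Average per-second number of failed polls."),
                new Rate());
    }

    // Called from the catch blocks in poll() above.
    public void recordPollError() {
        pollErrorSensor.record();
    }
}
{code}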

 

 

  was:
Currently, there is no metric in Kafka Connect that tracks when a source connector 
fails to poll data from the source. This information would help operators and 
developers visualize, monitor, and alert when a connector fails to poll records 
from the source.

Existing metrics like *kafka_producer_producer_metrics_record_error_total* and 
*kafka_connect_task_error_metrics_total_record_failures* only cover failures that 
occur while producing data to the Kafka cluster, not failures where the source 
task itself throws a retriable exception or a ConnectException.

Polling from the source can fail because the source system is unavailable or the 
connector is misconfigured. Today this cannot be monitored directly through 
metrics; operators instead have to dig through logs, which is inconsistent with 
how other failures are monitored.

I propose adding two new metrics to Kafka Connect, "source-record-poll-error-total" 
and "source-record-poll-error-rate", that can be used to monitor failures during 
polling.

_*source-record-poll-error-total*_ - The total number of times a source connector 
failed to poll data from the source. This includes both retriable and 
non-retriable exceptions.

_*source-record-poll-error-rate*_ - The rate of the above failures per unit of time.

These metrics would be tracked at the connector level and exposed through JMX 
along with the other connector metrics.

I am willing to submit a PR if this looks good; sample implementation code below:

 
{code:java}
// AbstractWorkerSourceTask.java

protected List<SourceRecord> poll() throws InterruptedException {
    try {
        return task.poll();
    } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
        log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);

        // Record the retriable failure, then do nothing else and let the
        // framework poll again whenever it's ready.
        sourceTaskMetricsGroup.recordPollError();
        return null;
    } catch (Throwable e) {
        // Record the non-retriable failure and rethrow so the task still fails.
        sourceTaskMetricsGroup.recordPollError();
        throw e;
    }
}
{code}
[Reference|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L460]

 

 


> Publish metrics when source connector fails to poll data
> 
>
> Key: KAFKA-14952
> URL: https://issues.apache.org/jira/browse/KAFKA-14952
> Project: Kafka
>  Issue 

[jira] [Updated] (KAFKA-14952) Publish metrics when source connector fails to poll data

2023-04-28 Thread Ravindranath Kakarla (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ravindranath Kakarla updated KAFKA-14952:
-
Description: 
Currently, there is no metric in Kafka Connect that tracks when a source connector 
fails to poll data from the source. This information would help operators and 
developers visualize, monitor, and alert when a connector fails to poll records 
from the source.

Existing metrics like *kafka_producer_producer_metrics_record_error_total* and 
*kafka_connect_task_error_metrics_total_record_failures* only cover failures that 
occur while producing data to the Kafka cluster, not failures where the source 
task itself throws a retriable exception or a ConnectException.

Polling from the source can fail because the source system is unavailable or the 
connector is misconfigured. Today this cannot be monitored directly through 
metrics; operators instead have to dig through logs, which is inconsistent with 
how other failures are monitored.

I propose adding two new metrics to Kafka Connect, "source-record-poll-error-total" 
and "source-record-poll-error-rate", that can be used to monitor failures during 
polling.

_*source-record-poll-error-total*_ - The total number of times a source connector 
failed to poll data from the source. This includes both retriable and 
non-retriable exceptions.

_*source-record-poll-error-rate*_ - The rate of the above failures per unit of time.

These metrics would be tracked at the connector level and exposed through JMX 
along with the other connector metrics.

I am willing to submit a PR if this looks good; sample implementation code below:

 
{code:java}
// AbstractWorkerSourceTask.java

protected List<SourceRecord> poll() throws InterruptedException {
    try {
        return task.poll();
    } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
        log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);

        // Record the retriable failure, then do nothing else and let the
        // framework poll again whenever it's ready.
        sourceTaskMetricsGroup.recordPollError();
        return null;
    } catch (Throwable e) {
        // Record the non-retriable failure and rethrow so the task still fails.
        sourceTaskMetricsGroup.recordPollError();
        throw e;
    }
}
{code}
[Reference|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L460]

 

 

  was:
Currently, there is no metric in Kafka Connect that tracks when a source connector 
fails to poll data from the source. This information would help operators and 
developers visualize, monitor, and alert when a connector fails to poll records 
from the source.

Existing metrics like `kafka_producer_producer_metrics_record_error_total` and 
`kafka_connect_task_error_metrics_total_record_failures` only cover failures that 
occur while producing data to the Kafka cluster, not failures where the source 
task itself throws a retriable exception or a ConnectException.

Polling from the source can fail because the source system is unavailable or the 
connector is misconfigured. Today this cannot be monitored directly through 
metrics; operators instead have to dig through logs, which is inconsistent with 
how other failures are monitored.

I propose adding two new metrics to Kafka Connect, "source-record-poll-error-total" 
and "source-record-poll-error-rate", that can be used to monitor failures during 
polling.

`source-record-poll-error-total` - The total number of times a source connector 
failed to poll data from the source. This includes both retriable and 
non-retriable exceptions.

`source-record-poll-error-rate` - The rate of the above failures per unit of time.

These metrics would be tracked at the connector level and exposed through JMX 
along with the other connector metrics.

I am willing to submit a PR if this looks good; sample implementation code below:

 
{code:java}
// AbstractWorkerSourceTask.java

protected List<SourceRecord> poll() throws InterruptedException {
    try {
        return task.poll();
    } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
        log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);

        // Record the retriable failure, then do nothing else and let the
        // framework poll again whenever it's ready.
        sourceTaskMetricsGroup.recordPollError();
        return null;
    } catch (Throwable e) {
        // Record the non-retriable failure and rethrow so the task still fails.
        sourceTaskMetricsGroup.recordPollError();
        throw e;
    }
}
{code}
 

 

 


> Publish metrics when source connector fails to poll data
> 
>
> Key: KAFKA-14952
> URL: https://issues.apache.org/jira/browse/KAFKA-14952
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 3.3.2
>Reporter: Ravindranath Kakarla
>Priority: Minor
> 

[GitHub] [kafka] kirktrue commented on pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-04-28 Thread via GitHub


kirktrue commented on PR #13591:
URL: https://github.com/apache/kafka/pull/13591#issuecomment-1528441772

   Temporarily marking this as a draft, since I've committed some code that is not 
fully vetted yet.





[jira] [Created] (KAFKA-14952) Publish metrics when source connector fails to poll data

2023-04-28 Thread Ravindranath Kakarla (Jira)
Ravindranath Kakarla created KAFKA-14952:


 Summary: Publish metrics when source connector fails to poll data
 Key: KAFKA-14952
 URL: https://issues.apache.org/jira/browse/KAFKA-14952
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 3.3.2
Reporter: Ravindranath Kakarla


Currently, there is no metric in Kafka Connect that tracks when a source connector 
fails to poll data from the source. This information would help operators and 
developers visualize, monitor, and alert when a connector fails to poll records 
from the source.

Existing metrics like `kafka_producer_producer_metrics_record_error_total` and 
`kafka_connect_task_error_metrics_total_record_failures` only cover failures that 
occur while producing data to the Kafka cluster, not failures where the source 
task itself throws a retriable exception or a ConnectException.

Polling from the source can fail because the source system is unavailable or the 
connector is misconfigured. Today this cannot be monitored directly through 
metrics; operators instead have to dig through logs, which is inconsistent with 
how other failures are monitored.

I propose adding two new metrics to Kafka Connect, "source-record-poll-error-total" 
and "source-record-poll-error-rate", that can be used to monitor failures during 
polling.

`source-record-poll-error-total` - The total number of times a source connector 
failed to poll data from the source. This includes both retriable and 
non-retriable exceptions.

`source-record-poll-error-rate` - The rate of the above failures per unit of time.

These metrics would be tracked at the connector level and exposed through JMX 
along with the other connector metrics.

I am willing to submit a PR if this looks good; sample implementation code below:

 
{code:java}
// AbstractWorkerSourceTask.java

protected List<SourceRecord> poll() throws InterruptedException {
    try {
        return task.poll();
    } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
        log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);

        // Record the retriable failure, then do nothing else and let the
        // framework poll again whenever it's ready.
        sourceTaskMetricsGroup.recordPollError();
        return null;
    } catch (Throwable e) {
        // Record the non-retriable failure and rethrow so the task still fails.
        sourceTaskMetricsGroup.recordPollError();
        throw e;
    }
}
{code}
 

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-04-28 Thread via GitHub


kirktrue commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1180888455


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -384,14 +436,19 @@ synchronized boolean isAborting() {
 }
 
 synchronized void transitionToAbortableError(RuntimeException exception) {

Review Comment:
   It's only used in `TransactionManagerTest`. I didn't want to expose the 
`InvalidStateTransitionHandler` quirkiness to the outside world (not even the tests).
   
   I went back and forth on this decision more than a couple of times. I'd 
certainly welcome your input.






[GitHub] [kafka] jolshan commented on pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-04-28 Thread via GitHub


jolshan commented on PR #13591:
URL: https://github.com/apache/kafka/pull/13591#issuecomment-1528256534

   @kirktrue do we have a list of transitions we consider internal vs external? 
It would be nice to review that list as well as the code.





[GitHub] [kafka] jolshan commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-04-28 Thread via GitHub


jolshan commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1180886011


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -384,14 +436,19 @@ synchronized boolean isAborting() {
 }
 
 synchronized void transitionToAbortableError(RuntimeException exception) {

Review Comment:
   I guess I will need to manually check if the usages are external  






[GitHub] [kafka] jolshan commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-04-28 Thread via GitHub


jolshan commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1180884390


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -384,14 +436,19 @@ synchronized boolean isAborting() {
 }
 
 synchronized void transitionToAbortableError(RuntimeException exception) {

Review Comment:
   Do we ever use this version of the method? Ditto for some of the others we've 
changed.






[GitHub] [kafka] jolshan commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-04-28 Thread via GitHub


jolshan commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1180884390


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -384,14 +436,19 @@ synchronized boolean isAborting() {
 }
 
 synchronized void transitionToAbortableError(RuntimeException exception) {

Review Comment:
   Do we ever use this version of the method?






[GitHub] [kafka] jolshan commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-04-28 Thread via GitHub


jolshan commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1180883557


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -384,14 +436,19 @@ synchronized boolean isAborting() {
 }
 
 synchronized void transitionToAbortableError(RuntimeException exception) {
+transitionToAbortableError(exception, 
InvalidStateTransitionHandler.THROW_EXCEPTION);
+}
+
+private synchronized void transitionToAbortableError(RuntimeException 
exception,
+ InvalidStateTransitionHandler 
invalidStateTransitionHandler) {

Review Comment:
   nit: can we line up the parameters?






[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-04-28 Thread via GitHub


kirktrue commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1180881150


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -196,6 +196,44 @@ private enum Priority {
 }
 }
 
+/**
+ * State transitions originate from one of two sources:
+ *
+ * 
+ * External to the transaction manager. For example, the 
user performs an API call on the
+ * {@link org.apache.kafka.clients.producer.Producer producer} 
that is related to transactional management.
+ * 
+ * Internal to the transaction manager. These transitions 
occur from within the transaction
+ * manager, for example when handling responses from the broker 
for transactions.
+ * 
+ * 
+ *
+ * When an invalid state transition is attempted, the logic 
related to handling that situation may
+ * differ depending on the source of the state transition. This interface 
allows the caller to provide the desired
+ * logic for handling that invalid state transition attempt as appropriate 
for the situation.
+ *
+ * 
+ *
+ * When the state transition is being attempted on behalf of an 
external source, we want to continue to
+ * throw an {@link IllegalStateException} as this is the existing 
behavior. This gives the user the opportunity
+ * to fix the issue without permanently poisoning the state of the 
transaction manager.
+ *
+ * 
+ *
+ * When the state transition is being attempted on behalf of an 
external source, we want to continue to
+ * throw an {@link IllegalStateException} as this is the existing 
behavior. This gives the user the opportunity
+ * to fix the issue without permanently poisoning the state of the 
transaction manager.
+ *
+ * 
+ *
+ * See KAFKA-14831 for more detail.
+ */
+private enum InvalidStateTransitionHandler {
+

Review Comment:
   Will remove.
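For readers following the thread, a minimal sketch of the kind of handler enum the quoted Javadoc describes. Only `THROW_EXCEPTION` appears in the diff above; the second constant and its name are assumptions for illustration.
```java
// Sketch only (nested inside TransactionManager in the PR). The diff shows
// THROW_EXCEPTION; the fatal-transition constant here is an assumption.
private enum InvalidStateTransitionHandler {
    // External (user-initiated) transitions: throw IllegalStateException so
    // the caller can fix the bug without poisoning the transaction manager.
    THROW_EXCEPTION,
    // Internal transitions: treat the illegal transition as a fatal producer
    // error instead of throwing.
    TRANSITION_TO_FATAL_ERROR
}
```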






[GitHub] [kafka] jolshan commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-04-28 Thread via GitHub


jolshan commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1180880988


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -196,6 +196,44 @@ private enum Priority {
 }
 }
 
+/**
+ * State transitions originate from one of two sources:
+ *
+ * 
+ * External to the transaction manager. For example, the 
user performs an API call on the
+ * {@link org.apache.kafka.clients.producer.Producer producer} 
that is related to transactional management.
+ * 
+ * Internal to the transaction manager. These transitions 
occur from within the transaction
+ * manager, for example when handling responses from the broker 
for transactions.
+ * 
+ * 
+ *
+ * When an invalid state transition is attempted, the logic 
related to handling that situation may
+ * differ depending on the source of the state transition. This interface 
allows the caller to provide the desired
+ * logic for handling that invalid state transition attempt as appropriate 
for the situation.
+ *
+ * 
+ *
+ * When the state transition is being attempted on behalf of an 
external source, we want to continue to
+ * throw an {@link IllegalStateException} as this is the existing 
behavior. This gives the user the opportunity
+ * to fix the issue without permanently poisoning the state of the 
transaction manager.
+ *
+ * 
+ *
+ * When the state transition is being attempted on behalf of an 
external source, we want to continue to
+ * throw an {@link IllegalStateException} as this is the existing 
behavior. This gives the user the opportunity
+ * to fix the issue without permanently poisoning the state of the 
transaction manager.
+ *
+ * 
+ *
+ * See KAFKA-14831 for more detail.
+ */
+private enum InvalidStateTransitionHandler {
+

Review Comment:
   nit: I don't know if we need the empty lines in 232 and 234. However, we 
typically list enums one per line.






[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-04-28 Thread via GitHub


kirktrue commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1180881100


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -196,6 +196,44 @@ private enum Priority {
 }
 }
 
+/**
+ * State transitions originate from one of two sources:
+ *
+ * 
+ * External to the transaction manager. For example, the 
user performs an API call on the
+ * {@link org.apache.kafka.clients.producer.Producer producer} 
that is related to transactional management.
+ * 
+ * Internal to the transaction manager. These transitions 
occur from within the transaction
+ * manager, for example when handling responses from the broker 
for transactions.
+ * 
+ * 
+ *
+ * When an invalid state transition is attempted, the logic 
related to handling that situation may
+ * differ depending on the source of the state transition. This interface 
allows the caller to provide the desired
+ * logic for handling that invalid state transition attempt as appropriate 
for the situation.
+ *
+ * 
+ *
+ * When the state transition is being attempted on behalf of an 
external source, we want to continue to
+ * throw an {@link IllegalStateException} as this is the existing 
behavior. This gives the user the opportunity
+ * to fix the issue without permanently poisoning the state of the 
transaction manager.
+ *
+ * 
+ *
+ * When the state transition is being attempted on behalf of an 
external source, we want to continue to

Review Comment:
   Yeah, probably. The comment isn't so beautiful now, is it?  






[GitHub] [kafka] jolshan commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-04-28 Thread via GitHub


jolshan commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1180880696


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -196,6 +196,44 @@ private enum Priority {
 }
 }
 
+/**
+ * State transitions originate from one of two sources:
+ *
+ * 
+ * External to the transaction manager. For example, the 
user performs an API call on the
+ * {@link org.apache.kafka.clients.producer.Producer producer} 
that is related to transactional management.
+ * 
+ * Internal to the transaction manager. These transitions 
occur from within the transaction
+ * manager, for example when handling responses from the broker 
for transactions.
+ * 
+ * 
+ *
+ * When an invalid state transition is attempted, the logic 
related to handling that situation may
+ * differ depending on the source of the state transition. This interface 
allows the caller to provide the desired
+ * logic for handling that invalid state transition attempt as appropriate 
for the situation.
+ *
+ * 
+ *
+ * When the state transition is being attempted on behalf of an 
external source, we want to continue to
+ * throw an {@link IllegalStateException} as this is the existing 
behavior. This gives the user the opportunity
+ * to fix the issue without permanently poisoning the state of the 
transaction manager.
+ *
+ * 
+ *
+ * When the state transition is being attempted on behalf of an 
external source, we want to continue to

Review Comment:
   Minor nit: this seems to be a repeat paragraph.






[GitHub] [kafka] jolshan commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-04-28 Thread via GitHub


jolshan commented on code in PR #13591:
URL: https://github.com/apache/kafka/pull/13591#discussion_r1180880417


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -196,6 +196,44 @@ private enum Priority {
 }
 }
 
+/**
+ * State transitions originate from one of two sources:

Review Comment:
   This is a beautiful comment. I really appreciate clarity here :) 






[GitHub] [kafka] jolshan commented on pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional

2023-04-28 Thread via GitHub


jolshan commented on PR #13607:
URL: https://github.com/apache/kafka/pull/13607#issuecomment-1528168395

   One concern that came up when I was thinking about this PR is whether we can 
have a produce request with more than one producer ID when we overflow the epoch. 
Basically, I need to confirm we flush the accumulator.
   
   Alternatively, we could allow for more than one producer ID -- the main 
concern was that the verification could send the wrong one and get an invalid 
producer ID mapping error. Then we would have to retry. 





[GitHub] [kafka] kirktrue commented on pull request #13568: KAFKA-14906:Extract the coordinator service log from server log

2023-04-28 Thread via GitHub


kirktrue commented on PR #13568:
URL: https://github.com/apache/kafka/pull/13568#issuecomment-1528167503

   Logging configuration is something the administrator can change as desired, 
right? Is there a clear benefit to the majority of users for this change? I can 
imagine it being a welcome change for some and an unexpected burden for others.
   
   Note: since I am not a Kafka administrator, feel free to ignore my comment 





[GitHub] [kafka] jolshan commented on a diff in pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional

2023-04-28 Thread via GitHub


jolshan commented on code in PR #13607:
URL: https://github.com/apache/kafka/pull/13607#discussion_r1180865066


##
clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java:
##
@@ -222,6 +223,27 @@ public void clearPartitionRecords() {
 partitionSizes();
 data = null;
 }
+
+public static void validateProducerIds(short version, ProduceRequestData 
data) {
+if (version >= 3) {

Review Comment:
   In `ProduceRequest.json`:
   > // Version 3 adds the transactional ID, which is used for authorization when attempting to write
   > // transactional data. Version 3 also adds support for Kafka Message Format v2.
   
   Basically, before version 3 these fields aren't used.






[GitHub] [kafka] jolshan commented on a diff in pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional

2023-04-28 Thread via GitHub


jolshan commented on code in PR #13607:
URL: https://github.com/apache/kafka/pull/13607#discussion_r1180864382


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -642,7 +642,14 @@ class ReplicaManager(val config: KafkaConfig,
   (entriesPerPartition, Map.empty)
 else
   entriesPerPartition.partition { case (topicPartition, records) =>
-
getPartitionOrException(topicPartition).hasOngoingTransaction(records.firstBatch().producerId())
+// Produce requests (only requests that require verification) 
should only have one batch in "batches" but check all just to be safe.
+val transactionalBatches = records.batches.asScala.filter(batch => 
batch.hasProducerId && batch.isTransactional)

Review Comment:
   it would be the other way around, we can have a producer id but not be 
transactional with idempotent producers.
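To make the distinction concrete, a small self-contained illustration (an assumption-labeled example, not code from the PR): an idempotent batch carries a producer ID but is not transactional.
```java
// Illustration: idempotent batches have a producer ID but are not transactional.
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.SimpleRecord;

public class IdempotentBatchExample {
    public static void main(String[] args) {
        MemoryRecords records = MemoryRecords.withIdempotentRecords(
                CompressionType.NONE, 15L /* producerId */, (short) 0, 0,
                new SimpleRecord("value".getBytes()));
        RecordBatch batch = records.firstBatch();
        System.out.println(batch.hasProducerId());    // true
        System.out.println(batch.isTransactional());  // false
    }
}
```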






[GitHub] [kafka] jolshan commented on a diff in pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional

2023-04-28 Thread via GitHub


jolshan commented on code in PR #13607:
URL: https://github.com/apache/kafka/pull/13607#discussion_r1180864129


##
clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java:
##
@@ -222,6 +223,27 @@ public void clearPartitionRecords() {
 partitionSizes();
 data = null;
 }
+
+public static void validateProducerIds(short version, ProduceRequestData 
data) {
+if (version >= 3) {
+long producerId = -1;
+for (ProduceRequestData.TopicProduceData topicData : 
data.topicData()) {
+for (ProduceRequestData.PartitionProduceData partitionData : 
topicData.partitionData()) {
+BaseRecords baseRecords = partitionData.records();
+if (baseRecords instanceof Records) {
+Records records = (Records) baseRecords;
+for (RecordBatch batch : records.batches()) {
+if (producerId == -1 && batch.hasProducerId())

Review Comment:
   Based on some of the ProduceRequest tests I found, we allow for some batches 
with and some without. 
   See `testMixedIdempotentData`






[GitHub] [kafka] kirktrue commented on pull request #13581: A document on the current usage of Kafka project is added

2023-04-28 Thread via GitHub


kirktrue commented on PR #13581:
URL: https://github.com/apache/kafka/pull/13581#issuecomment-1528144605

   @SijoJoseph2002 just for my own understanding, did you intend to change this 
much source?





[GitHub] [kafka] kirktrue commented on a diff in pull request #13605: KAFKA-14950: implement assign() and assignment()

2023-04-28 Thread via GitHub


kirktrue commented on code in PR #13605:
URL: https://github.com/apache/kafka/pull/13605#discussion_r1180848020


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -537,7 +568,9 @@ public void subscribe(Pattern pattern) {
 
 @Override
 public void unsubscribe() {
-throw new KafkaException("method not implemented");
+// 
fetcher.clearBufferedDataForUnassignedPartitions(Collections.emptySet());

Review Comment:
   I'm wondering if we want to push the clearing of buffered data and 
`SubscriptionState` mutation into the background thread so that it's executed 
together?
   
   The coordinator's `onLeavePrepare` needs to know what partitions are being 
removed. As it's written, it's possible that the set of partitions could be 
removed before the background thread gets a chance to run the `onLeavePrepare` 
logic. 



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -522,7 +525,35 @@ public void subscribe(Collection topics, 
ConsumerRebalanceListener callb
 
 @Override
 public void assign(Collection partitions) {
-throw new KafkaException("method not implemented");
+if (partitions == null) {
+throw new IllegalArgumentException("Topic partitions collection to 
assign to cannot be null");
+}
+
+if (partitions.isEmpty()) {
+this.unsubscribe();
+return;
+}
+
+for (TopicPartition tp : partitions) {
+String topic = (tp != null) ? tp.topic() : null;
+if (Utils.isBlank(topic))
+throw new IllegalArgumentException("Topic partitions to assign 
to cannot have null or empty topic");
+}
+// TODO: implement fetcher
+// fetcher.clearBufferedDataForUnassignedPartitions(partitions);
+
+// make sure the offsets of topic partitions the consumer is 
unsubscribing from
+// are committed since there will be no following rebalance
+commit(subscriptions.allConsumed());
+
+log.info("Assigned to partition(s): {}", Utils.join(partitions, ", "));
+if (this.subscriptions.assignFromUser(new HashSet<>(partitions)))
+   updateMetadata(time.milliseconds());
+}
+
+private void updateMetadata(long milliseconds) {

Review Comment:
   Nitpicky: can we fold `updateMetadata` into `assign`?






[GitHub] [kafka] kirktrue commented on a diff in pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional

2023-04-28 Thread via GitHub


kirktrue commented on code in PR #13607:
URL: https://github.com/apache/kafka/pull/13607#discussion_r1180838837


##
clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java:
##
@@ -238,6 +238,37 @@ public void testV6AndBelowCannotUseZStdCompression() {
 // Works fine with current version (>= 7)
 ProduceRequest.forCurrentMagic(produceData);
 }
+
+@Test
+public void testNoMixedProducerIds() {
+final long producerId1 = 15L;
+final long producerId2 = 16L;
+final short producerEpoch = 5;
+final int sequence = 10;
+
+final MemoryRecords records = 
MemoryRecords.withRecords(CompressionType.NONE,
+new SimpleRecord("foo".getBytes()));
+final MemoryRecords txnRecords = 
MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId1,
+producerEpoch, sequence, new SimpleRecord("bar".getBytes()));
+final MemoryRecords idempotentRecords = 
MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId2,
+producerEpoch, sequence, new SimpleRecord("bee".getBytes()));
+
+
+ProduceRequest.Builder requestBuilder = ProduceRequest.forCurrentMagic(
+new ProduceRequestData()
+.setTopicData(new 
ProduceRequestData.TopicProduceDataCollection(Arrays.asList(
+new 
ProduceRequestData.TopicProduceData().setName("foo").setPartitionData(Collections.singletonList(
+new 
ProduceRequestData.PartitionProduceData().setIndex(0).setRecords(records))),
+new 
ProduceRequestData.TopicProduceData().setName("bar").setPartitionData(Collections.singletonList(
+new 
ProduceRequestData.PartitionProduceData().setIndex(1).setRecords(txnRecords))),
+new 
ProduceRequestData.TopicProduceData().setName("bee").setPartitionData(Collections.singletonList(

Review Comment:
   Please change `bee` to `baz` 



##
clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java:
##
@@ -222,6 +223,27 @@ public void clearPartitionRecords() {
 partitionSizes();
 data = null;
 }
+
+public static void validateProducerIds(short version, ProduceRequestData 
data) {
+if (version >= 3) {
+long producerId = -1;
+for (ProduceRequestData.TopicProduceData topicData : 
data.topicData()) {
+for (ProduceRequestData.PartitionProduceData partitionData : 
topicData.partitionData()) {
+BaseRecords baseRecords = partitionData.records();
+if (baseRecords instanceof Records) {
+Records records = (Records) baseRecords;
+for (RecordBatch batch : records.batches()) {
+if (producerId == -1 && batch.hasProducerId())

Review Comment:
   Is it an error if there's one batch with a producer ID and another without one?



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -642,7 +642,14 @@ class ReplicaManager(val config: KafkaConfig,
   (entriesPerPartition, Map.empty)
 else
   entriesPerPartition.partition { case (topicPartition, records) =>
-
getPartitionOrException(topicPartition).hasOngoingTransaction(records.firstBatch().producerId())
+// Produce requests (only requests that require verification) 
should only have one batch in "batches" but check all just to be safe.
+val transactionalBatches = records.batches.asScala.filter(batch => 
batch.hasProducerId && batch.isTransactional)

Review Comment:
   Is it valid for `isTransactional` to be `true` but `hasProducerId` to be 
`false`?



##
clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java:
##
@@ -222,6 +223,27 @@ public void clearPartitionRecords() {
 partitionSizes();
 data = null;
 }
+
+public static void validateProducerIds(short version, ProduceRequestData 
data) {
+if (version >= 3) {

Review Comment:
   For the uninitiated, can we add (or move) a comment explaining why version `3` is special?



##
clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java:
##
@@ -238,6 +238,37 @@ public void testV6AndBelowCannotUseZStdCompression() {
 // Works fine with current version (>= 7)
 ProduceRequest.forCurrentMagic(produceData);
 }
+
+@Test
+public void testNoMixedProducerIds() {
+final long producerId1 = 15L;
+final long producerId2 = 16L;
+final short producerEpoch = 5;
+final int sequence = 10;
+
+final MemoryRecords records = 
MemoryRecords.withRecords(CompressionType.NONE,
+new SimpleRecord("foo".getBytes()));
+final MemoryRecords 

[GitHub] [kafka] mumrah commented on pull request #13647: MINOR: fix KRaftClusterTest and KRaft integration test failure

2023-04-28 Thread via GitHub


mumrah commented on PR #13647:
URL: https://github.com/apache/kafka/pull/13647#issuecomment-1528072990

   The test failure was introduced by a commit fairly late in #13407. I did 
briefly investigate it, but couldn't reproduce it locally, so I figured it was 
existing flakiness. Basically, it's just my fault for not looking more closely 
at the test failures.





[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder

2023-04-28 Thread via GitHub


jeffkbkim commented on code in PR #13637:
URL: https://github.com/apache/kafka/pull/13637#discussion_r1180707601


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java:
##
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.Record;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord;
+
+/**
+ * Build a new Target Assignment based on the provided parameters. As a result,
+ * it yields the records that must be persisted to the log and the new member
+ * assignments as a map.
+ *
+ * Records are only created for members which have a new target assignment. If
+ * their assignment did not change, no new record is needed.
+ *
+ * When a member is deleted, it is assumed that its target assignment record
+ * is deleted as part of the member deletion process. In other words, this 
class
+ * does not yield a tombstone for removed members.
+ */
+public class TargetAssignmentBuilder {
+/**
+ * The assignment result returned by {{@link 
TargetAssignmentBuilder#build()}}.
+ */
+public static class TargetAssignmentResult {
+/**
+ * The records that must be applied to the __consumer_offsets
+ * topics to persist the new target assignment.
+ */
+private final List records;
+
+/**
+ * The new target assignment for all members.
+ */
+private final Map assignments;
+
+TargetAssignmentResult(
+List records,
+Map assignments
+) {
+Objects.requireNonNull(records);
+Objects.requireNonNull(assignments);
+this.records = records;
+this.assignments = assignments;
+}
+
+/**
+ * @return The records.
+ */
+public List records() {
+return records;
+}
+
+/**
+ * @return The assignments.
+ */
+public Map assignments() {
+return assignments;
+}
+}
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The group epoch.
+ */
+private final int groupEpoch;
+
+/**
+ * The partition assignor used to compute the assignment.
+ */
+private final PartitionAssignor assignor;
+
+/**
+ * The members in the group.
+ */
+private Map members = Collections.emptyMap();
+
+/**
+ * The subscription metadata.
+ */
+private Map subscriptionMetadata = 
Collections.emptyMap();
+
+/**
+ * The existing target assignment.
+ */
+private Map assignments = Collections.emptyMap();
+
+/**
+ * The members which have been updated or deleted. Deleted members
+ * are signaled by a null value.
+ */
+private final Map updatedMembers = new 
HashMap<>();
+
+/**
+ * Constructs the object.
+ *
+ * @param groupId   The group id.
+ * @param groupEpochThe group epoch to compute a target assignment for.
+ * @param assignor  The assignor to use to compute the target 
assignment.
+ */
+public TargetAssignmentBuilder(
+String groupId,
+int groupEpoch,
+

[GitHub] [kafka] kirktrue commented on a diff in pull request #13617: MINOR:code optimization in QuorumController

2023-04-28 Thread via GitHub


kirktrue commented on code in PR #13617:
URL: https://github.com/apache/kafka/pull/13617#discussion_r1180727333


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -1635,7 +1636,7 @@ private enum ImbalanceSchedule {
 /**
  * Tracks if a snapshot generate was scheduled.
  */
-private boolean generateSnapshotScheduled = false;
+private final boolean generateSnapshotScheduled;

Review Comment:
   If the value is always set to `false`, why have the variable at all? 🤔






[GitHub] [kafka] kirktrue commented on a diff in pull request #13623: KAFKA-14926: Remove metrics on Log Cleaner shutdown

2023-04-28 Thread via GitHub


kirktrue commented on code in PR #13623:
URL: https://github.com/apache/kafka/pull/13623#discussion_r1180722737


##
core/src/main/scala/kafka/log/LogCleaner.scala:
##
@@ -167,8 +167,20 @@ class LogCleaner(initialConfig: CleanerConfig,
*/
   def shutdown(): Unit = {
 info("Shutting down the log cleaner.")
-cleaners.foreach(_.shutdown())
-cleaners.clear()
+try {
+  cleaners.foreach(_.shutdown())
+  cleaners.clear()
+} finally {
+  remoteMetrics()
+}
+  }
+
+  def remoteMetrics(): Unit = {

Review Comment:
   Should this method name be `removeMetrics` (a `v` instead of a `t`)?



##
core/src/main/scala/kafka/log/LogCleaner.scala:
##
@@ -167,8 +167,20 @@ class LogCleaner(initialConfig: CleanerConfig,
*/
   def shutdown(): Unit = {
 info("Shutting down the log cleaner.")
-cleaners.foreach(_.shutdown())
-cleaners.clear()
+try {
+  cleaners.foreach(_.shutdown())
+  cleaners.clear()
+} finally {
+  remoteMetrics()
+}
+  }
+
+  def remoteMetrics(): Unit = {
+metricsGroup.removeMetric("max-buffer-utilization-percent")
+metricsGroup.removeMetric("cleaner-recopy-percent")
+metricsGroup.removeMetric("max-clean-time-secs")
+metricsGroup.removeMetric("max-compaction-delay-secs")
+metricsGroup.removeMetric("DeadThreadCount")

Review Comment:
   I had a similar thought. Calling them out together as constants could also 
remind future contributors who add a metric that they need to remove the 
metric too.






[GitHub] [kafka] kirktrue commented on pull request #13528: KAFKA-14879: Update system tests to use latest versions

2023-04-28 Thread via GitHub


kirktrue commented on PR #13528:
URL: https://github.com/apache/kafka/pull/13528#issuecomment-1527918482

   Test failures are KRaft-related and do not appear to be caused by this 
change.





[GitHub] [kafka] kirktrue commented on pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer

2023-04-28 Thread via GitHub


kirktrue commented on PR #13591:
URL: https://github.com/apache/kafka/pull/13591#issuecomment-1527916726

   The test failures are KRaft-related and don't appear to be caused by this 
change.





[GitHub] [kafka] kirktrue commented on pull request #13640: KAFKA-14937: Refactoring for client code to reduce boilerplate

2023-04-28 Thread via GitHub


kirktrue commented on PR #13640:
URL: https://github.com/apache/kafka/pull/13640#issuecomment-1527914720

   The Spotbugs validation is passing and there are no new test failures.





[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-28 Thread via GitHub


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1180672745


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##
@@ -300,9 +301,13 @@ void runOnce() {
 try {
 transactionManager.maybeResolveSequences();
 
+RuntimeException lastError = transactionManager.lastError();
+if (transactionManager.hasAbortableError() && 
shouldHandleAuthorizationError(lastError)) {
+return;
+}
+

Review Comment:
   Ah, I see we had the abortable error check. OK, well now we are doubly 
covered.






[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-28 Thread via GitHub


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1180672146


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##
@@ -300,9 +301,13 @@ void runOnce() {
 try {
 transactionManager.maybeResolveSequences();
 
+RuntimeException lastError = transactionManager.lastError();
+if (transactionManager.hasAbortableError() && 
shouldHandleAuthorizationError(lastError)) {
+return;
+}
+

Review Comment:
   Yup -- my concern is that we unnecessarily reset the producer to initializing 
in the fatal error case.






[GitHub] [kafka] ijuma commented on pull request #13647: MINOR: fix KRaftClusterTest and KRaft integration test failure

2023-04-28 Thread via GitHub


ijuma commented on PR #13647:
URL: https://github.com/apache/kafka/pull/13647#issuecomment-1527834450

   @mumrah the original PR had the same failures; how come we merged it?





[GitHub] [kafka] mjsax commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2023-04-28 Thread via GitHub


mjsax commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r832815852


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -262,6 +262,42 @@ private void closeDirtyAndRevive(final Collection 
taskWithChangelogs, fina
 }
 }
 
+private void commitActiveTasks(final Set activeTasksNeedCommit, 
final AtomicReference activeTasksCommitException) {

Review Comment:
   Can't we reuse the existing `commitTasksAndMaybeUpdateCommittableOffsets()`?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -342,7 +342,19 @@ public void handleAssignment(final Map> activeTasks,
 
 maybeThrowTaskExceptions(taskCloseExceptions);
 
-createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+rebalanceInProgress = true;
+final Collection newActiveTasks = 
createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+// If there are any transactions in flight and there are newly created 
active tasks, commit the tasks
+// to avoid potential long restoration times.
+if (processingMode == EXACTLY_ONCE_V2 && 
threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+log.info("New active tasks were added and there is an inflight 
transaction. Attempting to commit tasks.");
+final int numCommitted = 
commitTasksAndMaybeUpdateCommittableOffsets(tasks.allTasks(), new HashMap<>());

Review Comment:
   We pass in the second parameter to be able to handle timeout exceptions 
correctly (cf. other places where we call 
`commitTasksAndMaybeUpdateCommittableOffsets`) -- here, we don't catch 
`TimeoutException`. What is the impact? Seems it would bubble up into the 
consumer and crash us?
   
   Given that we kinda need to commit, it seems there are two things we could 
do: either add a loop here and retry the commit until we exceed 
`task.timeout.ms` (not personally my preferred solution), or actually move the 
whole commit logic into the restore part (not sure if this is easily possible) 
-- ie, each time we enter the restore code, we check if there is an open TX 
and commit. Not sure how this would align with the new state-updater code 
though.
   
   \cc @cadonna @lucasbru -- can you comment?
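   
   A rough sketch of the retry-loop option, for illustration only (the `time` 
and `taskTimeoutMs` names and the call shape are assumptions, not the actual 
TaskManager code):
{code:java}
// Retry the commit until it succeeds or task.timeout.ms is exceeded.
final long deadlineMs = time.milliseconds() + taskTimeoutMs;
while (true) {
    try {
        commitTasksAndMaybeUpdateCommittableOffsets(tasks.allTasks(), new HashMap<>());
        break; // commit succeeded
    } catch (final TimeoutException timeoutException) {
        if (time.milliseconds() >= deadlineMs) {
            throw timeoutException; // exceeded task.timeout.ms; give up
        }
        time.sleep(100L); // back off briefly, then retry
    }
}
{code}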



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -342,7 +342,19 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
 maybeThrowTaskExceptions(taskCloseExceptions);
 
-createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+rebalanceInProgress = true;
+final Collection<Task> newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+// If there are any transactions in flight and there are newly created 
active tasks, commit the tasks
+// to avoid potential long restoration times.
+if (processingMode == EXACTLY_ONCE_V2 && 
threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {

Review Comment:
   Why is this limited to EOS_v2? From my understanding, EOS_v1 would have the 
same problem?
   
   It also only seems to be a potential issue when we get stateful tasks 
assigned? Not sure if we can limit the last check to 
`!newActiveStatefulTasks.isEmpty()` ?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -342,7 +342,19 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
 maybeThrowTaskExceptions(taskCloseExceptions);
 
-createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+rebalanceInProgress = true;
+final Collection<Task> newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);

Review Comment:
   Seems we only need the count (or a boolean for whether it is empty), but 
not the full collection. Can we simplify this?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -342,7 +342,19 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
 maybeThrowTaskExceptions(taskCloseExceptions);
 
-createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+rebalanceInProgress = true;

Review Comment:
   If we set `rebalanceInProgress = true`, it seems the later call to 
`commitTasksAndMaybeUpdateCommittableOffsets` would exit early and return 
`-1`? That does not seem right?
   
   I cannot remember the full purpose of the `rebalanceInProgress` flag, and 
given cooperative rebalancing and processing records during a rebalance, I am 
wondering if its semantics actually changed?
   
   Edit: just saw that there is a discussion about this below.
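   
   (A toy illustration of the concern, with assumed names: if the flag is set 
before the commit call, a guard like this would short-circuit and report `-1`.)
{code:java}
// Assumed shape of the guard being discussed, not the actual TaskManager code.
int commitTasksAndMaybeUpdateCommittableOffsets(final Collection<Task> tasksToCommit) {
    if (rebalanceInProgress) {
        return -1; // commit is skipped while a rebalance is in progress
    }
    // ... perform the actual commit ...
    return tasksToCommit.size();
}
{code}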



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -342,7 +342,18 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
 

[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-28 Thread via GitHub


philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1180625792


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##
@@ -300,9 +301,13 @@ void runOnce() {
 try {
 transactionManager.maybeResolveSequences();
 
+RuntimeException lastError = transactionManager.lastError();
+if (transactionManager.hasAbortableError() && 
shouldHandleAuthorizationError(lastError)) {
+return;
+}
+

Review Comment:
   sounds good - perhaps you meant checking for the fatal error first, then 
the abortable error? Though I think it doesn't change the logic there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-04-28 Thread via GitHub


jolshan commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1180619843


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##
@@ -300,9 +301,13 @@ void runOnce() {
 try {
 transactionManager.maybeResolveSequences();
 
+RuntimeException lastError = transactionManager.lastError();
+if (transactionManager.hasAbortableError() && 
shouldHandleAuthorizationError(lastError)) {
+return;
+}
+

Review Comment:
   I thought I left this comment, but seems like it didn't take -- should we 
move the fatal error check above the auth check? Since for other request types, 
we will go through the above path, but it really should just be a fatal error.
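   
   A minimal sketch of the reordering under discussion 
(`shouldHandleAuthorizationError` is from this PR; the surrounding `runOnce` 
logic is elided, so treat this as a shape, not the final code):
{code:java}
// Check the fatal path before the abortable/auth path, so a request that hit
// a fatal error is never rerouted into re-initializing the producer.
RuntimeException lastError = transactionManager.lastError();
if (transactionManager.hasFatalError()) {
    maybeAbortBatches(lastError); // fail fast; do not retry initProducerId
} else if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) {
    return; // retry initProducerId after a transient authorization error
}
{code}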



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder

2023-04-28 Thread via GitHub


jolshan commented on code in PR #13637:
URL: https://github.com/apache/kafka/pull/13637#discussion_r1180613326


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java:
##
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.OptionalInt;
+
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment;
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ConsumerGroupMemberTest {
+
+@Test
+public void testNewMember() {
+Uuid topicId1 = Uuid.randomUuid();
+Uuid topicId2 = Uuid.randomUuid();
+Uuid topicId3 = Uuid.randomUuid();
+
+ConsumerGroupMember member = new 
ConsumerGroupMember.Builder("member-id")
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(9)
+.setNextMemberEpoch(11)
+.setInstanceId("instance-id")
+.setRackId("rack-id")
+.setRebalanceTimeoutMs(5000)
+.setClientId("client-id")
+.setClientHost("hostname")
+.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+.setSubscribedTopicRegex("regex")
+.setServerAssignorName("range")
+.setClientAssignors(Collections.singletonList(
+new ClientAssignor(
+"assignor",
+(byte) 0,
+(byte) 0,
+(byte) 1,
+new VersionedMetadata(
+(byte) 1,
+ByteBuffer.allocate(0)))))
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2, 3)))
+.setPartitionsPendingRevocation(mkAssignment(
+mkTopicAssignment(topicId2, 4, 5, 6)))
+.setPartitionsPendingAssignment(mkAssignment(
+mkTopicAssignment(topicId3, 7, 8, 9)))
+.build();
+
+assertEquals("member-id", member.memberId());
+assertEquals(10, member.memberEpoch());
+assertEquals(9, member.previousMemberEpoch());
+assertEquals(11, member.nextMemberEpoch());
+assertEquals("instance-id", member.instanceId());
+assertEquals("rack-id", member.rackId());
+assertEquals("client-id", member.clientId());
+assertEquals("hostname", member.clientHost());
+// Names are sorted.
+assertEquals(Arrays.asList("bar", "foo"), 
member.subscribedTopicNames());
+assertEquals("regex", member.subscribedTopicRegex());
+assertEquals("range", member.serverAssignorName().get());
+assertEquals(
+Collections.singletonList(
+new ClientAssignor(
+"assignor",
+(byte) 0,
+(byte) 0,
+(byte) 1,
+new VersionedMetadata(
+(byte) 1,
+ByteBuffer.allocate(0)))),
+member.clientAssignors());
+assertEquals(mkAssignment(mkTopicAssignment(topicId1, 1, 2, 3)), 
member.assignedPartitions());
+assertEquals(mkAssignment(mkTopicAssignment(topicId2, 4, 5, 6)), 
member.partitionsPendingRevocation());
+assertEquals(mkAssignment(mkTopicAssignment(topicId3, 7, 8, 9)), 
member.partitionsPendingAssignment());
+}
+
+@Test
+public void testEquals() {
+Uuid topicId1 = Uuid.randomUuid();

Review Comment:
   Sorry -- my bad for misunderstanding Java earlier  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above 

[jira] [Commented] (KAFKA-13891) sync group failed with rebalanceInProgress error cause rebalance many rounds in cooperative

2023-04-28 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717745#comment-17717745
 ] 

Philip Nee commented on KAFKA-13891:


[~dajac] - I think we resolved this and the subsequent issue (KAFKA-14016)

> sync group failed with rebalanceInProgress error cause rebalance many rounds 
> in cooperative
> ---
>
> Key: KAFKA-13891
> URL: https://issues.apache.org/jira/browse/KAFKA-13891
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.0.0
>Reporter: Shawn Wang
>Assignee: Philip Nee
>Priority: Major
> Fix For: 3.5.0
>
>
> This issue was first found in 
> [KAFKA-13419|https://issues.apache.org/jira/browse/KAFKA-13419]
> But the previous PR forgot to reset the generation when sync group failed 
> with a rebalanceInProgress error. So the previous bug still exists, and it 
> may cause the consumer to rebalance many rounds before it finally stabilizes.
> Here's the example ({*}bold is added{*}):
>  # consumer A joined and synced group successfully with generation 1 *( with 
> ownedPartition P1/P2 )*
>  # New rebalance started with generation 2; consumer A joined successfully, 
> but somehow consumer A doesn't send out sync group immediately
>  # other consumers completed sync group successfully in generation 2, except 
> consumer A.
>  # After consumer A sends out sync group, a new rebalance starts with 
> generation 3. So consumer A gets a REBALANCE_IN_PROGRESS error in the sync 
> group response
>  # When receiving REBALANCE_IN_PROGRESS, we re-join the group with 
> generation 3, but with the assignment (ownedPartition) from generation 1.
>  # So now we have an out-of-date ownedPartition sent, and unexpected results 
> happen
>  # *After the generation-3 rebalance, consumer A got partitions P3/P4. The 
> ownedPartition is ignored because of the old generation.*
>  # *consumer A revokes P1/P2 and re-joins to start a new round of rebalance*
>  # *if some other consumer C fails to syncGroup before consumer A's 
> joinGroup, the same issue happens again, resulting in many rounds of 
> rebalance before the group is stable*
>  
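
A minimal model of the fix implied above -- reset the generation state before 
re-joining when SyncGroup fails with REBALANCE_IN_PROGRESS -- with 
illustrative names only, not the actual consumer internals:
{code:java}
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Toy model of the scenario: names and structure are illustrative only.
class ToyConsumer {
    int generation = 1;              // generation of the last successful sync
    Set<String> ownedPartitions = new HashSet<>(Arrays.asList("P1", "P2"));

    // Called when a SyncGroup response returns REBALANCE_IN_PROGRESS.
    void onSyncGroupRebalanceInProgress() {
        // The bug: re-joining while keeping the stale generation lets the next
        // JoinGroup carry generation-1 ownedPartitions into the generation-3
        // rebalance. The fix resets the generation first, so the stale
        // ownedPartitions are ignored instead of triggering extra revocations.
        generation = -1;             // mark the generation as unknown
    }
}
{code}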



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder

2023-04-28 Thread via GitHub


dajac commented on code in PR #13637:
URL: https://github.com/apache/kafka/pull/13637#discussion_r1180608631


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java:
##
@@ -0,0 +1,698 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment;
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord;
+import static 
org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder.createAssignmentMemberSpec;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TargetAssignmentBuilderTest {
+
+public static class TargetAssignmentBuilderTestContext {
+private final String groupId;
+private final int groupEpoch;
+private final PartitionAssignor assignor = 
mock(PartitionAssignor.class);
+private final Map<String, ConsumerGroupMember> members = new HashMap<>();
+private final Map<String, TopicMetadata> subscriptionMetadata = new HashMap<>();
+private final Map<String, ConsumerGroupMember> updatedMembers = new HashMap<>();
+private final Map<String, Assignment> targetAssignments = new HashMap<>();
+private final Map<String, MemberAssignment> assignments = new HashMap<>();
+
+public TargetAssignmentBuilderTestContext(
+String groupId,
+int groupEpoch
+) {
+this.groupId = groupId;
+this.groupEpoch = groupEpoch;
+}
+
+public void addGroupMember(
+String memberId,
+List<String> subscriptions,
+Map<Uuid, Set<Integer>> targetPartitions
+) {
+members.put(memberId, new ConsumerGroupMember.Builder(memberId)
+.setSubscribedTopicNames(subscriptions)
+.setRebalanceTimeoutMs(5000)
+.build());
+
+targetAssignments.put(memberId, new Assignment(
+(byte) 0,
+targetPartitions,
+VersionedMetadata.EMPTY
+));
+}
+
+public Uuid addTopicMetadata(
+String topicName,
+int numPartitions
+) {
+Uuid topicId = Uuid.randomUuid();
+subscriptionMetadata.put(topicName, new TopicMetadata(
+topicId,
+topicName,
+numPartitions
+));
+return topicId;
+}
+
+public void updateMemberSubscription(
+String memberId,
+List<String> subscriptions
+) {
+updateMemberSubscription(
+memberId,
+subscriptions,
+Optional.empty(),
+Optional.empty()
+);
+}
+
+public void updateMemberSubscription(
+String memberId,
+List<String> subscriptions,
+Optional<String> instanceId,
+Optional<String> rackId
+) {
+ConsumerGroupMember existingMember = members.get(memberId);
+ConsumerGroupMember.Builder builder;
+if 

[GitHub] [kafka] dajac commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder

2023-04-28 Thread via GitHub


dajac commented on code in PR #13637:
URL: https://github.com/apache/kafka/pull/13637#discussion_r1180608354


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java:
##
@@ -0,0 +1,698 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment;
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord;
+import static 
org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder.createAssignmentMemberSpec;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TargetAssignmentBuilderTest {
+
+public static class TargetAssignmentBuilderTestContext {
+private final String groupId;
+private final int groupEpoch;
+private final PartitionAssignor assignor = 
mock(PartitionAssignor.class);
+private final Map<String, ConsumerGroupMember> members = new HashMap<>();
+private final Map<String, TopicMetadata> subscriptionMetadata = new HashMap<>();
+private final Map<String, ConsumerGroupMember> updatedMembers = new HashMap<>();
+private final Map<String, Assignment> targetAssignments = new HashMap<>();
+private final Map<String, MemberAssignment> assignments = new HashMap<>();
+
+public TargetAssignmentBuilderTestContext(
+String groupId,
+int groupEpoch
+) {
+this.groupId = groupId;
+this.groupEpoch = groupEpoch;
+}
+
+public void addGroupMember(
+String memberId,
+List<String> subscriptions,
+Map<Uuid, Set<Integer>> targetPartitions
+) {
+members.put(memberId, new ConsumerGroupMember.Builder(memberId)
+.setSubscribedTopicNames(subscriptions)
+.setRebalanceTimeoutMs(5000)
+.build());
+
+targetAssignments.put(memberId, new Assignment(
+(byte) 0,
+targetPartitions,
+VersionedMetadata.EMPTY
+));
+}
+
+public Uuid addTopicMetadata(
+String topicName,
+int numPartitions
+) {
+Uuid topicId = Uuid.randomUuid();
+subscriptionMetadata.put(topicName, new TopicMetadata(
+topicId,
+topicName,
+numPartitions
+));
+return topicId;
+}
+
+public void updateMemberSubscription(
+String memberId,
+List<String> subscriptions
+) {
+updateMemberSubscription(
+memberId,
+subscriptions,
+Optional.empty(),
+Optional.empty()
+);
+}
+
+public void updateMemberSubscription(
+String memberId,
+List<String> subscriptions,
+Optional<String> instanceId,
+Optional<String> rackId
+) {
+ConsumerGroupMember existingMember = members.get(memberId);
+ConsumerGroupMember.Builder builder;
+if 

[GitHub] [kafka] dajac commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder

2023-04-28 Thread via GitHub


dajac commented on code in PR #13637:
URL: https://github.com/apache/kafka/pull/13637#discussion_r1180607217


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java:
##
@@ -0,0 +1,688 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment;
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TargetAssignmentBuilderTest {
+
+public static class TargetAssignmentBuilderTestContext {
+private final String groupId;
+private final int groupEpoch;

Review Comment:
   Ack. No worries.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder

2023-04-28 Thread via GitHub


dajac commented on code in PR #13637:
URL: https://github.com/apache/kafka/pull/13637#discussion_r1180606875


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java:
##
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.Record;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord;
+
+/**
+ * Build a new Target Assignment based on the provided parameters. As a result,
+ * it yields the records that must be persisted to the log and the new member
+ * assignments as a map.
+ *
+ * Records are only created for members which have a new target assignment. If
+ * their assignment did not change, no new record is needed.
+ *
+ * When a member is deleted, it is assumed that its target assignment record
+ * is deleted as part of the member deletion process. In other words, this 
class
+ * does not yield a tombstone for removed members.
+ */
+public class TargetAssignmentBuilder {
+/**
+ * The assignment result returned by {{@link 
TargetAssignmentBuilder#build()}}.
+ */
+public static class TargetAssignmentResult {
+/**
+ * The records that must be applied to the __consumer_offsets
+ * topics to persist the new target assignment.
+ */
+private final List<Record> records;
+
+/**
+ * The new target assignment for all members.
+ */
+private final Map<String, MemberAssignment> assignments;
+
+TargetAssignmentResult(
+List<Record> records,
+Map<String, MemberAssignment> assignments
+) {
+Objects.requireNonNull(records);
+Objects.requireNonNull(assignments);
+this.records = records;
+this.assignments = assignments;
+}
+
+/**
+ * @return The records.
+ */
+public List<Record> records() {
+return records;
+}
+
+/**
+ * @return The assignments.
+ */
+public Map<String, MemberAssignment> assignments() {
+return assignments;
+}
+}
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The group epoch.
+ */
+private final int groupEpoch;
+
+/**
+ * The partition assignor used to compute the assignment.
+ */
+private final PartitionAssignor assignor;
+
+/**
+ * The members in the group.
+ */
+private Map<String, ConsumerGroupMember> members = Collections.emptyMap();
+
+/**
+ * The subscription metadata.
+ */
+private Map<String, TopicMetadata> subscriptionMetadata = Collections.emptyMap();
+
+/**
+ * The current target assignment.
+ */
+private Map<String, Assignment> assignments = Collections.emptyMap();
+
+/**
+ * The members which have been updated or deleted. Deleted members
+ * are signaled by a null value.
+ */
+private final Map<String, ConsumerGroupMember> updatedMembers = new HashMap<>();
+
+/**
+ * Constructs the object.
+ *
+ * @param groupId   The group id.
+ * @param groupEpochThe group epoch to compute a target assignment for.
+ * @param assignor  The assignor to use to compute the target 
assignment.
+ */
+public TargetAssignmentBuilder(
+String groupId,
+int groupEpoch,
+
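
(The quoted constructor is truncated here. For orientation, a usage sketch of 
the builder based only on the API visible above; the `appendRecords` helper 
and the `assignor` variable are assumptions for illustration, not the real 
call site:)
{code:java}
// Hedged sketch: how a caller might drive the builder, per the quoted API.
TargetAssignmentBuilder builder =
    new TargetAssignmentBuilder("group-id", 10, assignor);
TargetAssignmentResult result = builder.build(); // may throw PartitionAssignorException

appendRecords(result.records());                 // persist the new target assignment
Map<String, MemberAssignment> newTarget = result.assignments();
{code}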

[GitHub] [kafka] dajac commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder

2023-04-28 Thread via GitHub


dajac commented on code in PR #13637:
URL: https://github.com/apache/kafka/pull/13637#discussion_r1180604733


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java:
##
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.Record;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord;
+
+/**
+ * Build a new Target Assignment based on the provided parameters. As a result,
+ * it yields the records that must be persisted to the log and the new member
+ * assignments as a map.
+ *
+ * Records are only created for members which have a new target assignment. If
+ * their assignment did not change, no new record is needed.
+ *
+ * When a member is deleted, it is assumed that its target assignment record
+ * is deleted as part of the member deletion process. In other words, this 
class
+ * does not yield a tombstone for removed members.
+ */
+public class TargetAssignmentBuilder {
+/**
+ * The assignment result returned by {{@link 
TargetAssignmentBuilder#build()}}.
+ */
+public static class TargetAssignmentResult {
+/**
+ * The records that must be applied to the __consumer_offsets
+ * topics to persist the new target assignment.
+ */
+private final List<Record> records;
+
+/**
+ * The new target assignment for all members.
+ */
+private final Map<String, MemberAssignment> assignments;
+
+TargetAssignmentResult(
+List<Record> records,
+Map<String, MemberAssignment> assignments
+) {
+Objects.requireNonNull(records);
+Objects.requireNonNull(assignments);
+this.records = records;
+this.assignments = assignments;
+}
+
+/**
+ * @return The records.
+ */
+public List<Record> records() {
+return records;
+}
+
+/**
+ * @return The assignments.
+ */
+public Map<String, MemberAssignment> assignments() {
+return assignments;
+}
+}
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The group epoch.
+ */
+private final int groupEpoch;
+
+/**
+ * The partition assignor used to compute the assignment.
+ */
+private final PartitionAssignor assignor;
+
+/**
+ * The members in the group.
+ */
+private Map<String, ConsumerGroupMember> members = Collections.emptyMap();
+
+/**
+ * The subscription metadata.
+ */
+private Map<String, TopicMetadata> subscriptionMetadata = Collections.emptyMap();
+
+/**
+ * The current target assignment.
+ */
+private Map<String, Assignment> assignments = Collections.emptyMap();
+
+/**
+ * The members which have been updated or deleted. Deleted members
+ * are signaled by a null value.
+ */
+private final Map<String, ConsumerGroupMember> updatedMembers = new HashMap<>();
+
+/**
+ * Constructs the object.
+ *
+ * @param groupId   The group id.
+ * @param groupEpochThe group epoch to compute a target assignment for.
+ * @param assignor  The assignor to use to compute the target 
assignment.
+ */
+public TargetAssignmentBuilder(
+String groupId,
+int groupEpoch,
+

[GitHub] [kafka] philipnee opened a new pull request, #13652: Cherry pick KAFKA-14639

2023-04-28 Thread via GitHub


philipnee opened a new pull request, #13652:
URL: https://github.com/apache/kafka/pull/13652

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder

2023-04-28 Thread via GitHub


jolshan commented on code in PR #13637:
URL: https://github.com/apache/kafka/pull/13637#discussion_r1180601499


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java:
##
@@ -0,0 +1,698 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment;
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord;
+import static 
org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder.createAssignmentMemberSpec;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TargetAssignmentBuilderTest {
+
+public static class TargetAssignmentBuilderTestContext {
+private final String groupId;
+private final int groupEpoch;
+private final PartitionAssignor assignor = 
mock(PartitionAssignor.class);
+private final Map<String, ConsumerGroupMember> members = new HashMap<>();
+private final Map<String, TopicMetadata> subscriptionMetadata = new HashMap<>();
+private final Map<String, ConsumerGroupMember> updatedMembers = new HashMap<>();
+private final Map<String, Assignment> targetAssignments = new HashMap<>();
+private final Map<String, MemberAssignment> assignments = new HashMap<>();
+
+public TargetAssignmentBuilderTestContext(
+String groupId,
+int groupEpoch
+) {
+this.groupId = groupId;
+this.groupEpoch = groupEpoch;
+}
+
+public void addGroupMember(
+String memberId,
+List<String> subscriptions,
+Map<Uuid, Set<Integer>> targetPartitions
+) {
+members.put(memberId, new ConsumerGroupMember.Builder(memberId)
+.setSubscribedTopicNames(subscriptions)
+.setRebalanceTimeoutMs(5000)
+.build());
+
+targetAssignments.put(memberId, new Assignment(
+(byte) 0,
+targetPartitions,
+VersionedMetadata.EMPTY
+));
+}
+
+public Uuid addTopicMetadata(
+String topicName,
+int numPartitions
+) {
+Uuid topicId = Uuid.randomUuid();
+subscriptionMetadata.put(topicName, new TopicMetadata(
+topicId,
+topicName,
+numPartitions
+));
+return topicId;
+}
+
+public void updateMemberSubscription(
+String memberId,
+List<String> subscriptions
+) {
+updateMemberSubscription(
+memberId,
+subscriptions,
+Optional.empty(),
+Optional.empty()
+);
+}
+
+public void updateMemberSubscription(
+String memberId,
+List<String> subscriptions,
+Optional<String> instanceId,
+Optional<String> rackId
+) {
+ConsumerGroupMember existingMember = members.get(memberId);
+ConsumerGroupMember.Builder builder;
+if 

[GitHub] [kafka] jolshan commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder

2023-04-28 Thread via GitHub


jolshan commented on code in PR #13637:
URL: https://github.com/apache/kafka/pull/13637#discussion_r1180598466


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java:
##
@@ -0,0 +1,698 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment;
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord;
+import static 
org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder.createAssignmentMemberSpec;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TargetAssignmentBuilderTest {
+
+public static class TargetAssignmentBuilderTestContext {
+private final String groupId;
+private final int groupEpoch;
+private final PartitionAssignor assignor = 
mock(PartitionAssignor.class);
+private final Map<String, ConsumerGroupMember> members = new HashMap<>();
+private final Map<String, TopicMetadata> subscriptionMetadata = new HashMap<>();
+private final Map<String, ConsumerGroupMember> updatedMembers = new HashMap<>();
+private final Map<String, Assignment> targetAssignments = new HashMap<>();
+private final Map<String, MemberAssignment> assignments = new HashMap<>();
+
+public TargetAssignmentBuilderTestContext(
+String groupId,
+int groupEpoch
+) {
+this.groupId = groupId;
+this.groupEpoch = groupEpoch;
+}
+
+public void addGroupMember(
+String memberId,
+List<String> subscriptions,
+Map<Uuid, Set<Integer>> targetPartitions
+) {
+members.put(memberId, new ConsumerGroupMember.Builder(memberId)
+.setSubscribedTopicNames(subscriptions)
+.setRebalanceTimeoutMs(5000)
+.build());
+
+targetAssignments.put(memberId, new Assignment(
+(byte) 0,
+targetPartitions,
+VersionedMetadata.EMPTY
+));
+}
+
+public Uuid addTopicMetadata(
+String topicName,
+int numPartitions
+) {
+Uuid topicId = Uuid.randomUuid();
+subscriptionMetadata.put(topicName, new TopicMetadata(
+topicId,
+topicName,
+numPartitions
+));
+return topicId;
+}
+
+public void updateMemberSubscription(
+String memberId,
+List<String> subscriptions
+) {
+updateMemberSubscription(
+memberId,
+subscriptions,
+Optional.empty(),
+Optional.empty()
+);
+}
+
+public void updateMemberSubscription(
+String memberId,
+List<String> subscriptions,
+Optional<String> instanceId,
+Optional<String> rackId
+) {
+ConsumerGroupMember existingMember = members.get(memberId);
+ConsumerGroupMember.Builder builder;
+if 

[GitHub] [kafka] jolshan commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder

2023-04-28 Thread via GitHub


jolshan commented on code in PR #13637:
URL: https://github.com/apache/kafka/pull/13637#discussion_r1180593876


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java:
##
@@ -0,0 +1,688 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment;
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TargetAssignmentBuilderTest {
+
+public static class TargetAssignmentBuilderTestContext {
+private final String groupId;
+private final int groupEpoch;

Review Comment:
   Sorry I wanted `new TargetAssignmentBuilderTestContext` per test. But I see 
you have that. I forgot how static classes worked in Java and thought that you 
could only have one instance that is shared. I see though that the semantics 
are different. Sorry for the confusion.
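   
   (For the record, a small self-contained illustration of the semantics: 
`static` on a nested class only means it does not capture an outer instance; 
each `new` still creates an independent object, so every test gets a fresh 
context.)
{code:java}
// Demo of static nested class semantics (illustrative only).
public class Outer {
    static class Context {
        int value; // per-instance state; nothing is shared between instances
    }

    public static void main(String[] args) {
        Context a = new Context();
        Context b = new Context(); // an independent instance
        a.value = 1;
        System.out.println(b.value); // prints 0: b is unaffected by a
    }
}
{code}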



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder

2023-04-28 Thread via GitHub


jolshan commented on code in PR #13637:
URL: https://github.com/apache/kafka/pull/13637#discussion_r1180590786


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java:
##
@@ -0,0 +1,688 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment;
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TargetAssignmentBuilderTest {
+
+public static class TargetAssignmentBuilderTestContext {
+private final String groupId;
+private final int groupEpoch;

Review Comment:
   The class itself is shared amongst all the tests. We just put new data in 
for each test, yes?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder

2023-04-28 Thread via GitHub


jolshan commented on code in PR #13637:
URL: https://github.com/apache/kafka/pull/13637#discussion_r1180590049


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java:
##
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.Record;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord;
+
+/**
+ * Build a new Target Assignment based on the provided parameters. As a result,
+ * it yields the records that must be persisted to the log and the new member
+ * assignments as a map.
+ *
+ * Records are only created for members which have a new target assignment. If
+ * their assignment did not change, no new record is needed.
+ *
+ * When a member is deleted, it is assumed that its target assignment record
+ * is deleted as part of the member deletion process. In other words, this 
class
+ * does not yield a tombstone for removed members.
+ */
+public class TargetAssignmentBuilder {
+/**
+ * The assignment result returned by {{@link 
TargetAssignmentBuilder#build()}}.
+ */
+public static class TargetAssignmentResult {
+/**
+ * The records that must be applied to the __consumer_offsets
+ * topics to persist the new target assignment.
+ */
+private final List<Record> records;
+
+/**
+ * The new target assignment for all members.
+ */
+private final Map<String, MemberAssignment> assignments;
+
+TargetAssignmentResult(
+List<Record> records,
+Map<String, MemberAssignment> assignments
+) {
+Objects.requireNonNull(records);
+Objects.requireNonNull(assignments);
+this.records = records;
+this.assignments = assignments;
+}
+
+/**
+ * @return The records.
+ */
+public List<Record> records() {
+return records;
+}
+
+/**
+ * @return The assignments.
+ */
+public Map<String, MemberAssignment> assignments() {
+return assignments;
+}
+}
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The group epoch.
+ */
+private final int groupEpoch;
+
+/**
+ * The partition assignor used to compute the assignment.
+ */
+private final PartitionAssignor assignor;
+
+/**
+ * The members in the group.
+ */
+private Map<String, ConsumerGroupMember> members = Collections.emptyMap();
+
+/**
+ * The subscription metadata.
+ */
+private Map<String, TopicMetadata> subscriptionMetadata = Collections.emptyMap();
+
+/**
+ * The current target assignment.
+ */
+private Map<String, Assignment> assignments = Collections.emptyMap();
+
+/**
+ * The members which have been updated or deleted. Deleted members
+ * are signaled by a null value.
+ */
+private final Map<String, ConsumerGroupMember> updatedMembers = new HashMap<>();
+
+/**
+ * Constructs the object.
+ *
+ * @param groupId   The group id.
+ * @param groupEpochThe group epoch to compute a target assignment for.
+ * @param assignor  The assignor to use to compute the target 
assignment.
+ */
+public TargetAssignmentBuilder(
+String groupId,
+int groupEpoch,
+

[GitHub] [kafka] jolshan commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder

2023-04-28 Thread via GitHub


jolshan commented on code in PR #13637:
URL: https://github.com/apache/kafka/pull/13637#discussion_r1180589296


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java:
##
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.Record;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord;
+
+/**
+ * Build a new Target Assignment based on the provided parameters. As a result,
+ * it yields the records that must be persisted to the log and the new member
+ * assignments as a map.
+ *
+ * Records are only created for members which have a new target assignment. If
+ * their assignment did not change, no new record is needed.
+ *
+ * When a member is deleted, it is assumed that its target assignment record
+ * is deleted as part of the member deletion process. In other words, this 
class
+ * does not yield a tombstone for removed members.
+ */
+public class TargetAssignmentBuilder {
+/**
+ * The assignment result returned by {{@link 
TargetAssignmentBuilder#build()}}.
+ */
+public static class TargetAssignmentResult {
+/**
+ * The records that must be applied to the __consumer_offsets
+ * topics to persist the new target assignment.
+ */
+private final List<Record> records;
+
+/**
+ * The new target assignment for all members.
+ */
+private final Map<String, MemberAssignment> assignments;
+
+TargetAssignmentResult(
+List<Record> records,
+Map<String, MemberAssignment> assignments
+) {
+Objects.requireNonNull(records);
+Objects.requireNonNull(assignments);
+this.records = records;
+this.assignments = assignments;
+}
+
+/**
+ * @return The records.
+ */
+public List<Record> records() {
+return records;
+}
+
+/**
+ * @return The assignments.
+ */
+public Map<String, MemberAssignment> assignments() {
+return assignments;
+}
+}
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The group epoch.
+ */
+private final int groupEpoch;
+
+/**
+ * The partition assignor used to compute the assignment.
+ */
+private final PartitionAssignor assignor;
+
+/**
+ * The members in the group.
+ */
+private Map<String, ConsumerGroupMember> members = Collections.emptyMap();
+
+/**
+ * The subscription metadata.
+ */
+private Map<String, TopicMetadata> subscriptionMetadata = Collections.emptyMap();
+
+/**
+ * The current target assignment.
+ */
+private Map<String, Assignment> assignments = Collections.emptyMap();
+
+/**
+ * The members which have been updated or deleted. Deleted members
+ * are signaled by a null value.
+ */
+private final Map<String, ConsumerGroupMember> updatedMembers = new HashMap<>();
+
+/**
+ * Constructs the object.
+ *
+ * @param groupId   The group id.
+ * @param groupEpochThe group epoch to compute a target assignment for.
+ * @param assignor  The assignor to use to compute the target 
assignment.
+ */
+public TargetAssignmentBuilder(
+String groupId,
+int groupEpoch,
+

[GitHub] [kafka] jolshan commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder

2023-04-28 Thread via GitHub


jolshan commented on code in PR #13637:
URL: https://github.com/apache/kafka/pull/13637#discussion_r1180588499


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java:
##
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.Record;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord;
+
+/**
+ * Build a new Target Assignment based on the provided parameters. As a result,
+ * it yields the records that must be persisted to the log and the new member
+ * assignments as a map.
+ *
+ * Records are only created for members which have a new target assignment. If
+ * their assignment did not change, no new record is needed.
+ *
+ * When a member is deleted, it is assumed that its target assignment record
+ * is deleted as part of the member deletion process. In other words, this 
class
+ * does not yield a tombstone for removed members.

Review Comment:
   I read this, but I guess I just didn't assume that delete record = create 
tombstone. I guess that's a pretty easy conclusion to draw; it just didn't 
come to me right away.
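
   For context: in a compacted Kafka topic such as __consumer_offsets, a 
tombstone is simply a record with a non-null key and a null value; log 
compaction later removes the key entirely. A minimal sketch of producing one 
(the topic and key names are illustrative):

       import org.apache.kafka.clients.producer.ProducerRecord;

       public class TombstoneSketch {
           public static ProducerRecord<String, byte[]> tombstoneFor(String topic, String key) {
               // A null value marks the key for deletion once log compaction
               // runs and delete.retention.ms has elapsed.
               return new ProducerRecord<>(topic, key, null);
           }
       }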



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block

2023-04-28 Thread via GitHub


jeffkbkim commented on code in PR #13267:
URL: https://github.com/apache/kafka/pull/13267#discussion_r1180578924


##
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##
@@ -123,73 +129,94 @@ class ZkProducerIdManager(brokerId: Int,
 }
   }
 
-  def generateProducerId(): Long = {
+  def generateProducerId(): Try[Long] = {
 this synchronized {
   // grab a new block of producerIds if this block has been exhausted
   if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-allocateNewProducerIdBlock()
+try {
+  allocateNewProducerIdBlock()
+} catch {
+  case t: Throwable =>
+return Failure(t)
+}
 nextProducerId = currentProducerIdBlock.firstProducerId
   }
   nextProducerId += 1
-  nextProducerId - 1
+  Success(nextProducerId - 1)
+}
+  }
+
+  override def hasValidBlock: Boolean = {
+this synchronized {
+  !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY)
 }
   }
 }
 
+/**
+ * RPCProducerIdManager allocates producer id blocks asynchronously and will 
immediately fail requests
+ * for producers to retry if it does not have an available producer id and is 
waiting on a new block.
+ */
 class RPCProducerIdManager(brokerId: Int,
+   time: Time,
brokerEpochSupplier: () => Long,
-   controllerChannel: BrokerToControllerChannelManager,
-   maxWaitMs: Int) extends ProducerIdManager with 
Logging {
+   controllerChannel: 
BrokerToControllerChannelManager) extends ProducerIdManager with Logging {
 
   this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: "
 
-  private val nextProducerIdBlock = new 
ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  // Visible for testing
+  private[transaction] var nextProducerIdBlock = new 
AtomicReference[ProducerIdsBlock](null)
+  private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new 
AtomicReference(ProducerIdsBlock.EMPTY)
   private val requestInFlight = new AtomicBoolean(false)
-  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
-  private var nextProducerId: Long = -1L
+  private val shouldBackoff = new AtomicBoolean(false)
 
-  override def generateProducerId(): Long = {
-this synchronized {
-  if (nextProducerId == -1L) {
-// Send an initial request to get the first block
-maybeRequestNextBlock()
-nextProducerId = 0L
-  } else {
-nextProducerId += 1
-
-// Check if we need to fetch the next block
-if (nextProducerId >= (currentProducerIdBlock.firstProducerId + 
currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) {
-  maybeRequestNextBlock()
-}
-  }
+  override def hasValidBlock: Boolean = {
+nextProducerIdBlock.get != null
+  }
 
-  // If we've exhausted the current block, grab the next block (waiting if 
necessary)
-  if (nextProducerId > currentProducerIdBlock.lastProducerId) {
-val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
-if (block == null) {
-  // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT 
since older clients treat the error as fatal
-  // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
-  throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out 
waiting for next producer ID block")
-} else {
-  block match {
-case Success(nextBlock) =>
-  currentProducerIdBlock = nextBlock
-  nextProducerId = currentProducerIdBlock.firstProducerId
-case Failure(t) => throw t
+  override def generateProducerId(): Try[Long] = {
+var result: Try[Long] = null
+while (result == null) {

Review Comment:
   ah that makes sense
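
   A rough, simplified sketch of the non-blocking pattern the diff implements 
(not the actual Kafka code; the class, the 0.9 threshold, and the exception 
choice are assumptions): hand out ids from the current block, prefetch the 
next block once a threshold is crossed, and fail fast with a retriable error 
instead of blocking when the block is exhausted:

       final class ProducerIdBlockSketch {
           static final double PREFETCH_THRESHOLD = 0.9; // assumed value

           private final long firstId;     // first id of the current block
           private final long blockSize;   // number of ids in the block
           private long nextId;            // next id to hand out
           private boolean requestInFlight;

           ProducerIdBlockSketch(long firstId, long blockSize) {
               this.firstId = firstId;
               this.blockSize = blockSize;
               this.nextId = firstId;
           }

           synchronized long generate() {
               if (nextId > firstId + blockSize - 1) {
                   // Block exhausted and no replacement yet: a real broker would
                   // return a retriable error (e.g. COORDINATOR_LOAD_IN_PROGRESS)
                   // so that the client retries instead of failing.
                   throw new IllegalStateException("no producer id available, retry");
               }
               long id = nextId++;
               if (!requestInFlight && nextId >= firstId + (long) (blockSize * PREFETCH_THRESHOLD)) {
                   requestInFlight = true; // kick off an async fetch of the next block here
               }
               return id;
           }
       }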



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder

2023-04-28 Thread via GitHub


dajac commented on code in PR #13637:
URL: https://github.com/apache/kafka/pull/13637#discussion_r1180576777


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java:
##
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.OptionalInt;
+
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment;
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ConsumerGroupMemberTest {
+
+@Test
+public void testNewMember() {
+Uuid topicId1 = Uuid.randomUuid();
+Uuid topicId2 = Uuid.randomUuid();
+Uuid topicId3 = Uuid.randomUuid();
+
+ConsumerGroupMember member = new 
ConsumerGroupMember.Builder("member-id")
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(9)
+.setNextMemberEpoch(11)
+.setInstanceId("instance-id")
+.setRackId("rack-id")
+.setRebalanceTimeoutMs(5000)
+.setClientId("client-id")
+.setClientHost("hostname")
+.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+.setSubscribedTopicRegex("regex")
+.setServerAssignorName("range")
+.setClientAssignors(Collections.singletonList(
+new ClientAssignor(
+"assignor",
+(byte) 0,
+(byte) 0,
+(byte) 1,
+new VersionedMetadata(
+(byte) 1,
+ByteBuffer.allocate(0)))))
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2, 3)))
+.setPartitionsPendingRevocation(mkAssignment(
+mkTopicAssignment(topicId2, 4, 5, 6)))
+.setPartitionsPendingAssignment(mkAssignment(
+mkTopicAssignment(topicId3, 7, 8, 9)))
+.build();
+
+assertEquals("member-id", member.memberId());
+assertEquals(10, member.memberEpoch());
+assertEquals(9, member.previousMemberEpoch());
+assertEquals(11, member.nextMemberEpoch());
+assertEquals("instance-id", member.instanceId());
+assertEquals("rack-id", member.rackId());
+assertEquals("client-id", member.clientId());
+assertEquals("hostname", member.clientHost());
+// Names are sorted.
+assertEquals(Arrays.asList("bar", "foo"), 
member.subscribedTopicNames());
+assertEquals("regex", member.subscribedTopicRegex());
+assertEquals("range", member.serverAssignorName().get());
+assertEquals(
+Collections.singletonList(
+new ClientAssignor(
+"assignor",
+(byte) 0,
+(byte) 0,
+(byte) 1,
+new VersionedMetadata(
+(byte) 1,
+ByteBuffer.allocate(0)))),
+member.clientAssignors());
+assertEquals(mkAssignment(mkTopicAssignment(topicId1, 1, 2, 3)), 
member.assignedPartitions());
+assertEquals(mkAssignment(mkTopicAssignment(topicId2, 4, 5, 6)), 
member.partitionsPendingRevocation());
+assertEquals(mkAssignment(mkTopicAssignment(topicId3, 7, 8, 9)), 
member.partitionsPendingAssignment());
+}
+
+@Test
+public void testEquals() {
+Uuid topicId1 = Uuid.randomUuid();

Review Comment:
   Got it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To 

[GitHub] [kafka] jolshan commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder

2023-04-28 Thread via GitHub


jolshan commented on code in PR #13637:
URL: https://github.com/apache/kafka/pull/13637#discussion_r1180575457


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java:
##
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.OptionalInt;
+
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment;
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ConsumerGroupMemberTest {
+
+@Test
+public void testNewMember() {
+Uuid topicId1 = Uuid.randomUuid();
+Uuid topicId2 = Uuid.randomUuid();
+Uuid topicId3 = Uuid.randomUuid();
+
+ConsumerGroupMember member = new 
ConsumerGroupMember.Builder("member-id")
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(9)
+.setNextMemberEpoch(11)
+.setInstanceId("instance-id")
+.setRackId("rack-id")
+.setRebalanceTimeoutMs(5000)
+.setClientId("client-id")
+.setClientHost("hostname")
+.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+.setSubscribedTopicRegex("regex")
+.setServerAssignorName("range")
+.setClientAssignors(Collections.singletonList(
+new ClientAssignor(
+"assignor",
+(byte) 0,
+(byte) 0,
+(byte) 1,
+new VersionedMetadata(
+(byte) 1,
+ByteBuffer.allocate(0)))))
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2, 3)))
+.setPartitionsPendingRevocation(mkAssignment(
+mkTopicAssignment(topicId2, 4, 5, 6)))
+.setPartitionsPendingAssignment(mkAssignment(
+mkTopicAssignment(topicId3, 7, 8, 9)))
+.build();
+
+assertEquals("member-id", member.memberId());
+assertEquals(10, member.memberEpoch());
+assertEquals(9, member.previousMemberEpoch());
+assertEquals(11, member.nextMemberEpoch());
+assertEquals("instance-id", member.instanceId());
+assertEquals("rack-id", member.rackId());
+assertEquals("client-id", member.clientId());
+assertEquals("hostname", member.clientHost());
+// Names are sorted.
+assertEquals(Arrays.asList("bar", "foo"), 
member.subscribedTopicNames());
+assertEquals("regex", member.subscribedTopicRegex());
+assertEquals("range", member.serverAssignorName().get());
+assertEquals(
+Collections.singletonList(
+new ClientAssignor(
+"assignor",
+(byte) 0,
+(byte) 0,
+(byte) 1,
+new VersionedMetadata(
+(byte) 1,
+ByteBuffer.allocate(0)))),
+member.clientAssignors());
+assertEquals(mkAssignment(mkTopicAssignment(topicId1, 1, 2, 3)), 
member.assignedPartitions());
+assertEquals(mkAssignment(mkTopicAssignment(topicId2, 4, 5, 6)), 
member.partitionsPendingRevocation());
+assertEquals(mkAssignment(mkTopicAssignment(topicId3, 7, 8, 9)), 
member.partitionsPendingAssignment());
+}
+
+@Test
+public void testEquals() {
+Uuid topicId1 = Uuid.randomUuid();

Review Comment:
   Yes I was referring to the context.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the 

[GitHub] [kafka] dajac commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder

2023-04-28 Thread via GitHub


dajac commented on code in PR #13637:
URL: https://github.com/apache/kafka/pull/13637#discussion_r1180572223


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java:
##
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.OptionalInt;
+
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment;
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ConsumerGroupMemberTest {
+
+@Test
+public void testNewMember() {
+Uuid topicId1 = Uuid.randomUuid();
+Uuid topicId2 = Uuid.randomUuid();
+Uuid topicId3 = Uuid.randomUuid();
+
+ConsumerGroupMember member = new 
ConsumerGroupMember.Builder("member-id")
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(9)
+.setNextMemberEpoch(11)
+.setInstanceId("instance-id")
+.setRackId("rack-id")
+.setRebalanceTimeoutMs(5000)
+.setClientId("client-id")
+.setClientHost("hostname")
+.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+.setSubscribedTopicRegex("regex")
+.setServerAssignorName("range")
+.setClientAssignors(Collections.singletonList(
+new ClientAssignor(
+"assignor",
+(byte) 0,
+(byte) 0,
+(byte) 1,
+new VersionedMetadata(
+(byte) 1,
+ByteBuffer.allocate(0)))))
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2, 3)))
+.setPartitionsPendingRevocation(mkAssignment(
+mkTopicAssignment(topicId2, 4, 5, 6)))
+.setPartitionsPendingAssignment(mkAssignment(
+mkTopicAssignment(topicId3, 7, 8, 9)))
+.build();
+
+assertEquals("member-id", member.memberId());
+assertEquals(10, member.memberEpoch());
+assertEquals(9, member.previousMemberEpoch());
+assertEquals(11, member.nextMemberEpoch());
+assertEquals("instance-id", member.instanceId());
+assertEquals("rack-id", member.rackId());
+assertEquals("client-id", member.clientId());
+assertEquals("hostname", member.clientHost());
+// Names are sorted.
+assertEquals(Arrays.asList("bar", "foo"), 
member.subscribedTopicNames());
+assertEquals("regex", member.subscribedTopicRegex());
+assertEquals("range", member.serverAssignorName().get());
+assertEquals(
+Collections.singletonList(
+new ClientAssignor(
+"assignor",
+(byte) 0,
+(byte) 0,
+(byte) 1,
+new VersionedMetadata(
+(byte) 1,
+ByteBuffer.allocate(0)))),
+member.clientAssignors());
+assertEquals(mkAssignment(mkTopicAssignment(topicId1, 1, 2, 3)), 
member.assignedPartitions());
+assertEquals(mkAssignment(mkTopicAssignment(topicId2, 4, 5, 6)), 
member.partitionsPendingRevocation());
+assertEquals(mkAssignment(mkTopicAssignment(topicId3, 7, 8, 9)), 
member.partitionsPendingAssignment());
+}
+
+@Test
+public void testEquals() {
+Uuid topicId1 = Uuid.randomUuid();

Review Comment:
   What do you mean by static class? Are you referring to a static attribute 
for the context in the test class that is shared by all the tests? In that 
case, yes, they would collide.



-- 

[GitHub] [kafka] jolshan commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder

2023-04-28 Thread via GitHub


jolshan commented on code in PR #13637:
URL: https://github.com/apache/kafka/pull/13637#discussion_r1180569515


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java:
##
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.OptionalInt;
+
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment;
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ConsumerGroupMemberTest {
+
+@Test
+public void testNewMember() {
+Uuid topicId1 = Uuid.randomUuid();
+Uuid topicId2 = Uuid.randomUuid();
+Uuid topicId3 = Uuid.randomUuid();
+
+ConsumerGroupMember member = new 
ConsumerGroupMember.Builder("member-id")
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(9)
+.setNextMemberEpoch(11)
+.setInstanceId("instance-id")
+.setRackId("rack-id")
+.setRebalanceTimeoutMs(5000)
+.setClientId("client-id")
+.setClientHost("hostname")
+.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+.setSubscribedTopicRegex("regex")
+.setServerAssignorName("range")
+.setClientAssignors(Collections.singletonList(
+new ClientAssignor(
+"assignor",
+(byte) 0,
+(byte) 0,
+(byte) 1,
+new VersionedMetadata(
+(byte) 1,
+ByteBuffer.allocate(0)))))
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2, 3)))
+.setPartitionsPendingRevocation(mkAssignment(
+mkTopicAssignment(topicId2, 4, 5, 6)))
+.setPartitionsPendingAssignment(mkAssignment(
+mkTopicAssignment(topicId3, 7, 8, 9)))
+.build();
+
+assertEquals("member-id", member.memberId());
+assertEquals(10, member.memberEpoch());
+assertEquals(9, member.previousMemberEpoch());
+assertEquals(11, member.nextMemberEpoch());
+assertEquals("instance-id", member.instanceId());
+assertEquals("rack-id", member.rackId());
+assertEquals("client-id", member.clientId());
+assertEquals("hostname", member.clientHost());
+// Names are sorted.
+assertEquals(Arrays.asList("bar", "foo"), 
member.subscribedTopicNames());
+assertEquals("regex", member.subscribedTopicRegex());
+assertEquals("range", member.serverAssignorName().get());
+assertEquals(
+Collections.singletonList(
+new ClientAssignor(
+"assignor",
+(byte) 0,
+(byte) 0,
+(byte) 1,
+new VersionedMetadata(
+(byte) 1,
+ByteBuffer.allocate(0)))),
+member.clientAssignors());
+assertEquals(mkAssignment(mkTopicAssignment(topicId1, 1, 2, 3)), 
member.assignedPartitions());
+assertEquals(mkAssignment(mkTopicAssignment(topicId2, 4, 5, 6)), 
member.partitionsPendingRevocation());
+assertEquals(mkAssignment(mkTopicAssignment(topicId3, 7, 8, 9)), 
member.partitionsPendingAssignment());
+}
+
+@Test
+public void testEquals() {
+Uuid topicId1 = Uuid.randomUuid();

Review Comment:
   If we had a static class and used the same IDs, wouldn't they collide?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and 

[GitHub] [kafka] dajac merged pull request #13618: MINOR: Fixing typos in the ConsumerCoordinators

2023-04-28 Thread via GitHub


dajac merged PR #13618:
URL: https://github.com/apache/kafka/pull/13618


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance

2023-04-28 Thread via GitHub


philipnee commented on PR #13550:
URL: https://github.com/apache/kafka/pull/13550#issuecomment-1527744080

   Thanks David. FWIW, I think we also want to backport it to 3.4; I'll raise 
a PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #13594: KAFKA-14913: Using ThreadUtils.shutdownExecutorServiceQuietly to close executors in Connect Runtime

2023-04-28 Thread via GitHub


vamossagar12 commented on PR #13594:
URL: https://github.com/apache/kafka/pull/13594#issuecomment-1527739912

   @urbandan, I took the liberty of modifying `shutdownExecutorServiceQuietly` 
to do a two-phase shutdown as suggested in the JavaDocs. I also modified 
`MemoryOffsetBackingStore`, `SourceTaskOffsetCommitter`, and `Worker` to use 
`shutdownExecutorServiceQuietly` to shut down their executors. @yashmayya, as 
suggested, I have also removed the ConnectException thrown from 
`MemoryOffsetBackingStore#stop`. This is ready for a re-review.
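
   For reference, the two-phase shutdown recommended by the ExecutorService 
JavaDocs looks roughly like this (a minimal sketch; the actual utility's 
timeout handling may differ):

       import java.util.concurrent.ExecutorService;
       import java.util.concurrent.TimeUnit;

       final class ShutdownSketch {
           static void shutdownQuietly(ExecutorService executor, long timeoutMs) {
               executor.shutdown(); // phase 1: stop accepting tasks, let running ones finish
               try {
                   if (!executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
                       executor.shutdownNow(); // phase 2: interrupt lingering tasks
                       executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
                   }
               } catch (InterruptedException e) {
                   executor.shutdownNow();
                   Thread.currentThread().interrupt(); // preserve interrupt status
               }
           }
       }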


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14949) Add Streams upgrade tests from AK 3.4

2023-04-28 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14949:

Component/s: system tests

> Add Streams upgrade tests from AK 3.4
> -
>
> Key: KAFKA-14949
> URL: https://issues.apache.org/jira/browse/KAFKA-14949
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Reporter: Victoria Xia
>Priority: Critical
>
> Streams upgrade tests currently only test upgrading from 3.3 and earlier 
> versions 
> ([link|https://github.com/apache/kafka/blob/056657d84d84e116ffc9460872945b4d2b479ff3/tests/kafkatest/tests/streams/streams_application_upgrade_test.py#L30]).
>  We should add 3.4 as an "upgrade_from" version into these tests, in light of 
> the upcoming 3.5 release.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mjsax commented on a diff in pull request #13633: KAFKA-14839: Exclude protected variable from JavaDocs

2023-04-28 Thread via GitHub


mjsax commented on code in PR #13633:
URL: https://github.com/apache/kafka/pull/13633#discussion_r1180546756


##
build.gradle:
##
@@ -2057,6 +2057,7 @@ project(':streams') {
   srcJar.dependsOn 'processMessages'
 
   javadoc {
+options.memberLevel = JavadocMemberLevel.PUBLIC  // Document only public 
members/API

Review Comment:
   \cc @ijuma WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary

2023-04-28 Thread via GitHub


vamossagar12 commented on PR #13646:
URL: https://github.com/apache/kafka/pull/13646#issuecomment-1527703722

   Yeah, I agree with @yashmayya. Moreover, this
   
   ```
   Yes it will fail, but consumeAll is not failing due to timeout here but 
rather due to its nature of storing the end offsets before consuming.
   ```
   
   is not entirely correct, I think. I agree that what gets thrown is an 
AssertionError, but that's because the number of sourceRecords returned by 
`consumeAll` didn't meet the desired number of records within 60s. For 
starters, can you try increasing `CONSUME_RECORDS_TIMEOUT_MS` to 100s or so 
and see if it even works? Basically, we need to check whether the consumer is 
lagging or whether enough records are being produced. I think it would mostly 
be the former because, as Yash said, we are anyway waiting for 100 records to 
be committed. It's not an ideal fix, but let's first see if it works and, if 
needed, we can dig deeper.
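
   To illustrate the failure mode being debated: a consume-all helper that 
snapshots end offsets up front stops as soon as the consumer reaches those 
offsets, so records produced after the snapshot are never counted. A 
simplified sketch of that pattern (the names and structure are illustrative, 
not the actual test utility):

       import org.apache.kafka.clients.consumer.ConsumerRecord;
       import org.apache.kafka.clients.consumer.KafkaConsumer;
       import org.apache.kafka.common.TopicPartition;

       import java.time.Duration;
       import java.util.ArrayList;
       import java.util.List;
       import java.util.Map;
       import java.util.Set;

       final class ConsumeAllSketch {
           static <K, V> List<ConsumerRecord<K, V>> consumeToEndOffsets(
                   KafkaConsumer<K, V> consumer, Set<TopicPartition> partitions) {
               // The snapshot is taken here; anything produced afterwards is
               // invisible to the loop below, which is why the caller can come
               // up short even though no poll() ever times out.
               Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
               List<ConsumerRecord<K, V>> records = new ArrayList<>();
               while (endOffsets.entrySet().stream()
                       .anyMatch(e -> consumer.position(e.getKey()) < e.getValue())) {
                   consumer.poll(Duration.ofMillis(200)).forEach(records::add);
               }
               return records;
           }
       }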


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bmscomp commented on a diff in pull request #13627: MINOR: simplify if else conditions in Uuid.compareTo method

2023-04-28 Thread via GitHub


bmscomp commented on code in PR #13627:
URL: https://github.com/apache/kafka/pull/13627#discussion_r1178371488


##
clients/src/main/java/org/apache/kafka/common/Uuid.java:
##
@@ -143,12 +143,6 @@ public int compareTo(Uuid other) {
 return 1;

Review Comment:
   @divijvaidya There is still another way to implement the `compareTo` 
method; incidentally, `compare` would also be a fitting name for it, keeping 
the same naming style as the compare method on `Long`:
   
       if (mostSignificantBits == other.mostSignificantBits)
           return Long.compare(leastSignificantBits, other.leastSignificantBits);
       return Long.compare(mostSignificantBits, other.mostSignificantBits);



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder

2023-04-28 Thread via GitHub


dajac commented on PR #13637:
URL: https://github.com/apache/kafka/pull/13637#issuecomment-1527632827

   @jolshan @rreddy-22 @jeffkbkim Thanks for your comments. I have addressed 
all of them.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder

2023-04-28 Thread via GitHub


dajac commented on code in PR #13637:
URL: https://github.com/apache/kafka/pull/13637#discussion_r1180463243


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java:
##
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.OptionalInt;
+
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment;
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ConsumerGroupMemberTest {
+
+@Test
+public void testNewMember() {
+Uuid topicId1 = Uuid.randomUuid();
+Uuid topicId2 = Uuid.randomUuid();
+Uuid topicId3 = Uuid.randomUuid();
+
+ConsumerGroupMember member = new 
ConsumerGroupMember.Builder("member-id")
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(9)
+.setNextMemberEpoch(11)
+.setInstanceId("instance-id")
+.setRackId("rack-id")
+.setRebalanceTimeoutMs(5000)
+.setClientId("client-id")
+.setClientHost("hostname")
+.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+.setSubscribedTopicRegex("regex")
+.setServerAssignorName("range")
+.setClientAssignors(Collections.singletonList(
+new ClientAssignor(
+"assignor",
+(byte) 0,
+(byte) 0,
+(byte) 1,
+new VersionedMetadata(
+(byte) 1,
+ByteBuffer.allocate(0)))))
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2, 3)))
+.setPartitionsPendingRevocation(mkAssignment(
+mkTopicAssignment(topicId2, 4, 5, 6)))
+.setPartitionsPendingAssignment(mkAssignment(
+mkTopicAssignment(topicId3, 7, 8, 9)))
+.build();
+
+assertEquals("member-id", member.memberId());
+assertEquals(10, member.memberEpoch());
+assertEquals(9, member.previousMemberEpoch());
+assertEquals(11, member.nextMemberEpoch());
+assertEquals("instance-id", member.instanceId());
+assertEquals("rack-id", member.rackId());
+assertEquals("client-id", member.clientId());
+assertEquals("hostname", member.clientHost());
+// Names are sorted.
+assertEquals(Arrays.asList("bar", "foo"), 
member.subscribedTopicNames());
+assertEquals("regex", member.subscribedTopicRegex());
+assertEquals("range", member.serverAssignorName().get());
+assertEquals(
+Collections.singletonList(
+new ClientAssignor(
+"assignor",
+(byte) 0,
+(byte) 0,
+(byte) 1,
+new VersionedMetadata(
+(byte) 1,
+ByteBuffer.allocate(0)))),
+member.clientAssignors());
+assertEquals(mkAssignment(mkTopicAssignment(topicId1, 1, 2, 3)), 
member.assignedPartitions());
+assertEquals(mkAssignment(mkTopicAssignment(topicId2, 4, 5, 6)), 
member.partitionsPendingRevocation());
+assertEquals(mkAssignment(mkTopicAssignment(topicId3, 7, 8, 9)), 
member.partitionsPendingAssignment());
+}
+
+@Test
+public void testEquals() {
+Uuid topicId1 = Uuid.randomUuid();

Review Comment:
   I have changed it a bit. Let me know what you think.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to 

[GitHub] [kafka] dajac commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder

2023-04-28 Thread via GitHub


dajac commented on code in PR #13637:
URL: https://github.com/apache/kafka/pull/13637#discussion_r1180451832


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java:
##
@@ -0,0 +1,688 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment;
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TargetAssignmentBuilderTest {
+
+public static class TargetAssignmentBuilderTestContext {

Review Comment:
   To your first comment: it is hard to make this much simpler. The 
alternative would be to manually prepare all the data in each test; that is 
actually what I did before extracting all the common logic into the context 
in order to keep the tests simple.
   
   To your second comment: I have added more comments.
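
   As a concrete illustration of the trade-off, a test using the context 
fixture reads roughly like this (the method names come from the diff; the 
ids, values, and assertions are made up, and the fragment assumes the test 
class's imports, including the static `mkAssignment`/`mkTopicAssignment` 
helpers):

       @Test
       public void testExample() {
           Uuid fooTopicId = Uuid.randomUuid();
           TargetAssignmentBuilderTestContext context =
               new TargetAssignmentBuilderTestContext("my-group", 20);
           context.addTopicMetadata(fooTopicId, "foo", 6);
           context.addGroupMember("member-1", Arrays.asList("foo"),
               mkAssignment(mkTopicAssignment(fooTopicId, 0, 1, 2)));
           // ... drive the builder via the context and assert on the
           // produced records and assignments.
       }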



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder

2023-04-28 Thread via GitHub


dajac commented on code in PR #13637:
URL: https://github.com/apache/kafka/pull/13637#discussion_r1180427210


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java:
##
@@ -0,0 +1,688 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment;
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TargetAssignmentBuilderTest {
+
+public static class TargetAssignmentBuilderTestContext {
+private final String groupId;
+private final int groupEpoch;

Review Comment:
   I don't understand your comment. The context is instantiated in each test...
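
   In other words, each test owns its own fixture, so random topic ids cannot 
leak between tests; only a shared static fixture would risk that. A sketch of 
the distinction, with made-up names:

       public class IsolationSketchTest {
           @Test
           public void testA() {
               // Fresh context per test: its Uuid.randomUuid() values are
               // private to testA and cannot collide with other tests.
               TargetAssignmentBuilderTestContext ctx =
                   new TargetAssignmentBuilderTestContext("group-a", 1);
           }

           @Test
           public void testB() {
               TargetAssignmentBuilderTestContext ctx =
                   new TargetAssignmentBuilderTestContext("group-b", 1);
           }
       }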



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder

2023-04-28 Thread via GitHub


dajac commented on code in PR #13637:
URL: https://github.com/apache/kafka/pull/13637#discussion_r1180425612


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java:
##
@@ -0,0 +1,688 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment;
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TargetAssignmentBuilderTest {
+
+public static class TargetAssignmentBuilderTestContext {
+private final String groupId;
+private final int groupEpoch;
+private final PartitionAssignor assignor = 
mock(PartitionAssignor.class);
+private final Map<String, ConsumerGroupMember> members = new HashMap<>();
+private final Map<String, TopicMetadata> subscriptionMetadata = new HashMap<>();
+private final Map<String, ConsumerGroupMember> updatedMembers = new HashMap<>();
+private final Map<String, Assignment> targetAssignments = new HashMap<>();
+private final Map<String, MemberAssignment> assignments = new HashMap<>();
+
+public TargetAssignmentBuilderTestContext(
+String groupId,
+int groupEpoch
+) {
+this.groupId = groupId;
+this.groupEpoch = groupEpoch;
+}
+
+public TargetAssignmentBuilderTestContext addGroupMember(
+String memberId,
+List<String> subscriptions,
+Map<Uuid, Set<Integer>> targetPartitions
+) {
+members.put(memberId, new ConsumerGroupMember.Builder(memberId)
+.setSubscribedTopicNames(subscriptions)
+.setRebalanceTimeoutMs(5000)
+.build());
+
+targetAssignments.put(memberId, new Assignment(
+(byte) 0,
+targetPartitions,
+VersionedMetadata.EMPTY
+));
+
+return this;
+}
+
+public TargetAssignmentBuilderTestContext addTopicMetadata(
+Uuid topicId,
+String topicName,
+int numPartitions
+) {
+subscriptionMetadata.put(topicName, new TopicMetadata(
+topicId,
+topicName,
+numPartitions
+));
+return this;
+}
+
+public TargetAssignmentBuilderTestContext updateMemberSubscription(
+String memberId,
+List<String> subscriptions
+) {
+return updateMemberSubscription(
+memberId,
+subscriptions,
+Optional.empty(),
+Optional.empty()
+);
+}
+
+public TargetAssignmentBuilderTestContext updateMemberSubscription(
+String memberId,
+List<String> subscriptions,
+Optional<String> instanceId,
+Optional<String> rackId
+) {
+ConsumerGroupMember existingMember = members.get(memberId);
+ConsumerGroupMember.Builder builder;
+if (existingMember != null) {
+builder = new 

[jira] [Assigned] (KAFKA-13187) Replace EasyMock and PowerMock with Mockito for DistributedHerderTest

2023-04-28 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya reassigned KAFKA-13187:
--

Assignee: Yash Mayya  (was: Matthew de Detrich)

> Replace EasyMock and PowerMock with Mockito for DistributedHerderTest
> -
>
> Key: KAFKA-13187
> URL: https://issues.apache.org/jira/browse/KAFKA-13187
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: YI-CHEN WANG
>Assignee: Yash Mayya
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] yashmayya commented on pull request #11792: Replace EasyMock/PowerMock with Mockito in DistributedHerderTest

2023-04-28 Thread via GitHub


yashmayya commented on PR #11792:
URL: https://github.com/apache/kafka/pull/11792#issuecomment-1527579890

   @mdedetrich no problem, I've assigned 
https://issues.apache.org/jira/browse/KAFKA-13187 to myself and I'll start 
working on it next week.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder

2023-04-28 Thread via GitHub


dajac commented on code in PR #13637:
URL: https://github.com/apache/kafka/pull/13637#discussion_r1180410020


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java:
##
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.Record;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord;
+
+/**
+ * Build a new Target Assignment based on the provided parameters. As a result,
+ * it yields the records that must be persisted to the log and the new member
+ * assignments as a map.
+ *
+ * Records are only created for members which have a new target assignment. If
+ * their assignment did not change, no new record is needed.
+ *
+ * When a member is deleted, it is assumed that its target assignment record
+ * is deleted as part of the member deletion process. In other words, this 
class
+ * does not yield a tombstone for removed members.
+ */
+public class TargetAssignmentBuilder {
+/**
+ * The assignment result returned by {{@link 
TargetAssignmentBuilder#build()}}.
+ */
+public static class TargetAssignmentResult {
+/**
+ * The records that must be applied to the __consumer_offsets
+ * topics to persist the new target assignment.
+ */
+private final List<Record> records;
+
+/**
+ * The new target assignment for all members.
+ */
+private final Map<String, MemberAssignment> assignments;
+
+TargetAssignmentResult(
+List<Record> records,
+Map<String, MemberAssignment> assignments
+) {
+Objects.requireNonNull(records);
+Objects.requireNonNull(assignments);
+this.records = records;
+this.assignments = assignments;
+}
+
+/**
+ * @return The records.
+ */
+public List<Record> records() {
+return records;
+}
+
+/**
+ * @return The assignments.
+ */
+public Map<String, MemberAssignment> assignments() {
+return assignments;
+}
+}
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The group epoch.
+ */
+private final int groupEpoch;
+
+/**
+ * The partition assignor used to compute the assignment.
+ */
+private final PartitionAssignor assignor;
+
+/**
+ * The members in the group.
+ */
+private Map<String, ConsumerGroupMember> members = Collections.emptyMap();
+
+/**
+ * The subscription metadata.
+ */
+private Map<String, TopicMetadata> subscriptionMetadata = Collections.emptyMap();
+
+/**
+ * The current target assignment.
+ */
+private Map<String, Assignment> assignments = Collections.emptyMap();
+
+/**
+ * The members which have been updated or deleted. Deleted members
+ * are signaled by a null value.
+ */
+private final Map<String, ConsumerGroupMember> updatedMembers = new HashMap<>();
+
+/**
+ * Constructs the object.
+ *
+ * @param groupId   The group id.
+ * @param groupEpochThe group epoch to compute a target assignment for.
+ * @param assignor  The assignor to use to compute the target 
assignment.
+ */
+public TargetAssignmentBuilder(
+String groupId,
+int groupEpoch,
+

[GitHub] [kafka] dajac commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder

2023-04-28 Thread via GitHub


dajac commented on code in PR #13637:
URL: https://github.com/apache/kafka/pull/13637#discussion_r1180408510



[GitHub] [kafka] dajac commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder

2023-04-28 Thread via GitHub


dajac commented on code in PR #13637:
URL: https://github.com/apache/kafka/pull/13637#discussion_r1180406549



[GitHub] [kafka] dajac commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder

2023-04-28 Thread via GitHub


dajac commented on code in PR #13637:
URL: https://github.com/apache/kafka/pull/13637#discussion_r1180405622


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java:
##
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.Record;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord;
+
+/**
+ * Build a new Target Assignment based on the provided parameters. As a result,
+ * it yields the records that must be persisted to the log and the new member
+ * assignments as a map.
+ *
+ * Records are only created for members which have a new target assignment. If
+ * their assignment did not change, no new record is needed.
+ *
+ * When a member is deleted, it is assumed that its target assignment record
+ * is deleted as part of the member deletion process. In other words, this 
class
+ * does not yield a tombstone for removed members.

Review Comment:
   As the first sentence says: `When a member is deleted, it is assumed that 
its target assignment record is deleted as part of the member deletion process.`
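   A hedged usage sketch of the flow the quoted javadoc describes; the `removeMember`/`build` names and element types here are inferred from the quoted fields, not confirmed from the PR:
   
   ```java
   // Hypothetical sketch only, not the PR's confirmed API surface.
   TargetAssignmentBuilder builder = new TargetAssignmentBuilder("my-group", 10, assignor);
   builder.removeMember("member-a"); // presumably stored as a null in updatedMembers
   TargetAssignmentBuilder.TargetAssignmentResult result = builder.build();
   // records() carries the changed assignments and the new target epoch, but no
   // tombstone for "member-a"; the member deletion path is expected to write that.
   result.records().forEach(System.out::println);
   ```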






[GitHub] [kafka] dajac commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder

2023-04-28 Thread via GitHub


dajac commented on code in PR #13637:
URL: https://github.com/apache/kafka/pull/13637#discussion_r1180404680


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java:
##
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.OptionalInt;
+
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment;
+import static 
org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ConsumerGroupMemberTest {
+
+@Test
+public void testNewMember() {
+Uuid topicId1 = Uuid.randomUuid();
+Uuid topicId2 = Uuid.randomUuid();
+Uuid topicId3 = Uuid.randomUuid();
+
+ConsumerGroupMember member = new 
ConsumerGroupMember.Builder("member-id")
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(9)
+.setNextMemberEpoch(11)
+.setInstanceId("instance-id")
+.setRackId("rack-id")
+.setRebalanceTimeoutMs(5000)
+.setClientId("client-id")
+.setClientHost("hostname")
+.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+.setSubscribedTopicRegex("regex")
+.setServerAssignorName("range")
+.setClientAssignors(Collections.singletonList(
+new ClientAssignor(
+"assignor",
+(byte) 0,
+(byte) 0,
+(byte) 1,
+new VersionedMetadata(
+(byte) 1,
+ByteBuffer.allocate(0))))
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(topicId1, 1, 2, 3)))
+.setPartitionsPendingRevocation(mkAssignment(
+mkTopicAssignment(topicId2, 4, 5, 6)))
+.setPartitionsPendingAssignment(mkAssignment(
+mkTopicAssignment(topicId3, 7, 8, 9)))
+.build();
+
+assertEquals("member-id", member.memberId());
+assertEquals(10, member.memberEpoch());
+assertEquals(9, member.previousMemberEpoch());
+assertEquals(11, member.nextMemberEpoch());
+assertEquals("instance-id", member.instanceId());
+assertEquals("rack-id", member.rackId());
+assertEquals("client-id", member.clientId());
+assertEquals("hostname", member.clientHost());
+// Names are sorted.
+assertEquals(Arrays.asList("bar", "foo"), 
member.subscribedTopicNames());
+assertEquals("regex", member.subscribedTopicRegex());
+assertEquals("range", member.serverAssignorName().get());
+assertEquals(
+Collections.singletonList(
+new ClientAssignor(
+"assignor",
+(byte) 0,
+(byte) 0,
+(byte) 1,
+new VersionedMetadata(
+(byte) 1,
+ByteBuffer.allocate(0))),
+member.clientAssignors());
+assertEquals(mkAssignment(mkTopicAssignment(topicId1, 1, 2, 3)), 
member.assignedPartitions());
+assertEquals(mkAssignment(mkTopicAssignment(topicId2, 4, 5, 6)), 
member.partitionsPendingRevocation());
+assertEquals(mkAssignment(mkTopicAssignment(topicId3, 7, 8, 9)), 
member.partitionsPendingAssignment());
+}
+
+@Test
+public void testEquals() {
+Uuid topicId1 = Uuid.randomUuid();

Review Comment:
   I don't get the relation with the static class.




[jira] [Created] (KAFKA-14951) Changing the cleanup.policy on an existing topic from "delete" to "compact" seems to change it to "compact,delete" even though it says "compact"

2023-04-28 Thread Tomasz Kaszuba (Jira)
Tomasz Kaszuba created KAFKA-14951:
--

 Summary: Changing the cleanup.policy on an existing topic from 
"delete" to "compact" seems to change it to "compact,delete" even though it 
says "compact"
 Key: KAFKA-14951
 URL: https://issues.apache.org/jira/browse/KAFKA-14951
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.8.2
Reporter: Tomasz Kaszuba


We seem to have noticed something strange when we change the cleanup.policy of 
an existing topic from "delete" to "compact". Even though the policy is set to 
"compact", retention.ms is not ignored and the topic is still cleaned up when 
the retention time is hit, suggesting that the actual policy is 
"compact,delete" and not "compact".

If this is the case, could the metadata return "compact,delete" instead of 
"compact" when describing the topic?

I have not found any information on whether this is normal behavior, but I 
noticed that this kind of change is blocked in Confluent Cloud, so perhaps this 
is a known issue.

[https://docs.confluent.io/cloud/current/clusters/broker-config.html#custom-topic-settings-for-all-cluster-types]

_You cannot change the {{cleanup.policy}} from {{delete}} to {{compact}} after 
a topic has been created_
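A minimal reproduction sketch with the Java AdminClient (the broker address and topic name are illustrative); it shows the part we can observe directly, namely that the described config reads back as "compact" after the change:
{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.ConfigResource;

public class CleanupPolicyCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "test-topic");
            // Flip the policy from delete to compact on the existing topic.
            admin.incrementalAlterConfigs(Collections.singletonMap(topic,
                Collections.singletonList(new AlterConfigOp(
                    new ConfigEntry("cleanup.policy", "compact"), AlterConfigOp.OpType.SET))))
                .all().get();
            // Describing the topic reports "compact", even though in our case
            // retention.ms-based deletion still appeared to apply.
            Config config = admin.describeConfigs(Collections.singletonList(topic))
                .all().get().get(topic);
            System.out.println(config.get("cleanup.policy").value());
        }
    }
}
{code}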





[jira] [Commented] (KAFKA-14947) Duplicate records are getting created in the topic.

2023-04-28 Thread krishnendu Das (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717662#comment-17717662
 ] 

krishnendu Das commented on KAFKA-14947:


[~yash.mayya] Thanks for your comment. I have two points based on your 
comments/suggestions.

With our existing connect API code and with the Kafka server (2.6.2), our 
ingestion mechanism was working fine in the live environment. We checked the 
Kafka server (2.6.2) WorkerSourceTask::execute() method, and it was following 
the below-mentioned execution path
 # Poll the task for the new data.
 # If it gets any data, save the new data into the Kafka topic.
 # Commit the offset.

Still today we are able to ingest huge volumes of data in the live env using 
the Kafka server version 2.6.2. No data duplication is happening.

 

Now we have to upgrade the Kafka version to 3.1.1. The 
WorkerSourceTask::execute() method in Kafka server version 3.1.1 follows the 
below-mentioned execution path:
 # Update the committableOffsets variable (if any offset commit is pending), 
which is shared between the execute() and commitOffsets() functions.
 # Poll the task for the new data.
 # If it gets any data, save the new data into the Kafka topic.

Because of the above-mentioned implementation order, the shared variable 
committableOffsets gets set with the latest offset value only in the 2nd poll 
and will be committed in the next call of WorkerSourceTask::commitOffsets().

So, the data read in the 2nd poll is data that was already read in the 1st poll 
and written to the topic, while its offset commit only happens some time later. 
Because of this execution flow of the Kafka server's execute(), we are getting 
duplicate data in the topics.

And

As you mentioned, "In your provided scenario, why can't the connector simply 
read from its previous position in the second poll since it should be 
maintaining some internal state?"... We can store the per-file current offset 
in an in-memory object, but that won't be persistent; the object will be reset 
at every start. Any suggestion on how we can make it persistent with the new 
Kafka server (3.1.1)?

 
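One standard approach (a sketch of the framework-provided persistence path, not a fix for the 3.1.1 commit ordering itself) is to attach the per-file position to each SourceRecord as a sourceOffset and read it back through the offset storage reader on start. The "filename"/"position" keys, topic name, and readLineAt() helper below are illustrative, not from our connector:
{code:java}
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class FileSourceTask extends SourceTask {
    private String file;
    private long position;

    @Override
    public void start(Map<String, String> config) {
        file = config.get("file");
        // Survives restarts: Connect replays the last committed sourceOffset.
        Map<String, Object> offset = context.offsetStorageReader()
                .offset(Collections.singletonMap("filename", file));
        position = (offset == null) ? 0L : (Long) offset.get("position");
    }

    @Override
    public List<SourceRecord> poll() {
        String line = readLineAt(file, position); // hypothetical helper
        if (line == null) return null;
        position += line.length() + 1;
        // The sourceOffset attached here is what the framework persists on commit.
        return Collections.singletonList(new SourceRecord(
                Collections.singletonMap("filename", file),
                Collections.singletonMap("position", position),
                "ingest-topic", Schema.STRING_SCHEMA, line));
    }

    @Override
    public void stop() {}

    @Override
    public String version() {
        return "sketch";
    }

    private String readLineAt(String file, long position) {
        return null; // stub for the sketch
    }
}
{code}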

> Duplicate records are getting created in the topic. 
> 
>
> Key: KAFKA-14947
> URL: https://issues.apache.org/jira/browse/KAFKA-14947
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.1.1
>Reporter: krishnendu Das
>Priority: Blocker
> Attachments: Kafka_server_3.1.1_data_duplication_issue_log
>
>
> We are using Kafka connect API (version 2.3.0) and Kafka  (version 3.1.1) for 
> data ingestion purposes. Previously we were using Kafka (version 2.6.2) and 
> the same Kafka connect API (version 2.3.0). The data ingestion was happening 
> properly. 
>  
> Recently we updated the Kafka version from 2.6.2 to 3.1.1.
> Post update we are facing duplicate data issues from the source connector 
> into the Kafka topic. After debugging the 3.1.1 code, we saw one new function
> {*}updateCommittableOffsets{*}() got added and called inside the 
> {*}WorkerSourceTask::execute{*}() as part of bug fix --"KAFKA-12226: Commit 
> source task offsets without blocking on batch delivery (#11323)"
>  
> Now because of this function, we are observing this scenario
>  # Inside the execute() at the start of the flow, the call goes to 
> updateCommittableOffsets() to check whether there was anything to commit. As 
> the first poll has not yet happened, this function didn't find anything to 
> commit.
>  # Then the Kafka Connect API poll() method is called from the 
> WorkerSourceTask::execute(). *-> 1st poll*
>  # Kafka Connect API (using sleepy policy) reads one source file from the 
> Cloud source directory.
> Read the whole content of the file and send the result set to the Kafka server to 
> write to the Kafka topic.
>  # During the 2nd poll updateCommittableOffsets() found some offset to commit 
> and it updates a reference variable committableOffsets, which will be used 
> further by the WorkerSourceTask::commitOffsets() function to perform actual 
> commit offset.
>  # Then the Kafka Connect API poll() method is called from the 
> *WorkerSourceTask::execute().* *-> 2nd poll*
>  # Kafka Connect API (using sleepy policy) reads the same source file again 
> from the start, as the offsetStorageReader::offset() didn’t give the latest 
> offset.
> Read the whole content of the file and send the result set to the Kafka server 
> to write to the Kafka topic. ---> These create duplicate data in the topic.
> 
> 
>  # WorkerSourceTask::commitOffsets() commits the offset.
> 
> 

[GitHub] [kafka] divijvaidya commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-04-28 Thread via GitHub


divijvaidya commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1180144730


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -600,25 +622,208 @@ public String toString() {
 }
 }
 
-long findHighestRemoteOffset(TopicIdPartition topicIdPartition) throws 
RemoteStorageException {
-Optional<Long> offset = Optional.empty();
-Optional<UnifiedLog> maybeLog = fetchLog.apply(topicIdPartition.topicPartition());
-if (maybeLog.isPresent()) {
-UnifiedLog log = maybeLog.get();
-Option<LeaderEpochFileCache> maybeLeaderEpochFileCache = log.leaderEpochCache();
-if (maybeLeaderEpochFileCache.isDefined()) {
-LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get();
-OptionalInt epoch = cache.latestEpoch();
-while (!offset.isPresent() && epoch.isPresent()) {
-offset = 
remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, 
epoch.getAsInt());
-epoch = cache.previousEpoch(epoch.getAsInt());
+public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) 
throws RemoteStorageException, IOException {
+int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+FetchRequest.PartitionData fetchInfo = 
remoteStorageFetchInfo.fetchInfo;
+
+boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == 
FetchIsolation.TXN_COMMITTED;
+
+long offset = fetchInfo.fetchOffset;
+int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+OptionalInt epoch = OptionalInt.empty();
+
+if (logOptional.isPresent()) {
+Option<LeaderEpochFileCache> leaderEpochCache = logOptional.get().leaderEpochCache();
+if (leaderEpochCache.isDefined()) {
+epoch = leaderEpochCache.get().epochForOffset(offset);
+}
+}
+
+Optional<RemoteLogSegmentMetadata> rlsMetadataOptional = epoch.isPresent()
+? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+: Optional.empty();
+
+if (!rlsMetadataOptional.isPresent()) {
+String epochStr = (epoch.isPresent()) ? 
Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+throw new OffsetOutOfRangeException("Received request for offset " 
+ offset + " for leader epoch "
++ epochStr + " and partition " + tp + " which does not 
exist in remote tier.");
+}
+
+RemoteLogSegmentMetadata remoteLogSegmentMetadata = 
rlsMetadataOptional.get();
+int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, 
offset);
+InputStream remoteSegInputStream = null;
+try {
+// Search forward for the position of the last offset that is 
greater than or equal to the target offset
+remoteSegInputStream = 
remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
+RemoteLogInputStream remoteLogInputStream = new 
RemoteLogInputStream(remoteSegInputStream);

Review Comment:
   Perf comment: not blocking for this PR
   
   Note that `RemoteLogInputStream` performs a heap allocation here for each 
batch. This allocation will increase GC activity with the size of the segment. 
Perhaps we can use BufferSupplier (we allocate one pool per request handler 
thread; see `requestLocal.bufferSupplier` in KafkaApis.scala) here in the future.
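   A sketch of that pooling idea, assuming Kafka's internal `org.apache.kafka.common.utils.BufferSupplier`; the buffer size and read loop are illustrative:
   
   ```java
   // Reuse one pooled buffer across batches instead of allocating per batch.
   BufferSupplier bufferSupplier = BufferSupplier.create();
   ByteBuffer buffer = bufferSupplier.get(64 * 1024); // illustrative size
   try {
       // read each record batch into `buffer` here
   } finally {
       bufferSupplier.release(buffer); // return the buffer to the pool
   }
   ```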




[GitHub] [kafka] mimaison merged pull request #13649: MINOR: Fix incorrect description of SessionLifetimeMs

2023-04-28 Thread via GitHub


mimaison merged PR #13649:
URL: https://github.com/apache/kafka/pull/13649





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-04-28 Thread via GitHub


divijvaidya commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1180254846


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -581,11 +588,18 @@ public void run() {
 if (isLeader()) {
 // Copy log segments to remote storage
 copyLogSegmentsToRemote();
+// Cleanup/delete expired remote log segments
+cleanupExpiredRemoteLogSegments();

Review Comment:
   That is fair. As I mentioned in the [review 
overview](https://github.com/apache/kafka/pull/13561#pullrequestreview-1385125558),
 I am fine with (and would actually prefer) creating JIRAs and tackling the 
perf-related comments outside this PR. With this comment, I wanted to make sure 
we are aware of and tracking the things that need fixing in this code.






[GitHub] [kafka] aiven-anton commented on a diff in pull request #13649: MINOR: Fix incorrect description of SessionLifetimeMs

2023-04-28 Thread via GitHub


aiven-anton commented on code in PR #13649:
URL: https://github.com/apache/kafka/pull/13649#discussion_r1180246714


##
clients/src/main/resources/common/message/SaslAuthenticateResponse.json:
##
@@ -29,6 +29,6 @@
 { "name": "AuthBytes", "type": "bytes", "versions": "0+",
   "about": "The SASL authentication bytes from the server, as defined by 
the SASL mechanism." },
 { "name": "SessionLifetimeMs", "type": "int64", "versions": "1+", 
"default": "0", "ignorable": true,
-  "about": "The SASL authentication bytes from the server, as defined by 
the SASL mechanism." }
+  "about": "The duration in milliseconds that a successful session 
response is valid for." }

Review Comment:
   I updated with original copy as suggested.






[GitHub] [kafka] yashmayya commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary

2023-04-28 Thread via GitHub


yashmayya commented on PR #13646:
URL: https://github.com/apache/kafka/pull/13646#issuecomment-1527292922

   Thanks @sambhav-jain-16 
   
   > What I suspect is happening is that when the method is initially storing 
the end offsets of the partitions, the connector hasn't produced 100 records 
till then and therefore the method doesn't consume fully even though messages 
are being produced by the connector.
   
   I'm not sure how this is possible given that we're waiting for 
`MINIMUM_MESSAGES` to be committed first: 
https://github.com/apache/kafka/blob/c6ad151ac3bac0d8d1d6985d230eacaa170b8984/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java#L399-L410
 (i.e. `SourceTask::commitRecord` is called `MINIMUM_MESSAGES` number of times).
   
   Records are "committed" only after the producer transaction is committed 
successfully -
   
   
https://github.com/apache/kafka/blob/c6ad151ac3bac0d8d1d6985d230eacaa170b8984/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java#L302-L332





[GitHub] [kafka] dajac commented on a diff in pull request #13644: KAFKA-14500; [1/N] Rewrite MemberMetadata in Java

2023-04-28 Thread via GitHub


dajac commented on code in PR #13644:
URL: https://github.com/apache/kafka/pull/13644#discussion_r1180180911


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java:
##
@@ -0,0 +1,560 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * Java rewrite of {@link kafka.coordinator.group.MemberMetadata} that is used
+ * by the new group coordinator (KIP-848).

Review Comment:
   We should not put javadoc like this. Let's describe what it is.






[jira] [Resolved] (KAFKA-14639) Kafka CooperativeStickyAssignor revokes/assigns partition in one rebalance cycle

2023-04-28 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-14639.
-
Fix Version/s: 3.5.0
 Reviewer: David Jacot
   Resolution: Fixed

> Kafka CooperativeStickyAssignor revokes/assigns partition in one rebalance 
> cycle
> 
>
> Key: KAFKA-14639
> URL: https://issues.apache.org/jira/browse/KAFKA-14639
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.2.1
>Reporter: Bojan Blagojevic
>Assignee: Philip Nee
>Priority: Major
> Fix For: 3.5.0
>
> Attachments: consumers-jira.log
>
>
> I have an application that runs 6 consumers in parallel. I am getting some 
> unexpected results when I use {{{}CooperativeStickyAssignor{}}}. If I 
> understand the mechanism correctly, if the consumer loses a partition in one 
> rebalance cycle, the partition should be assigned in the next rebalance cycle.
> This assumption is based on the 
> [RebalanceProtocol|https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.RebalanceProtocol.html]
>  documentation and few blog posts that describe the protocol, like [this 
> one|https://www.confluent.io/blog/cooperative-rebalancing-in-kafka-streams-consumer-ksqldb/]
>  on Confluent blog.
> {quote}The assignor should not reassign any owned partitions immediately, but 
> instead may indicate consumers the need for partition revocation so that the 
> revoked partitions can be reassigned to other consumers in the next rebalance 
> event. This is designed for sticky assignment logic which attempts to 
> minimize partition reassignment with cooperative adjustments.
> {quote}
> {quote}Any member that revoked partitions then rejoins the group, triggering 
> a second rebalance so that its revoked partitions can be assigned. Until 
> then, these partitions are unowned and unassigned.
> {quote}
> These are the logs from the application that uses 
> {{{}protocol='cooperative-sticky'{}}}. In the same rebalance cycle 
> ({{{}generationId=640{}}}) {{partition 74}} moves from {{consumer-3}} to 
> {{{}consumer-4{}}}. I omitted the lines that are logged by the other 4 
> consumers.
> Mind that the log is in reverse(bottom to top)
> {code:java}
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : New 
> partition assignment: partition-59, seek to min common offset: 85120524
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler2 : Partitions 
> [partition-59] assigned successfully
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : Partitions 
> assigned: [partition-59]
> 2022-12-14 11:18:24 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-3-my-client-id-my-group-id, 
> groupId=my-group-id] Adding newly assigned partitions: partition-59
> 2022-12-14 11:18:24 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-3-my-client-id-my-group-id, 
> groupId=my-group-id] Notifying assignor about the new 
> Assignment(partitions=[partition-59])
> 2022-12-14 11:18:24 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-3-my-client-id-my-group-id, 
> groupId=my-group-id] Request joining group due to: need to revoke partitions 
> [partition-26, partition-74] as indicated by the current assignment and 
> re-join
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler2 : Partitions 
> [partition-26, partition-74] revoked successfully
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : Finished 
> removing partition data
> 2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-4-my-client-id-my-group-id, 
> groupId=my-group-id] (Re-)joining group
> 2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler1 : New 
> partition assignment: partition-74, seek to min common offset: 107317730
> 2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler2 : Partitions 
> [partition-74] assigned successfully
> 2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler1 : Partitions 
> assigned: [partition-74]
> 2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-4-my-client-id-my-group-id, 
> groupId=my-group-id] Adding newly assigned partitions: partition-74
> 2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-4-my-client-id-my-group-id, 
> groupId=my-group-id] Notifying assignor about the new 
> Assignment(partitions=[partition-74])
> 2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer 

[GitHub] [kafka] dajac commented on pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance

2023-04-28 Thread via GitHub


dajac commented on PR #13550:
URL: https://github.com/apache/kafka/pull/13550#issuecomment-1527244890

   Merged to trunk and 3.5.





[GitHub] [kafka] showuon commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.

2023-04-28 Thread via GitHub


showuon commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1180121259


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -581,11 +588,18 @@ public void run() {
 if (isLeader()) {
 // Copy log segments to remote storage
 copyLogSegmentsToRemote();
+// Cleanup/delete expired remote log segments
+cleanupExpiredRemoteLogSegments();
+} else {
+Optional<UnifiedLog> unifiedLogOptional = fetchLog.apply(topicIdPartition.topicPartition());
+if (unifiedLogOptional.isPresent()) {
+long offset = 
findHighestRemoteOffset(topicIdPartition);
+
unifiedLogOptional.get().updateHighestOffsetInRemoteStorage(offset);

Review Comment:
   We might need to add a log message here to describe why we need to update 
the highest offset in remote storage for followers. I think that's for the 
fetch-from-follower replica feature, right?



##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -595,6 +609,193 @@ public void run() {
 }
 }
 
+public void handleLogStartOffsetUpdate(TopicPartition topicPartition, 
long remoteLogStartOffset) {
+logger.debug("Updating $topicPartition with remoteLogStartOffset: 
{}", remoteLogStartOffset);
+updateRemoteLogStartOffset.accept(topicPartition, 
remoteLogStartOffset);
+}
+
+class RemoteLogRetentionHandler {
+
+private long remainingBreachedSize = 0L;
+private OptionalLong logStartOffset = OptionalLong.empty();
+
+public RemoteLogRetentionHandler(long remainingBreachedSize) {
+this.remainingBreachedSize = remainingBreachedSize;
+}
+
+private boolean 
deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean 
checkSizeRetention) throws RemoteStorageException, ExecutionException, 
InterruptedException {
+boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> {
+// Assumption that segments contain size > 0

Review Comment:
   nit: assume that segments contain size >= 0



##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -595,6 +609,193 @@ public void run() {
 }
 }
 
+public void handleLogStartOffsetUpdate(TopicPartition topicPartition, 
long remoteLogStartOffset) {
+logger.debug("Updating $topicPartition with remoteLogStartOffset: 
{}", remoteLogStartOffset);
+updateRemoteLogStartOffset.accept(topicPartition, 
remoteLogStartOffset);
+}
+
+class RemoteLogRetentionHandler {
+
+private long remainingBreachedSize = 0L;
+private OptionalLong logStartOffset = OptionalLong.empty();
+
+public RemoteLogRetentionHandler(long remainingBreachedSize) {
+this.remainingBreachedSize = remainingBreachedSize;
+}
+
+private boolean 
deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean 
checkSizeRetention) throws RemoteStorageException, ExecutionException, 
InterruptedException {
+boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> {
+// Assumption that segments contain size > 0
+if (checkSizeRetention && remainingBreachedSize > 0) {
+remainingBreachedSize -= x.segmentSizeInBytes();
+return remainingBreachedSize >= 0;
+} else return false;
+});
+if (isSegmentDeleted) {
+logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+logger.info("Deleted remote log segment 
${metadata.remoteLogSegmentId()} due to retention size " +
+"${log.config.retentionSize} breach. Log size 
after deletion will be " +
+"${remainingBreachedSize + 
log.config.retentionSize}.");
+}
+return isSegmentDeleted;
+}
+
+private boolean 
deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long 
startOffset)
+throws RemoteStorageException, ExecutionException, 
InterruptedException {
+boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> startOffset > x.endOffset());
+if (isSegmentDeleted) {
+remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+logger.info("Deleted remote log segment {} due to log 
start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+}
+
+// No need to update the logStartOffset.
+return isSegmentDeleted;
+

[GitHub] [kafka] dajac merged pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance

2023-04-28 Thread via GitHub


dajac merged PR #13550:
URL: https://github.com/apache/kafka/pull/13550





[GitHub] [kafka] mdedetrich commented on pull request #11792: Replace EasyMock/PowerMock with Mockito in DistributedHerderTest

2023-04-28 Thread via GitHub


mdedetrich commented on PR #11792:
URL: https://github.com/apache/kafka/pull/11792#issuecomment-1527237583

   @yashmayya I don't really have time to finish this any time soon so feel 
free to take over





[GitHub] [kafka] showuon commented on pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-04-28 Thread via GitHub


showuon commented on PR #13535:
URL: https://github.com/apache/kafka/pull/13535#issuecomment-1527204055

   @Hangleton @junrao @jeqo, any other comments on this PR? We hope to merge it 
early in the release cycle, so that we have enough time to test its stability 
and make further improvements. Thanks.





[GitHub] [kafka] mimaison merged pull request #13171: KAFKA-14584: Deprecate StateChangeLogMerger tool

2023-04-28 Thread via GitHub


mimaison merged PR #13171:
URL: https://github.com/apache/kafka/pull/13171





[GitHub] [kafka] fvaleri commented on a diff in pull request #13514: KAFKA-14752: Kafka examples improvements - consumer changes

2023-04-28 Thread via GitHub


fvaleri commented on code in PR #13514:
URL: https://github.com/apache/kafka/pull/13514#discussion_r1180003269


##
examples/src/main/java/kafka/examples/Consumer.java:
##
@@ -91,9 +99,13 @@ public void run() {
 // we can't recover from these exceptions
 Utils.printErr(e.getMessage());
 shutdown();
+} catch (OffsetOutOfRangeException | 
NoOffsetForPartitionException e) {
+// invalid or no offset found without auto.reset.policy
+Utils.printOut("Invalid or no offset found, using latest");
+consumer.seekToEnd(emptyList());

Review Comment:
   Yes, that makes sense. Thanks Luke.
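   For reference, the equivalent behavior can also come from configuration rather than the catch block (a sketch; with this setting the consumer resets on its own instead of surfacing `NoOffsetForPartitionException` for the example to handle):
   
   ```java
   Properties props = new Properties();
   props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
   ```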






[GitHub] [kafka] showuon merged pull request #13631: KAFKA-14909: check zkMigrationReady tag before migration

2023-04-28 Thread via GitHub


showuon merged PR #13631:
URL: https://github.com/apache/kafka/pull/13631





[GitHub] [kafka] showuon commented on pull request #13631: KAFKA-14909: check zkMigrationReady tag before migration

2023-04-28 Thread via GitHub


showuon commented on PR #13631:
URL: https://github.com/apache/kafka/pull/13631#issuecomment-1527058819

   Failed tests are unrelated:
   ```
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.runtime.distributed.DistributedHerderTest.testExternalZombieFencingRequestAsynchronousFailure
   Build / JDK 17 and Scala 2.13 / 
integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor()
   Build / JDK 17 and Scala 2.13 / 
kafka.server.CreateTopicsRequestTest.testErrorCreateTopicsRequests(String).quorum=kraft
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
   ```





[GitHub] [kafka] sambhav-jain-16 commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary

2023-04-28 Thread via GitHub


sambhav-jain-16 commented on PR #13646:
URL: https://github.com/apache/kafka/pull/13646#issuecomment-1527055473

   >  How would this fix be any different from that i.e if consume can't get 
100 messages within the timeout. Won't it still fail?
   
   Yes, it will fail, but `consumeAll` is not failing due to the timeout here 
but rather because it stores the end offsets before it starts consuming.





[GitHub] [kafka] sambhav-jain-16 commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary

2023-04-28 Thread via GitHub


sambhav-jain-16 commented on PR #13646:
URL: https://github.com/apache/kafka/pull/13646#issuecomment-1527052521

   @sagarrao12 
   The `consumeAll` method consumes only the records available at the moment we 
start consuming, i.e. it stores the end offsets up front and then consumes up 
to the stored end offset for each partition.
   The `consume` method has no such restriction; it consumes until it receives 
the specified number of records or hits the timeout.
   What I suspect is happening is that when the method initially stores the end 
offsets of the partitions, the connector hasn't yet produced 100 records, and 
therefore the method doesn't consume enough even though messages are still 
being produced by the connector. A sketch of that snapshot behavior follows 
below.
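   A hedged sketch of that `consumeAll`-style snapshot with a plain consumer (the partition set and poll timeout are illustrative):
   
   ```java
   // Snapshot the end offsets first, then consume only up to that snapshot.
   Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
   consumer.seekToBeginning(partitions);
   List<ConsumerRecord<byte[], byte[]>> consumed = new ArrayList<>();
   while (endOffsets.entrySet().stream()
           .anyMatch(e -> consumer.position(e.getKey()) < e.getValue())) {
       // Records produced after the endOffsets() snapshot are never read,
       // so a connector that is still producing can leave the caller short.
       consumer.poll(Duration.ofSeconds(1)).forEach(consumed::add);
   }
   ```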





[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-04-28 Thread via GitHub


dajac commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1179969167


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##
@@ -0,0 +1,574 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RangeAssignorTest {
+private final RangeAssignor assignor = new RangeAssignor();
+private final Uuid topic1Uuid = Uuid.randomUuid();
+private final Uuid topic2Uuid = Uuid.randomUuid();
+private final Uuid topic3Uuid = Uuid.randomUuid();
+private final String consumerA = "A";
+private final String consumerB = "B";
+private final String consumerC = "C";
+
+@Test
+public void testOneConsumerNoTopic() {
+Map<Uuid, AssignmentTopicMetadata> topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3));
+Map<String, AssignmentMemberSpec> members = Collections.singletonMap(
+consumerA,
+new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Collections.emptyList(),
+Collections.emptyMap())
+);
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertTrue(groupAssignment.members().isEmpty());
+}
+
+@Test
+public void testOneConsumerNonExistentTopic() {
+Map<Uuid, AssignmentTopicMetadata> topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3));
+Map<String, AssignmentMemberSpec> members = Collections.singletonMap(
+consumerA,
+new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Collections.singletonList(topic2Uuid),
+Collections.emptyMap())
+);
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertTrue(groupAssignment.members().isEmpty());
+}
+
+@Test
+public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() {
+Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+topics.put(topic3Uuid, new AssignmentTopicMetadata(2));
+
+Map<String, AssignmentMemberSpec> members = new HashMap<>();
+// Initial Subscriptions are: A -> T1, T3 | B -> T1, T3
+
+members.put(consumerA, new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Arrays.asList(topic1Uuid, topic3Uuid),
+Collections.emptyMap())
+);
+
+members.put(consumerB, new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Arrays.asList(topic1Uuid, topic3Uuid),
+Collections.emptyMap())
+);
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment computedAssignment = assignor.assign(assignmentSpec);
+
+Map>> expectedAssignment = new HashMap<>();
+// Topic 1 Partitions Assignment
+mkAssignment(expectedAssignment, topic1Uuid, Arrays.asList(0, 1));
+mkAssignment(expectedAssignment, topic1Uuid, Collections.singleton(2));
+// Topic 3 Partitions Assignment
+mkAssignment(expectedAssignment, topic3Uuid, Collections.singleton(0));
+mkAssignment(expectedAssignment, topic3Uuid, Collections.singleton(1));
+
+assertAssignment(expectedAssignment, computedAssignment);

Review Comment:
   Yeah, sorry for this. I only thought about it when I raised this comment.




[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-04-28 Thread via GitHub


dajac commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1179968856


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * The Server Side Sticky Range Assignor inherits properties of both the range 
assignor and the sticky assignor.
+ * The properties are as follows:
+ * <ul>
+ *   <li>Each member must get at least one partition from every topic that it
+ * is subscribed to. The only exception is when the number of subscribed
+ * members is greater than the number of partitions for that topic. (Range)</li>
+ *   <li>Partitions should be assigned to members in a way that facilitates the
+ * join operation when required. (Range)
+ * This can only be done if every member is subscribed to the same topics and
+ * the topics are co-partitioned.
+ * Two streams are co-partitioned if the following conditions are met:
+ * <ul>
+ *   <li>The keys must have the same schemas.</li>
+ *   <li>The topics involved must have the same number of partitions.</li>
+ * </ul></li>
+ *   <li>Members should retain as much of their previous assignment as possible
+ * to reduce the number of partition movements during reassignment. (Sticky)</li>
+ * </ul>
+ */
+public class RangeAssignor implements PartitionAssignor {
+private static final Logger log = LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+// Used in the potentiallyUnfilledMembers map and the UnfilledMembers map.
+private static class MemberWithRemainingAssignments {
+private final String memberId;
+/**
+ * Number of partitions required to meet the assignment quota
+ */
+private final Integer remaining;
+
+public MemberWithRemainingAssignments(String memberId, Integer remaining) {
+this.memberId = memberId;
+this.remaining = remaining;
+}
+
+/**
+ * @return memberId
+ */
+public String memberId() {
+return memberId;
+}
+/**
+ * @return Remaining number of partitions
+ */
+public Integer remaining() {
+return remaining;
+}
+}
+
+private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) {
+Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
+Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+
+membersData.forEach((memberId, memberMetadata) -> {
+Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+for (Uuid topicId: topics) {
+// Only topics that are present in both the subscribed topics list and the topic metadata should be considered for assignment.
+if (assignmentSpec.topics().containsKey(topicId)) {
+membersPerTopic
+.computeIfAbsent(topicId, k -> new ArrayList<>())
+.add(memberId);
+} else {
+log.warn("Member " + memberId + " subscribed to topic " + 
topicId + " which doesn't exist in the topic metadata");
+}
+}
+});
+
+return membersPerTopic;
+}
+
+/**
+ * The algorithm includes the following steps:
+ * <ol>
+ * <li> Generate a map of membersPerTopic using the given member subscriptions.

Review Comment:
   Interesting... It is weird to have variable names in the description. Plain English is much better than `membersPerTopic`.
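
   To make the quoted Javadoc concrete, here is a minimal runnable sketch (hypothetical topic and member names, not part of the patch) of the outcome those properties describe: co-partitioned topics are split so that partition k of every subscribed topic lands on the same member, which is what keeps joins cheap.

{code:java}
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RangePropertiesSketch {
    public static void main(String[] args) {
        // Topics T1 and T2 are co-partitioned (3 partitions each); members M1 and M2
        // subscribe to both. A range-style split gives each member the same contiguous
        // partition range across topics, so T1-0 and T2-0 always share a member.
        Map<String, List<String>> assignment = new HashMap<>();
        assignment.put("M1", Arrays.asList("T1-0", "T1-1", "T2-0", "T2-1"));
        assignment.put("M2", Arrays.asList("T1-2", "T2-2"));
        assignment.forEach((member, partitions) ->
            System.out.println(member + " -> " + partitions));
    }
}
{code}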



[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-04-28 Thread via GitHub


dajac commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1179967228


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##
@@ -0,0 +1,574 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RangeAssignorTest {
+private final RangeAssignor assignor = new RangeAssignor();
+private final Uuid topic1Uuid = Uuid.randomUuid();
+private final Uuid topic2Uuid = Uuid.randomUuid();
+private final Uuid topic3Uuid = Uuid.randomUuid();
+private final String consumerA = "A";
+private final String consumerB = "B";
+private final String consumerC = "C";
+
+@Test
+public void testOneConsumerNoTopic() {
+Map<Uuid, AssignmentTopicMetadata> topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3));
+Map<String, AssignmentMemberSpec> members = Collections.singletonMap(
+consumerA,
+new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Collections.emptyList(),
+Collections.emptyMap())
+);
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertTrue(groupAssignment.members().isEmpty());
+}
+
+@Test
+public void testOneConsumerNonExistentTopic() {
+Map<Uuid, AssignmentTopicMetadata> topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3));
+Map<String, AssignmentMemberSpec> members = Collections.singletonMap(
+consumerA,
+new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Collections.singletonList(topic2Uuid),
+Collections.emptyMap())
+);
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertTrue(groupAssignment.members().isEmpty());
+}
+
+@Test
+public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() {
+Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(3));
+topics.put(topic3Uuid, new AssignmentTopicMetadata(2));
+
+Map<String, AssignmentMemberSpec> members = new HashMap<>();
+// Initial Subscriptions are: A -> T1, T3 | B -> T1, T3
+
+members.put(consumerA, new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Arrays.asList(topic1Uuid, topic3Uuid),
+Collections.emptyMap())
+);
+
+members.put(consumerB, new AssignmentMemberSpec(
+Optional.empty(),
+Optional.empty(),
+Arrays.asList(topic1Uuid, topic3Uuid),
+Collections.emptyMap())
+);
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment computedAssignment = assignor.assign(assignmentSpec);
+
+Map<Uuid, List<Set<Integer>>> expectedAssignment = new HashMap<>();
+// Topic 1 Partitions Assignment
+mkAssignment(expectedAssignment, topic1Uuid, Arrays.asList(0, 1));
+mkAssignment(expectedAssignment, topic1Uuid, Collections.singleton(2));
+// Topic 3 Partitions Assignment
+mkAssignment(expectedAssignment, topic3Uuid, Collections.singleton(0));
+mkAssignment(expectedAssignment, topic3Uuid, Collections.singleton(1));
+
+assertAssignment(expectedAssignment, computedAssignment);
+}
+
+@Test
+public void testFirstAssignmentThreeConsumersThreeTopicsDifferentSubscriptions() {
+Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
+  

[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)

2023-04-28 Thread via GitHub


dajac commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1179966618


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * The Server Side Sticky Range Assignor inherits properties of both the range assignor and the sticky assignor.
+ * The properties are as follows:
+ * <ul>
+ * <li> Each member must get at least one partition from every topic that it is subscribed to. The only exception is when
+ * the number of subscribed members is greater than the number of partitions for that topic. (Range) </li>
+ * <li> Partitions should be assigned to members in a way that facilitates the join operation when required. (Range) </li>
+ * This can only be done if every member is subscribed to the same topics and the topics are co-partitioned.
+ * Two streams are co-partitioned if the following conditions are met:
+ * <ul>
+ * <li> The keys must have the same schemas. </li>
+ * <li> The topics involved must have the same number of partitions. </li>
+ * </ul>
+ * <li> Members should retain as much of their previous assignment as possible to reduce the number of partition movements during reassignment. (Sticky) </li>
+ * </ul>
+ */
+public class RangeAssignor implements PartitionAssignor {
+private static final Logger log = LoggerFactory.getLogger(RangeAssignor.class);
+
+public static final String RANGE_ASSIGNOR_NAME = "range";

Review Comment:
   Ack. We can keep it as public.
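
   For readers following the visibility discussion: keeping the name constant public lets other code select the assignor without re-declaring the string literal. A hypothetical sketch (names assumed, not from the patch):

{code:java}
import java.util.List;
import java.util.Optional;

final class AssignorLookupSketch {
    // Mirrors RangeAssignor.RANGE_ASSIGNOR_NAME; redeclared here only so the
    // sketch compiles on its own.
    static final String RANGE_ASSIGNOR_NAME = "range";

    // Picks the assignor a group requested out of the server's configured list.
    static Optional<String> choose(List<String> serverAssignors, String requested) {
        return serverAssignors.stream().filter(requested::equals).findFirst();
    }

    public static void main(String[] args) {
        System.out.println(choose(List.of(RANGE_ASSIGNOR_NAME, "uniform"), RANGE_ASSIGNOR_NAME));
    }
}
{code}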






[GitHub] [kafka] sagarrao12 commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary

2023-04-28 Thread via GitHub


sagarrao12 commented on PR #13646:
URL: https://github.com/apache/kafka/pull/13646#issuecomment-1527038626

   Thanks @sambhav-jain-16. Going by the build, this seems to have fixed the test, but I am still curious why it used to fail expecting 100+ records yet only got 72. How would this fix be any different, i.e. if consume can't get 100 messages within the timeout, won't it still fail?
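
   A minimal sketch of the consume semantics in question (hypothetical helper, assuming a poll-until-deadline loop like the one the test harness uses): if the connector produces slowly, the loop can still hit the deadline short of the requested count, which is exactly the failure mode being asked about.

{code:java}
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class ConsumeAtLeastSketch {
    // Polls until at least minRecords have been seen or the deadline passes;
    // throws if the count falls short, mirroring an "expected 100+, got 72" failure.
    static <T> List<T> consumeAtLeast(Supplier<List<T>> poll, int minRecords, Duration timeout) {
        Instant deadline = Instant.now().plus(timeout);
        List<T> consumed = new ArrayList<>();
        while (consumed.size() < minRecords && Instant.now().isBefore(deadline)) {
            consumed.addAll(poll.get()); // each poll may return zero or more records
        }
        if (consumed.size() < minRecords) {
            throw new AssertionError("Expected at least " + minRecords + " records, got " + consumed.size());
        }
        return consumed;
    }
}
{code}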




