[jira] [Updated] (KAFKA-14952) Publish metrics when source connector fails to poll data
[ https://issues.apache.org/jira/browse/KAFKA-14952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ravindranath Kakarla updated KAFKA-14952: - Description: Currently, there is no metric in Kafka Connect to track when a source connector fails to poll data from the source. This information would be useful to operators and developers to visualize, monitor and alert when the connector fails to poll records from the source. Existing metrics like *kafka_producer_producer_metrics_record_error_total* and *kafka_connect_task_error_metrics_total_record_failures* only cover failures when producing data to the Kafka cluster but not when the source task fails with a retryable exception or ConnectException. Polling from source can fail due to unavailability of the source system or errors with the connect configuration. Currently, this cannot be monitored directly using metrics and instead operators have to rely on log diving which is not consistent with how other metrics are monitored. I propose adding new metrics to Kafka Connect, "{_}source-record-poll-error-total{_}" and "{_}source-record-poll-error-rate{_}" that can be used to monitor failures during polling. *source-record-poll-error-total* - The total number of times a source connector failed to poll data from the source. This will include both retryable and non-retryable exceptions. *source-record-poll-error-rate* - The rate of above failures per unit of time. These metrics would be tracked at the connector level and could be exposed through the JMX along with the other metrics. I am willing to submit a PR if this looks good, sample implementation code below, {code:java} //AbstractWorkerSourceTask.java protected List poll() throws InterruptedException { try { return task.poll(); } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) { log.warn("{} failed to poll records from SourceTask. 
Will retry operation.", this, e); sourceTaskMetricsGroup.recordPollError(); // Do nothing. Let the framework poll whenever it's ready. return null; } catch (Throwable e) { sourceTaskMetricsGroup.recordPollError(); throw e; } } {code} [Reference|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L460] was: Currently, there is no metric in Kafka Connect to track when a source connector fails to poll data from the source. This information would be useful to operators and developers to visualize, monitor and alert when the connector fails to poll records from the source. Existing metrics like *kafka_producer_producer_metrics_record_error_total* and *kafka_connect_task_error_metrics_total_record_failures* only cover failures when producing data to the Kafka cluster but not when the source task fails with a retryable exception or ConnectException. Polling from source can fail due to unavailability of the source system or errors with the connect configuration. Currently, this cannot be monitored directly using metrics and instead operators have to rely on log diving which is not consistent with how other metrics are monitored. I propose adding new metrics to Kafka Connect, "source-record-poll-error-total" and "source-record-poll-error-rate" that can be used to monitor failures during polling. _*source-record-poll-error-total*_ - The total number of times a source connector failed to poll data from the source. This will include both retryable and non-retryable exceptions. _*source-record-poll-error-rate*_ - The rate of above failures per unit of time. These metrics would be tracked at the connector level and could be exposed through the JMX along with the other metrics. 
I am willing to submit a PR if this looks good, sample implementation code below, {code:java} //AbstractWorkerSourceTask.java protected List poll() throws InterruptedException { try { return task.poll(); } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) { log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e); sourceTaskMetricsGroup.recordPollError(); // Do nothing. Let the framework poll whenever it's ready. return null; } catch (Throwable e) { sourceTaskMetricsGroup.recordPollError(); throw e; } } {code} [Reference|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L460] > Publish metrics when source connector fails to poll data > > > Key: KAFKA-14952 > URL: https://issues.apache.org/jira/browse/KAFKA-14952 > Project: Kafka > Issue
[jira] [Updated] (KAFKA-14952) Publish metrics when source connector fails to poll data
[ https://issues.apache.org/jira/browse/KAFKA-14952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ravindranath Kakarla updated KAFKA-14952: - Description: Currently, there is no metric in Kafka Connect to track when a source connector fails to poll data from the source. This information would be useful to operators and developers to visualize, monitor and alert when the connector fails to poll records from the source. Existing metrics like *kafka_producer_producer_metrics_record_error_total* and *kafka_connect_task_error_metrics_total_record_failures* only cover failures when producing data to the Kafka cluster but not when the source task fails with a retryable exception or ConnectException. Polling from source can fail due to unavailability of the source system or errors with the connect configuration. Currently, this cannot be monitored directly using metrics and instead operators have to rely on log diving which is not consistent with how other metrics are monitored. I propose adding new metrics to Kafka Connect, "source-record-poll-error-total" and "source-record-poll-error-rate" that can be used to monitor failures during polling. _*source-record-poll-error-total*_ - The total number of times a source connector failed to poll data from the source. This will include both retryable and non-retryable exceptions. _*source-record-poll-error-rate*_ - The rate of above failures per unit of time. These metrics would be tracked at the connector level and could be exposed through the JMX along with the other metrics. I am willing to submit a PR if this looks good, sample implementation code below, {code:java} //AbstractWorkerSourceTask.java protected List poll() throws InterruptedException { try { return task.poll(); } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) { log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e); sourceTaskMetricsGroup.recordPollError(); // Do nothing. 
Let the framework poll whenever it's ready. return null; } catch (Throwable e) { sourceTaskMetricsGroup.recordPollError(); throw e; } } {code} [Reference|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L460] was: Currently, there is no metric in Kafka Connect to track when a source connector fails to poll data from the source. This information would be useful to operators and developers to visualize, monitor and alert when the connector fails to poll records from the source. Existing metrics like `kafka_producer_producer_metrics_record_error_total` and `kafka_connect_task_error_metrics_total_record_failures` only cover failures when producing data to the Kafka cluster but not when the source task fails with a retryable exception or ConnectException. Polling from source can fail due to unavailability of the source system or errors with the connect configuration. Currently, this cannot be monitored directly using metrics and instead operators have to rely on log diving which is not consistent with how other metrics are monitored. I propose adding new metrics to Kafka Connect, "source-record-poll-error-total" and "source-record-poll-error-rate" that can be used to monitor failures during polling. `source-record-poll-error-total` - The total number of times a source connector failed to poll data from the source. This will include both retryable and non-retryable exceptions. `source-record-poll-error-rate` - The rate of above failures per unit of time. These metrics would be tracked at the connector level and could be exposed through the JMX along with the other metrics. 
I am willing to submit a PR if this looks good, sample implementation code below, {code:java} //AbstractWorkerSourceTask.java protected List poll() throws InterruptedException { try { return task.poll(); } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) { log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e); sourceTaskMetricsGroup.recordPollError(); // Do nothing. Let the framework poll whenever it's ready. return null; } catch (Throwable e) { sourceTaskMetricsGroup.recordPollError(); throw e; } } {code} > Publish metrics when source connector fails to poll data > > > Key: KAFKA-14952 > URL: https://issues.apache.org/jira/browse/KAFKA-14952 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 3.3.2 >Reporter: Ravindranath Kakarla >Priority: Minor >
[GitHub] [kafka] kirktrue commented on pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
kirktrue commented on PR #13591: URL: https://github.com/apache/kafka/pull/13591#issuecomment-1528441772 Temporarily marking this as a draft as I've committed some code that is not fully vetted yet. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-14952) Publish metrics when source connector fails to poll data
Ravindranath Kakarla created KAFKA-14952:

Summary: Publish metrics when source connector fails to poll data
Key: KAFKA-14952
URL: https://issues.apache.org/jira/browse/KAFKA-14952
Project: Kafka
Issue Type: Improvement
Components: KafkaConnect
Affects Versions: 3.3.2
Reporter: Ravindranath Kakarla

Currently, there is no metric in Kafka Connect to track when a source connector fails to poll data from the source. This information would be useful to operators and developers to visualize, monitor and alert when the connector fails to poll records from the source.

Existing metrics like `kafka_producer_producer_metrics_record_error_total` and `kafka_connect_task_error_metrics_total_record_failures` only cover failures when producing data to the Kafka cluster but not when the source task fails with a retryable exception or ConnectException.

Polling from source can fail due to unavailability of the source system or errors with the connect configuration. Currently, this cannot be monitored directly using metrics and instead operators have to rely on log diving, which is not consistent with how other metrics are monitored.

I propose adding new metrics to Kafka Connect, "source-record-poll-error-total" and "source-record-poll-error-rate", that can be used to monitor failures during polling.

`source-record-poll-error-total` - The total number of times a source connector failed to poll data from the source. This will include both retryable and non-retryable exceptions.

`source-record-poll-error-rate` - The rate of the above failures per unit of time.

These metrics would be tracked at the connector level and could be exposed through JMX along with the other metrics.

I am willing to submit a PR if this looks good. Sample implementation code is below:

{code:java}
//AbstractWorkerSourceTask.java
protected List<SourceRecord> poll() throws InterruptedException {
    try {
        return task.poll();
    } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
        log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
        sourceTaskMetricsGroup.recordPollError();
        // Do nothing. Let the framework poll whenever it's ready.
        return null;
    } catch (Throwable e) {
        sourceTaskMetricsGroup.recordPollError();
        throw e;
    }
}
{code}

-- This message was sent by Atlassian Jira (v8.20.10#820010)
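The ticket names `recordPollError()` but does not show what it would do. The following is a minimal, dependency-free sketch of the counting idea only, assuming a plain counter for the total and a simple errors-per-second figure for the rate. `PollErrorMetrics`, `RetriablePollException` and `MeteredPoller` are hypothetical names invented for illustration; they are not Kafka Connect classes, and a real implementation would presumably register sensors with Kafka's metrics library instead.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

/** Hypothetical stand-in for the proposed metrics group; not a Kafka Connect class. */
final class PollErrorMetrics {
    private final AtomicLong total = new AtomicLong();
    private final long startNanos;

    PollErrorMetrics(long startNanos) {
        this.startNanos = startNanos;
    }

    /** Counts one failed poll, retriable or not. */
    void recordPollError() {
        total.incrementAndGet();
    }

    /** Analogue of source-record-poll-error-total. */
    long pollErrorTotal() {
        return total.get();
    }

    /** Analogue of source-record-poll-error-rate: errors per second since startNanos. */
    double pollErrorRate(long nowNanos) {
        double elapsedSeconds = (nowNanos - startNanos) / 1_000_000_000.0;
        return elapsedSeconds <= 0 ? 0.0 : total.get() / elapsedSeconds;
    }
}

/** Hypothetical marker for failures the caller is expected to retry. */
final class RetriablePollException extends RuntimeException {
    RetriablePollException(String message) {
        super(message);
    }
}

final class MeteredPoller {
    /** Mirrors the shape of the ticket's poll(): count every failure, swallow only retriable ones. */
    static <T> List<T> poll(Supplier<List<T>> source, PollErrorMetrics metrics) {
        try {
            return source.get();
        } catch (RetriablePollException e) {
            metrics.recordPollError();
            return null; // let the caller poll again later
        } catch (RuntimeException e) {
            metrics.recordPollError();
            throw e;
        }
    }
}
```

Both catch branches record the error before deciding whether to rethrow, which is what lets the total include retryable and non-retryable failures as the description asks.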
[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
kirktrue commented on code in PR #13591: URL: https://github.com/apache/kafka/pull/13591#discussion_r1180888455 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ## @@ -384,14 +436,19 @@ synchronized boolean isAborting() { } synchronized void transitionToAbortableError(RuntimeException exception) { Review Comment: It's only used in `TransactionManagerTest`. I didn't want to expose the `InvalidStateTransitionHandler` quirkiness to the outside world (even the tests). I went back and forth on this decision more than a couple of times. I'd certainly welcome your input.
[GitHub] [kafka] jolshan commented on pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
jolshan commented on PR #13591: URL: https://github.com/apache/kafka/pull/13591#issuecomment-1528256534 @kirktrue do we have a list of transitions we consider internal vs external? It would be nice to review that list as well as the code.
[GitHub] [kafka] jolshan commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
jolshan commented on code in PR #13591: URL: https://github.com/apache/kafka/pull/13591#discussion_r1180886011 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ## @@ -384,14 +436,19 @@ synchronized boolean isAborting() { } synchronized void transitionToAbortableError(RuntimeException exception) { Review Comment: I guess I will need to manually check if the usages are external
[GitHub] [kafka] jolshan commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
jolshan commented on code in PR #13591: URL: https://github.com/apache/kafka/pull/13591#discussion_r1180884390 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ## @@ -384,14 +436,19 @@ synchronized boolean isAborting() { } synchronized void transitionToAbortableError(RuntimeException exception) { Review Comment: Do we ever use this version of the method? Ditto to some of the others we've changed
[GitHub] [kafka] jolshan commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
jolshan commented on code in PR #13591: URL: https://github.com/apache/kafka/pull/13591#discussion_r1180884390 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ## @@ -384,14 +436,19 @@ synchronized boolean isAborting() { } synchronized void transitionToAbortableError(RuntimeException exception) { Review Comment: Do we ever use this version of the method?
[GitHub] [kafka] jolshan commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
jolshan commented on code in PR #13591: URL: https://github.com/apache/kafka/pull/13591#discussion_r1180883557 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ## @@ -384,14 +436,19 @@ synchronized boolean isAborting() { } synchronized void transitionToAbortableError(RuntimeException exception) { +transitionToAbortableError(exception, InvalidStateTransitionHandler.THROW_EXCEPTION); +} + +private synchronized void transitionToAbortableError(RuntimeException exception, + InvalidStateTransitionHandler invalidStateTransitionHandler) { Review Comment: nit: can we line up the parameters?
[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
kirktrue commented on code in PR #13591: URL: https://github.com/apache/kafka/pull/13591#discussion_r1180881150 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ## @@ -196,6 +196,44 @@ private enum Priority { } } +/** + * State transitions originate from one of two sources: + * + * + * External to the transaction manager. For example, the user performs an API call on the + * {@link org.apache.kafka.clients.producer.Producer producer} that is related to transactional management. + * + * Internal to the transaction manager. These transitions occur from within the transaction + * manager, for example when handling responses from the broker for transactions. + * + * + * + * When an invalid state transition is attempted, the logic related to handling that situation may + * differ depending on the source of the state transition. This interface allows the caller to provide the desired + * logic for handling that invalid state transition attempt as appropriate for the situation. + * + * + * + * When the state transition is being attempted on behalf of an external source, we want to continue to + * throw an {@link IllegalStateException} as this is the existing behavior. This gives the user the opportunity + * to fix the issue without permanently poisoning the state of the transaction manager. + * + * + * + * When the state transition is being attempted on behalf of an external source, we want to continue to + * throw an {@link IllegalStateException} as this is the existing behavior. This gives the user the opportunity + * to fix the issue without permanently poisoning the state of the transaction manager. + * + * + * + * See KAFKA-14831 for more detail. + */ +private enum InvalidStateTransitionHandler { + Review Comment: Will remove.
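The Javadoc under review distinguishes external from internal sources of a state transition, and the diff in this thread shows a public method delegating to a private overload with `InvalidStateTransitionHandler.THROW_EXCEPTION`. A toy sketch of that delegation pattern follows. Only `THROW_EXCEPTION` appears in the quoted diff; `SET_FATAL_ERROR`, `ToyTransactionManager`, its states and `onBrokerResponseError()` are hypothetical. The internal-source behaviour is an assumption drawn from the PR title ("Illegal state errors should be fatal"), since the quoted Javadoc repeats the external-source paragraph instead of describing the internal case.

```java
/** Toy state machine for illustration; not the real TransactionManager. */
final class ToyTransactionManager {
    enum State { READY, IN_TRANSACTION, ABORTABLE_ERROR, FATAL_ERROR }

    enum InvalidStateTransitionHandler {
        THROW_EXCEPTION,  // named in the diff: external callers just get an exception
        SET_FATAL_ERROR   // hypothetical: internal callers also poison the state
    }

    private State state = State.READY;

    synchronized State state() {
        return state;
    }

    /** External entry point, e.g. triggered by a user API call. */
    synchronized void beginTransaction() {
        transitionTo(State.IN_TRANSACTION, InvalidStateTransitionHandler.THROW_EXCEPTION);
    }

    /** External entry point: delegates with THROW_EXCEPTION, as in the diff. */
    synchronized void transitionToAbortableError() {
        transitionTo(State.ABORTABLE_ERROR, InvalidStateTransitionHandler.THROW_EXCEPTION);
    }

    /** Hypothetical internal entry point, e.g. while handling a broker response. */
    synchronized void onBrokerResponseError() {
        transitionTo(State.ABORTABLE_ERROR, InvalidStateTransitionHandler.SET_FATAL_ERROR);
    }

    private static boolean isValid(State from, State to) {
        switch (to) {
            case IN_TRANSACTION:
                return from == State.READY;
            case ABORTABLE_ERROR:
                return from == State.IN_TRANSACTION;
            case FATAL_ERROR:
                return true;
            default:
                return false;
        }
    }

    private void transitionTo(State target, InvalidStateTransitionHandler handler) {
        if (isValid(state, target)) {
            state = target;
            return;
        }
        String message = "Invalid transition attempted from state " + state + " to state " + target;
        if (handler == InvalidStateTransitionHandler.SET_FATAL_ERROR) {
            state = State.FATAL_ERROR; // assumed behaviour for internal sources
        }
        throw new IllegalStateException(message);
    }
}
```

With this shape, an invalid external call leaves the state untouched so the user can recover, while an invalid internal call leaves the manager in `FATAL_ERROR`.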
[GitHub] [kafka] jolshan commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
jolshan commented on code in PR #13591: URL: https://github.com/apache/kafka/pull/13591#discussion_r1180880988 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ## @@ -196,6 +196,44 @@ private enum Priority { } } +/** + * State transitions originate from one of two sources: + * + * + * External to the transaction manager. For example, the user performs an API call on the + * {@link org.apache.kafka.clients.producer.Producer producer} that is related to transactional management. + * + * Internal to the transaction manager. These transitions occur from within the transaction + * manager, for example when handling responses from the broker for transactions. + * + * + * + * When an invalid state transition is attempted, the logic related to handling that situation may + * differ depending on the source of the state transition. This interface allows the caller to provide the desired + * logic for handling that invalid state transition attempt as appropriate for the situation. + * + * + * + * When the state transition is being attempted on behalf of an external source, we want to continue to + * throw an {@link IllegalStateException} as this is the existing behavior. This gives the user the opportunity + * to fix the issue without permanently poisoning the state of the transaction manager. + * + * + * + * When the state transition is being attempted on behalf of an external source, we want to continue to + * throw an {@link IllegalStateException} as this is the existing behavior. This gives the user the opportunity + * to fix the issue without permanently poisoning the state of the transaction manager. + * + * + * + * See KAFKA-14831 for more detail. + */ +private enum InvalidStateTransitionHandler { + Review Comment: nit: I don't know if we need the empty lines in 232 and 234. However, we typically list enums one per line.
[GitHub] [kafka] kirktrue commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
kirktrue commented on code in PR #13591: URL: https://github.com/apache/kafka/pull/13591#discussion_r1180881100 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ## @@ -196,6 +196,44 @@ private enum Priority { } } +/** + * State transitions originate from one of two sources: + * + * + * External to the transaction manager. For example, the user performs an API call on the + * {@link org.apache.kafka.clients.producer.Producer producer} that is related to transactional management. + * + * Internal to the transaction manager. These transitions occur from within the transaction + * manager, for example when handling responses from the broker for transactions. + * + * + * + * When an invalid state transition is attempted, the logic related to handling that situation may + * differ depending on the source of the state transition. This interface allows the caller to provide the desired + * logic for handling that invalid state transition attempt as appropriate for the situation. + * + * + * + * When the state transition is being attempted on behalf of an external source, we want to continue to + * throw an {@link IllegalStateException} as this is the existing behavior. This gives the user the opportunity + * to fix the issue without permanently poisoning the state of the transaction manager. + * + * + * + * When the state transition is being attempted on behalf of an external source, we want to continue to Review Comment: Yeah, probably. The comment isn't so beautiful now, is it?
[GitHub] [kafka] jolshan commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
jolshan commented on code in PR #13591: URL: https://github.com/apache/kafka/pull/13591#discussion_r1180880696 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ## @@ -196,6 +196,44 @@ private enum Priority { } } +/** + * State transitions originate from one of two sources: + * + * + * External to the transaction manager. For example, the user performs an API call on the + * {@link org.apache.kafka.clients.producer.Producer producer} that is related to transactional management. + * + * Internal to the transaction manager. These transitions occur from within the transaction + * manager, for example when handling responses from the broker for transactions. + * + * + * + * When an invalid state transition is attempted, the logic related to handling that situation may + * differ depending on the source of the state transition. This interface allows the caller to provide the desired + * logic for handling that invalid state transition attempt as appropriate for the situation. + * + * + * + * When the state transition is being attempted on behalf of an external source, we want to continue to + * throw an {@link IllegalStateException} as this is the existing behavior. This gives the user the opportunity + * to fix the issue without permanently poisoning the state of the transaction manager. + * + * + * + * When the state transition is being attempted on behalf of an external source, we want to continue to Review Comment: Minor nit: this seems to be a repeat paragraph.
[GitHub] [kafka] jolshan commented on a diff in pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
jolshan commented on code in PR #13591: URL: https://github.com/apache/kafka/pull/13591#discussion_r1180880417 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ## @@ -196,6 +196,44 @@ private enum Priority { } } +/** + * State transitions originate from one of two sources: Review Comment: This is a beautiful comment. I really appreciate clarity here :)
[GitHub] [kafka] jolshan commented on pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional
jolshan commented on PR #13607: URL: https://github.com/apache/kafka/pull/13607#issuecomment-1528168395 One concern that came up when I was thinking about this PR is if we can have a produce request with more than one producer ID when we overflow epoch. Basically, I need to confirm we flush the accumulator. Alternatively, we can allow for more than one producer ID -- the main concern was that the verification could send the wrong one and get invalid producer ID mapping. Then we would have to retry.
[GitHub] [kafka] kirktrue commented on pull request #13568: KAFKA-14906:Extract the coordinator service log from server log
kirktrue commented on PR #13568: URL: https://github.com/apache/kafka/pull/13568#issuecomment-1528167503 Logging configuration is something the administrator can change as desired, right? Is there a clear benefit to the majority of users for this change? I can imagine it being a welcome change for some and an unexpected burden for others. Note: since I am not a Kafka administrator, feel free to ignore my comment
[GitHub] [kafka] jolshan commented on a diff in pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional
jolshan commented on code in PR #13607: URL: https://github.com/apache/kafka/pull/13607#discussion_r1180865066 ## clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java: ## @@ -222,6 +223,27 @@ public void clearPartitionRecords() { partitionSizes(); data = null; } + +public static void validateProducerIds(short version, ProduceRequestData data) { +if (version >= 3) { Review Comment: In producerequest.json: > // Version 3 adds the transactional ID, which is used for authorization when attempting to write // transactional data. Version 3 also adds support for Kafka Message Format v2. Basically before version 3, these fields aren't used.
[GitHub] [kafka] jolshan commented on a diff in pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional
jolshan commented on code in PR #13607: URL: https://github.com/apache/kafka/pull/13607#discussion_r1180864382 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -642,7 +642,14 @@ class ReplicaManager(val config: KafkaConfig, (entriesPerPartition, Map.empty) else entriesPerPartition.partition { case (topicPartition, records) => - getPartitionOrException(topicPartition).hasOngoingTransaction(records.firstBatch().producerId()) +// Produce requests (only requests that require verification) should only have one batch in "batches" but check all just to be safe. +val transactionalBatches = records.batches.asScala.filter(batch => batch.hasProducerId && batch.isTransactional) Review Comment: it would be the other way around, we can have a producer id but not be transactional with idempotent producers.
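The point of the review comment above is that a producer ID alone does not imply a transaction: an idempotent producer sets a producer ID on its batches without marking them transactional. A small Java sketch of the same filter, using a hypothetical `BatchInfo` stand-in for a record batch (the `-1` sentinel for "no producer ID" is an assumption of this sketch):

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical stand-in for a record batch; only the two flags the filter needs. */
final class BatchInfo {
    static final long NO_PRODUCER_ID = -1L; // assumed sentinel for "no producer ID"

    private final long producerId;
    private final boolean transactional;

    BatchInfo(long producerId, boolean transactional) {
        this.producerId = producerId;
        this.transactional = transactional;
    }

    boolean hasProducerId() {
        return producerId != NO_PRODUCER_ID;
    }

    boolean isTransactional() {
        return transactional;
    }
}

final class TransactionalBatchFilter {
    /** Keeps only batches that carry a producer ID and are marked transactional. */
    static List<BatchInfo> transactionalBatches(List<BatchInfo> batches) {
        return batches.stream()
                .filter(batch -> batch.hasProducerId() && batch.isTransactional())
                .collect(Collectors.toList());
    }
}
```

Given one idempotent batch, one transactional batch and one batch with no producer ID, only the transactional batch survives the filter.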
[GitHub] [kafka] jolshan commented on a diff in pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional
jolshan commented on code in PR #13607: URL: https://github.com/apache/kafka/pull/13607#discussion_r1180864129 ## clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java: ## @@ -222,6 +223,27 @@ public void clearPartitionRecords() { partitionSizes(); data = null; } + +public static void validateProducerIds(short version, ProduceRequestData data) { +if (version >= 3) { +long producerId = -1; +for (ProduceRequestData.TopicProduceData topicData : data.topicData()) { +for (ProduceRequestData.PartitionProduceData partitionData : topicData.partitionData()) { +BaseRecords baseRecords = partitionData.records(); +if (baseRecords instanceof Records) { +Records records = (Records) baseRecords; +for (RecordBatch batch : records.batches()) { +if (producerId == -1 && batch.hasProducerId()) Review Comment: Based on some of the ProduceRequest tests I found, we allow for some batches with and some without. See `testMixedIdempotentData`
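The quoted diff is cut off at the first `if`, so the rest of `validateProducerIds` is not visible here. One plausible reading, sketched below under that assumption, is: skip requests older than version 3, ignore batches without a producer ID (the mixed case the comment mentions), remember the first producer ID seen, and reject any later batch that carries a different one. `ToyBatch`, `ProducerIdValidator`, the returned value and the choice of `IllegalArgumentException` are all hypothetical; the real method returns `void` and its error type is not shown in the thread.

```java
import java.util.List;

/** Hypothetical stand-in for a record batch; -1 means "no producer ID" in this sketch. */
final class ToyBatch {
    static final long NO_PRODUCER_ID = -1L;

    final long producerId;

    ToyBatch(long producerId) {
        this.producerId = producerId;
    }

    boolean hasProducerId() {
        return producerId != NO_PRODUCER_ID;
    }
}

final class ProducerIdValidator {
    /**
     * Returns the single producer ID used by the batches, or NO_PRODUCER_ID if none
     * carries one. Throws if two batches carry different producer IDs.
     */
    static long validateProducerIds(short version, List<ToyBatch> batches) {
        long producerId = ToyBatch.NO_PRODUCER_ID;
        if (version < 3) {
            return producerId; // per the review comment, these fields are unused before version 3
        }
        for (ToyBatch batch : batches) {
            if (!batch.hasProducerId()) {
                continue; // mixing batches with and without a producer ID is allowed
            }
            if (producerId == ToyBatch.NO_PRODUCER_ID) {
                producerId = batch.producerId; // first producer ID seen
            } else if (producerId != batch.producerId) {
                throw new IllegalArgumentException(
                        "Found producer IDs " + producerId + " and " + batch.producerId + " in one request");
            }
        }
        return producerId;
    }
}
```

Returning the producer ID is a convenience of the sketch that makes the outcome easy to check.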
[GitHub] [kafka] kirktrue commented on pull request #13581: A document on the current usage of Kafka project is added
kirktrue commented on PR #13581: URL: https://github.com/apache/kafka/pull/13581#issuecomment-1528144605 @SijoJoseph2002 just for my own understanding, did you intend to change this much source?
[GitHub] [kafka] kirktrue commented on a diff in pull request #13605: KAFKA-14950: implement assign() and assignment()
kirktrue commented on code in PR #13605: URL: https://github.com/apache/kafka/pull/13605#discussion_r1180848020 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -537,7 +568,9 @@ public void subscribe(Pattern pattern) { @Override public void unsubscribe() { -throw new KafkaException("method not implemented"); +// fetcher.clearBufferedDataForUnassignedPartitions(Collections.emptySet()); Review Comment: I'm wondering if we want to push the clearing of buffered data and `SubscriptionState` mutation into the background thread so that it's executed together? The coordinator's `onLeavePrepare` needs to know what partitions are being removed. As it's written, it's possible that the set of partitions could be removed before the background thread gets a chance to run the `onLeavePrepare` logic. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -522,7 +525,35 @@ public void subscribe(Collection topics, ConsumerRebalanceListener callb @Override public void assign(Collection partitions) { -throw new KafkaException("method not implemented"); +if (partitions == null) { +throw new IllegalArgumentException("Topic partitions collection to assign to cannot be null"); +} + +if (partitions.isEmpty()) { +this.unsubscribe(); +return; +} + +for (TopicPartition tp : partitions) { +String topic = (tp != null) ? 
tp.topic() : null; +if (Utils.isBlank(topic)) +throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic"); +} +// TODO: implement fetcher +// fetcher.clearBufferedDataForUnassignedPartitions(partitions); + +// make sure the offsets of topic partitions the consumer is unsubscribing from +// are committed since there will be no following rebalance +commit(subscriptions.allConsumed()); + +log.info("Assigned to partition(s): {}", Utils.join(partitions, ", ")); +if (this.subscriptions.assignFromUser(new HashSet<>(partitions))) + updateMetadata(time.milliseconds()); +} + +private void updateMetadata(long milliseconds) { Review Comment: Nitpicky: can we fold `updateMetadata` into `assign`?
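The argument checks at the top of `assign` in the diff above can be sketched in isolation. This is an illustration only: `TopicPartitionSketch` and `AssignValidationSketch` are hypothetical names, the blank check is written inline instead of calling `Utils.isBlank`, and the commit, fetcher, and metadata steps from the diff are left out.

```java
import java.util.Collection;

// Hypothetical stand-in for a topic partition; not Kafka's TopicPartition.
final class TopicPartitionSketch {
    final String topic;
    final int partition;

    TopicPartitionSketch(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

final class AssignValidationSketch {
    enum Outcome { UNSUBSCRIBE, ASSIGN }

    // Mirrors the order of the checks in the diff: null collection, empty
    // collection, then a null or blank topic on any element.
    static Outcome validate(Collection<TopicPartitionSketch> partitions) {
        if (partitions == null) {
            throw new IllegalArgumentException("Topic partitions collection to assign to cannot be null");
        }
        if (partitions.isEmpty()) {
            return Outcome.UNSUBSCRIBE; // the diff calls unsubscribe() and returns here
        }
        for (TopicPartitionSketch tp : partitions) {
            String topic = (tp != null) ? tp.topic : null;
            if (topic == null || topic.trim().isEmpty()) {
                throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
            }
        }
        return Outcome.ASSIGN;
    }
}
```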
[GitHub] [kafka] kirktrue commented on a diff in pull request #13607: KAFKA-14916: Fix code that assumes transactional ID implies all records are transactional
kirktrue commented on code in PR #13607: URL: https://github.com/apache/kafka/pull/13607#discussion_r1180838837 ## clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java: ## @@ -238,6 +238,37 @@ public void testV6AndBelowCannotUseZStdCompression() { // Works fine with current version (>= 7) ProduceRequest.forCurrentMagic(produceData); } + +@Test +public void testNoMixedProducerIds() { +final long producerId1 = 15L; +final long producerId2 = 16L; +final short producerEpoch = 5; +final int sequence = 10; + +final MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE, +new SimpleRecord("foo".getBytes())); +final MemoryRecords txnRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId1, +producerEpoch, sequence, new SimpleRecord("bar".getBytes())); +final MemoryRecords idempotentRecords = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId2, +producerEpoch, sequence, new SimpleRecord("bee".getBytes())); + + +ProduceRequest.Builder requestBuilder = ProduceRequest.forCurrentMagic( +new ProduceRequestData() +.setTopicData(new ProduceRequestData.TopicProduceDataCollection(Arrays.asList( +new ProduceRequestData.TopicProduceData().setName("foo").setPartitionData(Collections.singletonList( +new ProduceRequestData.PartitionProduceData().setIndex(0).setRecords(records))), +new ProduceRequestData.TopicProduceData().setName("bar").setPartitionData(Collections.singletonList( +new ProduceRequestData.PartitionProduceData().setIndex(1).setRecords(txnRecords))), +new ProduceRequestData.TopicProduceData().setName("bee").setPartitionData(Collections.singletonList( Review Comment: Please change `bee` to `baz` ## clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java: ## @@ -222,6 +223,27 @@ public void clearPartitionRecords() { partitionSizes(); data = null; } + +public static void validateProducerIds(short version, ProduceRequestData data) { +if (version >= 3) { +long producerId = -1; 
+for (ProduceRequestData.TopicProduceData topicData : data.topicData()) { +for (ProduceRequestData.PartitionProduceData partitionData : topicData.partitionData()) { +BaseRecords baseRecords = partitionData.records(); +if (baseRecords instanceof Records) { +Records records = (Records) baseRecords; +for (RecordBatch batch : records.batches()) { +if (producerId == -1 && batch.hasProducerId()) Review Comment: Is it an error if there's one batch with producer ID and another without one? ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -642,7 +642,14 @@ class ReplicaManager(val config: KafkaConfig, (entriesPerPartition, Map.empty) else entriesPerPartition.partition { case (topicPartition, records) => - getPartitionOrException(topicPartition).hasOngoingTransaction(records.firstBatch().producerId()) +// Produce requests (only requests that require verification) should only have one batch in "batches" but check all just to be safe. +val transactionalBatches = records.batches.asScala.filter(batch => batch.hasProducerId && batch.isTransactional) Review Comment: Is it valid for `isTransactional` to be `true` but `hasProducerId` to be `false`? ## clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java: ## @@ -222,6 +223,27 @@ public void clearPartitionRecords() { partitionSizes(); data = null; } + +public static void validateProducerIds(short version, ProduceRequestData data) { +if (version >= 3) { Review Comment: For the uninitiated, can we move and/or comment why version `3` is special? 
## clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java: ## @@ -238,6 +238,37 @@ public void testV6AndBelowCannotUseZStdCompression() { // Works fine with current version (>= 7) ProduceRequest.forCurrentMagic(produceData); } + +@Test +public void testNoMixedProducerIds() { +final long producerId1 = 15L; +final long producerId2 = 16L; +final short producerEpoch = 5; +final int sequence = 10; + +final MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE, +new SimpleRecord("foo".getBytes())); +final MemoryRecords
[GitHub] [kafka] mumrah commented on pull request #13647: MINOR: fix KRaftClusterTest and KRaft integration test failure
mumrah commented on PR #13647: URL: https://github.com/apache/kafka/pull/13647#issuecomment-1528072990 The test failure was introduced by a commit fairly late in #13407. I did briefly investigate it, but couldn't reproduce it locally, so I figured it was existing flakiness. Basically, it's just my fault for not looking more closely at the test failures.
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder
jeffkbkim commented on code in PR #13637: URL: https://github.com/apache/kafka/pull/13637#discussion_r1180707601 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java: ## @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.Record; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord; + +/** + * Build a new Target Assignment based on the provided parameters. As a result, + * it yields the records that must be persisted to the log and the new member + * assignments as a map. + * + * Records are only created for members which have a new target assignment. If + * their assignment did not change, no new record is needed. + * + * When a member is deleted, it is assumed that its target assignment record + * is deleted as part of the member deletion process. In other words, this class + * does not yield a tombstone for removed members. + */ +public class TargetAssignmentBuilder { +/** + * The assignment result returned by {{@link TargetAssignmentBuilder#build()}}. + */ +public static class TargetAssignmentResult { +/** + * The records that must be applied to the __consumer_offsets + * topics to persist the new target assignment. 
+ */ +private final List records; + +/** + * The new target assignment for all members. + */ +private final Map assignments; + +TargetAssignmentResult( +List records, +Map assignments +) { +Objects.requireNonNull(records); +Objects.requireNonNull(assignments); +this.records = records; +this.assignments = assignments; +} + +/** + * @return The records. + */ +public List records() { +return records; +} + +/** + * @return The assignments. + */ +public Map assignments() { +return assignments; +} +} + +/** + * The group id. + */ +private final String groupId; + +/** + * The group epoch. + */ +private final int groupEpoch; + +/** + * The partition assignor used to compute the assignment. + */ +private final PartitionAssignor assignor; + +/** + * The members in the group. + */ +private Map members = Collections.emptyMap(); + +/** + * The subscription metadata. + */ +private Map subscriptionMetadata = Collections.emptyMap(); + +/** + * The existing target assignment. + */ +private Map assignments = Collections.emptyMap(); + +/** + * The members which have been updated or deleted. Deleted members + * are signaled by a null value. + */ +private final Map updatedMembers = new HashMap<>(); + +/** + * Constructs the object. + * + * @param groupId The group id. + * @param groupEpochThe group epoch to compute a target assignment for. + * @param assignor The assignor to use to compute the target assignment. + */ +public TargetAssignmentBuilder( +String groupId, +int groupEpoch, +
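The class Javadoc quoted above says that records are only created for members whose target assignment changed, and that no tombstone is produced for removed members. A minimal sketch of that comparison follows, assuming assignments are modelled as plain sets of partition numbers keyed by member id. `TargetAssignmentDiffSketch` is a hypothetical name and this is not the builder's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class TargetAssignmentDiffSketch {
    // Returns the ids of members that would need a new record: members whose
    // computed assignment differs from the existing one, including members
    // that had no assignment before. Members that only appear in `existing`
    // (that is, removed members) are deliberately not reported.
    static List<String> membersNeedingRecords(
        Map<String, Set<Integer>> existing,
        Map<String, Set<Integer>> computed
    ) {
        List<String> changed = new ArrayList<>();
        for (Map.Entry<String, Set<Integer>> entry : computed.entrySet()) {
            Set<Integer> previous = existing.get(entry.getKey());
            if (!entry.getValue().equals(previous)) {
                changed.add(entry.getKey());
            }
        }
        return changed;
    }
}
```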
[GitHub] [kafka] kirktrue commented on a diff in pull request #13617: MINOR:code optimization in QuorumController
kirktrue commented on code in PR #13617: URL: https://github.com/apache/kafka/pull/13617#discussion_r1180727333 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -1635,7 +1636,7 @@ private enum ImbalanceSchedule { /** * Tracks if a snapshot generate was scheduled. */ -private boolean generateSnapshotScheduled = false; +private final boolean generateSnapshotScheduled; Review Comment: If the value is always set to `false`, why have the variable at all?
[GitHub] [kafka] kirktrue commented on a diff in pull request #13623: KAFKA-14926: Remove metrics on Log Cleaner shutdown
kirktrue commented on code in PR #13623: URL: https://github.com/apache/kafka/pull/13623#discussion_r1180722737 ## core/src/main/scala/kafka/log/LogCleaner.scala: ## @@ -167,8 +167,20 @@ class LogCleaner(initialConfig: CleanerConfig, */ def shutdown(): Unit = { info("Shutting down the log cleaner.") -cleaners.foreach(_.shutdown()) -cleaners.clear() +try { + cleaners.foreach(_.shutdown()) + cleaners.clear() +} finally { + remoteMetrics() +} + } + + def remoteMetrics(): Unit = { Review Comment: Should this method name be `removeMetrics` (a `v` instead of a `t`)? ## core/src/main/scala/kafka/log/LogCleaner.scala: ## @@ -167,8 +167,20 @@ class LogCleaner(initialConfig: CleanerConfig, */ def shutdown(): Unit = { info("Shutting down the log cleaner.") -cleaners.foreach(_.shutdown()) -cleaners.clear() +try { + cleaners.foreach(_.shutdown()) + cleaners.clear() +} finally { + remoteMetrics() +} + } + + def remoteMetrics(): Unit = { +metricsGroup.removeMetric("max-buffer-utilization-percent") +metricsGroup.removeMetric("cleaner-recopy-percent") +metricsGroup.removeMetric("max-clean-time-secs") +metricsGroup.removeMetric("max-compaction-delay-secs") +metricsGroup.removeMetric("DeadThreadCount") Review Comment: I had a similar thought. Calling them out together as constants could also remind future contributors that might add a metric that they need to remove the metric too.
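One way to read the suggestion above is to keep the metric names in a single list that both registration and removal iterate over. The sketch below is written in Java rather than Scala and uses a hypothetical `MetricsGroupSketch` in place of the real metrics group; only the five metric names are taken from the diff.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a metrics group; not Kafka's KafkaMetricsGroup.
final class MetricsGroupSketch {
    private final Set<String> registered = new LinkedHashSet<>();

    void newGauge(String name) {
        registered.add(name);
    }

    void removeMetric(String name) {
        registered.remove(name);
    }

    Set<String> registered() {
        return registered;
    }
}

final class LogCleanerMetricsSketch {
    // Names copied from the diff above. With one shared list, a metric added
    // here is registered and removed together, so the two cannot drift apart.
    static final List<String> METRIC_NAMES = List.of(
        "max-buffer-utilization-percent",
        "cleaner-recopy-percent",
        "max-clean-time-secs",
        "max-compaction-delay-secs",
        "DeadThreadCount");

    static void registerAll(MetricsGroupSketch group) {
        METRIC_NAMES.forEach(group::newGauge);
    }

    static void removeAll(MetricsGroupSketch group) {
        METRIC_NAMES.forEach(group::removeMetric);
    }
}
```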
[GitHub] [kafka] kirktrue commented on pull request #13528: KAFKA-14879: Update system tests to use latest versions
kirktrue commented on PR #13528: URL: https://github.com/apache/kafka/pull/13528#issuecomment-1527918482 Test failures are KRaft-related and do not appear to be caused by this change.
[GitHub] [kafka] kirktrue commented on pull request #13591: KAFKA-14831: Illegal state errors should be fatal in transactional producer
kirktrue commented on PR #13591: URL: https://github.com/apache/kafka/pull/13591#issuecomment-1527916726 The test failures are KRaft-related and don't appear to be caused by this change.
[GitHub] [kafka] kirktrue commented on pull request #13640: KAFKA-14937: Refactoring for client code to reduce boilerplate
kirktrue commented on PR #13640: URL: https://github.com/apache/kafka/pull/13640#issuecomment-1527914720 The Spotbugs validation is passing and there are no new test failures.
[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
jolshan commented on code in PR #12149: URL: https://github.com/apache/kafka/pull/12149#discussion_r1180672745 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ## @@ -300,9 +301,13 @@ void runOnce() { try { transactionManager.maybeResolveSequences(); +RuntimeException lastError = transactionManager.lastError(); +if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) { +return; +} + Review Comment: Ah I see we had the abortable error check. Ok well now we are doubly covered.
[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
jolshan commented on code in PR #12149: URL: https://github.com/apache/kafka/pull/12149#discussion_r1180672146 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ## @@ -300,9 +301,13 @@ void runOnce() { try { transactionManager.maybeResolveSequences(); +RuntimeException lastError = transactionManager.lastError(); +if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) { +return; +} + Review Comment: Yup -- my concern is that we unnecessarily reset the producer to initializing in the fatal error case.
[GitHub] [kafka] ijuma commented on pull request #13647: MINOR: fix KRaftClusterTest and KRaft integration test failure
ijuma commented on PR #13647: URL: https://github.com/apache/kafka/pull/13647#issuecomment-1527834450 @mumrah the original PR had the same failures, how come we merged it?
[GitHub] [kafka] mjsax commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…
mjsax commented on code in PR #11433: URL: https://github.com/apache/kafka/pull/11433#discussion_r832815852 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -262,6 +262,42 @@ private void closeDirtyAndRevive(final Collection taskWithChangelogs, fina } } +private void commitActiveTasks(final Set activeTasksNeedCommit, final AtomicReference activeTasksCommitException) { Review Comment: Can't we reuse the existing `commitTasksAndMaybeUpdateCommittableOffsets()`? ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -342,7 +342,19 @@ public void handleAssignment(final Map> activeTasks, maybeThrowTaskExceptions(taskCloseExceptions); -createNewTasks(activeTasksToCreate, standbyTasksToCreate); +rebalanceInProgress = true; +final Collection newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate); +// If there are any transactions in flight and there are newly created active tasks, commit the tasks +// to avoid potential long restoration times. +if (processingMode == EXACTLY_ONCE_V2 && threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) { +log.info("New active tasks were added and there is an inflight transaction. Attempting to commit tasks."); +final int numCommitted = commitTasksAndMaybeUpdateCommittableOffsets(tasks.allTasks(), new HashMap<>()); Review Comment: We pass in the second parameter to be able to handle timeout exceptions correctly (cf. other places where we call `commitTasksAndMaybeUpdateCommittableOffsets`) -- here, we don't catch `TimeoutException`. What is the impact? It seems it would bubble up into the consumer and crash us?
Given that we kinda need to commit, it seems there are two things we could do: either add a loop here, and retry the commit until we exceed `task.timeout.config` (not my personally preferred solution), or actually move the whole commit logic into the restore part (not sure if this is easily possible) -- i.e., each time we enter the restore code, we check if there is an open TX and commit. Not sure how this would align with the new state-updater code though. \cc @cadonna @lucasbru -- can you comment? ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -342,7 +342,19 @@ public void handleAssignment(final Map> activeTasks, maybeThrowTaskExceptions(taskCloseExceptions); -createNewTasks(activeTasksToCreate, standbyTasksToCreate); +rebalanceInProgress = true; +final Collection newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate); +// If there are any transactions in flight and there are newly created active tasks, commit the tasks +// to avoid potential long restoration times. +if (processingMode == EXACTLY_ONCE_V2 && threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) { Review Comment: Why is this limited to EOS_v2? From my understanding, EOS_v1 would have the same problem? It also only seems to be a potential issue when we get stateful tasks assigned? Not sure if we can limit the last check to `!newActiveStatefulTasks.isEmpty()`? ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -342,7 +342,19 @@ public void handleAssignment(final Map> activeTasks, maybeThrowTaskExceptions(taskCloseExceptions); -createNewTasks(activeTasksToCreate, standbyTasksToCreate); +rebalanceInProgress = true; +final Collection newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate); Review Comment: Seems we only need the count (or a boolean for empty or not), but not the full collection. Can we simplify this?
## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -342,7 +342,19 @@ public void handleAssignment(final Map> activeTasks, maybeThrowTaskExceptions(taskCloseExceptions); -createNewTasks(activeTasksToCreate, standbyTasksToCreate); +rebalanceInProgress = true; Review Comment: If we set `rebalanceInProgress = true`, it seems the later call to `commitTasksAndMaybeUpdateCommittableOffsets` would exit early and return `-1`? That does not seem right? I cannot remember the full purpose of the `rebalanceInProgress` flag, and given cooperative rebalancing and processing records during a rebalance, I am wondering if the semantics actually changed? Edit: I just saw that there is a discussion about this below. ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -342,7 +342,18 @@ public void handleAssignment(final Map> activeTasks,
[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
philipnee commented on code in PR #12149: URL: https://github.com/apache/kafka/pull/12149#discussion_r1180625792 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ## @@ -300,9 +301,13 @@ void runOnce() { try { transactionManager.maybeResolveSequences(); +RuntimeException lastError = transactionManager.lastError(); +if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) { +return; +} + Review Comment: Sounds good. Perhaps you meant checking for the fatal error first and then the abortable error? Though I think it doesn't change the logic there.
[GitHub] [kafka] jolshan commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error
jolshan commented on code in PR #12149: URL: https://github.com/apache/kafka/pull/12149#discussion_r1180619843 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ## @@ -300,9 +301,13 @@ void runOnce() { try { transactionManager.maybeResolveSequences(); +RuntimeException lastError = transactionManager.lastError(); +if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) { +return; +} + Review Comment: I thought I left this comment, but seems like it didn't take -- should we move the fatal error check above the auth check? Since for other request types, we will go through the above path, but it really should just be a fatal error.
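The ordering question raised above can be sketched without the producer internals. The three booleans stand in for `hasFatalError()`, `hasAbortableError()`, and the result of `shouldHandleAuthorizationError(lastError)`; the `Action` enum and the class name are hypothetical, and the sketch does not claim to match what `Sender.runOnce()` finally does.

```java
final class SenderOrderingSketch {
    enum Action { FAIL_FATALLY, RETURN_EARLY, CONTINUE }

    // Checks the fatal state first, so a fatal error is never routed through
    // the authorization-error path, which in the diff returns early.
    static Action next(boolean hasFatalError, boolean hasAbortableError, boolean isAuthorizationError) {
        if (hasFatalError) {
            return Action.FAIL_FATALLY;
        }
        if (hasAbortableError && isAuthorizationError) {
            return Action.RETURN_EARLY; // corresponds to the bare `return;` in the diff
        }
        return Action.CONTINUE;
    }
}
```

With this ordering, a state that is both fatal and authorization-related resolves to the fatal path, which is the concern the thread discusses.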
[GitHub] [kafka] jolshan commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder
jolshan commented on code in PR #13637: URL: https://github.com/apache/kafka/pull/13637#discussion_r1180613326 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java: ## @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.junit.jupiter.api.Test; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; +import java.util.OptionalInt; + +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class ConsumerGroupMemberTest { + +@Test +public void testNewMember() { +Uuid topicId1 = Uuid.randomUuid(); +Uuid topicId2 = Uuid.randomUuid(); +Uuid topicId3 = Uuid.randomUuid(); + +ConsumerGroupMember member = new ConsumerGroupMember.Builder("member-id") +.setMemberEpoch(10) +.setPreviousMemberEpoch(9) +.setNextMemberEpoch(11) +.setInstanceId("instance-id") +.setRackId("rack-id") +.setRebalanceTimeoutMs(5000) +.setClientId("client-id") +.setClientHost("hostname") +.setSubscribedTopicNames(Arrays.asList("foo", "bar")) +.setSubscribedTopicRegex("regex") +.setServerAssignorName("range") +.setClientAssignors(Collections.singletonList( +new ClientAssignor( +"assignor", +(byte) 0, +(byte) 0, +(byte) 1, +new VersionedMetadata( +(byte) 1, +ByteBuffer.allocate(0) +.setAssignedPartitions(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3))) +.setPartitionsPendingRevocation(mkAssignment( +mkTopicAssignment(topicId2, 4, 5, 6))) +.setPartitionsPendingAssignment(mkAssignment( +mkTopicAssignment(topicId3, 7, 8, 9))) +.build(); + +assertEquals("member-id", member.memberId()); +assertEquals(10, member.memberEpoch()); +assertEquals(9, member.previousMemberEpoch()); +assertEquals(11, member.nextMemberEpoch()); +assertEquals("instance-id", member.instanceId()); 
+assertEquals("rack-id", member.rackId()); +assertEquals("client-id", member.clientId()); +assertEquals("hostname", member.clientHost()); +// Names are sorted. +assertEquals(Arrays.asList("bar", "foo"), member.subscribedTopicNames()); +assertEquals("regex", member.subscribedTopicRegex()); +assertEquals("range", member.serverAssignorName().get()); +assertEquals( +Collections.singletonList( +new ClientAssignor( +"assignor", +(byte) 0, +(byte) 0, +(byte) 1, +new VersionedMetadata( +(byte) 1, +ByteBuffer.allocate(0, +member.clientAssignors()); +assertEquals(mkAssignment(mkTopicAssignment(topicId1, 1, 2, 3)), member.assignedPartitions()); +assertEquals(mkAssignment(mkTopicAssignment(topicId2, 4, 5, 6)), member.partitionsPendingRevocation()); +assertEquals(mkAssignment(mkTopicAssignment(topicId3, 7, 8, 9)), member.partitionsPendingAssignment()); +} + +@Test +public void testEquals() { +Uuid topicId1 = Uuid.randomUuid(); Review Comment: Sorry -- my bad for misunderstanding Java earlier -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above
[jira] [Commented] (KAFKA-13891) sync group failed with rebalanceInProgress error cause rebalance many rounds in coopeartive
[ https://issues.apache.org/jira/browse/KAFKA-13891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717745#comment-17717745 ]

Philip Nee commented on KAFKA-13891:

[~dajac] - I think we resolve this and the subsequent issues (KAFKA-14016)

> sync group failed with rebalanceInProgress error cause rebalance many rounds in coopeartive
> ---
>
> Key: KAFKA-13891
> URL: https://issues.apache.org/jira/browse/KAFKA-13891
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 3.0.0
> Reporter: Shawn Wang
> Assignee: Philip Nee
> Priority: Major
> Fix For: 3.5.0
>
> This issue was first found in [KAFKA-13419|https://issues.apache.org/jira/browse/KAFKA-13419]
> But the previous PR forgot to reset the generation when sync group failed with a rebalanceInProgress error. So the previous bug still exists, and it may cause the consumer to rebalance for many rounds before finally stabilizing.
> Here's the example ({*}bold is added{*}):
> # consumer A joined and synced group successfully with generation 1 *( with ownedPartition P1/P2 )*
> # New rebalance started with generation 2, consumer A joined successfully, but somehow, consumer A doesn't send out sync group immediately
> # other consumers completed sync group successfully in generation 2, except consumer A.
> # After consumer A sends out sync group, a new rebalance starts, with generation 3. So consumer A gets a REBALANCE_IN_PROGRESS error in the sync group response
> # When receiving REBALANCE_IN_PROGRESS, we re-join the group, with generation 3, with the assignment (ownedPartition) from generation 1.
> # So now we have an out-of-date ownedPartition sent, and unexpected results follow
> # *After the generation-3 rebalance, consumer A got the P3/P4 partitions. The ownedPartition is ignored because of the old generation.*
> # *consumer A revokes P1/P2 and re-joins to start a new round of rebalance*
> # *if some other consumer C fails to syncGroup before consumer A's joinGroup, the same issue will happen again and result in many rounds of rebalance before it is stable*
>
--
This message was sent by Atlassian Jira (v8.20.10#820010)
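The issue description above says the earlier PR forgot to reset the generation when sync group fails with a rebalanceInProgress error. The following is a minimal toy model of that idea, not the Kafka client code: the class `ToyMember`, its fields, and the sentinel `NO_GENERATION` are hypothetical names chosen for illustration, and the error code is passed as a plain string for simplicity.

```java
/** Toy model of a consumer's group membership state; NOT Kafka client code. */
final class ToyMember {
    // Hypothetical sentinel meaning "this member holds no valid generation".
    static final int NO_GENERATION = -1;

    int generation = NO_GENERATION;
    boolean rejoinNeeded = false;

    /** Called when a join completes successfully with the given generation. */
    void onJoinComplete(int newGeneration) {
        generation = newGeneration;
        rejoinNeeded = false;
    }

    /** Called when the sync group response carries an error code. */
    void onSyncGroupError(String error) {
        if ("REBALANCE_IN_PROGRESS".equals(error)) {
            // Drop the generation so that state remembered from an earlier
            // round is not presented as current on the next join.
            generation = NO_GENERATION;
            rejoinNeeded = true;
        }
    }
}
```

In this model, a member that joined generation 2 and then receives REBALANCE_IN_PROGRESS on sync ends up with no generation and a pending rejoin, instead of carrying stale state into generation 3.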
[GitHub] [kafka] dajac commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder
dajac commented on code in PR #13637: URL: https://github.com/apache/kafka/pull/13637#discussion_r1180608631 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java: ## @@ -0,0 +1,698 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord; +import static org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder.createAssignmentMemberSpec; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TargetAssignmentBuilderTest { + +public static class TargetAssignmentBuilderTestContext { +private final String groupId; +private final int groupEpoch; +private final PartitionAssignor assignor = mock(PartitionAssignor.class); +private final Map members = new HashMap<>(); +private final Map subscriptionMetadata = new HashMap<>(); +private final Map updatedMembers = new HashMap<>(); +private final Map targetAssignments = new HashMap<>(); 
+private final Map assignments = new HashMap<>(); + +public TargetAssignmentBuilderTestContext( +String groupId, +int groupEpoch +) { +this.groupId = groupId; +this.groupEpoch = groupEpoch; +} + +public void addGroupMember( +String memberId, +List subscriptions, +Map> targetPartitions +) { +members.put(memberId, new ConsumerGroupMember.Builder(memberId) +.setSubscribedTopicNames(subscriptions) +.setRebalanceTimeoutMs(5000) +.build()); + +targetAssignments.put(memberId, new Assignment( +(byte) 0, +targetPartitions, +VersionedMetadata.EMPTY +)); +} + +public Uuid addTopicMetadata( +String topicName, +int numPartitions +) { +Uuid topicId = Uuid.randomUuid(); +subscriptionMetadata.put(topicName, new TopicMetadata( +topicId, +topicName, +numPartitions +)); +return topicId; +} + +public void updateMemberSubscription( +String memberId, +List subscriptions +) { +updateMemberSubscription( +memberId, +subscriptions, +Optional.empty(), +Optional.empty() +); +} + +public void updateMemberSubscription( +String memberId, +List subscriptions, +Optional instanceId, +Optional rackId +) { +ConsumerGroupMember existingMember = members.get(memberId); +ConsumerGroupMember.Builder builder; +if
[GitHub] [kafka] dajac commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder
dajac commented on code in PR #13637: URL: https://github.com/apache/kafka/pull/13637#discussion_r1180608354 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java: ## @@ -0,0 +1,698 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord; +import static org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder.createAssignmentMemberSpec; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TargetAssignmentBuilderTest { + +public static class TargetAssignmentBuilderTestContext { +private final String groupId; +private final int groupEpoch; +private final PartitionAssignor assignor = mock(PartitionAssignor.class); +private final Map members = new HashMap<>(); +private final Map subscriptionMetadata = new HashMap<>(); +private final Map updatedMembers = new HashMap<>(); +private final Map targetAssignments = new HashMap<>(); 
+private final Map assignments = new HashMap<>(); + +public TargetAssignmentBuilderTestContext( +String groupId, +int groupEpoch +) { +this.groupId = groupId; +this.groupEpoch = groupEpoch; +} + +public void addGroupMember( +String memberId, +List subscriptions, +Map> targetPartitions +) { +members.put(memberId, new ConsumerGroupMember.Builder(memberId) +.setSubscribedTopicNames(subscriptions) +.setRebalanceTimeoutMs(5000) +.build()); + +targetAssignments.put(memberId, new Assignment( +(byte) 0, +targetPartitions, +VersionedMetadata.EMPTY +)); +} + +public Uuid addTopicMetadata( +String topicName, +int numPartitions +) { +Uuid topicId = Uuid.randomUuid(); +subscriptionMetadata.put(topicName, new TopicMetadata( +topicId, +topicName, +numPartitions +)); +return topicId; +} + +public void updateMemberSubscription( +String memberId, +List subscriptions +) { +updateMemberSubscription( +memberId, +subscriptions, +Optional.empty(), +Optional.empty() +); +} + +public void updateMemberSubscription( +String memberId, +List subscriptions, +Optional instanceId, +Optional rackId +) { +ConsumerGroupMember existingMember = members.get(memberId); +ConsumerGroupMember.Builder builder; +if
[GitHub] [kafka] dajac commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder
dajac commented on code in PR #13637: URL: https://github.com/apache/kafka/pull/13637#discussion_r1180607217 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java: ## @@ -0,0 +1,688 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TargetAssignmentBuilderTest { + +public static class TargetAssignmentBuilderTestContext { +private final String groupId; +private final int groupEpoch; Review Comment: Ack. No worries. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder
dajac commented on code in PR #13637: URL: https://github.com/apache/kafka/pull/13637#discussion_r1180606875 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java: ## @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.Record; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord; + +/** + * Build a new Target Assignment based on the provided parameters. As a result, + * it yields the records that must be persisted to the log and the new member + * assignments as a map. + * + * Records are only created for members which have a new target assignment. If + * their assignment did not change, no new record is needed. + * + * When a member is deleted, it is assumed that its target assignment record + * is deleted as part of the member deletion process. In other words, this class + * does not yield a tombstone for removed members. + */ +public class TargetAssignmentBuilder { +/** + * The assignment result returned by {{@link TargetAssignmentBuilder#build()}}. + */ +public static class TargetAssignmentResult { +/** + * The records that must be applied to the __consumer_offsets + * topics to persist the new target assignment. 
+ */ +private final List records; + +/** + * The new target assignment for all members. + */ +private final Map assignments; + +TargetAssignmentResult( +List records, +Map assignments +) { +Objects.requireNonNull(records); +Objects.requireNonNull(assignments); +this.records = records; +this.assignments = assignments; +} + +/** + * @return The records. + */ +public List records() { +return records; +} + +/** + * @return The assignments. + */ +public Map assignments() { +return assignments; +} +} + +/** + * The group id. + */ +private final String groupId; + +/** + * The group epoch. + */ +private final int groupEpoch; + +/** + * The partition assignor used to compute the assignment. + */ +private final PartitionAssignor assignor; + +/** + * The members in the group. + */ +private Map members = Collections.emptyMap(); + +/** + * The subscription metadata. + */ +private Map subscriptionMetadata = Collections.emptyMap(); + +/** + * The current target assignment. + */ +private Map assignments = Collections.emptyMap(); + +/** + * The members which have been updated or deleted. Deleted members + * are signaled by a null value. + */ +private final Map updatedMembers = new HashMap<>(); + +/** + * Constructs the object. + * + * @param groupId The group id. + * @param groupEpoch The group epoch to compute a target assignment for. + * @param assignor The assignor to use to compute the target assignment. + */ +public TargetAssignmentBuilder( +String groupId, +int groupEpoch, +
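The class Javadoc quoted in this diff states that records are only created for members which have a new target assignment, and that no tombstone is produced for removed members. A minimal sketch of that rule follows, assuming assignments are simplified to a map from member id to a set of partition numbers; `ChangedAssignmentSketch` and `membersNeedingRecords` are hypothetical names for illustration and are not part of the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Illustrative only; simplified types, not the TargetAssignmentBuilder from the PR. */
final class ChangedAssignmentSketch {
    /**
     * Returns the ids of members whose new assignment differs from their old
     * one, in sorted order. Members present only in the old map (removed
     * members) are skipped, mirroring the "no tombstone" note in the Javadoc.
     */
    static List<String> membersNeedingRecords(
        Map<String, Set<Integer>> oldAssignments,
        Map<String, Set<Integer>> newAssignments
    ) {
        List<String> changed = new ArrayList<>();
        // TreeMap gives a deterministic iteration order over member ids.
        for (Map.Entry<String, Set<Integer>> entry : new TreeMap<>(newAssignments).entrySet()) {
            Set<Integer> previous = oldAssignments.get(entry.getKey());
            // A brand-new member has no previous assignment and counts as changed.
            if (!entry.getValue().equals(previous)) {
                changed.add(entry.getKey());
            }
        }
        return changed;
    }
}
```

With this shape, a member whose partitions are unchanged produces nothing, while a new member or one with a different partition set is reported.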
[GitHub] [kafka] dajac commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder
dajac commented on code in PR #13637: URL: https://github.com/apache/kafka/pull/13637#discussion_r1180604733 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java: ## @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.Record; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord; + +/** + * Build a new Target Assignment based on the provided parameters. As a result, + * it yields the records that must be persisted to the log and the new member + * assignments as a map. + * + * Records are only created for members which have a new target assignment. If + * their assignment did not change, no new record is needed. + * + * When a member is deleted, it is assumed that its target assignment record + * is deleted as part of the member deletion process. In other words, this class + * does not yield a tombstone for removed members. + */ +public class TargetAssignmentBuilder { +/** + * The assignment result returned by {{@link TargetAssignmentBuilder#build()}}. + */ +public static class TargetAssignmentResult { +/** + * The records that must be applied to the __consumer_offsets + * topics to persist the new target assignment. 
+ */ +private final List records; + +/** + * The new target assignment for all members. + */ +private final Map assignments; + +TargetAssignmentResult( +List records, +Map assignments +) { +Objects.requireNonNull(records); +Objects.requireNonNull(assignments); +this.records = records; +this.assignments = assignments; +} + +/** + * @return The records. + */ +public List records() { +return records; +} + +/** + * @return The assignments. + */ +public Map assignments() { +return assignments; +} +} + +/** + * The group id. + */ +private final String groupId; + +/** + * The group epoch. + */ +private final int groupEpoch; + +/** + * The partition assignor used to compute the assignment. + */ +private final PartitionAssignor assignor; + +/** + * The members in the group. + */ +private Map members = Collections.emptyMap(); + +/** + * The subscription metadata. + */ +private Map subscriptionMetadata = Collections.emptyMap(); + +/** + * The current target assignment. + */ +private Map assignments = Collections.emptyMap(); + +/** + * The members which have been updated or deleted. Deleted members + * are signaled by a null value. + */ +private final Map updatedMembers = new HashMap<>(); + +/** + * Constructs the object. + * + * @param groupId The group id. + * @param groupEpoch The group epoch to compute a target assignment for. + * @param assignor The assignor to use to compute the target assignment. + */ +public TargetAssignmentBuilder( +String groupId, +int groupEpoch, +
[GitHub] [kafka] philipnee opened a new pull request, #13652: Cherry pick KAFKA-14639
philipnee opened a new pull request, #13652: URL: https://github.com/apache/kafka/pull/13652 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] jolshan commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder
jolshan commented on code in PR #13637: URL: https://github.com/apache/kafka/pull/13637#discussion_r1180601499 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java: ## @@ -0,0 +1,698 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord; +import static org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder.createAssignmentMemberSpec; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TargetAssignmentBuilderTest { + +public static class TargetAssignmentBuilderTestContext { +private final String groupId; +private final int groupEpoch; +private final PartitionAssignor assignor = mock(PartitionAssignor.class); +private final Map members = new HashMap<>(); +private final Map subscriptionMetadata = new HashMap<>(); +private final Map updatedMembers = new HashMap<>(); +private final Map targetAssignments = new HashMap<>(); 
+private final Map assignments = new HashMap<>(); + +public TargetAssignmentBuilderTestContext( +String groupId, +int groupEpoch +) { +this.groupId = groupId; +this.groupEpoch = groupEpoch; +} + +public void addGroupMember( +String memberId, +List subscriptions, +Map> targetPartitions +) { +members.put(memberId, new ConsumerGroupMember.Builder(memberId) +.setSubscribedTopicNames(subscriptions) +.setRebalanceTimeoutMs(5000) +.build()); + +targetAssignments.put(memberId, new Assignment( +(byte) 0, +targetPartitions, +VersionedMetadata.EMPTY +)); +} + +public Uuid addTopicMetadata( +String topicName, +int numPartitions +) { +Uuid topicId = Uuid.randomUuid(); +subscriptionMetadata.put(topicName, new TopicMetadata( +topicId, +topicName, +numPartitions +)); +return topicId; +} + +public void updateMemberSubscription( +String memberId, +List subscriptions +) { +updateMemberSubscription( +memberId, +subscriptions, +Optional.empty(), +Optional.empty() +); +} + +public void updateMemberSubscription( +String memberId, +List subscriptions, +Optional instanceId, +Optional rackId +) { +ConsumerGroupMember existingMember = members.get(memberId); +ConsumerGroupMember.Builder builder; +if
[GitHub] [kafka] jolshan commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder
jolshan commented on code in PR #13637: URL: https://github.com/apache/kafka/pull/13637#discussion_r1180598466 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java: ## @@ -0,0 +1,698 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord; +import static org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder.createAssignmentMemberSpec; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TargetAssignmentBuilderTest { + +public static class TargetAssignmentBuilderTestContext { +private final String groupId; +private final int groupEpoch; +private final PartitionAssignor assignor = mock(PartitionAssignor.class); +private final Map members = new HashMap<>(); +private final Map subscriptionMetadata = new HashMap<>(); +private final Map updatedMembers = new HashMap<>(); +private final Map targetAssignments = new HashMap<>(); 
+private final Map assignments = new HashMap<>(); + +public TargetAssignmentBuilderTestContext( +String groupId, +int groupEpoch +) { +this.groupId = groupId; +this.groupEpoch = groupEpoch; +} + +public void addGroupMember( +String memberId, +List subscriptions, +Map> targetPartitions +) { +members.put(memberId, new ConsumerGroupMember.Builder(memberId) +.setSubscribedTopicNames(subscriptions) +.setRebalanceTimeoutMs(5000) +.build()); + +targetAssignments.put(memberId, new Assignment( +(byte) 0, +targetPartitions, +VersionedMetadata.EMPTY +)); +} + +public Uuid addTopicMetadata( +String topicName, +int numPartitions +) { +Uuid topicId = Uuid.randomUuid(); +subscriptionMetadata.put(topicName, new TopicMetadata( +topicId, +topicName, +numPartitions +)); +return topicId; +} + +public void updateMemberSubscription( +String memberId, +List subscriptions +) { +updateMemberSubscription( +memberId, +subscriptions, +Optional.empty(), +Optional.empty() +); +} + +public void updateMemberSubscription( +String memberId, +List subscriptions, +Optional instanceId, +Optional rackId +) { +ConsumerGroupMember existingMember = members.get(memberId); +ConsumerGroupMember.Builder builder; +if
[GitHub] [kafka] jolshan commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder
jolshan commented on code in PR #13637: URL: https://github.com/apache/kafka/pull/13637#discussion_r1180593876 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java: ## @@ -0,0 +1,688 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TargetAssignmentBuilderTest { + +public static class TargetAssignmentBuilderTestContext { +private final String groupId; +private final int groupEpoch; Review Comment: Sorry, I wanted `new TargetAssignmentBuilderTestContext` per test, but I see you have that. I forgot how static classes work in Java and thought that you could only have one instance that is shared. I see, though, that the semantics are different. Sorry for the confusion. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
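The point about static classes in the comment above can be illustrated with a minimal sketch. The names here (`StaticNestedDemo`, `Context`, `SHARED`) are hypothetical and are not taken from the PR; the sketch only assumes standard Java semantics. A static nested class can be instantiated as often as needed, and each instance has its own fields, whereas a static field is a single slot shared by every user of the class.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; none of these names come from the Kafka code base.
final class StaticNestedDemo {

    // "static" on a nested class only means that no enclosing instance is
    // required. Every `new Context(...)` still gets its own fields.
    static class Context {
        private final String groupId;
        private final Map<String, Integer> members = new HashMap<>();

        Context(String groupId) {
            this.groupId = groupId;
        }

        void addMember(String memberId, int epoch) {
            members.put(memberId, epoch);
        }

        int memberCount() {
            return members.size();
        }

        String groupId() {
            return groupId;
        }
    }

    // A static field, by contrast, is one map shared by all code that uses it.
    static final Map<String, Integer> SHARED = new HashMap<>();

    public static void main(String[] args) {
        Context first = new Context("group-a");
        Context second = new Context("group-b");
        first.addMember("member-1", 10);

        // The second context is unaffected by changes made to the first.
        System.out.println(first.groupId() + " has " + first.memberCount() + " member(s)");
        System.out.println(second.groupId() + " has " + second.memberCount() + " member(s)");

        // A write through the static field is visible to everyone.
        SHARED.put("member-1", 10);
        System.out.println("shared entries: " + SHARED.size());
    }
}
```

Tests that each create their own `Context` therefore cannot interfere with one another; tests that all write to something like `SHARED` can.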
[GitHub] [kafka] jolshan commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder
jolshan commented on code in PR #13637: URL: https://github.com/apache/kafka/pull/13637#discussion_r1180590786 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java: ## @@ -0,0 +1,688 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TargetAssignmentBuilderTest { + +public static class TargetAssignmentBuilderTestContext { +private final String groupId; +private final int groupEpoch; Review Comment: The class itself is shared amongst all the tests. We just put new data in for each test, yes? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder
jolshan commented on code in PR #13637: URL: https://github.com/apache/kafka/pull/13637#discussion_r1180590049 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java: ## @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.Record; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord; + +/** + * Build a new Target Assignment based on the provided parameters. As a result, + * it yields the records that must be persisted to the log and the new member + * assignments as a map. + * + * Records are only created for members which have a new target assignment. If + * their assignment did not change, no new record is needed. + * + * When a member is deleted, it is assumed that its target assignment record + * is deleted as part of the member deletion process. In other words, this class + * does not yield a tombstone for removed members. + */ +public class TargetAssignmentBuilder { +/** + * The assignment result returned by {{@link TargetAssignmentBuilder#build()}}. + */ +public static class TargetAssignmentResult { +/** + * The records that must be applied to the __consumer_offsets + * topics to persist the new target assignment. 
+ */ +private final List records; + +/** + * The new target assignment for all members. + */ +private final Map assignments; + +TargetAssignmentResult( +List records, +Map assignments +) { +Objects.requireNonNull(records); +Objects.requireNonNull(assignments); +this.records = records; +this.assignments = assignments; +} + +/** + * @return The records. + */ +public List records() { +return records; +} + +/** + * @return The assignments. + */ +public Map assignments() { +return assignments; +} +} + +/** + * The group id. + */ +private final String groupId; + +/** + * The group epoch. + */ +private final int groupEpoch; + +/** + * The partition assignor used to compute the assignment. + */ +private final PartitionAssignor assignor; + +/** + * The members in the group. + */ +private Map members = Collections.emptyMap(); + +/** + * The subscription metadata. + */ +private Map subscriptionMetadata = Collections.emptyMap(); + +/** + * The current target assignment. + */ +private Map assignments = Collections.emptyMap(); + +/** + * The members which have been updated or deleted. Deleted members + * are signaled by a null value. + */ +private final Map updatedMembers = new HashMap<>(); + +/** + * Constructs the object. + * + * @param groupId The group id. + * @param groupEpoch The group epoch to compute a target assignment for. + * @param assignor The assignor to use to compute the target assignment. + */ +public TargetAssignmentBuilder( +String groupId, +int groupEpoch, +
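The class Javadoc quoted above says that records are only created for members whose target assignment changed, and that removed members get no record from this class. The following is a minimal sketch of that rule, not the PR's implementation: `ChangedAssignmentSketch` and `membersNeedingRecord` are hypothetical names, and an assignment is simplified to a set of partition numbers per member id.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch; the real builder works with richer types than these.
final class ChangedAssignmentSketch {

    // Returns the ids of members whose new assignment differs from the current
    // one. Members missing from `updated` (removed members) yield nothing here.
    static List<String> membersNeedingRecord(
        Map<String, Set<Integer>> current,
        Map<String, Set<Integer>> updated
    ) {
        List<String> changed = new ArrayList<>();
        for (Map.Entry<String, Set<Integer>> entry : updated.entrySet()) {
            Set<Integer> previous = current.get(entry.getKey());
            // A brand-new member has no previous assignment, so it always counts.
            if (!entry.getValue().equals(previous)) {
                changed.add(entry.getKey());
            }
        }
        return changed;
    }

    private static Set<Integer> partitions(Integer... ids) {
        return new HashSet<>(Arrays.asList(ids));
    }

    public static void main(String[] args) {
        // TreeMap keeps iteration order deterministic for the printout.
        Map<String, Set<Integer>> current = new TreeMap<>();
        current.put("member-a", partitions(0, 1));
        current.put("member-b", partitions(2));
        current.put("member-gone", partitions(5));

        Map<String, Set<Integer>> updated = new TreeMap<>();
        updated.put("member-a", partitions(0, 1)); // unchanged: no record
        updated.put("member-b", partitions(2, 3)); // changed: record
        updated.put("member-c", partitions(4));    // new member: record

        System.out.println(membersNeedingRecord(current, updated));
        // prints [member-b, member-c]
    }
}
```

Skipping unchanged members keeps the number of writes proportional to what actually moved rather than to the size of the group.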
[GitHub] [kafka] jolshan commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder
jolshan commented on code in PR #13637: URL: https://github.com/apache/kafka/pull/13637#discussion_r1180589296 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java: ## @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.Record; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord; + +/** + * Build a new Target Assignment based on the provided parameters. As a result, + * it yields the records that must be persisted to the log and the new member + * assignments as a map. + * + * Records are only created for members which have a new target assignment. If + * their assignment did not change, no new record is needed. + * + * When a member is deleted, it is assumed that its target assignment record + * is deleted as part of the member deletion process. In other words, this class + * does not yield a tombstone for removed members. + */ +public class TargetAssignmentBuilder { +/** + * The assignment result returned by {{@link TargetAssignmentBuilder#build()}}. + */ +public static class TargetAssignmentResult { +/** + * The records that must be applied to the __consumer_offsets + * topics to persist the new target assignment. 
+ */ +private final List records; + +/** + * The new target assignment for all members. + */ +private final Map assignments; + +TargetAssignmentResult( +List records, +Map assignments +) { +Objects.requireNonNull(records); +Objects.requireNonNull(assignments); +this.records = records; +this.assignments = assignments; +} + +/** + * @return The records. + */ +public List records() { +return records; +} + +/** + * @return The assignments. + */ +public Map assignments() { +return assignments; +} +} + +/** + * The group id. + */ +private final String groupId; + +/** + * The group epoch. + */ +private final int groupEpoch; + +/** + * The partition assignor used to compute the assignment. + */ +private final PartitionAssignor assignor; + +/** + * The members in the group. + */ +private Map members = Collections.emptyMap(); + +/** + * The subscription metadata. + */ +private Map subscriptionMetadata = Collections.emptyMap(); + +/** + * The current target assignment. + */ +private Map assignments = Collections.emptyMap(); + +/** + * The members which have been updated or deleted. Deleted members + * are signaled by a null value. + */ +private final Map updatedMembers = new HashMap<>(); + +/** + * Constructs the object. + * + * @param groupId The group id. + * @param groupEpoch The group epoch to compute a target assignment for. + * @param assignor The assignor to use to compute the target assignment. + */ +public TargetAssignmentBuilder( +String groupId, +int groupEpoch, +
[GitHub] [kafka] jolshan commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder
jolshan commented on code in PR #13637: URL: https://github.com/apache/kafka/pull/13637#discussion_r1180588499 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java: ## @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.Record; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord; + +/** + * Build a new Target Assignment based on the provided parameters. As a result, + * it yields the records that must be persisted to the log and the new member + * assignments as a map. + * + * Records are only created for members which have a new target assignment. If + * their assignment did not change, no new record is needed. + * + * When a member is deleted, it is assumed that its target assignment record + * is deleted as part of the member deletion process. In other words, this class + * does not yield a tombstone for removed members. Review Comment: I read this, but I guess I just didn't assume that delete record = create tombstone. I guess that's a pretty easy conclusion to draw, it just didn't come to me right away. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
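On "delete record = create tombstone": in a compacted Kafka topic, a key is deleted by writing a record with that key and a null value, which is what "tombstone" refers to. The sketch below illustrates the idea with plain collections and no Kafka client; `TombstoneSketch`, `record` and `replay` are hypothetical names, and the log is modelled as a simple list of key/value pairs.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of replaying a compacted log; not Kafka's own code.
final class TombstoneSketch {

    // SimpleEntry is used because it permits null values.
    static Map.Entry<String, String> record(String key, String value) {
        return new SimpleEntry<String, String>(key, value);
    }

    // Rebuilds the latest state per key. A null value is treated as a
    // tombstone and removes the key instead of storing it.
    static Map<String, String> replay(List<Map.Entry<String, String>> log) {
        Map<String, String> state = new TreeMap<>();
        for (Map.Entry<String, String> entry : log) {
            if (entry.getValue() == null) {
                state.remove(entry.getKey());
            } else {
                state.put(entry.getKey(), entry.getValue());
            }
        }
        return state;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> log = new ArrayList<>();
        log.add(record("member-1", "assignment-v1"));
        log.add(record("member-2", "assignment-v1"));
        log.add(record("member-1", null)); // tombstone: member-1 is deleted

        System.out.println(replay(log));
        // prints {member-2=assignment-v1}
    }
}
```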
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13267: KAFKA-14694: RPCProducerIdManager should not wait on new block
jeffkbkim commented on code in PR #13267: URL: https://github.com/apache/kafka/pull/13267#discussion_r1180578924 ## core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala: ## @@ -123,73 +129,94 @@ class ZkProducerIdManager(brokerId: Int, } } - def generateProducerId(): Long = { + def generateProducerId(): Try[Long] = { this synchronized { // grab a new block of producerIds if this block has been exhausted if (nextProducerId > currentProducerIdBlock.lastProducerId) { -allocateNewProducerIdBlock() +try { + allocateNewProducerIdBlock() +} catch { + case t: Throwable => +return Failure(t) +} nextProducerId = currentProducerIdBlock.firstProducerId } nextProducerId += 1 - nextProducerId - 1 + Success(nextProducerId - 1) +} + } + + override def hasValidBlock: Boolean = { +this synchronized { + !currentProducerIdBlock.equals(ProducerIdsBlock.EMPTY) } } } +/** + * RPCProducerIdManager allocates producer id blocks asynchronously and will immediately fail requests + * for producers to retry if it does not have an available producer id and is waiting on a new block. 
+ */ class RPCProducerIdManager(brokerId: Int, + time: Time, brokerEpochSupplier: () => Long, - controllerChannel: BrokerToControllerChannelManager, - maxWaitMs: Int) extends ProducerIdManager with Logging { + controllerChannel: BrokerToControllerChannelManager) extends ProducerIdManager with Logging { this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: " - private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1) + // Visible for testing + private[transaction] var nextProducerIdBlock = new AtomicReference[ProducerIdsBlock](null) + private val currentProducerIdBlock: AtomicReference[ProducerIdsBlock] = new AtomicReference(ProducerIdsBlock.EMPTY) private val requestInFlight = new AtomicBoolean(false) - private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY - private var nextProducerId: Long = -1L + private val shouldBackoff = new AtomicBoolean(false) - override def generateProducerId(): Long = { -this synchronized { - if (nextProducerId == -1L) { -// Send an initial request to get the first block -maybeRequestNextBlock() -nextProducerId = 0L - } else { -nextProducerId += 1 - -// Check if we need to fetch the next block -if (nextProducerId >= (currentProducerIdBlock.firstProducerId + currentProducerIdBlock.size * ProducerIdManager.PidPrefetchThreshold)) { - maybeRequestNextBlock() -} - } + override def hasValidBlock: Boolean = { +nextProducerIdBlock.get != null + } - // If we've exhausted the current block, grab the next block (waiting if necessary) - if (nextProducerId > currentProducerIdBlock.lastProducerId) { -val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS) -if (block == null) { - // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal - // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS. 
- throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out waiting for next producer ID block") -} else { - block match { -case Success(nextBlock) => - currentProducerIdBlock = nextBlock - nextProducerId = currentProducerIdBlock.firstProducerId -case Failure(t) => throw t + override def generateProducerId(): Try[Long] = { +var result: Try[Long] = null +while (result == null) { Review Comment: ah that makes sense -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
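The Scaladoc in the diff above describes a manager that fetches producer id blocks asynchronously and fails a request immediately, so the producer can retry, when no id is available. A rough Java sketch of that fail-fast pattern follows. It is not the PR's Scala code: `NonBlockingIdAllocator`, `Block` and `onBlockReceived` are hypothetical names, an empty `OptionalLong` stands in for the `Failure` result, and the controller request and backoff logic are left out.

```java
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of a fail-fast, non-blocking id allocator.
final class NonBlockingIdAllocator {

    // A contiguous range of ids [first, last] with a thread-safe cursor.
    static final class Block {
        private final long last;
        private final AtomicLong next;

        Block(long first, int size) {
            this.last = first + size - 1;
            this.next = new AtomicLong(first);
        }

        // Claims the next id, or returns empty once the block is used up.
        OptionalLong claim() {
            long id = next.getAndIncrement();
            return id <= last ? OptionalLong.of(id) : OptionalLong.empty();
        }
    }

    // Starts with an empty block so the very first call fails fast.
    private final AtomicReference<Block> current = new AtomicReference<>(new Block(0L, 0));
    private final AtomicReference<Block> prefetched = new AtomicReference<>(null);

    // Would be called by the asynchronous response handler (not shown).
    void onBlockReceived(long first, int size) {
        prefetched.set(new Block(first, size));
    }

    // Never blocks: returns an id, or empty so the caller can retry later.
    OptionalLong generate() {
        while (true) {
            Block block = current.get();
            OptionalLong id = block.claim();
            if (id.isPresent()) {
                return id;
            }
            Block nextBlock = prefetched.get();
            if (nextBlock == null) {
                return OptionalLong.empty();
            }
            // Only one thread wins the swap; the others loop and use the new block.
            if (current.compareAndSet(block, nextBlock)) {
                prefetched.compareAndSet(nextBlock, null);
            }
        }
    }

    public static void main(String[] args) {
        NonBlockingIdAllocator allocator = new NonBlockingIdAllocator();
        System.out.println(allocator.generate().isPresent()); // false: no block yet
        allocator.onBlockReceived(100L, 2);
        System.out.println(allocator.generate().getAsLong()); // 100
        System.out.println(allocator.generate().getAsLong()); // 101
        System.out.println(allocator.generate().isPresent()); // false: block exhausted
    }
}
```

Compared with waiting on a queue with a timeout, as the removed code did, returning immediately keeps request handler threads free while the next block is in flight.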
[GitHub] [kafka] dajac commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder
dajac commented on code in PR #13637: URL: https://github.com/apache/kafka/pull/13637#discussion_r1180576777 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java: ## @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.junit.jupiter.api.Test; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; +import java.util.OptionalInt; + +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class ConsumerGroupMemberTest { + +@Test +public void testNewMember() { +Uuid topicId1 = Uuid.randomUuid(); +Uuid topicId2 = Uuid.randomUuid(); +Uuid topicId3 = Uuid.randomUuid(); + +ConsumerGroupMember member = new ConsumerGroupMember.Builder("member-id") +.setMemberEpoch(10) +.setPreviousMemberEpoch(9) +.setNextMemberEpoch(11) +.setInstanceId("instance-id") +.setRackId("rack-id") +.setRebalanceTimeoutMs(5000) +.setClientId("client-id") +.setClientHost("hostname") +.setSubscribedTopicNames(Arrays.asList("foo", "bar")) +.setSubscribedTopicRegex("regex") +.setServerAssignorName("range") +.setClientAssignors(Collections.singletonList( +new ClientAssignor( +"assignor", +(byte) 0, +(byte) 0, +(byte) 1, +new VersionedMetadata( +(byte) 1, +ByteBuffer.allocate(0))))) +.setAssignedPartitions(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3))) +.setPartitionsPendingRevocation(mkAssignment( +mkTopicAssignment(topicId2, 4, 5, 6))) +.setPartitionsPendingAssignment(mkAssignment( +mkTopicAssignment(topicId3, 7, 8, 9))) +.build(); + +assertEquals("member-id", member.memberId()); +assertEquals(10, member.memberEpoch()); +assertEquals(9, member.previousMemberEpoch()); +assertEquals(11, member.nextMemberEpoch()); +assertEquals("instance-id", member.instanceId()); 
+assertEquals("rack-id", member.rackId()); +assertEquals("client-id", member.clientId()); +assertEquals("hostname", member.clientHost()); +// Names are sorted. +assertEquals(Arrays.asList("bar", "foo"), member.subscribedTopicNames()); +assertEquals("regex", member.subscribedTopicRegex()); +assertEquals("range", member.serverAssignorName().get()); +assertEquals( +Collections.singletonList( +new ClientAssignor( +"assignor", +(byte) 0, +(byte) 0, +(byte) 1, +new VersionedMetadata( +(byte) 1, +ByteBuffer.allocate(0)))), +member.clientAssignors()); +assertEquals(mkAssignment(mkTopicAssignment(topicId1, 1, 2, 3)), member.assignedPartitions()); +assertEquals(mkAssignment(mkTopicAssignment(topicId2, 4, 5, 6)), member.partitionsPendingRevocation()); +assertEquals(mkAssignment(mkTopicAssignment(topicId3, 7, 8, 9)), member.partitionsPendingAssignment()); +} + +@Test +public void testEquals() { +Uuid topicId1 = Uuid.randomUuid(); Review Comment: Got it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To
[GitHub] [kafka] jolshan commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder
jolshan commented on code in PR #13637: URL: https://github.com/apache/kafka/pull/13637#discussion_r1180575457 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java: ## @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.junit.jupiter.api.Test; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; +import java.util.OptionalInt; + +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class ConsumerGroupMemberTest { + +@Test +public void testNewMember() { +Uuid topicId1 = Uuid.randomUuid(); +Uuid topicId2 = Uuid.randomUuid(); +Uuid topicId3 = Uuid.randomUuid(); + +ConsumerGroupMember member = new ConsumerGroupMember.Builder("member-id") +.setMemberEpoch(10) +.setPreviousMemberEpoch(9) +.setNextMemberEpoch(11) +.setInstanceId("instance-id") +.setRackId("rack-id") +.setRebalanceTimeoutMs(5000) +.setClientId("client-id") +.setClientHost("hostname") +.setSubscribedTopicNames(Arrays.asList("foo", "bar")) +.setSubscribedTopicRegex("regex") +.setServerAssignorName("range") +.setClientAssignors(Collections.singletonList( +new ClientAssignor( +"assignor", +(byte) 0, +(byte) 0, +(byte) 1, +new VersionedMetadata( +(byte) 1, +ByteBuffer.allocate(0))))) +.setAssignedPartitions(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3))) +.setPartitionsPendingRevocation(mkAssignment( +mkTopicAssignment(topicId2, 4, 5, 6))) +.setPartitionsPendingAssignment(mkAssignment( +mkTopicAssignment(topicId3, 7, 8, 9))) +.build(); + +assertEquals("member-id", member.memberId()); +assertEquals(10, member.memberEpoch()); +assertEquals(9, member.previousMemberEpoch()); +assertEquals(11, member.nextMemberEpoch()); +assertEquals("instance-id", member.instanceId()); 
+assertEquals("rack-id", member.rackId()); +assertEquals("client-id", member.clientId()); +assertEquals("hostname", member.clientHost()); +// Names are sorted. +assertEquals(Arrays.asList("bar", "foo"), member.subscribedTopicNames()); +assertEquals("regex", member.subscribedTopicRegex()); +assertEquals("range", member.serverAssignorName().get()); +assertEquals( +Collections.singletonList( +new ClientAssignor( +"assignor", +(byte) 0, +(byte) 0, +(byte) 1, +new VersionedMetadata( +(byte) 1, +ByteBuffer.allocate(0)))), +member.clientAssignors()); +assertEquals(mkAssignment(mkTopicAssignment(topicId1, 1, 2, 3)), member.assignedPartitions()); +assertEquals(mkAssignment(mkTopicAssignment(topicId2, 4, 5, 6)), member.partitionsPendingRevocation()); +assertEquals(mkAssignment(mkTopicAssignment(topicId3, 7, 8, 9)), member.partitionsPendingAssignment()); +} + +@Test +public void testEquals() { +Uuid topicId1 = Uuid.randomUuid(); Review Comment: Yes I was referring to the context. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the
[GitHub] [kafka] dajac commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder
dajac commented on code in PR #13637: URL: https://github.com/apache/kafka/pull/13637#discussion_r1180572223 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java: ## @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.junit.jupiter.api.Test; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; +import java.util.OptionalInt; + +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class ConsumerGroupMemberTest { + +@Test +public void testNewMember() { +Uuid topicId1 = Uuid.randomUuid(); +Uuid topicId2 = Uuid.randomUuid(); +Uuid topicId3 = Uuid.randomUuid(); + +ConsumerGroupMember member = new ConsumerGroupMember.Builder("member-id") +.setMemberEpoch(10) +.setPreviousMemberEpoch(9) +.setNextMemberEpoch(11) +.setInstanceId("instance-id") +.setRackId("rack-id") +.setRebalanceTimeoutMs(5000) +.setClientId("client-id") +.setClientHost("hostname") +.setSubscribedTopicNames(Arrays.asList("foo", "bar")) +.setSubscribedTopicRegex("regex") +.setServerAssignorName("range") +.setClientAssignors(Collections.singletonList( +new ClientAssignor( +"assignor", +(byte) 0, +(byte) 0, +(byte) 1, +new VersionedMetadata( +(byte) 1, +ByteBuffer.allocate(0))))) +.setAssignedPartitions(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3))) +.setPartitionsPendingRevocation(mkAssignment( +mkTopicAssignment(topicId2, 4, 5, 6))) +.setPartitionsPendingAssignment(mkAssignment( +mkTopicAssignment(topicId3, 7, 8, 9))) +.build(); + +assertEquals("member-id", member.memberId()); +assertEquals(10, member.memberEpoch()); +assertEquals(9, member.previousMemberEpoch()); +assertEquals(11, member.nextMemberEpoch()); +assertEquals("instance-id", member.instanceId()); 
+assertEquals("rack-id", member.rackId()); +assertEquals("client-id", member.clientId()); +assertEquals("hostname", member.clientHost()); +// Names are sorted. +assertEquals(Arrays.asList("bar", "foo"), member.subscribedTopicNames()); +assertEquals("regex", member.subscribedTopicRegex()); +assertEquals("range", member.serverAssignorName().get()); +assertEquals( +Collections.singletonList( +new ClientAssignor( +"assignor", +(byte) 0, +(byte) 0, +(byte) 1, +new VersionedMetadata( +(byte) 1, +ByteBuffer.allocate(0)))), +member.clientAssignors()); +assertEquals(mkAssignment(mkTopicAssignment(topicId1, 1, 2, 3)), member.assignedPartitions()); +assertEquals(mkAssignment(mkTopicAssignment(topicId2, 4, 5, 6)), member.partitionsPendingRevocation()); +assertEquals(mkAssignment(mkTopicAssignment(topicId3, 7, 8, 9)), member.partitionsPendingAssignment()); +} + +@Test +public void testEquals() { +Uuid topicId1 = Uuid.randomUuid(); Review Comment: What do you mean by static class? Are you referring to a static attribute for the context in the test class that is shared by all the tests? In this case, yes, they would collide. --
[GitHub] [kafka] jolshan commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder
jolshan commented on code in PR #13637: URL: https://github.com/apache/kafka/pull/13637#discussion_r1180569515 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java: ## @@ -0,0 +1,307 @@ +@Test +public void testEquals() { +Uuid topicId1 = Uuid.randomUuid(); Review Comment: If we had a static class and used the same IDs, wouldn't they collide?
[GitHub] [kafka] dajac merged pull request #13618: MINOR: Fixing typos in the ConsumerCoordinators
dajac merged PR #13618: URL: https://github.com/apache/kafka/pull/13618 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance
philipnee commented on PR #13550: URL: https://github.com/apache/kafka/pull/13550#issuecomment-1527744080 Thanks David, FWIW: I think we also want to back port it to 3.4, I'll raise a PR.
[GitHub] [kafka] vamossagar12 commented on pull request #13594: KAFKA-14913: Using ThreadUtils.shutdownExecutorServiceQuietly to close executors in Connect Runtime
vamossagar12 commented on PR #13594: URL: https://github.com/apache/kafka/pull/13594#issuecomment-1527739912 @urbandan, I took the liberty of modifying `shutdownExecutorServiceQuietly` to do a two-phase shutdown, as suggested in the JavaDocs. I also modified `MemoryOffsetBackingStore`, `SourceTaskOffsetCommitter`, and `Worker` to use `shutdownExecutorServiceQuietly` to shut down their executors. @yashmayya, as suggested, I have also removed the ConnectException thrown from `MemoryOffsetBackingStore#stop`. This is ready for a re-review.
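For readers unfamiliar with the two-phase shutdown mentioned in the comment above, here is a minimal sketch modeled on the usage example in the `java.util.concurrent.ExecutorService` JavaDoc. It is not the code from the PR: the class name `TwoPhaseShutdown`, the method name `shutdownQuietly`, and its boolean return value are assumptions made for illustration only.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class TwoPhaseShutdown {

    /**
     * Hypothetical helper (not Kafka's ThreadUtils): tries a graceful shutdown
     * first and falls back to a forced one.
     *
     * @return true if the executor terminated within the waits, false otherwise
     */
    static boolean shutdownQuietly(ExecutorService executor, long timeout, TimeUnit unit) {
        // Phase 1: stop accepting new tasks, let already submitted tasks finish.
        executor.shutdown();
        try {
            if (executor.awaitTermination(timeout, unit)) {
                return true;
            }
            // Phase 2: interrupt running tasks and wait once more.
            executor.shutdownNow();
            return executor.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            // The caller was interrupted while waiting: force the shutdown
            // and restore the interrupt flag instead of throwing.
            executor.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> { });
        System.out.println(shutdownQuietly(executor, 1, TimeUnit.SECONDS));
    }
}
```

Swallowing the `InterruptedException` while restoring the interrupt flag is what makes such a helper "quiet"; whether the actual Kafka utility logs, returns a value, or uses different timeouts is not stated in this thread.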
[jira] [Updated] (KAFKA-14949) Add Streams upgrade tests from AK 3.4
[ https://issues.apache.org/jira/browse/KAFKA-14949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-14949: Component/s: system tests > Add Streams upgrade tests from AK 3.4 > - > > Key: KAFKA-14949 > URL: https://issues.apache.org/jira/browse/KAFKA-14949 > Project: Kafka > Issue Type: Task > Components: streams, system tests >Reporter: Victoria Xia >Priority: Critical > > Streams upgrade tests currently only test upgrading from 3.3 and earlier > versions > ([link|https://github.com/apache/kafka/blob/056657d84d84e116ffc9460872945b4d2b479ff3/tests/kafkatest/tests/streams/streams_application_upgrade_test.py#L30]). > We should add 3.4 as an "upgrade_from" version into these tests, in light of > the upcoming 3.5 release. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] mjsax commented on a diff in pull request #13633: KAFKA-14839: Exclude protected variable from JavaDocs
mjsax commented on code in PR #13633: URL: https://github.com/apache/kafka/pull/13633#discussion_r1180546756 ## build.gradle: ## @@ -2057,6 +2057,7 @@ project(':streams') { srcJar.dependsOn 'processMessages' javadoc { +options.memberLevel = JavadocMemberLevel.PUBLIC // Document only public members/API Review Comment: \cc @ijuma WDYT?
[GitHub] [kafka] vamossagar12 commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary
vamossagar12 commented on PR #13646: URL: https://github.com/apache/kafka/pull/13646#issuecomment-1527703722 Yeah, I agree with @yashmayya. Moreover, this ``` Yes it will fail, but consumeAll is not failing due to timeout here but rather due to its nature of storing the end offsets before consuming. ``` is not entirely correct, I think. I agree that what gets thrown is an AssertionError, but that's because the number of source records returned by `consumeAll` didn't reach the desired number of records within 60s. For starters, can you try increasing `CONSUME_RECORDS_TIMEOUT_MS` to 100s or so and see if it even works? Basically, we need to check whether the consumer is lagging or whether enough records are being produced. I think it would mostly be the former because, as Yash said, we are already waiting for 100 records to be committed. It's not an ideal fix, but let's first see if it works, and if needed we can dig deeper.
[GitHub] [kafka] bmscomp commented on a diff in pull request #13627: MINOR: simplify if else conditions in Uuid.compareTo method
bmscomp commented on code in PR #13627: URL: https://github.com/apache/kafka/pull/13627#discussion_r1178371488 ## clients/src/main/java/org/apache/kafka/common/Uuid.java: ## @@ -143,12 +143,6 @@ public int compareTo(Uuid other) { return 1; Review Comment: @divijvaidya There is still another way to implement the `compareTo` method. By the way, `compare` could also be a nice name for the method, and it would match the style of the `compare` method in `Long`: if (mostSignificantBits == other.mostSignificantBits) return Long.compare(leastSignificantBits, other.leastSignificantBits); return Long.compare(mostSignificantBits, other.mostSignificantBits);
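The suggestion above can be tried out in isolation. The following is a minimal sketch using a hypothetical `Id` class with two `long` fields; it is a stand-in for illustration and not the actual `org.apache.kafka.common.Uuid`. Note that `Long.compare` performs a signed comparison and only guarantees the sign of its result, so callers should not rely on exactly -1 or 1.

```java
public class IdCompare {

    /** Hypothetical 128-bit identifier, used only to illustrate the comparison. */
    static final class Id implements Comparable<Id> {
        final long mostSignificantBits;
        final long leastSignificantBits;

        Id(long mostSignificantBits, long leastSignificantBits) {
            this.mostSignificantBits = mostSignificantBits;
            this.leastSignificantBits = leastSignificantBits;
        }

        @Override
        public int compareTo(Id other) {
            // Only fall back to the low bits when the high bits are equal.
            if (mostSignificantBits == other.mostSignificantBits) {
                return Long.compare(leastSignificantBits, other.leastSignificantBits);
            }
            return Long.compare(mostSignificantBits, other.mostSignificantBits);
        }
    }

    public static void main(String[] args) {
        // Same high bits: the low bits decide the order.
        System.out.println(new Id(1, 5).compareTo(new Id(1, 9)) < 0);
        // Different high bits: the low bits are ignored.
        System.out.println(new Id(2, 0).compareTo(new Id(1, 9)) > 0);
    }
}
```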
[GitHub] [kafka] dajac commented on pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder
dajac commented on PR #13637: URL: https://github.com/apache/kafka/pull/13637#issuecomment-1527632827 @jolshan @rreddy-22 @jeffkbkim Thanks for your comments. I have addressed all of them.
[GitHub] [kafka] dajac commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder
dajac commented on code in PR #13637: URL: https://github.com/apache/kafka/pull/13637#discussion_r1180463243 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java: ## @@ -0,0 +1,307 @@ +@Test +public void testEquals() { +Uuid topicId1 = Uuid.randomUuid(); Review Comment: I have changed it a bit. Let me know what you think.
[GitHub] [kafka] dajac commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder
dajac commented on code in PR #13637: URL: https://github.com/apache/kafka/pull/13637#discussion_r1180451832 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java: ## @@ -0,0 +1,688 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TargetAssignmentBuilderTest { + +public static class TargetAssignmentBuilderTestContext { Review Comment: To your first comment: It is hard to do simpler than that. The alternative would be to manually prepare all the data in each test. I did this actually before extracting all the common logic into the context in order to make the test simpler. To your second comment: I have added more comments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [kafka] dajac commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder
dajac commented on code in PR #13637: URL: https://github.com/apache/kafka/pull/13637#discussion_r1180427210 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java: ## @@ -0,0 +1,688 @@ +public class TargetAssignmentBuilderTest { + +public static class TargetAssignmentBuilderTestContext { +private final String groupId; +private final int groupEpoch; Review Comment: I don't understand your comment. The context is instantiated in each test...
[GitHub] [kafka] dajac commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder
dajac commented on code in PR #13637: URL: https://github.com/apache/kafka/pull/13637#discussion_r1180425612 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java: ## @@ -0,0 +1,688 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TargetAssignmentBuilderTest { + +public static class TargetAssignmentBuilderTestContext { +private final String groupId; +private final int groupEpoch; +private final PartitionAssignor assignor = mock(PartitionAssignor.class); +private final Map members = new HashMap<>(); +private final Map subscriptionMetadata = new HashMap<>(); +private final Map updatedMembers = new HashMap<>(); +private final Map targetAssignments = new HashMap<>(); +private final Map assignments = new HashMap<>(); + +public TargetAssignmentBuilderTestContext( +String groupId, +int groupEpoch +) { +this.groupId = groupId; +this.groupEpoch = 
groupEpoch; +} + +public TargetAssignmentBuilderTestContext addGroupMember( +String memberId, +List subscriptions, +Map> targetPartitions +) { +members.put(memberId, new ConsumerGroupMember.Builder(memberId) +.setSubscribedTopicNames(subscriptions) +.setRebalanceTimeoutMs(5000) +.build()); + +targetAssignments.put(memberId, new Assignment( +(byte) 0, +targetPartitions, +VersionedMetadata.EMPTY +)); + +return this; +} + +public TargetAssignmentBuilderTestContext addTopicMetadata( +Uuid topicId, +String topicName, +int numPartitions +) { +subscriptionMetadata.put(topicName, new TopicMetadata( +topicId, +topicName, +numPartitions +)); +return this; +} + +public TargetAssignmentBuilderTestContext updateMemberSubscription( +String memberId, +List subscriptions +) { +return updateMemberSubscription( +memberId, +subscriptions, +Optional.empty(), +Optional.empty() +); +} + +public TargetAssignmentBuilderTestContext updateMemberSubscription( +String memberId, +List subscriptions, +Optional instanceId, +Optional rackId +) { +ConsumerGroupMember existingMember = members.get(memberId); +ConsumerGroupMember.Builder builder; +if (existingMember != null) { +builder = new
[jira] [Assigned] (KAFKA-13187) Replace EasyMock and PowerMock with Mockito for DistributedHerderTest
[ https://issues.apache.org/jira/browse/KAFKA-13187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya reassigned KAFKA-13187: -- Assignee: Yash Mayya (was: Matthew de Detrich) > Replace EasyMock and PowerMock with Mockito for DistributedHerderTest > - > > Key: KAFKA-13187 > URL: https://issues.apache.org/jira/browse/KAFKA-13187 > Project: Kafka > Issue Type: Sub-task >Reporter: YI-CHEN WANG >Assignee: Yash Mayya >Priority: Major
[GitHub] [kafka] yashmayya commented on pull request #11792: Replace EasyMock/PowerMock with Mockito in DistributedHerderTest
yashmayya commented on PR #11792: URL: https://github.com/apache/kafka/pull/11792#issuecomment-1527579890 @mdedetrich no problem, I've assigned https://issues.apache.org/jira/browse/KAFKA-13187 to myself and I'll start working on it next week.
[GitHub] [kafka] dajac commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder
dajac commented on code in PR #13637: URL: https://github.com/apache/kafka/pull/13637#discussion_r1180410020 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java: ## @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.Record; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord; + +/** + * Build a new Target Assignment based on the provided parameters. As a result, + * it yields the records that must be persisted to the log and the new member + * assignments as a map. + * + * Records are only created for members which have a new target assignment. If + * their assignment did not change, no new record is needed. + * + * When a member is deleted, it is assumed that its target assignment record + * is deleted as part of the member deletion process. In other words, this class + * does not yield a tombstone for removed members. + */ +public class TargetAssignmentBuilder { +/** + * The assignment result returned by {{@link TargetAssignmentBuilder#build()}}. + */ +public static class TargetAssignmentResult { +/** + * The records that must be applied to the __consumer_offsets + * topics to persist the new target assignment. 
+ */ +private final List records; + +/** + * The new target assignment for all members. + */ +private final Map assignments; + +TargetAssignmentResult( +List records, +Map assignments +) { +Objects.requireNonNull(records); +Objects.requireNonNull(assignments); +this.records = records; +this.assignments = assignments; +} + +/** + * @return The records. + */ +public List records() { +return records; +} + +/** + * @return The assignments. + */ +public Map assignments() { +return assignments; +} +} + +/** + * The group id. + */ +private final String groupId; + +/** + * The group epoch. + */ +private final int groupEpoch; + +/** + * The partition assignor used to compute the assignment. + */ +private final PartitionAssignor assignor; + +/** + * The members in the group. + */ +private Map members = Collections.emptyMap(); + +/** + * The subscription metadata. + */ +private Map subscriptionMetadata = Collections.emptyMap(); + +/** + * The current target assignment. + */ +private Map assignments = Collections.emptyMap(); + +/** + * The members which have been updated or deleted. Deleted members + * are signaled by a null value. + */ +private final Map updatedMembers = new HashMap<>(); + +/** + * Constructs the object. + * + * @param groupId The group id. + * @param groupEpochThe group epoch to compute a target assignment for. + * @param assignor The assignor to use to compute the target assignment. + */ +public TargetAssignmentBuilder( +String groupId, +int groupEpoch, +
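The class Javadoc quoted above states that records are only created for members whose target assignment changed, and that no tombstone is produced for removed members. A minimal sketch of that rule, under loud assumptions: the class `ChangedAssignments`, the method `membersNeedingRecord`, and the representation of an assignment as a plain `Set<Integer>` of partitions are all hypothetical and are not taken from the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class ChangedAssignments {

    /**
     * Returns the ids of members whose new assignment differs from the current
     * one (or who had none before). Members that exist only in the current map
     * were removed and deliberately produce nothing, mirroring the "no
     * tombstone" note in the Javadoc.
     */
    static List<String> membersNeedingRecord(
        Map<String, Set<Integer>> current,
        Map<String, Set<Integer>> updated
    ) {
        List<String> changed = new ArrayList<>();
        for (Map.Entry<String, Set<Integer>> entry : updated.entrySet()) {
            Set<Integer> previous = current.get(entry.getKey());
            if (!entry.getValue().equals(previous)) {
                changed.add(entry.getKey());
            }
        }
        return changed;
    }

    private static Set<Integer> partitions(Integer... ids) {
        return new HashSet<>(Arrays.asList(ids));
    }

    public static void main(String[] args) {
        // TreeMap keeps the iteration order deterministic for this demo.
        Map<String, Set<Integer>> current = new TreeMap<>();
        current.put("a", partitions(0, 1));
        current.put("b", partitions(2));
        current.put("gone", partitions(9));

        Map<String, Set<Integer>> updated = new TreeMap<>();
        updated.put("a", partitions(0, 1)); // unchanged: no record
        updated.put("b", partitions(2, 3)); // changed: record
        updated.put("c", partitions(4));    // new member: record

        System.out.println(membersNeedingRecord(current, updated)); // prints [b, c]
    }
}
```

In the real builder the comparison presumably happens on richer assignment objects keyed by topic id; the sketch only shows the "emit on change" decision.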
[GitHub] [kafka] dajac commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder
dajac commented on code in PR #13637: URL: https://github.com/apache/kafka/pull/13637#discussion_r1180408510 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java: ## @@ -0,0 +1,327 @@
[GitHub] [kafka] dajac commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder
dajac commented on code in PR #13637: URL: https://github.com/apache/kafka/pull/13637#discussion_r1180406549 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java: ## @@ -0,0 +1,327 @@
[GitHub] [kafka] dajac commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder
dajac commented on code in PR #13637: URL: https://github.com/apache/kafka/pull/13637#discussion_r1180405622 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java: ## @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.Record; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentTopicMetadata; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentEpochRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord; + +/** + * Build a new Target Assignment based on the provided parameters. As a result, + * it yields the records that must be persisted to the log and the new member + * assignments as a map. + * + * Records are only created for members which have a new target assignment. If + * their assignment did not change, no new record is needed. + * + * When a member is deleted, it is assumed that its target assignment record + * is deleted as part of the member deletion process. In other words, this class + * does not yield a tombstone for removed members. Review Comment: As the first sentence says: `When a member is deleted, it is assumed that its target assignment record is deleted as part of the member deletion process.` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13637: KAFKA-14462; [10/N] Add TargetAssignmentBuilder
dajac commented on code in PR #13637: URL: https://github.com/apache/kafka/pull/13637#discussion_r1180404680 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java: ## @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.junit.jupiter.api.Test; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; +import java.util.OptionalInt; + +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.consumer.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class ConsumerGroupMemberTest { + +@Test +public void testNewMember() { +Uuid topicId1 = Uuid.randomUuid(); +Uuid topicId2 = Uuid.randomUuid(); +Uuid topicId3 = Uuid.randomUuid(); + +ConsumerGroupMember member = new ConsumerGroupMember.Builder("member-id") +.setMemberEpoch(10) +.setPreviousMemberEpoch(9) +.setNextMemberEpoch(11) +.setInstanceId("instance-id") +.setRackId("rack-id") +.setRebalanceTimeoutMs(5000) +.setClientId("client-id") +.setClientHost("hostname") +.setSubscribedTopicNames(Arrays.asList("foo", "bar")) +.setSubscribedTopicRegex("regex") +.setServerAssignorName("range") +.setClientAssignors(Collections.singletonList( +new ClientAssignor( +"assignor", +(byte) 0, +(byte) 0, +(byte) 1, +new VersionedMetadata( +(byte) 1, +ByteBuffer.allocate(0) +.setAssignedPartitions(mkAssignment( +mkTopicAssignment(topicId1, 1, 2, 3))) +.setPartitionsPendingRevocation(mkAssignment( +mkTopicAssignment(topicId2, 4, 5, 6))) +.setPartitionsPendingAssignment(mkAssignment( +mkTopicAssignment(topicId3, 7, 8, 9))) +.build(); + +assertEquals("member-id", member.memberId()); +assertEquals(10, member.memberEpoch()); +assertEquals(9, member.previousMemberEpoch()); +assertEquals(11, member.nextMemberEpoch()); +assertEquals("instance-id", member.instanceId()); 
+assertEquals("rack-id", member.rackId()); +assertEquals("client-id", member.clientId()); +assertEquals("hostname", member.clientHost()); +// Names are sorted. +assertEquals(Arrays.asList("bar", "foo"), member.subscribedTopicNames()); +assertEquals("regex", member.subscribedTopicRegex()); +assertEquals("range", member.serverAssignorName().get()); +assertEquals( +Collections.singletonList( +new ClientAssignor( +"assignor", +(byte) 0, +(byte) 0, +(byte) 1, +new VersionedMetadata( +(byte) 1, +ByteBuffer.allocate(0, +member.clientAssignors()); +assertEquals(mkAssignment(mkTopicAssignment(topicId1, 1, 2, 3)), member.assignedPartitions()); +assertEquals(mkAssignment(mkTopicAssignment(topicId2, 4, 5, 6)), member.partitionsPendingRevocation()); +assertEquals(mkAssignment(mkTopicAssignment(topicId3, 7, 8, 9)), member.partitionsPendingAssignment()); +} + +@Test +public void testEquals() { +Uuid topicId1 = Uuid.randomUuid(); Review Comment: I don't get the relation with the static class.
[jira] [Created] (KAFKA-14951) Changing the cleanup.policy on an existing topic from "delete" to "compact" seems to change it to "compact,delete" even though it says "compact"
Tomasz Kaszuba created KAFKA-14951: -- Summary: Changing the cleanup.policy on an existing topic from "delete" to "compact" seems to change it to "compact,delete" even though it says "compact" Key: KAFKA-14951 URL: https://issues.apache.org/jira/browse/KAFKA-14951 Project: Kafka Issue Type: Bug Affects Versions: 2.8.2 Reporter: Tomasz Kaszuba We have noticed something strange when changing the cleanup.policy of an existing topic from "delete" to "compact". Even though the policy is set to "compact", retention.ms is not ignored and the topic is still cleaned up when the retention time is reached, which suggests that the effective policy is "compact,delete" rather than "compact". If this is the case, could the metadata return "compact,delete" instead of "compact" when describing a topic? I have not found any information on whether this is normal behavior, but I noticed that this kind of change is blocked in Confluent Cloud, so perhaps this is a known issue. [https://docs.confluent.io/cloud/current/clusters/broker-config.html#custom-topic-settings-for-all-cluster-types] _You cannot change the {{cleanup.policy}} from {{delete}} to {{compact}} after a topic has been created_ -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14947) Duplicate records are getting created in the topic.
[ https://issues.apache.org/jira/browse/KAFKA-14947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17717662#comment-17717662 ] krishnendu Das commented on KAFKA-14947: [~yash.mayya] Thanks for your comment. I have two points based on your comments and suggestion. With our existing Connect API code and the Kafka server (2.6.2), our ingestion mechanism was working fine in the live environment. We checked the WorkerSourceTask::execute() method in Kafka server 2.6.2, and it follows this execution path: # Poll the task for new data. # If any data is returned, save it to the Kafka topic. # Commit the offset. To this day we are able to ingest huge volumes of data in the live environment using Kafka server 2.6.2, with no data duplication. Now we have to upgrade Kafka to 3.1.1. The WorkerSourceTask::execute() method in Kafka server 3.1.1 follows this execution path: # Update the committableOffsets variable (if any offset commit is pending), which is shared between the execute() and commitOffsets() functions. # Poll the task for new data. # If any data is returned, save it to the Kafka topic. Because of this ordering, the shared variable committableOffsets is set to the latest offset value only in the 2nd poll and is committed on the next call of WorkerSourceTask::commitOffsets(). So in the 2nd poll the task reads the same data that it already read in the 1st poll and wrote to the topic, while the offset commit only happens some time later. Because of this execution flow in execute(), we are getting duplicate data in the topics. You also asked: "In your provided scenario, why can't the connector simply read from its previous position in the second poll since it should be maintaining some internal state?" We can store the current per-file offset in an in-memory object, but that won't be persistent.
The object will be reset at every start. Do you have any suggestions on how we can make it persistent with the new Kafka server (3.1.1)? > Duplicate records are getting created in the topic. > > > Key: KAFKA-14947 > URL: https://issues.apache.org/jira/browse/KAFKA-14947 > Project: Kafka > Issue Type: Bug > Components: producer > Affects Versions: 3.1.1 > Reporter: krishnendu Das > Priority: Blocker > Attachments: Kafka_server_3.1.1_data_duplication_issue_log > > > We are using the Kafka Connect API (version 2.3.0) and Kafka (version 3.1.1) for data ingestion. Previously we were using Kafka (version 2.6.2) with the same Kafka Connect API (version 2.3.0), and data ingestion was working properly. > > Recently we updated Kafka from 2.6.2 to 3.1.1. Since the update we are seeing duplicate data from the source connector in the Kafka topic. After debugging the 3.1.1 code, we saw that a new function, *updateCommittableOffsets*(), was added and is called inside *WorkerSourceTask::execute*() as part of the bug fix "KAFKA-12226: Commit source task offsets without blocking on batch delivery (#11323)". > > Because of this function, we are observing the following scenario: > # Inside execute(), at the start of the flow, updateCommittableOffsets() is called to check whether there is anything to commit. As the first poll has not happened yet, this function does not find anything to commit. > # Then the Kafka Connect API poll() method is called from WorkerSourceTask::execute(). *-> 1st poll* > # The Kafka Connect API (using the sleepy policy) reads one source file from the cloud source directory. > # It reads the whole content of the file and sends the result set to the Kafka server to be written to the Kafka topic.
> # During the 2nd poll, updateCommittableOffsets() finds some offsets to commit and updates the reference variable committableOffsets, which is later used by WorkerSourceTask::commitOffsets() to perform the actual offset commit. > # Then the Kafka Connect API poll() method is called from *WorkerSourceTask::execute()*. *-> 2nd poll* > # The Kafka Connect API (using the sleepy policy) reads the same source file again from the start, because OffsetStorageReader::offset() did not return the latest offset. > # It reads the whole content of the file and sends the result set to the Kafka server to be written to the Kafka topic. ---> This creates duplicate data in the topic. > > > # WorkerSourceTask::commitOffsets() commits the offset. > >
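The ordering described in this report can be illustrated with a small toy model. This is only a sketch under stated assumptions: `CommitOrderingDemo`, `run`, and the three-record "file" are hypothetical names invented for illustration, not Kafka Connect classes, and the model assumes a task that restarts reading from whatever offset has already been committed, as the reporter describes for this connector.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only; none of these names are Kafka Connect classes. */
final class CommitOrderingDemo {

    /** A hypothetical source file with three records. */
    private static final List<String> FILE = List.of("r0", "r1", "r2");

    /**
     * Runs two loop iterations and returns everything written to the "topic".
     *
     * @param commitRightAfterSend true models the 2.6.2 path as described in the
     *        report (poll, send, commit); false models the 3.1.1 path as described
     *        (stage committable offsets, poll, send), where the actual commit only
     *        happens later.
     */
    static List<String> run(boolean commitRightAfterSend) {
        List<String> topic = new ArrayList<>();
        int committedOffset = 0;   // what the task sees when it asks for its offset
        int committableOffset = 0; // staged offset, not yet visible to the task
        int lastSentOffset = 0;    // position after the most recent send

        for (int iteration = 0; iteration < 2; iteration++) {
            if (!commitRightAfterSend) {
                // Staged at the top of the loop, before the poll.
                committableOffset = lastSentOffset;
            }
            // Poll: the task resumes from the committed offset only.
            List<String> batch = FILE.subList(committedOffset, FILE.size());
            // Send: write the batch to the topic.
            topic.addAll(batch);
            lastSentOffset = FILE.size();
            if (commitRightAfterSend) {
                committedOffset = lastSentOffset;
            }
        }
        if (!commitRightAfterSend) {
            // The deferred commit finally runs, after both polls.
            committedOffset = committableOffset;
        }
        return topic;
    }
}
```

In this model `run(true)` writes each record once, while `run(false)` writes the file twice, because the second poll still sees the old committed offset. It says nothing about how the real worker schedules commits; it only restates the reporter's scenario in executable form.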
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
divijvaidya commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1180144730 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -600,25 +622,208 @@ public String toString() { } } -long findHighestRemoteOffset(TopicIdPartition topicIdPartition) throws RemoteStorageException { -Optional offset = Optional.empty(); -Optional maybeLog = fetchLog.apply(topicIdPartition.topicPartition()); -if (maybeLog.isPresent()) { -UnifiedLog log = maybeLog.get(); -Option maybeLeaderEpochFileCache = log.leaderEpochCache(); -if (maybeLeaderEpochFileCache.isDefined()) { -LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get(); -OptionalInt epoch = cache.latestEpoch(); -while (!offset.isPresent() && epoch.isPresent()) { -offset = remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch.getAsInt()); -epoch = cache.previousEpoch(epoch.getAsInt()); +public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException { +int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes; +TopicPartition tp = remoteStorageFetchInfo.topicPartition; +FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo; + +boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED; + +long offset = fetchInfo.fetchOffset; +int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes); + +Optional logOptional = fetchLog.apply(tp); +OptionalInt epoch = OptionalInt.empty(); + +if (logOptional.isPresent()) { +Option leaderEpochCache = logOptional.get().leaderEpochCache(); +if (leaderEpochCache.isDefined()) { +epoch = leaderEpochCache.get().epochForOffset(offset); +} +} + +Optional rlsMetadataOptional = epoch.isPresent() +? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset) +: Optional.empty(); + +if (!rlsMetadataOptional.isPresent()) { +String epochStr = (epoch.isPresent()) ? 
Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE"; +throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch " ++ epochStr + " and partition " + tp + " which does not exist in remote tier."); +} + +RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get(); +int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset); +InputStream remoteSegInputStream = null; +try { +// Search forward for the position of the last offset that is greater than or equal to the target offset +remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos); +RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream); Review Comment: Perf comment: not blocking for this PR Note that `RemoteLogInputStream` performs a heap allocation here for each batch. This allocation will increase GC activity with the size of segment. Perhaps, we can use BufferSupplier (we allocate one pool per request handler thread, see `requestLocal.bufferSupplier` in KafkaAPI.scala) here in future. 
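The allocation concern in the comment above can be sketched in isolation. This is a minimal illustration, not the PR's code: `ReusableBufferDemo` is a hypothetical helper that only imitates the idea of handing out and taking back a buffer, and it is not Kafka's `BufferSupplier` nor `RemoteLogInputStream`.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper; it only imitates the idea of a reusable buffer supplier. */
final class ReusableBufferDemo {
    private ByteBuffer cached;   // at most one buffer kept for reuse
    private int allocations;     // number of heap allocations performed

    /** Returns a buffer of at least the requested capacity, reusing the cached one if it fits. */
    ByteBuffer get(int capacity) {
        if (cached != null && cached.capacity() >= capacity) {
            ByteBuffer buffer = cached;
            cached = null;
            buffer.clear();
            buffer.limit(capacity);
            return buffer;
        }
        allocations++;
        return ByteBuffer.allocate(capacity);
    }

    /** Hands a buffer back so that the next get() can reuse it. */
    void release(ByteBuffer buffer) {
        cached = buffer;
    }

    int allocations() {
        return allocations;
    }

    /** Simulates reading batchCount batches and returns how many buffers were allocated. */
    static int readBatches(int batchCount, int batchSize, boolean reuse) {
        ReusableBufferDemo supplier = new ReusableBufferDemo();
        for (int i = 0; i < batchCount; i++) {
            ByteBuffer buffer = supplier.get(batchSize);
            buffer.put((byte) i); // stand-in for reading one batch
            if (reuse) {
                supplier.release(buffer);
            }
        }
        return supplier.allocations();
    }
}
```

Without reuse the number of allocations grows with the number of batches; with reuse a single buffer serves all batches of the same size.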
## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -600,25 +622,208 @@ public String toString() { } } -long findHighestRemoteOffset(TopicIdPartition topicIdPartition) throws RemoteStorageException { -Optional offset = Optional.empty(); -Optional maybeLog = fetchLog.apply(topicIdPartition.topicPartition()); -if (maybeLog.isPresent()) { -UnifiedLog log = maybeLog.get(); -Option maybeLeaderEpochFileCache = log.leaderEpochCache(); -if (maybeLeaderEpochFileCache.isDefined()) { -LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get(); -OptionalInt epoch = cache.latestEpoch(); -while (!offset.isPresent() && epoch.isPresent()) { -offset = remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch.getAsInt()); -epoch = cache.previousEpoch(epoch.getAsInt()); +public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException { +int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes; +TopicPartition tp = remoteStorageFetchInfo.topicPartition; +FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo; + +boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED; + +
[GitHub] [kafka] mimaison merged pull request #13649: MINOR: Fix incorrect description of SessionLifetimeMs
mimaison merged PR #13649: URL: https://github.com/apache/kafka/pull/13649
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
divijvaidya commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1180254846 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -581,11 +588,18 @@ public void run() { if (isLeader()) { // Copy log segments to remote storage copyLogSegmentsToRemote(); +// Cleanup/delete expired remote log segments +cleanupExpiredRemoteLogSegments(); Review Comment: That is fair. As I mentioned in the [review overview](https://github.com/apache/kafka/pull/13561#pullrequestreview-1385125558), I am fine with (and would actually prefer) creating JIRAs and tackling the perf-related comments outside this PR. With this comment, I wanted to make sure we are aware of and are tracking the things that need fixing in this code.
[GitHub] [kafka] aiven-anton commented on a diff in pull request #13649: MINOR: Fix incorrect description of SessionLifetimeMs
aiven-anton commented on code in PR #13649: URL: https://github.com/apache/kafka/pull/13649#discussion_r1180246714 ## clients/src/main/resources/common/message/SaslAuthenticateResponse.json: ## @@ -29,6 +29,6 @@ { "name": "AuthBytes", "type": "bytes", "versions": "0+", "about": "The SASL authentication bytes from the server, as defined by the SASL mechanism." }, { "name": "SessionLifetimeMs", "type": "int64", "versions": "1+", "default": "0", "ignorable": true, - "about": "The SASL authentication bytes from the server, as defined by the SASL mechanism." } + "about": "The duration in milliseconds that a successful session response is valid for." } Review Comment: I updated with original copy as suggested.
[GitHub] [kafka] yashmayya commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary
yashmayya commented on PR #13646: URL: https://github.com/apache/kafka/pull/13646#issuecomment-1527292922 Thanks @sambhav-jain-16 > What I suspect is happening is that when the method is initially storing the end offsets of the partitions, the connector hasn't produced 100 records till then and therefore the method doesn't consume fully even though messages are being produced by the connector. I'm not sure how this is possible given that we're waiting for `MINIMUM_MESSAGES` to be committed first? https://github.com/apache/kafka/blob/c6ad151ac3bac0d8d1d6985d230eacaa170b8984/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java#L399-L410 (i.e. `SourceTask::commitRecord` is called `MINIMUM_MESSAGES` number of times). Records are "committed" only after the producer transaction is committed successfully - https://github.com/apache/kafka/blob/c6ad151ac3bac0d8d1d6985d230eacaa170b8984/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java#L302-L332
[GitHub] [kafka] dajac commented on a diff in pull request #13644: KAFKA-14500; [1/N] Rewrite MemberMetadata in Java
dajac commented on code in PR #13644: URL: https://github.com/apache/kafka/pull/13644#discussion_r1180180911 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java: ## @@ -0,0 +1,560 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.group.generic; + +import org.apache.kafka.common.message.JoinGroupResponseData; +import org.apache.kafka.common.message.SyncGroupResponseData; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * Java rewrite of {@link kafka.coordinator.group.MemberMetadata} that is used + * by the new group coordinator (KIP-848). Review Comment: We should not put javadoc like this. Let's describe what it is.
[jira] [Resolved] (KAFKA-14639) Kafka CooperativeStickyAssignor revokes/assigns partition in one rebalance cycle
[ https://issues.apache.org/jira/browse/KAFKA-14639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-14639. - Fix Version/s: 3.5.0 Reviewer: David Jacot Resolution: Fixed > Kafka CooperativeStickyAssignor revokes/assigns partition in one rebalance > cycle > > > Key: KAFKA-14639 > URL: https://issues.apache.org/jira/browse/KAFKA-14639 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.2.1 >Reporter: Bojan Blagojevic >Assignee: Philip Nee >Priority: Major > Fix For: 3.5.0 > > Attachments: consumers-jira.log > > > I have an application that runs 6 consumers in parallel. I am getting some > unexpected results when I use {{{}CooperativeStickyAssignor{}}}. If I > understand the mechanism correctly, if the consumer looses partition in one > rebalance cycle, the partition should be assigned in the next rebalance cycle. > This assumption is based on the > [RebalanceProtocol|https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.RebalanceProtocol.html] > documentation and few blog posts that describe the protocol, like [this > one|https://www.confluent.io/blog/cooperative-rebalancing-in-kafka-streams-consumer-ksqldb/] > on Confluent blog. > {quote}The assignor should not reassign any owned partitions immediately, but > instead may indicate consumers the need for partition revocation so that the > revoked partitions can be reassigned to other consumers in the next rebalance > event. This is designed for sticky assignment logic which attempts to > minimize partition reassignment with cooperative adjustments. > {quote} > {quote}Any member that revoked partitions then rejoins the group, triggering > a second rebalance so that its revoked partitions can be assigned. Until > then, these partitions are unowned and unassigned. > {quote} > These are the logs from the application that uses > {{{}protocol='cooperative-sticky'{}}}. 
In the same rebalance cycle > ({{{}generationId=640{}}}) {{partition 74}} moves from {{consumer-3}} to > {{{}consumer-4{}}}. I omitted the lines that are logged by the other 4 > consumers. > Mind that the log is in reverse(bottom to top) > {code:java} > 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : New > partition assignment: partition-59, seek to min common offset: 85120524 > 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler2 : Partitions > [partition-59] assigned successfully > 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : Partitions > assigned: [partition-59] > 2022-12-14 11:18:24 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator > : [Consumer clientId=partition-3-my-client-id-my-group-id, > groupId=my-group-id] Adding newly assigned partitions: partition-59 > 2022-12-14 11:18:24 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator > : [Consumer clientId=partition-3-my-client-id-my-group-id, > groupId=my-group-id] Notifying assignor about the new > Assignment(partitions=[partition-59]) > 2022-12-14 11:18:24 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator > : [Consumer clientId=partition-3-my-client-id-my-group-id, > groupId=my-group-id] Request joining group due to: need to revoke partitions > [partition-26, partition-74] as indicated by the current assignment and > re-join > 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler2 : Partitions > [partition-26, partition-74] revoked successfully > 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : Finished > removing partition data > 2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator > : [Consumer clientId=partition-4-my-client-id-my-group-id, > groupId=my-group-id] (Re-)joining group > 2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler1 : New > partition assignment: partition-74, seek to min common offset: 107317730 > 2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler2 : 
Partitions > [partition-74] assigned successfully > 2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler1 : Partitions > assigned: [partition-74] > 2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator > : [Consumer clientId=partition-4-my-client-id-my-group-id, > groupId=my-group-id] Adding newly assigned partitions: partition-74 > 2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator > : [Consumer clientId=partition-4-my-client-id-my-group-id, > groupId=my-group-id] Notifying assignor about the new > Assignment(partitions=[partition-74]) > 2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator > : [Consumer
[GitHub] [kafka] dajac commented on pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance
dajac commented on PR #13550: URL: https://github.com/apache/kafka/pull/13550#issuecomment-1527244890 Merged to trunk and 3.5.
[GitHub] [kafka] showuon commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
showuon commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1180121259 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -581,11 +588,18 @@ public void run() { if (isLeader()) { // Copy log segments to remote storage copyLogSegmentsToRemote(); +// Cleanup/delete expired remote log segments +cleanupExpiredRemoteLogSegments(); +} else { +Optional unifiedLogOptional = fetchLog.apply(topicIdPartition.topicPartition()); +if (unifiedLogOptional.isPresent()) { +long offset = findHighestRemoteOffset(topicIdPartition); + unifiedLogOptional.get().updateHighestOffsetInRemoteStorage(offset); Review Comment: We might need to add logs here to describe why we need to update highest offset in remote storage for followers. I think that's for fetch from follower replica feature, right? ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -595,6 +609,193 @@ public void run() { } } +public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) { +logger.debug("Updating $topicPartition with remoteLogStartOffset: {}", remoteLogStartOffset); +updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset); +} + +class RemoteLogRetentionHandler { + +private long remainingBreachedSize = 0L; +private OptionalLong logStartOffset = OptionalLong.empty(); + +public RemoteLogRetentionHandler(long remainingBreachedSize) { +this.remainingBreachedSize = remainingBreachedSize; +} + +private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean checkSizeRetention) throws RemoteStorageException, ExecutionException, InterruptedException { +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> { +// Assumption that segments contain size > 0 Review Comment: nit: assume that segments contain size >= 0 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -595,6 +609,193 @@ public void run() { } } +public void 
handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) { +logger.debug("Updating $topicPartition with remoteLogStartOffset: {}", remoteLogStartOffset); +updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset); +} + +class RemoteLogRetentionHandler { + +private long remainingBreachedSize = 0L; +private OptionalLong logStartOffset = OptionalLong.empty(); + +public RemoteLogRetentionHandler(long remainingBreachedSize) { +this.remainingBreachedSize = remainingBreachedSize; +} + +private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata, boolean checkSizeRetention) throws RemoteStorageException, ExecutionException, InterruptedException { +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> { +// Assumption that segments contain size > 0 +if (checkSizeRetention && remainingBreachedSize > 0) { +remainingBreachedSize -= x.segmentSizeInBytes(); +return remainingBreachedSize >= 0; +} else return false; +}); +if (isSegmentDeleted) { +logStartOffset = OptionalLong.of(metadata.endOffset() + 1); +logger.info("Deleted remote log segment ${metadata.remoteLogSegmentId()} due to retention size " + +"${log.config.retentionSize} breach. Log size after deletion will be " + +"${remainingBreachedSize + log.config.retentionSize}."); +} +return isSegmentDeleted; +} + +private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset) +throws RemoteStorageException, ExecutionException, InterruptedException { +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset()); +if (isSegmentDeleted) { +remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); +logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset); +} + +// No need to update the logStartOffset. +return isSegmentDeleted; +
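For readers following the size-based retention predicate quoted in this diff, the idea can be sketched outside of Kafka roughly as below. This is only an illustration, not the PR's code: `SizeRetentionSketch` and `countDeletable` are hypothetical names, segments are assumed to be visited oldest first, and the breached amount is assumed to be the total size minus the retention size (the quoted diff does not show how `remainingBreachedSize` is computed).

```java
import java.util.List;

final class SizeRetentionSketch {
    /**
     * Counts how many of the oldest segments a size-based predicate like the
     * one in the quoted diff would delete. Hypothetical helper, not part of
     * RemoteLogManager.
     */
    static int countDeletable(List<Long> segmentSizesOldestFirst, long retentionSize) {
        long totalSize = 0L;
        for (long size : segmentSizesOldestFirst) {
            totalSize += size;
        }
        // Assumption: the "breached" amount is whatever exceeds the retention size.
        long remainingBreachedSize = totalSize - retentionSize;
        int deleted = 0;
        for (long size : segmentSizesOldestFirst) {
            if (remainingBreachedSize <= 0) {
                break; // nothing left to reclaim
            }
            remainingBreachedSize -= size;
            if (remainingBreachedSize < 0) {
                break; // deleting this segment would drop the log below the retention size
            }
            deleted++;
        }
        return deleted;
    }

    public static void main(String[] args) {
        // Total 100 bytes, retention 60 bytes: the 10- and 20-byte segments go, the rest stay.
        System.out.println(countDeletable(List.of(10L, 20L, 30L, 40L), 60L)); // prints 2
    }
}
```

Note that with this predicate a segment is only deleted when it fits entirely inside the breached amount, so the log can remain somewhat above the retention size after cleanup.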
[GitHub] [kafka] dajac merged pull request #13550: KAFKA-14639: A single partition may be revoked and assign during a single round of rebalance
dajac merged PR #13550: URL: https://github.com/apache/kafka/pull/13550
[GitHub] [kafka] mdedetrich commented on pull request #11792: Replace EasyMock/PowerMock with Mockito in DistributedHerderTest
mdedetrich commented on PR #11792: URL: https://github.com/apache/kafka/pull/11792#issuecomment-1527237583 @yashmayya I don't really have time to finish this any time soon so feel free to take over
[GitHub] [kafka] showuon commented on pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
showuon commented on PR #13535: URL: https://github.com/apache/kafka/pull/13535#issuecomment-1527204055 @Hangleton @junrao @jeqo , any other comments on this PR? We hope to merge it early in the release cycle, so that we have enough time to test its stability and make further improvements. Thanks.
[GitHub] [kafka] mimaison merged pull request #13171: KAFKA-14584: Deprecate StateChangeLogMerger tool
mimaison merged PR #13171: URL: https://github.com/apache/kafka/pull/13171
[GitHub] [kafka] fvaleri commented on a diff in pull request #13514: KAFKA-14752: Kafka examples improvements - consumer changes
fvaleri commented on code in PR #13514: URL: https://github.com/apache/kafka/pull/13514#discussion_r1180003269 ## examples/src/main/java/kafka/examples/Consumer.java: ## @@ -91,9 +99,13 @@ public void run() { // we can't recover from these exceptions Utils.printErr(e.getMessage()); shutdown(); +} catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) { +// invalid or no offset found without auto.reset.policy +Utils.printOut("Invalid or no offset found, using latest"); +consumer.seekToEnd(emptyList()); Review Comment: Yes, that makes sense. Thanks Luke. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon merged pull request #13631: KAFKA-14909: check zkMigrationReady tag before migration
showuon merged PR #13631: URL: https://github.com/apache/kafka/pull/13631
[GitHub] [kafka] showuon commented on pull request #13631: KAFKA-14909: check zkMigrationReady tag before migration
showuon commented on PR #13631: URL: https://github.com/apache/kafka/pull/13631#issuecomment-1527058819 Failed tests are unrelated:
```
Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.runtime.distributed.DistributedHerderTest.testExternalZombieFencingRequestAsynchronousFailure
Build / JDK 17 and Scala 2.13 / integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor()
Build / JDK 17 and Scala 2.13 / kafka.server.CreateTopicsRequestTest.testErrorCreateTopicsRequests(String).quorum=kraft
Build / JDK 17 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
```
[GitHub] [kafka] sambhav-jain-16 commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary
sambhav-jain-16 commented on PR #13646: URL: https://github.com/apache/kafka/pull/13646#issuecomment-1527055473 > How would this fix be any different from that i.e if consume can't get 100 messages within the timeout. Won't it still fail? Yes it will fail, but `consumeAll` is not failing due to timeout here but rather due to its nature of storing the end offsets before consuming.
[GitHub] [kafka] sambhav-jain-16 commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary
sambhav-jain-16 commented on PR #13646: URL: https://github.com/apache/kafka/pull/13646#issuecomment-1527052521 @sagarrao12 The `consumeAll` method consumes only the records available at the moment it starts: it stores the end offsets of each partition up front and then consumes up to those stored offsets. The `consume` method has no such restriction; it consumes until it receives the specified number of records or the timeout expires. What I suspect is happening is that when the method initially stores the end offsets, the connector hasn't yet produced 100 records, so the method stops short even though the connector is still producing messages.
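The difference between the two consumption styles described in this comment can be sketched with a toy in-memory log. This is an illustration only and does not use the real Connect test utilities: `ConsumeSemanticsSketch`, `ToyLog`, `consumeUpToSnapshot` and `consumeAtLeast` are hypothetical names, and the "timeout" is modelled as a bounded number of waits rather than wall-clock time.

```java
import java.util.ArrayList;
import java.util.List;

final class ConsumeSemanticsSketch {
    /** Toy single-partition log; append() stands in for the connector producing one record. */
    static final class ToyLog {
        private final List<Integer> records = new ArrayList<>();
        void append() { records.add(records.size()); }
        int endOffset() { return records.size(); }
    }

    static ToyLog logWith(int initialRecords) {
        ToyLog log = new ToyLog();
        for (int i = 0; i < initialRecords; i++) {
            log.append();
        }
        return log;
    }

    /** Snapshot style: the end offset is captured once, so later appends are never read. */
    static int consumeUpToSnapshot(ToyLog log, int appendsWhileReading) {
        int snapshotEnd = log.endOffset();
        int consumed = 0;
        while (consumed < snapshotEnd) {
            consumed++;
            if (appendsWhileReading > 0) {
                log.append(); // the producer keeps writing while we read
                appendsWhileReading--;
            }
        }
        return consumed;
    }

    /** Count style: keep reading until 'wanted' records are seen or the wait budget runs out. */
    static int consumeAtLeast(ToyLog log, int wanted, int maxWaits) {
        int consumed = 0;
        int waits = 0;
        while (consumed < wanted) {
            if (consumed < log.endOffset()) {
                consumed++;
            } else if (waits < maxWaits) {
                log.append(); // one more record arrives while we wait
                waits++;
            } else {
                break; // stand-in for the timeout
            }
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(consumeUpToSnapshot(logWith(72), 50));
        System.out.println(consumeAtLeast(logWith(72), 100, 1000));
    }
}
```

In this model the snapshot-style read of a log holding 72 records stops at 72 no matter how many records arrive afterwards, while the count-style read keeps going until it has 100.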
[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)
dajac commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1179969167 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java: ## @@ -0,0 +1,574 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class RangeAssignorTest { +private final RangeAssignor assignor = new RangeAssignor(); +private final Uuid topic1Uuid = Uuid.randomUuid(); +private final Uuid topic2Uuid = Uuid.randomUuid(); +private final Uuid topic3Uuid = Uuid.randomUuid(); +private final String consumerA = "A"; +private final String consumerB = "B"; +private final String consumerC = "C"; + +@Test +public void testOneConsumerNoTopic() { +Map topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3)); +Map members = Collections.singletonMap( +consumerA, +new AssignmentMemberSpec( +Optional.empty(), +Optional.empty(), +Collections.emptyList(), +Collections.emptyMap()) +); + +AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); +GroupAssignment groupAssignment = assignor.assign(assignmentSpec); + +assertTrue(groupAssignment.members().isEmpty()); +} + +@Test +public void testOneConsumerNonExistentTopic() { +Map topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3)); +Map members = Collections.singletonMap( +consumerA, +new AssignmentMemberSpec( +Optional.empty(), +Optional.empty(), +Collections.singletonList(topic2Uuid), +Collections.emptyMap()) +); + +AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); +GroupAssignment groupAssignment = assignor.assign(assignmentSpec); + +assertTrue(groupAssignment.members().isEmpty()); +} + +@Test +public void 
testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() { +Map topics = new HashMap<>(); +topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); +topics.put(topic3Uuid, new AssignmentTopicMetadata(2)); + +Map members = new HashMap<>(); +// Initial Subscriptions are: A -> T1, T3 | B -> T1, T3 + +members.put(consumerA, new AssignmentMemberSpec( +Optional.empty(), +Optional.empty(), +Arrays.asList(topic1Uuid, topic3Uuid), +Collections.emptyMap()) +); + +members.put(consumerB, new AssignmentMemberSpec( +Optional.empty(), +Optional.empty(), +Arrays.asList(topic1Uuid, topic3Uuid), +Collections.emptyMap()) +); + +AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); +GroupAssignment computedAssignment = assignor.assign(assignmentSpec); + +Map>> expectedAssignment = new HashMap<>(); +// Topic 1 Partitions Assignment +mkAssignment(expectedAssignment, topic1Uuid, Arrays.asList(0, 1)); +mkAssignment(expectedAssignment, topic1Uuid, Collections.singleton(2)); +// Topic 3 Partitions Assignment +mkAssignment(expectedAssignment, topic3Uuid, Collections.singleton(0)); +mkAssignment(expectedAssignment, topic3Uuid, Collections.singleton(1)); + +assertAssignment(expectedAssignment, computedAssignment); Review Comment: Yeah, sorry for this. I only thought about it when I raised this comment. -- This is an automated message from the Apache Git
[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)
dajac commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1179968856 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java: ## @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.lang.Math.min; + +/** + * The Server Side Sticky Range Assignor inherits properties of both the range assignor and the sticky assignor. + * The properties are as follows: + * + * Each member must get at least one partition from every topic that it is subscribed to. The only exception is when + * the number of subscribed members is greater than the number of partitions for that topic. (Range) + * Partitions should be assigned to members in a way that facilitates the join operation when required. 
(Range) + * This can only be done if every member is subscribed to the same topics and the topics are co-partitioned. + * Two streams are co-partitioned if the following conditions are met: + * + * The keys must have the same schemas. + * The topics involved must have the same number of partitions. + * + * Members should retain as much of their previous assignment as possible to reduce the number of partition movements during reassignment. (Sticky) + * + */ +public class RangeAssignor implements PartitionAssignor { +private static final Logger log = LoggerFactory.getLogger(RangeAssignor.class); + +public static final String RANGE_ASSIGNOR_NAME = "range"; + +@Override +public String name() { +return RANGE_ASSIGNOR_NAME; +} + +// Used in the potentiallyUnfilledMembers map and the UnfilledMembers map. +private static class MemberWithRemainingAssignments { +private final String memberId; +/** + * Number of partitions required to meet the assignment quota + */ +private final Integer remaining; + +public MemberWithRemainingAssignments(String memberId, Integer remaining) { +this.memberId = memberId; +this.remaining = remaining; +} + +/** + * @return memberId + */ +public String memberId() { +return memberId; +} + /** + * @return Remaining number of partitions + */ +public Integer remaining() { +return remaining; +} +} + +private Map> membersPerTopic(final AssignmentSpec assignmentSpec) { +Map> membersPerTopic = new HashMap<>(); +Map membersData = assignmentSpec.members(); + +membersData.forEach((memberId, memberMetadata) -> { +Collection topics = memberMetadata.subscribedTopicIds(); +for (Uuid topicId: topics) { +// Only topics that are present in both the subscribed topics list and the topic metadata should be considered for assignment. 
+if (assignmentSpec.topics().containsKey(topicId)) { +membersPerTopic +.computeIfAbsent(topicId, k -> new ArrayList<>()) +.add(memberId); +} else { +log.warn("Member " + memberId + " subscribed to topic " + topicId + " which doesn't exist in the topic metadata"); +} +} +}); + +return membersPerTopic; +} + +/** + * The algorithm includes the following steps: + * + * Generate a map of membersPerTopic using the given member subscriptions. Review Comment: Interesting... It is weird to have variable names in the description. Plain english is much better than `membersPerTopic`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail:
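As a rough illustration of the first step quoted above (grouping members by topic) followed by a plain range split, here is a minimal dependency-free sketch. It is not the PR's algorithm: it ignores stickiness entirely, uses `String` topic ids instead of `Uuid`, and `RangeSplitSketch`, `membersPerTopic` and `rangeSplit` are names chosen here for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class RangeSplitSketch {
    /** Groups member ids by topic, skipping subscriptions to topics missing from the metadata. */
    static Map<String, List<String>> membersPerTopic(Map<String, List<String>> subscriptions,
                                                     Map<String, Integer> partitionCounts) {
        Map<String, List<String>> result = new TreeMap<>();
        // Visit members in sorted order so the per-topic member lists are deterministic.
        for (Map.Entry<String, List<String>> entry : new TreeMap<>(subscriptions).entrySet()) {
            for (String topic : entry.getValue()) {
                if (partitionCounts.containsKey(topic)) {
                    result.computeIfAbsent(topic, k -> new ArrayList<>()).add(entry.getKey());
                }
            }
        }
        return result;
    }

    /**
     * Classic range split: each member gets a contiguous block of partitions, and the
     * first (numPartitions % members) members get one extra. The member list must be non-empty.
     */
    static Map<String, List<Integer>> rangeSplit(List<String> members, int numPartitions) {
        Map<String, List<Integer>> assignment = new TreeMap<>();
        int quota = numPartitions / members.size();
        int extra = numPartitions % members.size();
        int next = 0;
        for (int i = 0; i < members.size(); i++) {
            int count = quota + (i < extra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                partitions.add(next++);
            }
            assignment.put(members.get(i), partitions);
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = Map.of("T1", 3, "T3", 2);
        Map<String, List<String>> subs = Map.of("A", List.of("T1", "T3"), "B", List.of("T1", "T3"));
        membersPerTopic(subs, counts).forEach((topic, members) ->
            System.out.println(topic + " -> " + rangeSplit(members, counts.get(topic))));
    }
}
```

With two members subscribed to a 3-partition topic and a 2-partition topic, this yields the same split that `testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions` in the accompanying test file expects: partitions 0 and 1 versus 2 for the first topic, and 0 versus 1 for the second.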
[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)
dajac commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1179967228 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java: ## @@ -0,0 +1,574 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class RangeAssignorTest { +private final RangeAssignor assignor = new RangeAssignor(); +private final Uuid topic1Uuid = Uuid.randomUuid(); +private final Uuid topic2Uuid = Uuid.randomUuid(); +private final Uuid topic3Uuid = Uuid.randomUuid(); +private final String consumerA = "A"; +private final String consumerB = "B"; +private final String consumerC = "C"; + +@Test +public void testOneConsumerNoTopic() { +Map topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3)); +Map members = Collections.singletonMap( +consumerA, +new AssignmentMemberSpec( +Optional.empty(), +Optional.empty(), +Collections.emptyList(), +Collections.emptyMap()) +); + +AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); +GroupAssignment groupAssignment = assignor.assign(assignmentSpec); + +assertTrue(groupAssignment.members().isEmpty()); +} + +@Test +public void testOneConsumerNonExistentTopic() { +Map topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3)); +Map members = Collections.singletonMap( +consumerA, +new AssignmentMemberSpec( +Optional.empty(), +Optional.empty(), +Collections.singletonList(topic2Uuid), +Collections.emptyMap()) +); + +AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); +GroupAssignment groupAssignment = assignor.assign(assignmentSpec); + +assertTrue(groupAssignment.members().isEmpty()); +} + +@Test +public void 
testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() { +Map topics = new HashMap<>(); +topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); +topics.put(topic3Uuid, new AssignmentTopicMetadata(2)); + +Map members = new HashMap<>(); +// Initial Subscriptions are: A -> T1, T3 | B -> T1, T3 + +members.put(consumerA, new AssignmentMemberSpec( +Optional.empty(), +Optional.empty(), +Arrays.asList(topic1Uuid, topic3Uuid), +Collections.emptyMap()) +); + +members.put(consumerB, new AssignmentMemberSpec( +Optional.empty(), +Optional.empty(), +Arrays.asList(topic1Uuid, topic3Uuid), +Collections.emptyMap()) +); + +AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); +GroupAssignment computedAssignment = assignor.assign(assignmentSpec); + +Map>> expectedAssignment = new HashMap<>(); +// Topic 1 Partitions Assignment +mkAssignment(expectedAssignment, topic1Uuid, Arrays.asList(0, 1)); +mkAssignment(expectedAssignment, topic1Uuid, Collections.singleton(2)); +// Topic 3 Partitions Assignment +mkAssignment(expectedAssignment, topic3Uuid, Collections.singleton(0)); +mkAssignment(expectedAssignment, topic3Uuid, Collections.singleton(1)); + +assertAssignment(expectedAssignment, computedAssignment); +} + +@Test +public void testFirstAssignmentThreeConsumersThreeTopicsDifferentSubscriptions() { +Map topics = new HashMap<>(); +
[GitHub] [kafka] dajac commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)
dajac commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1179966618 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java: ## @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.lang.Math.min; + +/** + * The Server Side Sticky Range Assignor inherits properties of both the range assignor and the sticky assignor. + * The properties are as follows: + * + * Each member must get at least one partition from every topic that it is subscribed to. The only exception is when + * the number of subscribed members is greater than the number of partitions for that topic. (Range) + * Partitions should be assigned to members in a way that facilitates the join operation when required. 
(Range) + * This can only be done if every member is subscribed to the same topics and the topics are co-partitioned. + * Two streams are co-partitioned if the following conditions are met: + * + * The keys must have the same schemas. + * The topics involved must have the same number of partitions. + * + * Members should retain as much of their previous assignment as possible to reduce the number of partition movements during reassignment. (Sticky) + * + */ +public class RangeAssignor implements PartitionAssignor { +private static final Logger log = LoggerFactory.getLogger(RangeAssignor.class); + +public static final String RANGE_ASSIGNOR_NAME = "range"; Review Comment: Ack. We can keep it as public. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] sagarrao12 commented on pull request #13646: KAFKA-14938: Fixing flaky test testConnectorBoundary
sagarrao12 commented on PR #13646: URL: https://github.com/apache/kafka/pull/13646#issuecomment-1527038626 Thanks @sambhav-jain-16 . Going by the build, this seems to have fixed the test, but I am still curious as to why it used to fail expecting 100+ but getting 72. How would this fix be any different, i.e., if `consume` can't get 100 messages within the timeout, won't it still fail?