[GitHub] [kafka] abhijeetk88 opened a new pull request, #13944: [DRAFT PR - WIP] KAFKA-14953: Adding RemoteLogManager metrics
abhijeetk88 opened a new pull request, #13944: URL: https://github.com/apache/kafka/pull/13944

*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*

*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abhijeetk88 commented on pull request #13943: [DRAFT] KAFKA-14953 - Added metrics related to tiered storage.
abhijeetk88 commented on PR #13943: URL: https://github.com/apache/kafka/pull/13943#issuecomment-1615504449 Will raise a new one.
[GitHub] [kafka] abhijeetk88 closed pull request #13943: [DRAFT] KAFKA-14953 - Added metrics related to tiered storage.
abhijeetk88 closed pull request #13943: [DRAFT] KAFKA-14953 - Added metrics related to tiered storage. URL: https://github.com/apache/kafka/pull/13943
[GitHub] [kafka] abhijeetk88 opened a new pull request, #13943: [DRAFT PR] Kafka 14953 - WIP
abhijeetk88 opened a new pull request, #13943: URL: https://github.com/apache/kafka/pull/13943

*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*

*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] github-actions[bot] commented on pull request #12844: KAFKA-14353: Allow configuring request timeouts for create/update/validate Kafka Connect REST endpoints
github-actions[bot] commented on PR #12844: URL: https://github.com/apache/kafka/pull/12844#issuecomment-1615427934 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
[GitHub] [kafka] github-actions[bot] commented on pull request #12849: MINOR: Fix commitId maybe null
github-actions[bot] commented on PR #12849: URL: https://github.com/apache/kafka/pull/12849#issuecomment-1615427881 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
[GitHub] [kafka] wcarlson5 opened a new pull request, #13942: KAFKA-14936: Check the versioned table's history retention and compare to grace period (4/N)
wcarlson5 opened a new pull request, #13942: URL: https://github.com/apache/kafka/pull/13942

Check the history retention of the KTable used in the grace-period join.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Updated] (KAFKA-15138) Java kafka-clients compression dependencies should be optional
[ https://issues.apache.org/jira/browse/KAFKA-15138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joe DiPol updated KAFKA-15138:
------------------------------
Description:
If you look at [https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.4.0/kafka-clients-3.4.0.pom] You see that the dependencies for the compression libraries (like lz4-java) do NOT have "{{<optional>true</optional>}}". That means that these libraries are transitive dependencies which will be pulled (and potentially security scanned) for any project that uses kafka-clients. This is not correct. These compression libraries are optional and should not be transitive dependencies of kafka-clients. Therefore the above pom should state {{optional}} like:

{{<dependency>}}
{{  <groupId>org.lz4</groupId>}}
{{  <artifactId>lz4-java</artifactId>}}
{{  <version>1.8.0</version>}}
{{  <scope>runtime</scope>}}
{{  <optional>true</optional>}}
{{</dependency>}}

was:
If you look at [https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.4.0/kafka-clients-3.4.0.pom] You see that the dependencies for the compression libraries (like lz4-java) do NOT have "<optional>true</optional>". That means that these libraries are transitive dependencies which will be pulled (and potentially security scanned) for any project that uses kafka-clients. This is not correct. These compression libraries are optional and should not be transitive dependencies of kafka-clients. Therefore the above pom should state optional like:

<dependency>
  <groupId>org.lz4</groupId>
  <artifactId>lz4-java</artifactId>
  <version>1.8.0</version>
  <scope>runtime</scope>
  <optional>true</optional>
</dependency>

> Java kafka-clients compression dependencies should be optional
> --------------------------------------------------------------
>
>                 Key: KAFKA-15138
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15138
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 3.4.0
>            Reporter: Joe DiPol
>            Priority: Major
>
> If you look at [https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.4.0/kafka-clients-3.4.0.pom] You see that the dependencies for the compression libraries (like lz4-java) do NOT have "{{<optional>true</optional>}}". That means that these libraries are transitive dependencies which will be pulled (and potentially security scanned) for any project that uses kafka-clients.
> This is not correct. These compression libraries are optional and should not be transitive dependencies of kafka-clients. Therefore the above pom should state {{optional}} like:
> {{<dependency>}}
> {{  <groupId>org.lz4</groupId>}}
> {{  <artifactId>lz4-java</artifactId>}}
> {{  <version>1.8.0</version>}}
> {{  <scope>runtime</scope>}}
> {{  <optional>true</optional>}}
> {{</dependency>}}

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15138) Java kafka-clients compression dependencies should be optional
Joe DiPol created KAFKA-15138:
---------------------------------

             Summary: Java kafka-clients compression dependencies should be optional
                 Key: KAFKA-15138
                 URL: https://issues.apache.org/jira/browse/KAFKA-15138
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 3.4.0
            Reporter: Joe DiPol

If you look at [https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.4.0/kafka-clients-3.4.0.pom] You see that the dependencies for the compression libraries (like lz4-java) do NOT have "<optional>true</optional>". That means that these libraries are transitive dependencies which will be pulled (and potentially security scanned) for any project that uses kafka-clients. This is not correct. These compression libraries are optional and should not be transitive dependencies of kafka-clients. Therefore the above pom should state optional like:

<dependency>
  <groupId>org.lz4</groupId>
  <artifactId>lz4-java</artifactId>
  <version>1.8.0</version>
  <scope>runtime</scope>
  <optional>true</optional>
</dependency>
[jira] [Updated] (KAFKA-14335) Admin.listConsumerGroups should allow filtering, pagination
[ https://issues.apache.org/jira/browse/KAFKA-14335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-14335:
------------------------------
Labels: needs-kip (was: )

> Admin.listConsumerGroups should allow filtering, pagination
> -----------------------------------------------------------
>
>                 Key: KAFKA-14335
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14335
>             Project: Kafka
>          Issue Type: Improvement
>          Components: admin, clients, protocol
>    Affects Versions: 3.3.0
>            Reporter: Kirk True
>            Assignee: Kirk True
>            Priority: Major
>              Labels: needs-kip
>
> The [Admin|https://kafka.apache.org/33/javadoc/org/apache/kafka/clients/admin/Admin.html] API provides a means for clients to list the consumer groups in the cluster. When the list of consumer groups becomes very large, it can cause problems for the client (e.g., OOM errors) as well as overhead for the broker and network.
> The proposal is to enhance the [ListConsumerGroupsOptions|https://kafka.apache.org/33/javadoc/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.html] class to have optional values such as:
> * Consumer group ID regex (evaluated on broker)
> * Pagination token (consumer group ID, probably)
> This will require a KIP since it is enhancing the admin API, protocol, and broker.
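For context on what the proposal asks the broker to take over, here is a minimal, self-contained sketch of the regex filtering and pagination that clients currently have to perform on their own side of the wire. This is not the Admin API: the class name `GroupListingSketch`, the `page` method, and the exclusive "last seen group ID" pagination token are all hypothetical illustrations of the two option values listed above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical sketch of broker-side filtering + pagination over group IDs,
// i.e. the work KAFKA-14335 proposes to move out of the client.
public class GroupListingSketch {

    // Return at most pageSize group IDs matching the regex, starting strictly
    // after the pagination token (an exclusive "last seen" group ID, or null
    // for the first page). Sorting makes the token well-defined.
    static List<String> page(List<String> groupIds, String regex, String token, int pageSize) {
        Pattern p = Pattern.compile(regex);
        List<String> sorted = new ArrayList<>(groupIds);
        sorted.sort(String::compareTo);
        List<String> out = new ArrayList<>();
        for (String id : sorted) {
            if (token != null && id.compareTo(token) <= 0) continue; // before/at token
            if (!p.matcher(id).matches()) continue;                  // regex filter
            out.add(id);
            if (out.size() == pageSize) break;                       // page is full
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> groups = List.of("app-a", "app-b", "etl-1", "app-c");
        System.out.println(page(groups, "app-.*", null, 2));    // [app-a, app-b]
        System.out.println(page(groups, "app-.*", "app-b", 2)); // [app-c]
    }
}
```

Doing this client-side still requires the broker to ship the full group list, which is exactly the OOM/overhead problem the ticket describes; evaluating the regex and token on the broker avoids it.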
[GitHub] [kafka] riedelmax opened a new pull request, #13941: KAFKA-15123: Add tests for ChunkedBytesStream
riedelmax opened a new pull request, #13941: URL: https://github.com/apache/kafka/pull/13941

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
jolshan commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1248282166

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
@@ -568,5 +568,20 @@ public void testMetadataRefreshDeadline() {
assertTrue(group.hasMetadataExpired(time.milliseconds()));
assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs);
assertEquals(group.groupEpoch() + 1, group.metadataRefreshDeadline().epoch);
+
+// Set the refresh deadline.
+group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch());

Review Comment: Hmm -- reset meaning we should refresh? I guess my point was that if we lower the epoch we may delay the reset. I guess worst case, we just have to wait out the refresh interval though.
[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
dajac commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1248223476

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
@@ -568,5 +568,20 @@ public void testMetadataRefreshDeadline() {
assertTrue(group.hasMetadataExpired(time.milliseconds()));
assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs);
assertEquals(group.groupEpoch() + 1, group.metadataRefreshDeadline().epoch);
+
+// Set the refresh deadline.
+group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch());

Review Comment: I think so. The epoch value is not so important here. The important part is that it should be reset regardless.
[GitHub] [kafka] divijvaidya commented on pull request #13935: MINOR: Fix debug logs to display TimeIndexOffset
divijvaidya commented on PR #13935: URL: https://github.com/apache/kafka/pull/13935#issuecomment-1615046002

Unrelated test failures
```
[Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplicationWithEmptyPartition()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13935/2/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest/Build___JDK_8_and_Scala_2_12___testReplicationWithEmptyPartition__/)
[Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testSyncTopicConfigs()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13935/2/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest/Build___JDK_8_and_Scala_2_12___testSyncTopicConfigs__/)
[Build / JDK 8 and Scala 2.12 / kafka.admin.TopicCommandIntegrationTest.testDescribeAtMinIsrPartitions(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13935/2/testReport/junit/kafka.admin/TopicCommandIntegrationTest/Build___JDK_8_and_Scala_2_12___testDescribeAtMinIsrPartitions_String__quorum_kraft/)
[Build / JDK 8 and Scala 2.12 / kafka.api.ConsumerBounceTest.testSubscribeWhenTopicUnavailable()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13935/2/testReport/junit/kafka.api/ConsumerBounceTest/Build___JDK_8_and_Scala_2_12___testSubscribeWhenTopicUnavailable__/)
[Build / JDK 8 and Scala 2.12 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13935/2/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_8_and_Scala_2_12___testBalancePartitionLeaders__/)
[Build / JDK 11 and Scala 2.13 / kafka.zk.ZkMigrationIntegrationTest.[1] Type=ZK, Name=testDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13935/2/testReport/junit/kafka.zk/ZkMigrationIntegrationTest/Build___JDK_11_and_Scala_2_131__Type_ZK__Name_testDualWrite__MetadataVersion_3_4_IV0__Security_PLAINTEXT/)
[Build / JDK 11 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13935/2/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_11_and_Scala_2_13___testBalancePartitionLeaders__/)
[Build / JDK 11 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13935/2/testReport/junit/org.apache.kafka.trogdor.coordinator/CoordinatorTest/Build___JDK_11_and_Scala_2_13___testTaskRequestWithOldStartMsGetsUpdated__/)
```
@showuon requesting review for this.
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest
divijvaidya commented on code in PR #13284: URL: https://github.com/apache/kafka/pull/13284#discussion_r1248166033

## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
@@ -40,16 +47,16 @@
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+import static org.apache.kafka.connect.mirror.MirrorMaker.CONNECTOR_CLASSES;
import static org.apache.kafka.test.TestUtils.waitForCondition;
@Tag("integration")
public class DedicatedMirrorIntegrationTest {
private static final Logger log = LoggerFactory.getLogger(DedicatedMirrorIntegrationTest.class);
-
-private static final int TOPIC_CREATION_TIMEOUT_MS = 120_000;

Review Comment: Reverted back to original since we have a better logic to ensure correct sequence of startup now.
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest
divijvaidya commented on code in PR #13284: URL: https://github.com/apache/kafka/pull/13284#discussion_r1248164637

## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
@@ -273,6 +288,30 @@ private void writeToTopic(EmbeddedKafkaCluster cluster, String topic, int numMes
}
}
+private void awaitMirrorMakerStart(final SourceAndTarget sourceAndTarget) throws InterruptedException {
+waitForCondition(() -> {
+try {
+return mirrorMakers.values().stream().allMatch(
+mm -> CONNECTOR_CLASSES.stream().allMatch(
+connectorClazz -> isConnectorRunningForMirrorMaker(connectorClazz, mm, sourceAndTarget)));
+} catch (Exception ex) {
+log.error("Something unexpected occurred. Unable to check for startup status for mirror maker for {}", sourceAndTarget, ex);
+throw ex;
+}
+}, MM_START_UP_TIMEOUT_MS, "MirrorMaker instances did not transition to running in time");
+}
+
+private void awaitConnectorTasksStart(final Class clazz, final String source, String target) throws InterruptedException {
+waitForCondition(() -> {
+try {
+return mirrorMakers.values().stream().allMatch(mm -> isTaskRunningForMirrorMakerConnector(clazz, mm, source, target));

Review Comment: Done. Only checking for one node now.

## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
@@ -273,6 +288,30 @@ private void writeToTopic(EmbeddedKafkaCluster cluster, String topic, int numMes
}
}
+private void awaitMirrorMakerStart(final SourceAndTarget sourceAndTarget) throws InterruptedException {
+waitForCondition(() -> {
+try {
+return mirrorMakers.values().stream().allMatch(

Review Comment: Done in latest revision.
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest
divijvaidya commented on code in PR #13284: URL: https://github.com/apache/kafka/pull/13284#discussion_r1248164007

## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
@@ -288,4 +327,28 @@ private void awaitTopicContent(EmbeddedKafkaCluster cluster, String clusterName,
}
}
+/**
+ * Validates that the underlying connector are running for the given MirrorMaker.
+ */
+private boolean isConnectorRunningForMirrorMaker(final Class connectorClazz, final MirrorMaker mm, final SourceAndTarget sourceAndTarget) {
+final String connName = connectorClazz.getSimpleName();
+final ConnectorStateInfo connectorStatus = mm.connectorStatus(sourceAndTarget, connName);
+return connectorStatus != null
+// verify that connector state is set to running
+&& connectorStatus.connector().state().equals(AbstractStatus.State.RUNNING.toString());

Review Comment: Done in latest revision.
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest
divijvaidya commented on code in PR #13284: URL: https://github.com/apache/kafka/pull/13284#discussion_r1248163268

## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
@@ -288,4 +327,28 @@ private void awaitTopicContent(EmbeddedKafkaCluster cluster, String clusterName,
}
}
+/**
+ * Validates that the underlying connector are running for the given MirrorMaker.
+ */
+private boolean isConnectorRunningForMirrorMaker(final Class connectorClazz, final MirrorMaker mm, final SourceAndTarget sourceAndTarget) {
+final String connName = connectorClazz.getSimpleName();
+final ConnectorStateInfo connectorStatus = mm.connectorStatus(sourceAndTarget, connName);
+return connectorStatus != null
+// verify that connector state is set to running
+&& connectorStatus.connector().state().equals(AbstractStatus.State.RUNNING.toString());
+}
+
+/**
+ * Validates that the tasks are associated with the connector and they are running for the given MirrorMaker.
+ */
+private boolean isTaskRunningForMirrorMakerConnector(final Class connectorClazz, final MirrorMaker mm, final String source, final String target) {
+final SourceAndTarget sourceAndTarget = new SourceAndTarget(source, target);
+final String connName = connectorClazz.getSimpleName();
+final ConnectorStateInfo connectorStatus = mm.connectorStatus(sourceAndTarget, connName);
+return isConnectorRunningForMirrorMaker(connectorClazz, mm, sourceAndTarget)
+// verify that at least one task exists
+&& !connectorStatus.tasks().isEmpty()
+// verify that tasks are set to running
+&& connectorStatus.tasks().stream().allMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));

Review Comment: Done in latest revision.
[GitHub] [kafka] divijvaidya commented on pull request #13284: KAFKA-14718: Fix flaky DedicatedMirrorIntegrationTest
divijvaidya commented on PR #13284: URL: https://github.com/apache/kafka/pull/13284#issuecomment-1615032737

I had to rebase with trunk to resolve merge conflicts. Changes in latest revision:
1. Moved `awaitMirrorMakerStart` into the test cases.
2. Only check for one mirror maker node to start instead of all of them.
3. Only check for connector tasks in one MM node instead of all of the nodes.
4. Use NoRetryException to fail fast in case of a task/connector failure. (Thank you for introducing me to this nice test utility!)
5. Task/Connector failure is already logged at ERROR level.

This test passes locally. @C0urante ready for review.
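The fail-fast change in point 4 follows a common polling-wait pattern. A minimal, self-contained sketch of that shape (assumptions: the `WaitUtil` and `NoRetryException` names are invented here; this is not Kafka's `TestUtils.waitForCondition`, just the same idea): poll until the condition holds or a timeout expires, retry on transient errors, but let one designated exception type escape immediately instead of burning the whole timeout.

```java
import java.util.function.BooleanSupplier;

// Sketch of a waitForCondition-style helper with a fail-fast escape hatch.
public class WaitUtil {

    // Thrown by a condition to signal a permanent failure that retrying cannot fix
    // (e.g. a connector observed in FAILED state).
    public static class NoRetryException extends RuntimeException {
        public NoRetryException(String msg) { super(msg); }
    }

    public static void waitForCondition(BooleanSupplier cond, long timeoutMs, String msg)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            try {
                if (cond.getAsBoolean()) return;   // condition met
            } catch (NoRetryException e) {
                throw e;                           // permanent failure: stop polling now
            } catch (RuntimeException e) {
                // transient failure: keep retrying until the deadline
            }
            Thread.sleep(50);                      // back off between polls
        }
        throw new AssertionError(msg);             // timed out
    }
}
```

With this shape, a status check that sees a FAILED task throws `NoRetryException` and the test fails within milliseconds instead of waiting out `MM_START_UP_TIMEOUT_MS`.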
[jira] [Commented] (KAFKA-15091) Javadocs for SourceTask::commit are incorrect
[ https://issues.apache.org/jira/browse/KAFKA-15091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17739196#comment-17739196 ]

Chris Egerton commented on KAFKA-15091:
---------------------------------------

I think that was also discussed with KAFKA-5716. I wouldn't necessarily be opposed to deprecating {{SourceTask::commit}}, but given that we're several years further along than when that ticket was last discussed, the likelihood of connectors relying on that method has increased. We also currently make use of this method in MirrorMaker 2 (see KAFKA-14610). I think this ticket should focus on updating the docs for this method to be correct for all releases of Kafka Connect that invoke it; if we want to take more drastic action (which, again, I'm not currently opposed to), it probably makes sense to tackle that in a separate ticket.

> Javadocs for SourceTask::commit are incorrect
> ---------------------------------------------
>
>                 Key: KAFKA-15091
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15091
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Chris Egerton
>            Priority: Major
>
> The Javadocs for {{SourceTask::commit}} state that the method should:
> {quote}Commit the offsets, up to the offsets that have been returned by [{{poll()}}|https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/source/SourceTask.html#poll()].
> {quote}
> However, this is obviously incorrect given how the Connect runtime (when not configured with exactly-once support for source connectors) performs polling and offset commits on separate threads. There's also some extensive discussion on the semantics of that method in KAFKA-5716 where it's made clear that altering the behavior of the runtime to align with the documented semantics of that method is not a viable option.
> We should update the Javadocs for this method to state that it does not have anything to do with the offsets returned from {{SourceTask::poll}} and is instead just a general, periodically-invoked hook to let the task know that an offset commit has taken place (but with no guarantees as to which offsets have been committed and which ones correspond to still-in-flight records).

-- This message was sent by Atlassian Jira (v8.20.10#820010)
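The threading gap described in the ticket can be made concrete with a toy model (assumption: this is illustrative only, not Connect runtime code; the class name and counters are invented). Because the task thread keeps calling `poll()` while a separate thread performs the offset commit, by the time the no-argument `commit()` hook fires it cannot know which of the polled records were actually covered by the commit:

```java
import java.util.concurrent.atomic.AtomicLong;

// Toy model of a SourceTask-like object where polling and committing
// happen on different threads (simulated sequentially here).
public class CommitHookSketch {
    final AtomicLong polled = new AtomicLong();  // records handed out by poll()
    volatile long lastCommittedSnapshot = 0;     // offsets the committer last flushed

    void poll() {                                // runs on the task thread
        polled.incrementAndGet();
    }

    // The hook takes no arguments, like SourceTask::commit: it only learns
    // "a commit happened", never which offsets the commit covered.
    void commit() {
        System.out.println("commit() fired; committed up to " + lastCommittedSnapshot
                + ", but poll() has already produced " + polled.get());
    }

    public static void main(String[] args) {
        CommitHookSketch task = new CommitHookSketch();
        for (int i = 0; i < 3; i++) task.poll();      // task thread polls 3 records
        long committing = task.polled.get();          // commit thread snapshots: 3
        for (int i = 0; i < 2; i++) task.poll();      // task thread races ahead: 5
        task.lastCommittedSnapshot = committing;
        task.commit();                                // hook sees polled=5, committed=3
    }
}
```

The committed count (3) lags the polled count (5) at the moment the hook runs, which is exactly why the Javadoc's "commit the offsets returned by poll()" framing cannot be honored.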
[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
jolshan commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1248148380

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
@@ -568,5 +568,20 @@ public void testMetadataRefreshDeadline() {
assertTrue(group.hasMetadataExpired(time.milliseconds()));
assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs);
assertEquals(group.groupEpoch() + 1, group.metadataRefreshDeadline().epoch);
+
+// Set the refresh deadline.
+group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch());

Review Comment: Ok -- so this is behavior we want then? I guess I was just having trouble seeing when we would update to the lower epoch.
[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
dajac commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1248133399

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
@@ -568,5 +568,20 @@ public void testMetadataRefreshDeadline() {
assertTrue(group.hasMetadataExpired(time.milliseconds()));
assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs);
assertEquals(group.groupEpoch() + 1, group.metadataRefreshDeadline().epoch);
+
+// Set the refresh deadline.
+group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch());

Review Comment: Yeah, you’re right. I actually use the current group epoch here because hasMetadataExpired would be true if epoch + 1.
[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
jolshan commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1248102033

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
@@ -568,5 +568,20 @@ public void testMetadataRefreshDeadline() {
assertTrue(group.hasMetadataExpired(time.milliseconds()));
assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs);
assertEquals(group.groupEpoch() + 1, group.metadataRefreshDeadline().epoch);
+
+// Set the refresh deadline.
+group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch());

Review Comment: I thought in the case the write is lost, we have the higher epoch in the DeadlineAndEpoch and that would signal us to continue to refresh. However, in this case, you are saying we would go back an epoch and decide not to try to refresh anymore. Is that correct?
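To make the deadline-and-epoch semantics being debated in this thread concrete, here is a simplified standalone model (assumption: this paraphrases the behavior discussed in the review comments, not the actual `ConsumerGroup` implementation; the class name is invented). A stored deadline defers the refresh only while its recorded epoch still matches the current group epoch; if the group epoch moves past it (e.g. after a lost write), the metadata immediately looks expired again and a refresh is forced:

```java
// Simplified model of a "deadline and epoch" metadata-refresh guard.
public class RefreshDeadlineSketch {
    long deadlineMs = 0;      // metadata is fresh until this time...
    int deadlineEpoch = -1;   // ...but only while this matches the group epoch

    void setMetadataRefreshDeadline(long deadlineMs, int groupEpoch) {
        this.deadlineMs = deadlineMs;
        this.deadlineEpoch = groupEpoch;
    }

    boolean hasMetadataExpired(long nowMs, int groupEpoch) {
        // Expired when the deadline has passed OR the epoch no longer matches,
        // so a stale deadline from an older epoch cannot suppress a refresh.
        return nowMs >= deadlineMs || deadlineEpoch != groupEpoch;
    }

    public static void main(String[] args) {
        RefreshDeadlineSketch g = new RefreshDeadlineSketch();
        g.setMetadataRefreshDeadline(1000, 10);
        System.out.println(g.hasMetadataExpired(500, 10));  // false: deadline ahead, same epoch
        System.out.println(g.hasMetadataExpired(1500, 10)); // true: deadline passed
        System.out.println(g.hasMetadataExpired(500, 11));  // true: epoch moved, deadline ignored
    }
}
```

The worst case jolshan raises maps onto this model: if the deadline were ever recorded against a lower epoch, the mismatch makes `hasMetadataExpired` return true, so at most one refresh interval is wasted rather than refreshes being suppressed indefinitely.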
[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
jolshan commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1248100591

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
@@ -825,4 +828,60 @@ public void testClose() throws Exception {
assertFutureThrows(write1, NotCoordinatorException.class);
assertFutureThrows(write2, NotCoordinatorException.class);
}
+
+@Test
+public void testOnNewMetadataImage() {
+TopicPartition tp0 = new TopicPartition("__consumer_offsets", 0);
+TopicPartition tp1 = new TopicPartition("__consumer_offsets", 1);
+
+MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
+MockPartitionWriter writer = mock(MockPartitionWriter.class);
+MockCoordinatorBuilderSupplier supplier = mock(MockCoordinatorBuilderSupplier.class);
+MockCoordinatorBuilder builder = mock(MockCoordinatorBuilder.class);
+
+CoordinatorRuntime runtime =
+new CoordinatorRuntime.Builder()
+.withLoader(loader)
+.withEventProcessor(new MockEventProcessor())
+.withPartitionWriter(writer)
+.withCoordinatorBuilderSupplier(supplier)
+.build();
+
+MockCoordinator coordinator0 = mock(MockCoordinator.class);
+MockCoordinator coordinator1 = mock(MockCoordinator.class);
+
+when(supplier.get()).thenReturn(builder);
+when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+when(builder.withLogContext(any())).thenReturn(builder);
+when(builder.build())
+.thenReturn(coordinator0)
+.thenReturn(coordinator1);
+
+CompletableFuture future0 = new CompletableFuture<>();
+when(loader.load(tp0, coordinator0)).thenReturn(future0);
+
+CompletableFuture future1 = new CompletableFuture<>();
+when(loader.load(tp1, coordinator1)).thenReturn(future1);
+
+runtime.scheduleLoadOperation(tp0, 0);
+runtime.scheduleLoadOperation(tp1, 0);
+
+// Coordinator 0 is loaded. It should get the current image
+// that is the empty one.
+future0.complete(null);
+verify(coordinator0).onLoaded(MetadataImage.EMPTY);
+
+// Publish a new image.
+MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+MetadataImage newImage = delta.apply(MetadataProvenance.EMPTY);
+runtime.onNewMetadataImage(newImage, delta);
+
+// Coordinator 0 should be notified about it.
+verify(coordinator0).onNewMetadataImage(newImage, delta);

Review Comment: Sorry I found it after posting. I should have looked a little longer.
[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
dajac commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1248097012

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:

@@ -1932,6 +1961,435 @@ public void testPartitionAssignorExceptionOnRegularHeartbeat() {
     .setTopicPartitions(Collections.emptyList(;
 }
+
+    @Test
+    public void testSubscriptionMetadataRefreshedAfterGroupIsLoaded() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        // Create a context with one consumer group containing one member.
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withConsumerGroupMetadataRefreshIntervalMs(5 * 60 * 1000)
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                    .build())
+                .withAssignment(memberId, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                .withAssignmentEpoch(10)
+                .withSubscriptionMetadata(new HashMap() {
+                    {
+                        // foo only has 3 partitions stored in the metadata but foo has
+                        // 6 partitions in the metadata image.
+                        put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3));
+                    }
+                }))
+            .build();
+
+        // The metadata refresh flag should be true.
+        ConsumerGroup consumerGroup = context.groupMetadataManager
+            .getOrMaybeCreateConsumerGroup(groupId, false);
+        assertTrue(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
+
+        // Prepare the assignment result.
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            Collections.singletonMap(memberId, new MemberAssignment(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )))
+        ));
+
+        // Heartbeat.
+        CoordinatorResult result = context.consumerGroupHeartbeat(

Review Comment: gotcha.
[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
dajac commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1248096864

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:

@@ -568,5 +568,20 @@ public void testMetadataRefreshDeadline() {
     assertTrue(group.hasMetadataExpired(time.milliseconds()));
     assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs);
     assertEquals(group.groupEpoch() + 1, group.metadataRefreshDeadline().epoch);
+
+    // Set the refresh deadline.
+    group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch());

Review Comment: this could happen, for instance, if the write is lost, so the new epoch would not be known anymore.
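To make the lost-write scenario concrete, here is a minimal, self-contained sketch (all names are invented; this is not the actual `ConsumerGroup` implementation) of a refresh deadline that is only honored while its recorded epoch matches the current group epoch, so a deadline set at an epoch that was later lost or rolled back is treated as already expired:

```java
// Hypothetical sketch of an epoch-guarded metadata refresh deadline.
public class RefreshDeadlineSketch {
    // Invented stand-in for the (deadlineMs, epoch) pair discussed in the review.
    static class Deadline {
        final long deadlineMs;
        final int epoch;
        Deadline(long deadlineMs, int epoch) {
            this.deadlineMs = deadlineMs;
            this.epoch = epoch;
        }
    }

    private Deadline deadline = new Deadline(0L, -1);
    private final int groupEpoch;

    RefreshDeadlineSketch(int groupEpoch) {
        this.groupEpoch = groupEpoch;
    }

    void setMetadataRefreshDeadline(long deadlineMs, int epoch) {
        this.deadline = new Deadline(deadlineMs, epoch);
    }

    boolean hasMetadataExpired(long nowMs) {
        // Expired if past the deadline OR if the deadline was recorded at a
        // different epoch than the group's current epoch (e.g. the epoch bump
        // write was lost, so the recorded epoch can no longer be trusted).
        return nowMs >= deadline.deadlineMs || deadline.epoch != groupEpoch;
    }

    public static void main(String[] args) {
        RefreshDeadlineSketch group = new RefreshDeadlineSketch(10);
        group.setMetadataRefreshDeadline(1_000L, 10);
        System.out.println(group.hasMetadataExpired(500L));  // false: within deadline, same epoch
        group.setMetadataRefreshDeadline(1_000L, 9);         // deadline from a stale epoch
        System.out.println(group.hasMetadataExpired(500L));  // true: epoch mismatch forces a refresh
    }
}
```

Under this reading, jolshan's question above is answered by the second case: even a deadline that has not yet elapsed is ignored once the epoch it was recorded at no longer matches.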
[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
dajac commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1248095763

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:

@@ -564,9 +564,9 @@ public void testMetadataRefreshDeadline() {
     assertTrue(group.hasMetadataExpired(time.milliseconds()));
     // Set the refresh time deadline with a higher group epoch.
-    group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch() + 1);

Review Comment: fixed.
[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
dajac commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1248095382

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:

@@ -564,9 +564,9 @@ public void testMetadataRefreshDeadline() {
     assertTrue(group.hasMetadataExpired(time.milliseconds()));
     // Set the refresh time deadline with a higher group epoch.
-    group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch() + 1);

Review Comment: ah right, i did it wrong here. let me revert.
[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
jolshan commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1248094484

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:

@@ -564,9 +564,9 @@ public void testMetadataRefreshDeadline() {
     assertTrue(group.hasMetadataExpired(time.milliseconds()));
     // Set the refresh time deadline with a higher group epoch.
-    group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch() + 1);

Review Comment: why did we remove +1000? Wouldn't a deadline set to the current time be expired anyway (regardless of epoch)?
[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
jolshan commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1248092632

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:

@@ -568,5 +568,20 @@ public void testMetadataRefreshDeadline() {
     assertTrue(group.hasMetadataExpired(time.milliseconds()));
     assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs);
     assertEquals(group.groupEpoch() + 1, group.metadataRefreshDeadline().epoch);
+
+    // Set the refresh deadline.
+    group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch());

Review Comment: when we set the refresh deadline, there is no check on the group epoch. Maybe it's fine that we went backwards an epoch, but I'm not sure which scenario this would be.
[GitHub] [kafka] lucasbru commented on pull request #13876: KAFKA-10733: Clean up producer exceptions
lucasbru commented on PR #13876: URL: https://github.com/apache/kafka/pull/13876#issuecomment-1614928567

@jolshan This makes sense. It seems to me that I need to go through the whole KIP and have an end-to-end solution before addressing these kinds of consistency problems. At least for the fencing exceptions, I'm leaning towards leaving them unwrapped for now (and all other fatal exceptions). But I'm not super confident, because it seems Guozhang explicitly decided against that solution. Now, to make some progress here, how about I remove the wrapping changes to `ProducerFencedException` and `InvalidProducerEpochException`, and we merge the other, less intrusive changes?
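For readers following the wrap-vs-unwrap question, here is a toy sketch of the two behaviors under discussion (the exception classes are invented stand-ins, not Kafka's actual producer code): the choice changes what a caller has to catch and whether it must unwrap a cause to classify the failure.

```java
// Hypothetical illustration of "unwrapped" vs. "wrapped" fatal exceptions.
public class WrappingSketch {
    // Invented stand-in for a fatal fencing error such as ProducerFencedException.
    static class FencedException extends RuntimeException {
        FencedException(String msg) { super(msg); }
    }

    // Invented stand-in for a generic wrapper such as KafkaException.
    static class WrappedException extends RuntimeException {
        WrappedException(String msg, Throwable cause) { super(msg, cause); }
    }

    // Option 1: let the fatal exception surface unwrapped; callers catch it directly.
    static void sendUnwrapped() {
        throw new FencedException("producer fenced");
    }

    // Option 2: wrap it; callers catch the wrapper and inspect getCause().
    static void sendWrapped() {
        try {
            sendUnwrapped();
        } catch (FencedException e) {
            throw new WrappedException("send failed", e);
        }
    }

    public static void main(String[] args) {
        try {
            sendWrapped();
        } catch (WrappedException e) {
            // The fatal error is still reachable, but only via the cause chain.
            System.out.println(e.getCause() instanceof FencedException); // prints true
        }
    }
}
```

The trade-off the comment wrestles with: unwrapped exceptions keep `catch` clauses precise, while wrapping gives one uniform exception type at the cost of `getCause()` inspection everywhere.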
[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
dajac commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1248086301

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:

[quoted diff elided; same `testOnNewMetadataImage` hunk as above]

Review Comment: This part is handled in the `GroupMetadataManager#onNewMetadataImage` and it is tested separately.
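As a rough illustration of the separation of concerns described here — the runtime fans each new image out to every loaded coordinator, while per-group refresh logic lives behind the listener — the following is a hypothetical sketch (types and names invented; this is not the actual `CoordinatorRuntime` API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical fan-out of metadata images to loaded coordinators.
public class ImageFanOutSketch {
    // Invented stand-in for a component reacting to MetadataImage/MetadataDelta.
    interface ImageListener {
        void onNewMetadataImage(String newImage, String delta);
    }

    private final Map<Integer, ImageListener> coordinators = new ConcurrentHashMap<>();
    private volatile String latestImage = "EMPTY";

    // A coordinator that finishes loading is seeded with the current image.
    void onLoaded(int partition, ImageListener coordinator) {
        coordinators.put(partition, coordinator);
        coordinator.onNewMetadataImage(latestImage, null);
    }

    // The runtime only distributes the image; what each group does with it
    // (e.g. expiring its subscription metadata) happens inside the listener.
    void onNewMetadataImage(String newImage, String delta) {
        latestImage = newImage;
        coordinators.values().forEach(c -> c.onNewMetadataImage(newImage, delta));
    }

    public static void main(String[] args) {
        ImageFanOutSketch runtime = new ImageFanOutSketch();
        List<String> seen = new ArrayList<>();
        runtime.onLoaded(0, (img, d) -> seen.add(img));
        runtime.onNewMetadataImage("image-1", "delta-1");
        System.out.println(seen); // prints [EMPTY, image-1]
    }
}
```

This mirrors why the runtime test above only verifies delivery of the image, leaving the per-group reaction to a separate `GroupMetadataManager` test.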
[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
jolshan commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1248092025

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:

[quoted diff elided; same `testSubscriptionMetadataRefreshedAfterGroupIsLoaded` hunk as above]

Review Comment: Sorry. I was referring to how, when I asked about the heartbeat in the other test, I hadn't seen this one.
[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
dajac commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1248091492

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:

@@ -568,5 +568,20 @@ public void testMetadataRefreshDeadline() {
     assertTrue(group.hasMetadataExpired(time.milliseconds()));
     assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs);
     assertEquals(group.groupEpoch() + 1, group.metadataRefreshDeadline().epoch);
+
+    // Set the refresh deadline.
+    group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch());

Review Comment: btw, i just fixed the previous test case at L566.
[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
dajac commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1248088555

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:

[quoted diff elided; same `testOnNewMetadataImage` hunk as above]

Review Comment: https://github.com/apache/kafka/pull/13901/files/40fcd86ff81782e84d2d2835ac106b83fdfb32a9#diff-00f0f81cf13e66781777d94f7d2e68a581663385c37e98792507f2294c91bb09R1025
[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
dajac commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1248087985

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:

[quoted diff elided; same `testSubscriptionMetadataRefreshedAfterGroupIsLoaded` hunk as above]

Review Comment: Correct. I don't get what you mean, though?
[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
dajac commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1248087394

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:

[quoted diff elided; same `testSubscriptionMetadataRefreshedAfterGroupIsLoaded` hunk as above, truncated in the archive; review comment missing]
[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
dajac commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1248086803

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:

@@ -568,5 +568,20 @@ public void testMetadataRefreshDeadline() {
     assertTrue(group.hasMetadataExpired(time.milliseconds()));
     assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs);
     assertEquals(group.groupEpoch() + 1, group.metadataRefreshDeadline().epoch);
+
+    // Set the refresh deadline.
+    group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch());

Review Comment: i don't understand your comment. could you elaborate?
[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
jolshan commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1248086692

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:

[quoted diff elided; same `testSubscriptionMetadataRefreshedAfterGroupIsLoaded` hunk as above, truncated in the archive; review comment missing]
[GitHub] [kafka] C0urante merged pull request #13939: [MINOR] Correcting few WARN log lines in DistributedHerder#handleRebalance
C0urante merged PR #13939: URL: https://github.com/apache/kafka/pull/13939 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
jolshan commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1248071005

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:

@@ -1932,6 +1961,435 @@ public void testPartitionAssignorExceptionOnRegularHeartbeat() {
             .setTopicPartitions(Collections.emptyList()));
 }

+@Test
+public void testSubscriptionMetadataRefreshedAfterGroupIsLoaded() {
+    String groupId = "fooup";
+    // Use a static member id as it makes the test easier.
+    String memberId = Uuid.randomUuid().toString();
+
+    Uuid fooTopicId = Uuid.randomUuid();
+    String fooTopicName = "foo";
+
+    // Create a context with one consumer group containing one member.
+    MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+    GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+        .withAssignors(Collections.singletonList(assignor))
+        .withConsumerGroupMetadataRefreshIntervalMs(5 * 60 * 1000)
+        .withMetadataImage(new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .build())
+        .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+            .withMember(new ConsumerGroupMember.Builder(memberId)
+                .setMemberEpoch(10)
+                .setPreviousMemberEpoch(10)
+                .setTargetMemberEpoch(10)
+                .setClientId("client")
+                .setClientHost("localhost/127.0.0.1")
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setServerAssignorName("range")
+                .setAssignedPartitions(mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                .build())
+            .withAssignment(memberId, mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2)))
+            .withAssignmentEpoch(10)
+            .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() {
+                {
+                    // foo only has 3 partitions stored in the metadata but foo has
+                    // 6 partitions in the metadata image.
+                    put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3));
+                }
+            }))
+        .build();
+
+    // The metadata refresh flag should be true.
+    ConsumerGroup consumerGroup = context.groupMetadataManager
+        .getOrMaybeCreateConsumerGroup(groupId, false);
+    assertTrue(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
+
+    // Prepare the assignment result.
+    assignor.prepareGroupAssignment(new GroupAssignment(
+        Collections.singletonMap(memberId, new MemberAssignment(mkAssignment(
+            mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+        )))
+    ));
+
+    // Heartbeat.
+    CoordinatorResult result = context.consumerGroupHeartbeat(

Review Comment: I see we test the heartbeat logic here now.
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13837: KAFKA-9564: Local Tiered Storage implementation for Remote Storage Manager
divijvaidya commented on code in PR #13837: URL: https://github.com/apache/kafka/pull/13837#discussion_r1247912475 ## storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteTopicPartitionDirectory.java: ## @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.storage; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.internals.Topic; +import org.slf4j.Logger; + +import java.io.File; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static java.lang.String.format; +import static java.util.Arrays.asList; +import static java.util.Objects.requireNonNull; +import static java.util.regex.Pattern.compile; +import static java.util.stream.Collectors.toSet; +import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.getUuid; +import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.deleteFilesOnly; +import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.deleteQuietly; +import static org.slf4j.LoggerFactory.getLogger; + +/** + * Represents a topic-partition directory in the local tiered storage under which filesets for + * log segments are stored. + * + * + * + * / storage-directory / uuidBase64-0-topic / tvHCaSDsQZWsjr5rbtCjxA-segment + * . . tvHCaSDsQZWsjr5rbtCjxA-offset_index + * . . tvHCaSDsQZWsjr5rbtCjxA-time_index + * . + * / 5fEBmixCR5-dMntYSLIr1g-3-topic / BFyXlC8ySMm-Uzxw5lZSMg-segment + * . BFyXlC8ySMm-Uzxw5lZSMg-offset_index + * . 
BFyXlC8ySMm-Uzxw5lZSMg-time_index + * + */ +public final class RemoteTopicPartitionDirectory { +private static final Logger LOGGER = getLogger(RemoteTopicPartitionDirectory.class); +private static final String UUID_LEGAL_CHARS = "[a-zA-Z0-9_-]{22}"; Review Comment: `_` (underscore) is not a valid UUID char as per https://www.ietf.org/rfc/rfc4122.txt ## storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java: ## @@ -0,0 +1,564 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.storage; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.InvalidConfigurationException; +import org.apache.kafka.server.log.remote.storage.LocalTieredStorageListener.LocalTieredStorageListeners; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.test.TestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.util.Arrays; +import java.util.Map; +import java.util.Objects;
[GitHub] [kafka] vamossagar12 commented on pull request #13158: KAFKA-14647: Moving TopicFilter to server-common/utils
vamossagar12 commented on PR #13158: URL: https://github.com/apache/kafka/pull/13158#issuecomment-1614890801

Thanks @fvaleri. Hmm, I see 101 test failures: 92 existing and 9 new. At least the new ones look unrelated.
[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
jolshan commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1248060783

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:

@@ -568,5 +568,20 @@ public void testMetadataRefreshDeadline() {
     assertTrue(group.hasMetadataExpired(time.milliseconds()));
     assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs);
     assertEquals(group.groupEpoch() + 1, group.metadataRefreshDeadline().epoch);
+
+    // Set the refresh deadline.
+    group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch());

Review Comment: we can just go back an epoch?
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13939: [MINOR] Correcting few WARN log lines in DistributedHerder#handleRebalance
vamossagar12 commented on code in PR #13939: URL: https://github.com/apache/kafka/pull/13939#discussion_r1248058129

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:

@@ -1657,13 +1657,13 @@ private boolean handleRebalanceCompleted() {
     if (assignment.failed()) {
         needsRejoin = true;
         if (isLeader()) {
-            log.warn("Join group completed, but assignment failed and we are the leader. Reading to end of config and retrying.");
+            log.warn("Join group completed, but the assignment failed and we are the leader. Reading to end of config and retrying.");
             needsReadToEnd = true;
         } else if (configState.offset() < assignment.offset()) {
-            log.warn("Join group completed, but assignment failed and we lagging. Reading to end of config and retrying.");
+            log.warn("Join group completed, but the assignment failed and we are lagging. Reading to end of config and retrying.");

Review Comment: Removed the unwanted changes.
[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
jolshan commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1248058020

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:

@@ -825,4 +828,60 @@ public void testClose() throws Exception {
     assertFutureThrows(write1, NotCoordinatorException.class);
     assertFutureThrows(write2, NotCoordinatorException.class);
 }
+
+@Test
+public void testOnNewMetadataImage() {
+    TopicPartition tp0 = new TopicPartition("__consumer_offsets", 0);
+    TopicPartition tp1 = new TopicPartition("__consumer_offsets", 1);
+
+    MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
+    MockPartitionWriter writer = mock(MockPartitionWriter.class);
+    MockCoordinatorBuilderSupplier supplier = mock(MockCoordinatorBuilderSupplier.class);
+    MockCoordinatorBuilder builder = mock(MockCoordinatorBuilder.class);
+
+    CoordinatorRuntime runtime =
+        new CoordinatorRuntime.Builder()
+            .withLoader(loader)
+            .withEventProcessor(new MockEventProcessor())
+            .withPartitionWriter(writer)
+            .withCoordinatorBuilderSupplier(supplier)
+            .build();
+
+    MockCoordinator coordinator0 = mock(MockCoordinator.class);
+    MockCoordinator coordinator1 = mock(MockCoordinator.class);
+
+    when(supplier.get()).thenReturn(builder);
+    when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+    when(builder.withLogContext(any())).thenReturn(builder);
+    when(builder.build())
+        .thenReturn(coordinator0)
+        .thenReturn(coordinator1);
+
+    CompletableFuture future0 = new CompletableFuture<>();
+    when(loader.load(tp0, coordinator0)).thenReturn(future0);
+
+    CompletableFuture future1 = new CompletableFuture<>();
+    when(loader.load(tp1, coordinator1)).thenReturn(future1);
+
+    runtime.scheduleLoadOperation(tp0, 0);
+    runtime.scheduleLoadOperation(tp1, 0);
+
+    // Coordinator 0 is loaded. It should get the current image
+    // that is the empty one.
+    future0.complete(null);
+    verify(coordinator0).onLoaded(MetadataImage.EMPTY);
+
+    // Publish a new image.
+    MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+    MetadataImage newImage = delta.apply(MetadataProvenance.EMPTY);
+    runtime.onNewMetadataImage(newImage, delta);
+
+    // Coordinator 0 should be notified about it.
+    verify(coordinator0).onNewMetadataImage(newImage, delta);

Review Comment: Yeah. I think I understand that the metadata image is updated, but I wasn't sure if we had anything ensuring that the new metadata image will also trigger the refresh of the subscription metadata. (Apologies if this was just in a previous PR.)
[GitHub] [kafka] fvaleri commented on pull request #13158: KAFKA-14647: Moving TopicFilter to server-common/utils
fvaleri commented on PR #13158: URL: https://github.com/apache/kafka/pull/13158#issuecomment-1614872160

@vamossagar12 LGTM. Thanks.
[jira] [Commented] (KAFKA-15102) Mirror Maker 2 - KIP690 backward compatibility
[ https://issues.apache.org/jira/browse/KAFKA-15102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17739146#comment-17739146 ] Chris Egerton commented on KAFKA-15102: --- [~omnia_h_ibrahim] Good call, we should definitely update the compatibility section in the KIP to mention this. We may also want to list the affected versions and link to this issue for further context. > Mirror Maker 2 - KIP690 backward compatibility > -- > > Key: KAFKA-15102 > URL: https://issues.apache.org/jira/browse/KAFKA-15102 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.1.0 >Reporter: David Dufour >Priority: Major > > According to KIP690, "When users upgrade an existing MM2 cluster they don’t > need to change any of their current configuration as this proposal maintains > the default behaviour for MM2." > Now, the separator is subject to customization. > As a consequence, when an MM2 upgrade is performed, if the separator was > customized with replication.policy.separator, the name of this internal topic > changes. It then generates issues like: > Caused by: java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.InvalidTopicException: Topic > 'mm2-offset-syncs_bkts28_internal' collides with existing topics: > mm2-offset-syncs.bkts28.internal > It has been observed that the replication can then be broken sometimes > several days after the upgrade (reason not identified). By deleting the old > topic name, it recovers. -- This message was sent by Atlassian Jira (v8.20.10#820010)
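The collision above follows directly from the internal topic's naming pattern, which is visible in the two names quoted in the exception. A minimal sketch of that pattern (the helper below is illustrative, not MM2's actual code):

```java
public class OffsetSyncsTopicName {
    // Pattern visible in the InvalidTopicException: "mm2-offset-syncs"
    // + separator + source cluster alias + separator + "internal".
    static String offsetSyncsTopic(String sourceClusterAlias, String separator) {
        return "mm2-offset-syncs" + separator + sourceClusterAlias + separator + "internal";
    }

    public static void main(String[] args) {
        // Default separator vs. a customized one: two different names for the
        // same logical topic, hence the collision reported after the upgrade.
        System.out.println(offsetSyncsTopic("bkts28", ".")); // mm2-offset-syncs.bkts28.internal
        System.out.println(offsetSyncsTopic("bkts28", "_")); // mm2-offset-syncs_bkts28_internal
    }
}
```

Changing `replication.policy.separator` after the cluster has run with the default therefore silently renames the internal topic, which is the compatibility gap being discussed.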
[GitHub] [kafka] C0urante commented on a diff in pull request #13913: KAFKA-15119:Support incremental syncTopicAcls in MirrorSourceConnector
C0urante commented on code in PR #13913: URL: https://github.com/apache/kafka/pull/13913#discussion_r1248004857

## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:

@@ -582,12 +583,20 @@ void incrementalAlterConfigs(Map topicConfigs) {
 }

 private void updateTopicAcls(List bindings) {
-    log.trace("Syncing {} topic ACL bindings.", bindings.size());
-    targetAdminClient.createAcls(bindings).values().forEach((k, v) -> v.whenComplete((x, e) -> {
-        if (e != null) {
-            log.warn("Could not sync ACL of topic {}.", k.pattern().name(), e);
-        }
-    }));
+    Set addBindings = new HashSet<>();
+    addBindings.addAll(bindings);

Review Comment: Nit: can simplify

```suggestion
    Set addBindings = new HashSet<>(bindings);
```

## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:

@@ -582,12 +583,20 @@ void incrementalAlterConfigs(Map topicConfigs) {
 }

 private void updateTopicAcls(List bindings) {
-    log.trace("Syncing {} topic ACL bindings.", bindings.size());
-    targetAdminClient.createAcls(bindings).values().forEach((k, v) -> v.whenComplete((x, e) -> {
-        if (e != null) {
-            log.warn("Could not sync ACL of topic {}.", k.pattern().name(), e);
-        }
-    }));
+    Set addBindings = new HashSet<>();
+    addBindings.addAll(bindings);
+    addBindings.removeAll(knownTopicAclBindings);
+    if (!addBindings.isEmpty()) {
+        log.info("Syncing new found {} topic ACL bindings.", addBindings.size());
+        targetAdminClient.createAcls(addBindings).values().forEach((k, v) -> v.whenComplete((x, e) -> {
+            if (e != null) {
+                log.warn("Could not sync ACL of topic {}.", k.pattern().name(), e);
+            }
+        }));
+        knownTopicAclBindings = bindings;

Review Comment:

```suggestion
        knownTopicAclBindings = new HashSet<>(bindings);
```

## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:

@@ -582,12 +583,20 @@ void incrementalAlterConfigs(Map topicConfigs) {
 }

 private void updateTopicAcls(List bindings) {
-    log.trace("Syncing {} topic ACL bindings.", bindings.size());
-    targetAdminClient.createAcls(bindings).values().forEach((k, v) -> v.whenComplete((x, e) -> {
-        if (e != null) {
-            log.warn("Could not sync ACL of topic {}.", k.pattern().name(), e);
-        }
-    }));
+    Set addBindings = new HashSet<>();
+    addBindings.addAll(bindings);
+    addBindings.removeAll(knownTopicAclBindings);

Review Comment: My IDE nagged me about possible slow performance for invoking `Set::removeAll` with a `List` as an argument. Some research led to [this fascinating blog post](https://codeblog.jonskeet.uk/2010/07/29/there-s-a-hole-in-my-abstraction-dear-liza-dear-liza/). I think the scenario described there isn't likely to impact us frequently, but just in case, do you think we can change `knownTopicAclBindings` from a `List` to a `Set`?
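For context on the `removeAll` nit above: `AbstractSet.removeAll` iterates the receiving set and calls `contains` on the argument whenever the set is not larger than the argument, so passing a `List` makes each membership check linear. A small sketch of the suggested Set-based pattern (class and element names here are illustrative, not from the PR):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class AclBindingDiffDemo {
    // Mirrors the suggested pattern: copy the incoming List into a HashSet,
    // then subtract the already-known bindings held as a Set, keeping every
    // lookup O(1) instead of falling back to List.contains.
    static Set<String> newBindings(List<String> current, Set<String> known) {
        Set<String> add = new HashSet<>(current); // single-arg copy, as in the suggestion
        add.removeAll(known);                     // Set-vs-Set removeAll stays fast
        return add;
    }

    public static void main(String[] args) {
        Set<String> known = new HashSet<>(Arrays.asList("topic-a:READ"));
        List<String> current = Arrays.asList("topic-a:READ", "topic-b:WRITE");
        System.out.println(newBindings(current, known)); // only the binding not yet synced
    }
}
```

Keeping `knownTopicAclBindings` as a `Set` also makes the subtraction's intent explicit at the call site.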
[jira] [Created] (KAFKA-15137) Don't log the entire request in KRaftControllerChannelManager
David Arthur created KAFKA-15137:

Summary: Don't log the entire request in KRaftControllerChannelManager
Key: KAFKA-15137
URL: https://issues.apache.org/jira/browse/KAFKA-15137
Project: Kafka
Issue Type: Bug
Affects Versions: 3.5.0, 3.6.0
Reporter: David Arthur
Assignee: Alyssa Huang
Fix For: 3.5.1

While debugging some junit tests, I noticed some really long log lines in KRaftControllerChannelManager. When the broker is down, we log a WARN that includes the entire UpdateMetadataRequest or LeaderAndIsrRequest. For large clusters, these can be really large requests, so this could potentially cause excessive output in the log4j logs.
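A hedged illustration of the fix direction: log a summary of the request rather than the request itself. The class and message below are placeholders, not the actual KRaftControllerChannelManager code.

```java
public class RequestLogSummary {
    // Instead of interpolating request.toString() into the WARN line (which can
    // serialize state for every partition in a large cluster), log only the
    // request type and a size hint.
    static String warnLine(String apiName, int partitionCount, int brokerId) {
        return String.format(
            "Broker %d was unreachable; not sending %s covering %d partition(s)",
            brokerId, apiName, partitionCount);
    }

    public static void main(String[] args) {
        System.out.println(warnLine("UpdateMetadataRequest", 50000, 1));
    }
}
```

The full request can still be logged at TRACE or DEBUG level for targeted debugging without flooding the default log4j output.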
[jira] [Updated] (KAFKA-13988) Mirrormaker 2 auto.offset.reset=latest not working
[ https://issues.apache.org/jira/browse/KAFKA-13988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-13988: -- Component/s: (was: KafkaConnect) > Mirrormaker 2 auto.offset.reset=latest not working > -- > > Key: KAFKA-13988 > URL: https://issues.apache.org/jira/browse/KAFKA-13988 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.2.0 > Environment: Source Kafka cluster running on Ubuntu 20 > Source Kafka cluster Kafka v0.10 > Target Kafka cluster running in AWS MSK > Target Kafka cluster Kafka v2.6.2 > Mirrormaker version 3.2.0 running on Ubuntu 20. >Reporter: Daniel Florek >Assignee: Ravindranath Kakarla >Priority: Major > > Hi. > I have problem setting up mirroring with MM2 from latest offset between 2 > clusters. In logs I can see that Consumer that is consuming topics has > auto.offset.reset property set to latest. But still topics are read from > offset 0. I am using following configuration: > > {code:java} > clusters = A, B > A.bootstrap.servers = broker-01A:9092 > B.bootstrap.servers = broker-01B:9092,broker-02B:9092,broker-03B:9092 > replication.policy.class = > org.apache.kafka.connect.mirror.IdentityReplicationPolicy > #Enable replication between clusters and define topics which should be > replicated > A->B.enabled = true > A->B.topics = .* > A->B.replication.factor=3 > A->B.emit.heartbeats.enabled = true > A->B.emit.checkpoints.enabled = true > auto.offset.reset=latest > consumer.auto.offset.reset=latest > A.consumer.auto.offset.reset=latest > B.consumer.auto.offset.reset=latest > refresh.topics.enabled=true > heartbeats.topic.replication.factor=1 > checkpoints.topic.replication.factor=1 > offset-syncs.topic.replication.factor=1 > config.storage.replication.factor = 1 > offset.storage.replication.factor = 1 > status.storage.replication.factor = 1 {code} > I am using Kafka 3.2.0 for Mirrormaker 2. 
Source kafka cluster is 1 broker > running on EC2 instance in AWS (quite an old version I think 0.10). Target > kafka cluster contains 3 brokers running in AWS MSK (version 2.6.2). > Could you point me what I am doing wrong? Or is this possibly a bug? >
[jira] [Updated] (KAFKA-13988) Mirrormaker 2 auto.offset.reset=latest not working
[ https://issues.apache.org/jira/browse/KAFKA-13988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-13988: -- Fix Version/s: (was: 3.2.0) > Mirrormaker 2 auto.offset.reset=latest not working > -- > > Key: KAFKA-13988 > URL: https://issues.apache.org/jira/browse/KAFKA-13988 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect, mirrormaker >Affects Versions: 3.2.0 > Environment: Source Kafka cluster running on Ubuntu 20 > Source Kafka cluster Kafka v0.10 > Target Kafka cluster running in AWS MSK > Target Kafka cluster Kafka v2.6.2 > Mirrormaker version 3.2.0 running on Ubuntu 20. >Reporter: Daniel Florek >Assignee: Ravindranath Kakarla >Priority: Major > > Hi. > I have problem setting up mirroring with MM2 from latest offset between 2 > clusters. In logs I can see that Consumer that is consuming topics has > auto.offset.reset property set to latest. But still topics are read from > offset 0. I am using following configuration: > > {code:java} > clusters = A, B > A.bootstrap.servers = broker-01A:9092 > B.bootstrap.servers = broker-01B:9092,broker-02B:9092,broker-03B:9092 > replication.policy.class = > org.apache.kafka.connect.mirror.IdentityReplicationPolicy > #Enable replication between clusters and define topics which should be > replicated > A->B.enabled = true > A->B.topics = .* > A->B.replication.factor=3 > A->B.emit.heartbeats.enabled = true > A->B.emit.checkpoints.enabled = true > auto.offset.reset=latest > consumer.auto.offset.reset=latest > A.consumer.auto.offset.reset=latest > B.consumer.auto.offset.reset=latest > refresh.topics.enabled=true > heartbeats.topic.replication.factor=1 > checkpoints.topic.replication.factor=1 > offset-syncs.topic.replication.factor=1 > config.storage.replication.factor = 1 > offset.storage.replication.factor = 1 > status.storage.replication.factor = 1 {code} > I am using Kafka 3.2.0 for Mirrormaker 2. 
Source kafka cluster is 1 broker > running on EC2 instance in AWS (quite an old version I think 0.10). Target > kafka cluster contains 3 brokers running in AWS MSK (version 2.6.2). > Could you point me what I am doing wrong? Or is this possibly a bug? >
[GitHub] [kafka] C0urante commented on a diff in pull request #13905: KAFKA-13988: Fix MM2 not consuming from latest when "auto.offset.reset=latest" is set
C0urante commented on code in PR #13905: URL: https://github.com/apache/kafka/pull/13905#discussion_r1247963748

## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:

@@ -53,6 +53,7 @@ public class MirrorSourceTask extends SourceTask {
 private static final Logger log = LoggerFactory.getLogger(MirrorSourceTask.class);
 private static final int MAX_OUTSTANDING_OFFSET_SYNCS = 10;
+public static final long NON_EXISTING_OFFSET_VALUE = -1L;

Review Comment: Perhaps instead of a single sentinel value to denote uncommitted offsets, we can reject all values less than zero and replace this constant with a method? Could also help with readability with some of the Java 8 streams logic.

```java
private boolean isUncommitted(Long offset) {
    return offset == null || offset < 0;
}
```
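The suggested predicate can be exercised on its own; here is a minimal sketch (the wrapper class is illustrative — in the PR the method would live in `MirrorSourceTask`):

```java
public class UncommittedOffsetCheck {
    // Rejects null and every negative offset, rather than comparing against
    // the single sentinel NON_EXISTING_OFFSET_VALUE (-1L). The null check
    // runs first so the unboxing comparison cannot throw.
    static boolean isUncommitted(Long offset) {
        return offset == null || offset < 0;
    }

    public static void main(String[] args) {
        System.out.println(isUncommitted(null));   // no stored offset at all
        System.out.println(isUncommitted(-1L));    // the sentinel case
        System.out.println(isUncommitted(-42L));   // any other negative value
        System.out.println(isUncommitted(100L));   // a real committed offset
    }
}
```

A predicate also composes cleanly with streams, e.g. `offsets.values().stream().filter(UncommittedOffsetCheck::isUncommitted).count()`.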
[GitHub] [kafka] C0urante commented on a diff in pull request #13905: KAFKA-13988: Fix MM2 not consuming from latest when "auto.offset.reset=latest" is set
C0urante commented on code in PR #13905: URL: https://github.com/apache/kafka/pull/13905#discussion_r1247968250 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java: ## @@ -31,25 +31,36 @@ import org.apache.kafka.connect.mirror.MirrorSourceTask.PartitionState; import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.source.SourceTaskContext; +import org.apache.kafka.connect.storage.OffsetStorageReader; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; +import org.mockito.MockedStatic; +import org.mockito.internal.stubbing.answers.CallsRealMethods; +import org.mockito.internal.util.collections.Sets; Review Comment: Nit: probably better to avoid depending on internal packages if we can avoid it. Left some suggestions on how to do this without much additional work below ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java: ## @@ -106,13 +107,28 @@ public void start(Map props) { Map topicPartitionOffsets = loadOffsets(taskTopicPartitions); consumer.assign(topicPartitionOffsets.keySet()); log.info("Starting with {} previously uncommitted partitions.", topicPartitionOffsets.entrySet().stream() -.filter(x -> x.getValue() == 0L).count()); -log.trace("Seeking offsets: {}", topicPartitionOffsets); -topicPartitionOffsets.forEach(consumer::seek); +.filter(x -> x.getValue() == NON_EXISTING_OFFSET_VALUE).count()); Review Comment: Nit: we can clean this up if we use the `isUncommitted` method ```suggestion log.info("Starting with {} previously uncommitted partitions.", topicPartitionOffsets.values().stream() .filter(this::isUncommitted).count()); ``` ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java: ## @@ -214,6 +225,82 @@ public void testPoll() { } } +@Test +public void testSeekBehaviorDuringStart() { +// Setting up mock behavior. 
+@SuppressWarnings("unchecked") +KafkaConsumer mockConsumer = mock(KafkaConsumer.class); + +@SuppressWarnings("unchecked") +KafkaProducer mockProducer = mock(KafkaProducer.class); + +String sourceClusterName = "sourceCluster"; +MirrorSourceMetrics mockMetrics = mock(MirrorSourceMetrics.class); + +SourceTaskContext mockSourceTaskContext = mock(SourceTaskContext.class); +OffsetStorageReader mockOffsetStorageReader = mock(OffsetStorageReader.class); + when(mockSourceTaskContext.offsetStorageReader()).thenReturn(mockOffsetStorageReader); + +MockedStatic mockMirrorUtils = mockStatic(MirrorUtils.class, new CallsRealMethods()); +mockMirrorUtils.when(() -> MirrorUtils.newConsumer(anyMap())).thenReturn(mockConsumer); +mockMirrorUtils.when(() -> MirrorUtils.newProducer(anyMap())).thenReturn(mockProducer); + +Set topicPartitions = Sets.newSet( Review Comment: ```suggestion Set topicPartitions = new HashSet<>(Arrays.asList( ``` (will also require a couple new imports and an additional trailing parenthesis) ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java: ## @@ -106,13 +107,28 @@ public void start(Map props) { Map topicPartitionOffsets = loadOffsets(taskTopicPartitions); consumer.assign(topicPartitionOffsets.keySet()); log.info("Starting with {} previously uncommitted partitions.", topicPartitionOffsets.entrySet().stream() -.filter(x -> x.getValue() == 0L).count()); -log.trace("Seeking offsets: {}", topicPartitionOffsets); -topicPartitionOffsets.forEach(consumer::seek); +.filter(x -> x.getValue() == NON_EXISTING_OFFSET_VALUE).count()); + +log.trace("Seeking offsets: {}", topicPartitionOffsets.entrySet().stream() +.filter(topicPartitionOffset -> +topicPartitionOffset.getValue() != NON_EXISTING_OFFSET_VALUE)); + +topicPartitionOffsets.forEach(this::maybeSeek); log.info("{} replicating {} topic-partitions {}->{}: {}.", Thread.currentThread().getName(), taskTopicPartitions.size(), sourceClusterAlias, config.targetClusterAlias(), 
taskTopicPartitions); } +private void maybeSeek(TopicPartition topicPartition, Long offset) { +// Do not call seek on partitions that don't have an existing offset committed. +if (offset == NON_EXISTING_OFFSET_VALUE) { Review Comment: If we use `isUncommitted`: ```suggestion if (isUncommitted(offset)) { ``` ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java: ## @@ -106,13 +107,28 @@ public void start(Map props) { Map topicPartitionOffsets =
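Pulling the review suggestions above together, a minimal self-contained sketch of the seek-skipping logic (and of building the partition set with standard-library collections instead of Mockito's internal `Sets.newSet`) might look as follows; the sentinel value, the partition names, and the map standing in for `consumer.seek` are all made up for illustration and are not the actual `MirrorSourceTask` code:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class MaybeSeekSketch {
    // Hypothetical sentinel for "no committed offset"; the real constant
    // lives in MirrorSourceTask and may have a different value.
    static final long NON_EXISTING_OFFSET_VALUE = -1L;

    // Records the seeks that would be issued; stands in for consumer.seek.
    static final Map<String, Long> seeks = new HashMap<>();

    static boolean isUncommitted(Long offset) {
        return offset == null || offset == NON_EXISTING_OFFSET_VALUE;
    }

    static void maybeSeek(String topicPartition, Long offset) {
        // Do not seek partitions that have no previously committed offset.
        if (isUncommitted(offset)) {
            return;
        }
        seeks.put(topicPartition, offset);
    }

    public static void main(String[] args) {
        // Standard-library replacement for Mockito's internal Sets.newSet(...).
        Set<String> topicPartitions = new HashSet<>(Arrays.asList("topic1-0", "topic1-1"));
        System.out.println(topicPartitions.size()); // 2

        Map<String, Long> topicPartitionOffsets = new HashMap<>();
        topicPartitionOffsets.put("topic1-0", 42L);
        topicPartitionOffsets.put("topic1-1", NON_EXISTING_OFFSET_VALUE);

        topicPartitionOffsets.forEach(MaybeSeekSketch::maybeSeek);
        System.out.println(seeks); // only topic1-0 carries a committed offset
    }
}
```

This mirrors the intent of the reviewed change: `start` counts uncommitted partitions for logging but only seeks the committed ones.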
[GitHub] [kafka] hudeqi commented on pull request #13926: KAFKA-15129;[2/N] Remove metrics in GroupMetadataManager when shutdown
hudeqi commented on PR #13926: URL: https://github.com/apache/kafka/pull/13926#issuecomment-1614799513 And this please @divijvaidya -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hudeqi commented on pull request #13929: KAFKA-15129;[3/N] Remove metrics in AbstractFetcherManager when fetcher manager instance shutdown
hudeqi commented on PR #13929: URL: https://github.com/apache/kafka/pull/13929#issuecomment-1614790587 And please help to review this @divijvaidya
[GitHub] [kafka] hudeqi commented on pull request #13926: KAFKA-15129;[2/N] Remove metrics in GroupMetadataManager when shutdown
hudeqi commented on PR #13926: URL: https://github.com/apache/kafka/pull/13926#issuecomment-1614782255 Combined with the explanation in https://github.com/apache/kafka/pull/3506#discussion_r128589927 and the results of my actual test, I think this is the case: the unit tests in `GroupMetadataManagerTest` construct `new GroupMetadataManager` many times, and since `newGauge` registers against a global registry, earlier registrations affect the subsequent metric verification (the `testMetrics()` method) across multiple unit tests. I remember that I replaced `recreateGauge` directly before opening this PR, and the unit test failed. Now that I have added `removeMetrics()` to `groupMetadataManager.shutdown()` (also called in `@AfterEach`), it has the same effect as `recreateGauge`. @clolov @dajac
[GitHub] [kafka] dajac commented on pull request #13926: KAFKA-15129;[2/N] Remove metrics in GroupMetadataManager when shutdown
dajac commented on PR #13926: URL: https://github.com/apache/kafka/pull/13926#issuecomment-1614765471 Yeah, that seems to be a general issue with Yammer based metrics. It is not about individual unit tests. It is about integration tests that create multiple KafkaServers. In this case, the metric registry is shared by all instances: ``` public final <T> Gauge<T> newGauge(String name, Gauge<T> metric, Map<String, String> tags) { return KafkaYammerMetrics.defaultRegistry().newGauge(metricName(name, tags), metric); } ``` That being said, it seems that `newGauge` here uses `getOrAdd` internally in the registry, so it may be fine in recent versions of Yammer. It may be worth checking whether this is always true (it may depend on the version).
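dajac's point about a shared registry with `getOrAdd` semantics can be illustrated with a toy registry built on `ConcurrentHashMap.computeIfAbsent`; this is a sketch under the assumption of getOrAdd-style behaviour, not the actual Yammer API, and the metric name is made up:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

public class SharedRegistrySketch {
    // Stand-in for a JVM-wide metrics registry shared by all broker instances.
    static final ConcurrentHashMap<String, Supplier<Integer>> REGISTRY = new ConcurrentHashMap<>();

    static Supplier<Integer> newGauge(String name, Supplier<Integer> gauge) {
        // getOrAdd semantics: the first registration under a name wins; later
        // registrations are ignored and the existing gauge is returned.
        return REGISTRY.computeIfAbsent(name, k -> gauge);
    }

    public static void main(String[] args) {
        Supplier<Integer> fromFirstBroker = newGauge("NumGroups", () -> 1);
        Supplier<Integer> fromSecondBroker = newGauge("NumGroups", () -> 2);

        // The second broker silently observes the first broker's gauge,
        // which is why removing metrics on shutdown matters in tests.
        System.out.println(fromFirstBroker == fromSecondBroker); // true
        System.out.println(fromSecondBroker.get());              // 1, not 2
    }
}
```

With getOrAdd behaviour the re-registration itself is harmless, but a stale gauge from an earlier instance keeps being reported until it is explicitly removed.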
[GitHub] [kafka] clolov commented on pull request #13926: KAFKA-15129;[2/N] Remove metrics in GroupMetadataManager when shutdown
clolov commented on PR #13926: URL: https://github.com/apache/kafka/pull/13926#issuecomment-1614755545 Hudeqi mentioned this comment as well, but I still do not understand how this could happen - is it that we keep the same metric registry between individual unit tests? If this is the case, don't we have this problem for other groups of metrics as well?
[GitHub] [kafka] hudeqi commented on a diff in pull request #13924: KAFKA-15129;[1/N] Remove metrics in LogCleanerManager when LogCleaner shutdown
hudeqi commented on code in PR #13924: URL: https://github.com/apache/kafka/pull/13924#discussion_r1247947969 ## core/src/test/scala/unit/kafka/log/LogCleanerTest.scala: ## @@ -83,11 +84,19 @@ class LogCleanerTest { val numMetricsRegistered = LogCleaner.MetricNames.size verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any()) - // verify that each metric is removed + // verify that each metric in `LogCleaner` is removed LogCleaner.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_)) + // verify that each metric in `LogCleanerManager` is removed + val mockLogCleanerManagerMetricsGroup = mockMetricsGroupCtor.constructed.get(1) + LogCleanerManager.GaugeMetricNameNoTag.foreach(metricName => verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricName), any())) + LogCleanerManager.GaugeMetricNameWithTag.keySet().asScala.foreach(metricName => verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricName), any(), any())) Review Comment: I have updated this. @divijvaidya
[GitHub] [kafka] hudeqi commented on a diff in pull request #13924: KAFKA-15129;[1/N] Remove metrics in LogCleanerManager when LogCleaner shutdown
hudeqi commented on code in PR #13924: URL: https://github.com/apache/kafka/pull/13924#discussion_r1247947464 ## core/src/main/scala/kafka/log/LogCleanerManager.scala: ## @@ -88,17 +88,21 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], /* for coordinating the pausing and the cleaning of a partition */ private val pausedCleaningCond = lock.newCondition() + // Avoid adding legacy tags for a metric when initializing `LogCleanerManager` + GaugeMetricNameWithTag.clear() /* gauges for tracking the number of partitions marked as uncleanable for each log directory */ for (dir <- logDirs) { -metricsGroup.newGauge("uncleanable-partitions-count", +metricsGroup.newGauge(UncleanablePartitionsCountMetricName, () => inLock(lock) { uncleanablePartitions.get(dir.getAbsolutePath).map(_.size).getOrElse(0) }, Map("logDirectory" -> dir.getAbsolutePath).asJava ) + GaugeMetricNameWithTag.computeIfAbsent(UncleanablePartitionsCountMetricName, k => new java.util.ArrayList[java.util.Map[String, String]]()) + .add(Map("logDirectory" -> dir.getAbsolutePath).asJava) Review Comment: committed the update.
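The bookkeeping pattern in the diff above (remembering each gauge's name and tags so they can be removed on shutdown) can be sketched in plain Java with `computeIfAbsent`; the map name mirrors the PR's `GaugeMetricNameWithTag`, but the directory paths and this class are stand-ins for illustration:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MetricTagBookkeepingSketch {
    // Stand-in for LogCleanerManager.GaugeMetricNameWithTag: metric name -> list of tag maps.
    static final Map<String, List<Map<String, String>>> gaugeMetricNameWithTag = new HashMap<>();

    static void registerGauge(String metricName, Map<String, String> tags) {
        // One entry per (name, tags) registration, e.g. one per log directory.
        gaugeMetricNameWithTag
            .computeIfAbsent(metricName, k -> new ArrayList<>())
            .add(tags);
    }

    public static void main(String[] args) {
        registerGauge("uncleanable-partitions-count", Map.of("logDirectory", "/tmp/dir1"));
        registerGauge("uncleanable-partitions-count", Map.of("logDirectory", "/tmp/dir2"));
        // On shutdown, every recorded (name, tags) pair can be passed to removeMetric.
        System.out.println(gaugeMetricNameWithTag.get("uncleanable-partitions-count").size()); // 2
    }
}
```

The point of the structure is exactly the multi-directory case the reviewers discuss: the same metric name can be registered once per directory, so removal must iterate over all recorded tag maps.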
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13924: KAFKA-15129;[1/N] Remove metrics in LogCleanerManager when LogCleaner shutdown
divijvaidya commented on code in PR #13924: URL: https://github.com/apache/kafka/pull/13924#discussion_r1247921648 ## core/src/test/scala/unit/kafka/log/LogCleanerTest.scala: ## @@ -83,11 +84,19 @@ class LogCleanerTest { val numMetricsRegistered = LogCleaner.MetricNames.size verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any()) - // verify that each metric is removed + // verify that each metric in `LogCleaner` is removed LogCleaner.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_)) + // verify that each metric in `LogCleanerManager` is removed + val mockLogCleanerManagerMetricsGroup = mockMetricsGroupCtor.constructed.get(1) + LogCleanerManager.GaugeMetricNameNoTag.foreach(metricName => verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricName), any())) + LogCleanerManager.GaugeMetricNameWithTag.keySet().asScala.foreach(metricName => verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricName), any(), any())) Review Comment: ah ok. Can we please unit test this with multiple dirs, since the metric addition/removal here depends on the number of directories and we want to ensure it works correctly for the multiple-dir case as well.
[GitHub] [kafka] C0urante commented on a diff in pull request #13939: [MINOR] Correcting few WARN log lines in DistributedHerder#handleRebalance
C0urante commented on code in PR #13939: URL: https://github.com/apache/kafka/pull/13939#discussion_r1247916426 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ## @@ -1657,13 +1657,13 @@ private boolean handleRebalanceCompleted() { if (assignment.failed()) { needsRejoin = true; if (isLeader()) { -log.warn("Join group completed, but assignment failed and we are the leader. Reading to end of config and retrying."); +log.warn("Join group completed, but the assignment failed and we are the leader. Reading to end of config and retrying."); needsReadToEnd = true; } else if (configState.offset() < assignment.offset()) { -log.warn("Join group completed, but assignment failed and we lagging. Reading to end of config and retrying."); +log.warn("Join group completed, but the assignment failed and we are lagging. Reading to end of config and retrying."); Review Comment: I think the only change we need here is "we are lagging"; "assignment" is fine without a preceding "the". 
[GitHub] [kafka] hudeqi commented on a diff in pull request #13924: KAFKA-15129;[1/N] Remove metrics in LogCleanerManager when LogCleaner shutdown
hudeqi commented on code in PR #13924: URL: https://github.com/apache/kafka/pull/13924#discussion_r1247914999 ## core/src/test/scala/unit/kafka/log/LogCleanerTest.scala: ## @@ -83,11 +84,19 @@ class LogCleanerTest { val numMetricsRegistered = LogCleaner.MetricNames.size verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any()) - // verify that each metric is removed + // verify that each metric in `LogCleaner` is removed LogCleaner.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_)) + // verify that each metric in `LogCleanerManager` is removed + val mockLogCleanerManagerMetricsGroup = mockMetricsGroupCtor.constructed.get(1) + LogCleanerManager.GaugeMetricNameNoTag.foreach(metricName => verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricName), any())) + LogCleanerManager.GaugeMetricNameWithTag.keySet().asScala.foreach(metricName => verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricName), any(), any())) Review Comment: > GaugeMetricNameWithTag can contain multiple metrics with same name but different tags. Let's say `UncleanablePartitionsCountMetricName ` is the name and tag will be directory path. Over here, we are verifying that there is an exactly 1 call to newGauge() with `UncleanablePartitionsCountMetricName` as the metricname. But that is not correct because we are actually calling `newGauge` for this key name numDirs times. > > Hence, this test should ideally fail. What am I missing here? I think the reason is that `logDirs = Array(TestUtils.tempDir())`, which means `numDirs` is 1. @divijvaidya
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13924: KAFKA-15129;[1/N] Remove metrics in LogCleanerManager when LogCleaner shutdown
divijvaidya commented on code in PR #13924: URL: https://github.com/apache/kafka/pull/13924#discussion_r1247907443 ## core/src/test/scala/unit/kafka/log/LogCleanerTest.scala: ## @@ -83,11 +84,19 @@ class LogCleanerTest { val numMetricsRegistered = LogCleaner.MetricNames.size verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any()) - // verify that each metric is removed + // verify that each metric in `LogCleaner` is removed LogCleaner.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_)) + // verify that each metric in `LogCleanerManager` is removed + val mockLogCleanerManagerMetricsGroup = mockMetricsGroupCtor.constructed.get(1) + LogCleanerManager.GaugeMetricNameNoTag.foreach(metricName => verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricName), any())) + LogCleanerManager.GaugeMetricNameWithTag.keySet().asScala.foreach(metricName => verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricName), any(), any())) Review Comment: GaugeMetricNameWithTag can contain multiple metrics with the same name but different tags. Let's say `UncleanablePartitionsCountMetricName` is the name and the tag is the directory path. Over here, we are verifying that there is exactly one call to `newGauge()` with `UncleanablePartitionsCountMetricName` as the metric name. But that is not correct, because we are actually calling `newGauge` for this key name numDirs times. Hence, this test should ideally fail. What am I missing here?
[GitHub] [kafka] dajac commented on pull request #13926: KAFKA-15129;[2/N] Remove metrics in GroupMetadataManager when shutdown
dajac commented on PR #13926: URL: https://github.com/apache/kafka/pull/13926#issuecomment-1614681662 Found this: https://github.com/apache/kafka/pull/3506#discussion_r128589927.
[jira] [Commented] (KAFKA-15127) Allow offsets to be reset at the same time a connector is deleted.
[ https://issues.apache.org/jira/browse/KAFKA-15127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17739095#comment-17739095 ] Chris Egerton commented on KAFKA-15127: --- In that case, perhaps we can leave it unassigned and note in the description that we'd like to let the initial offset management API soak for a bit to gather user feedback before pursuing this? > Allow offsets to be reset at the same time a connector is deleted. > -- > > Key: KAFKA-15127 > URL: https://issues.apache.org/jira/browse/KAFKA-15127 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Sagar Rao >Assignee: Sagar Rao >Priority: Major > Labels: needs-kip > > This has been listed as [Future > Work|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Automaticallydeleteoffsetswithconnectors] > in KIP-875. Now that the delete offsets mechanism is also in place, we can > take this up which will allow connector names to be reused after connector > deletion. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] clolov commented on pull request #13926: KAFKA-15129;[2/N] Remove metrics in GroupMetadataManager when shutdown
clolov commented on PR #13926: URL: https://github.com/apache/kafka/pull/13926#issuecomment-1614673915 But if this is the case won't we run into the same problem for at least one other subset of metrics? Also if this is the case, that would mean that if we remove the method the tests running as part of the auto-build will start failing, no?
[GitHub] [kafka] clolov commented on a diff in pull request #13924: KAFKA-15129;[1/N] Remove metrics in LogCleanerManager when LogCleaner shutdown
clolov commented on code in PR #13924: URL: https://github.com/apache/kafka/pull/13924#discussion_r1247880517 ## core/src/test/scala/unit/kafka/log/LogCleanerTest.scala: ## @@ -83,11 +84,19 @@ class LogCleanerTest { val numMetricsRegistered = LogCleaner.MetricNames.size verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any()) - // verify that each metric is removed + // verify that each metric in `LogCleaner` is removed LogCleaner.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_)) + // verify that each metric in `LogCleanerManager` is removed + val mockLogCleanerManagerMetricsGroup = mockMetricsGroupCtor.constructed.get(1) + LogCleanerManager.GaugeMetricNameNoTag.foreach(metricName => verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricName), any())) + LogCleanerManager.GaugeMetricNameWithTag.keySet().asScala.foreach(metricName => verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricName), any(), any())) Review Comment: Sorry, why do you think we will call this multiple times with the same metric name? The way I read this code is that we will get the keys from GaugeMetricNameWithTag and GaugeMetricNameNoTag (which are unique because of the nature of a set and map) and for each one we will verify that it is called only once.
[GitHub] [kafka] dajac commented on pull request #13926: KAFKA-15129;[2/N] Remove metrics in GroupMetadataManager when shutdown
dajac commented on PR #13926: URL: https://github.com/apache/kafka/pull/13926#issuecomment-1614670671 I was wondering if it is because we run multiple brokers in the same JVM in tests but I am not sure.
[GitHub] [kafka] jeqo commented on a diff in pull request #13561: KAFKA-14888: Added remote log segments retention functionality based on time and size.
jeqo commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1247799637 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -152,19 +156,23 @@ public class RemoteLogManager implements Closeable { * @param time Time instance. * @param clusterId The cluster id. * @param fetchLog function to get UnifiedLog instance for a given topic. + * @param updateRemoteLogStartOffset function to update the log-start-offset for a given topic partition. */ public RemoteLogManager(RemoteLogManagerConfig rlmConfig, int brokerId, String logDir, String clusterId, Time time, -Function> fetchLog) { +Function> fetchLog, +BiConsumer updateRemoteLogStartOffset) { Review Comment: nit: ```suggestion BiConsumer updateLogStartOffsetFromRemoteTier) { ``` ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -634,6 +642,241 @@ public void run() { } } +public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) { +if (isLeader()) { +logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset); +updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset); +} +} + +class RemoteLogRetentionHandler { + +private final Optional retentionSizeData; +private final Optional retentionTimeData; + +private long remainingBreachedSize; + +private OptionalLong logStartOffset = OptionalLong.empty(); + +public RemoteLogRetentionHandler(Optional retentionSizeData, Optional retentionTimeData) { +this.retentionSizeData = retentionSizeData; +this.retentionTimeData = retentionTimeData; +remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L); +} + +private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException { +if (!retentionSizeData.isPresent()) { +return false; +} + +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> 
{ +// Assumption that segments contain size >= 0 +if (remainingBreachedSize > 0) { +long remainingBytes = remainingBreachedSize - x.segmentSizeInBytes(); +if (remainingBytes >= 0) { +remainingBreachedSize = remainingBytes; +return true; +} +} + +return false; +}); +if (isSegmentDeleted) { +logStartOffset = OptionalLong.of(metadata.endOffset() + 1); +logger.info("Deleted remote log segment {} due to retention size {} breach. Log size after deletion will be {}.", +metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize, remainingBreachedSize + retentionSizeData.get().retentionSize); +} +return isSegmentDeleted; +} + +public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata) +throws RemoteStorageException, ExecutionException, InterruptedException { +if (!retentionTimeData.isPresent()) { +return false; +} + +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, +x -> x.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs); +if (isSegmentDeleted) { +remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes()); +// It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals +// are ascending with in an epoch. +logStartOffset = OptionalLong.of(metadata.endOffset() + 1); +logger.info("Deleted remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment", +metadata.remoteLogSegmentId(), retentionTimeData.get().retentionMs); +} +return isSegmentDeleted; +} + +private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset) +throws
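The retention-size predicate in the quoted `deleteRetentionSizeBreachedSegments` can be exercised in isolation; the sketch below extracts just the "delete while the breach remains, never overshooting" logic from the diff, with made-up segment sizes and an illustrative class name:

```java
public class RetentionSizeSketch {
    // Bytes that must still be reclaimed to get back under retention.bytes;
    // mirrors remainingBreachedSize in the PR's RemoteLogRetentionHandler.
    static long remainingBreachedSize = 250L;

    // Predicate applied to each remote segment (oldest first): delete it only
    // while deletions still reduce the breach, never going below zero.
    static boolean shouldDelete(long segmentSizeInBytes) {
        if (remainingBreachedSize > 0) {
            long remainingBytes = remainingBreachedSize - segmentSizeInBytes;
            if (remainingBytes >= 0) {
                remainingBreachedSize = remainingBytes;
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(shouldDelete(100)); // true, 150 bytes still breached
        System.out.println(shouldDelete(100)); // true, 50 bytes still breached
        System.out.println(shouldDelete(100)); // false, deleting would overshoot
        System.out.println(remainingBreachedSize); // 50
    }
}
```

Note the consequence visible in the last two lines: once the next segment is larger than the remaining breach, deletion stops and the log can stay slightly above the retention size until a small enough segment ages out.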
[GitHub] [kafka] clolov commented on pull request #13926: KAFKA-15129;[2/N] Remove metrics in GroupMetadataManager when shutdown
clolov commented on PR #13926: URL: https://github.com/apache/kafka/pull/13926#issuecomment-1614665719 Okay, that makes sense @dajac. Do you happen to know (or are you able to deduce) the answer to the other question: why did we need the `recreateGauge` method in the first place, and is it safe to get rid of it now?
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13924: KAFKA-15129;[1/N] Remove metrics in LogCleanerManager when LogCleaner shutdown
divijvaidya commented on code in PR #13924: URL: https://github.com/apache/kafka/pull/13924#discussion_r1247863305 ## core/src/test/scala/unit/kafka/log/LogCleanerTest.scala: ## @@ -83,11 +84,19 @@ class LogCleanerTest { val numMetricsRegistered = LogCleaner.MetricNames.size verify(mockMetricsGroup, times(numMetricsRegistered)).newGauge(anyString(), any()) - // verify that each metric is removed + // verify that each metric in `LogCleaner` is removed LogCleaner.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_)) + // verify that each metric in `LogCleanerManager` is removed + val mockLogCleanerManagerMetricsGroup = mockMetricsGroupCtor.constructed.get(1) + LogCleanerManager.GaugeMetricNameNoTag.foreach(metricName => verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricName), any())) + LogCleanerManager.GaugeMetricNameWithTag.keySet().asScala.foreach(metricName => verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricName), any(), any())) Review Comment: does this work? In our case we will call this multiple times with the same metric name. The `any()` will match all such occurrences, whereas we are saying that this invocation occurs only once (the default is `times(1)` when not specified). Please help me understand why it's working. 
## core/src/main/scala/kafka/log/LogCleanerManager.scala: ## @@ -88,17 +88,21 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], /* for coordinating the pausing and the cleaning of a partition */ private val pausedCleaningCond = lock.newCondition() + // Avoid adding legacy tags for a metric when initializing `LogCleanerManager` + GaugeMetricNameWithTag.clear() /* gauges for tracking the number of partitions marked as uncleanable for each log directory */ for (dir <- logDirs) { -metricsGroup.newGauge("uncleanable-partitions-count", +metricsGroup.newGauge(UncleanablePartitionsCountMetricName, () => inLock(lock) { uncleanablePartitions.get(dir.getAbsolutePath).map(_.size).getOrElse(0) }, Map("logDirectory" -> dir.getAbsolutePath).asJava ) + GaugeMetricNameWithTag.computeIfAbsent(UncleanablePartitionsCountMetricName, k => new java.util.ArrayList[java.util.Map[String, String]]()) + .add(Map("logDirectory" -> dir.getAbsolutePath).asJava) Review Comment: could we move the `Map("logDirectory" -> dir.getAbsolutePath).asJava` part to a `val metricTag` and then reuse it in both locations?
[GitHub] [kafka] dajac commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
dajac commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1247814090 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -266,9 +295,21 @@ public CompletableFuture joinGroup( return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } -return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( -"This API is not implemented yet." -)); +CompletableFuture responseFuture = new CompletableFuture<>(); + +if (!isValidGroupId(request.groupId(), ApiKeys.forId(request.apiKey( { +responseFuture.complete(new JoinGroupResponseData() +.setMemberId(request.memberId()) +.setErrorCode(Errors.INVALID_GROUP_ID.code())); + +return responseFuture; +} + +runtime.scheduleWriteOperation("generic-group-join", Review Comment: I wonder if we need to handle the future returned by `scheduleWriteOperation` as well. At minimum, we may want to react to errors. This could for instance happen if something goes wrong before the join group handling is even triggered. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -578,4 +619,19 @@ public void shutdown() { Utils.closeQuietly(runtime, "coordinator runtime"); log.info("Shutdown complete."); } + +private boolean isValidGroupId(String groupId, ApiKeys api) { +if (api == ApiKeys.OFFSET_COMMIT || +api == ApiKeys.OFFSET_FETCH || +api == ApiKeys.DESCRIBE_GROUPS || +api == ApiKeys.DELETE_GROUPS Review Comment: nit: I am not a fan of this validation. I wonder if we should just have two helpers: `isGroupIdNotNull` and `isGroupIdNotEmpty`. In this PR, we would only need `isGroupIdNotEmpty`. What do you think? 
## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -874,4 +1087,1348 @@ public void replay( consumerGroup.updateMember(newMember); } } + +/** + * Replays GroupMetadataKey/Value to update the soft state of + * the generic group. + * + * @param key A GroupMetadataKey key. + * @param value A GroupMetadataValue record. + */ +public void replay( +GroupMetadataKey key, +GroupMetadataValue value, +short version +) { +String groupId = key.group(); + +if (value == null) { +// Tombstone. Group should not be added. +} else { +List loadedMembers = new ArrayList<>(); +for (GroupMetadataValue.MemberMetadata member : value.members()) { +int rebalanceTimeout = version == 0 ? member.sessionTimeout() : member.rebalanceTimeout(); Review Comment: I wonder if we could avoid passing the version to this method by adding `-1` as the default value of `rebalanceTimeout` in `GroupMetadataValue`. It seems that we could rely on this to decide here. Another way that I was thinking about would be to pass the `Record` to the replay method as it contains all the available information. Have you considered this? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -874,4 +1087,1348 @@ public void replay( consumerGroup.updateMember(newMember); } } + +/** + * Replays GroupMetadataKey/Value to update the soft state of + * the generic group. + * + * @param key A GroupMetadataKey key. + * @param value A GroupMetadataValue record. + */ +public void replay( +GroupMetadataKey key, +GroupMetadataValue value, +short version +) { +String groupId = key.group(); + +if (value == null) { +// Tombstone. Group should not be added. Review Comment: I think that the group should be deleted in this case. 
## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -266,9 +295,21 @@ public CompletableFuture joinGroup( return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } -return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( -"This API is not implemented yet." -)); +CompletableFuture<JoinGroupResponseData> responseFuture = new CompletableFuture<>(); + +if (!isValidGroupId(request.groupId(), ApiKeys.forId(request.apiKey()))) { +responseFuture.complete(new JoinGroupResponseData() +.setMemberId(request.memberId()) +.setErrorCode(Errors.INVALID_GROUP_ID.code())); + +return responseFuture; +} + +runtime.scheduleGenericGroupOperation("generic-group-join",
[GitHub] [kafka] dajac commented on pull request #13926: KAFKA-15129;[2/N] Remove metrics in GroupMetadataManager when shutdown
dajac commented on PR #13926: URL: https://github.com/apache/kafka/pull/13926#issuecomment-1614635663 @clolov Constants (in the companion object) in Scala start with a capital letter in our code base. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] clolov commented on pull request #13926: KAFKA-15129;[2/N] Remove metrics in GroupMetadataManager when shutdown
clolov commented on PR #13926: URL: https://github.com/apache/kafka/pull/13926#issuecomment-1614621247 In theory this change makes sense to me. As with the [1/N] one, I would prefer if variable names start with a lowercase letter unless there is a good reason for them not to. I have reached out to @guozhangwang, who was also wondering what the purpose of the recreateGauge method is. I thought that there is only ever one GroupMetadataManager, so I do not understand the purpose of this method in the first place and I would like to get a confirmation before we remove it.
[GitHub] [kafka] kriscfoster commented on pull request #13865: KAFKA-15096: Update snappy-java to 1.1.10.1
kriscfoster commented on PR #13865: URL: https://github.com/apache/kafka/pull/13865#issuecomment-1614606533 Great, thank you @jlprat!
[GitHub] [kafka] jlprat commented on pull request #13865: KAFKA-15096: Update snappy-java to 1.1.10.1
jlprat commented on PR #13865: URL: https://github.com/apache/kafka/pull/13865#issuecomment-1614604961 Hi @kriscfoster, currently `3.5.1` and `3.6.0` are the only releases in the making (`3.6.0` is expected to be released during September) that I'm aware of. I think the version that will be released the soonest with this patch is `3.5.1`. If you need this fixed for `3.4.x`, you can do a jar replacement of `snappy-java` with the `1.1.10.1` version. If there is high demand from the community for a new `3.4.x` version, a maintainer might pick up the task of creating a new release.
[GitHub] [kafka] divijvaidya merged pull request #13923: KAFKA-15131: Improve RemoteStorageManager exception handling documentation
divijvaidya merged PR #13923: URL: https://github.com/apache/kafka/pull/13923
[GitHub] [kafka] kriscfoster commented on pull request #13865: KAFKA-15096: Update snappy-java to 1.1.10.1
kriscfoster commented on PR #13865: URL: https://github.com/apache/kafka/pull/13865#issuecomment-1614589914 @jlprat do you know when the next release will be on the 3.4 branch?
[GitHub] [kafka] showuon commented on a diff in pull request #13837: KAFKA-9564: Local Tiered Storage implementation for Remote Storage Manager
showuon commented on code in PR #13837: URL: https://github.com/apache/kafka/pull/13837#discussion_r1247809160 ## storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageEvent.java: ## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.storage; + +import org.apache.kafka.common.TopicPartition; + +import java.util.Optional; + +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; +import static java.util.Optional.ofNullable; + +/** + * Represents an interaction between a broker and a second-tier storage. This type of event is generated + * by the {@link LocalTieredStorage} which is an implementation of the {@link RemoteStorageManager} + * operating in Kafka's runtime as the interface between Kafka and external storage systems, through + * which all such interactions go through. + */ +public final class LocalTieredStorageEvent implements Comparable { + +/** + * The nature of the interaction. 
+ */ +public enum EventType { +OFFLOAD_SEGMENT, +FETCH_SEGMENT, +FETCH_OFFSET_INDEX, +FETCH_TIME_INDEX, +FETCH_TRANSACTION_INDEX, +FETCH_LEADER_EPOCH_CHECKPOINT, +FETCH_PRODUCER_SNAPSHOT, +DELETE_SEGMENT, +DELETE_PARTITION +} + +private final int brokerId; +private final EventType type; +private final RemoteLogSegmentId segmentId; +private final int timestamp; +private final Optional fileset; +private final Optional metadata; +private final int startPosition; +private final int endPosition; +private final Optional exception; + +/** + * Assess whether this event matches the characteristics of an event specified by the {@code condition}. + * + * @param condition The condition which contains the characteristics to match. + * @return true if this event matches the condition's characteristics, false otherwise. + */ +public boolean matches(final LocalTieredStorageCondition condition) { +if (brokerId != condition.brokerId) { +return false; +} +if (condition.eventType != type) { +return false; +} +if (!segmentId.topicIdPartition().topicPartition().equals(condition.topicPartition)) { +return false; +} +if (!exception.map(e -> condition.failed).orElseGet(() -> !condition.failed)) { +return false; +} +return true; +} + +/** + * Returns whether the provided {@code event} was created after the present event. + * This assumes a chronological ordering of events. + * Both events need to be generated from the same broker. + * + * @param event The event to compare + * @return true if the current instance was generated after the given {@code event}, + * false if events are equal or the current instance was generated before the + * given {@code event}. 
+ */ +public boolean isAfter(final LocalTieredStorageEvent event) { +return event.timestamp < timestamp; +} + +public EventType getType() { +return type; +} + +public TopicPartition getTopicPartition() { +return segmentId.topicIdPartition().topicPartition(); +} + +@Override +public int compareTo(LocalTieredStorageEvent other) { +requireNonNull(other); + +if (other.timestamp > timestamp) { +return -1; +} +if (other.timestamp < timestamp) { +return 1; +} +return 0; Review Comment: We can directly return ``` return timestamp - other.timestamp; ``` > Compares this object with the specified object for order. Returns a negative integer, zero, or a positive integer as this object is less than, equal to, or greater than the specified object. ref: https://docs.oracle.com/javase/8/docs/api/java/lang/Comparable.html#compareTo-T-
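A side note on the subtraction shortcut suggested above: it is fine while both timestamps are non-negative (as they are for these events), but as a general `compareTo` pattern it can overflow, whereas `Integer.compare` cannot. A standalone illustration, not Kafka code:

```java
// Why Integer.compare is the safer compareTo idiom than subtraction:
// the subtraction wraps around for operands far apart.
public class CompareDemo {

    // Subtraction-based comparison: overflows for extreme values.
    static int bySubtraction(int a, int b) {
        return a - b;
    }

    public static void main(String[] args) {
        // Integer.MIN_VALUE - 1 wraps to Integer.MAX_VALUE (positive),
        // wrongly reporting MIN_VALUE as "greater" than 1.
        System.out.println(bySubtraction(Integer.MIN_VALUE, 1) > 0);  // true: wrong sign
        System.out.println(Integer.compare(Integer.MIN_VALUE, 1) < 0); // true: correct order
    }
}
```

For the event timestamps in this test helper either form orders correctly; `Integer.compare` just removes the caveat.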
[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image
dajac commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1247805182 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -172,19 +186,21 @@ public List build(TopicsImage topicsImage) { }); // Add subscription metadata. -Map subscriptionMetadata = new HashMap<>(); -members.forEach((memberId, member) -> { -member.subscribedTopicNames().forEach(topicName -> { -TopicImage topicImage = topicsImage.getTopic(topicName); -if (topicImage != null) { -subscriptionMetadata.put(topicName, new TopicMetadata( -topicImage.id(), -topicImage.name(), -topicImage.partitions().size() -)); -} +if (subscriptionMetadata == null) { Review Comment: Most of the tests are just fine with the auto-generated subscription metadata. However, the new ones need specific subscription metadata to verify the check. This is why I extended this builder to either accept the subscription metadata to use or to auto-generate it. ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ## @@ -825,4 +828,60 @@ public void testClose() throws Exception { assertFutureThrows(write1, NotCoordinatorException.class); assertFutureThrows(write2, NotCoordinatorException.class); } + +@Test +public void testOnNewMetadataImage() { +TopicPartition tp0 = new TopicPartition("__consumer_offsets", 0); +TopicPartition tp1 = new TopicPartition("__consumer_offsets", 1); + +MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class); +MockPartitionWriter writer = mock(MockPartitionWriter.class); +MockCoordinatorBuilderSupplier supplier = mock(MockCoordinatorBuilderSupplier.class); +MockCoordinatorBuilder builder = mock(MockCoordinatorBuilder.class); + +CoordinatorRuntime runtime = +new CoordinatorRuntime.Builder() +.withLoader(loader) +.withEventProcessor(new MockEventProcessor()) +.withPartitionWriter(writer) +.withCoordinatorBuilderSupplier(supplier) +.build(); + +MockCoordinator
coordinator0 = mock(MockCoordinator.class); +MockCoordinator coordinator1 = mock(MockCoordinator.class); + +when(supplier.get()).thenReturn(builder); +when(builder.withSnapshotRegistry(any())).thenReturn(builder); +when(builder.withLogContext(any())).thenReturn(builder); +when(builder.build()) +.thenReturn(coordinator0) +.thenReturn(coordinator1); + +CompletableFuture future0 = new CompletableFuture<>(); +when(loader.load(tp0, coordinator0)).thenReturn(future0); + +CompletableFuture future1 = new CompletableFuture<>(); +when(loader.load(tp1, coordinator1)).thenReturn(future1); + +runtime.scheduleLoadOperation(tp0, 0); +runtime.scheduleLoadOperation(tp1, 0); + +// Coordinator 0 is loaded. It should get the current image +// that is the empty one. +future0.complete(null); +verify(coordinator0).onLoaded(MetadataImage.EMPTY); + +// Publish a new image. +MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY); +MetadataImage newImage = delta.apply(MetadataProvenance.EMPTY); +runtime.onNewMetadataImage(newImage, delta); + +// Coordinator 0 should be notified about it. +verify(coordinator0).onNewMetadataImage(newImage, delta); Review Comment: We cannot test this here because the runtime is not aware of the concrete implementation of the state machine. I also want to ensure that we are on the same page. The metadata image is updated when `onNewMetadataImage` is called but the subscription metadata is refreshed on the next heartbeat.
[GitHub] [kafka] hudeqi commented on a diff in pull request #13924: KAFKA-15129;[1/N] Remove metrics in LogCleanerManager when LogCleaner shutdown
hudeqi commented on code in PR #13924: URL: https://github.com/apache/kafka/pull/13924#discussion_r1247795935 ## core/src/main/scala/kafka/log/LogCleanerManager.scala: ## @@ -555,6 +568,17 @@ private case class OffsetsToClean(firstDirtyOffset: Long, } private[log] object LogCleanerManager extends Logging { + private val UncleanablePartitionsCountMetricName = "uncleanable-partitions-count" Review Comment: > Is there a reason why you decided to have these variable names start with a capital letter? I learned the Scala naming rules from discussions like this PR: https://github.com/apache/kafka/pull/13623#discussion_r1182593025 @clolov
[GitHub] [kafka] clolov commented on a diff in pull request #13924: KAFKA-15129;[1/N] Remove metrics in LogCleanerManager when LogCleaner shutdown
clolov commented on code in PR #13924: URL: https://github.com/apache/kafka/pull/13924#discussion_r1247788765 ## core/src/main/scala/kafka/log/LogCleanerManager.scala: ## @@ -555,6 +568,17 @@ private case class OffsetsToClean(firstDirtyOffset: Long, } private[log] object LogCleanerManager extends Logging { + private val UncleanablePartitionsCountMetricName = "uncleanable-partitions-count" Review Comment: Is there a reason why you decided to have these variable names start with a capital letter?
[GitHub] [kafka] jeqo commented on a diff in pull request #13938: KAFKA-15135: fix(storage): pass endpoint configurations as client commont to TBRLMM
jeqo commented on code in PR #13938: URL: https://github.com/apache/kafka/pull/13938#discussion_r1247786162 ## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ## @@ -220,8 +220,8 @@ void testRemoteLogMetadataManagerWithEndpointConfig() { ArgumentCaptor> capture = ArgumentCaptor.forClass(Map.class); verify(remoteLogMetadataManager, times(1)).configure(capture.capture()); -assertEquals(host + ":" + port, capture.getValue().get("bootstrap.servers")); -assertEquals(securityProtocol, capture.getValue().get("security.protocol")); +assertEquals(host + ":" + port, capture.getValue().get("remote.log.metadata.common.client.bootstrap.servers")); +assertEquals(securityProtocol, capture.getValue().get("remote.log.metadata.common.client.security.protocol")); Review Comment: Sure, I was following the text content instead, but we can use the constant here as well. Applying the fix.
[GitHub] [kafka] vamossagar12 commented on pull request #13158: KAFKA-14647: Moving TopicFilter to server-common/utils
vamossagar12 commented on PR #13158: URL: https://github.com/apache/kafka/pull/13158#issuecomment-1614540440 @ruslankrivoshein , I have fixed the checkstyle issues. Also, I believe that the other comment [here](https://github.com/apache/kafka/pull/13158#issuecomment-1422555044) has been addressed by you in the PR https://github.com/apache/kafka/pull/13562. Sorry for the long wait here; it just went off my radar. @fvaleri , do you think this is looking fine now?
[GitHub] [kafka] vamossagar12 opened a new pull request, #13939: [MINOR] Correcting few WARN log lines in DistributedHerder#handleRebalance
vamossagar12 opened a new pull request, #13939: URL: https://github.com/apache/kafka/pull/13939 Minor improvements to comments.
[GitHub] [kafka] vamossagar12 closed pull request #12874: Reproducing callbacks added even when workthread is terminated
vamossagar12 closed pull request #12874: Reproducing callbacks added even when workthread is terminated URL: https://github.com/apache/kafka/pull/12874
[GitHub] [kafka] vamossagar12 commented on pull request #12874: Reproducing callbacks added even when workthread is terminated
vamossagar12 commented on PR #12874: URL: https://github.com/apache/kafka/pull/12874#issuecomment-1614498011 Closing, as the main purpose of this was to do a POC.
[GitHub] [kafka] showuon commented on a diff in pull request #13938: KAFKA-15135: fix(storage): pass endpoint configurations as client commont to TBRLMM
showuon commented on code in PR #13938: URL: https://github.com/apache/kafka/pull/13938#discussion_r1247736125 ## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ## @@ -220,8 +220,8 @@ void testRemoteLogMetadataManagerWithEndpointConfig() { ArgumentCaptor> capture = ArgumentCaptor.forClass(Map.class); verify(remoteLogMetadataManager, times(1)).configure(capture.capture()); -assertEquals(host + ":" + port, capture.getValue().get("bootstrap.servers")); -assertEquals(securityProtocol, capture.getValue().get("security.protocol")); +assertEquals(host + ":" + port, capture.getValue().get("remote.log.metadata.common.client.bootstrap.servers")); +assertEquals(securityProtocol, capture.getValue().get("remote.log.metadata.common.client.security.protocol")); Review Comment: Any reason why we don't use `REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX` here, as other places do?
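The point of reusing the prefix constant is that test and production code cannot drift apart on the literal string. A minimal sketch of how such prefixed client configs are typically assembled and filtered — the prefix value matches the literals in the test above, but the helper class and method here are illustrative, not Kafka's `AbstractConfig` API:

```java
import java.util.HashMap;
import java.util.Map;

public class PrefixedConfigDemo {
    // Define the literal once; every producer and consumer of these
    // configs then refers to the same constant.
    static final String REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX = "remote.log.metadata.common.client.";

    // Collect the entries under the prefix, with the prefix stripped off,
    // so they can be handed to a plain client as ordinary config keys.
    static Map<String, Object> withPrefixStripped(Map<String, Object> configs, String prefix) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, Object> e : configs.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                result.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "bootstrap.servers", "localhost:9092");
        configs.put("unrelated.key", "x");
        // Only the prefixed entry survives, under its short key.
        System.out.println(withPrefixStripped(configs, REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX));
    }
}
```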
[GitHub] [kafka] satishd commented on a diff in pull request #13923: KAFKA-15131: Improve RemoteStorageManager exception handling documentation
satishd commented on code in PR #13923: URL: https://github.com/apache/kafka/pull/13923#discussion_r1247690846 ## storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java: ## @@ -75,6 +75,9 @@ enum IndexType { * * Invoker of this API should always send a unique id as part of {@link RemoteLogSegmentMetadata#remoteLogSegmentId()} * even when it retries to invoke this method for the same log segment data. + * + * This operation is expected to be idempotent. If a copy operation is retried and there is existing content already written, Review Comment: The newly added statement gives more clarity on idempotency. The unique id sent as part of `RemoteLogSegmentMetadata` can be used to avoid overwrites in the RSM implementation. The caller makes sure it sends a unique id across multiple invocations.
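The idempotency contract being documented can be illustrated with a toy store keyed by the unique segment id: a retried copy lands on the same key and the same bytes, so nothing is duplicated or corrupted. This is a sketch of the contract only, not an actual `RemoteStorageManager` implementation; the in-memory map stands in for a real backend.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of an idempotent copy: the remote object is keyed by the
// unique segment id the caller supplies, so a retry with the same id and
// the same content is a harmless overwrite.
public class IdempotentCopyDemo {

    private final Map<String, byte[]> store = new HashMap<>();

    public void copyLogSegment(String segmentId, byte[] data) {
        store.put(segmentId, data); // same key + same content => no-op in effect
    }

    public int objectCount() {
        return store.size();
    }

    public static void main(String[] args) {
        IdempotentCopyDemo rsm = new IdempotentCopyDemo();
        byte[] segment = new byte[]{1, 2, 3};
        rsm.copyLogSegment("uuid-1234", segment); // first attempt
        rsm.copyLogSegment("uuid-1234", segment); // retry after a timeout
        System.out.println(rsm.objectCount()); // still one object
    }
}
```

A retry of a *different* segment must use a different unique id, which is exactly what the quoted javadoc asks of the invoker.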
[jira] [Updated] (KAFKA-15136) The inspection of field allowAutoTopicCreation field in MetadataRequest is unreasonable
[ https://issues.apache.org/jira/browse/KAFKA-15136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiaobing Fang updated KAFKA-15136: -- Priority: Major (was: Minor) > The inspection of field allowAutoTopicCreation field in MetadataRequest is > unreasonable > --- > > Key: KAFKA-15136 > URL: https://issues.apache.org/jira/browse/KAFKA-15136 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Xiaobing Fang >Priority: Major > > > {code:java} > if (!data.allowAutoTopicCreation() && version < 4) > throw new UnsupportedVersionException("MetadataRequest versions older > than 4 don't support the " + > "allowAutoTopicCreation field"); {code} > > Background: > Based on my understanding, the code in MetadataRequest is intended to ignore > the allowAutoTopicCreation field when version≤4. However, if kafka server is > configured with "auto.create.topics.enable"=false and the client sets > allowAutoTopicCreation=false, UnsupportedVersionException will be thrown, > which is not expected. > > Issues: > # `KafkaAdminClient#handleDescribeTopicsByNames()` throws an exception when > sending MetadataRequest to a lower version server, making it unusable. > # MetadataRequestTest avoids this issue by setting > `allowAutoTopicCreation=true` in tests for version≤4, but this is not > reasonable. And the comments in > [testAutoTopicCreation|https://github.com/apache/kafka/blob/1f4cbc5d53259031123b6e9e6bb9a5bbe1e084e8/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala#L146] > may also be problematic. > > Solution: > # Remove the checking code in MetadataRequest. > # Add a field `hasSetAllowAutoTopicCreation` in MetadataRequest. Only throw > an exception when `version≤4` and `hasSetAllowAutoTopicCreation=true`, > without considering the value of allowAutoTopicCreation field. > > If there is indeed an issue, I can work on fixing it. Looking forward to your > reply. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15136) The inspection of field allowAutoTopicCreation field in MetadataRequest is unreasonable
[ https://issues.apache.org/jira/browse/KAFKA-15136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiaobing Fang updated KAFKA-15136: -- Description: {code:java} if (!data.allowAutoTopicCreation() && version < 4) throw new UnsupportedVersionException("MetadataRequest versions older than 4 don't support the " + "allowAutoTopicCreation field"); {code} Background: Based on my understanding, the code in MetadataRequest is intended to ignore the allowAutoTopicCreation field when version≤4. However, if kafka server is configured with "auto.create.topics.enable"=false and the client sets allowAutoTopicCreation=false, UnsupportedVersionException will be thrown, which is not expected. Issues: # `KafkaAdminClient#handleDescribeTopicsByNames()` throws an exception when sending MetadataRequest to a lower version server, making it unusable. # MetadataRequestTest avoids this issue by setting `allowAutoTopicCreation=true` in tests for version≤4, but this is not reasonable. And the comments in [testAutoTopicCreation|https://github.com/apache/kafka/blob/1f4cbc5d53259031123b6e9e6bb9a5bbe1e084e8/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala#L146] may also be problematic. Solution: # Remove the checking code in MetadataRequest. # Add a field `hasSetAllowAutoTopicCreation` in MetadataRequest. Only throw an exception when `version≤4` and `hasSetAllowAutoTopicCreation=true`, without considering the value of allowAutoTopicCreation field. If there is indeed an issue, I can work on fixing it. Looking forward to your reply. was: ```java if (!data.allowAutoTopicCreation() && version < 4) throw new UnsupportedVersionException("MetadataRequest versions older than 4 don't support the " + "allowAutoTopicCreation field"); ``` Background: Based on my understanding, the code in MetadataRequest is intended to ignore the allowAutoTopicCreation field when version≤4. 
However, if kafka server is configured with "auto.create.topics.enable"=false and the client sets allowAutoTopicCreation=false, UnsupportedVersionException will be thrown, which is not expected. Issues: # `KafkaAdminClient#handleDescribeTopicsByNames()` throws an exception when sending MetadataRequest to a lower version server, making it unusable. # MetadataRequestTest avoids this issue by setting `allowAutoTopicCreation=true` in tests for version≤4, but this is not reasonable. And the comments in [testAutoTopicCreation|https://github.com/apache/kafka/blob/1f4cbc5d53259031123b6e9e6bb9a5bbe1e084e8/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala#L146] may also be problematic. Solution: # Remove the checking code in MetadataRequest. # Add a field `hasSetAllowAutoTopicCreation` in MetadataRequest. Only throw an exception when `version≤4` and `hasSetAllowAutoTopicCreation=true`, without considering the value of allowAutoTopicCreation field. If there is indeed an issue, I can work on fixing it. Looking forward to your reply. > The inspection of field allowAutoTopicCreation field in MetadataRequest is > unreasonable > --- > > Key: KAFKA-15136 > URL: https://issues.apache.org/jira/browse/KAFKA-15136 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Xiaobing Fang >Priority: Minor > > > {code:java} > if (!data.allowAutoTopicCreation() && version < 4) > throw new UnsupportedVersionException("MetadataRequest versions older > than 4 don't support the " + > "allowAutoTopicCreation field"); {code} > > Background: > Based on my understanding, the code in MetadataRequest is intended to ignore > the allowAutoTopicCreation field when version≤4. However, if kafka server is > configured with "auto.create.topics.enable"=false and the client sets > allowAutoTopicCreation=false, UnsupportedVersionException will be thrown, > which is not expected. 
> > Issues: > # `KafkaAdminClient#handleDescribeTopicsByNames()` throws an exception when > sending MetadataRequest to a lower version server, making it unusable. > # MetadataRequestTest avoids this issue by setting > `allowAutoTopicCreation=true` in tests for version≤4, but this is not > reasonable. And the comments in > [testAutoTopicCreation|https://github.com/apache/kafka/blob/1f4cbc5d53259031123b6e9e6bb9a5bbe1e084e8/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala#L146] > may also be problematic. > > Solution: > # Remove the checking code in MetadataRequest. > # Add a field `hasSetAllowAutoTopicCreation` in MetadataRequest. Only throw > an exception when `version≤4` and `hasSetAllowAutoTopicCreation=true`, > without considering the value of allowAutoTopicCreation field. > > If there is indeed an issue, I can work on fixing it. Looking forward to
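The proposed change can be sketched as follows. This is a hypothetical, self-contained illustration of the validation logic, not actual Kafka code: the class name, the `hasSetAllowAutoTopicCreation` flag's plumbing, and the use of `UnsupportedOperationException` in place of Kafka's `UnsupportedVersionException` are all stand-ins.

```java
// Hypothetical sketch of the proposed MetadataRequest validation (not Kafka code).
// UnsupportedOperationException stands in for Kafka's UnsupportedVersionException.
public class MetadataRequestCheckSketch {

    static void validate(short version, boolean allowAutoTopicCreation,
                         boolean hasSetAllowAutoTopicCreation) {
        // Current behavior rejects any version < 4 request where the field is false,
        // even when the caller never set it. The proposal: reject old versions only
        // when the caller explicitly set the field, whatever its value.
        if (version < 4 && hasSetAllowAutoTopicCreation) {
            throw new UnsupportedOperationException(
                "MetadataRequest versions older than 4 don't support the allowAutoTopicCreation field");
        }
    }

    public static void main(String[] args) {
        // Old version, field left at its default: accepted under the proposal.
        validate((short) 3, false, false);

        // Old version, field explicitly set by the caller: rejected.
        boolean threw = false;
        try {
            validate((short) 3, false, true);
        } catch (UnsupportedOperationException e) {
            threw = true;
        }
        System.out.println(threw ? "rejected explicit set on v3" : "unexpected");
    }
}
```

Under this sketch, `KafkaAdminClient` paths that merely pass a default `allowAutoTopicCreation=false` would no longer trip the version check.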
[jira] [Created] (KAFKA-15136) The inspection of the allowAutoTopicCreation field in MetadataRequest is unreasonable
Xiaobing Fang created KAFKA-15136:
----------------------------------

             Summary: The inspection of the allowAutoTopicCreation field in MetadataRequest is unreasonable
                 Key: KAFKA-15136
                 URL: https://issues.apache.org/jira/browse/KAFKA-15136
             Project: Kafka
          Issue Type: Improvement
          Components: clients
            Reporter: Xiaobing Fang

(The issue description is identical to the description shown in the issue-update message above.)

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[GitHub] [kafka] jeqo commented on pull request #13828: KAFKA-15066: add "remote.log.metadata.manager.listener.name" config to rlmm
jeqo commented on PR #13828: URL: https://github.com/apache/kafka/pull/13828#issuecomment-1614448203

Sure! Created this: https://github.com/apache/kafka/pull/13938

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeqo opened a new pull request, #13938: KAFKA-15135: fix(storage): pass endpoint configurations as client common to TBRLMM
jeqo opened a new pull request, #13938: URL: https://github.com/apache/kafka/pull/13938

Pass the endpoint properties from RLM (RemoteLogManager) to TBRLMM (TopicBasedRemoteLogMetadataManager) and validate that they are not ignored.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
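At its core, the fix amounts to merging endpoint-derived settings into the client configuration handed to the metadata manager so that they cannot be dropped. A minimal sketch of that merging step, with illustrative property values; the method name and merge precedence are assumptions, not the actual Kafka implementation:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of passing endpoint configs down to a metadata manager.
// The merge precedence shown here is an assumption about the intent of the fix.
public class EndpointConfigMergeSketch {

    static Map<String, Object> buildMetadataManagerConfig(Map<String, Object> endpointConfig,
                                                          Map<String, Object> userConfig) {
        Map<String, Object> merged = new HashMap<>(userConfig);
        // Endpoint-derived settings win, so user overrides cannot silently point
        // the manager's internal clients at the wrong listener.
        merged.putAll(endpointConfig);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> endpoint = new HashMap<>();
        endpoint.put("bootstrap.servers", "broker-0:9092"); // from the configured listener
        endpoint.put("security.protocol", "PLAINTEXT");

        Map<String, Object> user = new HashMap<>();
        user.put("bootstrap.servers", "stale-host:9999");   // would otherwise be ignored or wrong
        user.put("retry.backoff.ms", 100);

        Map<String, Object> merged = buildMetadataManagerConfig(endpoint, user);
        System.out.println(merged.get("bootstrap.servers")); // broker-0:9092
    }
}
```

Validating "those are not ignored" then reduces to asserting that the endpoint values survive the merge.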
[GitHub] [kafka] satishd commented on a diff in pull request #13923: KAFKA-15131: Improve RemoteStorageManager exception handling documentation
satishd commented on code in PR #13923: URL: https://github.com/apache/kafka/pull/13923#discussion_r1247695189

storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java:

@@ -120,7 +123,10 @@ InputStream fetchLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
 * @param indexType type of the index to be fetched for the segment.
 * @return input stream of the requested index.
 * @throws RemoteStorageException if there are any errors while fetching the index.
- * @throws RemoteResourceNotFoundException when there are no resources associated with the given remoteLogSegmentMetadata.
+ * @throws RemoteResourceNotFoundException the requested index is not found in the remote storage
+ * (e.g. Transaction index may not exist because segments create prior to version 2.8.0 will not have transaction index associated with them.)

Review Comment: minor typo: `segments create prior to version` -> `segments created prior to version`

@@ -75,6 +75,9 @@ enum IndexType {
 *
 * Invoker of this API should always send a unique id as part of {@link RemoteLogSegmentMetadata#remoteLogSegmentId()}
 * even when it retries to invoke this method for the same log segment data.
+ *
+ * This operation is expected to be idempotent. If a copy operation is retried and there is existing content already written,

Review Comment: The newly added statement gives more clarity on idempotency. The unique id sent as part of `RemoteLogSegmentMetadata` can be used to avoid overwrites in the RSM implementation; the caller makes sure it sends a unique id across invocations.
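The idempotency contract discussed in the review above can be illustrated with a short sketch: a copy operation keyed on the segment's unique id that treats a retry as a no-op, so existing content is never overwritten. This is an illustrative stand-in, not the actual RemoteStorageManager interface; the in-memory map plays the role of remote storage.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of an idempotent copy keyed on a unique segment id,
// in the spirit of the javadoc discussed above (not the real RSM interface).
public class IdempotentCopySketch {

    // Stand-in for remote storage: unique segment id -> segment bytes.
    private final Map<UUID, byte[]> remoteStore = new ConcurrentHashMap<>();

    /**
     * Copies segment data under the given unique id. A retry with the same id
     * is a no-op, so existing content is never overwritten.
     *
     * @return true if this call performed the write, false if it was a retry.
     */
    boolean copySegment(UUID remoteLogSegmentId, byte[] segmentData) {
        return remoteStore.putIfAbsent(remoteLogSegmentId, segmentData) == null;
    }

    public static void main(String[] args) {
        IdempotentCopySketch rsm = new IdempotentCopySketch();
        UUID id = UUID.randomUUID();

        boolean first = rsm.copySegment(id, new byte[] {1, 2, 3});
        boolean retry = rsm.copySegment(id, new byte[] {9, 9, 9}); // retried upload

        System.out.println(first + " " + retry); // true false
    }
}
```

Because the caller guarantees a unique id per segment (and reuses it on retries), `putIfAbsent` is enough to make the operation idempotent in this sketch.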