Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
cadonna merged PR #14879: URL: https://github.com/apache/kafka/pull/14879 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-15751) KRaft support in BaseAdminIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-15751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gantigmaa Selenge reassigned KAFKA-15751: - Assignee: Gantigmaa Selenge > KRaft support in BaseAdminIntegrationTest > - > > Key: KAFKA-15751 > URL: https://issues.apache.org/jira/browse/KAFKA-15751 > Project: Kafka > Issue Type: Task > Components: core >Reporter: Sameer Tejani >Assignee: Gantigmaa Selenge >Priority: Minor > Labels: kraft, kraft-test, newbie > > The following tests in BaseAdminIntegrationTest in > core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala need > to be updated to support KRaft > 70 : def testCreateDeleteTopics(): Unit = { > 163 : def testAuthorizedOperations(): Unit = { > Scanned 259 lines. Found 0 KRaft tests out of 2 tests -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: Allow local-log segment deletion when log-start-offset incremented [kafka]
kamalcph commented on PR #14905: URL: https://github.com/apache/kafka/pull/14905#issuecomment-1842245405 > Does that mean currently, when the log start offset is incremented past the candidate segments, we won't delete them until the segments are uploaded to the remote storage? The local-log segments won't be eligible for deletion, and they are not eligible for upload to remote storage either, since the log-start-offset may already have moved past them. Once the next set of segments gets uploaded, the `highest-copied-remote-offset` will be higher than the not-uploaded local-log segments, and they then become eligible for deletion. This patch addresses the edge case where the topic becomes stale/deprecated after the log-start-offset update, so no further segments are ever uploaded. > If so, does that mean those uploaded segments will be deleted the next time we enter `cleanupExpiredRemoteLogSegments` because of `isSegmentBreachByLogStartOffset`? Yes, correct. The uploaded remote segments will be cleaned up by `cleanupExpiredRemoteLogSegments` due to `isSegmentBreachByLogStartOffset`.
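The following is a minimal Java sketch of the deletion rule discussed above. It assumes a simplified segment model with inclusive base and end offsets. `LocalSegmentDeletion`, `Segment`, `isDeletable` and `deletable` are hypothetical names, not Kafka's `UnifiedLog` or `RemoteLogManager` API. The second condition models what this patch is described as adding.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model for illustration; not Kafka's UnifiedLog/RemoteLogManager API.
final class LocalSegmentDeletion {

    /** A local segment described only by its inclusive base and end offsets. */
    static final class Segment {
        final long baseOffset;
        final long endOffset;

        Segment(long baseOffset, long endOffset) {
            this.baseOffset = baseOffset;
            this.endOffset = endOffset;
        }
    }

    private LocalSegmentDeletion() {}

    /**
     * A segment is deletable if it was already copied to remote storage, or if the
     * log-start-offset has moved past the whole segment (it will never be uploaded).
     */
    static boolean isDeletable(Segment s, long highestCopiedRemoteOffset, long logStartOffset) {
        boolean alreadyCopied = s.endOffset <= highestCopiedRemoteOffset;
        boolean belowLogStart = s.endOffset < logStartOffset; // condition added by the patch
        return alreadyCopied || belowLogStart;
    }

    static List<Segment> deletable(List<Segment> segments, long highestCopiedRemoteOffset, long logStartOffset) {
        List<Segment> result = new ArrayList<>();
        for (Segment s : segments) {
            if (isDeletable(s, highestCopiedRemoteOffset, logStartOffset)) {
                result.add(s);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Segment> segments = Arrays.asList(
                new Segment(0, 99), new Segment(100, 199), new Segment(200, 299));
        // Nothing uploaded yet (-1), but the log-start-offset has already moved to 200.
        System.out.println(deletable(segments, -1L, 200L).size()); // 2
    }
}
```

Without the second condition, a stale topic whose `highest-copied-remote-offset` never advances would keep those first two segments on local disk indefinitely.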
Re: [PR] KAFKA-15971: Re-enable consumer integration tests for new consumer [kafka]
dajac commented on PR #14925: URL: https://github.com/apache/kafka/pull/14925#issuecomment-1842219297 We've got two clean builds. I just re-scheduled a new one.
[jira] [Assigned] (KAFKA-15911) KRaft quorum leader should make sure the follower fetch is making progress
[ https://issues.apache.org/jira/browse/KAFKA-15911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen reassigned KAFKA-15911: - Assignee: Luke Chen > KRaft quorum leader should make sure the follower fetch is making progress > -- > > Key: KAFKA-15911 > URL: https://issues.apache.org/jira/browse/KAFKA-15911 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.7.0 >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > > Just because the leader returned a successful response to FETCH and > FETCH_SNAPSHOT doesn't mean that the followers were able to handle the > response correctly. > For example, imagine the case where the log end offset (LEO) is at 1000 and > all of the followers are continuously fetching at offset 0 without ever > increasing their fetch offset. This can happen if the followers encounter an > error when processing the FETCH or FETCH_SNAPSHOT response. > In this scenario the leader will never be able to increase the HWM. I think > that this scenario is specific to KRaft and doesn't exist in Raft because > KRaft is pull-based whereas Raft is push-based. > https://github.com/apache/kafka/pull/14428#pullrequestreview-1751408695
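The sketch below shows why stuck followers pin the HWM. It assumes the leader only advances the HWM to an offset that a majority of voters have fetched. `QuorumProgress`, `majorityOffset` and `FollowerProgress` are hypothetical names. The stall check is only one possible shape for the progress tracking this issue asks for, not the KRaft implementation.

```java
import java.util.Arrays;

// Hypothetical sketch; not the KRaft LeaderState implementation.
final class QuorumProgress {
    private QuorumProgress() {}

    /**
     * Largest offset reached by a majority of voters. endOffsets holds the
     * leader's log end offset plus each follower's latest fetch offset.
     */
    static long majorityOffset(long[] endOffsets) {
        long[] sorted = endOffsets.clone();
        Arrays.sort(sorted); // ascending
        int n = sorted.length;
        int majority = n / 2 + 1;
        // At least `majority` voters have an offset >= sorted[n - majority].
        return sorted[n - majority];
    }

    /** Hypothetical per-follower tracker for a fetch offset that never advances. */
    static final class FollowerProgress {
        private long fetchOffset = -1L;
        private long lastProgressMs;

        FollowerProgress(long nowMs) {
            this.lastProgressMs = nowMs;
        }

        void onFetch(long offset, long nowMs) {
            if (offset > fetchOffset) { // only a higher fetch offset counts as progress
                fetchOffset = offset;
                lastProgressMs = nowMs;
            }
        }

        /** Behind the leader and no progress for longer than timeoutMs. */
        boolean isStalled(long leaderEndOffset, long nowMs, long timeoutMs) {
            return fetchOffset < leaderEndOffset && nowMs - lastProgressMs > timeoutMs;
        }
    }

    public static void main(String[] args) {
        // Leader at LEO 1000, both followers stuck fetching at offset 0.
        System.out.println(majorityOffset(new long[] {1000L, 0L, 0L})); // 0
    }
}
```

With three voters, two of which never move past offset 0, the majority offset stays at 0 however far the leader's LEO advances. This matches the scenario in the issue.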
Re: [PR] KAFKA-14585: Move StorageTool to tools [kafka]
showuon commented on PR #14847: URL: https://github.com/apache/kafka/pull/14847#issuecomment-1842216032 > The tool calls KafkaConfig.validateValues which runs the full set of configuration validations. @fvaleri, sorry, I didn't see where we invoke `KafkaConfig.validateValues` in StorageTool. Could you point me to where it is? Thanks.
Re: [PR] KAFKA-15696: Refactor AsyncConsumer close procedure [kafka]
philipnee commented on code in PR #14920: URL: https://github.com/apache/kafka/pull/14920#discussion_r1416732684 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -957,6 +966,57 @@ private void close(Duration timeout, boolean swallowException) { } } +/** + * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence: + * 1. autocommit offsets + * 2. revoke all partitions + */ +private void prepareShutdown(final Timer timer) { +if (!groupMetadata.isPresent()) +return; + +maybeAutoCommitSync(timer); +timer.update(); +if (!subscriptions.hasAutoAssignedPartitions() || subscriptions.assignedPartitions().isEmpty()) +return; + +try { +// If the consumer is in a group, we will pause and revoke all assigned partitions +onLeavePrepare().get(timer.remainingMs(), TimeUnit.MILLISECONDS); +timer.update(); +} catch (Exception e) { +Exception exception = e; +if (e instanceof ExecutionException) +exception = (Exception) e.getCause(); +throw new KafkaException("User rebalance callback throws an error", exception); +} finally { +subscriptions.assignFromSubscribed(Collections.emptySet()); +} +} + +private void maybeAutoCommitSync(final Timer timer) { +if (autoCommitEnabled) { +Map<TopicPartition, OffsetAndMetadata> allConsumed = subscriptions.allConsumed(); +try { +log.debug("Sending synchronous auto-commit of offsets {} on closing", allConsumed); +commitSync(allConsumed, Duration.ofMillis(timer.remainingMs())); +} catch (Exception e) { +// consistent with async auto-commit failures, we do not propagate the exception +log.warn("Synchronous auto-commit of offsets {} failed: {}", allConsumed, e.getMessage()); +} +} +} + +private CompletableFuture<Void> onLeavePrepare() { +SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR); +droppedPartitions.addAll(subscriptions.assignedPartitions()); +if (!subscriptions.hasAutoAssignedPartitions() || droppedPartitions.isEmpty()) { +return CompletableFuture.completedFuture(null); +} +// TODO: Invoke rebalanceListener via KAFKA-15276 +return CompletableFuture.completedFuture(null); +} + Review Comment: Thanks, I mostly agree with your idea. I think simply firing the revocation callback from close() should be enough, but sending leave-group and closing events as you suggested is a good idea.
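The close ordering in `prepareShutdown` can be modelled in isolation. The order is: auto-commit, then a time-bounded revocation, then always clearing the assignment. This is a simplified, hypothetical sketch. `ShutdownSequence` and its parameters are illustrative stand-ins. `RuntimeException` replaces `KafkaException` so the block compiles without Kafka on the classpath. Timeouts and interrupts get their own messages here rather than the single catch-all in the quoted code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Simplified, hypothetical model of the close ordering; not AsyncKafkaConsumer itself.
final class ShutdownSequence {
    private ShutdownSequence() {}

    static void prepareShutdown(boolean inGroup,
                                boolean hasAssignedPartitions,
                                Runnable autoCommitSync,
                                Supplier<CompletableFuture<Void>> revokeAll,
                                Runnable clearAssignment,
                                long timeoutMs) {
        if (!inGroup) {
            return;
        }
        long deadlineNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        autoCommitSync.run(); // 1. commit consumed offsets first
        if (!hasAssignedPartitions) {
            return;
        }
        try {
            // 2. then revoke, bounded by whatever is left of the overall timeout
            long remainingMs = Math.max(0L, TimeUnit.NANOSECONDS.toMillis(deadlineNs - System.nanoTime()));
            revokeAll.get().get(remainingMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // Surface the callback's own exception rather than the wrapper.
            throw new RuntimeException("Rebalance callback failed", e.getCause());
        } catch (TimeoutException e) {
            throw new RuntimeException("Timed out waiting for partition revocation", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for partition revocation", e);
        } finally {
            clearAssignment.run(); // always drop the assignment, even on failure
        }
    }

    public static void main(String[] args) {
        List<String> steps = new ArrayList<>();
        prepareShutdown(true, true,
                () -> steps.add("commit"),
                () -> { steps.add("revoke"); return CompletableFuture.completedFuture(null); },
                () -> steps.add("clear"),
                1_000L);
        System.out.println(steps); // [commit, revoke, clear]
    }
}
```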
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
philipnee commented on PR #14873: URL: https://github.com/apache/kafka/pull/14873#issuecomment-1842164667 Hi @lianetm, thanks for taking the time to review the PR. Instead of using boolean flags, I added a _STALED_ state to address your concern. I think we do need this state to stay consistent with the state logic; I agree that unsubscribed is probably not the right state to be in, as it hints to the user that it needs to subscribe again. Let me know what you think. For the static member, I think the right thing to do is to let the membershipManager decide the right epoch to send.
Re: [PR] KAFKA-15022: introduce interface to control graph constructor [kafka]
mjsax merged PR #14714: URL: https://github.com/apache/kafka/pull/14714
Re: [PR] KAFKA-15022: introduce interface to control graph constructor [kafka]
mjsax commented on PR #14714: URL: https://github.com/apache/kafka/pull/14714#issuecomment-1842147167 Jenkins issues: - build 7: `JDK 11 and Scala 2.13` failed - build 6: `JDK 8 and Scala 2.12` failed Test failures seem to be unrelated to this PR. Merging.
Re: [PR] KAFKA-15684: Support to describe all client metrics resources (KIP-714) [kafka]
apoorvmittal10 commented on PR #14933: URL: https://github.com/apache/kafka/pull/14933#issuecomment-1842077768 @junrao Thanks for reviewing the PR, I have addressed the comments.
Re: [PR] KAFKA-15684: Support to describe all client metrics resources (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14933: URL: https://github.com/apache/kafka/pull/14933#discussion_r1416635870 ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -6922,4 +6923,58 @@ class KafkaApisTest { val expectedResponse = new PushTelemetryResponseData().setErrorCode(Errors.INVALID_REQUEST.code) assertEquals(expectedResponse, response.data) } + + @Test + def testListClientMetricsResourcesNotAllowedForZkClusters(): Unit = { +val request = buildRequest(new ListClientMetricsResourcesRequest.Builder(new ListClientMetricsResourcesRequestData()).build()) +createKafkaApis(enableForwarding = true).handle(request, RequestLocal.NoCaching) + +val response = verifyNoThrottling[ListClientMetricsResourcesResponse](request) +assertEquals(Errors.UNKNOWN_SERVER_ERROR, Errors.forCode(response.data.errorCode)) Review Comment: This test covers the case where KafkaApis is configured in ZK mode and the request is rejected by the common `handle` method, which throws `new IllegalStateException`; that translates to `UNKNOWN_SERVER_ERROR`. It therefore also makes sense that no handling is needed in `handleListClientMetricsResources`, as the method should always have a `clientMetricsManager` if the call reaches it. Still, I complete the request gracefully there when `clientMetricsManager` is None, to avoid any error case. I have added a comment in the code as well.
Re: [PR] KAFKA-15684: Support to describe all client metrics resources (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14933: URL: https://github.com/apache/kafka/pull/14933#discussion_r1416644082 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -3810,16 +3811,27 @@ class KafkaApis(val requestChannel: RequestChannel, } } - // Just a placeholder for now. def handleListClientMetricsResources(request: RequestChannel.Request): Unit = { val listClientMetricsResourcesRequest = request.body[ListClientMetricsResourcesRequest] if (!authHelper.authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)) { requestHelper.sendMaybeThrottle(request, listClientMetricsResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception)) } else { - // Just return an empty list in the placeholder - val data = new ListClientMetricsResourcesResponseData() - requestHelper.sendMaybeThrottle(request, new ListClientMetricsResourcesResponse(data)) + clientMetricsManager match { +case Some(metricsManager) => + try { +val data = new ListClientMetricsResourcesResponseData().setClientMetricsResources( + metricsManager.listClientMetricsResources.asScala.map( +name => new ClientMetricsResource().setName(name)).toList.asJava) +requestHelper.sendMaybeThrottle(request, new ListClientMetricsResourcesResponse(data)) + } catch { +case _: Exception => Review Comment: Done.
Re: [PR] KAFKA-15684: Support to describe all client metrics resources (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14933: URL: https://github.com/apache/kafka/pull/14933#discussion_r1416642552 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -3810,16 +3811,27 @@ class KafkaApis(val requestChannel: RequestChannel, } } - // Just a placeholder for now. def handleListClientMetricsResources(request: RequestChannel.Request): Unit = { val listClientMetricsResourcesRequest = request.body[ListClientMetricsResourcesRequest] if (!authHelper.authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)) { requestHelper.sendMaybeThrottle(request, listClientMetricsResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception)) } else { - // Just return an empty list in the placeholder - val data = new ListClientMetricsResourcesResponseData() - requestHelper.sendMaybeThrottle(request, new ListClientMetricsResourcesResponse(data)) + clientMetricsManager match { +case Some(metricsManager) => + try { +val data = new ListClientMetricsResourcesResponseData().setClientMetricsResources( + metricsManager.listClientMetricsResources.asScala.map( +name => new ClientMetricsResource().setName(name)).toList.asJava) +requestHelper.sendMaybeThrottle(request, new ListClientMetricsResourcesResponse(data)) + } catch { +case _: Exception => Review Comment: Yeah, this is not needed in this API. I have removed it and validated in the test that the API now returns `UNKNOWN_SERVER_ERROR`.
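The handler flow discussed in these comments can be sketched in Java. It authorizes, lists names when a manager is present, and otherwise lets unexpected exceptions propagate so the common handler reports `UNKNOWN_SERVER_ERROR`. `ListClientMetricsSketch`, `MetricsManager` and `Response` are hypothetical; the real code is Scala in `KafkaApis`. Answering with an empty list when no manager is present is an assumption about what "completing the request" means here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical Java rendering of the handler flow; the real code is Scala in KafkaApis.
final class ListClientMetricsSketch {
    /** Stand-in for ClientMetricsManager; only the listing call matters here. */
    interface MetricsManager {
        Collection<String> listClientMetricsResources();
    }

    /** Either an error name or a sorted list of resource names. */
    static final class Response {
        final String error; // null on success
        final List<String> names;

        private Response(String error, List<String> names) {
            this.error = error;
            this.names = names;
        }
    }

    private ListClientMetricsSketch() {}

    static Response handle(boolean authorized, Optional<MetricsManager> manager) {
        if (!authorized) {
            return new Response("CLUSTER_AUTHORIZATION_FAILED", Collections.emptyList());
        }
        // No try/catch: an unexpected exception propagates to the common request
        // handler, which is expected to report it as UNKNOWN_SERVER_ERROR.
        List<String> names = manager
                .map(m -> {
                    List<String> sorted = new ArrayList<>(m.listClientMetricsResources());
                    Collections.sort(sorted);
                    return sorted;
                })
                // Assumed: with no manager (should not happen once ZK requests are rejected), answer empty.
                .orElseGet(Collections::emptyList);
        return new Response(null, names);
    }

    public static void main(String[] args) {
        MetricsManager mgr = () -> Arrays.asList("b", "a");
        System.out.println(handle(true, Optional.of(mgr)).names); // [a, b]
    }
}
```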
Re: [PR] KAFKA-15684: Support to describe all client metrics resources (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14933: URL: https://github.com/apache/kafka/pull/14933#discussion_r1416631582 ## core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala: ## @@ -1690,13 +1690,33 @@ class ConfigCommandTest extends Logging { } @Test - def shouldNotDescribeClientMetricsConfigWithoutEntityName(): Unit = { + def shouldDescribeClientMetricsConfigWithoutEntityName(): Unit = { val describeOpts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092", "--entity-type", "client-metrics", "--describe")) -val exception = assertThrows(classOf[IllegalArgumentException], () => describeOpts.checkArgs()) -assertEquals("an entity name must be specified with --describe of client-metrics", exception.getMessage) +val resourceCustom = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, "1") +val configEntry = new ConfigEntry("metrics", "*") +val future = new KafkaFutureImpl[util.Map[ConfigResource, Config]] +val describeResult: DescribeConfigsResult = mock(classOf[DescribeConfigsResult]) +when(describeResult.all()).thenReturn(future) + +val node = new Node(1, "localhost", 9092) +val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) { + override def describeConfigs(resources: util.Collection[ConfigResource], options: DescribeConfigsOptions): DescribeConfigsResult = { Review Comment: When no entity name is provided, all names are first fetched by calling `listClientMetricsResources`, and `describeConfigs` is then called for every entity to get the details. I have implemented `listClientMetricsResources` in `MockAdminClient` and call `mockAdminClient.incrementalAlterConfigs` to set the client metrics resource in `MockAdminClient`. The test calls `listClientMetricsResources` from ConfigCommand.scala, and the result is passed to `describeConfigs` for validation. Finally, I also verify that `describeConfigs` was called.
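The list-then-describe flow in this comment can be sketched roughly as follows. The sketch assumes a hypothetical `ConfigAdmin` interface rather than the real `Admin` client, whose `describeConfigs` takes a collection of `ConfigResource`s and returns futures. `DescribeAllClientMetrics` and `InMemoryConfigAdmin` are illustrative names; the latter loosely plays the role of `MockAdminClient`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical sketch of "--describe" without "--entity-name"; not the ConfigCommand or Admin API.
final class DescribeAllClientMetrics {
    interface ConfigAdmin {
        Collection<String> listClientMetricsResources();
        Map<String, String> describeConfigs(String resourceName);
    }

    /** In-memory stand-in, loosely analogous to MockAdminClient in the test above. */
    static final class InMemoryConfigAdmin implements ConfigAdmin {
        final Map<String, Map<String, String>> resources = new HashMap<>();
        final List<String> describedNames = new ArrayList<>(); // records describe calls for verification

        @Override
        public Collection<String> listClientMetricsResources() {
            return resources.keySet();
        }

        @Override
        public Map<String, String> describeConfigs(String resourceName) {
            describedNames.add(resourceName);
            return resources.getOrDefault(resourceName, Collections.emptyMap());
        }
    }

    private DescribeAllClientMetrics() {}

    static Map<String, Map<String, String>> describe(ConfigAdmin admin, Optional<String> entityName) {
        // With an explicit name, describe just that resource; otherwise list all names first.
        Collection<String> names = entityName.isPresent()
                ? Collections.singletonList(entityName.get())
                : admin.listClientMetricsResources();
        Map<String, Map<String, String>> result = new TreeMap<>();
        for (String name : names) {
            result.put(name, admin.describeConfigs(name));
        }
        return result;
    }

    public static void main(String[] args) {
        InMemoryConfigAdmin admin = new InMemoryConfigAdmin();
        admin.resources.put("1", Collections.singletonMap("metrics", "*"));
        System.out.println(describe(admin, Optional.empty())); // {1={metrics=*}}
    }
}
```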
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
C0urante commented on PR #13801: URL: https://github.com/apache/kafka/pull/13801#issuecomment-1842053883 Removed the stale label. @vamossagar12 thanks for your patience on this one, I plan on making another pass this week!
Re: [PR] KAFKA-14598: Fix flaky ConnectRestApiTest [kafka]
ashwinpankaj closed pull request #13084: KAFKA-14598: Fix flaky ConnectRestApiTest URL: https://github.com/apache/kafka/pull/13084
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
lianetm commented on PR #14873: URL: https://github.com/apache/kafka/pull/14873#issuecomment-1842032321 Thanks for the changes @philipnee. I left answers about the static membership and some comments.
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
lianetm commented on code in PR #14873: URL: https://github.com/apache/kafka/pull/14873#discussion_r1416614219 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -399,14 +400,45 @@ public void testHeartbeatState() { new ConsumerGroupHeartbeatResponseData.Assignment(); assignmentTopic1.setTopicPartitions(Collections.singletonList(tpTopic1)); ConsumerGroupHeartbeatResponse rs1 = new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData() -.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS) -.setMemberId(memberId) -.setMemberEpoch(1) -.setAssignment(assignmentTopic1)); +.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS) +.setMemberId(memberId) +.setMemberEpoch(1) +.setAssignment(assignmentTopic1)); membershipManager.onHeartbeatResponseReceived(rs1.data()); assertEquals(MemberState.RECONCILING, membershipManager.state()); } +@Test +public void testEnsureLeaveGroupWhenPollTimerExpires() { +membershipManager.transitionToJoining(); Review Comment: I know we've all been writing this kind of unit test that strays into integration-test territory, but if we want to start correcting/simplifying that, this unit test could be simplified. Since it is for the HB manager, it should just `verify` the call to the membershipManager function when the timer expires (plus another test in the MembershipManager for that onStale function, which I'd say is needed anyway).
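Here is a narrowly scoped unit test in the spirit of this suggestion. It drives the poll timer past its limit and checks that the membership hook is called exactly once, without simulating full heartbeat round trips. All types are hypothetical simplifications. A hand-rolled recording double stands in for Mockito's `verify`. 300000 ms is the default `max.poll.interval.ms`.

```java
// Hypothetical, simplified types; the real HeartbeatRequestManager/MembershipManager are richer.
interface StaleListener {
    void onStale(); // stand-in for the membership-manager hook discussed above
}

final class PollTimerHeartbeat {
    private final StaleListener membership;
    private final long maxPollIntervalMs;
    private long lastPollMs;
    private boolean staleNotified;

    PollTimerHeartbeat(StaleListener membership, long maxPollIntervalMs, long nowMs) {
        this.membership = membership;
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.lastPollMs = nowMs;
    }

    /** Called when the application polls the consumer. */
    void resetPollTimer(long nowMs) {
        lastPollMs = nowMs;
        staleNotified = false;
    }

    /** Background poll: hand off to the membership exactly once per expiry. */
    void poll(long nowMs) {
        if (!staleNotified && nowMs - lastPollMs >= maxPollIntervalMs) {
            staleNotified = true;
            membership.onStale();
        }
    }

    public static void main(String[] args) {
        RecordingStaleListener listener = new RecordingStaleListener();
        PollTimerHeartbeat hb = new PollTimerHeartbeat(listener, 300_000L, 0L);
        hb.poll(300_000L);
        System.out.println(listener.staleCalls); // 1
    }
}

/** Hand-rolled test double that records calls, playing the role of Mockito's verify(). */
final class RecordingStaleListener implements StaleListener {
    int staleCalls;

    @Override
    public void onStale() {
        staleCalls++;
    }
}
```

What the member does once it is stale (sending the leave heartbeat, choosing the next state) would then be tested separately against the membership manager.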
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
github-actions[bot] commented on PR #13801: URL: https://github.com/apache/kafka/pull/13801#issuecomment-1842029141 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] KAFKA-15119:Support incremental syncTopicAcls in MirrorSourceConnector [kafka]
github-actions[bot] commented on PR #13913: URL: https://github.com/apache/kafka/pull/13913#issuecomment-1842029086 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] KAFKA-15296: Allow offsets to be committed for filtered records when Exactly Once support is disabled [kafka]
github-actions[bot] commented on PR #14158: URL: https://github.com/apache/kafka/pull/14158#issuecomment-1842028971 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] IncrementalCooperativeAssignor#handleLostAssignments invokes logic for lost Assignments even when there are no Lost assignments [kafka]
github-actions[bot] commented on PR #14000: URL: https://github.com/apache/kafka/pull/14000#issuecomment-1842029030 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] MINOR: Add missing versions to STs [kafka]
github-actions[bot] commented on PR #14334: URL: https://github.com/apache/kafka/pull/14334#issuecomment-1842028892 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] [MINOR] fix: Align LogContext setup for core components [kafka]
github-actions[bot] commented on PR #14348: URL: https://github.com/apache/kafka/pull/14348#issuecomment-1842028870 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] KAFKA-15629: Update upgrade-guide.html for TimestampedKeyQuery and TimestampedRangeQuery [kafka]
hanyuzheng7 commented on PR #14898: URL: https://github.com/apache/kafka/pull/14898#issuecomment-1842028444 > This PR shows merge conflict. Can you rebase it on `trunk`? ok
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
lianetm commented on code in PR #14873: URL: https://github.com/apache/kafka/pull/14873#discussion_r1416609139 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -399,14 +400,51 @@ public void testHeartbeatState() { new ConsumerGroupHeartbeatResponseData.Assignment(); assignmentTopic1.setTopicPartitions(Collections.singletonList(tpTopic1)); ConsumerGroupHeartbeatResponse rs1 = new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData() -.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS) -.setMemberId(memberId) -.setMemberEpoch(1) -.setAssignment(assignmentTopic1)); +.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS) +.setMemberId(memberId) +.setMemberEpoch(1) +.setAssignment(assignmentTopic1)); membershipManager.onHeartbeatResponseReceived(rs1.data()); assertEquals(MemberState.RECONCILING, membershipManager.state()); } +@Test +public void testEnsureLeaveGroupWhenPollTimerExpires() { +membershipManager.transitionToJoining(); +time.sleep(1); +// Sending first heartbeat and transitioning to stable +assertHeartbeat(heartbeatRequestManager); +assertFalse(heartbeatRequestManager.pollTimer().isExpired()); +// Expires the poll timer, ensure sending a leave group +time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS); +assertLeaveGroup(heartbeatRequestManager); +assertTrue(heartbeatRequestManager.pollTimer().isExpired()); +// Poll again, ensure we heartbeat again. 
+time.sleep(1); +heartbeatRequestManager.resetPollTimer(); +assertHeartbeat(heartbeatRequestManager); +assertFalse(heartbeatRequestManager.pollTimer().isExpired()); +} + +private void assertHeartbeat(HeartbeatRequestManager hrm) { +System.out.println("assertHeartbeat"); +NetworkClientDelegate.PollResult pollResult = hrm.poll(time.milliseconds()); +assertEquals(1, pollResult.unsentRequests.size()); +assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, pollResult.timeUntilNextPollMs); + pollResult.unsentRequests.get(0).handler().onComplete(createHeartbeatResponse(pollResult.unsentRequests.get(0), +Errors.NONE)); +} + +private void assertLeaveGroup(HeartbeatRequestManager hrm) { +NetworkClientDelegate.PollResult pollResult = hrm.poll(time.milliseconds()); +assertEquals(1, pollResult.unsentRequests.size()); +ConsumerGroupHeartbeatRequestData data = (ConsumerGroupHeartbeatRequestData) pollResult.unsentRequests.get(0).requestBuilder().build().data(); +assertEquals(ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH, data.memberEpoch()); +assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state()); Review Comment: Is this really the right end state when the poll timer expires? It means the user will have to call `subscribe` to join the group again. Is that what the old coordinator requires? (I expected that the old code just requires the consumer to be polled again to rejoin the group. If my understanding is right, then we're missing logic after sending the last HB to leave: on timer expiration we should send the last HB and transitionToJoining, so that the member rejoins the group on the next poll.)
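The transition the reviewer proposes can be sketched as a tiny state model: send a final leave heartbeat, then rejoin on the next application poll instead of requiring a new `subscribe`. `SketchMemberState` and `PollExpiryMember` are hypothetical names. The epoch values follow KIP-848 (0 to join, -1 for a dynamic member to leave). Static members are left out of this sketch.

```java
// Hypothetical state model of the reviewer's suggestion; not MembershipManagerImpl.
enum SketchMemberState { JOINING, STABLE, STALE }

final class PollExpiryMember {
    static final int JOIN_EPOCH = 0;   // epoch sent when (re)joining, per KIP-848
    static final int LEAVE_EPOCH = -1; // dynamic member leaving

    SketchMemberState state = SketchMemberState.STABLE;
    int memberEpoch;

    PollExpiryMember(int memberEpoch) {
        this.memberEpoch = memberEpoch;
    }

    /** Poll timer expired: send one last heartbeat to leave, then stop heartbeating. */
    int onPollTimerExpired() {
        state = SketchMemberState.STALE;
        memberEpoch = LEAVE_EPOCH;
        return memberEpoch; // epoch carried by the leave heartbeat
    }

    /** The application polls again: rejoin without requiring a new subscribe() call. */
    void onApplicationPoll() {
        if (state == SketchMemberState.STALE) {
            state = SketchMemberState.JOINING;
            memberEpoch = JOIN_EPOCH;
        }
    }

    public static void main(String[] args) {
        PollExpiryMember member = new PollExpiryMember(5);
        System.out.println(member.onPollTimerExpired()); // -1
        member.onApplicationPoll();
        System.out.println(member.state + " " + member.memberEpoch); // JOINING 0
    }
}
```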
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
lianetm commented on code in PR #14873:
URL: https://github.com/apache/kafka/pull/14873#discussion_r1416605360

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
@@ -103,6 +103,12 @@ public class HeartbeatRequestManager implements RequestManager {
 */
 private final BackgroundEventHandler backgroundEventHandler;
+/**
+ * Timer for tracking the time since the last consumer poll. If the timer expires, the consumer will stop
+ * sending heartbeat until the next poll.
+ */

Review Comment:
My take is that we should always send the leave-group heartbeat when the poll timer expires, whether `group.instance.id` is null or not. Dynamic members should send epoch -1 and static members -2, as described in the KIP. The [documentation of max.poll.interval.ms](https://kafka.apache.org/documentation/#consumerconfigs_max.poll.interval.ms) only describes what happens on the broker side. If a static member leaves, the broker won't reassign its partitions until the session timeout expires. If a dynamic member leaves, the broker reassigns them right away.
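The epoch rule in the comment above fits in a tiny helper. This is an illustrative sketch, not client code. The constants are redefined locally with the values the comment states (-1 for dynamic members, -2 for static members). A member is assumed to be static exactly when a `group.instance.id` is configured.

```java
import java.util.Optional;

// Hypothetical helper: picks the leave-group epoch for the final heartbeat.
class LeaveEpochSketch {
    static final int LEAVE_GROUP_MEMBER_EPOCH = -1;        // dynamic member leaving
    static final int LEAVE_GROUP_STATIC_MEMBER_EPOCH = -2; // static member leaving

    // A member is treated as static when group.instance.id is set.
    static int leaveEpoch(Optional<String> groupInstanceId) {
        return groupInstanceId.isPresent()
            ? LEAVE_GROUP_STATIC_MEMBER_EPOCH
            : LEAVE_GROUP_MEMBER_EPOCH;
    }

    public static void main(String[] args) {
        System.out.println(leaveEpoch(Optional.empty()));          // -1
        System.out.println(leaveEpoch(Optional.of("instance-1"))); // -2
    }
}
```

The helper always returns a leave epoch. This reflects the position above: the leave is sent in both cases, and only the broker's reaction differs.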
Re: [PR] KAFKA-15932: Wait for responses in consumer operations [kafka]
lianetm commented on PR #14912:
URL: https://github.com/apache/kafka/pull/14912#issuecomment-1842006427

Thanks for the changes @AndrewJSchofield. I took a pass over all of it and it looks good to me; there's just a minor comment above. It would help all of us to keep this fix in mind when reviewing other async operations that may not wait for responses before continuously enqueuing requests.
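The pattern described above can be sketched in a few lines: a new request is not enqueued while the previous one is still awaiting its response. `SingleInflightSketch` and its fields are hypothetical. A `CompletableFuture` stands in for the pending response, echoing the `CompletableFuture` return type in this PR's `OffsetsRequestManager` change.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical request manager that keeps at most one request in flight.
class SingleInflightSketch {
    private CompletableFuture<String> inflight; // null until the first request is sent
    // Every request this manager has enqueued, oldest first.
    final List<CompletableFuture<String>> enqueued = new ArrayList<>();

    // Called on each background-thread poll.
    void poll() {
        if (inflight != null && !inflight.isDone()) {
            return; // response still pending: do not enqueue another request
        }
        inflight = new CompletableFuture<>();
        enqueued.add(inflight);
    }

    public static void main(String[] args) {
        SingleInflightSketch manager = new SingleInflightSketch();
        manager.poll();
        manager.poll(); // ignored: the first request has no response yet
        manager.enqueued.get(0).complete("response");
        manager.poll(); // previous response arrived, so a new request is enqueued
        System.out.println(manager.enqueued.size()); // 2
    }
}
```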
Re: [PR] KAFKA-15932: Wait for responses in consumer operations [kafka]
lianetm commented on code in PR #14912:
URL: https://github.com/apache/kafka/pull/14912#discussion_r1416580989

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java: ##
@@ -388,11 +385,13 @@ private CompletableFuture buildListOffsetRequestToNode(
 * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}
 * that can be polled to obtain the corresponding timestamps and offsets.
 */
-private List buildListOffsetsRequestsAndResetPositions(
+private CompletableFuture sendListOffsetsRequestsAndResetPositions(

Review Comment:
Let's update the Javadoc description of the return value, since it changed.
[PR] Kafka 15662 kip 714 restore [kafka]
mjsax opened a new pull request, #14936:
URL: https://github.com/apache/kafka/pull/14936

This PR is stacked on https://github.com/apache/kafka/pull/14922
Only the second commit needs a review.

+++

Part of KIP-714. Adds support for collecting the client instance ID of the restore consumer.
Re: [PR] KAFKA-15866:Refactor OffsetFetchRequestState Error handling [kafka]
DL1231 commented on PR #14923:
URL: https://github.com/apache/kafka/pull/14923#issuecomment-1841980081

@philipnee PTAL
Re: [PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]
dengziming commented on PR #14846:
URL: https://github.com/apache/kafka/pull/14846#issuecomment-1841979753

Hello @tinaselenge, there are some flaky test failures related to this. We should take some time to investigate them, and we should trigger the CI a few more times to make sure the tests are less flaky.

https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14846/2//#showFailuresLink

[Build / JDK 17 and Scala 2.13 / kafka.admin.DeleteTopicTest.testIncreasePartitionCountDuringDeleteTopic()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14846/2//testReport/junit/kafka.admin/DeleteTopicTest/Build___JDK_17_and_Scala_2_13___testIncreasePartitionCountDuringDeleteTopic__/)
[Build / JDK 17 and Scala 2.13 / kafka.admin.DeleteTopicTest.testDisableDeleteTopic(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14846/2//testReport/junit/kafka.admin/DeleteTopicTest/Build___JDK_17_and_Scala_2_13___testDisableDeleteTopic_String__quorum_kraft/)
[Build / JDK 17 and Scala 2.13 / kafka.admin.DeleteTopicTest.executionError](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14846/2//testReport/junit/kafka.admin/DeleteTopicTest/Build___JDK_17_and_Scala_2_13___executionError/)
Re: [PR] KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest [kafka]
mdedetrich commented on PR #12728:
URL: https://github.com/apache/kafka/pull/12728#issuecomment-1841940640

@C0urante @ijuma I have addressed the final comments; let me know if I have missed anything.
Re: [PR] KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest [kafka]
mdedetrich commented on code in PR #12728:
URL: https://github.com/apache/kafka/pull/12728#discussion_r1416539486

## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ##
@@ -908,75 +799,58 @@ public void testCorruptConfig() throws Throwable {
 config.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
 config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, BogusSinkConnector.class.getName());
 config.put(SinkConnectorConfig.TOPICS_CONFIG, TOPICS_LIST_STR);
-Connector connectorMock = PowerMock.createMock(SinkConnector.class);
+Connector connectorMock = mock(SinkConnector.class);
 String error = "This is an error in your config!";
 List errors = new ArrayList<>(singletonList(error));
 String key = "foo.invalid.key";
-EasyMock.expect(connectorMock.validate(config)).andReturn(
+when(connectorMock.validate(config)).thenReturn(
 new Config(
 Arrays.asList(new ConfigValue(key, null, Collections.emptyList(), errors))
 )
 );
 ConfigDef configDef = new ConfigDef();
 configDef.define(key, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "");
- EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-final Capture> configCapture = EasyMock.newCapture();
- EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
-EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
- EasyMock.expect(plugins.connectorLoader(EasyMock.anyString())).andReturn(pluginLoader);
- EasyMock.expect(plugins.withClassLoader(pluginLoader)).andReturn(loaderSwap);
-EasyMock.expect(worker.getPlugins()).andStubReturn(plugins);
- EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
-EasyMock.expect(connectorMock.config()).andStubReturn(configDef);
-loaderSwap.close();

Review Comment:
Committed and pushed
Re: [PR] KAFKA-15760: Disable flaky test testTaskRequestWithOldStartMsGetsUpdated [kafka]
gharris1727 commented on PR #14917:
URL: https://github.com/apache/kafka/pull/14917#issuecomment-1841939038

Okay, I'll let it keep running, but it appears that the 4-minute timeout has about an 85% fail rate at 20% CPU and about a 0% fail rate at 30% CPU. The 2-minute timeout has a 100% fail rate at 20% and 30% CPU, and a 50% fail rate at 40% CPU. The additional timeout is helping, but it still isn't as tolerant of low CPU percentages as the BeforeAll strategy.
Re: [PR] KAFKA-14132; Replace EasyMock with Mockito in StandaloneHerderTest [kafka]
mdedetrich commented on code in PR #12728:
URL: https://github.com/apache/kafka/pull/12728#discussion_r1416535578

## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java: ##
@@ -887,19 +783,14 @@ public void testPutConnectorConfig() throws Exception {
 assertEquals("bar", capturedConfig.getValue().get("foo"));
 herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb);

Review Comment:
Fixed and committed.

> It'd probably be better to do away with mocked callbacks and switch to using a `FutureCallback` that we invoke `get` on (like we do with many other tests), but it's up to you if you'd like to implement that or not.

I think it's better to solve this in another PR/issue. This PR has already been ongoing for long enough, and this just seems like a nice improvement to me.
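The `FutureCallback` approach quoted above replaces a mocked callback with one that is also a future, so the test can simply block on `get`. The sketch below is self-contained and rests on several assumptions. `SketchCallback` mirrors the shape of Connect's `Callback` (`onCompletion(Throwable error, V result)`). `FutureCallbackSketch` stands in for Connect's `FutureCallback`. `FakeHerder` is a hypothetical stub, not `StandaloneHerder`.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Mirrors the shape of Connect's Callback interface (redefined here; an assumption).
interface SketchCallback<V> {
    void onCompletion(Throwable error, V result);
}

// A callback that is also a future: tests can block on get() instead of verifying a mock.
class FutureCallbackSketch<V> extends CompletableFuture<V> implements SketchCallback<V> {
    @Override
    public void onCompletion(Throwable error, V result) {
        if (error != null) {
            completeExceptionally(error);
        } else {
            complete(result);
        }
    }
}

// Hypothetical herder stub that answers connectorConfig() synchronously.
class FakeHerder {
    void connectorConfig(String connName, SketchCallback<Map<String, String>> callback) {
        callback.onCompletion(null, Map.of("name", connName, "foo", "bar"));
    }

    public static void main(String[] args) throws Exception {
        FutureCallbackSketch<Map<String, String>> cb = new FutureCallbackSketch<>();
        new FakeHerder().connectorConfig("test-connector", cb);
        // get() with a timeout fails the test instead of hanging if the callback never fires.
        System.out.println(cb.get(1, TimeUnit.SECONDS).get("foo")); // bar
    }
}
```

Compared with verifying a mocked callback, the assertion then works on the returned value directly. An error passed to the callback surfaces as an `ExecutionException` from `get`.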
Re: [PR] KAKFA-15629: Update upgrade-guide.html for TimestampedKeyQuery and TimestampedRangeQuery [kafka]
mjsax commented on PR #14898:
URL: https://github.com/apache/kafka/pull/14898#issuecomment-1841932981

This PR shows a merge conflict. Can you rebase it on `trunk`?
[PR] KAFKA-15662: Add support for clientInstanceIds in Kafka Stream [kafka]
mjsax opened a new pull request, #14935:
URL: https://github.com/apache/kafka/pull/14935

Part of KIP-714. Adds support for collecting the client instance ID of the global consumer.

+++

This PR is on top of https://github.com/apache/kafka/pull/14922
Only the second commit needs a review.
Re: [PR] KAFKA-15629: Support ResultOrder to TimestampedRangeQuery. [kafka]
hanyuzheng7 commented on code in PR #14907:
URL: https://github.com/apache/kafka/pull/14907#discussion_r1416485410

## streams/src/main/java/org/apache/kafka/streams/query/RangeQuery.java: ##
@@ -59,25 +58,25 @@ private RangeQuery(final Optional lower, final Optional upper, final boole
 * @param The value type
 */
 public static RangeQuery withRange(final K lower, final K upper) {
-return new RangeQuery<>(Optional.ofNullable(lower), Optional.ofNullable(upper), true);
+return new RangeQuery<>(Optional.ofNullable(lower), Optional.ofNullable(upper), ResultOrder.ANY);
 }
 /**
- * Determines if the serialized byte[] of the keys in ascending order.
+ * Determines if the serialized byte[] of the keys in ascending or any order.
 * Order is based on the serialized byte[] of the keys, not the 'logical' key order.
- * @return true if ascending, false otherwise.
+ * @return return the order of returned records based on the serialized byte[] of the keys(can be unordered, or in ascending or in descending order).

Review Comment:
But I couldn't find where the space is missing.
Re: [PR] KAFKA-15629: Support ResultOrder to TimestampedRangeQuery. [kafka]
hanyuzheng7 commented on code in PR #14907:
URL: https://github.com/apache/kafka/pull/14907#discussion_r1416484369

## streams/src/main/java/org/apache/kafka/streams/query/RangeQuery.java: ##
@@ -59,25 +58,25 @@ private RangeQuery(final Optional lower, final Optional upper, final boole
 * @param The value type
 */
 public static RangeQuery withRange(final K lower, final K upper) {
-return new RangeQuery<>(Optional.ofNullable(lower), Optional.ofNullable(upper), true);
+return new RangeQuery<>(Optional.ofNullable(lower), Optional.ofNullable(upper), ResultOrder.ANY);
 }
 /**
- * Determines if the serialized byte[] of the keys in ascending order.
+ * Determines if the serialized byte[] of the keys in ascending or any order.
 * Order is based on the serialized byte[] of the keys, not the 'logical' key order.
- * @return true if ascending, false otherwise.
+ * @return return the order of returned records based on the serialized byte[] of the keys(can be unordered, or in ascending or in descending order).
 */
-public boolean isKeyAscending() {
-return isKeyAscending;
+public ResultOrder resultOrder() {
+return order;
 }
 /**
 * Set the query to return the serialized byte[] of the keys in descending order.
 * Order is based on the serialized byte[] of the keys, not the 'logical' key order.
- * @return a new RangeQuery instance with descending flag set.
+ * @return a new RangeQuery instance with DESCENDING set.
 */
 public RangeQuery withDescendingKeys() {

Review Comment:
ok
Re: [PR] KAFKA-15629: Support ResultOrder to TimestampedRangeQuery. [kafka]
hanyuzheng7 commented on code in PR #14907:
URL: https://github.com/apache/kafka/pull/14907#discussion_r1416484212

## streams/src/main/java/org/apache/kafka/streams/query/RangeQuery.java: ##
@@ -59,25 +58,25 @@ private RangeQuery(final Optional lower, final Optional upper, final boole
 * @param The value type
 */
 public static RangeQuery withRange(final K lower, final K upper) {
-return new RangeQuery<>(Optional.ofNullable(lower), Optional.ofNullable(upper), true);
+return new RangeQuery<>(Optional.ofNullable(lower), Optional.ofNullable(upper), ResultOrder.ANY);
 }
 /**
- * Determines if the serialized byte[] of the keys in ascending order.
+ * Determines if the serialized byte[] of the keys in ascending or any order.
 * Order is based on the serialized byte[] of the keys, not the 'logical' key order.
- * @return true if ascending, false otherwise.
+ * @return return the order of returned records based on the serialized byte[] of the keys(can be unordered, or in ascending or in descending order).
 */
-public boolean isKeyAscending() {
-return isKeyAscending;
+public ResultOrder resultOrder() {
+return order;
 }
 /**
 * Set the query to return the serialized byte[] of the keys in descending order.
 * Order is based on the serialized byte[] of the keys, not the 'logical' key order.
- * @return a new RangeQuery instance with descending flag set.
+ * @return a new RangeQuery instance with DESCENDING set.

Review Comment:
ok

## streams/src/main/java/org/apache/kafka/streams/query/RangeQuery.java: ##
@@ -59,25 +58,25 @@ private RangeQuery(final Optional lower, final Optional upper, final boole
 * @param The value type
 */
 public static RangeQuery withRange(final K lower, final K upper) {
-return new RangeQuery<>(Optional.ofNullable(lower), Optional.ofNullable(upper), true);
+return new RangeQuery<>(Optional.ofNullable(lower), Optional.ofNullable(upper), ResultOrder.ANY);
 }
 /**
- * Determines if the serialized byte[] of the keys in ascending order.
+ * Determines if the serialized byte[] of the keys in ascending or any order.
 * Order is based on the serialized byte[] of the keys, not the 'logical' key order.
- * @return true if ascending, false otherwise.
+ * @return return the order of returned records based on the serialized byte[] of the keys(can be unordered, or in ascending or in descending order).

Review Comment:
ok
Re: [PR] KAFKA-15629: Support ResultOrder to TimestampedRangeQuery. [kafka]
hanyuzheng7 commented on code in PR #14907:
URL: https://github.com/apache/kafka/pull/14907#discussion_r1416481880

## streams/src/main/java/org/apache/kafka/streams/query/TimestampedRangeQuery.java: ##
@@ -82,25 +81,25 @@ public static TimestampedRangeQuery withUpperBound(final K upper) {
 * @param The value type
 */
 public static TimestampedRangeQuery withLowerBound(final K lower) {
-return new TimestampedRangeQuery<>(Optional.of(lower), Optional.empty(), true);
+return new TimestampedRangeQuery<>(Optional.of(lower), Optional.empty(), ResultOrder.ANY);
 }
 /**
 * Determines if the serialized byte[] of the keys in ascending order.
 * Order is based on the serialized byte[] of the keys, not the 'logical' key order.
- * @return true if ascending, false otherwise.
+ * @return return the order of return records base on the serialized byte[] of the keys(can be unordered, or in ascending, or in descending order).
 */
-public boolean isKeyAscending() {
-return isKeyAscending;
+public ResultOrder resultOrder() {
+return order;
 }
 /**
 * Set the query to return the serialized byte[] of the keys in descending order.
 * Order is based on the serialized byte[] of the keys, not the 'logical' key order.
- * @return a new RangeQuery instance with descending flag set.
+ * @return a new RangeQuery instance with DESCENDING set.
 */
 public TimestampedRangeQuery withDescendingKeys() {

Review Comment:
ok

## streams/src/main/java/org/apache/kafka/streams/query/TimestampedRangeQuery.java: ##
@@ -82,25 +81,25 @@ public static TimestampedRangeQuery withUpperBound(final K upper) {
 * @param The value type
 */
 public static TimestampedRangeQuery withLowerBound(final K lower) {
-return new TimestampedRangeQuery<>(Optional.of(lower), Optional.empty(), true);
+return new TimestampedRangeQuery<>(Optional.of(lower), Optional.empty(), ResultOrder.ANY);
 }
 /**
 * Determines if the serialized byte[] of the keys in ascending order.
 * Order is based on the serialized byte[] of the keys, not the 'logical' key order.
- * @return true if ascending, false otherwise.
+ * @return return the order of return records base on the serialized byte[] of the keys(can be unordered, or in ascending, or in descending order).
 */
-public boolean isKeyAscending() {
-return isKeyAscending;
+public ResultOrder resultOrder() {
+return order;
 }
 /**
 * Set the query to return the serialized byte[] of the keys in descending order.
 * Order is based on the serialized byte[] of the keys, not the 'logical' key order.
- * @return a new RangeQuery instance with descending flag set.
+ * @return a new RangeQuery instance with DESCENDING set.

Review Comment:
ok
Re: [PR] KAFKA-15629: Support ResultOrder to TimestampedRangeQuery. [kafka]
hanyuzheng7 commented on code in PR #14907:
URL: https://github.com/apache/kafka/pull/14907#discussion_r1416481604

## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java: ##
@@ -205,12 +206,12 @@ private QueryResult runTimestampedRangeQuery(final Query query,
 final QueryResult result;
 final TimestampedRangeQuery typedQuery = (TimestampedRangeQuery) query;
 RangeQuery rawRangeQuery;
-final boolean isKeyAscending = typedQuery.isKeyAscending();
+ResultOrder order = typedQuery.resultOrder();
 rawRangeQuery = RangeQuery.withRange(
 keyBytes(typedQuery.lowerBound().orElse(null)),
 keyBytes(typedQuery.upperBound().orElse(null))
 );
-if (!isKeyAscending) {
+if (order.equals(ResultOrder.DESCENDING)) {

Review Comment:
https://github.com/apache/kafka/pull/14907#discussion_r1416481443

## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java: ##
@@ -268,12 +269,12 @@ private QueryResult runRangeQuery(final Query query,
 final QueryResult result;
 final RangeQuery typedQuery = (RangeQuery) query;
 RangeQuery rawRangeQuery;
-final boolean isKeyAscending = typedQuery.isKeyAscending();
+ResultOrder order = typedQuery.resultOrder();
 rawRangeQuery = RangeQuery.withRange(
 keyBytes(typedQuery.getLowerBound().orElse(null)),
 keyBytes(typedQuery.getUpperBound().orElse(null))
 );
-if (!isKeyAscending) {
+if (order.equals(ResultOrder.DESCENDING)) {

Review Comment:
https://github.com/apache/kafka/pull/14907#discussion_r1416481443
Re: [PR] KAFKA-15629: Support ResultOrder to TimestampedRangeQuery. [kafka]
hanyuzheng7 commented on code in PR #14907:
URL: https://github.com/apache/kafka/pull/14907#discussion_r1416481443

## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java: ##
@@ -254,12 +255,12 @@ private QueryResult runRangeQuery(final Query query,
 final QueryResult result;
 final RangeQuery typedQuery = (RangeQuery) query;
 RangeQuery rawRangeQuery;
-final boolean isKeyAscending = typedQuery.isKeyAscending();
+final ResultOrder order = typedQuery.resultOrder();
 rawRangeQuery = RangeQuery.withRange(
 keyBytes(typedQuery.getLowerBound().orElse(null)),
 keyBytes(typedQuery.getUpperBound().orElse(null))
 );
-if (!isKeyAscending) {
+if (order.equals(ResultOrder.DESCENDING)) {

Review Comment:
Given that `ANY` and `ASCENDING` are treated the same, and only `ResultOrder.DESCENDING` is handled differently, should a separate statement be used to handle the `ASCENDING` case?
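One way to make the `ASCENDING` case explicit, as the question above suggests, is an exhaustive `switch` over the enum. This is only a sketch. `SketchResultOrder` is a local stand-in for Streams' `ResultOrder` (`ANY`, `ASCENDING`, `DESCENDING`). The helper's name is hypothetical. It decides only whether the raw byte-range query should be reversed.

```java
// Local stand-in for org.apache.kafka.streams.query.ResultOrder (redefined for a self-contained sketch).
enum SketchResultOrder { ANY, ASCENDING, DESCENDING }

class ResultOrderSketch {
    // Returns true when the raw byte-range query should iterate keys in descending order.
    static boolean needsDescendingKeys(SketchResultOrder order) {
        switch (order) {
            case DESCENDING:
                return true;
            case ASCENDING: // named explicitly so an ASCENDING-specific path is easy to add later
            case ANY:       // handled like ASCENDING, as in the diff above
                return false;
            default:
                throw new IllegalArgumentException("Unexpected result order: " + order);
        }
    }

    public static void main(String[] args) {
        for (SketchResultOrder order : SketchResultOrder.values()) {
            System.out.println(order + " -> " + needsDescendingKeys(order));
        }
    }
}
```

Listing every constant means that adding a new enum value forces a review of this method rather than silently falling into the ascending branch.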
Re: [PR] KAFKA-15426: Process and persist directory assignments [kafka]
soarez commented on code in PR #14863:
URL: https://github.com/apache/kafka/pull/14863#discussion_r1416477136

## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ##
@@ -2019,6 +2021,70 @@ ListPartitionReassignmentsResponseData listPartitionReassignments(
 return response;
 }
+ControllerResult handleAssignReplicasToDirs(AssignReplicasToDirsRequestData request) {
+int brokerId = request.brokerId();
+clusterControl.checkBrokerEpoch(brokerId, request.brokerEpoch());
+BrokerRegistration brokerRegistration = clusterControl.brokerRegistrations().get(brokerId);
+List records = new ArrayList<>();
+AssignReplicasToDirsResponseData response = new AssignReplicasToDirsResponseData();
+Set leaderAndIsrUpdates = new HashSet<>();
+for (AssignReplicasToDirsRequestData.DirectoryData reqDir : request.directories()) {
+Uuid dirId = reqDir.id();
+AssignReplicasToDirsResponseData.DirectoryData resDir = new AssignReplicasToDirsResponseData.DirectoryData().setId(dirId);
+for (AssignReplicasToDirsRequestData.TopicData reqTopic : reqDir.topics()) {
+Uuid topicId = reqTopic.topicId();
+Errors topicError = Errors.NONE;
+TopicControlInfo topicControl = this.topics.get(topicId);
+if (topicControl == null) {
+log.warn("AssignReplicasToDirsRequest from broker {} references unknown topic ID {}", brokerId, topicId);
+topicError = Errors.UNKNOWN_TOPIC_ID;
+}
+AssignReplicasToDirsResponseData.TopicData resTopic = new AssignReplicasToDirsResponseData.TopicData().setTopicId(topicId);
+for (AssignReplicasToDirsRequestData.PartitionData reqPartition : reqTopic.partitions()) {
+int partitionIndex = reqPartition.partitionIndex();
+Errors partitionError = topicError;
+if (topicError == Errors.NONE) {
+String topicName = topicControl.name;
+PartitionRegistration partitionRegistration = topicControl.parts.get(partitionIndex);
+if (partitionRegistration == null) {
+log.warn("AssignReplicasToDirsRequest from broker {} references unknown partition {}-{}", brokerId, topicName, partitionIndex);
+partitionError = Errors.UNKNOWN_TOPIC_OR_PARTITION;
+} else if (!Replicas.contains(partitionRegistration.replicas, brokerId)) {
+log.warn("AssignReplicasToDirsRequest from broker {} references non assigned partition {}-{}", brokerId, topicName, partitionIndex);
+partitionError = Errors.NOT_LEADER_OR_FOLLOWER;
+} else {
+Optional partitionChangeRecord = new PartitionChangeBuilder(
+partitionRegistration,
+topicId,
+partitionIndex,
+new LeaderAcceptor(clusterControl, partitionRegistration),
+featureControl.metadataVersion(),
+getTopicEffectiveMinIsr(topicName)
+)
+.setDirectory(brokerId, dirId)
+.setDefaultDirProvider(clusterDescriber)
+.build();
+partitionChangeRecord.ifPresent(records::add);
+
+if (!brokerRegistration.hasOnlineDir(dirId)) {

Review Comment:
Good point
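The per-partition checks in the diff above resolve to one error code per partition, in a fixed precedence. An unknown topic ID comes first, then an unknown partition, then a broker that is not a replica. Below is a stripped-down, hypothetical sketch of that cascade. It uses `String` topic IDs and plain maps instead of the controller's `Uuid`, `TopicControlInfo` and `PartitionRegistration` types, and a local enum in place of `Errors`.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical reduction of the validation in handleAssignReplicasToDirs; not controller code.
class AssignDirValidationSketch {
    enum Outcome { NONE, UNKNOWN_TOPIC_ID, UNKNOWN_TOPIC_OR_PARTITION, NOT_LEADER_OR_FOLLOWER }

    // topic ID -> (partition index -> replica broker IDs)
    private final Map<String, Map<Integer, Set<Integer>>> topics;

    AssignDirValidationSketch(Map<String, Map<Integer, Set<Integer>>> topics) {
        this.topics = topics;
    }

    Outcome validate(int brokerId, String topicId, int partitionIndex) {
        Map<Integer, Set<Integer>> partitions = topics.get(topicId);
        if (partitions == null) {
            return Outcome.UNKNOWN_TOPIC_ID; // topic-level error applies to every partition
        }
        Set<Integer> replicas = partitions.get(partitionIndex);
        if (replicas == null) {
            return Outcome.UNKNOWN_TOPIC_OR_PARTITION;
        }
        if (!replicas.contains(brokerId)) {
            return Outcome.NOT_LEADER_OR_FOLLOWER; // broker is not assigned this partition
        }
        return Outcome.NONE; // the real code would go on to build a partition change record
    }

    public static void main(String[] args) {
        AssignDirValidationSketch v = new AssignDirValidationSketch(
            Map.of("topic-a", Map.of(0, Set.of(1, 2))));
        System.out.println(v.validate(1, "topic-a", 0)); // NONE
        System.out.println(v.validate(3, "topic-a", 0)); // NOT_LEADER_OR_FOLLOWER
        System.out.println(v.validate(1, "topic-a", 7)); // UNKNOWN_TOPIC_OR_PARTITION
        System.out.println(v.validate(1, "topic-b", 0)); // UNKNOWN_TOPIC_ID
    }
}
```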
Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]
soarez commented on PR #14838:
URL: https://github.com/apache/kafka/pull/14838#issuecomment-1841864233

@cmccabe:

> I didn't understand the purpose of the HeartbeatManager changes (now that we've agreed to keep Directory out of HBM). Seems like we should be able to keep this file the same now? Or if not, can we change it in a separate PR where we have some rationale?

The changes aren't necessary, but they are an improvement. I've moved them to a separate PR: #14934
[PR] MINOR: register before touch in BrokerHeartbeatManager [kafka]
soarez opened a new pull request, #14934:
URL: https://github.com/apache/kafka/pull/14934

BrokerHeartbeatManager has a confusing API where it tolerates `touch(brokerId, fenced, metadataOffset)` without a previous call to `register(brokerId, fenced)`.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
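One hypothetical way to enforce the stricter contract described in this PR is to have `touch` fail fast for a broker that was never registered. The class, its fields and the choice of `IllegalStateException` are illustrative assumptions. They do not describe the actual `BrokerHeartbeatManager` change.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical heartbeat tracker; not BrokerHeartbeatManager.
class HeartbeatTrackerSketch {
    private static final class BrokerState {
        boolean fenced;
        long metadataOffset = -1L;

        BrokerState(boolean fenced) {
            this.fenced = fenced;
        }
    }

    private final Map<Integer, BrokerState> brokers = new HashMap<>();

    void register(int brokerId, boolean fenced) {
        brokers.put(brokerId, new BrokerState(fenced));
    }

    void touch(int brokerId, boolean fenced, long metadataOffset) {
        BrokerState state = brokers.get(brokerId);
        if (state == null) {
            // Fail fast instead of silently creating state for an unknown broker.
            throw new IllegalStateException("Broker " + brokerId + " must be registered before touch()");
        }
        state.fenced = fenced;
        state.metadataOffset = metadataOffset;
    }

    boolean isFenced(int brokerId) {
        BrokerState state = brokers.get(brokerId);
        return state == null || state.fenced; // assumption: unknown brokers count as fenced
    }

    public static void main(String[] args) {
        HeartbeatTrackerSketch tracker = new HeartbeatTrackerSketch();
        try {
            tracker.touch(1, false, 10L);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Broker 1 must be registered before touch()
        }
        tracker.register(1, true);
        tracker.touch(1, false, 10L);
        System.out.println(tracker.isFenced(1)); // false
    }
}
```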
Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]
soarez commented on code in PR #14838:
URL: https://github.com/apache/kafka/pull/14838#discussion_r1416458768

## metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java: ##
@@ -272,8 +274,17 @@ public void testRegistrationWithIncorrectClusterId() {
 (short) 1));
 }
+private static Stream metadataVersions() {
+return Stream.of(
+MetadataVersion.IBP_3_3_IV2,
+MetadataVersion.IBP_3_3_IV3,
+MetadataVersion.LATEST_PRODUCTION,

Review Comment:
Replaced `LATEST_PRODUCTION` with `IBP_3_7_IV2`.
Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]
soarez commented on code in PR #14838:
URL: https://github.com/apache/kafka/pull/14838#discussion_r1416458244

## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ##
@@ -354,6 +357,25 @@ public ControllerResult registerBroker(
 throw new BrokerIdNotRegisteredException("Controller is in pre-migration mode and cannot register KRaft " +
 "brokers until the metadata migration is complete.");
 }
+
+if (featureControl.metadataVersion().isDirectoryAssignmentSupported()) {
+Set set = new HashSet<>(request.logDirs());
+if (set.stream().anyMatch(DirectoryId::reserved)) {
+throw new InvalidRequestException("Reserved directory ID in request");
+}
+if (set.size() != request.logDirs().size()) {
+throw new InvalidRequestException("Duplicate directory ID in request");
+}
+for (BrokerRegistration registration : brokerRegistrations().values()) {

Review Comment:
@cmccabe:

> InvalidRequestException seems too generic

I've introduced InvalidRegistrationException to handle these.

> Also, we shouldn't be checking the previous registration for this specific broker ID.

I forgot that. Thanks for pointing this out!

> we should just have a TimelineHashMap that maps log dir UUID to the broker that has it

Great suggestion. Added.
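The `TimelineHashMap` suggestion quoted above amounts to an index from log directory ID to the broker that registered it. Below is a minimal sketch under stated assumptions. A plain `HashMap` stands in for `TimelineHashMap`, so there is no snapshot or rollback support. `java.util.UUID` stands in for Kafka's `Uuid`. `IllegalArgumentException` stands in for the `InvalidRegistrationException` mentioned in the reply. The broker's own previous registration is deliberately not treated as a conflict, matching the point quoted above.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical index of log directory ID -> owning broker ID; not ClusterControlManager code.
class DirOwnershipSketch {
    private final Map<UUID, Integer> dirToBroker = new HashMap<>();

    void register(int brokerId, List<UUID> logDirs) {
        // Reject directories already owned by a different broker;
        // the same broker re-registering its own directories is fine.
        for (UUID dir : logDirs) {
            Integer owner = dirToBroker.get(dir);
            if (owner != null && owner != brokerId) {
                throw new IllegalArgumentException(
                    "Directory " + dir + " is already registered by broker " + owner);
            }
        }
        // Replace this broker's previous entries with the new set.
        dirToBroker.values().removeIf(id -> id == brokerId);
        for (UUID dir : logDirs) {
            dirToBroker.put(dir, brokerId);
        }
    }

    Integer owner(UUID dir) {
        return dirToBroker.get(dir);
    }

    public static void main(String[] args) {
        DirOwnershipSketch index = new DirOwnershipSketch();
        UUID dirA = UUID.randomUUID();
        UUID dirB = UUID.randomUUID();
        index.register(1, List.of(dirA));
        index.register(1, List.of(dirA, dirB)); // re-registration by the same broker succeeds
        try {
            index.register(2, List.of(dirB));   // dirB belongs to broker 1
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        System.out.println(index.owner(dirB)); // 1
    }
}
```

The lookup turns the conflict check into one map probe per directory, instead of a scan over every existing broker registration.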
Re: [PR] KAFKA-15871: kafka-client-metrics.sh [kafka]
junrao commented on code in PR #14926: URL: https://github.com/apache/kafka/pull/14926#discussion_r1416416581 ## tools/src/main/java/org/apache/kafka/tools/ClientMetricsCommand.java: ## @@ -0,0 +1,377 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.tools; + +import joptsimple.ArgumentAcceptingOptionSpec; +import joptsimple.OptionSpec; +import joptsimple.OptionSpecBuilder; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.AlterConfigsOptions; +import org.apache.kafka.clients.admin.ClientMetricsResourceListing; +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.CommandDefaultOptions; +import org.apache.kafka.server.util.CommandLineUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class ClientMetricsCommand { +private static final Logger LOG = LoggerFactory.getLogger(ClientMetricsCommand.class); + +public static void main(String... args) { +Exit.exit(mainNoExit(args)); +} + +static int mainNoExit(String... args) { +try { +execute(args); +return 0; +} catch (Throwable e) { +System.err.println(e.getMessage()); +System.err.println(Utils.stackTrace(e)); +return 1; +} +} + +static void execute(String... 
args) throws Exception { +ClientMetricsCommandOptions opts = new ClientMetricsCommandOptions(args); + +Properties config = opts.commandConfig(); +config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.bootstrapServer()); + +ClientMetricsService service = new ClientMetricsService(config); +int exitCode = 0; +try { +if (opts.hasAlterOption()) { +service.alterClientMetrics(opts); +} else if (opts.hasDescribeOption()) { +service.describeClientMetrics(opts); +} else if (opts.hasDeleteOption()) { +service.deleteClientMetrics(opts); +} else if (opts.hasListOption()) { +service.listClientMetrics(opts); +} +} catch (ExecutionException e) { +Throwable cause = e.getCause(); +if (cause != null) { +printException(cause); +} else { +printException(e); +} +exitCode = 1; +} catch (Throwable t) { +printException(t); +exitCode = 1; +} finally { +service.close(); +Exit.exit(exitCode); +} +} + +public static class ClientMetricsService implements AutoCloseable { +private Admin adminClient; + +public ClientMetricsService(Properties config) { +this.adminClient = Admin.create(config); +} + +// Visible for testing +ClientMetricsService(Admin adminClient) { +this.adminClient = adminClient; +} + +public void alterClientMetrics(ClientMetricsCommandOptions opts) throws Exception { +String entityName = opts.hasGenerateNameOption() ? Uuid.randomUuid().toString() : opts.name().get(); + +Map configsToBeSet = new HashMap<>(); +opts.interval().map(intervalVal -> configsToBeSet.put("interval.ms", intervalVal.toString())); +opts.metrics().map(metricslist -> configsToBeSet.put("metrics", metricslist.stream().collect(Collectors.joining(","; +
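The `alterClientMetrics` hunk above (cut off mid-statement in this archive) turns the command-line options into a config map: a random UUID as the resource name when a name is to be generated, an `interval.ms` entry, and a comma-joined `metrics` list. A minimal sketch of that mapping, assuming plain `Optional` inputs; the class and method names are hypothetical, and it uses `Optional.ifPresent` for the side-effecting puts where the diff uses `map`.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

/** Hypothetical sketch of turning alter options into client-metrics configs. */
class ClientMetricsConfigSketch {
    /** Uses a random UUID when name generation is requested; otherwise a name must be supplied. */
    static String entityName(boolean generateName, Optional<String> name) {
        return generateName
            ? UUID.randomUUID().toString()
            : name.orElseThrow(() -> new IllegalArgumentException("a name is required unless one is generated"));
    }

    /** Only options that were actually given end up in the map. */
    static Map<String, String> configsToBeSet(Optional<Integer> intervalMs, Optional<List<String>> metrics) {
        Map<String, String> configs = new LinkedHashMap<>();
        intervalMs.ifPresent(interval -> configs.put("interval.ms", interval.toString()));
        metrics.ifPresent(list -> configs.put("metrics", String.join(",", list)));
        return configs;
    }
}
```

`ifPresent` states the intent (a side effect, no result) more directly than `map`, whose return value is discarded in the diff; the behaviour is the same.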
Re: [PR] [MINOR-PR] Enable telemetry APIs and logs suppression (KIP-714) [kafka]
wcarlson5 merged PR #14928: URL: https://github.com/apache/kafka/pull/14928
Re: [PR] Kafka-14748: Relax non-null FK left-join requirement [kafka]
wcarlson5 merged PR #14107: URL: https://github.com/apache/kafka/pull/14107
Re: [PR] Kafka-14748: Relax non-null FK left-join requirement [kafka]
wcarlson5 commented on PR #14107: URL: https://github.com/apache/kafka/pull/14107#issuecomment-1841851229 I'm not sure; either option for the optimization is fine with me as long as it's well documented. I'm good with how the PR is for now. I'm going to merge it to get it in before the 3.7 feature freeze.
Re: [PR] KAFKA-15696: Refactor AsyncConsumer close procedure [kafka]
kirktrue commented on code in PR #14920: URL: https://github.com/apache/kafka/pull/14920#discussion_r1416404354 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -957,6 +966,57 @@ private void close(Duration timeout, boolean swallowException) { } } +/** + * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence: + * 1. autocommit offsets + * 2. revoke all partitions + */ +private void prepareShutdown(final Timer timer) { +if (!groupMetadata.isPresent()) +return; + +maybeAutoCommitSync(timer); +timer.update(); +if (!subscriptions.hasAutoAssignedPartitions() || subscriptions.assignedPartitions().isEmpty()) +return; + +try { +// If the consumer is in a group, we will pause and revoke all assigned partitions +onLeavePrepare().get(timer.remainingMs(), TimeUnit.MILLISECONDS); +timer.update(); +} catch (Exception e) { +Exception exception = e; +if (e instanceof ExecutionException) +exception = (Exception) e.getCause(); +throw new KafkaException("User rebalance callback throws an error", exception); +} finally { +subscriptions.assignFromSubscribed(Collections.emptySet()); +} +} + +private void maybeAutoCommitSync(final Timer timer) { +if (autoCommitEnabled) { +Map allConsumed = subscriptions.allConsumed(); +try { +log.debug("Sending synchronous auto-commit of offsets {} on closing", allConsumed); +commitSync(allConsumed, Duration.ofMillis(timer.remainingMs())); +} catch (Exception e) { +// consistent with async auto-commit failures, we do not propagate the exception +log.warn("Synchronous auto-commit of offsets {} failed: {}", allConsumed, e.getMessage()); +} +} +} + +private CompletableFuture onLeavePrepare() { +SortedSet droppedPartitions = new TreeSet<>(MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR); +droppedPartitions.addAll(subscriptions.assignedPartitions()); +if (!subscriptions.hasAutoAssignedPartitions() || droppedPartitions.isEmpty()) { +return 
CompletableFuture.completedFuture(null); +} +// TODO: Invoke rebalanceListener via KAFKA-15276 +return CompletableFuture.completedFuture(null); +} + Review Comment: I would imagine that the leave group process could be mostly performed using an event and a callback execution, like in #14931. We'd need to submit an event to the background thread (e.g. `PrepareCloseEvent`) so that the member manager can orchestrate the callback request and the heartbeat request. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -931,15 +941,14 @@ private void close(Duration timeout, boolean swallowException) { final Timer closeTimer = time.timer(timeout); clientTelemetryReporter.ifPresent(reporter -> reporter.initiateClose(timeout.toMillis())); closeTimer.update(); - +// Prepare shutting down the network thread +swallow(log, Level.ERROR, "Unexpected exception when preparing for shutdown", () -> prepareShutdown(closeTimer), firstException); +closeTimer.update(); if (applicationEventHandler != null) -closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed to close application event handler with a timeout(ms)=" + closeTimer.remainingMs(), firstException); - -// Invoke all callbacks after the background thread exists in case if there are unsent async -// commits -maybeInvokeCommitCallbacks(); - -closeQuietly(fetchBuffer, "Failed to close the fetch buffer", firstException); +closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed shutting down network thread", firstException); +closeTimer.update(); +// Ensure all async commit callbacks are invoked +swallow(log, Level.ERROR, "Failed invoking asynchronous commit callback", this::maybeInvokeCommitCallbacks, firstException); Review Comment: These should be done before we shut down the network, right? -- This is an automated message from the Apache Git Service. 
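The ordering question in this review (auto-commit, then revoke, then invoke pending commit callbacks, then stop the network thread, all within one close timeout) can be illustrated with a minimal sketch in which every step draws from the same deadline and a failing step does not prevent later ones from running. The `CloseBudget` class and the step list are hypothetical and JDK-only; in the PR the shared deadline is Kafka's `Timer` (`update()`, `remainingMs()`) and failures are collected via `swallow`/`closeQuietly`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.LongFunction;

/** Hypothetical sketch: run close steps in order against one shared deadline. */
class CloseBudget {
    private final long deadlineNanos;

    CloseBudget(long timeoutMs) {
        this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    }

    long remainingMs() {
        return Math.max(0L, TimeUnit.NANOSECONDS.toMillis(deadlineNanos - System.nanoTime()));
    }

    /**
     * Each step receives the time left and returns a future. Failures are recorded and the
     * remaining steps still run, so a failing rebalance callback does not skip later steps.
     */
    static List<Throwable> runInOrder(long timeoutMs, List<LongFunction<CompletableFuture<Void>>> steps) {
        CloseBudget budget = new CloseBudget(timeoutMs);
        List<Throwable> errors = new ArrayList<>();
        for (LongFunction<CompletableFuture<Void>> step : steps) {
            try {
                long remaining = budget.remainingMs();
                step.apply(remaining).get(remaining, TimeUnit.MILLISECONDS);
            } catch (ExecutionException e) {
                errors.add(e.getCause()); // unwrap the step's own failure
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                errors.add(e);
            } catch (TimeoutException | RuntimeException e) {
                errors.add(e); // step ran out of budget, or threw synchronously
            }
        }
        return errors;
    }
}
```

Passing the remaining time into each step, rather than the original timeout, is what keeps the whole close bounded by the caller's timeout.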
Re: [PR] KAFKA-15760: Disable flaky test testTaskRequestWithOldStartMsGetsUpdated [kafka]
gharris1727 commented on PR #14917: URL: https://github.com/apache/kafka/pull/14917#issuecomment-1841810754 I did some stress-testing on my "BeforeAll" fix overnight and found a new failure mode: ``` org.opentest4j.AssertionFailedError: expected: <{"state":"RUNNING","spec":{"class":"org.apache.kafka.trogdor.task.NoOpTaskSpec","startMs":552,"durationMs":500},"startedMs":552,"status":"receiving"}> but was: <{"state":"RUNNING","spec":{"class":"org.apache.kafka.trogdor.task.NoOpTaskSpec","startMs":552,"durationMs":500},"startedMs":552,"status":"active"}> ``` This occurred in 29 of 7531 runs (0.38% chance) at 20% CPU. I'm going to re-run the test with your branch to see how effective the 4-minute timeout is.
Re: [PR] KAFKA-15426: Process and persist directory assignments [kafka]
rondagostino commented on code in PR #14863: URL: https://github.com/apache/kafka/pull/14863#discussion_r1416193447 ## core/src/test/scala/unit/kafka/server/ControllerApisTest.scala: ## @@ -1139,31 +1139,24 @@ class ControllerApisTest { } @Test - def testAssignReplicasToDirsReturnsUnsupportedVersion(): Unit = { + def testAssignReplicasToDirs(): Unit = { val controller = mock(classOf[Controller]) -val controllerApis = createControllerApis(None, controller) +val authorizer = mock(classOf[Authorizer]) +val controllerApis = createControllerApis(Some(authorizer), controller) + +val request = new AssignReplicasToDirsRequest.Builder(new AssignReplicasToDirsRequestData()).build() + +when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(Collections.singletonList(new Action( + AclOperation.CLUSTER_ACTION, + new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL), + 1, true, true +) + .thenReturn(Collections.singletonList(AuthorizationResult.ALLOWED)) +when(controller.assignReplicasToDirs(any[ControllerRequestContext], ArgumentMatchers.eq(request.data))) + .thenThrow(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()) Review Comment: Need to return something like `completableFuture.completeExceptionally()` here as opposed to throwing an exception directly. 
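The suggestion to return a failed future rather than throw matters because the controller API returns a `CompletableFuture`: a stub that throws synchronously never produces a future, so the caller's future-based error handling (for example, turning the failure into an error code in the response) is never exercised. A minimal, JDK-only sketch; `assignReplicasToDirs`, `throwingStub`, and `handle` here are hypothetical stand-ins, not Kafka's `Controller` interface or `ControllerApis`.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch: reporting failures through a future versus throwing directly. */
class FailedFutureSketch {
    /** Async-style API: failures travel inside the returned future. */
    static CompletableFuture<String> assignReplicasToDirs(boolean fail) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (fail) {
            future.completeExceptionally(new IllegalStateException("UNKNOWN_TOPIC_OR_PARTITION"));
        } else {
            future.complete("OK");
        }
        return future;
    }

    /** A stub that throws before any future exists, like thenThrow(...) on a future-returning method. */
    static CompletableFuture<String> throwingStub() {
        throw new IllegalStateException("thrown directly");
    }

    /** Caller that maps a failed future to an error string, as a request handler might. */
    static String handle(CompletableFuture<String> future) {
        return future
            .handle((result, error) -> error == null ? result : "error: " + error.getMessage())
            .join();
    }
}
```

With the failed-future stub, `handle` sees the error and produces the error response; with the throwing stub, the exception escapes before `handle` is ever reached, so the test would not be exercising the real response path.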
## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ## @@ -2019,6 +2021,70 @@ ListPartitionReassignmentsResponseData listPartitionReassignments( return response; } +ControllerResult handleAssignReplicasToDirs(AssignReplicasToDirsRequestData request) { +int brokerId = request.brokerId(); +clusterControl.checkBrokerEpoch(brokerId, request.brokerEpoch()); +BrokerRegistration brokerRegistration = clusterControl.brokerRegistrations().get(brokerId); +List records = new ArrayList<>(); +AssignReplicasToDirsResponseData response = new AssignReplicasToDirsResponseData(); +Set leaderAndIsrUpdates = new HashSet<>(); +for (AssignReplicasToDirsRequestData.DirectoryData reqDir : request.directories()) { +Uuid dirId = reqDir.id(); +AssignReplicasToDirsResponseData.DirectoryData resDir = new AssignReplicasToDirsResponseData.DirectoryData().setId(dirId); +for (AssignReplicasToDirsRequestData.TopicData reqTopic : reqDir.topics()) { +Uuid topicId = reqTopic.topicId(); +Errors topicError = Errors.NONE; +TopicControlInfo topicControl = this.topics.get(topicId); Review Comment: nit: `topicControlInfo` or `topicInfo` might be a better name than `topicControl` -- get the "Info" in there somehow. 
## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ## @@ -2019,6 +2021,70 @@ ListPartitionReassignmentsResponseData listPartitionReassignments( return response; } +ControllerResult handleAssignReplicasToDirs(AssignReplicasToDirsRequestData request) { +int brokerId = request.brokerId(); +clusterControl.checkBrokerEpoch(brokerId, request.brokerEpoch()); +BrokerRegistration brokerRegistration = clusterControl.brokerRegistrations().get(brokerId); +List records = new ArrayList<>(); +AssignReplicasToDirsResponseData response = new AssignReplicasToDirsResponseData(); +Set leaderAndIsrUpdates = new HashSet<>(); +for (AssignReplicasToDirsRequestData.DirectoryData reqDir : request.directories()) { +Uuid dirId = reqDir.id(); +AssignReplicasToDirsResponseData.DirectoryData resDir = new AssignReplicasToDirsResponseData.DirectoryData().setId(dirId); +for (AssignReplicasToDirsRequestData.TopicData reqTopic : reqDir.topics()) { +Uuid topicId = reqTopic.topicId(); +Errors topicError = Errors.NONE; +TopicControlInfo topicControl = this.topics.get(topicId); +if (topicControl == null) { +log.warn("AssignReplicasToDirsRequest from broker {} references unknown topic ID {}", brokerId, topicId); +topicError = Errors.UNKNOWN_TOPIC_ID; +} +AssignReplicasToDirsResponseData.TopicData resTopic = new AssignReplicasToDirsResponseData.TopicData().setTopicId(topicId); +for (AssignReplicasToDirsRequestData.PartitionData reqPartition : reqTopic.partitions()) { +int partitionIndex = reqPartition.partitionIndex(); +Errors partitionError = topicError; +if (topicError == Errors.NONE) { +String topicName = topicControl.name; +PartitionRegistration partitionRegistration = topicControl.parts.get(partitionIndex); +if
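In the handler above, a missing topic yields `UNKNOWN_TOPIC_ID`, and every partition starts from that topic-level error (`Errors partitionError = topicError`), so only partitions of a known topic are examined individually. A simplified sketch of that propagation pattern; the `Code` enum, the map-of-sets topic model, and the use of `UNKNOWN_TOPIC_OR_PARTITION` for a missing partition are assumptions, since the diff is truncated before the partition-level check.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

/** Hypothetical sketch: a topic-level error is inherited by each of its partitions. */
class AssignmentErrorSketch {
    // Stand-in for Kafka's Errors enum (assumption).
    enum Code { NONE, UNKNOWN_TOPIC_ID, UNKNOWN_TOPIC_OR_PARTITION }

    static Map<Integer, Code> partitionErrors(Map<UUID, Set<Integer>> topics, UUID topicId, List<Integer> requested) {
        Set<Integer> knownPartitions = topics.get(topicId);
        Code topicError = knownPartitions == null ? Code.UNKNOWN_TOPIC_ID : Code.NONE;
        Map<Integer, Code> result = new LinkedHashMap<>();
        for (int partition : requested) {
            Code partitionError = topicError; // start from the topic-level result
            if (topicError == Code.NONE && !knownPartitions.contains(partition)) {
                partitionError = Code.UNKNOWN_TOPIC_OR_PARTITION;
            }
            result.put(partition, partitionError);
        }
        return result;
    }
}
```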
Re: [PR] MINOR: ConsumerRebalanceListenerInvoker class and method visibility [kafka]
kirktrue commented on PR #14931: URL: https://github.com/apache/kafka/pull/14931#issuecomment-1841792685 Four greys. Restarting build again.
Re: [PR] MINOR: ConsumerRebalanceListenerInvoker class and method visibility [kafka]
kirktrue closed pull request #14931: MINOR: ConsumerRebalanceListenerInvoker class and method visibility URL: https://github.com/apache/kafka/pull/14931
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
philipnee commented on PR #14873: URL: https://github.com/apache/kafka/pull/14873#issuecomment-1841786386 Hi @lianetm @cadonna @lucasbru, I've addressed your comments. I've left a question about sending a leave group for static members. From reading the KIP, it seems like we should do it, but I think we need further clarification.
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
philipnee commented on code in PR #14873: URL: https://github.com/apache/kafka/pull/14873#discussion_r1416388581 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -103,6 +103,12 @@ public class HeartbeatRequestManager implements RequestManager { */ private final BackgroundEventHandler backgroundEventHandler; +/** + * Timer for tracking the time since the last consumer poll. If the timer expires, the consumer will stop + * sending heartbeat until the next poll. + */ Review Comment: I think you meant: only when `group.instance.id` is not set, i.e. a dynamic member? This is the current implementation: ``` // Starting from 2.3, only dynamic members will send LeaveGroupRequest to the broker, // consumer with valid group.instance.id is viewed as static member that never sends LeaveGroup, // and the membership expiration is only controlled by session timeout. if (isDynamicMember() && !coordinatorUnknown() && state != MemberState.UNJOINED && generation.hasMemberId()) { // this is a minimal effort attempt to leave the group. we do not // attempt any resending if the request fails or times out. log.info("Member {} sending LeaveGroup request to coordinator {} due to {}", generation.memberId, coordinator, leaveReason); LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder( rebalanceConfig.groupId, Collections.singletonList(new MemberIdentity().setMemberId(generation.memberId).setReason(JoinGroupRequest.maybeTruncateReason(leaveReason))) ); future = client.send(coordinator, request).compose(new LeaveGroupResponseHandler(generation)); client.pollNoWakeup(); } ```
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
philipnee commented on code in PR #14873: URL: https://github.com/apache/kafka/pull/14873#discussion_r1416383477 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -103,6 +103,12 @@ public class HeartbeatRequestManager implements RequestManager { */ private final BackgroundEventHandler backgroundEventHandler; +/** + * Timer for tracking the time since the last consumer poll. If the timer expires, the consumer will stop + * sending heartbeat until the next poll. + */ Review Comment: Well, I'm not sure, actually. According to KIP-848: `Static membership, introduced in KIP-345, is still supported by this new rebalance protocol. When a member wants to leave temporary – e.g. while being bounced – it should send an heartbeat with a member epoch equals to -2. This signals to the group coordinator that the member left but will rejoin within the session timeout. When the member rejoins with the same instance ID, the group coordinator replaces the old member by the new member and gives back its current assignment.`
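The two behaviours being compared can be put side by side in a small sketch. Under the classic protocol (the code quoted in the previous comment), only dynamic members, i.e. those without `group.instance.id`, send `LeaveGroup`. Under KIP-848, per the quoted text, a static member leaving temporarily heartbeats with member epoch -2, while a member leaving the group for good uses epoch -1. The class and method names are hypothetical, and the sketch does not settle the open question of whether a static member should send the -2 heartbeat when the poll timer expires.

```java
import java.util.Optional;

/** Hypothetical sketch of how a leaving member is treated under each protocol. */
class LeaveGroupSketch {
    static final int LEAVE_GROUP_EPOCH = -1;        // member leaves the group permanently
    static final int LEAVE_GROUP_STATIC_EPOCH = -2; // static member leaves temporarily (KIP-848)

    /** Classic protocol: only dynamic members (no group.instance.id) send LeaveGroup. */
    static boolean classicSendsLeaveGroup(Optional<String> groupInstanceId) {
        return !groupInstanceId.isPresent();
    }

    /** KIP-848: the member epoch carried by the final heartbeat when leaving. */
    static int leaveEpoch(Optional<String> groupInstanceId) {
        return groupInstanceId.isPresent() ? LEAVE_GROUP_STATIC_EPOCH : LEAVE_GROUP_EPOCH;
    }
}
```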
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
philipnee commented on code in PR #14873: URL: https://github.com/apache/kafka/pull/14873#discussion_r1416380771 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -399,14 +400,45 @@ public void testHeartbeatState() { new ConsumerGroupHeartbeatResponseData.Assignment(); assignmentTopic1.setTopicPartitions(Collections.singletonList(tpTopic1)); ConsumerGroupHeartbeatResponse rs1 = new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData() -.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS) -.setMemberId(memberId) -.setMemberEpoch(1) -.setAssignment(assignmentTopic1)); +.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS) +.setMemberId(memberId) +.setMemberEpoch(1) +.setAssignment(assignmentTopic1)); membershipManager.onHeartbeatResponseReceived(rs1.data()); assertEquals(MemberState.RECONCILING, membershipManager.state()); } +@Test +public void testEnsureLeaveGroupWhenPollTimerExpires() { +membershipManager.transitionToJoining(); Review Comment: It is not possible to transition to fenced from unsubscribed.
Re: [PR] KAFKA-15818: ensure leave group on max poll interval [kafka]
philipnee commented on code in PR #14873: URL: https://github.com/apache/kafka/pull/14873#discussion_r1416378676 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -198,6 +219,17 @@ public long maximumTimeToWait(long currentTimeMs) { return heartbeatNow ? 0L : heartbeatRequestState.nextHeartbeatMs(currentTimeMs); } +/** + * When consumer polls, we need to reset the pollTimer. If member is already leaving the group + */ +public void ack() { Review Comment: The code has been updated, but note that the pollTimer is updated on a regular cadence by HeartbeatRequestManager.poll(). This function is used to check whether the timer has expired; if it has, we reset the timer every time the consumer polls.
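The poll-timer behaviour described in this thread (the timer tracks time since the last application poll, heartbeats stop once it expires, and the next poll resets it) can be sketched with an injectable clock. This is a hypothetical, JDK-only illustration; the PR itself uses Kafka's `Timer` inside `HeartbeatRequestManager`, and the names `PollTimerSketch`, `onConsumerPoll`, and `shouldHeartbeat` are made up.

```java
import java.util.function.LongSupplier;

/** Hypothetical sketch of a max-poll-interval timer driven by an injectable clock. */
class PollTimerSketch {
    private final LongSupplier clockMs;
    private final long maxPollIntervalMs;
    private long deadlineMs;

    PollTimerSketch(LongSupplier clockMs, long maxPollIntervalMs) {
        this.clockMs = clockMs;
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.deadlineMs = clockMs.getAsLong() + maxPollIntervalMs;
    }

    boolean isExpired() {
        return clockMs.getAsLong() >= deadlineMs;
    }

    /** Called on every application poll(): restart the max-poll-interval window. */
    void onConsumerPoll() {
        deadlineMs = clockMs.getAsLong() + maxPollIntervalMs;
    }

    /** Checked by the background heartbeat loop: stop heartbeating once the window has lapsed. */
    boolean shouldHeartbeat() {
        return !isExpired();
    }
}
```

Injecting the clock keeps the sketch deterministic in tests, similar in spirit to the `Time` abstraction Kafka's own tests use.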
Re: [PR] KAFKA-15684: Support to describe all client metrics resources (KIP-714) [kafka]
junrao commented on code in PR #14933: URL: https://github.com/apache/kafka/pull/14933#discussion_r1416359770 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -3810,16 +3811,27 @@ class KafkaApis(val requestChannel: RequestChannel, } } - // Just a placeholder for now. def handleListClientMetricsResources(request: RequestChannel.Request): Unit = { val listClientMetricsResourcesRequest = request.body[ListClientMetricsResourcesRequest] if (!authHelper.authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)) { requestHelper.sendMaybeThrottle(request, listClientMetricsResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception)) } else { - // Just return an empty list in the placeholder - val data = new ListClientMetricsResourcesResponseData() - requestHelper.sendMaybeThrottle(request, new ListClientMetricsResourcesResponse(data)) + clientMetricsManager match { +case Some(metricsManager) => + try { +val data = new ListClientMetricsResourcesResponseData().setClientMetricsResources( + metricsManager.listClientMetricsResources.asScala.map( +name => new ClientMetricsResource().setName(name)).toList.asJava) +requestHelper.sendMaybeThrottle(request, new ListClientMetricsResourcesResponse(data)) + } catch { +case _: Exception => Review Comment: Do we need this? `KafkaApis` has a generic way to handle unexpected errors through `handleError`. 
## core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala: ## @@ -1690,13 +1690,33 @@ class ConfigCommandTest extends Logging { } @Test - def shouldNotDescribeClientMetricsConfigWithoutEntityName(): Unit = { + def shouldDescribeClientMetricsConfigWithoutEntityName(): Unit = { val describeOpts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092", "--entity-type", "client-metrics", "--describe")) -val exception = assertThrows(classOf[IllegalArgumentException], () => describeOpts.checkArgs()) -assertEquals("an entity name must be specified with --describe of client-metrics", exception.getMessage) +val resourceCustom = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, "1") +val configEntry = new ConfigEntry("metrics", "*") +val future = new KafkaFutureImpl[util.Map[ConfigResource, Config]] +val describeResult: DescribeConfigsResult = mock(classOf[DescribeConfigsResult]) +when(describeResult.all()).thenReturn(future) + +val node = new Node(1, "localhost", 9092) +val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) { + override def describeConfigs(resources: util.Collection[ConfigResource], options: DescribeConfigsOptions): DescribeConfigsResult = { Review Comment: Hmm, this should call `listClientMetricsResources`, not `describeConfigs`, right? 
## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -6922,4 +6923,58 @@ class KafkaApisTest { val expectedResponse = new PushTelemetryResponseData().setErrorCode(Errors.INVALID_REQUEST.code) assertEquals(expectedResponse, response.data) } + + @Test + def testListClientMetricsResourcesNotAllowedForZkClusters(): Unit = { +val request = buildRequest(new ListClientMetricsResourcesRequest.Builder(new ListClientMetricsResourcesRequestData()).build()) +createKafkaApis(enableForwarding = true).handle(request, RequestLocal.NoCaching) + +val response = verifyNoThrottling[ListClientMetricsResourcesResponse](request) +assertEquals(Errors.UNKNOWN_SERVER_ERROR, Errors.forCode(response.data.errorCode)) Review Comment: Hmm, the code seems to set the error to `UNSUPPORTED_VERSION`?
[jira] [Updated] (KAFKA-15816) Typos in tests leak network sockets
[ https://issues.apache.org/jira/browse/KAFKA-15816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-15816: Description: There are a few tests which leak network sockets due to small typos in the tests themselves. Clients: [https://github.com/apache/kafka/pull/14750] (DONE) * NioEchoServer * KafkaConsumerTest * KafkaProducerTest * SelectorTest * SslTransportLayerTest * SslTransportTls12Tls13Test * SslVersionsTransportLayerTest * SaslAuthenticatorTest Core: [https://github.com/apache/kafka/pull/14754] * MiniKdc * GssapiAuthenticationTest * MirrorMakerIntegrationTest * SocketServerTest * EpochDrivenReplicationProtocolAcceptanceTest * LeaderEpochIntegrationTest Trogdor: [https://github.com/apache/kafka/pull/14771] * AgentTest Mirror: [https://github.com/apache/kafka/pull/14761] * DedicatedMirrorIntegrationTest * MirrorConnectorsIntegrationTest * MirrorConnectorsWithCustomForwardingAdminIntegrationTest Runtime: [https://github.com/apache/kafka/pull/14764] * ConnectorTopicsIntegrationTest * ExactlyOnceSourceIntegrationTest * WorkerTest * WorkerGroupMemberTest Streams: [https://github.com/apache/kafka/pull/14769] (DONE) * IQv2IntegrationTest * MetricsReporterIntegrationTest * NamedTopologyIntegrationTest * PurgeRepartitionTopicIntegrationTest These can be addressed by just fixing the tests. was: There are a few tests which leak network sockets due to small typos in the tests themselves. 
Clients: [https://github.com/apache/kafka/pull/14750] * NioEchoServer * KafkaConsumerTest * KafkaProducerTest * SelectorTest * SslTransportLayerTest * SslTransportTls12Tls13Test * SslVersionsTransportLayerTest * SaslAuthenticatorTest Core: [https://github.com/apache/kafka/pull/14754] * MiniKdc * GssapiAuthenticationTest * MirrorMakerIntegrationTest * SocketServerTest * EpochDrivenReplicationProtocolAcceptanceTest * LeaderEpochIntegrationTest Trogdor: [https://github.com/apache/kafka/pull/14771] * AgentTest Mirror: [https://github.com/apache/kafka/pull/14761] * DedicatedMirrorIntegrationTest * MirrorConnectorsIntegrationTest * MirrorConnectorsWithCustomForwardingAdminIntegrationTest Runtime: [https://github.com/apache/kafka/pull/14764] * ConnectorTopicsIntegrationTest * ExactlyOnceSourceIntegrationTest * WorkerTest * WorkerGroupMemberTest Streams: [https://github.com/apache/kafka/pull/14769] (DONE) * IQv2IntegrationTest * MetricsReporterIntegrationTest * NamedTopologyIntegrationTest * PurgeRepartitionTopicIntegrationTest These can be addressed by just fixing the tests. > Typos in tests leak network sockets > --- > > Key: KAFKA-15816 > URL: https://issues.apache.org/jira/browse/KAFKA-15816 > Project: Kafka > Issue Type: Bug > Components: unit tests >Affects Versions: 3.6.0 >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Minor > > There are a few tests which leak network sockets due to small typos in the > tests themselves. 
> Clients: [https://github.com/apache/kafka/pull/14750] (DONE) > * NioEchoServer > * KafkaConsumerTest > * KafkaProducerTest > * SelectorTest > * SslTransportLayerTest > * SslTransportTls12Tls13Test > * SslVersionsTransportLayerTest > * SaslAuthenticatorTest > Core: [https://github.com/apache/kafka/pull/14754] > * MiniKdc > * GssapiAuthenticationTest > * MirrorMakerIntegrationTest > * SocketServerTest > * EpochDrivenReplicationProtocolAcceptanceTest > * LeaderEpochIntegrationTest > Trogdor: [https://github.com/apache/kafka/pull/14771] > * AgentTest > Mirror: [https://github.com/apache/kafka/pull/14761] > * DedicatedMirrorIntegrationTest > * MirrorConnectorsIntegrationTest > * MirrorConnectorsWithCustomForwardingAdminIntegrationTest > Runtime: [https://github.com/apache/kafka/pull/14764] > * ConnectorTopicsIntegrationTest > * ExactlyOnceSourceIntegrationTest > * WorkerTest > * WorkerGroupMemberTest > Streams: [https://github.com/apache/kafka/pull/14769] (DONE) > * IQv2IntegrationTest > * MetricsReporterIntegrationTest > * NamedTopologyIntegrationTest > * PurgeRepartitionTopicIntegrationTest > These can be addressed by just fixing the tests.
Re: [PR] KAFKA-15816: Fix leaked sockets in clients tests [kafka]
gharris1727 merged PR #14750: URL: https://github.com/apache/kafka/pull/14750
Re: [PR] KAFKA-15816: Fix leaked sockets in clients tests [kafka]
gharris1727 commented on PR #14750: URL: https://github.com/apache/kafka/pull/14750#issuecomment-1841735675 Test failures appear unrelated, and the clients tests pass locally.
Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]
cmccabe commented on code in PR #14838: URL: https://github.com/apache/kafka/pull/14838#discussion_r1416350081 ## metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java: ## @@ -192,7 +192,7 @@ private BrokerRegistration( this.isMigratingZkBroker = isMigratingZkBroker; directories = new ArrayList<>(directories); directories.sort(Uuid::compareTo); -this.directories = Collections.unmodifiableList(directories); +this.sortedDirectories = Collections.unmodifiableList(directories); Review Comment: ah, sorry, that is my bad. Please ignore this then :)
Re: [PR] MINOR: ConsumerRebalanceListenerInvoker class and method visibility [kafka]
kirktrue commented on PR #14931: URL: https://github.com/apache/kafka/pull/14931#issuecomment-1841727258 Three yellows, one red. Restarting.
Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]
cmccabe commented on PR #14838: URL: https://github.com/apache/kafka/pull/14838#issuecomment-1841725640 Sorry, one more thing: I didn't understand the purpose of the HeartbeatManager changes. Seems like we should be able to keep this file the same now? Or if not, can we do them in a separate PR where we have some rationale?
Re: [PR] MINOR: ConsumerRebalanceListenerInvoker class and method visibility [kafka]
kirktrue closed pull request #14931: MINOR: ConsumerRebalanceListenerInvoker class and method visibility URL: https://github.com/apache/kafka/pull/14931
Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]
cmccabe commented on code in PR #14838: URL: https://github.com/apache/kafka/pull/14838#discussion_r1416342392 ## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ## @@ -354,6 +357,25 @@ public ControllerResult registerBroker( throw new BrokerIdNotRegisteredException("Controller is in pre-migration mode and cannot register KRaft " + "brokers until the metadata migration is complete."); } + +if (featureControl.metadataVersion().isDirectoryAssignmentSupported()) { +Set set = new HashSet<>(request.logDirs()); +if (set.stream().anyMatch(DirectoryId::reserved)) { +throw new InvalidRequestException("Reserved directory ID in request"); +} +if (set.size() != request.logDirs().size()) { +throw new InvalidRequestException("Duplicate directory ID in request"); +} +for (BrokerRegistration registration : brokerRegistrations().values()) { Review Comment: > Also, we shouldn't be checking the previous registration for this specific broker ID... it would be quite normal to re-register broker ID X with the same set of directories it had last time we registered it This might be what's causing the test failures btw
Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]
rondagostino commented on code in PR #14838: URL: https://github.com/apache/kafka/pull/14838#discussion_r1416337073 ## metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java: ## @@ -192,7 +192,7 @@ private BrokerRegistration( this.isMigratingZkBroker = isMigratingZkBroker; directories = new ArrayList<>(directories); directories.sort(Uuid::compareTo); -this.directories = Collections.unmodifiableList(directories); +this.sortedDirectories = Collections.unmodifiableList(directories); Review Comment: We do that in the prior 2 lines
Re: [PR] MINOR: Change test logging capture to per-test, reducing jenkins truncation [kafka]
gharris1727 commented on PR #14795: URL: https://github.com/apache/kafka/pull/14795#issuecomment-1841693899 @mimaison I've raised a ticket to ask the Infra team what they think about this change: https://issues.apache.org/jira/browse/INFRA-25245
Re: [PR] KAFKA-15970 Copy over KIP-951 tests in FetcherTest.java to FetchRequestManagerTest.java [kafka]
wcarlson5 commented on PR #14916: URL: https://github.com/apache/kafka/pull/14916#issuecomment-1841690681 I'll rerun it and see if it gets any further :)
Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]
cmccabe commented on code in PR #14838: URL: https://github.com/apache/kafka/pull/14838#discussion_r1416327866 ## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ## @@ -354,6 +357,25 @@ public ControllerResult registerBroker( throw new BrokerIdNotRegisteredException("Controller is in pre-migration mode and cannot register KRaft " + "brokers until the metadata migration is complete."); } + +if (featureControl.metadataVersion().isDirectoryAssignmentSupported()) { +Set set = new HashSet<>(request.logDirs()); +if (set.stream().anyMatch(DirectoryId::reserved)) { +throw new InvalidRequestException("Reserved directory ID in request"); +} +if (set.size() != request.logDirs().size()) { +throw new InvalidRequestException("Duplicate directory ID in request"); +} +for (BrokerRegistration registration : brokerRegistrations().values()) { Review Comment: Another thing. This is `O(num_brokers * num_directories)` which isn't good for larger deployments. I think we should just have a `TimelineHashMap` that maps log dir UUID to the broker that has it. The main hassle is removing the mappings during broker unregistration, but that is rare, of course.
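A minimal sketch of the suggested index, under stated assumptions: a plain `java.util.HashMap` stands in for the snapshot-aware `TimelineHashMap`, `java.util.UUID` stands in for Kafka's `Uuid`, and the class and method names are hypothetical. Checking a registration costs one map lookup per requested directory instead of a scan over every registered broker, a reverse map keeps unregistration cheap, and a broker re-registering with its own directories is not treated as a conflict.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

public class DirectoryOwnerIndexSketch {
    // Log dir ID -> ID of the broker that registered it (average O(1) lookup).
    private final Map<UUID, Integer> ownerByDir = new HashMap<>();
    // Broker ID -> dirs it registered, so unregistration can remove them cheaply.
    private final Map<Integer, List<UUID>> dirsByBroker = new HashMap<>();

    // Returns the ID of a different broker that already owns one of `dirs`,
    // or -1 if there is none. Cost is O(dirs.size()), not O(brokers * dirs).
    int findConflict(int brokerId, List<UUID> dirs) {
        for (UUID dir : dirs) {
            Integer owner = ownerByDir.get(dir);
            // A broker re-registering with its own directories is not a conflict.
            if (owner != null && owner != brokerId) {
                return owner;
            }
        }
        return -1;
    }

    // Rejects the registration before mutating anything if a conflict exists.
    void register(int brokerId, List<UUID> dirs) {
        int conflict = findConflict(brokerId, dirs);
        if (conflict != -1) {
            throw new IllegalStateException("Directory already registered by broker " + conflict);
        }
        unregister(brokerId); // drop mappings left over from an earlier registration
        for (UUID dir : dirs) {
            ownerByDir.put(dir, brokerId);
        }
        dirsByBroker.put(brokerId, new ArrayList<>(dirs));
    }

    void unregister(int brokerId) {
        List<UUID> previous = dirsByBroker.remove(brokerId);
        if (previous != null) {
            for (UUID dir : previous) {
                ownerByDir.remove(dir);
            }
        }
    }

    Integer owner(UUID dir) {
        return ownerByDir.get(dir);
    }

    public static void main(String[] args) {
        DirectoryOwnerIndexSketch index = new DirectoryOwnerIndexSketch();
        UUID dir = UUID.randomUUID();
        index.register(1, List.of(dir));
        index.register(1, List.of(dir)); // same broker, same dir: allowed
        System.out.println(index.findConflict(2, List.of(dir))); // broker 1 owns it
        index.unregister(1);
        System.out.println(index.owner(dir)); // mapping removed, prints null
    }
}
```

The reverse `dirsByBroker` map is one way to handle the unregistration hassle mentioned above; removing entries by scanning `ownerByDir` values would be simpler but brings back a full scan on the (rare) unregistration path.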
Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]
cmccabe commented on code in PR #14838: URL: https://github.com/apache/kafka/pull/14838#discussion_r1416317197 ## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ## @@ -354,6 +357,25 @@ public ControllerResult registerBroker( throw new BrokerIdNotRegisteredException("Controller is in pre-migration mode and cannot register KRaft " + "brokers until the metadata migration is complete."); } + +if (featureControl.metadataVersion().isDirectoryAssignmentSupported()) { Review Comment: It should not be legal to send an empty `logDirs` array when metadataVersion >= 3.7-IV2, since setting that MV on the controller requires that all the brokers support it. It is, of course, OK to send logDirs when MV < 3.7-IV2; they will be ignored.
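The validation rules discussed in this thread can be sketched as a small routine. This is a hypothetical sketch, not the PR's code: a boolean stands in for `isDirectoryAssignmentSupported()`, `IllegalArgumentException` stands in for Kafka's `InvalidRequestException`, and the reserved IDs are invented stand-ins for whatever `DirectoryId::reserved` actually checks.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;

public class LogDirsValidationSketch {

    // Hypothetical stand-ins for reserved directory IDs; not Kafka's actual constants.
    static final Set<UUID> RESERVED = Set.of(new UUID(0L, 0L), new UUID(0L, 1L), new UUID(0L, 2L));

    // Throws IllegalArgumentException if the registration's log dirs are invalid.
    static void validate(boolean directoryAssignmentSupported, List<UUID> logDirs) {
        if (!directoryAssignmentSupported) {
            return; // older metadata version: any logDirs sent are ignored
        }
        if (logDirs.isEmpty()) {
            throw new IllegalArgumentException("No log directories in request");
        }
        Set<UUID> unique = new HashSet<>(logDirs);
        if (unique.stream().anyMatch(RESERVED::contains)) {
            throw new IllegalArgumentException("Reserved directory ID in request");
        }
        if (unique.size() != logDirs.size()) {
            throw new IllegalArgumentException("Duplicate directory ID in request");
        }
    }

    // Convenience wrapper: true if validate() rejects the input.
    static boolean isRejected(boolean supported, List<UUID> logDirs) {
        try {
            validate(supported, logDirs);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        UUID dir = UUID.randomUUID();
        System.out.println(isRejected(false, List.of()));        // old MV, ignored: false
        System.out.println(isRejected(true, List.of()));         // empty: true
        System.out.println(isRejected(true, List.of(dir, dir))); // duplicate: true
        System.out.println(isRejected(true, List.of(dir)));      // valid: false
    }
}
```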
Re: [PR] KAFKA-15760: Disable flaky test testTaskRequestWithOldStartMsGetsUpdated [kafka]
splett2 commented on PR #14917: URL: https://github.com/apache/kafka/pull/14917#issuecomment-1841683355 @gharris1727 pretty odd that class loading takes so long, but the analysis seems reasonable.
Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]
cmccabe commented on code in PR #14838: URL: https://github.com/apache/kafka/pull/14838#discussion_r1416324308 ## metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java: ## @@ -272,8 +274,17 @@ public void testRegistrationWithIncorrectClusterId() { (short) 1)); } +private static Stream metadataVersions() { +return Stream.of( +MetadataVersion.IBP_3_3_IV2, +MetadataVersion.IBP_3_3_IV3, +MetadataVersion.LATEST_PRODUCTION, Review Comment: I feel like we should explicitly include `IBP_3_7_IV2` here rather than `LATEST_PRODUCTION`. After all, that is the first JBOD version so it's worth testing specifically, even after `LATEST_PRODUCTION` marches on.
Re: [PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]
jeffkbkim commented on code in PR #14849: URL: https://github.com/apache/kafka/pull/14849#discussion_r1416324140 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -906,8 +831,8 @@ public void run() { withActiveContextOrThrow(tp, context -> { // Execute the read operation. response = op.generateResponse( -context.coordinator, -context.lastCommittedOffset +context.stateMachine.coordinator(), Review Comment: The lock you're referring to is the SnapshottableCoordinator, right? I can't think of a way to resolve this without making the code messier. This seems to require a big change in our code structure.
Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]
cmccabe commented on code in PR #14838: URL: https://github.com/apache/kafka/pull/14838#discussion_r1416321816 ## metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java: ## @@ -337,7 +341,7 @@ public boolean equals(Object o) { other.fenced == fenced && other.inControlledShutdown == inControlledShutdown && other.isMigratingZkBroker == isMigratingZkBroker && -other.directories.equals(directories); +other.sortedDirectories.equals(sortedDirectories); Review Comment: This is another reason to go with sorted -- `list.equals` won't do the right thing otherwise.
Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]
cmccabe commented on code in PR #14838: URL: https://github.com/apache/kafka/pull/14838#discussion_r1416321397 ## metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java: ## @@ -192,7 +192,7 @@ private BrokerRegistration( this.isMigratingZkBroker = isMigratingZkBroker; directories = new ArrayList<>(directories); directories.sort(Uuid::compareTo); -this.directories = Collections.unmodifiableList(directories); +this.sortedDirectories = Collections.unmodifiableList(directories); Review Comment: Seems like we should copy the list and sort it here to make sure that it actually is sorted. It should be a short list (a dozen or so max) so not a big performance issue.
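A minimal sketch of the copy-then-sort approach, assuming `java.util.UUID` (which implements `Comparable`) as a stand-in for Kafka's `Uuid` and a hypothetical class name. Sorting a private copy guarantees the stored list really is sorted and leaves the caller's list untouched, and it is also what makes an order-sensitive `List.equals` comparison inside `equals()` give the intended result.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;

public class SortedDirectoriesSketch {

    // Copies the caller's list, sorts the copy, and wraps it read-only, so the
    // stored value is sorted regardless of the caller's order and the caller's
    // list is never mutated.
    static List<UUID> sortedCopy(List<UUID> directories) {
        List<UUID> copy = new ArrayList<>(directories);
        Collections.sort(copy);
        return Collections.unmodifiableList(copy);
    }

    public static void main(String[] args) {
        UUID a = new UUID(0L, 1L);
        UUID b = new UUID(0L, 2L);
        List<UUID> oneOrder = List.of(b, a);
        List<UUID> otherOrder = List.of(a, b);
        // Plain List.equals is order-sensitive: prints false.
        System.out.println(oneOrder.equals(otherOrder));
        // After sorting, the same set of directories compares equal: prints true.
        System.out.println(sortedCopy(oneOrder).equals(sortedCopy(otherOrder)));
    }
}
```

For a list of a dozen or so entries, as the comment notes, the extra copy and sort cost is negligible.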
Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]
cmccabe commented on code in PR #14838: URL: https://github.com/apache/kafka/pull/14838#discussion_r1416319067 ## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ## @@ -354,6 +357,25 @@ public ControllerResult registerBroker( throw new BrokerIdNotRegisteredException("Controller is in pre-migration mode and cannot register KRaft " + "brokers until the metadata migration is complete."); } + +if (featureControl.metadataVersion().isDirectoryAssignmentSupported()) { +Set set = new HashSet<>(request.logDirs()); +if (set.stream().anyMatch(DirectoryId::reserved)) { +throw new InvalidRequestException("Reserved directory ID in request"); +} +if (set.size() != request.logDirs().size()) { +throw new InvalidRequestException("Duplicate directory ID in request"); +} +for (BrokerRegistration registration : brokerRegistrations().values()) { Review Comment: I think we should have a custom error code for this situation. InvalidRequestException seems too generic. Also, we shouldn't be checking the previous registration for this specific broker ID... it would be quite normal to re-register broker ID X with the same set of directories it had last time we registered it.
Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]
cmccabe commented on code in PR #14838: URL: https://github.com/apache/kafka/pull/14838#discussion_r1416317197 ## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ## @@ -354,6 +357,25 @@ public ControllerResult registerBroker( throw new BrokerIdNotRegisteredException("Controller is in pre-migration mode and cannot register KRaft " + "brokers until the metadata migration is complete."); } + +if (featureControl.metadataVersion().isDirectoryAssignmentSupported()) { Review Comment: If the metadata version does not support directory assignment then it shouldn't be sent by the brokers, right? And if it does support directory assignment but the brokers are not sending directories, we need to fail the heartbeat.
Re: [PR] KAFKA-15932: Wait for responses in consumer operations [kafka]
AndrewJSchofield commented on PR #14912: URL: https://github.com/apache/kafka/pull/14912#issuecomment-1841665124 Closing PR and re-opening to restart build.
Re: [PR] KAFKA-15932: Wait for responses in consumer operations [kafka]
AndrewJSchofield closed pull request #14912: KAFKA-15932: Wait for responses in consumer operations URL: https://github.com/apache/kafka/pull/14912
Re: [PR] KAFKA-15347: add support for 'single key multi timestamp' IQs with versioned state stores (KIP-968) [kafka]
mjsax commented on code in PR #14626: URL: https://github.com/apache/kafka/pull/14626#discussion_r1416297822 ## streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java: ## @@ -88,7 +87,7 @@ public class IQv2VersionedStoreIntegrationTest { public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, Utils.mkProperties(Collections.singletonMap("auto.create.topics.enable", "true"))); -private KafkaStreams kafkaStreams; +private static KafkaStreams kafkaStreams; Review Comment: Not sure if it's OK to make this one `static`? It's set up inside `@Before`, which is not static, and we run tests in parallel... I think we need to rather pass it as a parameter into the newly added `static` methods that use it. ## streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java: ## @@ -168,16 +167,13 @@ private void shouldHandleVersionedKeyQuery(final Optional queryTimestam final Long expectedTimestamp, final Optional expectedValidToTime) { -VersionedKeyQuery query = VersionedKeyQuery.withKey(RECORD_KEY); -if (queryTimestamp.isPresent()) { -query = query.asOf(queryTimestamp.get()); -} - -final StateQueryRequest> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION)); -final StateQueryResult> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); +// define query Review Comment: Comment can be removed. 
We call `defineQuery()` -- very obvious what is happening (which also means it's well-structured, easy-to-understand code) ## streams/src/main/java/org/apache/kafka/streams/query/MultiVersionedKeyQuery.java: ## @@ -57,7 +57,7 @@ private MultiVersionedKeyQuery(final K key, final Optional fromTime, fi * @param key The specified key by the query * @param The type of the key * @param The type of the value that will be retrieved - * @throws NullPointerException if @param key is null + * @throws NullPointerException if @code key is null Review Comment: This needs to be in `{}` -> `{@code key}` ## streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java: ## @@ -168,16 +167,13 @@ private void shouldHandleVersionedKeyQuery(final Optional queryTimestam final Long expectedTimestamp, final Optional expectedValidToTime) { -VersionedKeyQuery query = VersionedKeyQuery.withKey(RECORD_KEY); -if (queryTimestamp.isPresent()) { -query = query.asOf(queryTimestamp.get()); -} - -final StateQueryRequest> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION)); -final StateQueryResult> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); +// define query +final VersionedKeyQuery query = defineQuery(RECORD_KEY, queryTimestamp); -final QueryResult> queryResult = result.getOnlyPartitionResult(); +// send request and receive results Review Comment: as above (more of the same below)
[jira] [Created] (KAFKA-15975) Update kafka quickstart guide to no longer list ZK start first
Justine Olshan created KAFKA-15975: -- Summary: Update kafka quickstart guide to no longer list ZK start first Key: KAFKA-15975 URL: https://issues.apache.org/jira/browse/KAFKA-15975 Project: Kafka Issue Type: Task Components: docs Affects Versions: 4.0.0 Reporter: Justine Olshan Given we are deprecating ZooKeeper, I think we should update our quickstart guide to not list the ZooKeeper instructions first. With 4.0, we may want to remove it entirely.
Re: [PR] MINOR: fully encapsulate user restore listener in the DelegatingRestoreListener [kafka]
ableegoldman commented on PR #14886: URL: https://github.com/apache/kafka/pull/14886#issuecomment-1841644618 Lots of test failures, but all are unrelated.
Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]
rondagostino commented on PR #14838: URL: https://github.com/apache/kafka/pull/14838#issuecomment-1841623911 https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14838/13/tests ``` 208 tests have failed There are 20 new tests failing, 188 existing failing and 272 skipped ``` @soarez it seems that something is not correct.
Re: [PR] KAKFA-15629: Update upgrade-guide.html for TimestampedKeyQuery and TimestampedRangeQuery [kafka]
hanyuzheng7 commented on code in PR #14898: URL: https://github.com/apache/kafka/pull/14898#discussion_r1416268185 ## docs/streams/upgrade-guide.html: ## @@ -136,7 +136,12 @@ < Streams API changes in 3.7.0 IQv2 supports RangeQuery that allows to specify unbounded, bounded, or half-open key-ranges, which return data in ascending (byte[]-lexicographical) order (per partition). -https://cwiki.apache.org/confluence/display/KAFKA/KIP-985%3A+Add+reverseRange+and+reverseAll+query+over+kv-store+in+IQv2;>KIP-985 extends this functionality by adding .withDescendingKeys() to allow user to receive data in descending order. +https://cwiki.apache.org/confluence/display/KAFKA/KIP-985%3A+Add+reverseRange+and+reverseAll+query+over+kv-store+in+IQv2;>KIP-985 extends this functionality by adding .withDescendingKeys() to allow user to receive data in descending order. + + + +KIP-992 add two new query types, namely TimestampedKeyQuery and TimestampedRangeQuery. Both should be used to query a timestamped key-value store, to retrieve a ValueAndTimestamp result. The existing KeyQuery and RangeQuery are changes to always return the value only, also for timestamped key-value stores. Review Comment: ok
Re: [PR] KAFKA-14412: Generalise over RocksDB WriteBatch [kafka]
ableegoldman commented on PR #14853: URL: https://github.com/apache/kafka/pull/14853#issuecomment-1841596989 Strangely one of the builds failed to compile due to some issue in the upgrade test/smoke client -- seems unrelated but we can't merge with a broken build, so let's retry and hope it goes away.
Re: [PR] KAFKA-15364: Handle log directory failure in the Controller [kafka]
soarez commented on code in PR #14902: URL: https://github.com/apache/kafka/pull/14902#discussion_r1416264410 ## metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java: ## @@ -107,6 +125,13 @@ int id() { return id; } +/** + * Returns the sorted directories which the broker currently has available + */ +List directories() { Review Comment: This was part of [KAFKA-15361](https://issues.apache.org/jira/browse/KAFKA-15361) (https://github.com/apache/kafka/pull/14838) and is now gone since we removed dirs from the heartbeat manager.
Re: [PR] KAFKA-15364: Handle log directory failure in the Controller [kafka]
soarez commented on code in PR #14902: URL: https://github.com/apache/kafka/pull/14902#discussion_r1416263442 ## core/src/main/scala/kafka/server/ControllerApis.scala: ## @@ -1070,11 +1070,27 @@ class ControllerApis( } def handleAssignReplicasToDirs(request: RequestChannel.Request): CompletableFuture[Unit] = { +authHelper.authorizeClusterOperation(request, CLUSTER_ACTION) val assignReplicasToDirsRequest = request.body[AssignReplicasToDirsRequest] - -// TODO KAFKA-15426 -requestHelper.sendMaybeThrottle(request, - assignReplicasToDirsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) -CompletableFuture.completedFuture[Unit](()) +val context = new ControllerRequestContext(request.context.header.data, request.context.principal, + OptionalLong.empty()) +controller.assignReplicasToDirs(context, assignReplicasToDirsRequest.data).handle[Unit] { (reply, e) => Review Comment: Good point. This change isn't really part of this PR. This PR builds on KAFKA-15361 (#14838) and KAFKA-15426 (#14863). This change is part of #14863, so this was addressed in [2874e11](https://github.com/apache/kafka/pull/14863/commits/2874e11f53c3a3fc73d480614f8d1cf695754bb8).
Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]
soarez commented on PR #14838: URL: https://github.com/apache/kafka/pull/14838#issuecomment-1841564307 Thanks for the review. @rondagostino AFAICT, the tests are passing on all JDK versions. Did I miss something?
Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]
soarez commented on PR #14881: URL: https://github.com/apache/kafka/pull/14881#issuecomment-1841561078 @rondagostino @cmccabe @pprovenzano PTAL
Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]
soarez commented on code in PR #14881: URL: https://github.com/apache/kafka/pull/14881#discussion_r1416239117 ## core/src/test/scala/unit/kafka/log/LogManagerTest.scala: ## @@ -93,6 +93,54 @@ class LogManagerTest { log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0) } + /** + * Test that getOrCreateLog on a non-existent log creates a new log in given logDirectory using directory id and that we can append to the new log. + */ + @Test + def testCreateLogOnTargetedLogDirectory(): Unit = { +val targetedLogDirectoryId = DirectoryId.random() + +val dirs: Seq[File] = Seq.fill(5)(TestUtils.tempDir()) +writeMetaProperties(dirs(0)) +writeMetaProperties(dirs(1), Optional.of(targetedLogDirectoryId)) +writeMetaProperties(dirs(3), Optional.of(DirectoryId.random())) +writeMetaProperties(dirs(4)) + +logManager = createLogManager(dirs) + +val log = logManager.getOrCreateLog(new TopicPartition(name, 0), topicId = None, targetLogDirectoryId = Some(targetedLogDirectoryId)) +assertEquals(5, logManager.liveLogDirs.size) + +val logFile = new File(dirs(1), name + "-0") +assertTrue(logFile.exists) +assertEquals(dirs(1).getAbsolutePath, logFile.getParent) +log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0) + } + + /** + * Test that getOrCreateLog on a non-existent log creates a new log in random logDirectory if the given directory id is DirectoryId.UNASSIGNED. Review Comment: Not random
Re: [PR] KAFKA-15365: Broker-side replica management changes [kafka]
soarez commented on code in PR #14881: URL: https://github.com/apache/kafka/pull/14881#discussion_r1416236550 ## core/src/main/scala/kafka/log/LogManager.scala: ## @@ -991,18 +997,29 @@ class LogManager(logDirs: Seq[File], * @param isNew Whether the replica should have existed on the broker or not * @param isFuture True if the future log of the specified partition should be returned or created * @param topicId The topic ID of the partition's topic + * @param targetLogDirectoryId The directory Id that should host the the partition's topic. + * The next selected directory will be picked up if it None or equal {@link DirectoryId.UNASSIGNED}. + * The method assumes provided Id belong to online directory. * @throws KafkaStorageException if isNew=false, log is not found in the cache and there is offline log directory on the broker * @throws InconsistentTopicIdException if the topic ID in the log does not match the topic ID provided */ - def getOrCreateLog(topicPartition: TopicPartition, isNew: Boolean = false, isFuture: Boolean = false, topicId: Option[Uuid]): UnifiedLog = { + def getOrCreateLog(topicPartition: TopicPartition, isNew: Boolean = false, isFuture: Boolean = false, + topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid] = Option.empty): UnifiedLog = { logCreationOrDeletionLock synchronized { val log = getLog(topicPartition, isFuture).getOrElse { // create the log if it has not already been created in another thread if (!isNew && offlineLogDirs.nonEmpty) throw new KafkaStorageException(s"Can not create log for $topicPartition because log directories ${offlineLogDirs.mkString(",")} are offline") val logDirs: List[File] = { - val preferredLogDir = preferredLogDirs.get(topicPartition) + val preferredLogDir = targetLogDirectoryId.filterNot(_ == DirectoryId.UNASSIGNED) match { +case Some(targetId) if !preferredLogDirs.containsKey(topicPartition) => + // If partition is configured with both targetLogDirectoryId and preferredLogDirs, then + // preferredLogDirs 
will be respected, otherwise targetLogDirectoryId will be respected + directoryIds.find(_._2 == targetId).map(_._1).getOrElse(null) Review Comment: This makes sense. The preferredLogDir is set by an admin, and if that exists it should take precedence.
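The precedence rule agreed on above (an admin-set preferredLogDir wins; otherwise a non-UNASSIGNED targetLogDirectoryId is resolved to a directory via directoryIds) can be modelled in isolation. This is a minimal, hypothetical sketch: java.util.UUID stands in for Kafka's Uuid, the all-zero UUID stands in for DirectoryId.UNASSIGNED, plain maps stand in for preferredLogDirs and directoryIds, and Option replaces the diff's getOrElse(null). The quoted hunk does not show the remaining match branch, so falling back to the preferred directory there is an assumption.

```scala
import java.util.UUID

// Hypothetical model of the directory-precedence logic in the quoted diff.
object TargetDirSketch {
  // Stand-in for DirectoryId.UNASSIGNED (assumed here to be the all-zero id).
  val Unassigned: UUID = new UUID(0L, 0L)

  /** Returns the directory to use first, or None to fall back to default ordering.
    *
    * @param partition        partition name, e.g. "topic-0"
    * @param target           requested directory id, if any
    * @param preferredLogDirs partition -> admin-chosen directory path
    * @param directoryIds     directory path -> directory id
    */
  def chooseDir(
      partition: String,
      target: Option[UUID],
      preferredLogDirs: Map[String, String],
      directoryIds: Map[String, UUID]
  ): Option[String] =
    target.filterNot(_ == Unassigned) match {
      // A concrete target is honoured only when no admin preference exists.
      case Some(targetId) if !preferredLogDirs.contains(partition) =>
        directoryIds.find(_._2 == targetId).map(_._1)
      // Otherwise (no target, UNASSIGNED, or admin preference set): use the preference.
      case _ =>
        preferredLogDirs.get(partition)
    }
}
```

Keeping the admin preference in the guard, rather than checking it afterwards, means an unknown or offline target id can never override an explicit operator choice.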