[jira] [Resolved] (KAFKA-14435) Kraft: StandardAuthorizer allowing a non-authorized user when `allow.everyone.if.no.acl.found` is enabled
[ https://issues.apache.org/jira/browse/KAFKA-14435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Purshotam Chauhan resolved KAFKA-14435.
---------------------------------------
    Fix Version/s: 3.3.2
                   3.4.0
       Resolution: Fixed

> Kraft: StandardAuthorizer allowing a non-authorized user when `allow.everyone.if.no.acl.found` is enabled
> ----------------------------------------------------------------------------------------------------------
>
>             Key: KAFKA-14435
>             URL: https://issues.apache.org/jira/browse/KAFKA-14435
>         Project: Kafka
>      Issue Type: Bug
>      Components: kraft
> Affects Versions: 3.2.0, 3.3.0, 3.2.1, 3.2.2, 3.2.3, 3.3.1
>        Reporter: Purshotam Chauhan
>        Assignee: Purshotam Chauhan
>        Priority: Critical
>         Fix For: 3.3.2, 3.4.0
>
> When `allow.everyone.if.no.acl.found` is enabled, the authorizer should allow everyone only if there is no ACL present for a particular resource. But if there are ACLs present for the resource, then it shouldn't allow everyone.
> StandardAuthorizer is allowing the principals for which no ACLs are defined even when the resource has other ACLs.
>
> This behavior can be validated with the following test case:
>
> {code:java}
> @Test
> public void testAllowEveryoneConfig() throws Exception {
>     StandardAuthorizer authorizer = new StandardAuthorizer();
>     HashMap<String, Object> configs = new HashMap<>();
>     configs.put(SUPER_USERS_CONFIG, "User:alice;User:chris");
>     configs.put(ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "true");
>     authorizer.configure(configs);
>     authorizer.start(new AuthorizerTestServerInfo(Collections.singletonList(PLAINTEXT)));
>     authorizer.completeInitialLoad();
>     // Allow User:Alice to read topic "foobar"
>     List<StandardAclWithId> acls = asList(
>         withId(new StandardAcl(TOPIC, "foobar", LITERAL, "User:Alice", WILDCARD, READ, ALLOW))
>     );
>     acls.forEach(acl -> authorizer.addAcl(acl.id(), acl.acl()));
>     // User:Bob shouldn't be allowed to read topic "foobar"
>     assertEquals(singletonList(DENIED),
>         authorizer.authorize(new MockAuthorizableRequestContext.Builder()
>             .setPrincipal(new KafkaPrincipal(USER_TYPE, "Bob")).build(),
>             singletonList(newAction(READ, TOPIC, "foobar"))));
> }
> {code}
>
> In the above test, `User:Bob` should be DENIED but the above test case fails.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
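For clarity, the intended semantics can be summarized with a small, self-contained sketch (illustrative names only; this is not the actual StandardAuthorizer code): the "allow everyone" fallback may only be consulted when the resource has no ACLs at all, and once any ACL exists for the resource, principals without a matching ALLOW rule must be denied.

{code:java}
import java.util.List;

// Minimal sketch of the intended allow.everyone.if.no.acl.found semantics.
// All names here are illustrative; this is not the real StandardAuthorizer code.
class AllowEveryoneSketch {
    enum Result { ALLOWED, DENIED }

    static Result authorize(boolean allowEveryoneIfNoAclFound,
                            List<String> allowedPrincipalsForResource,
                            String principal) {
        if (allowedPrincipalsForResource.isEmpty()) {
            // No ACLs at all for the resource: only then may the "allow everyone" fallback apply.
            return allowEveryoneIfNoAclFound ? Result.ALLOWED : Result.DENIED;
        }
        // ACLs exist: the fallback must not apply; unmatched principals are denied.
        return allowedPrincipalsForResource.contains(principal) ? Result.ALLOWED : Result.DENIED;
    }

    public static void main(String[] args) {
        // "foobar" has an ACL for User:Alice, so User:Bob must be DENIED even with the fallback enabled.
        System.out.println(authorize(true, List.of("User:Alice"), "User:Bob")); // DENIED
        // A topic with no ACLs at all is where the fallback kicks in.
        System.out.println(authorize(true, List.of(), "User:Bob"));             // ALLOWED
    }
}
{code}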
[jira] [Updated] (KAFKA-14435) Kraft: StandardAuthorizer allowing a non-authorized user when `allow.everyone.if.no.acl.found` is enabled
[ https://issues.apache.org/jira/browse/KAFKA-14435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Purshotam Chauhan updated KAFKA-14435:
--------------------------------------
    Affects Version/s: 3.3.1
                       3.3.0

> Kraft: StandardAuthorizer allowing a non-authorized user when `allow.everyone.if.no.acl.found` is enabled
> ----------------------------------------------------------------------------------------------------------
>
>             Key: KAFKA-14435
>             URL: https://issues.apache.org/jira/browse/KAFKA-14435
>         Project: Kafka
>      Issue Type: Bug
>      Components: kraft
> Affects Versions: 3.2.0, 3.3.0, 3.2.1, 3.2.2, 3.2.3, 3.3.1
>        Reporter: Purshotam Chauhan
>        Assignee: Purshotam Chauhan
>        Priority: Critical
>
> When `allow.everyone.if.no.acl.found` is enabled, the authorizer should allow everyone only if there is no ACL present for a particular resource. But if there are ACLs present for the resource, then it shouldn't allow everyone.
> StandardAuthorizer is allowing the principals for which no ACLs are defined even when the resource has other ACLs.
>
> This behavior can be validated with the following test case:
>
> {code:java}
> @Test
> public void testAllowEveryoneConfig() throws Exception {
>     StandardAuthorizer authorizer = new StandardAuthorizer();
>     HashMap<String, Object> configs = new HashMap<>();
>     configs.put(SUPER_USERS_CONFIG, "User:alice;User:chris");
>     configs.put(ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "true");
>     authorizer.configure(configs);
>     authorizer.start(new AuthorizerTestServerInfo(Collections.singletonList(PLAINTEXT)));
>     authorizer.completeInitialLoad();
>     // Allow User:Alice to read topic "foobar"
>     List<StandardAclWithId> acls = asList(
>         withId(new StandardAcl(TOPIC, "foobar", LITERAL, "User:Alice", WILDCARD, READ, ALLOW))
>     );
>     acls.forEach(acl -> authorizer.addAcl(acl.id(), acl.acl()));
>     // User:Bob shouldn't be allowed to read topic "foobar"
>     assertEquals(singletonList(DENIED),
>         authorizer.authorize(new MockAuthorizableRequestContext.Builder()
>             .setPrincipal(new KafkaPrincipal(USER_TYPE, "Bob")).build(),
>             singletonList(newAction(READ, TOPIC, "foobar"))));
> }
> {code}
>
> In the above test, `User:Bob` should be DENIED but the above test case fails.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[GitHub] [kafka] omkreddy merged pull request #11976: KAFKA-13771: Support to explicitly delete delegationTokens that have expired but have not been automatically cleaned up
omkreddy merged PR #11976: URL: https://github.com/apache/kafka/pull/11976 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd merged pull request #13244: KAFKA-14495: assert the cache size for each operation
satishd merged PR #13244: URL: https://github.com/apache/kafka/pull/13244 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hgeraldino commented on a diff in pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest
hgeraldino commented on code in PR #13191: URL: https://github.com/apache/kafka/pull/13191#discussion_r1112485394 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java: ## @@ -453,19 +444,22 @@ public void testSendRecordsTopicCreateRetries() { SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); -// First call to describe the topic times out expectPreliminaryCalls(); - EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap()); -Capture newTopicCapture = EasyMock.newCapture(); - EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))) -.andThrow(new RetriableException(new TimeoutException("timeout"))); - -// Second round -expectTopicCreation(TOPIC); -expectSendRecord(); -expectSendRecord(); -PowerMock.replayAll(); +when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap()); +when(admin.createOrFindTopics(any(NewTopic.class))) +.thenAnswer(new Answer() { +boolean firstCall = true; + +@Override +public TopicAdmin.TopicCreationResponse answer(InvocationOnMock invocation) { +if (firstCall) { +firstCall = false; +throw new RetriableException(new TimeoutException("timeout")); +} +return createdTopic(TOPIC); +} +}); workerTask.toSend = Arrays.asList(record1, record2); Review Comment: This one is a little bit trickier, as we cannot do partial verification without resetting the mock. What I ended up doing was checking that calls to `createOrFindTopics` happen twice midway, and verify the arguments, then check once again at the end of the test - this last verification is a cumulative of all 3 calls. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
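As a side note on the pattern above: the "fail on the first call, succeed afterwards" stubbing can also be expressed with Mockito's consecutive stubbing instead of a hand-rolled Answer. A minimal, self-contained sketch follows (the interface and names are made up for illustration; this is not the AbstractWorkerSourceTaskTest code, and it assumes Mockito is on the classpath):

```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Collections;
import java.util.Map;

public class ConsecutiveStubbingSketch {
    // Stand-in for the admin client mocked in the test; illustrative only.
    interface TopicAdminLike {
        Map<String, Object> describeTopics(String topic);
    }

    public static void main(String[] args) {
        TopicAdminLike admin = mock(TopicAdminLike.class);
        // First invocation throws, every subsequent invocation returns a value.
        when(admin.describeTopics("foobar"))
                .thenThrow(new RuntimeException("timeout"))
                .thenReturn(Collections.emptyMap());

        try {
            admin.describeTopics("foobar"); // first call: throws
        } catch (RuntimeException expected) {
            // the retry path would run here
        }
        System.out.println(admin.describeTopics("foobar")); // second call: {}
    }
}
```

Whether that reads better than an explicit Answer is a matter of taste, particularly when the behaviour depends on the call count in more involved ways.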
[GitHub] [kafka] hgeraldino commented on a diff in pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest
hgeraldino commented on code in PR #13191: URL: https://github.com/apache/kafka/pull/13191#discussion_r1112474934 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java: ## @@ -639,144 +644,112 @@ public void testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() { SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); -expectPreliminaryCalls(); - EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap()); +expectSendRecord(emptyHeaders()); -Capture newTopicCapture = EasyMock.newCapture(); - EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(TOPIC)); - -expectSendRecord(); -expectSendRecord(); - -PowerMock.replayAll(); +when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap()); + when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(createdTopic(TOPIC)); workerTask.toSend = Arrays.asList(record1, record2); workerTask.sendRecords(); + +ArgumentCaptor> sent = verifySendRecord(2); + +List> capturedValues = sent.getAllValues(); +assertEquals(2, capturedValues.size()); } -private Capture> expectSendRecord( -String topic, -boolean anyTimes, -Headers headers -) { +private void expectSendRecord(Headers headers) { if (headers != null) -expectConvertHeadersAndKeyValue(topic, anyTimes, headers); +expectConvertHeadersAndKeyValue(headers); -expectApplyTransformationChain(anyTimes); +expectApplyTransformationChain(); -Capture> sent = EasyMock.newCapture(); - -IExpectationSetters> expect = EasyMock.expect( -producer.send(EasyMock.capture(sent), EasyMock.capture(producerCallbacks))); +expectTaskGetTopic(); +} -IAnswer> expectResponse = () -> { -synchronized (producerCallbacks) { -for (Callback cb : producerCallbacks.getValues()) { -cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, 0L, 0, 0), null); -} -producerCallbacks.reset(); -} -return null; -}; +private ArgumentCaptor> verifySendRecord() { +return verifySendRecord(1); +} -if (anyTimes) -expect.andStubAnswer(expectResponse); -else -expect.andAnswer(expectResponse); +private ArgumentCaptor> verifySendRecord(int times) { +ArgumentCaptor> sent = ArgumentCaptor.forClass(ProducerRecord.class); +ArgumentCaptor producerCallbacks = ArgumentCaptor.forClass(Callback.class); +verify(producer, times(times)).send(sent.capture(), producerCallbacks.capture()); -expectTaskGetTopic(anyTimes); +for (Callback cb : producerCallbacks.getAllValues()) { +cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, 0L, 0, 0), +null); +} return sent; } -private Capture> expectSendRecordAnyTimes() { -return expectSendRecord(TOPIC, true, emptyHeaders()); +private void expectTaskGetTopic() { +when(statusBackingStore.getTopic(anyString(), anyString())).thenAnswer((Answer) invocation -> { +String connector = invocation.getArgument(0, String.class); +String topic = invocation.getArgument(1, String.class); +return new TopicStatus(topic, new ConnectorTaskId(connector, 0), Time.SYSTEM.milliseconds()); +}); } -private Capture> expectSendRecord() { -return expectSendRecord(TOPIC, false, emptyHeaders()); -} +private void verifyTaskGetTopic() { +ArgumentCaptor connectorCapture = ArgumentCaptor.forClass(String.class); +ArgumentCaptor topicCapture = ArgumentCaptor.forClass(String.class); +ArgumentCaptor newTopicCapture = ArgumentCaptor.forClass(NewTopic.class); 
+verify(statusBackingStore).getTopic(connectorCapture.capture(), topicCapture.capture()); -private void expectTaskGetTopic(boolean anyTimes) { -final Capture connectorCapture = EasyMock.newCapture(); -final Capture topicCapture = EasyMock.newCapture(); -IExpectationSetters expect = EasyMock.expect(statusBackingStore.getTopic( -EasyMock.capture(connectorCapture), -EasyMock.capture(topicCapture))); -if (anyTimes) { -expect.andStubAnswer(() -> new TopicStatus( -topicCapture.getValue(), -new ConnectorTaskId(connectorCapture.getValue(), 0), -Time.SYSTEM.mil
[GitHub] [kafka] hgeraldino commented on a diff in pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest
hgeraldino commented on code in PR #13191: URL: https://github.com/apache/kafka/pull/13191#discussion_r1112464114 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java: ## @@ -485,32 +479,45 @@ public void testSendRecordsTopicDescribeRetriesMidway() { SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, OTHER_TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); -// First round -expectPreliminaryCalls(OTHER_TOPIC); -expectTopicCreation(TOPIC); -expectSendRecord(); -expectSendRecord(); - -// First call to describe the topic times out -EasyMock.expect(admin.describeTopics(OTHER_TOPIC)) -.andThrow(new RetriableException(new TimeoutException("timeout"))); +expectPreliminaryCalls(); -// Second round -expectTopicCreation(OTHER_TOPIC); -expectSendRecord(OTHER_TOPIC, false, emptyHeaders()); +when(admin.describeTopics(anyString())).thenAnswer(new Answer>() { +int counter = 0; -PowerMock.replayAll(); +@Override +public Map answer(InvocationOnMock invocation) { +counter++; +if (counter == 2) { +throw new RetriableException(new TimeoutException("timeout")); +} -// Try to send 3, make first pass, second fail. Should save last two +return Collections.emptyMap(); +} +}); Review Comment: It certainly is more readable. Fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bachmanity1 commented on pull request #13261: MINOR: after reading BYTES type it's possible to access data beyond its size
bachmanity1 commented on PR #13261: URL: https://github.com/apache/kafka/pull/13261#issuecomment-1437758582 @cmccabe could you please review this? Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hgeraldino commented on a diff in pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest
hgeraldino commented on code in PR #13191: URL: https://github.com/apache/kafka/pull/13191#discussion_r1112459731 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java: ## @@ -235,115 +236,100 @@ public void testMetricsGroup() { public void testSendRecordsConvertsData() { createWorkerTask(); -List records = new ArrayList<>(); // Can just use the same record for key and value -records.add(new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD)); - -Capture> sent = expectSendRecordAnyTimes(); +List records = Collections.singletonList( +new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD) +); +expectSendRecord(emptyHeaders()); expectTopicCreation(TOPIC); -PowerMock.replayAll(); - workerTask.toSend = records; workerTask.sendRecords(); + +ArgumentCaptor> sent = verifySendRecord(); + assertEquals(SERIALIZED_KEY, sent.getValue().key()); assertEquals(SERIALIZED_RECORD, sent.getValue().value()); -PowerMock.verifyAll(); +verifyTaskGetTopic(); } @Test public void testSendRecordsPropagatesTimestamp() { final Long timestamp = System.currentTimeMillis(); - createWorkerTask(); -List records = Collections.singletonList( -new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, timestamp) -); - -Capture> sent = expectSendRecordAnyTimes(); - +expectSendRecord(emptyHeaders()); expectTopicCreation(TOPIC); -PowerMock.replayAll(); - -workerTask.toSend = records; +workerTask.toSend = Collections.singletonList( +new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, timestamp) +); workerTask.sendRecords(); + +ArgumentCaptor> sent = verifySendRecord(); assertEquals(timestamp, sent.getValue().timestamp()); -PowerMock.verifyAll(); +verifyTaskGetTopic(); } @Test public void testSendRecordsCorruptTimestamp() { final Long timestamp = -3L; createWorkerTask(); -List records = Collections.singletonList( +expectSendRecord(emptyHeaders()); +expectTopicCreation(TOPIC); Review Comment: Good catch. Removed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hgeraldino commented on a diff in pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest
hgeraldino commented on code in PR #13191: URL: https://github.com/apache/kafka/pull/13191#discussion_r1112456303 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java: ## @@ -639,144 +644,112 @@ public void testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() { SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); -expectPreliminaryCalls(); - EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap()); +expectSendRecord(emptyHeaders()); -Capture newTopicCapture = EasyMock.newCapture(); - EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(TOPIC)); - -expectSendRecord(); -expectSendRecord(); - -PowerMock.replayAll(); +when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap()); + when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(createdTopic(TOPIC)); workerTask.toSend = Arrays.asList(record1, record2); workerTask.sendRecords(); + +ArgumentCaptor> sent = verifySendRecord(2); + +List> capturedValues = sent.getAllValues(); +assertEquals(2, capturedValues.size()); } -private Capture> expectSendRecord( -String topic, -boolean anyTimes, -Headers headers -) { +private void expectSendRecord(Headers headers) { if (headers != null) -expectConvertHeadersAndKeyValue(topic, anyTimes, headers); +expectConvertHeadersAndKeyValue(headers); -expectApplyTransformationChain(anyTimes); +expectApplyTransformationChain(); -Capture> sent = EasyMock.newCapture(); - -IExpectationSetters> expect = EasyMock.expect( -producer.send(EasyMock.capture(sent), EasyMock.capture(producerCallbacks))); +expectTaskGetTopic(); +} -IAnswer> expectResponse = () -> { -synchronized (producerCallbacks) { -for (Callback cb : producerCallbacks.getValues()) { -cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, 0L, 0, 0), null); -} -producerCallbacks.reset(); -} -return null; -}; +private ArgumentCaptor> verifySendRecord() { +return verifySendRecord(1); +} -if (anyTimes) -expect.andStubAnswer(expectResponse); -else -expect.andAnswer(expectResponse); +private ArgumentCaptor> verifySendRecord(int times) { +ArgumentCaptor> sent = ArgumentCaptor.forClass(ProducerRecord.class); +ArgumentCaptor producerCallbacks = ArgumentCaptor.forClass(Callback.class); +verify(producer, times(times)).send(sent.capture(), producerCallbacks.capture()); -expectTaskGetTopic(anyTimes); +for (Callback cb : producerCallbacks.getAllValues()) { +cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, 0L, 0, 0), +null); +} return sent; } -private Capture> expectSendRecordAnyTimes() { -return expectSendRecord(TOPIC, true, emptyHeaders()); +private void expectTaskGetTopic() { +when(statusBackingStore.getTopic(anyString(), anyString())).thenAnswer((Answer) invocation -> { +String connector = invocation.getArgument(0, String.class); +String topic = invocation.getArgument(1, String.class); +return new TopicStatus(topic, new ConnectorTaskId(connector, 0), Time.SYSTEM.milliseconds()); +}); } -private Capture> expectSendRecord() { -return expectSendRecord(TOPIC, false, emptyHeaders()); -} +private void verifyTaskGetTopic() { +ArgumentCaptor connectorCapture = ArgumentCaptor.forClass(String.class); +ArgumentCaptor topicCapture = ArgumentCaptor.forClass(String.class); +ArgumentCaptor newTopicCapture = ArgumentCaptor.forClass(NewTopic.class); 
+verify(statusBackingStore).getTopic(connectorCapture.capture(), topicCapture.capture()); -private void expectTaskGetTopic(boolean anyTimes) { -final Capture connectorCapture = EasyMock.newCapture(); -final Capture topicCapture = EasyMock.newCapture(); -IExpectationSetters expect = EasyMock.expect(statusBackingStore.getTopic( -EasyMock.capture(connectorCapture), -EasyMock.capture(topicCapture))); -if (anyTimes) { -expect.andStubAnswer(() -> new TopicStatus( -topicCapture.getValue(), -new ConnectorTaskId(connectorCapture.getValue(), 0), -Time.SYSTEM.mil
[jira] [Created] (KAFKA-14736) Kafka Connect REST API: POST/PUT/DELETE requests are not working
lingsbigm created KAFKA-14736: - Summary: Kafka Connect REST API: POST/PUT/DELETE requests are not working Key: KAFKA-14736 URL: https://issues.apache.org/jira/browse/KAFKA-14736 Project: Kafka Issue Type: Improvement Components: KafkaConnect Affects Versions: 3.1.1 Environment: development Reporter: lingsbigm Hi, we are using Debezium 1.8.1.Final with Kafka Connect in distributed mode. Suddenly one day we found that we can't add a new connector, and when we try to delete a connector or update a connector's configuration we find nothing in the log and the operation does not take effect either. Also, the connect-configs topic had no messages before the first operation; it does receive some messages when a connector is updated or deleted, but the connector itself does not change. Has anyone run into the same problem? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] showuon commented on pull request #13100: MINOR: add size check for tagged fields
showuon commented on PR #13100: URL: https://github.com/apache/kafka/pull/13100#issuecomment-1437718642 @mimaison , sorry, my bad, PR updated. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kowshik commented on pull request #13268: MINOR: Introduce OffsetAndEpoch in LeaderEndpoint interface return values
kowshik commented on PR #13268: URL: https://github.com/apache/kafka/pull/13268#issuecomment-1437650383 @junrao Thanks for the review! I've addressed the comments, the PR is ready for another pass. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] nizhikov opened a new pull request, #13281: [MINOR] Adjust logging with ZK log format
nizhikov opened a new pull request, #13281: URL: https://github.com/apache/kafka/pull/13281 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino opened a new pull request, #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …
rondagostino opened a new pull request, #13280: URL: https://github.com/apache/kafka/pull/13280 …topic counts

Performance of KRaft metadata image changes is currently O(<# of topics in cluster>). This means the amount of time it takes to create just a single topic scales linearly with the number of topics in the entire cluster. This impacts both controllers and brokers because both use the metadata image to represent the KRaft metadata log. The performance of these changes should scale with the number of topics being changed – creating a single topic should perform similarly regardless of the number of topics in the cluster.

This patch introduces a dependency on the [Paguro](https://github.com/GlenKPeterson/Paguro/) library for immutable/persistent collection support in Java and leverages persistent data structures to avoid copying the entire TopicsImage upon every change. We choose this library because it is relatively small and [well-integrated](https://github.com/GlenKPeterson/Paguro/blob/main/inheritanceHierarchy.pdf) with the existing Java Collections class hierarchy (the latter property especially helps to minimize the changes required to introduce the library into the existing code base).

The patch also adds the following JMH benchmarks demonstrating the resulting performance changes:

- `TopicsImageSingleRecordChangeBenchmark` tracks how long it takes to create a new topic. This is the benchmark that clearly identifies the O(N) behavior in the existing code and that most dramatically illustrates a performance improvement. As shown below, the existing code takes several orders of magnitude longer to make a single change than the new code. The existing code, with 12,500 topics, took 1.4 milliseconds on my laptop and grows more or less linearly as the number of topics grows. The new code took a constant amount of time (~250 nanoseconds) regardless of the number of topics in the cluster. The reason for the improvement is that it is inexpensive to add, update, or delete an entry in an immutable, persistent map to create a new persistent map. The new map shares the vast majority of the old map; only the root node and any nodes along the path to the node that must change are swapped out, and when the reference to the old map is released the swapped-out nodes can be garbage-collected.

**Current Code, unpatched**

Total Topic Count | nanoseconds/op | error
-- | -- | --
12,500 | 1,410,901 | 153,461
25,000 | 3,570,451 | 221,992
50,000 | 14,143,125 | 1,123,616
100,000 | 31,126,930 | 4,243,625

**Updated Code**

Total Topic Count | nanoseconds/op | error
-- | -- | --
12,500 | 258 | 13
25,000 | 265 | 8
50,000 | 273 | 5
100,000 | 222 | 4

- `TopicsImageZonalOutageBenchmark` simulates a zonal outage where each broker in the zone will lose its session – in this benchmark we assume the controller deals with them one by one rather than demoting 1/3 of the cluster all at once. Since the number of topics per broker does not change very much, we expect O(N) behavior with the current code but not with the updated code, so we should see a performance improvement here as well -- and in fact we do. The existing code scales with the number of topics in the cluster, thus the time always doubles as the cluster size doubles, increasing from 5ms to 47ms (a factor of 9) as the cluster scales by a factor of 8. The updated code should scale with the number of affected topics, which in this case, based on the (debatable) values chosen of 1 replicas per broker and 10 partitions per topic, means a factor of 1.6 (from 4167 topics affected to 6667 topics affected) as the cluster scaled by a factor of 8. In fact we see the time spent increasing by a factor of 2.6 (from 4.4 ms to 11.6 ms) when the cluster scaled by that factor of 8. This is a bit higher than expected, but it is still sub-linear (and there is some +/- error in these numbers, so the sub-linear behavior is the real point as opposed to the specific number).

**Current Code, unpatched**

Total Topic Count | milliseconds/op | error | (Brokers Impacted) | (Topics Impacted)
-- | -- | -- | -- | --
12,500 | 5.2 | 0.4 | 1/36 | 4,167
25,000 | 10.6 | 0.1 | 1/75 | 5,000
50,000 | 21.7 | 0.4 | 1/150 | 6,667
100,000 | 47.7 | 5.2 | 1/300 | 6,667

**Updated Code**

Total Topic Count | milliseconds/op | error | (Brokers Impacted) | (Topics Impacted)
-- | -- | -- | -- | --
12,500 | 4.4 | 0.2 | 1/36 | 4,167
25,000 | 6.9 | 0.2 | 1/75 | 5,000
50,000 | 10.2 | 2.5 | 1/150 | 6,667
100,000 | 11.6 | 2.8 | 1/300 | 6,667

- `TopicsImageSnapshotLoadBenchmark` simulates the loading of a snapshot when the broker starts – i.e. load up 100,000 topics/1M partitions from scratch and commit them all at once. We would expect to see some performance degradation here in the updated code, and the qu
[jira] [Created] (KAFKA-14735) Improve KRaft metadata image change performance at high topic counts
Ron Dagostino created KAFKA-14735: - Summary: Improve KRaft metadata image change performance at high topic counts Key: KAFKA-14735 URL: https://issues.apache.org/jira/browse/KAFKA-14735 Project: Kafka Issue Type: Improvement Components: kraft Reporter: Ron Dagostino Assignee: Ron Dagostino Fix For: 3.5.0 Performance of KRaft metadata image changes is currently O(<# of topics in cluster>). This means the amount of time it takes to create just a *single* topic scales linearly with the number of topics in the entire cluster. This impacts both controllers and brokers because both use the metadata image to represent the KRaft metadata log. The performance of these changes should scale with the number of topics being changed -- so creating a single topic should perform similarly regardless of the number of topics in the cluster. -- This message was sent by Atlassian Jira (v8.20.10#820010)
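The core idea behind the change described above is structural sharing: a persistent map produces a new version on every change while leaving the old version intact, so a single topic change no longer requires copying the whole image. Below is a rough sketch of the before/after shape, assuming Paguro's PersistentHashMap with empty()/assoc()/without() as exposed through its ImMap interface (treat those names as an assumption based on the library's documentation, not as code taken from the PR):

{code:java}
import org.organicdesign.fp.collections.ImMap;
import org.organicdesign.fp.collections.PersistentHashMap;

import java.util.HashMap;
import java.util.Map;

public class PersistentImageSketch {
    public static void main(String[] args) {
        // Old approach: every metadata image change copies the entire map,
        // so creating one topic is O(<# of topics in the cluster>).
        Map<String, Integer> oldImage = new HashMap<>();
        Map<String, Integer> nextOldImage = new HashMap<>(oldImage); // full copy
        nextOldImage.put("new-topic", 10);

        // New approach: a persistent map returns a new version that shares almost all of its
        // structure with the previous one; only the nodes on the path to the changed entry differ.
        ImMap<String, Integer> image = PersistentHashMap.empty();
        ImMap<String, Integer> withTopic = image.assoc("new-topic", 10); // `image` is untouched
        ImMap<String, Integer> afterDelete = withTopic.without("new-topic");

        System.out.println(image.size() + " " + withTopic.size() + " " + afterDelete.size()); // 0 1 0
    }
}
{code}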
[GitHub] [kafka] vamossagar12 commented on pull request #13127: KAFKA-14586: Moving StreamResetter to tools
vamossagar12 commented on PR #13127: URL: https://github.com/apache/kafka/pull/13127#issuecomment-1437373209 > > Regarding moving to CommandDefaultOptions, is it better if we do it in a follow up PR as that is not directly connected to migrating to tools module. > > Hi @vamossagar12, I'm ok with doing it in a follow up PR. Do we have a Jira for that? Thanks @fvaleri , here you go: https://issues.apache.org/jira/browse/KAFKA-14734 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-14734) Use CommandDefaultOptions in StreamsResetter
Sagar Rao created KAFKA-14734: - Summary: Use CommandDefaultOptions in StreamsResetter Key: KAFKA-14734 URL: https://issues.apache.org/jira/browse/KAFKA-14734 Project: Kafka Issue Type: Sub-task Reporter: Sagar Rao This came up as a suggestion here: [https://github.com/apache/kafka/pull/13127#issuecomment-1433155607] . -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] vamossagar12 commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module
vamossagar12 commented on PR #13095: URL: https://github.com/apache/kafka/pull/13095#issuecomment-1437369614 Thanks @mimaison . I addressed the comments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-8752) Ensure plugin classes are instantiable when discovering plugins
[ https://issues.apache.org/jira/browse/KAFKA-8752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexandre Dupriez resolved KAFKA-8752.
--------------------------------------
    Resolution: Not A Problem

> Ensure plugin classes are instantiable when discovering plugins
> ----------------------------------------------------------------
>
>          Key: KAFKA-8752
>          URL: https://issues.apache.org/jira/browse/KAFKA-8752
>      Project: Kafka
>   Issue Type: Bug
>   Components: core
>     Reporter: Alexandre Dupriez
>     Assignee: Alexandre Dupriez
>     Priority: Minor
>  Attachments: stacktrace.log
>
> While running integration tests from the IntelliJ IDE, it appears plugins fail to load in {{DelegatingClassLoader.scanUrlsAndAddPlugins}}. The reason was, in this case, that the class {{org.apache.kafka.connect.connector.ConnectorReconfigurationTest$TestConnector}} could not be instantiated - which it is not intended to be.
> The problem does not appear when running integration tests with Gradle, as the runtime closure is different from IntelliJ's, which includes test sources from module dependencies on the classpath.
> While debugging this minor inconvenience, I could see that {{DelegatingClassLoader}} performs a sanity check on the plugin class to instantiate - as of now, it verifies the class is concrete. A quick fix for the problem highlighted above could be to add an extra condition on the Java modifiers of the class to ensure it will be instantiable.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
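For reference, such a modifier check could look roughly like the sketch below. It uses plain reflection under assumed requirements (concrete, public, top-level or static, with a public no-arg constructor) and is not the check DelegatingClassLoader actually performs.

{code:java}
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;

public class PluginInstantiableCheck {
    // Returns true only for classes that can plausibly be instantiated reflectively:
    // concrete, public, not an interface, not a non-static inner class, public no-arg constructor.
    static boolean isInstantiable(Class<?> cls) {
        int mods = cls.getModifiers();
        if (cls.isInterface() || Modifier.isAbstract(mods) || !Modifier.isPublic(mods)) {
            return false;
        }
        // Non-static inner classes need an enclosing instance, so they cannot be instantiated here.
        if (cls.getEnclosingClass() != null && !Modifier.isStatic(mods)) {
            return false;
        }
        try {
            Constructor<?> ctor = cls.getDeclaredConstructor();
            return Modifier.isPublic(ctor.getModifiers());
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isInstantiable(java.util.ArrayList.class));    // true
        System.out.println(isInstantiable(java.util.AbstractList.class)); // false (abstract)
    }
}
{code}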
[GitHub] [kafka] Hangleton commented on a diff in pull request #13214: KAFKA-14577: Move the scala ConsoleProducer from core to tools module
Hangleton commented on code in PR #13214: URL: https://github.com/apache/kafka/pull/13214#discussion_r1112171337 ## tools/src/main/java/org/apache/kafka/tools/MessageReader.java: ## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import org.apache.kafka.clients.producer.ProducerRecord; + +import java.io.InputStream; +import java.util.Properties; + +/** + * Typical implementations of this interface convert data from an {@link InputStream} received via + * {@link MessageReader#init(InputStream, Properties)} into a {@link ProducerRecord} instance on each + * invocation of `{@link MessageReader#readMessage()}`. + * + * This is used by the {@link ConsoleProducer}. + */ +public interface MessageReader { Review Comment: Thanks @fvaleri and apologies for the delay, I will look at the KIP. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
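Following up on the interface quoted above: a typical implementation reads one record per line from the supplied stream. The sketch below only illustrates that shape; the method names follow the javadoc quoted in the diff, while the exact generic types, the property key, and any close() method are assumptions rather than details taken from the PR.

```java
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class LineMessageReaderSketch {
    private BufferedReader reader;
    private String topic;

    public void init(InputStream inputStream, Properties props) {
        this.reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
        this.topic = props.getProperty("topic"); // "topic" property key is assumed for illustration
    }

    // Returns one ProducerRecord per input line, or null when the stream is exhausted.
    public ProducerRecord<byte[], byte[]> readMessage() {
        try {
            String line = reader.readLine();
            if (line == null) {
                return null;
            }
            return new ProducerRecord<>(topic, null, line.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```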
[GitHub] [kafka] mimaison commented on pull request #13100: MINOR: add size check for tagged fields
mimaison commented on PR #13100: URL: https://github.com/apache/kafka/pull/13100#issuecomment-1437298488 @showuon Thanks for the PR. The new test is failing with ``` org.opentest4j.AssertionFailedError: expected: but was: ``` https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13100/3/testReport/org.apache.kafka.common.protocol.types/ProtocolSerializationTest/Build___JDK_11_and_Scala_2_13___testReadTaggedFieldsSizeTooLarge__/ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Hangleton commented on pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9
Hangleton commented on PR #13240: URL: https://github.com/apache/kafka/pull/13240#issuecomment-1437297790 Hello David, I updated the PR to take into account your comments and have been adding tests. Almost ready for review. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] nizhikov closed pull request #13271: KAFKA-14730 AdminOperationException moved to java
nizhikov closed pull request #13271: KAFKA-14730 AdminOperationException moved to java URL: https://github.com/apache/kafka/pull/13271 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] nizhikov commented on pull request #13271: KAFKA-14730 AdminOperationException moved to java
nizhikov commented on PR #13271: URL: https://github.com/apache/kafka/pull/13271#issuecomment-1437279016 @mimaison OK. Do you have any guidelines on how exactly the move should be implemented, if straight rewriting doesn't work here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] nizhikov commented on pull request #13247: KAFKA-14595 Move value objects of ReassignPartitionsCommand to java
nizhikov commented on PR #13247: URL: https://github.com/apache/kafka/pull/13247#issuecomment-1437274047 @mimaison > I think we should directly move the classes to the tools module instead of temporarily keeping them in core Are you suggesting performing the whole move in one PR? Or do you have a plan for how to do the move in several steps? If so, please share some guidelines on how I should implement it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on pull request #13271: KAFKA-14730 AdminOperationException moved to java
mimaison commented on PR #13271: URL: https://github.com/apache/kafka/pull/13271#issuecomment-1437268624 Thanks for the PR. Rather than updating all the tools, I wonder if we should just create the new exception in tools. Then each tool can start using it when it's moved. There are a few other PRs in flights so let's avoid creating many conflicts. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on pull request #13247: KAFKA-14595 Move value objects of ReassignPartitionsCommand to java
mimaison commented on PR #13247: URL: https://github.com/apache/kafka/pull/13247#issuecomment-1437252881 I think we should directly move the classes to the tools module instead of temporarily keeping them in core. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module
mimaison commented on code in PR #13095: URL: https://github.com/apache/kafka/pull/13095#discussion_r1112025397 ## tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java: ## @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.tools; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.Properties; +import java.util.List; +import java.util.Random; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +/** + * This class records the average end to end latency for a single message to travel through Kafka + * + * broker_list = location of the bootstrap broker for both the producer and the consumer Review Comment: Can we update the comment to include the topic argument that's required too? Also let's format this javadoc comment a bit so it renders nicely ## tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java: ## @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.tools; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.Properties; +import java.util.List; +import java.util.Random; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +/** + * This class records the average end to end latency for a single message to travel through Kafka + * + * broker_list = location of the bootstrap broker for both the producer and the consumer + * num_messages = # messages to send + * producer_acks = See ProducerConfig.ACKS_DOC + * message_size_bytes = size of each message in bytes + * + * e.g. [localhost:9092 test 1 1 20] + */ +public class EndToEndLatency { +private final static long POLL_TIMEOUT_MS = 6; +private final static short DEFAULT_REPLICATION_FACTOR = 1; +private final static int DEFAULT_NUM_PARTITIONS = 1; + +public stat
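For context on what the class under review measures: the tool sends a record, waits for it to come back through a consumer, and records the elapsed time per round trip. A stripped-down sketch of that measurement follows, under simplified assumptions (single message, consumer already subscribed and positioned at the end of the topic); it is not the actual EndToEndLatency code.

```java
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.time.Duration;

public class LatencySketch {
    // Produces one record, waits for it to be consumed again, and returns the elapsed nanoseconds.
    static long roundTripNanos(KafkaProducer<byte[], byte[]> producer,
                               KafkaConsumer<byte[], byte[]> consumer,
                               String topic, byte[] payload) throws Exception {
        long start = System.nanoTime();
        producer.send(new ProducerRecord<>(topic, payload)).get(); // block until the broker acks
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(60));
        long elapsed = System.nanoTime() - start;
        if (records.isEmpty()) {
            throw new RuntimeException("poll returned no records within the timeout");
        }
        return elapsed;
    }
}
```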
[GitHub] [kafka] rajinisivaram commented on pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)
rajinisivaram commented on PR #12990: URL: https://github.com/apache/kafka/pull/12990#issuecomment-1437118603 @dajac Thanks for the review. I have addressed the comments and left some questions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)
rajinisivaram commented on code in PR #12990: URL: https://github.com/apache/kafka/pull/12990#discussion_r1112029042 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -1951,20 +1954,47 @@ class PlaintextConsumerTest extends BaseConsumerTest { } @Test - def testConsumerRackIdPropagatedToPartitionAssignor(): Unit = { -consumerConfig.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, "rack-a") - consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[RackAwareAssignor].getName) -val consumer = createConsumer() -consumer.subscribe(Set(topic).asJava) -awaitAssignment(consumer, Set(tp, tp2)) - } -} + def testRackAwareRangeAssignor(): Unit = { Review Comment: I wanted to put this test into a class that had FFF enabled, but I couldn't find any integration tests for FFF. So included here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)
rajinisivaram commented on code in PR #12990: URL: https://github.com/apache/kafka/pull/12990#discussion_r1112027236 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -1951,20 +1954,47 @@ class PlaintextConsumerTest extends BaseConsumerTest { } @Test - def testConsumerRackIdPropagatedToPartitionAssignor(): Unit = { -consumerConfig.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, "rack-a") - consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[RackAwareAssignor].getName) -val consumer = createConsumer() -consumer.subscribe(Set(topic).asJava) -awaitAssignment(consumer, Set(tp, tp2)) - } -} + def testRackAwareRangeAssignor(): Unit = { +val partitionList = servers.indices.toList + +val topicWithAllPartitionsOnAllRacks = "topicWithAllPartitionsOnAllRacks" +createTopic(topicWithAllPartitionsOnAllRacks, servers.size, servers.size) + +// Racks are in order of broker ids, assign leaders in reverse order +val topicWithSingleRackPartitions = "topicWithSingleRackPartitions" +createTopicWithAssignment(topicWithSingleRackPartitions, partitionList.map(i => (i, Seq(servers.size - i - 1))).toMap) + +// Create consumers with instance ids in ascending order, with racks in the same order. + consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[RangeAssignor].getName) +val consumers = servers.map { server => + consumerConfig.setProperty(ConsumerConfig.CLIENT_RACK_CONFIG, server.config.rack.orNull) + consumerConfig.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, s"instance-${server.config.brokerId}") + createConsumer() +} + +val executor = Executors.newFixedThreadPool(consumers.size) +def waitForAssignments(assignments: List[Set[TopicPartition]]): Unit = { + val futures = consumers.zipWithIndex.map { case (consumer, i) => +executor.submit(() => awaitAssignment(consumer, assignments(i)), 0) + } + futures.foreach(future => assertEquals(0, future.get(20, TimeUnit.SECONDS))) +} -class RackAwareAssignor extends RoundRobinAssignor { - override def assign(partitionsPerTopic: util.Map[String, Integer], subscriptions: util.Map[String, ConsumerPartitionAssignor.Subscription]): util.Map[String, util.List[TopicPartition]] = { -assertEquals(1, subscriptions.size()) -assertEquals(Optional.of("rack-a"), subscriptions.values.asScala.head.rackId) -super.assign(partitionsPerTopic, subscriptions) +try { + // Rack-based assignment results in partitions assigned in reverse order since partition racks are in the reverse order. + consumers.foreach(_.subscribe(Collections.singleton(topicWithSingleRackPartitions))) + waitForAssignments(partitionList.reverse.map(p => Set(new TopicPartition(topicWithSingleRackPartitions, p + + // Non-rack-aware assignment results in ordered partitions. + consumers.foreach(_.subscribe(Collections.singleton(topicWithAllPartitionsOnAllRacks))) + waitForAssignments(partitionList.map(p => Set(new TopicPartition(topicWithAllPartitionsOnAllRacks, p + + // Rack-aware assignment with co-partitioning results in reverse assignment for both topics. + consumers.foreach(_.subscribe(Set(topicWithSingleRackPartitions, topicWithAllPartitionsOnAllRacks).asJava)) + waitForAssignments(partitionList.reverse.map(p => Set(new TopicPartition(topicWithAllPartitionsOnAllRacks, p), new TopicPartition(topicWithSingleRackPartitions, p Review Comment: No, we don't do any leader-specific assignment, we are assuming all replicas are equal. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
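To illustrate the point about treating all replicas as equal: in the rack-aware path a consumer counts as a rack match for a partition when any replica of that partition lives in the consumer's rack, not just the leader. A toy example follows (illustrative names, not the RangeAssignor internals):

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

public class RackMatchSketch {
    // A consumer matches a partition if its rack hosts any replica of that partition.
    static boolean racksMatch(String consumerRack, Set<String> partitionReplicaRacks) {
        return consumerRack != null && partitionReplicaRacks.contains(consumerRack);
    }

    public static void main(String[] args) {
        Map<Integer, Set<String>> replicaRacksByPartition = Map.of(
                0, Set.of("rack-a", "rack-b"),
                1, Set.of("rack-b", "rack-c"));
        // A consumer in rack-a matches partition 0 only; one in rack-b would match both.
        for (int p : List.of(0, 1)) {
            System.out.println("rack-a matches p" + p + ": "
                    + racksMatch("rack-a", replicaRacksByPartition.get(p)));
        }
    }
}
```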
[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)
rajinisivaram commented on code in PR #12990: URL: https://github.com/apache/kafka/pull/12990#discussion_r1112026015 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -29,24 +35,21 @@ import org.apache.kafka.common.utils.Utils import org.apache.kafka.test.{MockConsumerInterceptor, MockProducerInterceptor} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test - -import scala.jdk.CollectionConverters._ -import scala.collection.mutable.Buffer -import kafka.server.QuotaType -import kafka.server.KafkaServer -import org.apache.kafka.clients.admin.NewPartitions -import org.apache.kafka.clients.admin.NewTopic -import org.apache.kafka.common.config.TopicConfig import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource -import java.util.concurrent.TimeUnit -import java.util.concurrent.locks.ReentrantLock import scala.collection.mutable +import scala.collection.mutable.Buffer +import scala.jdk.CollectionConverters._ /* We have some tests in this class instead of `BaseConsumerTest` in order to keep the build time under control. */ class PlaintextConsumerTest extends BaseConsumerTest { + override def modifyConfigs(props: collection.Seq[Properties]): Unit = { +super.modifyConfigs(props) +props.zipWithIndex.foreach{ case (p, i) => p.setProperty(KafkaConfig.RackProp, i.toString) } Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)
rajinisivaram commented on code in PR #12990: URL: https://github.com/apache/kafka/pull/12990#discussion_r1112025710 ## clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java: ## @@ -76,43 +99,185 @@ private Map> consumersPerTopic(Map> topicToConsumers = new HashMap<>(); for (Map.Entry subscriptionEntry : consumerMetadata.entrySet()) { String consumerId = subscriptionEntry.getKey(); -MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId()); -for (String topic : subscriptionEntry.getValue().topics()) { +Subscription subscription = subscriptionEntry.getValue(); +MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId()); +for (String topic : subscription.topics()) { put(topicToConsumers, topic, memberInfo); } } return topicToConsumers; } @Override -public Map> assign(Map partitionsPerTopic, -Map subscriptions) { +public Map> assignPartitions(Map> partitionsPerTopic, + Map subscriptions) { Map> consumersPerTopic = consumersPerTopic(subscriptions); +List topicAssignmentStates = partitionsPerTopic.entrySet().stream() +.filter(e -> !e.getValue().isEmpty()) +.map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey( +.collect(Collectors.toList()); Map> assignment = new HashMap<>(); for (String memberId : subscriptions.keySet()) assignment.put(memberId, new ArrayList<>()); -for (Map.Entry> topicEntry : consumersPerTopic.entrySet()) { -String topic = topicEntry.getKey(); -List consumersForTopic = topicEntry.getValue(); +boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment); +if (useRackAware) +assignWithRackMatching(topicAssignmentStates, assignment); + +topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment)); + +if (useRackAware) +assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR)); +return assignment; +} + +// This method is not used, but retained for compatibility with any custom assignors that extend this class. 
+@Override +public Map> assign(Map partitionsPerTopic, +Map subscriptions) { +return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions); +} + +private void assignRanges(TopicAssignmentState assignmentState, + BiFunction mayAssign, + Map> assignment) { +for (String consumer : assignmentState.consumers) { +if (assignmentState.unassignedPartitions.isEmpty()) +break; +List assignablePartitions = assignmentState.unassignedPartitions.stream() +.filter(tp -> mayAssign.apply(consumer, tp)) +.collect(Collectors.toList()); -Integer numPartitionsForTopic = partitionsPerTopic.get(topic); -if (numPartitionsForTopic == null) +int maxAssignable = Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size()); +if (maxAssignable <= 0) continue; -Collections.sort(consumersForTopic); +assign(consumer, assignablePartitions.subList(0, maxAssignable), assignmentState, assignment); +} +} + +private void assignWithRackMatching(Collection assignmentStates, +Map> assignment) { -int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size(); -int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size(); +assignmentStates.stream().collect(Collectors.groupingBy(t -> t.consumers)).forEach((consumers, states) -> { +states.stream().collect(Collectors.groupingBy(t -> t.partitionRacks.size())).forEach((numPartitions, coPartitionedStates) -> { +if (coPartitionedStates.size() > 1) +assignCoPartitionedWithRackMatching(consumers, numPartitions, states, assignment); +else { +TopicAssignmentState state = coPartitionedStates.get(0); +if (state.needsRackAwareAssignment) +assignRanges(state, state::racksMatch, assignment); +} +}); +}); +} + +private void assignCoPartitionedWithRackMatching(List consumers, + int nu
[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)
rajinisivaram commented on code in PR #12990: URL: https://github.com/apache/kafka/pull/12990#discussion_r1112016639 ## clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java: ## @@ -76,43 +99,185 @@ private Map> consumersPerTopic(Map> topicToConsumers = new HashMap<>(); for (Map.Entry subscriptionEntry : consumerMetadata.entrySet()) { String consumerId = subscriptionEntry.getKey(); -MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId()); -for (String topic : subscriptionEntry.getValue().topics()) { +Subscription subscription = subscriptionEntry.getValue(); +MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId()); +for (String topic : subscription.topics()) { put(topicToConsumers, topic, memberInfo); } } return topicToConsumers; } @Override -public Map> assign(Map partitionsPerTopic, -Map subscriptions) { +public Map> assignPartitions(Map> partitionsPerTopic, + Map subscriptions) { Map> consumersPerTopic = consumersPerTopic(subscriptions); +List topicAssignmentStates = partitionsPerTopic.entrySet().stream() +.filter(e -> !e.getValue().isEmpty()) +.map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey( +.collect(Collectors.toList()); Map> assignment = new HashMap<>(); for (String memberId : subscriptions.keySet()) assignment.put(memberId, new ArrayList<>()); -for (Map.Entry> topicEntry : consumersPerTopic.entrySet()) { -String topic = topicEntry.getKey(); -List consumersForTopic = topicEntry.getValue(); +boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment); +if (useRackAware) +assignWithRackMatching(topicAssignmentStates, assignment); + +topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment)); + +if (useRackAware) +assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR)); +return assignment; +} + +// This method is not used, but retained for compatibility with any custom assignors that extend this class. 
+@Override +public Map> assign(Map partitionsPerTopic, +Map subscriptions) { +return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions); +} + +private void assignRanges(TopicAssignmentState assignmentState, + BiFunction mayAssign, + Map> assignment) { +for (String consumer : assignmentState.consumers) { +if (assignmentState.unassignedPartitions.isEmpty()) +break; +List assignablePartitions = assignmentState.unassignedPartitions.stream() +.filter(tp -> mayAssign.apply(consumer, tp)) +.collect(Collectors.toList()); -Integer numPartitionsForTopic = partitionsPerTopic.get(topic); -if (numPartitionsForTopic == null) +int maxAssignable = Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size()); +if (maxAssignable <= 0) continue; -Collections.sort(consumersForTopic); +assign(consumer, assignablePartitions.subList(0, maxAssignable), assignmentState, assignment); +} +} + +private void assignWithRackMatching(Collection assignmentStates, +Map> assignment) { -int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size(); -int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size(); +assignmentStates.stream().collect(Collectors.groupingBy(t -> t.consumers)).forEach((consumers, states) -> { +states.stream().collect(Collectors.groupingBy(t -> t.partitionRacks.size())).forEach((numPartitions, coPartitionedStates) -> { +if (coPartitionedStates.size() > 1) +assignCoPartitionedWithRackMatching(consumers, numPartitions, states, assignment); +else { +TopicAssignmentState state = coPartitionedStates.get(0); +if (state.needsRackAwareAssignment) +assignRanges(state, state::racksMatch, assignment); +} +}); +}); +} + +private void assignCoPartitionedWithRackMatching(List consumers, + int nu
[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)
rajinisivaram commented on code in PR #12990: URL: https://github.com/apache/kafka/pull/12990#discussion_r1112015848 ## clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java: ## @@ -76,43 +99,185 @@ private Map> consumersPerTopic(Map> topicToConsumers = new HashMap<>(); for (Map.Entry subscriptionEntry : consumerMetadata.entrySet()) { String consumerId = subscriptionEntry.getKey(); -MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId()); -for (String topic : subscriptionEntry.getValue().topics()) { +Subscription subscription = subscriptionEntry.getValue(); +MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId()); +for (String topic : subscription.topics()) { put(topicToConsumers, topic, memberInfo); } } return topicToConsumers; } @Override -public Map> assign(Map partitionsPerTopic, -Map subscriptions) { +public Map> assignPartitions(Map> partitionsPerTopic, + Map subscriptions) { Map> consumersPerTopic = consumersPerTopic(subscriptions); +List topicAssignmentStates = partitionsPerTopic.entrySet().stream() +.filter(e -> !e.getValue().isEmpty()) +.map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey( +.collect(Collectors.toList()); Map> assignment = new HashMap<>(); for (String memberId : subscriptions.keySet()) assignment.put(memberId, new ArrayList<>()); -for (Map.Entry> topicEntry : consumersPerTopic.entrySet()) { -String topic = topicEntry.getKey(); -List consumersForTopic = topicEntry.getValue(); +boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment); +if (useRackAware) +assignWithRackMatching(topicAssignmentStates, assignment); + +topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment)); + +if (useRackAware) +assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR)); +return assignment; +} + +// This method is not used, but retained for compatibility with any custom assignors that extend this class. 
+@Override +public Map> assign(Map partitionsPerTopic, +Map subscriptions) { +return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions); +} + +private void assignRanges(TopicAssignmentState assignmentState, + BiFunction mayAssign, + Map> assignment) { +for (String consumer : assignmentState.consumers) { +if (assignmentState.unassignedPartitions.isEmpty()) +break; +List assignablePartitions = assignmentState.unassignedPartitions.stream() +.filter(tp -> mayAssign.apply(consumer, tp)) +.collect(Collectors.toList()); -Integer numPartitionsForTopic = partitionsPerTopic.get(topic); -if (numPartitionsForTopic == null) +int maxAssignable = Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size()); +if (maxAssignable <= 0) continue; -Collections.sort(consumersForTopic); +assign(consumer, assignablePartitions.subList(0, maxAssignable), assignmentState, assignment); +} +} + +private void assignWithRackMatching(Collection assignmentStates, +Map> assignment) { -int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size(); -int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size(); +assignmentStates.stream().collect(Collectors.groupingBy(t -> t.consumers)).forEach((consumers, states) -> { +states.stream().collect(Collectors.groupingBy(t -> t.partitionRacks.size())).forEach((numPartitions, coPartitionedStates) -> { +if (coPartitionedStates.size() > 1) +assignCoPartitionedWithRackMatching(consumers, numPartitions, states, assignment); +else { +TopicAssignmentState state = coPartitionedStates.get(0); +if (state.needsRackAwareAssignment) +assignRanges(state, state::racksMatch, assignment); +} +}); +}); +} + +private void assignCoPartitionedWithRackMatching(List consumers, + int nu
[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)
rajinisivaram commented on code in PR #12990: URL: https://github.com/apache/kafka/pull/12990#discussion_r1112014546 ## clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java: ## @@ -76,43 +99,185 @@ private Map> consumersPerTopic(Map> topicToConsumers = new HashMap<>(); for (Map.Entry subscriptionEntry : consumerMetadata.entrySet()) { String consumerId = subscriptionEntry.getKey(); -MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId()); -for (String topic : subscriptionEntry.getValue().topics()) { +Subscription subscription = subscriptionEntry.getValue(); +MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId()); +for (String topic : subscription.topics()) { put(topicToConsumers, topic, memberInfo); } } return topicToConsumers; } @Override -public Map> assign(Map partitionsPerTopic, -Map subscriptions) { +public Map> assignPartitions(Map> partitionsPerTopic, + Map subscriptions) { Map> consumersPerTopic = consumersPerTopic(subscriptions); +List topicAssignmentStates = partitionsPerTopic.entrySet().stream() +.filter(e -> !e.getValue().isEmpty()) +.map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey( +.collect(Collectors.toList()); Map> assignment = new HashMap<>(); for (String memberId : subscriptions.keySet()) assignment.put(memberId, new ArrayList<>()); -for (Map.Entry> topicEntry : consumersPerTopic.entrySet()) { -String topic = topicEntry.getKey(); -List consumersForTopic = topicEntry.getValue(); +boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment); +if (useRackAware) +assignWithRackMatching(topicAssignmentStates, assignment); + +topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment)); + +if (useRackAware) +assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR)); +return assignment; +} + +// This method is not used, but retained for compatibility with any custom assignors that extend this class. 
+@Override +public Map> assign(Map partitionsPerTopic, +Map subscriptions) { +return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions); +} + +private void assignRanges(TopicAssignmentState assignmentState, + BiFunction mayAssign, + Map> assignment) { +for (String consumer : assignmentState.consumers) { +if (assignmentState.unassignedPartitions.isEmpty()) +break; +List assignablePartitions = assignmentState.unassignedPartitions.stream() +.filter(tp -> mayAssign.apply(consumer, tp)) +.collect(Collectors.toList()); -Integer numPartitionsForTopic = partitionsPerTopic.get(topic); -if (numPartitionsForTopic == null) +int maxAssignable = Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size()); +if (maxAssignable <= 0) continue; -Collections.sort(consumersForTopic); +assign(consumer, assignablePartitions.subList(0, maxAssignable), assignmentState, assignment); +} +} + +private void assignWithRackMatching(Collection assignmentStates, +Map> assignment) { -int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size(); -int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size(); +assignmentStates.stream().collect(Collectors.groupingBy(t -> t.consumers)).forEach((consumers, states) -> { +states.stream().collect(Collectors.groupingBy(t -> t.partitionRacks.size())).forEach((numPartitions, coPartitionedStates) -> { +if (coPartitionedStates.size() > 1) +assignCoPartitionedWithRackMatching(consumers, numPartitions, states, assignment); +else { +TopicAssignmentState state = coPartitionedStates.get(0); +if (state.needsRackAwareAssignment) +assignRanges(state, state::racksMatch, assignment); +} +}); +}); +} + +private void assignCoPartitionedWithRackMatching(List consumers, + int nu
[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)
rajinisivaram commented on code in PR #12990: URL: https://github.com/apache/kafka/pull/12990#discussion_r1112013804 ## clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java: ## @@ -76,43 +99,185 @@ private Map> consumersPerTopic(Map> topicToConsumers = new HashMap<>(); for (Map.Entry subscriptionEntry : consumerMetadata.entrySet()) { String consumerId = subscriptionEntry.getKey(); -MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId()); -for (String topic : subscriptionEntry.getValue().topics()) { +Subscription subscription = subscriptionEntry.getValue(); +MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId()); +for (String topic : subscription.topics()) { put(topicToConsumers, topic, memberInfo); } } return topicToConsumers; } @Override -public Map> assign(Map partitionsPerTopic, -Map subscriptions) { +public Map> assignPartitions(Map> partitionsPerTopic, + Map subscriptions) { Map> consumersPerTopic = consumersPerTopic(subscriptions); +List topicAssignmentStates = partitionsPerTopic.entrySet().stream() +.filter(e -> !e.getValue().isEmpty()) +.map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey( +.collect(Collectors.toList()); Map> assignment = new HashMap<>(); for (String memberId : subscriptions.keySet()) assignment.put(memberId, new ArrayList<>()); -for (Map.Entry> topicEntry : consumersPerTopic.entrySet()) { -String topic = topicEntry.getKey(); -List consumersForTopic = topicEntry.getValue(); +boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment); +if (useRackAware) +assignWithRackMatching(topicAssignmentStates, assignment); + +topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment)); + +if (useRackAware) +assignment.values().forEach(list -> list.sort(PARTITION_COMPARATOR)); +return assignment; +} + +// This method is not used, but retained for compatibility with any custom assignors that extend this class. 
+@Override +public Map> assign(Map partitionsPerTopic, +Map subscriptions) { +return assignPartitions(partitionInfosWithoutRacks(partitionsPerTopic), subscriptions); +} + +private void assignRanges(TopicAssignmentState assignmentState, + BiFunction mayAssign, + Map> assignment) { +for (String consumer : assignmentState.consumers) { +if (assignmentState.unassignedPartitions.isEmpty()) +break; +List assignablePartitions = assignmentState.unassignedPartitions.stream() +.filter(tp -> mayAssign.apply(consumer, tp)) +.collect(Collectors.toList()); -Integer numPartitionsForTopic = partitionsPerTopic.get(topic); -if (numPartitionsForTopic == null) +int maxAssignable = Math.min(assignmentState.maxAssignable(consumer), assignablePartitions.size()); +if (maxAssignable <= 0) continue; -Collections.sort(consumersForTopic); +assign(consumer, assignablePartitions.subList(0, maxAssignable), assignmentState, assignment); +} +} + +private void assignWithRackMatching(Collection assignmentStates, +Map> assignment) { -int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size(); -int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size(); +assignmentStates.stream().collect(Collectors.groupingBy(t -> t.consumers)).forEach((consumers, states) -> { +states.stream().collect(Collectors.groupingBy(t -> t.partitionRacks.size())).forEach((numPartitions, coPartitionedStates) -> { +if (coPartitionedStates.size() > 1) +assignCoPartitionedWithRackMatching(consumers, numPartitions, states, assignment); +else { +TopicAssignmentState state = coPartitionedStates.get(0); +if (state.needsRackAwareAssignment) +assignRanges(state, state::racksMatch, assignment); +} +}); +}); +} + +private void assignCoPartitionedWithRackMatching(List consumers, + int nu
[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)
rajinisivaram commented on code in PR #12990: URL: https://github.com/apache/kafka/pull/12990#discussion_r1112011627 ## clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java: ## @@ -76,43 +99,185 @@ private Map> consumersPerTopic(Map> topicToConsumers = new HashMap<>(); for (Map.Entry subscriptionEntry : consumerMetadata.entrySet()) { String consumerId = subscriptionEntry.getKey(); -MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId()); -for (String topic : subscriptionEntry.getValue().topics()) { +Subscription subscription = subscriptionEntry.getValue(); +MemberInfo memberInfo = new MemberInfo(consumerId, subscription.groupInstanceId(), subscription.rackId()); +for (String topic : subscription.topics()) { put(topicToConsumers, topic, memberInfo); } } return topicToConsumers; } @Override -public Map> assign(Map partitionsPerTopic, -Map subscriptions) { +public Map> assignPartitions(Map> partitionsPerTopic, + Map subscriptions) { Map> consumersPerTopic = consumersPerTopic(subscriptions); +List topicAssignmentStates = partitionsPerTopic.entrySet().stream() +.filter(e -> !e.getValue().isEmpty()) +.map(e -> new TopicAssignmentState(e.getKey(), e.getValue(), consumersPerTopic.get(e.getKey( +.collect(Collectors.toList()); Map> assignment = new HashMap<>(); for (String memberId : subscriptions.keySet()) assignment.put(memberId, new ArrayList<>()); -for (Map.Entry> topicEntry : consumersPerTopic.entrySet()) { -String topic = topicEntry.getKey(); -List consumersForTopic = topicEntry.getValue(); +boolean useRackAware = topicAssignmentStates.stream().anyMatch(t -> t.needsRackAwareAssignment); +if (useRackAware) +assignWithRackMatching(topicAssignmentStates, assignment); + +topicAssignmentStates.forEach(t -> assignRanges(t, (c, tp) -> true, assignment)); Review Comment: Added javadoc. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12990: KAFKA-14451: Rack-aware consumer partition assignment for RangeAssignor (KIP-881)
rajinisivaram commented on code in PR #12990: URL: https://github.com/apache/kafka/pull/12990#discussion_r1112011199 ## clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java: ## @@ -63,9 +76,19 @@ * I0: [t0p0, t0p1, t1p0, t1p1] * I1: [t0p2, t1p2] * + * + * Rack-aware assignment is used if both consumer and partition replica racks are available and + * some partitions have replicas only on a subset of racks. We attempt to match consumer racks with + * partition replica racks on a best-effort basis, prioritizing balanced assignment over rack alignment. + * Topics with equal partition count and same set of subscribers prioritize co-partitioning guarantee + * over rack alignment. In this case, aligning partition replicas of these topics on the same racks + * will improve locality for consumers. For example, if partitions 0 of all topics have a replica on + * rack 'a', partition 1 on rack 'b' etc., partition 0 of all topics can be assigned to a consumer + * on rack 'a', partition 1 to a consumer on rack 'b' and so on. Review Comment: Yes, that is correct. ## clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java: ## @@ -76,43 +99,185 @@ private Map> consumersPerTopic(Map> topicToConsumers = new HashMap<>(); for (Map.Entry subscriptionEntry : consumerMetadata.entrySet()) { Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
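For readers following the javadoc discussed above, here is a minimal sketch of the rack-matching check it describes. The diff references a `racksMatch` helper, but its actual signature is not visible in this message, so the names and types below are illustrative only, not the PR's code:

```java
import java.util.Set;

// Minimal sketch (not the PR's code): a partition can be assigned "rack-aligned" to a
// consumer only if the consumer's rack hosts one of the partition's replicas.
public class RackMatchSketch {
    static boolean racksMatch(String consumerRack, Set<String> partitionReplicaRacks) {
        // Without rack information on either side, rack-aware matching cannot apply.
        if (consumerRack == null || partitionReplicaRacks == null || partitionReplicaRacks.isEmpty())
            return false;
        return partitionReplicaRacks.contains(consumerRack);
    }
}
```

As the javadoc notes, balanced assignment still takes priority: when no rack-aligned consumer has remaining capacity, the partition is assigned without rack alignment.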
[GitHub] [kafka] chia7712 opened a new pull request, #13279: KAFKA-14295 FetchMessageConversionsPerSec meter not recorded
chia7712 opened a new pull request, #13279: URL: https://github.com/apache/kafka/pull/13279 https://issues.apache.org/jira/browse/KAFKA-14295 The broker topic metric FetchMessageConversionsPerSec doesn't get recorded on a fetch message conversion. The bug is that we pass in a callback that expects a MultiRecordsSend in KafkaApis: ```scala def updateConversionStats(send: Send): Unit = { send match { case send: MultiRecordsSend if send.recordConversionStats != null => send.recordConversionStats.asScala.toMap.foreach { case (tp, stats) => updateRecordConversionStats(request, tp, stats) } case _ => } } ``` But we call this callback with a NetworkSend in the SocketServer: ```scala selector.completedSends.forEach { send => try { val response = inflightResponses.remove(send.destinationId).getOrElse { throw new IllegalStateException(s"Send for ${send.destinationId} completed, but not in `inflightResponses`") } updateRequestMetrics(response) // Invoke send completion callback response.onComplete.foreach(onComplete => onComplete(send)) ``` Note that Selector.completedSends returns a collection of NetworkSend ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
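One possible direction for such a fix, shown here only as a sketch and not as the actual patch in this PR: unwrap the `NetworkSend` before matching on `MultiRecordsSend`. Whether `NetworkSend` exposes the `Send` it wraps, and under what accessor name, is an assumption in this snippet:

```java
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.record.MultiRecordsSend;

// Sketch only: the send() accessor on NetworkSend is assumed/illustrative. The idea is
// to recover the MultiRecordsSend (and its record conversion stats) from the NetworkSend
// handed to the completion callback by the SocketServer.
public class ConversionStatsSketch {
    static void updateConversionStats(Send send) {
        Send unwrapped = (send instanceof NetworkSend) ? ((NetworkSend) send).send() : send;
        if (unwrapped instanceof MultiRecordsSend) {
            MultiRecordsSend records = (MultiRecordsSend) unwrapped;
            if (records.recordConversionStats() != null) {
                records.recordConversionStats().forEach((tp, stats) -> {
                    // update the per-topic and broker-wide FetchMessageConversionsPerSec meters here
                });
            }
        }
    }
}
```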
[jira] [Assigned] (KAFKA-14295) FetchMessageConversionsPerSec meter not recorded
[ https://issues.apache.org/jira/browse/KAFKA-14295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-14295: -- Assignee: Chia-Ping Tsai > FetchMessageConversionsPerSec meter not recorded > > > Key: KAFKA-14295 > URL: https://issues.apache.org/jira/browse/KAFKA-14295 > Project: Kafka > Issue Type: Bug >Reporter: David Mao >Assignee: Chia-Ping Tsai >Priority: Major > > The broker topic metric FetchMessageConversionsPerSec doesn't get recorded on > a fetch message conversion. > The bug is that we pass in a callback that expects a MultiRecordsSend in > KafkaApis: > {code:java} > def updateConversionStats(send: Send): Unit = { > send match { > case send: MultiRecordsSend if send.recordConversionStats != null => > send.recordConversionStats.asScala.toMap.foreach { > case (tp, stats) => updateRecordConversionStats(request, tp, stats) > } > case _ => > } > } {code} > But we call this callback with a NetworkSend in the SocketServer: > {code:java} > selector.completedSends.forEach { send => > try { > val response = inflightResponses.remove(send.destinationId).getOrElse { > throw new IllegalStateException(s"Send for ${send.destinationId} > completed, but not in `inflightResponses`") > } > updateRequestMetrics(response) > // Invoke send completion callback > response.onComplete.foreach(onComplete => onComplete(send)) > ...{code} > Note that Selector.completedSends returns a collection of NetworkSend -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] satishd commented on pull request #13255: KAFKA-14714: Move/Rewrite RollParams, LogAppendInfo, and LeaderHwChange to storage module.
satishd commented on PR #13255: URL: https://github.com/apache/kafka/pull/13255#issuecomment-1437063403 Thanks @junrao @ijuma for your review. Addressed the comments with inline replies/commits. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lucasbru commented on a diff in pull request #13025: KAFKA-14299: Fix pause and resume with state updater
lucasbru commented on code in PR #13025: URL: https://github.com/apache/kafka/pull/13025#discussion_r950411 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java: ## @@ -258,21 +269,27 @@ private List getTasksAndActions() { } private void addTask(final Task task) { +final TaskId taskId = task.id(); if (isStateless(task)) { addToRestoredTasks((StreamTask) task); -log.info("Stateless active task " + task.id() + " was added to the restored tasks of the state updater"); +log.info("Stateless active task " + taskId + " was added to the restored tasks of the state updater"); +} else if (topologyMetadata.isPaused(taskId.topologyName())) { +pausedTasks.put(taskId, task); Review Comment: Sorry, almost missed this comment. I think this would work, but it would mean cycling through the list of all tasks in every single iteration of main state updater loop. I thought the pause code was only run after `commitMs` to reduce this overhead, so I thought it makes sense. It's different for resume, because we get a resume signal from the stream thread. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
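To make the overhead argument concrete, a small illustrative sketch (all names are made up, this is not code from the PR) of the difference between scanning every task on each loop iteration and rate-limiting that scan to roughly once per commit interval:

```java
// Hypothetical sketch: rate-limit an O(#tasks) "is anything paused?" scan so it runs
// about once per commit interval instead of on every state-updater loop iteration.
class PausedTaskScanSketch {
    private long lastScanMs = 0L;

    boolean shouldScan(long nowMs, long commitIntervalMs) {
        if (nowMs - lastScanMs < commitIntervalMs) {
            return false; // skip the full scan on most iterations
        }
        lastScanMs = nowMs;
        return true;      // caller walks all owned tasks and parks the paused ones
    }
}
```

Resume goes the other way: an explicit signal from the stream thread avoids any periodic scanning at all.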
[jira] [Created] (KAFKA-14733) Update AclAuthorizerTest to run tests for both zk and kraft mode
Purshotam Chauhan created KAFKA-14733: - Summary: Update AclAuthorizerTest to run tests for both zk and kraft mode Key: KAFKA-14733 URL: https://issues.apache.org/jira/browse/KAFKA-14733 Project: Kafka Issue Type: Improvement Reporter: Purshotam Chauhan Assignee: Purshotam Chauhan Currently, we have two test classes, AclAuthorizerTest and StandardAuthorizerTest, that are used exclusively for zk and kraft mode respectively. But AclAuthorizerTest has a lot of tests covering various scenarios. We should change AclAuthorizerTest to run for both zk and kraft modes, to keep parity between the two modes. -- This message was sent by Atlassian Jira (v8.20.10#820010)
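One shape such a change could take is a JUnit 5 parameterized test that runs each scenario once per mode. This is illustrative only; the real change would go through the project's test harness rather than the hypothetical helper referenced in the comments:

```java
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

// Illustrative shape only. createAuthorizerFor(...) is a hypothetical stand-in for the
// harness setup that would build an AclAuthorizer for "zk" and a StandardAuthorizer for
// "kraft"; the assertions would be the existing AclAuthorizerTest scenarios.
class AuthorizerParityTestSketch {

    @ParameterizedTest
    @ValueSource(strings = {"zk", "kraft"})
    void runsForBothModes(String quorum) {
        // Authorizer authorizer = createAuthorizerFor(quorum);
        // ... run the existing AclAuthorizerTest cases against it ...
    }
}
```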
[GitHub] [kafka] lucasbru commented on pull request #13025: KAFKA-14299: Fix pause and resume with state updater
lucasbru commented on PR #13025: URL: https://github.com/apache/kafka/pull/13025#issuecomment-1436959926 > @lucasbru the pause/ resume integration test fails again for J11/S13, could you take a look into it? Ah yes, that was coming from the rebase. Should be fixed now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lucasbru commented on a diff in pull request #13025: KAFKA-14299: Fix pause and resume with state updater
lucasbru commented on code in PR #13025: URL: https://github.com/apache/kafka/pull/13025#discussion_r909621 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ## @@ -3159,7 +3159,7 @@ private void addRecord(final MockConsumer mockConsumer, } StreamTask activeTask(final TaskManager taskManager, final TopicPartition partition) { -final Stream standbys = taskManager.allTasks().values().stream().filter(Task::isActive); +final Stream standbys = taskManager.allOwnedTasks().values().stream().filter(Task::isActive); Review Comment: Good point, we don't -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #13172: KAFKA-14590: Move DelegationTokenCommand to tools
showuon commented on PR #13172: URL: https://github.com/apache/kafka/pull/13172#issuecomment-1436957769 > > @tinaselenge , thanks for the PR. Some questions: > > > > 1. This PR creates a new `DelegationTokenCommand` class, but there's no old `DelegationTokenCommand` class removal. Why is that? > > 2. The original `DelegationTokenCommandTest` is an integration test, but now we changed to unit test by mockAdminClient, why do we change that? > > > > Thanks. > > Hi @showuon > > 1. I have removed the existing Scala class and its test. > > 2. I thought it's good enough to test it using the mock as it's not really doing anything specific with the cluster. I understand that changes the test behaviour. If you think we should test the tool against an integration test cluster, I'm happy to change it back. Please let me know. For (1), thanks for the update. For (2), yes, I think the integration test is important, especially now that KRaft is starting to support delegation tokens; we should rely on integration tests to make sure everything works fine in both ZK and KRaft mode. But the mock client implementation is also great to have, and it could be kept as well. Thank you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] tinaselenge commented on pull request #13172: KAFKA-14590: Move DelegationTokenCommand to tools
tinaselenge commented on PR #13172: URL: https://github.com/apache/kafka/pull/13172#issuecomment-1436890093 > @tinaselenge , thanks for the PR. Some questions: > > 1. This PR creates a new `DelegationTokenCommand` class, but there's no old `DelegationTokenCommand` class removal. Why is that? > 2. The original `DelegationTokenCommandTest` is an integration test, but now we changed to unit test by mockAdminClient, why do we change that? > > Thanks. Hi @showuon 1. I have removed the existing Scala class and its test. 2. I thought it's good enough to test it using the mock as it's not really doing anything specific with the cluster. I understand that changes the test behaviour. If you think we should test the tool against an integration test cluster, I'm happy to change it back. Please let me know. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mukkachaitanya commented on a diff in pull request #13276: KAFKA-14732: Use an exponential backoff retry mechanism while reconfiguring connector tasks
mukkachaitanya commented on code in PR #13276: URL: https://github.com/apache/kafka/pull/13276#discussion_r817237 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ## @@ -1870,7 +1880,16 @@ private Callable getConnectorStoppingCallable(final String connectorName) }; } -private void reconfigureConnectorTasksWithRetry(long initialRequestTime, final String connName) { +/** + * Request task configs from the connector and write them to the config storage in case the configs are detected to + * have changed. This method retries infinitely in case of any errors. + * + * @param initialRequestTime the time in milliseconds when the original request was made (i.e. before any retries) + * @param connName the name of the connector + * @param exponentialBackoff {@link ExponentialBackoff} used to calculate the retry backoff duration + * @param attempts the number of retry attempts that have been made + */ +private void reconfigureConnectorTasksWithRetry(long initialRequestTime, final String connName, ExponentialBackoff exponentialBackoff, int attempts) { Review Comment: I see currently we are always gonna do an ExponentialBackoff. Should we simply move the logic to set up the `ExponentialBackoff` in this function? I was thinking something like ```java private void reconfigureConnectorTasksWithRetry(long initialRequestTime, final String connName) { ExponentialBackoff exponentialBackoff = new ExponentialBackoff( RECONFIGURE_CONNECTOR_TASKS_BACKOFF_INITIAL_MS, 2, RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MAX_MS, 0); reconfigureConnectorTasksWithExponentialBackoff(initialRequestTime, connName, exponentialBackoff, 0); } ``` ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ## @@ -1870,7 +1880,16 @@ private Callable getConnectorStoppingCallable(final String connectorName) }; } -private void reconfigureConnectorTasksWithRetry(long initialRequestTime, final String connName) { +/** + * Request task configs from the connector and write them to the config storage in case the configs are detected to + * have changed. This method retries infinitely in case of any errors. Review Comment: I am curious if there is a way to not do infinite retries. If we are actually retrying infinitely, especially in the `startConnector` phase, then the connector just doesn't have tasks. Is it possible to somehow bubble up errors as part of the connector (not task) status? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
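For context on how the retry delays would grow across attempts, a small standalone sketch. The initial/max values here are example numbers, not the connector's actual constants, and the constructor and `backoff(attempts)` signatures of `org.apache.kafka.common.utils.ExponentialBackoff` are assumptions rather than verified API:

```java
import org.apache.kafka.common.utils.ExponentialBackoff;

// Example values only: 250 ms initial interval, x2 multiplier, 60 s cap, no jitter.
public class BackoffProgressionSketch {
    public static void main(String[] args) {
        ExponentialBackoff backoff = new ExponentialBackoff(250L, 2, 60_000L, 0.0);
        for (long attempts = 0; attempts < 10; attempts++) {
            // Delays grow roughly 250, 500, 1000, ... ms until they hit the 60 s cap.
            System.out.println("retry " + attempts + " -> wait ~" + backoff.backoff(attempts) + " ms");
        }
    }
}
```

The retrying method would sleep (or reschedule itself) for `backoff(attempts)` milliseconds and then call itself again with `attempts + 1`, which is where the second comment's concern about unbounded retries comes in.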
[GitHub] [kafka] mimaison commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module
mimaison commented on code in PR #13122: URL: https://github.com/apache/kafka/pull/13122#discussion_r724503 ## tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java: ## @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import joptsimple.OptionSpec; +import joptsimple.OptionSpecBuilder; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.DescribeLogDirsResult; +import org.apache.kafka.clients.admin.LogDirDescription; +import org.apache.kafka.clients.admin.ReplicaInfo; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.CommandDefaultOptions; +import org.apache.kafka.server.util.CommandLineUtils; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class LogDirsCommand { + +public static void main(String... args) { +Exit.exit(mainNoExit(args)); +} + +static int mainNoExit(String... args) { +try { +execute(args); +return 0; +} catch (TerseException e) { +System.err.println(e.getMessage()); +return 1; +} catch (Throwable e) { +System.err.println(e.getMessage()); +System.err.println(Utils.stackTrace(e)); +return 1; +} +} + +private static void execute(String... args) throws Exception { +LogDirsCommandOptions options = new LogDirsCommandOptions(args); +try (Admin adminClient = createAdminClient(options)) { +execute(options, adminClient); +} +} + +static void execute(LogDirsCommandOptions options, Admin adminClient) throws Exception { +Set topics = options.topics(); +Set clusterBrokers = adminClient.describeCluster().nodes().get().stream().map(Node::id).collect(Collectors.toSet()); +Set inputBrokers = options.brokers(); +Set existingBrokers = inputBrokers.isEmpty() ? 
new HashSet<>(clusterBrokers) : new HashSet<>(inputBrokers); +existingBrokers.retainAll(clusterBrokers); +Set nonExistingBrokers = new HashSet<>(inputBrokers); +nonExistingBrokers.removeAll(clusterBrokers); + +if (!nonExistingBrokers.isEmpty()) { +throw new TerseException( +String.format( Review Comment: Nit: we don't need the newline here since we print the exception message using `println()` in `mainNoExit()` ## tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java: ## @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import joptsimple.OptionSpec; +import joptsimple.OptionSpecBuilder; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import