[GitHub] [kafka] showuon commented on pull request #9674: KAFKA-10665: close all kafkaStreams before purgeLocalStreamsState
showuon commented on pull request #9674: URL: https://github.com/apache/kafka/pull/9674#issuecomment-737056995 @bbejeck @vvcephei @lkokhreidze , please help review this PR. Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10636) Bypass log validation for writes to raft log
[ https://issues.apache.org/jira/browse/KAFKA-10636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17242122#comment-17242122 ] feyman commented on KAFKA-10636: Hi, [~hachikuji]. I would like to take this task; please let me know if it is still under discussion or has any pending dependencies. I recently went through the design and implementation of kafka-raft at a high level, and will add my understanding of this task later to make sure we are on the same page. Thanks! > Bypass log validation for writes to raft log > > > Key: KAFKA-10636 > URL: https://issues.apache.org/jira/browse/KAFKA-10636 > Project: Kafka > Issue Type: Sub-task >Reporter: Jason Gustafson >Assignee: feyman >Priority: Major > > The raft leader is responsible for creating the records written to the log > (including assigning offsets and the epoch), so we can consider bypassing the > validation done in `LogValidator`. This lets us skip potentially expensive > decompression and the unnecessary recomputation of the CRC. -- This message was sent by Atlassian Jira (v8.3.4#803005)
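The optimization described in the ticket can be illustrated with a small sketch. This is not Kafka code; `append_batch` and the tuple layout are hypothetical names chosen for illustration. The idea is simply that when the writer itself just produced a batch, re-verifying the checksum on append is wasted work:

```python
import zlib

def append_batch(log, payload, crc, validate=True):
    """Append a record payload, optionally verifying its CRC first.

    validate=False models the raft-leader path described above: the
    leader created the batch itself, so recomputing the CRC (and any
    decompression a full validator would do) can be skipped.
    """
    if validate and zlib.crc32(payload) != crc:
        raise ValueError("CRC mismatch: corrupt record")
    log.append(payload)

log = []
payload = b"record-data"
# Follower-style append: CRC verified before the write.
append_batch(log, payload, zlib.crc32(payload))
# Leader path: validation bypassed entirely.
append_batch(log, b"leader-batch", crc=0, validate=False)
print(len(log))  # 2
```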
[GitHub] [kafka] showuon commented on a change in pull request #9674: KAFKA-10665: close all kafkaStreams before purgeLocalStreamsState
showuon commented on a change in pull request #9674: URL: https://github.com/apache/kafka/pull/9674#discussion_r533956768 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java ## @@ -221,7 +223,7 @@ private boolean topicExists(final String topic) { ); } -private void validateReceivedMessages(final String topic, +private void validateMessagesReceived(final String topic, Review comment: Rename the method name
[GitHub] [kafka] showuon commented on a change in pull request #9674: KAFKA-10665: close all kafkaStreams before purgeLocalStreamsState
showuon commented on a change in pull request #9674: URL: https://github.com/apache/kafka/pull/9674#discussion_r533956676 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java ## @@ -149,9 +153,7 @@ public void shouldDoStreamTableJoinWithDifferentNumberOfPartitions() throws Exce sendEvents(inputTopic, timestamp, expectedRecords); sendEvents(outputTopic, timestamp, expectedRecords); -startStreams(streamsBuilder); Review comment: I think we don't need to start 2 streams for this test
[GitHub] [kafka] showuon opened a new pull request #9674: KAFKA-10665: close all kafkaStreams before purgeLocalStreamsState
showuon opened a new pull request #9674: URL: https://github.com/apache/kafka/pull/9674 These tests are flaky because we forgot to close the KafkaStreams instances before calling purgeLocalStreamsState, so temporary files (e.g. `checkpoint.tmp`) are sometimes created while the streams are still running or shutting down, causing a `DirectoryNotEmptyException` or `NoSuchFileException` to be thrown. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
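The ordering fix in this PR can be sketched in a few lines. This is an illustrative model, not the Streams test code: `FakeStreams` and `safe_cleanup` are invented names, and the real fix closes `KafkaStreams` instances before `purgeLocalStreamsState` deletes the state directory.

```python
import os
import shutil
import tempfile

class FakeStreams:
    """Stand-in for a KafkaStreams instance; only models open/closed state."""
    def __init__(self, state_dir):
        self.state_dir = state_dir
        self.closed = False
    def close(self):
        self.closed = True

def safe_cleanup(instances, state_dir):
    # Close every instance first, so no background thread can still be
    # writing checkpoint files (e.g. checkpoint.tmp) into state_dir
    # while the directory tree is being deleted.
    for ks in instances:
        ks.close()
    shutil.rmtree(state_dir)

state_dir = tempfile.mkdtemp()
open(os.path.join(state_dir, "checkpoint.tmp"), "w").close()
streams = [FakeStreams(state_dir), FakeStreams(state_dir)]
safe_cleanup(streams, state_dir)
print(all(s.closed for s in streams), os.path.exists(state_dir))  # True False
```

Deleting the directory while a writer is still live is exactly what produced the `DirectoryNotEmptyException`/`NoSuchFileException` races in the flaky tests.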
[jira] [Assigned] (KAFKA-10636) Bypass log validation for writes to raft log
[ https://issues.apache.org/jira/browse/KAFKA-10636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] feyman reassigned KAFKA-10636: -- Assignee: feyman > Bypass log validation for writes to raft log > > > Key: KAFKA-10636 > URL: https://issues.apache.org/jira/browse/KAFKA-10636 > Project: Kafka > Issue Type: Sub-task >Reporter: Jason Gustafson >Assignee: feyman >Priority: Major > > The raft leader is responsible for creating the records written to the log > (including assigning offsets and the epoch), so we can consider bypassing the > validation done in `LogValidator`. This lets us skip potentially expensive > decompression and the unnecessary recomputation of the CRC.
[jira] [Updated] (KAFKA-10794) Replica leader election is too slow in the case of too many partitions
[ https://issues.apache.org/jira/browse/KAFKA-10794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] limeng updated KAFKA-10794: --- Description: There are more than 6000 topics and 300 brokers in my Kafka cluster, and we frequently run kafka-preferred-replica-election.sh to rebalance the cluster. But the rebalance process spends too much time and CPU, as shown in the pictures below. We find that the function 'controllerContext.allPartitions' is invoked too many times. !截屏2020-12-02 上午11.43.48.png! !image-2020-12-02-15-23-43-384.png! was: There are more than 6000 topics and 300 brokers in my Kafka cluster, and we frequently run kafka-preferred-replica-election.sh to rebalance the cluster. But the rebalance process spends too much time and CPU, as shown in the picture below. We find that the function 'controllerContext.allPartitions' is invoked too many times. !截屏2020-12-02 上午11.43.48.png! > Replica leader election is too slow in the case of too many partitions > -- > > Key: KAFKA-10794 > URL: https://issues.apache.org/jira/browse/KAFKA-10794 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 2.6.0, 2.5.1 >Reporter: limeng >Priority: Major > Attachments: clipboard_image_1606806581629.png, > image-2020-12-02-15-23-43-384.png, 截屏2020-12-01 下午2.36.19.png, 截屏2020-12-02 > 上午11.43.48.png > > > There are more than 6000 topics and 300 brokers in my Kafka cluster, and we frequently run kafka-preferred-replica-election.sh to rebalance the cluster. But the rebalance process spends too much time and CPU, as shown in the pictures below. We find that the function 'controllerContext.allPartitions' is invoked too many times. > !截屏2020-12-02 上午11.43.48.png! > !image-2020-12-02-15-23-43-384.png! >
[jira] [Updated] (KAFKA-10794) Replica leader election is too slow in the case of too many partitions
[ https://issues.apache.org/jira/browse/KAFKA-10794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] limeng updated KAFKA-10794: --- Attachment: image-2020-12-02-15-23-43-384.png > Replica leader election is too slow in the case of too many partitions > -- > > Key: KAFKA-10794 > URL: https://issues.apache.org/jira/browse/KAFKA-10794 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 2.6.0, 2.5.1 >Reporter: limeng >Priority: Major > Attachments: clipboard_image_1606806581629.png, > image-2020-12-02-15-23-43-384.png, 截屏2020-12-01 下午2.36.19.png, 截屏2020-12-02 > 上午11.43.48.png > > > There are more than 6000 topics and 300 brokers in my Kafka cluster, and we frequently run kafka-preferred-replica-election.sh to rebalance the cluster. But the rebalance process spends too much time and CPU, as shown in the picture below. We find that the function 'controllerContext.allPartitions' is invoked too many times. > !截屏2020-12-02 上午11.43.48.png!
[jira] [Updated] (KAFKA-10794) Replica leader election is too slow in the case of too many partitions
[ https://issues.apache.org/jira/browse/KAFKA-10794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] limeng updated KAFKA-10794: --- Attachment: 截屏2020-12-01 下午2.36.19.png > Replica leader election is too slow in the case of too many partitions > -- > > Key: KAFKA-10794 > URL: https://issues.apache.org/jira/browse/KAFKA-10794 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 2.6.0, 2.5.1 >Reporter: limeng >Priority: Major > Attachments: clipboard_image_1606806581629.png, 截屏2020-12-01 > 下午2.36.19.png, 截屏2020-12-02 上午11.43.48.png > > > There are more than 6000 topics and 300 brokers in my Kafka cluster, and we frequently run kafka-preferred-replica-election.sh to rebalance the cluster. But the rebalance process spends too much time and CPU, as shown in the picture below. We find that the function 'controllerContext.allPartitions' is invoked too many times. > !截屏2020-12-02 上午11.43.48.png!
[jira] [Updated] (KAFKA-10794) Replica leader election is too slow in the case of too many partitions
[ https://issues.apache.org/jira/browse/KAFKA-10794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] limeng updated KAFKA-10794: --- Description: There are more than 6000 topics and 300 brokers in my Kafka cluster, and we frequently run kafka-preferred-replica-election.sh to rebalance the cluster. But the rebalance process spends too much time and CPU, as shown in the picture below. We find that the function 'controllerContext.allPartitions' is invoked too many times. !截屏2020-12-02 上午11.43.48.png! was: There are more than 6000 topics and 300 brokers in my Kafka cluster, and we set the config 'auto.leader.rebalance.enable=true'. The rebalance process spends too much time and CPU. We find that the function 'controllerContext.allPartitions' is invoked too many times. !截屏2020-12-02 上午11.43.48.png! > Replica leader election is too slow in the case of too many partitions > -- > > Key: KAFKA-10794 > URL: https://issues.apache.org/jira/browse/KAFKA-10794 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 2.6.0, 2.5.1 >Reporter: limeng >Priority: Major > Attachments: clipboard_image_1606806581629.png, 截屏2020-12-02 > 上午11.43.48.png > > > There are more than 6000 topics and 300 brokers in my Kafka cluster, and we frequently run kafka-preferred-replica-election.sh to rebalance the cluster. But the rebalance process spends too much time and CPU, as shown in the picture below. We find that the function 'controllerContext.allPartitions' is invoked too many times. > !截屏2020-12-02 上午11.43.48.png!
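The hotspot described in KAFKA-10794 is a classic loop-invariant problem: an expensive collection is rebuilt on every iteration instead of once. A hedged sketch (illustrative names only, not Kafka controller source) of the difference:

```python
calls = {"n": 0}

def all_partitions():
    # Stand-in for controllerContext.allPartitions: rebuilding a large
    # collection is expensive, so we count how often it happens.
    calls["n"] += 1
    return {("topic-%d" % t, p) for t in range(100) for p in range(10)}

def elect_slow(candidates):
    # allPartitions-style call repeated once per candidate partition.
    return [c for c in candidates if c in all_partitions()]

def elect_fast(candidates):
    # Hoisted out of the loop: one call total, same result.
    parts = all_partitions()
    return [c for c in candidates if c in parts]

candidates = [("topic-1", 0), ("topic-2", 3), ("no-such-topic", 0)]
calls["n"] = 0
slow = elect_slow(candidates)
slow_calls = calls["n"]
calls["n"] = 0
fast = elect_fast(candidates)
print(slow == fast, slow_calls, calls["n"])  # True 3 1
```

With thousands of topics and hundreds of brokers, the per-iteration rebuild dominates the election's CPU time, which matches the profiles attached to the ticket.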
[GitHub] [kafka] chia7712 commented on pull request #8826: KAFKA-10090 Misleading warnings: The configuration was supplied but i…
chia7712 commented on pull request #8826: URL: https://github.com/apache/kafka/pull/8826#issuecomment-737028662 > There are 6 failures in the system test run. Are they related to this PR? They are unrelated to this PR and I have opened a PR to fix them (#9673)
[GitHub] [kafka] chia7712 opened a new pull request #9673: KAFKA-10289 fix failed connect_distributed_test.py (ConnectDistribute…
chia7712 opened a new pull request #9673: URL: https://github.com/apache/kafka/pull/9673 issue: https://issues.apache.org/jira/browse/KAFKA-10289 In Python 3, ```filter``` returns an iterator rather than a ```list```, so it can be traversed only once. Hence, only the first pass over the result sees any data; the following loop sees "empty" for later tasks and validation fails. ```python src_messages = self.source.committed_messages() # returns iterator sink_messages = self.sink.flushed_messages() # returns iterator for task in range(num_tasks): # only the first task can "see" the result; following tasks see an empty result src_seqnos = [msg['seqno'] for msg in src_messages if msg['task'] == task] ``` ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
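The bug is easy to reproduce in isolation. The message dicts and task ids below are illustrative stand-ins for the system test's fixtures:

```python
messages = [{'task': 0, 'seqno': 1}, {'task': 1, 'seqno': 2}]

# In Python 3 filter() returns a single-pass iterator, not a list.
src_messages = filter(lambda m: m['seqno'] >= 0, messages)

per_task_broken = []
for task in range(2):
    # The first iteration drains the iterator; later tasks see nothing.
    per_task_broken.append(
        [m['seqno'] for m in src_messages if m['task'] == task])
print(per_task_broken)  # [[1], []]

# Fix: materialize the iterator once, then it can be scanned per task.
src_messages = list(filter(lambda m: m['seqno'] >= 0, messages))
per_task_fixed = [[m['seqno'] for m in src_messages if m['task'] == task]
                  for task in range(2)]
print(per_task_fixed)  # [[1], [2]]
```

Wrapping the iterator in `list(...)` is exactly the shape of fix the PR applies to the committed/flushed message collections.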
[jira] [Updated] (KAFKA-10289) fix failed connect_distributed_test.py (ConnectDistributedTest.test_bounce)
[ https://issues.apache.org/jira/browse/KAFKA-10289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-10289: --- Summary: fix failed connect_distributed_test.py (ConnectDistributedTest.test_bounce) (was: fix failed connect_distributed_test.py.test_bounce) > fix failed connect_distributed_test.py (ConnectDistributedTest.test_bounce) > --- > > Key: KAFKA-10289 > URL: https://issues.apache.org/jira/browse/KAFKA-10289 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect, system tests >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > > {quote} > Module: kafkatest.tests.connect.connect_distributed_test > Class: ConnectDistributedTest > Method: test_broker_compatibility > Arguments: > { > "auto_create_topics": false, > "broker_version": "0.10.1.1", > "connect_protocol": "compatible", > "security_protocol": "PLAINTEXT" > } > {quote} > {quote} > Module: kafkatest.tests.connect.connect_distributed_test > Class: ConnectDistributedTest > Method: test_broker_compatibility > Arguments: > { > "auto_create_topics": false, > "broker_version": "2.1.1", > "connect_protocol": "compatible", > "security_protocol": "PLAINTEXT" > } > {quote}
[jira] [Updated] (KAFKA-10289) fix failed connect_distributed_test.py.test_bounce
[ https://issues.apache.org/jira/browse/KAFKA-10289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-10289: --- Summary: fix failed connect_distributed_test.py.test_bounce (was: fix flaky connect/connect_distributed_test.py) > fix failed connect_distributed_test.py.test_bounce > -- > > Key: KAFKA-10289 > URL: https://issues.apache.org/jira/browse/KAFKA-10289 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect, system tests >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > > {quote} > Module: kafkatest.tests.connect.connect_distributed_test > Class: ConnectDistributedTest > Method: test_broker_compatibility > Arguments: > { > "auto_create_topics": false, > "broker_version": "0.10.1.1", > "connect_protocol": "compatible", > "security_protocol": "PLAINTEXT" > } > {quote} > {quote} > Module: kafkatest.tests.connect.connect_distributed_test > Class: ConnectDistributedTest > Method: test_broker_compatibility > Arguments: > { > "auto_create_topics": false, > "broker_version": "2.1.1", > "connect_protocol": "compatible", > "security_protocol": "PLAINTEXT" > } > {quote}
[jira] [Assigned] (KAFKA-10289) fix flaky connect/connect_distributed_test.py
[ https://issues.apache.org/jira/browse/KAFKA-10289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-10289: -- Assignee: Chia-Ping Tsai > fix flaky connect/connect_distributed_test.py > - > > Key: KAFKA-10289 > URL: https://issues.apache.org/jira/browse/KAFKA-10289 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect, system tests >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > > {quote} > Module: kafkatest.tests.connect.connect_distributed_test > Class: ConnectDistributedTest > Method: test_broker_compatibility > Arguments: > { > "auto_create_topics": false, > "broker_version": "0.10.1.1", > "connect_protocol": "compatible", > "security_protocol": "PLAINTEXT" > } > {quote} > {quote} > Module: kafkatest.tests.connect.connect_distributed_test > Class: ConnectDistributedTest > Method: test_broker_compatibility > Arguments: > { > "auto_create_topics": false, > "broker_version": "2.1.1", > "connect_protocol": "compatible", > "security_protocol": "PLAINTEXT" > } > {quote}
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAFKA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r533922526 ## File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java ## @@ -115,45 +122,62 @@ private void setFieldValue(Object obj, String fieldName, Object value) throws Ex Set<AclEntry> entries = aclEntries.computeIfAbsent(resource, k -> new HashSet<>()); -for (int aclId = 0; aclId < aclCount; aclId++) { -AccessControlEntry ace = new AccessControlEntry(principal.toString() + aclId, -"*", AclOperation.READ, AclPermissionType.ALLOW); -entries.add(new AclEntry(ace)); +for (int aclId = 0; aclId < aclCount / 2; aclId++) { +String acePrinciple = principal.toString() + (aclId == 0 ? "" : aclId); +AccessControlEntry allowAce = new AccessControlEntry( +acePrinciple, Review comment: commit 6ab95d3668b3de27a7f6f58fc171a1e2e8925f69 ## File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala ## @@ -175,4 +179,73 @@ class AuthorizerWrapper(private[kafka] val baseAuthorizer: kafka.security.auth.A override def close(): Unit = { baseAuthorizer.close() } + + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +if (resourceType == ResourceType.ANY) + throw new IllegalArgumentException("Must specify a non-filter resource type for authorizeByResourceType") + +if (resourceType == ResourceType.UNKNOWN) + throw new IllegalArgumentException("Unknown resource type") + +if (op == AclOperation.ANY) + throw new IllegalArgumentException("Must specify a non-filter operation type for authorizeByResourceType") + +if (op == AclOperation.UNKNOWN) + throw new IllegalArgumentException("Unknown operation type") + +if (shouldAllowEveryoneIfNoAclIsFound && !denyAllResource(requestContext, op, resourceType)) { + AuthorizationResult.ALLOWED +} else { + super.authorizeByResourceType(requestContext, op, resourceType) +} + } + + private def denyAllResource(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): Boolean = { +val resourceTypeFilter = new ResourcePatternFilter( + resourceType, null, PatternType.ANY) +val accessControlEntry = new AccessControlEntryFilter( + null, null, null, AclPermissionType.DENY) +val aclFilter = new AclBindingFilter(resourceTypeFilter, accessControlEntry) + +for (binding <- acls(aclFilter).asScala) { + if (aceMatched(requestContext, op, binding) && canDenyAll(binding.pattern())) +return true +} +false + } + + @inline + private def aceMatched(requestContext: AuthorizableRequestContext, + op: AclOperation, + binding: AclBinding): Boolean = { +(hostMatched(requestContext, binding) && principleMatched(requestContext, binding) + && operationMatched(op, binding)) + } + + @inline + private def hostMatched(requestContext: AuthorizableRequestContext, + binding: AclBinding): Boolean = + (binding.entry().host().equals(requestContext.clientAddress().getHostAddress) + || binding.entry().host().equals(AclEntry.WildcardHost)) + + @inline + private def principleMatched(requestContext: AuthorizableRequestContext, Review comment: commit 6ab95d3668b3de27a7f6f58fc171a1e2e8925f69
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAFKA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r533919425 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +308,122 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + // TODO: 1. Discuss how to log audit message + // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop but I think + // in most of the cases it would be O(1) because denyDominatePrefixAllow should be rare + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +if (resourceType eq ResourceType.ANY) + throw new IllegalArgumentException("Must specify a non-filter resource type for authorizeByResourceType") + +if (resourceType eq ResourceType.UNKNOWN) + throw new IllegalArgumentException("Unknown resource type") + +if (op eq AclOperation.ANY) + throw new IllegalArgumentException("Must specify a non-filter operation type for authorizeByResourceType") + +if (op eq AclOperation.UNKNOWN) + throw new IllegalArgumentException("Unknown operation type") Review comment: commit f6d2a39706998160ebe77a854b8bf64268eec68a
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAFKA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r533916889 ## File path: clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java ## @@ -139,4 +152,125 @@ * @return Iterator for ACL bindings, which may be populated lazily. */ Iterable<AclBinding> acls(AclBindingFilter filter); + +/** + * Check if the caller is authorized to perform the given ACL operation on at least one + * resource of the given type. + * + * @param requestContext Request context including request resourceType, security protocol, and listener name + * @param op The ACL operation to check + * @param resourceType The resource type to check + * @return Return {@link AuthorizationResult#ALLOWED} if the caller is authorized to perform the + * given ACL operation on at least one resource of the given type. + * Return {@link AuthorizationResult#DENIED} otherwise. + */ +default AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) { +if (resourceType == ResourceType.ANY) { +throw new IllegalArgumentException( +"Must specify a non-filter resource type for authorizeByResourceType"); +} + +if (resourceType == ResourceType.UNKNOWN) { +throw new IllegalArgumentException( +"Unknown resource type"); +} + +if (op == AclOperation.ANY) { +throw new IllegalArgumentException( +"Must specify a non-filter operation type for authorizeByResourceType"); +} + +if (op == AclOperation.UNKNOWN) { +throw new IllegalArgumentException( +"Unknown operation type"); +} + +ResourcePatternFilter resourceTypeFilter = new ResourcePatternFilter( +resourceType, null, PatternType.ANY); +AclBindingFilter aclFilter = new AclBindingFilter( +resourceTypeFilter, AccessControlEntryFilter.ANY); + +final int typeLiteral = 0; +final int typePrefix = 1; + +List<Set<String>> deny = new ArrayList<>( +Arrays.asList(new HashSet<>(), new HashSet<>())); +List<Set<String>> allow = new ArrayList<>( +Arrays.asList(new HashSet<>(), new HashSet<>())); + +boolean hasWildCardAllow = false; + +for (AclBinding binding : acls(aclFilter)) { +if (!binding.entry().host().equals(requestContext.clientAddress().getHostAddress()) Review comment: Right. Just as what we've done to Principle. commit 29ac8628089ddf1210072bbf52e01a41e123a718
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAFKA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r533915261 ## File path: clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java ## @@ -139,4 +152,125 @@ * @return Iterator for ACL bindings, which may be populated lazily. */ Iterable<AclBinding> acls(AclBindingFilter filter); + +/** + * Check if the caller is authorized to perform the given ACL operation on at least one + * resource of the given type. + * + * @param requestContext Request context including request resourceType, security protocol, and listener name + * @param op The ACL operation to check + * @param resourceType The resource type to check + * @return Return {@link AuthorizationResult#ALLOWED} if the caller is authorized to perform the + * given ACL operation on at least one resource of the given type. + * Return {@link AuthorizationResult#DENIED} otherwise. + */ +default AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) { +if (resourceType == ResourceType.ANY) { +throw new IllegalArgumentException( +"Must specify a non-filter resource type for authorizeByResourceType"); +} + +if (resourceType == ResourceType.UNKNOWN) { +throw new IllegalArgumentException( +"Unknown resource type"); +} + +if (op == AclOperation.ANY) { +throw new IllegalArgumentException( +"Must specify a non-filter operation type for authorizeByResourceType"); +} + +if (op == AclOperation.UNKNOWN) { +throw new IllegalArgumentException( +"Unknown operation type"); +} + +ResourcePatternFilter resourceTypeFilter = new ResourcePatternFilter( +resourceType, null, PatternType.ANY); +AclBindingFilter aclFilter = new AclBindingFilter( +resourceTypeFilter, AccessControlEntryFilter.ANY); + +final int typeLiteral = 0; +final int typePrefix = 1; + +List<Set<String>> deny = new ArrayList<>( +Arrays.asList(new HashSet<>(), new HashSet<>())); Review comment: EnumMap makes sense. commit 1a139ce744a279e4424188008ee5158186b0fcbe ## File path: clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java ## @@ -139,4 +152,125 @@ * @return Iterator for ACL bindings, which may be populated lazily. */ Iterable<AclBinding> acls(AclBindingFilter filter); + +/** + * Check if the caller is authorized to perform the given ACL operation on at least one + * resource of the given type. + * + * @param requestContext Request context including request resourceType, security protocol, and listener name + * @param op The ACL operation to check + * @param resourceType The resource type to check + * @return Return {@link AuthorizationResult#ALLOWED} if the caller is authorized to perform the + * given ACL operation on at least one resource of the given type. + * Return {@link AuthorizationResult#DENIED} otherwise. + */ +default AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) { +if (resourceType == ResourceType.ANY) { +throw new IllegalArgumentException( +"Must specify a non-filter resource type for authorizeByResourceType"); +} + +if (resourceType == ResourceType.UNKNOWN) { +throw new IllegalArgumentException( +"Unknown resource type"); +} + +if (op == AclOperation.ANY) { +throw new IllegalArgumentException( +"Must specify a non-filter operation type for authorizeByResourceType"); +} + +if (op == AclOperation.UNKNOWN) { +throw new IllegalArgumentException( +"Unknown operation type"); +} + +ResourcePatternFilter resourceTypeFilter = new ResourcePatternFilter( +resourceType, null, PatternType.ANY); +AclBindingFilter aclFilter = new AclBindingFilter( +resourceTypeFilter, AccessControlEntryFilter.ANY); + +final int typeLiteral = 0; +final int typePrefix = 1; Review comment: EnumMap makes sense. commit 1a139ce744a279e4424188008ee5158186b0fcbe
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAFKA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r533915142 ## File path: clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java ## @@ -139,4 +152,125 @@ * @return Iterator for ACL bindings, which may be populated lazily. */ Iterable<AclBinding> acls(AclBindingFilter filter); + +/** + * Check if the caller is authorized to perform the given ACL operation on at least one + * resource of the given type. + * + * @param requestContext Request context including request resourceType, security protocol, and listener name + * @param op The ACL operation to check + * @param resourceType The resource type to check + * @return Return {@link AuthorizationResult#ALLOWED} if the caller is authorized to perform the + * given ACL operation on at least one resource of the given type. + * Return {@link AuthorizationResult#DENIED} otherwise. + */ +default AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) { +if (resourceType == ResourceType.ANY) { +throw new IllegalArgumentException( +"Must specify a non-filter resource type for authorizeByResourceType"); +} + +if (resourceType == ResourceType.UNKNOWN) { +throw new IllegalArgumentException( +"Unknown resource type"); +} + +if (op == AclOperation.ANY) { +throw new IllegalArgumentException( +"Must specify a non-filter operation type for authorizeByResourceType"); +} + +if (op == AclOperation.UNKNOWN) { +throw new IllegalArgumentException( +"Unknown operation type"); +} + +ResourcePatternFilter resourceTypeFilter = new ResourcePatternFilter( +resourceType, null, PatternType.ANY); +AclBindingFilter aclFilter = new AclBindingFilter( +resourceTypeFilter, AccessControlEntryFilter.ANY); + +final int typeLiteral = 0; +final int typePrefix = 1; + +List<Set<String>> deny = new ArrayList<>( +Arrays.asList(new HashSet<>(), new HashSet<>())); +List<Set<String>> allow = new ArrayList<>( +Arrays.asList(new HashSet<>(), new HashSet<>())); + +boolean hasWildCardAllow = false; + +for (AclBinding binding : acls(aclFilter)) { +if (!binding.entry().host().equals(requestContext.clientAddress().getHostAddress()) +&& !binding.entry().host().equals("*")) +continue; + +KafkaPrincipal principal = new KafkaPrincipal( +requestContext.principal().getPrincipalType(), +requestContext.principal().getName()); Review comment: Yeah. Took out. commit 1a139ce744a279e4424188008ee5158186b0fcbe
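The semantics under review can be summarized with a drastically simplified sketch. This is not the Kafka `Authorizer` API: the tuple layout, `WILDCARD` handling, and helper names are invented for illustration, and prefixed-pattern dominance and super-user checks are deliberately omitted. It only shows the core idea that the caller is ALLOWED if some matching ALLOW binding is not dominated by a DENY binding:

```python
WILDCARD = "*"

def matches(binding, principal, host, op):
    # A binding applies if its principal, host, and operation all match
    # (wildcards match anything for principal and host).
    b_principal, b_host, b_op, _pattern, _perm = binding
    return (b_principal in (principal, WILDCARD)
            and b_host in (host, WILDCARD)
            and b_op == op)

def authorize_by_resource_type(bindings, principal, host, op):
    deny, allow = set(), set()
    for b in bindings:
        if not matches(b, principal, host, op):
            continue
        pattern, perm = b[3], b[4]
        (deny if perm == "DENY" else allow).add(pattern)
    if WILDCARD in deny:
        return "DENIED"      # a wildcard DENY dominates everything
    # Any ALLOW pattern not explicitly denied grants access to
    # "at least one resource" of the type.
    if any(p not in deny for p in allow):
        return "ALLOWED"
    return "DENIED"

acls = [("alice", "*", "WRITE", "topic-a", "ALLOW"),
        ("alice", "*", "WRITE", "topic-b", "DENY")]
print(authorize_by_resource_type(acls, "alice", "10.0.0.1", "WRITE"))  # ALLOWED
```

This is why an idempotent producer with WRITE access to even a single topic can be granted the broader permission the PR title describes.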
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAFKA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r533915036

## File path: clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java

## @@ -139,4 +152,125 @@
     * @return Iterator for ACL bindings, which may be populated lazily.
     */
    Iterable<AclBinding> acls(AclBindingFilter filter);
+
+    /**
+     * Check if the caller is authorized to perform the given ACL operation on at least one
+     * resource of the given type.
+     *
+     * @param requestContext Request context including request resourceType, security protocol, and listener name
+     * @param op The ACL operation to check
+     * @param resourceType The resource type to check
+     * @return Return {@link AuthorizationResult#ALLOWED} if the caller is authorized to perform the
+     *         given ACL operation on at least one resource of the given type.
+     *         Return {@link AuthorizationResult#DENIED} otherwise.
+     */
+    default AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) {
+        if (resourceType == ResourceType.ANY) {
+            throw new IllegalArgumentException(
+                "Must specify a non-filter resource type for authorizeByResourceType");
+        }
+
+        if (resourceType == ResourceType.UNKNOWN) {
+            throw new IllegalArgumentException(
+                "Unknown resource type");
+        }
+
+        if (op == AclOperation.ANY) {
+            throw new IllegalArgumentException(
+                "Must specify a non-filter operation type for authorizeByResourceType");
+        }
+
+        if (op == AclOperation.UNKNOWN) {
+            throw new IllegalArgumentException(
+                "Unknown operation type");
+        }
+
+        ResourcePatternFilter resourceTypeFilter = new ResourcePatternFilter(
+            resourceType, null, PatternType.ANY);
+        AclBindingFilter aclFilter = new AclBindingFilter(
+            resourceTypeFilter, AccessControlEntryFilter.ANY);
+
+        final int typeLiteral = 0;
+        final int typePrefix = 1;
+
+        List<Set<String>> deny = new ArrayList<>(
+            Arrays.asList(new HashSet<>(), new HashSet<>()));
+        List<Set<String>> allow = new ArrayList<>(
+            Arrays.asList(new HashSet<>(), new HashSet<>()));
+
+        boolean hasWildCardAllow = false;
+
+        for (AclBinding binding : acls(aclFilter)) {
+            if (!binding.entry().host().equals(requestContext.clientAddress().getHostAddress())
+                && !binding.entry().host().equals("*"))
+                continue;
+
+            KafkaPrincipal principal = new KafkaPrincipal(
+                requestContext.principal().getPrincipalType(),
+                requestContext.principal().getName());
+
+            if (!SecurityUtils.parseKafkaPrincipal(binding.entry().principal()).equals(principal)
+                && !binding.entry().principal().equals("User:*"))
+                continue;
+
+            if (binding.entry().operation() != op
+                && binding.entry().operation() != AclOperation.ALL)
+                continue;
+
+            if (binding.entry().permissionType() == AclPermissionType.DENY) {
+                switch (binding.pattern().patternType()) {
+                    case LITERAL:
+                        if (binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE))
+                            return AuthorizationResult.DENIED;
+                        deny.get(typeLiteral).add(binding.pattern().name());
+                        break;
+                    case PREFIXED:
+                        deny.get(typePrefix).add(binding.pattern().name());
+                        break;
+                }
+                continue;
+            }
+
+            if (binding.entry().permissionType() != AclPermissionType.ALLOW)
+                continue;
+
+            switch (binding.pattern().patternType()) {
+                case LITERAL:
+                    if (binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE)) {
+                        hasWildCardAllow = true;
+                        continue;
+                    }
+                    allow.get(typeLiteral).add(binding.pattern().name());
+                    break;
+                case PREFIXED:
+                    allow.get(typePrefix).add(binding.pattern().name());
+                    break;
+            }
+        }
+
+        if (hasWildCardAllow) {
+            return AuthorizationResult.ALLOWED;
+        }
+
+        for (int allowType : Arrays.asList(typePrefix, typeLiteral)) {

Review comment: EnumMap makes sense. commit 1a139ce744a279e4424188008ee5158186b0fcbe
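The diff above buckets ACL bindings into literal and prefixed allow/deny sets and then decides whether any allowed resource survives the deny rules. The standalone sketch below illustrates that final "dominance" step in simplified form; it is not the actual Kafka implementation (which also handles prefixed allows and wildcards), and the class and method names are hypothetical.

```java
import java.util.Set;

// Simplified illustration of the core check in authorizeByResourceType():
// an ALLOW literal grants access only if no DENY binding dominates it,
// i.e. no literal DENY with the same name and no PREFIXED DENY whose name
// is a prefix of the allowed name. Hypothetical sketch, not Kafka code.
public class DominanceCheck {

    // True if `resourceName` (from an ALLOW literal binding) is dominated
    // by some DENY binding.
    static boolean dominated(String resourceName,
                             Set<String> denyLiterals,
                             Set<String> denyPrefixes) {
        if (denyLiterals.contains(resourceName))
            return true;
        for (String prefix : denyPrefixes) {
            if (resourceName.startsWith(prefix))
                return true;
        }
        return false;
    }

    // The caller is authorized if at least one allowed name survives.
    static boolean anyAllowed(Set<String> allowLiterals,
                              Set<String> denyLiterals,
                              Set<String> denyPrefixes) {
        for (String name : allowLiterals) {
            if (!dominated(name, denyLiterals, denyPrefixes))
                return true;
        }
        return false;
    }

    public static void main(String[] args) {
        Set<String> allow = Set.of("payments", "audit-log");
        Set<String> denyLit = Set.of("payments");
        Set<String> denyPre = Set.of("internal-");
        // "audit-log" is not dominated, so access is granted.
        System.out.println(anyAllowed(allow, denyLit, denyPre)); // prints true
    }
}
```

Note the linear scan over deny prefixes per allowed name: this is the O(n*m) shape discussed in the benchmark comments further down in this thread.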
[jira] [Commented] (KAFKA-10665) Flaky Test StreamTableJoinTopologyOptimizationIntegrationTest.shouldDoStreamTableJoinWithDifferentNumberOfPartitions[Optimization = all]
[ https://issues.apache.org/jira/browse/KAFKA-10665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17242062#comment-17242062 ] Luke Chen commented on KAFKA-10665: --- org.apache.kafka.streams.integration.StreamTableJoinTopologyOptimizationIntegrationTest.shouldDoStreamTableJoinWithDifferentNumberOfPartitions[Optimization = none] Failing for the past 1 build (Since [!https://ci-builds.apache.org/static/23d98232/images/16x16/red.png! #294|https://ci-builds.apache.org/job/Kafka/job/kafka-trunk-jdk15/294/] ) [Took 4.5 sec.|https://ci-builds.apache.org/job/Kafka/job/kafka-trunk-jdk15/294/testReport/junit/org.apache.kafka.streams.integration/StreamTableJoinTopologyOptimizationIntegrationTest/shouldDoStreamTableJoinWithDifferentNumberOfPartitions_Optimization___none_/history] h3. Error Message java.nio.file.NoSuchFileException: /tmp/kafka-3251174604229833116/app-StreamTableJoinTopologyOptimizationIntegrationTestshouldDoStreamTableJoinWithDifferentNumberOfPartitions_Optimization___none_/1_1/.checkpoint.tmp h3. 
Stacktrace java.nio.file.NoSuchFileException: /tmp/kafka-3251174604229833116/app-StreamTableJoinTopologyOptimizationIntegrationTestshouldDoStreamTableJoinWithDifferentNumberOfPartitions_Optimization___none_/1_1/.checkpoint.tmp at java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92) at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:106) at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111) at java.base/sun.nio.fs.UnixFileAttributeViews$Basic.readAttributes(UnixFileAttributeViews.java:55) at java.base/sun.nio.fs.UnixFileSystemProvider.readAttributes(UnixFileSystemProvider.java:148) at java.base/sun.nio.fs.LinuxFileSystemProvider.readAttributes(LinuxFileSystemProvider.java:99) at java.base/java.nio.file.Files.readAttributes(Files.java:1843) at java.base/java.nio.file.FileTreeWalker.getAttributes(FileTreeWalker.java:219) at java.base/java.nio.file.FileTreeWalker.visit(FileTreeWalker.java:276) at java.base/java.nio.file.FileTreeWalker.next(FileTreeWalker.java:373) at java.base/java.nio.file.Files.walkFileTree(Files.java:2840) at java.base/java.nio.file.Files.walkFileTree(Files.java:2876) at org.apache.kafka.common.utils.Utils.delete(Utils.java:841) at org.apache.kafka.common.utils.Utils.delete(Utils.java:827) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState(IntegrationTestUtils.java:151) at org.apache.kafka.streams.integration.StreamTableJoinTopologyOptimizationIntegrationTest.whenShuttingDown(StreamTableJoinTopologyOptimizationIntegrationTest.java:122) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:564) at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.RunAfters.invokeMethod(RunAfters.java:46) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33) at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at org.junit.runners.Suite.runChild(Suite.java:128) at org.junit.runners.Suite.runChild(Suite.java:27) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at
[jira] [Commented] (KAFKA-10665) Flaky Test StreamTableJoinTopologyOptimizationIntegrationTest.shouldDoStreamTableJoinWithDifferentNumberOfPartitions[Optimization = all]
[ https://issues.apache.org/jira/browse/KAFKA-10665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17242063#comment-17242063 ] Luke Chen commented on KAFKA-10665: --- org.apache.kafka.streams.integration.StreamTableJoinTopologyOptimizationIntegrationTest.shouldDoStreamTableJoinWithDifferentNumberOfPartitions[Optimization = all] Failing for the past 1 build (Since [!https://ci-builds.apache.org/static/23d98232/images/16x16/red.png! #291|https://ci-builds.apache.org/job/Kafka/job/kafka-trunk-jdk15/291/] ) [Took 4.5 sec.|https://ci-builds.apache.org/job/Kafka/job/kafka-trunk-jdk15/291/testReport/junit/org.apache.kafka.streams.integration/StreamTableJoinTopologyOptimizationIntegrationTest/shouldDoStreamTableJoinWithDifferentNumberOfPartitions_Optimization___all_/history] h3. Error Message java.nio.file.DirectoryNotEmptyException: /tmp/kafka-12480882490717231293/app-StreamTableJoinTopologyOptimizationIntegrationTestshouldDoStreamTableJoinWithDifferentNumberOfPartitions_Optimization___all_ h3. 
Stacktrace java.nio.file.DirectoryNotEmptyException: /tmp/kafka-12480882490717231293/app-StreamTableJoinTopologyOptimizationIntegrationTestshouldDoStreamTableJoinWithDifferentNumberOfPartitions_Optimization___all_ at java.base/sun.nio.fs.UnixFileSystemProvider.implDelete(UnixFileSystemProvider.java:246) at java.base/sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:105) at java.base/java.nio.file.Files.delete(Files.java:1146) at org.apache.kafka.common.utils.Utils$2.postVisitDirectory(Utils.java:871) at org.apache.kafka.common.utils.Utils$2.postVisitDirectory(Utils.java:841) at java.base/java.nio.file.Files.walkFileTree(Files.java:2822) at java.base/java.nio.file.Files.walkFileTree(Files.java:2876) at org.apache.kafka.common.utils.Utils.delete(Utils.java:841) at org.apache.kafka.common.utils.Utils.delete(Utils.java:827) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState(IntegrationTestUtils.java:151) at org.apache.kafka.streams.integration.StreamTableJoinTopologyOptimizationIntegrationTest.whenShuttingDown(StreamTableJoinTopologyOptimizationIntegrationTest.java:122) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:564) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.RunAfters.invokeMethod(RunAfters.java:46) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33) at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at org.junit.runners.Suite.runChild(Suite.java:128) at org.junit.runners.Suite.runChild(Suite.java:27) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) at
[jira] [Assigned] (KAFKA-10665) Flaky Test StreamTableJoinTopologyOptimizationIntegrationTest.shouldDoStreamTableJoinWithDifferentNumberOfPartitions[Optimization = all]
[ https://issues.apache.org/jira/browse/KAFKA-10665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen reassigned KAFKA-10665: - Assignee: Luke Chen > Flaky Test > StreamTableJoinTopologyOptimizationIntegrationTest.shouldDoStreamTableJoinWithDifferentNumberOfPartitions[Optimization > = all] > > > Key: KAFKA-10665 > URL: https://issues.apache.org/jira/browse/KAFKA-10665 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Luke Chen >Priority: Major > Labels: flaky-test > > {code:java} > java.nio.file.DirectoryNotEmptyException: > /tmp/kafka-13241964730537515637/app-StreamTableJoinTopologyOptimizationIntegrationTestshouldDoStreamTableJoinWithDifferentNumberOfPartitions_Optimization___all_ > at > java.base/sun.nio.fs.UnixFileSystemProvider.implDelete(UnixFileSystemProvider.java:246) > at > java.base/sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:105) > at java.base/java.nio.file.Files.delete(Files.java:1146) > at > org.apache.kafka.common.utils.Utils$2.postVisitDirectory(Utils.java:869) > at > org.apache.kafka.common.utils.Utils$2.postVisitDirectory(Utils.java:839) > at java.base/java.nio.file.Files.walkFileTree(Files.java:2822) > at java.base/java.nio.file.Files.walkFileTree(Files.java:2876) > at org.apache.kafka.common.utils.Utils.delete(Utils.java:839) > at org.apache.kafka.common.utils.Utils.delete(Utils.java:825) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState(IntegrationTestUtils.java:151) > at > org.apache.kafka.streams.integration.StreamTableJoinTopologyOptimizationIntegrationTest.whenShuttingDown(StreamTableJoinTopologyOptimizationIntegrationTest.java:122) > {code} > https://github.com/apache/kafka/pull/9515/checks?check_run_id=1333753280 -- This message was sent by Atlassian Jira (v8.3.4#803005)
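The NoSuchFileException/DirectoryNotEmptyException failures above come from purging the state directory while stream threads may still be writing checkpoint files. The plain-Java sketch below (no Kafka dependency; names are illustrative) shows the ordering that fixes it: stop every writer first, then recursively delete, in the style of Utils.delete()/purgeLocalStreamsState().

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Illustration of the race behind KAFKA-10665: a recursive delete fails when
// a still-running writer thread (standing in for a KafkaStreams instance
// flushing .checkpoint files) touches the directory concurrently. The remedy
// is ordering: "close" all writers before purging the state directory.
public class CloseBeforePurge {

    // Recursive delete, analogous in spirit to Utils.delete().
    static void purge(Path dir) {
        if (!Files.exists(dir)) return;
        try (Stream<Path> paths = Files.walk(dir)) {
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try { Files.delete(p); } catch (IOException e) { throw new UncheckedIOException(e); }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns true when the purge succeeds after the writer has been joined.
    static boolean demo() throws Exception {
        Path stateDir = Files.createTempDirectory("app-state");
        Thread writer = new Thread(() -> {
            try { Files.createFile(stateDir.resolve(".checkpoint.tmp")); } catch (IOException ignored) { }
        });
        writer.start();
        writer.join();      // equivalent of kafkaStreams.close(): no more writes ...
        purge(stateDir);    // ... so the recursive delete cannot race
        return !Files.exists(stateDir);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // prints true
    }
}
```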
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAFKA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r533765909

## File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java

## @@ -69,33 +70,39 @@
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class AclAuthorizerBenchmark {
-    @Param({"1", "5", "20"})
+    @Param({"1", "4", "8"})

Review comment: The underlying algorithm of the authorizeByResourceType() implementation in AclAuthorizer has two key characteristics:

1. If any "allow resource" of the given ACE does not have a dominant "deny resource", the API returns immediately.
2. The complexity is O(n*m), where `n` is the number of "allow resources" of the given ACE and `m` is the number of "deny resources" of the given ACE; it is not related to the number of ACEs in the cluster.

Point 1 means that, given an ACE, if `p%` of its "allow resources" have a dominant "deny resource" and `resourceCount` is `r`, then on average the API returns after checking about `r * p * 0.01` "allow resources".

a) If we distribute the "dominant deny resources" evenly, e.g. use (loop index % something) to decide which "allow resource" gets a dominant "deny resource", we end up iterating the same number of "allow resources" before returning on every call, namely `r * p * 0.01`.
b) If we decide randomly which "allow resource" gets a dominant "deny resource", the result is too noisy: we may iterate only 1 resource or all of them, depending on the randomization algorithm and seed.

Point 2 means that the API time cost is not related to the number of ACEs but grows quadratically as `resourceCount` increases; under the assumption in point 1, the actual cost is about `r * r * p * 0.01`.

As a result, we should first measure how long the worst case takes, call it `t`; we can then pick reasonable values of `p` and estimate the API cost as `t * p`.

So I was directly testing the worst case, where p = 1, meaning 100% of the "allow resources" have a dominant "deny resource". The complexity is hence O(r^2). It's rare for a cluster to have 200k "allow resources" and 200k corresponding "dominant deny resources" for each user, and it's not fair to pair a relatively small `aclCount` with a huge `resourceCount`, since the API optimizes performance by indexing on the ACE.
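The cost model in the review comment above can be sanity-checked with a little arithmetic. The sketch below encodes its assumptions (a fraction `p` of `r` allowed resources each carry a dominant deny, worst case `p = 1` costing about `r^2` operations); the numbers are illustrative only.

```java
// Back-of-envelope check of the benchmark cost model discussed above.
// Assumptions (from the review comment, not measured): with r allowed
// resources of which a fraction p have a dominant deny, about r*p entries
// are inspected before an early return, and at p = 1 every allow is
// checked against every deny, roughly r^2 operations.
public class CostModel {
    // Allow resources inspected before an early return, on average.
    static long expectedChecks(long r, double p) {
        return Math.round(r * p);
    }

    // Worst case (p = 1): quadratic in the resource count.
    static long worstCaseOps(long r) {
        return r * r;
    }

    public static void main(String[] args) {
        System.out.println(expectedChecks(10_000, 0.5)); // prints 5000
        System.out.println(worstCaseOps(1_000));         // prints 1000000
    }
}
```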
[GitHub] [kafka] junrao commented on pull request #8826: KAFKA-10090 Misleading warnings: The configuration was supplied but i…
junrao commented on pull request #8826: URL: https://github.com/apache/kafka/pull/8826#issuecomment-736983541 @chia7712 : There are 6 failures in the system test run. Are they related to this PR? http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-12-01--001.1606874236--chia7712--KAFKA-10090--4da062adc/report.html
[jira] [Commented] (KAFKA-10772) java.lang.IllegalStateException: There are insufficient bytes available to read assignment from the sync-group response (actual byte size 0)
[ https://issues.apache.org/jira/browse/KAFKA-10772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17242025#comment-17242025 ] A. Sophie Blee-Goldman commented on KAFKA-10772: Hm, the leadership changes definitely ring a bell – maybe this is another symptom of KAFKA-10284 ? This should be fixed in 2.6.1, which is currently in the process of being released. You could try out the RC for that release, or just build from source if that's an option. > java.lang.IllegalStateException: There are insufficient bytes available to > read assignment from the sync-group response (actual byte size 0) > > > Key: KAFKA-10772 > URL: https://issues.apache.org/jira/browse/KAFKA-10772 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Levani Kokhreidze >Priority: Major > Attachments: KAFKA-10772.log > > > From time to time we encounter the following exception that results in Kafka > Streams threads dying. > Broker version 2.4.1, Client version 2.6.0 > {code:java} > Nov 27 00:59:53.681 streaming-app service: prod | streaming-app-2 | > stream-client [cluster1-profile-stats-pipeline-client-id] State transition > from REBALANCING to ERROR Nov 27 00:59:53.681 streaming-app service: prod | > streaming-app-2 | stream-client [cluster1-profile-stats-pipeline-client-id] > State transition from REBALANCING to ERROR Nov 27 00:59:53.682 streaming-app > service: prod | streaming-app-2 | 2020-11-27 00:59:53.681 ERROR 105 --- > [-StreamThread-1] .KafkaStreamsBasedStreamProcessingEngine : Stream > processing pipeline: [profile-stats] encountered unrecoverable exception. > Thread: [cluster1-profile-stats-pipeline-client-id-StreamThread-1] is > completely dead. If all worker threads die, Kafka Streams will be moved to > permanent ERROR state. Nov 27 00:59:53.682 streaming-app service: prod | > streaming-app-2 | Stream processing pipeline: [profile-stats] encountered > unrecoverable exception. 
Thread: > [cluster1-profile-stats-pipeline-client-id-StreamThread-1] is completely > dead. If all worker threads die, Kafka Streams will be moved to permanent > ERROR state. java.lang.IllegalStateException: There are insufficient bytes > available to read assignment from the sync-group response (actual byte size > 0) , this is not expected; it is possible that the leader's assign function > is buggy and did not return any assignment for this member, or because static > member is configured and the protocol is buggy hence did not get the > assignment for this member at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:367) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) > at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:766) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:624) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman commented on a change in pull request #9615: KAFKA-10500: Add thread option
ableegoldman commented on a change in pull request #9615: URL: https://github.com/apache/kafka/pull/9615#discussion_r533878767

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java

## @@ -870,43 +899,73 @@
    private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
            cacheSizePerThread,
            stateDirectory,
            delegatingStateRestoreListener,
-           i + 1,
+           threadIdx,
            KafkaStreams.this::closeToError,
-           this::defaultStreamsUncaughtExceptionHandler
-       );
-       threads.add(streamThread);
-       threadState.put(streamThread.getId(), streamThread.state());
-       storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
-   }
+           streamsUncaughtExceptionHandler
+       );
+       streamThread.setStateListener(streamStateListener);
+       threads.add(streamThread);
+       threadState.put(streamThread.getId(), streamThread.state());
+       storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+       return streamThread;
+   }

-   ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) ->
-       Math.toIntExact(threads.stream().filter(thread -> thread.state().isAlive()).count()));

+   /**
+    * Adds and starts a stream thread in addition to the stream threads that are already running in this
+    * Kafka Streams client.
+    *
+    * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+    * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+    * threads does not exceed the total cache size specified in configuration
+    * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+    *
+    * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+    *
+    * @return name of the added stream thread, or empty if a new stream thread could not be added
+    */
+   public Optional<String> addStreamThread() {
+       if (isRunningOrRebalancing()) {
+           final int threadIdx = getNextThreadIndex();
+           final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
+           resizeThreadCache(cacheSizePerThread);
+           final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
+           synchronized (stateLock) {
+               if (isRunningOrRebalancing()) {
+                   streamThread.start();
+                   return Optional.of(streamThread.getName());
+               } else {
+                   threads.remove(streamThread);

Review comment: We should also shut down the thread if it doesn't get started, otherwise we may leak (consumer or producer) clients. But I'm actually not sure why we don't just do everything (resize cache, create thread) inside the synchronized block? I'm guessing it would deadlock due to locking on the `stateLock`, but can't we just synchronize on something else that wouldn't interfere with the StreamThread creation?

## File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java

## @@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.processor.ThreadMetadata; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.StreamsTestUtils; +import org.apache.kafka.test.TestUtils; +import org.hamcrest.CoreMatchers; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import
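The start-or-shutdown pattern under discussion in the review above can be sketched in a few lines. This is a simplified illustration of the shape of the fix the reviewer asks for, not the actual KafkaStreams implementation: `AddThreadSketch`, its nested `StreamThread` stand-in, and the thread names are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

class AddThreadSketch {
    // Hypothetical stand-in for the real Kafka Streams StreamThread class.
    static class StreamThread {
        final String name;
        boolean started, shutdown;
        StreamThread(final String name) { this.name = name; }
        void start() { started = true; }
        void shutdown() { shutdown = true; }  // would release consumer/producer clients
    }

    private final Object stateLock = new Object();
    private final List<StreamThread> threads = new ArrayList<>();
    volatile boolean runningOrRebalancing = true;

    Optional<String> addStreamThread() {
        if (!runningOrRebalancing) {
            return Optional.empty();
        }
        // The thread (and the clients it owns) is created outside the lock, as in the PR.
        final StreamThread thread = new StreamThread("stream-thread-" + (threads.size() + 1));
        threads.add(thread);
        synchronized (stateLock) {
            if (runningOrRebalancing) {
                thread.start();
                return Optional.of(thread.name);
            }
        }
        // State changed before the thread could start: remove it AND shut it
        // down, so the clients it created are not leaked (the reviewer's point).
        threads.remove(thread);
        thread.shutdown();
        return Optional.empty();
    }
}
```

The key detail is the last block: removing the thread from the list alone would leave its clients open, which is exactly the leak the review comment warns about.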
[jira] [Updated] (KAFKA-10794) Replica leader election is too slow in the case of too many partitions
[ https://issues.apache.org/jira/browse/KAFKA-10794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] limeng updated KAFKA-10794: --- Description: There are more than 6,000 topics and 300 brokers in my Kafka cluster, and we set the config 'auto.leader.rebalance.enable=true'. The rebalance process spends too much time and CPU resource. We find that the function 'controllerContext.allPartitions' is invoked too many times. !截屏2020-12-02 上午11.43.48.png! was: There are more than 6,000 topics and 300 brokers in my Kafka cluster, and we set the config 'auto.leader.rebalance.enable=true'. The rebalance process spends too much time and CPU resource. We find that the function 'controllerContext.allPartitions' is invoked too many times. !clipboard_image_1606806581629.png! > Replica leader election is too slow in the case of too many partitions > -- > > Key: KAFKA-10794 > URL: https://issues.apache.org/jira/browse/KAFKA-10794 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 2.6.0, 2.5.1 >Reporter: limeng >Priority: Major > Attachments: clipboard_image_1606806581629.png, 截屏2020-12-02 > 上午11.43.48.png > > > There are more than 6,000 topics and 300 brokers in my Kafka cluster, and we > set the config 'auto.leader.rebalance.enable=true'. > The rebalance process spends too much time and CPU resource. > We find that the function 'controllerContext.allPartitions' is invoked too > many times. > !截屏2020-12-02 上午11.43.48.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
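The inefficiency the reporter describes — re-deriving the full partition list inside a per-broker loop during the rebalance pass — and the obvious remedy of snapshotting it once per pass can be illustrated with a toy model. The class and method names below are illustrative stand-ins, not the actual controller code:

```java
import java.util.ArrayList;
import java.util.List;

class RebalanceSketch {
    static int allPartitionsCalls = 0;

    // Stand-in for controllerContext.allPartitions: builds the full list each call.
    static List<String> allPartitions() {
        allPartitionsCalls++;
        List<String> partitions = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            partitions.add("topic-" + i + "-0");
        }
        return partitions;
    }

    // The reported hot spot: the full list is recomputed for every broker,
    // making the pass O(brokers * partitions) in list construction alone.
    static void slowPass(List<String> brokers) {
        for (String broker : brokers) {
            allPartitions();
        }
    }

    // Remedy: compute the list once per rebalance pass and reuse the snapshot.
    static void fastPass(List<String> brokers) {
        List<String> snapshot = allPartitions();
        for (String broker : brokers) {
            snapshot.size();  // each iteration works against the snapshot
        }
    }
}
```

With the reporter's numbers (300 brokers, thousands of partitions) the difference between one list construction per pass and one per broker is substantial.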
[jira] [Updated] (KAFKA-10794) Replica leader election is too slow in the case of too many partitions
[ https://issues.apache.org/jira/browse/KAFKA-10794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] limeng updated KAFKA-10794: --- Attachment: clipboard_image_1606806581629.png > Replica leader election is too slow in the case of too many partitions > -- > > Key: KAFKA-10794 > URL: https://issues.apache.org/jira/browse/KAFKA-10794 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 2.6.0, 2.5.1 >Reporter: limeng >Priority: Major > Attachments: clipboard_image_1606806581629.png > >
[jira] [Updated] (KAFKA-10794) Replica leader election is too slow in the case of too many partitions
[ https://issues.apache.org/jira/browse/KAFKA-10794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] limeng updated KAFKA-10794: --- Description: There are more than 6,000 topics and 300 brokers in my Kafka cluster, and we set the config 'auto.leader.rebalance.enable=true'. The rebalance process spends too much time and CPU resource. We find that the function 'controllerContext.allPartitions' is invoked too many times. !clipboard_image_1606806581629.png! > Replica leader election is too slow in the case of too many partitions > -- > > Key: KAFKA-10794 > URL: https://issues.apache.org/jira/browse/KAFKA-10794 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 2.6.0, 2.5.1 >Reporter: limeng >Priority: Major > Attachments: clipboard_image_1606806581629.png > > > There are more than 6,000 topics and 300 brokers in my Kafka cluster, and we > set the config 'auto.leader.rebalance.enable=true'. > The rebalance process spends too much time and CPU resource. > We find that the function 'controllerContext.allPartitions' is invoked too > many times. > !clipboard_image_1606806581629.png!
[GitHub] [kafka] ableegoldman commented on a change in pull request #9609: KAFKA-6687: restrict DSL to allow only Streams from the same source topics
ableegoldman commented on a change in pull request #9609: URL: https://github.com/apache/kafka/pull/9609#discussion_r533876280 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/SourceGraphNode.java ## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals.graph; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.regex.Pattern; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.kstream.internals.ConsumedInternal; + +abstract public class SourceGraphNode extends StreamsGraphNode { + +private Collection topicNames; +private Pattern topicPattern; +private final ConsumedInternal consumedInternal; + +public SourceGraphNode(final String nodeName, +final Collection topicNames, +final ConsumedInternal consumedInternal) { +super(nodeName); + +this.topicNames = topicNames; +this.consumedInternal = consumedInternal; +} + +public SourceGraphNode(final String nodeName, +final Pattern topicPattern, +final ConsumedInternal consumedInternal) { + +super(nodeName); + +this.topicPattern = topicPattern; +this.consumedInternal = consumedInternal; +} + +public Set topicNames() { +return new HashSet<>(topicNames); Review comment: Ah ok you meant making it a Set vs a Collection -- I do agree with the principle, and I did push the Set-ification of the topics up one level so that the actual class field is a Set. But I don't think it's really worth it to push it up another layer and Set-ify the constructor argument. For one thing we would just have to do the same conversion to a Set but in more places, and more importantly, the actual callers of the constructor don't care at all whether it's a Set or any other Collection. So I think it actually does make sense to convert to a Set inside the constructor body This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
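The convention settled on in the review above — accept any `Collection` in the constructor, but Set-ify once inside the constructor body and expose a defensive copy — can be sketched as follows. `SourceNodeSketch` is a simplified stand-in for the class under review, not the actual Kafka source:

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

class SourceNodeSketch {
    private final Set<String> topicNames;

    // Callers keep passing whatever Collection they already have; the
    // conversion to a Set happens exactly once, inside the constructor.
    SourceNodeSketch(final Collection<String> topicNames) {
        this.topicNames = new HashSet<>(topicNames);
    }

    // Defensive copy: callers cannot mutate the node's internal state
    // through the returned Set.
    Set<String> topicNames() {
        return new HashSet<>(topicNames);
    }
}
```

This keeps the constructor signature convenient for callers while the class field gets the Set semantics (no duplicates, fast membership checks) that the reviewer wanted.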
[GitHub] [kafka] ableegoldman commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
ableegoldman commented on pull request #9671: URL: https://github.com/apache/kafka/pull/9671#issuecomment-736964914 Waiting to add tests until I get some sanity checks on this proposal
[GitHub] [kafka] showuon opened a new pull request #9672: MINOR: work in progress for Eos test(don't review)
showuon opened a new pull request #9672: URL: https://github.com/apache/kafka/pull/9672 work in progress for Eos test(don't review) ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Resolved] (KAFKA-9263) The new hw is added to incorrect log when ReplicaAlterLogDirsThread is replacing log (fix PlaintextAdminIntegrationTest.testAlterReplicaLogDirs)
[ https://issues.apache.org/jira/browse/KAFKA-9263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-9263. --- Fix Version/s: 2.8.0 Resolution: Fixed > The new hw is added to incorrect log when ReplicaAlterLogDirsThread is > replacing log (fix PlaintextAdminIntegrationTest.testAlterReplicaLogDirs) > -- > > Key: KAFKA-9263 > URL: https://issues.apache.org/jira/browse/KAFKA-9263 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.4.0 >Reporter: John Roesler >Assignee: Chia-Ping Tsai >Priority: Major > Labels: flaky-test > Fix For: 2.8.0 > > > This test has failed for me on > https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/9691/testReport/junit/kafka.api/AdminClientIntegrationTest/testAlterReplicaLogDirs/ > {noformat} > Error Message > org.scalatest.exceptions.TestFailedException: only 0 messages are produced > within timeout after replica movement. Producer future > Some(Failure(java.util.concurrent.TimeoutException: Timeout after waiting for > 1 ms.)) > Stacktrace > org.scalatest.exceptions.TestFailedException: only 0 messages are produced > within timeout after replica movement. 
Producer future > Some(Failure(java.util.concurrent.TimeoutException: Timeout after waiting for > 1 ms.)) > at > org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530) > at > org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529) > at > org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389) > at org.scalatest.Assertions.fail(Assertions.scala:1091) > at org.scalatest.Assertions.fail$(Assertions.scala:1087) > at org.scalatest.Assertions$.fail(Assertions.scala:1389) > at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:842) > at > kafka.api.AdminClientIntegrationTest.testAlterReplicaLogDirs(AdminClientIntegrationTest.scala:459) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282) > at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) > at java.base/java.lang.Thread.run(Thread.java:834) > Standard Output > [2019-12-03 04:54:16,111] ERROR 
[ReplicaFetcher replicaId=2, leaderId=1, > fetcherId=0] Error for partition unclean-test-topic-1-0 at offset 0 > (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. > [2019-12-03 04:54:21,711] ERROR [ReplicaFetcher replicaId=0, leaderId=1, > fetcherId=0] Error for partition topic-0 at offset 0 > (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. > [2019-12-03 04:54:21,712] ERROR [ReplicaFetcher replicaId=2, leaderId=1, > fetcherId=0] Error for partition topic-0 at offset 0 > (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. > [2019-12-03 04:54:27,092] ERROR [ReplicaFetcher replicaId=2, leaderId=1, > fetcherId=0] Error for partition unclean-test-topic-1-0 at offset 0 > (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. > [2019-12-03 04:54:27,091] ERROR [ReplicaFetcher replicaId=0, leaderId=1, > fetcherId=0] Error for partition unclean-test-topic-1-1 at offset 0 >
[GitHub] [kafka] chia7712 merged pull request #9423: KAFKA-9263 The new hw is added to incorrect log when ReplicaAlterLogD…
chia7712 merged pull request #9423: URL: https://github.com/apache/kafka/pull/9423
[jira] [Commented] (KAFKA-9263) Reocurrence: Transient failure in kafka.api.PlaintextAdminIntegrationTest.testLogStartOffsetCheckpoint and kafka.api.PlaintextAdminIntegrationTest.testAlterReplicaLogDi
[ https://issues.apache.org/jira/browse/KAFKA-9263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17242016#comment-17242016 ] Chia-Ping Tsai commented on KAFKA-9263: --- PlaintextAdminIntegrationTest.testLogStartOffsetCheckpoint does not fail recently and I looped it 200 times, all pass. The https://github.com/apache/kafka/pull/9423 which fixes kafka.api.PlaintextAdminIntegrationTest.testAlterReplicaLogDirs is going to be merged so I will revise the title of this issue (i.e remove PlaintextAdminIntegrationTest.testLogStartOffsetCheckpoint) > Reocurrence: Transient failure in > kafka.api.PlaintextAdminIntegrationTest.testLogStartOffsetCheckpoint and > kafka.api.PlaintextAdminIntegrationTest.testAlterReplicaLogDirs > -- > > Key: KAFKA-9263 > URL: https://issues.apache.org/jira/browse/KAFKA-9263 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.4.0 >Reporter: John Roesler >Priority: Major > Labels: flaky-test > > This test has failed for me on > https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/9691/testReport/junit/kafka.api/AdminClientIntegrationTest/testAlterReplicaLogDirs/ > {noformat} > Error Message > org.scalatest.exceptions.TestFailedException: only 0 messages are produced > within timeout after replica movement. Producer future > Some(Failure(java.util.concurrent.TimeoutException: Timeout after waiting for > 1 ms.)) > Stacktrace > org.scalatest.exceptions.TestFailedException: only 0 messages are produced > within timeout after replica movement. 
[jira] [Assigned] (KAFKA-9263) The new hw is added to incorrect log when ReplicaAlterLogDirsThread is replacing log (fix PlaintextAdminIntegrationTest.testAlterReplicaLogDirs)
[ https://issues.apache.org/jira/browse/KAFKA-9263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-9263: - Assignee: Chia-Ping Tsai > The new hw is added to incorrect log when ReplicaAlterLogDirsThread is > replacing log (fix PlaintextAdminIntegrationTest.testAlterReplicaLogDirs) > -- > > Key: KAFKA-9263 > URL: https://issues.apache.org/jira/browse/KAFKA-9263 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.4.0 >Reporter: John Roesler >Assignee: Chia-Ping Tsai >Priority: Major > Labels: flaky-test > > This test has failed for me on > https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/9691/testReport/junit/kafka.api/AdminClientIntegrationTest/testAlterReplicaLogDirs/ > {noformat} > Error Message > org.scalatest.exceptions.TestFailedException: only 0 messages are produced > within timeout after replica movement. Producer future > Some(Failure(java.util.concurrent.TimeoutException: Timeout after waiting for > 1 ms.)) > Stacktrace > org.scalatest.exceptions.TestFailedException: only 0 messages are produced > within timeout after replica movement. 
[jira] [Updated] (KAFKA-9263) The new hw is added to incorrect log when ReplicaAlterLogDirsThread is replacing log (fix PlaintextAdminIntegrationTest.testAlterReplicaLogDirs)
[ https://issues.apache.org/jira/browse/KAFKA-9263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-9263: -- Summary: The new hw is added to incorrect log when ReplicaAlterLogDirsThread is replacing log (fix PlaintextAdminIntegrationTest.testAlterReplicaLogDirs) (was: Reocurrence: Transient failure in kafka.api.PlaintextAdminIntegrationTest.testLogStartOffsetCheckpoint and kafka.api.PlaintextAdminIntegrationTest.testAlterReplicaLogDirs) > The new hw is added to incorrect log when ReplicaAlterLogDirsThread is > replacing log (fix PlaintextAdminIntegrationTest.testAlterReplicaLogDirs) > -- > > Key: KAFKA-9263 > URL: https://issues.apache.org/jira/browse/KAFKA-9263 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.4.0 >Reporter: John Roesler >Priority: Major > Labels: flaky-test > > This test has failed for me on > https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/9691/testReport/junit/kafka.api/AdminClientIntegrationTest/testAlterReplicaLogDirs/ > {noformat} > Error Message > org.scalatest.exceptions.TestFailedException: only 0 messages are produced > within timeout after replica movement. Producer future > Some(Failure(java.util.concurrent.TimeoutException: Timeout after waiting for > 1 ms.)) > Stacktrace > org.scalatest.exceptions.TestFailedException: only 0 messages are produced > within timeout after replica movement. 
[jira] [Created] (KAFKA-10794) Replica leader election is too slow in the case of too many partitions
limeng created KAFKA-10794: -- Summary: Replica leader election is too slow in the case of too many partitions Key: KAFKA-10794 URL: https://issues.apache.org/jira/browse/KAFKA-10794 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 2.5.1, 2.6.0 Reporter: limeng
[GitHub] [kafka] highluck commented on a change in pull request #8965: KAFKA-8147: Add changelog topic configuration to KTable suppress
highluck commented on a change in pull request #8965: URL: https://github.com/apache/kafka/pull/8965#discussion_r533863847 ## File path: docs/streams/developer-guide/dsl-api.html ## @@ -3393,6 +3393,10 @@ KTable-KTable Foreign-Key but this simple example creates a buffer with no upper bound. +withLoggingDisabled() + + This configures the suppression operator to disable logging for changelog entries. + Review comment: update code thanks!
[GitHub] [kafka] ableegoldman opened a new pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
ableegoldman opened a new pull request #9671: URL: https://github.com/apache/kafka/pull/9671 A race condition between the consumer and hb thread can lead to a failed but non-null `findCoordinatorFuture`, causing the AbstractCoordinator to wait endlessly on the request which it thinks is still in flight. We should move the handling of this future out of the listener callbacks and into the `ensureCoordinatorReady()` method where we can check the exception and clear the future all in one place. See ticket for full analysis. Also starts logging a warning if the consumer is unable to connect to the coordinator for longer than the max poll interval.
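The shape of the fix described in this PR — check the lookup future and clear it in the same synchronized place each time the caller polls, instead of from listener callbacks — can be sketched with a toy model. `CompletableFuture` stands in for Kafka's `RequestFuture`, and the class and method names here are illustrative, not the actual AbstractCoordinator code:

```java
import java.util.concurrent.CompletableFuture;

class CoordinatorLookupSketch {
    private CompletableFuture<Void> findCoordinatorFuture;

    synchronized void ensureCoordinatorReady() {
        if (findCoordinatorFuture == null) {
            findCoordinatorFuture = sendFindCoordinatorRequest();
        }
        if (findCoordinatorFuture.isDone()) {
            // Inspect the result and clear the future in the same synchronized
            // block, so a failed-but-non-null future can never be left behind.
            boolean failed = findCoordinatorFuture.isCompletedExceptionally();
            findCoordinatorFuture = null;
            if (failed) {
                // back off here and let the next call retry the lookup
            }
        }
    }

    boolean hasPendingLookup() {
        return findCoordinatorFuture != null;
    }

    // Stand-in for the real network request; completes immediately here.
    private CompletableFuture<Void> sendFindCoordinatorRequest() {
        return CompletableFuture.completedFuture(null);
    }
}
```

Because the check and the clear happen in one place under one lock, there is no window in which a callback can observe a not-yet-assigned field — the root of the race the ticket analyzes.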
[GitHub] [kafka] chia7712 commented on pull request #9666: MINOR: Remove broken `.travis.yml` with system test hooks
chia7712 commented on pull request #9666: URL: https://github.com/apache/kafka/pull/9666#issuecomment-736948097 > Nice that you got it working! I think there is value running a subset of system tests automatically. It might even encourage us to write more system tests if we can get the feedback more easily. Not having access to logs is annoying, but might not be a dealbreaker. It looks like we need a bit of initial work to tune the build though. I'm happy to close this and merge #9652. Then perhaps we can file a few jiras for any follow-up actions. What do you think? +1
[jira] [Comment Edited] (KAFKA-10793) Race condition in FindCoordinatorFuture permanently severs connection to group coordinator
[ https://issues.apache.org/jira/browse/KAFKA-10793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241976#comment-17241976 ] A. Sophie Blee-Goldman edited comment on KAFKA-10793 at 12/2/20, 2:30 AM: -- At this point we can only guess, but all signs point to a race condition between the main consumer thread and the heartbeat thread. One possibility is that when the future failed it just didn't trigger the `onFailure` callback, but [~guozhang] & I have both looked through the source code and don't see any way for this to occur. Another possibility is that the `onFailure` callback was triggered, but it was invoked too soon. If the future was completed before we ever assigned it to the _findCoordinatorFuture_ field, then we would never actually clear the latest future (we would just set an already-null field to null again). Is this possible? Here's how the AbstractCoordinator builds the request and assigns the future: {code:java} protected synchronized RequestFuture lookupCoordinator() { ... findCoordinatorFuture = sendFindCoordinatorRequest(node); } {code} {code:java} private RequestFuture sendFindCoordinatorRequest(Node node) { ... return client.send(node, requestBuilder) .compose(new FindCoordinatorResponseHandler()); } {code} Inside #compose we call #addListener, which contains this snippet: {code:java} if (failed()) fireFailure(); {code} If the request has already failed by the time we reach this, then we'll trigger the `onFailure` callback before #compose ever returns – i.e. before we've assigned the future to _findCoordinatorFuture_. The obvious question now is whether it's possible for the request to be failed in another thread while one thread is in the middle of the synchronized lookupCoordinator(). The request can be failed by the ConsumerNetworkClient when polled, during checkDisconnects(). 
The heartbeat thread actually synchronizes the entire run loop, so it doesn't seem possible for the hb thread to fail this request in the background of the main thread during a lookupCoordinator(). But the inverse is not true: it's possible for the main consumer thread to fail the request while the hb thread is inside of lookupCoordinator(). The AbstractCoordinator will poll the network client inside of joinGroupIfNeeded(), which is not itself synchronized and may be invoked without any locking through a Consumer#poll.
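The ordering hazard described above can be modeled with a small toy sketch (all names here are illustrative stand-ins, not the real AbstractCoordinator): if the future has already failed when the listener is registered, the clearing callback fires before lookupCoordinator() ever assigns the future to the field, so the later assignment leaves a stale, never-cleared future behind.

```java
// Toy model of the race: the listener that clears findCoordinatorFuture can
// fire inside addListener() -- i.e. before lookupCoordinator() assigns the
// future to the field. Hypothetical names for illustration only.
public class FindCoordinatorRace {
    static ToyFuture findCoordinatorFuture = null;

    static class ToyFuture {
        boolean failed;
        Runnable listener;
        void addListener(Runnable r) {
            listener = r;
            if (failed) r.run();   // mirrors "if (failed()) fireFailure();"
        }
    }

    static ToyFuture sendFindCoordinatorRequest(boolean alreadyFailed) {
        ToyFuture f = new ToyFuture();
        f.failed = alreadyFailed;                          // request failed on the wire
        f.addListener(() -> findCoordinatorFuture = null); // clearFindCoordinatorFuture()
        return f;
    }

    static void lookupCoordinator(boolean alreadyFailed) {
        // If the clear already ran inside addListener(), this assignment
        // happens AFTER it, so the field is left non-null forever:
        findCoordinatorFuture = sendFindCoordinatorRequest(alreadyFailed);
    }

    public static void main(String[] args) {
        lookupCoordinator(true);
        // Stale future is never cleared, so no new FindCoordinator request is sent:
        System.out.println(findCoordinatorFuture != null); // true
    }
}
```

In the healthy ordering (failure arrives after assignment), the listener runs last and correctly nulls the field; only the fail-before-assignment interleaving gets stuck.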
[jira] [Commented] (KAFKA-10688) Handle accidental truncation of repartition topics as exceptional failure
[ https://issues.apache.org/jira/browse/KAFKA-10688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241992#comment-17241992 ] Guozhang Wang commented on KAFKA-10688: ---

Had some more discussions with [~cadonna] about different scenarios, and I think we can potentially enlarge the scope of this ticket to include all the following cases:

1) When starting the application for the first time, the repartition topic is newly created. In this case we should set the starting offset on the repartition topics according to the global reset policy.

2) When restarting the application, the repartition topic already exists and may have some data. In this case we would try to read the committed offset and start from there.

2.a) If the committed offset is already out of range --- i.e. a truncation happened before restarting the application --- we should treat it as a fatal error.

2.b) If there is no committed offset, then either the application was not gracefully shut down before (since otherwise a committed offset should be found), or the committed offset was somehow lost. We should treat it as a fatal error.

3) During normal processing, the consumer suddenly finds itself out of range --- i.e. a truncation happens concurrently --- and we should treat it as a fatal error.

The challenge today is that we cannot easily distinguish case 1) from cases 2) and 3), since the consumer throws the same invalid offset exception and Streams handles it universally. Instead of relying on the consumer to improve (KAFKA-3370), we can do it at the Streams layer only, as follows:

* Whenever we create the repartition topic, we commit an offset of 0 regardless of the global offset reset policy, since with either earliest or latest it would just be 0. 
* Whenever we get an invalid offset exception (note we still keep the consumer's configuration as `none`), we check whether it came from a repartition topic: if it did, we always treat it as a fatal error; if not, we apply the reset policy of the corresponding source topic.

> Handle accidental truncation of repartition topics as exceptional failure > - > > Key: KAFKA-10688 > URL: https://issues.apache.org/jira/browse/KAFKA-10688 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > > Today we always handle InvalidOffsetException from the main consumer by the > resetting policy assuming they are for source topics. But repartition topics > are also source topics and should never be truncated and hence cause > InvalidOffsetException. > We should differentiate these repartition topics from external source topics > and treat the InvalidOffsetException from repartition topics as fatal and > close the whole application. -- This message was sent by Atlassian Jira (v8.3.4#803005)
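The two bullets above amount to a small decision rule. A hedged sketch of that rule (illustrative names, not the actual Streams implementation):

```java
import java.util.Set;

// Sketch of the proposed handling: repartition topics get a committed offset
// of 0 at creation time, and any later invalid/out-of-range offset on a
// repartition topic is treated as fatal. All names here are hypothetical.
public class RepartitionResetRule {
    enum Action { RESET_TO_POLICY, FATAL }

    // Committed when the repartition topic is first created; with either
    // "earliest" or "latest" the starting offset of a brand-new topic is 0.
    static long initialCommittedOffset() {
        return 0L;
    }

    static Action onInvalidOffset(String topic, Set<String> repartitionTopics) {
        if (repartitionTopics.contains(topic)) {
            return Action.FATAL;          // truncated repartition topic: fatal
        }
        return Action.RESET_TO_POLICY;    // external source topic: apply reset policy
    }
}
```

Because offset 0 is committed up front, a later "no committed offset" result is unambiguous evidence of lost state (case 2.b) rather than a first start (case 1).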
[jira] [Commented] (KAFKA-10793) Race condition in FindCoordinatorFuture permanently severs connection to group coordinator
[ https://issues.apache.org/jira/browse/KAFKA-10793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241987#comment-17241987 ] A. Sophie Blee-Goldman commented on KAFKA-10793:

So that's our best guess. We should discuss the solution on the PR, but I'll lay out the two possibilities I see here in case anyone has any better ideas.

1) Synchronize joinGroupIfNeeded()
2) Clear the _findCoordinatorFuture_ when handling the result, rather than in the listener callbacks. For example, in the main loop of ensureCoordinatorReady()

Personally I think option 2 is better, since it makes a lot more sense to me to begin with. By clearing the future in the listener callbacks, we might clear it before we ever even get to check on the result, e.g. the exception if it failed. We actually seem to already anticipate this particular problem and recently implemented a workaround by adding an extra listener which saves the exception to a class _findCoordinatorException_ field. If we just wait to clear the future then presumably we could remove this workaround as well, and just save the exception when we check "if (future.failed())" inside of lookupCoordinator(). All that said, I'm not intimately familiar with the technical details of the ConsumerNetworkClient and how it handles its RequestFutures, so it's possible I'm missing something important.

> Race condition in FindCoordinatorFuture permanently severs connection to > group coordinator > -- > > Key: KAFKA-10793 > URL: https://issues.apache.org/jira/browse/KAFKA-10793 > Project: Kafka > Issue Type: Bug > Components: consumer, streams >Affects Versions: 2.5.0 >Reporter: A. Sophie Blee-Goldman >Priority: Critical > > Pretty much as soon as we started actively monitoring the > _last-rebalance-seconds-ago_ metric in our Kafka Streams test environment, we > started seeing something weird. 
Every so often one of the StreamThreads (ie a > single Consumer instance) would appear to permanently fall out of the group, > as evidenced by a monotonically increasing _last-rebalance-seconds-ago._ We > inject artificial network failures every few hours at most, so the group > rebalances quite often. But the one consumer never rejoins, with no other > symptoms (besides a slight drop in throughput since the remaining threads had > to take over this member's work). We're confident that the problem exists in > the client layer, since the logs confirmed that the unhealthy consumer was > still calling poll. It was also calling Consumer#committed in its main poll > loop, which was consistently failing with a TimeoutException. > When I attached a remote debugger to an instance experiencing this issue, the > network client's connection to the group coordinator (the one that uses > MAX_VALUE - node.id as the coordinator id) was in the DISCONNECTED state. But > for some reason it never tried to re-establish this connection, although it > did successfully connect to that same broker through the "normal" connection > (ie the one that just uses node.id). > The tl;dr is that the AbstractCoordinator's FindCoordinatorRequest has failed > (presumably due to a disconnect), but the _findCoordinatorFuture_ is non-null > so a new request is never sent. This shouldn't be possible since the > FindCoordinatorResponseHandler is supposed to clear the > _findCoordinatorFuture_ when the future is completed. But somehow that didn't > happen, so the consumer continues to assume there's still a FindCoordinator > request in flight and never even notices that it's dropped out of the group. > These are the only confirmed findings so far, however we have some guesses > which I'll leave in the comments. 
Note that we only noticed this due to the > newly added _last-rebalance-seconds-ago_ metric, and there's no reason to > believe this bug hasn't been flying under the radar since the Consumer's > inception. -- This message was sent by Atlassian Jira (v8.3.4#803005)
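Option 2 from the comment above (clearing the future where the result is handled, rather than in listener callbacks) could be sketched roughly as follows. This is a hypothetical shape with toy types, not the actual fix or the real AbstractCoordinator API:

```java
// Hypothetical sketch of "option 2": ensureCoordinatorReady() inspects the
// result itself and only then clears findCoordinatorFuture, so the exception
// can be read directly from the future instead of via a listener-saved field.
public class ClearOnHandle {
    static ToyFuture findCoordinatorFuture = null;

    static class ToyFuture {
        final boolean failed;
        final RuntimeException exception;
        ToyFuture(boolean failed, RuntimeException e) {
            this.failed = failed;
            this.exception = e;
        }
    }

    static RuntimeException ensureCoordinatorReady(ToyFuture pending) {
        findCoordinatorFuture = pending;
        // ... poll the network client until the future completes ...
        RuntimeException cause = pending.failed ? pending.exception : null;
        findCoordinatorFuture = null; // cleared only AFTER the result was inspected
        return cause;
    }
}
```

Because the clear happens strictly after the assignment and the result check, the fail-before-assignment interleaving can no longer leave a stale non-null future behind.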
[jira] [Resolved] (KAFKA-10780) Rewrite ControllerZNode struct with auto-generated protocol
[ https://issues.apache.org/jira/browse/KAFKA-10780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dengziming resolved KAFKA-10780. Resolution: Won't Do KIP-500 will replace all this code. > Rewrite ControllerZNode struct with auto-generated protocol > > > Key: KAFKA-10780 > URL: https://issues.apache.org/jira/browse/KAFKA-10780 > Project: Kafka > Issue Type: Sub-task > Components: protocol >Reporter: dengziming >Assignee: dengziming >Priority: Major > > Use auto-generated protocol to rewrite the zk controller node -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10793) Race condition in FindCoordinatorFuture permanently severs connection to group coordinator
[ https://issues.apache.org/jira/browse/KAFKA-10793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241976#comment-17241976 ] A. Sophie Blee-Goldman commented on KAFKA-10793: At this point we can only guess, but all signs point to a race condition between the main consumer thread and the heartbeat thread. One possibility is that when the future failed it just didn't trigger the `onFailure` callback, but [~guozhang] & I have both looked through the source code and don't see any way for this to occur. Another possibility is that the `onFailure` callback was triggered, but it was invoked too soon_. If the future was completed before we ever assigned it to the findCoordinatorFuture field, then we would never actually clear the latest future (we would just set an already-null field to null again)._ Is this possible? Here's how the AbstractCoordinator builds the request and assigns the future: {code:java} protected synchronized RequestFuture lookupCoordinator() { ... findCoordinatorFuture = sendFindCoordinatorRequest(node); } {code} {code:java} private RequestFuture sendFindCoordinatorRequest(Node node) { ... return client.send(node, requestBuilder).compose(new FindCoordinatorResponseHandler()); }{code} Inside #compose we call #addListener, which contains this snippet: {code:java} if (failed()) fireFailure(); {code} If the request has already failed by the time we reach this, then we'll trigger the `onFailure` callback before #compose ever returns – ie before we've assigned the future to _findCoordinatorFuture_. The obvious question now is whether it's possible for the request to be failed in another thread while one thread is in the middle of the synchronized lookupCoordinator(). The request can be failed by the ConsumerNetworkClient when polled, during checkDisconnects(). 
The heartbeat thread actually synchronizes the entire run loop, so it doesn't seem possible for the hb thread to fail this request in the background of the main thread during a lookupCoordinator(). But the inverse is not true: it's possible for the main consumer thread to fail the request while the hb thread is inside of lookupCoordinator(). The AbstractCoordinator will poll the network client inside of joinGroupIfNeeded(), which is not itself synchronized and may be invoked without any locking through a Consumer#poll.
> Race condition in FindCoordinatorFuture permanently severs connection to group coordinator
> Key: KAFKA-10793
> URL: https://issues.apache.org/jira/browse/KAFKA-10793
> Project: Kafka
> Issue Type: Bug
> Components: consumer, streams
> Affects Versions: 2.5.0
> Reporter: A. Sophie Blee-Goldman
> Priority: Critical
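The suspected interleaving can be reduced to a minimal sketch (the `ToyRequestFuture`/`ToyCoordinator` names are hypothetical stand-ins, not Kafka's actual client code): if the failure listener fires before the send call returns, the callback's "clear" is a no-op on a still-null field, and the stale future is assigned afterwards.

```java
import java.util.concurrent.atomic.AtomicReference;

// Toy stand-in for RequestFuture: a listener added after the future has already
// failed is fired immediately, mirroring the addListener snippet quoted above.
final class ToyRequestFuture {
    private Runnable failureListener;
    private boolean failed;

    synchronized void addListener(Runnable onFailure) {
        this.failureListener = onFailure;
        if (failed)
            onFailure.run();   // fires before the caller's assignment completes
    }

    synchronized void fail() {
        failed = true;
        if (failureListener != null)
            failureListener.run();
    }
}

final class ToyCoordinator {
    private final AtomicReference<ToyRequestFuture> findCoordinatorFuture = new AtomicReference<>();

    // Returns true if the field is left non-null (the "stuck" state) after the request fails.
    boolean lookupCoordinator(boolean failsBeforeAssignment) {
        ToyRequestFuture future = new ToyRequestFuture();
        if (failsBeforeAssignment)
            future.fail();                                          // request fails "inside compose()"
        future.addListener(() -> findCoordinatorFuture.set(null));  // "clear" may run right now
        findCoordinatorFuture.set(future);                          // assignment happens last
        if (!failsBeforeAssignment)
            future.fail();                                          // normal case: fails after assignment
        return findCoordinatorFuture.get() != null;
    }
}
```

In the early-failure case the clear runs against a null field and the non-null future is assigned afterwards, so no new FindCoordinator request would ever be sent.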
[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot
jsancio commented on a change in pull request #9512: URL: https://github.com/apache/kafka/pull/9512#discussion_r533835857
## File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
## @@ -149,6 +152,29 @@ default OptionalLong truncateToEndOffset(OffsetAndEpoch endOffset) {
 return OptionalLong.of(truncationOffset);
 }
+
+/**
+ * Create a writable snapshot for the given snapshot id.
+ *
+ * See {@link RawSnapshotWriter} for details on how to use this object.
+ *
+ * @param snapshotId the end offset and epoch that identifies the snapshot
+ * @return a writable snapshot
+ */
+RawSnapshotWriter createSnapshot(OffsetAndEpoch snapshotId) throws IOException;
Review comment: Yeah. I think we will need that when we implement snapshot deletion. Do you mind if I add this later? Also, I think we are going to need a `readLatestSnapshot()` when the state machine (controller or broker) needs to load the latest valid snapshot. I was planning to add this later once the use case was clearer to me.
[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot
jsancio commented on a change in pull request #9512: URL: https://github.com/apache/kafka/pull/9512#discussion_r533833428
## File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
## @@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.text.NumberFormat;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+final class Snapshots {
+private static final String SNAPSHOT_DIR = "snapshots";
Review comment: I think this depends on whether we need to scan the snapshot directory. Unfortunately, I don't have a concrete answer at the moment. When we implement the changes to the rest of the raft client (log truncation, updating the start offset and LEO), we may need to scan the snapshot/checkpoint folder to determine the greatest log start offset and LEO. @lbradstreet suggested storing them in a different directory as part of the KIP-630 review process, as Kafka already has a few files in the partition log directory.
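A dedicated snapshot directory makes the scan the comment describes straightforward: finding the greatest snapshot end offset reduces to parsing file names. A hypothetical sketch (not Kafka's implementation; the `<zero-padded end offset>-<epoch>.checkpoint` name format is an assumption):

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical helper: given the file names in a snapshots directory, find the
// snapshot id {endOffset, epoch} with the greatest end offset. Because the directory
// contains only snapshots, no other checkpoint/index files need to be filtered out.
final class SnapshotScanner {
    static Optional<long[]> latestSnapshotId(List<String> fileNames) {
        return fileNames.stream()
            .filter(name -> name.endsWith(".checkpoint"))
            .map(name -> name.substring(0, name.length() - ".checkpoint".length()).split("-"))
            .filter(parts -> parts.length == 2)
            .map(parts -> new long[] {Long.parseLong(parts[0]), Long.parseLong(parts[1])})
            .max(Comparator.comparingLong(id -> id[0]));  // greatest end offset wins
    }
}
```

Storing snapshots alongside the partition log would instead force this scan to recognize and skip every other file type that lives there.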
[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot
jsancio commented on a change in pull request #9512: URL: https://github.com/apache/kafka/pull/9512#discussion_r533829583
## File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
## @@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.text.NumberFormat;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+final class Snapshots {
+private static final String SNAPSHOT_DIR = "snapshots";
+private static final String SUFFIX = ".snapshot";
Review comment: Good catch. I mentioned using `.checkpoint` in KIP-630. I forgot to change it here. I'll change it to that, but let me know if you have a preference.
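The `NumberFormat` import in the file under review suggests zero-padded names, in the style Kafka uses for log segment files. A sketch of what that naming might look like (the padding widths and the `.checkpoint` suffix are assumptions, not the final code):

```java
import java.text.NumberFormat;

// Sketch of snapshot file naming: "<20-digit end offset>-<10-digit epoch>.checkpoint".
// Zero-padding keeps lexicographic and numeric ordering identical, so directory
// listings come back sorted by offset. Widths and suffix are assumptions.
final class SnapshotFileName {
    static String filenameFromSnapshotId(long endOffset, int epoch) {
        NumberFormat offsetFormat = NumberFormat.getInstance();
        offsetFormat.setMinimumIntegerDigits(20);
        offsetFormat.setGroupingUsed(false);   // no thousands separators in file names
        NumberFormat epochFormat = NumberFormat.getInstance();
        epochFormat.setMinimumIntegerDigits(10);
        epochFormat.setGroupingUsed(false);
        return offsetFormat.format(endOffset) + "-" + epochFormat.format(epoch) + ".checkpoint";
    }
}
```

Disabling grouping matters: the default locale-aware `NumberFormat` would otherwise insert separators like `1,000` into the file name.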
[jira] [Created] (KAFKA-10793) Race condition in FindCoordinatorFuture permanently severs connection to group coordinator
A. Sophie Blee-Goldman created KAFKA-10793:
Summary: Race condition in FindCoordinatorFuture permanently severs connection to group coordinator
Key: KAFKA-10793
URL: https://issues.apache.org/jira/browse/KAFKA-10793
Project: Kafka
Issue Type: Bug
Components: consumer, streams
Affects Versions: 2.5.0
Reporter: A. Sophie Blee-Goldman

Pretty much as soon as we started actively monitoring the _last-rebalance-seconds-ago_ metric in our Kafka Streams test environment, we started seeing something weird. Every so often one of the StreamThreads (ie a single Consumer instance) would appear to permanently fall out of the group, as evidenced by a monotonically increasing _last-rebalance-seconds-ago_. We inject artificial network failures every few hours at most, so the group rebalances quite often. But the one consumer never rejoins, with no other symptoms (besides a slight drop in throughput since the remaining threads had to take over this member's work). We're confident that the problem exists in the client layer, since the logs confirmed that the unhealthy consumer was still calling poll. It was also calling Consumer#committed in its main poll loop, which was consistently failing with a TimeoutException.

When I attached a remote debugger to an instance experiencing this issue, the network client's connection to the group coordinator (the one that uses MAX_VALUE - node.id as the coordinator id) was in the DISCONNECTED state. But for some reason it never tried to re-establish this connection, although it did successfully connect to that same broker through the "normal" connection (ie the one that just uses node.id).

The tl;dr is that the AbstractCoordinator's FindCoordinatorRequest has failed (presumably due to a disconnect), but the _findCoordinatorFuture_ is non-null so a new request is never sent. This shouldn't be possible since the FindCoordinatorResponseHandler is supposed to clear the _findCoordinatorFuture_ when the future is completed. But somehow that didn't happen, so the consumer continues to assume there's still a FindCoordinator request in flight and never even notices that it's dropped out of the group.

These are the only confirmed findings so far, however we have some guesses which I'll leave in the comments. Note that we only noticed this due to the newly added _last-rebalance-seconds-ago_ metric, and there's no reason to believe this bug hasn't been flying under the radar since the Consumer's inception.
[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot
jsancio commented on a change in pull request #9512: URL: https://github.com/apache/kafka/pull/9512#discussion_r533828329
## File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
## @@ -100,4 +102,15 @@ default void handleResign() {}
 */
 CompletableFuture<Void> shutdown(int timeoutMs);
+
+/**
+ * Create a writable snapshot file for a given offset and epoch.
+ *
+ * The RaftClient assumes that the snapshot returned will contain the records up to but
+ * not including the end offset in the snapshot id. See {@link SnapshotWriter} for
+ * details on how to use this object.
+ *
+ * @param snapshotId the end offset and epoch that identifies the snapshot
Review comment: Is there a specific reason why you are asking this? We don't currently check for this. I will add a check for this and we can relax it later if we need to.
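The check being discussed can be sketched as a simple guard on the exclusive end offset (hypothetical helper and names; the actual check would live inside the raft client):

```java
// Hypothetical guard for the contract above: a snapshot with id (endOffset, epoch)
// covers records up to but NOT including endOffset, so a valid snapshot id can
// never exceed the current log end offset.
final class SnapshotIdCheck {
    static long validatedEndOffset(long snapshotEndOffset, long logEndOffset) {
        if (snapshotEndOffset < 0 || snapshotEndOffset > logEndOffset)
            throw new IllegalArgumentException(
                "Snapshot end offset " + snapshotEndOffset +
                " is outside the range [0, " + logEndOffset + "]");
        return snapshotEndOffset;  // exclusive: the record at this offset is NOT in the snapshot
    }
}
```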
[GitHub] [kafka] hachikuji commented on pull request #9666: MINOR: Remove broken `.travis.yml` with system test hooks
hachikuji commented on pull request #9666: URL: https://github.com/apache/kafka/pull/9666#issuecomment-736910631 @chia7712 Nice that you got it working! I think there is value in running a subset of system tests automatically. It might even encourage us to write more system tests if we can get the feedback more easily. Not having access to logs is annoying, but might not be a dealbreaker. It looks like we need a bit of initial work to tune the build though. I'm happy to close this and merge #9652. Then perhaps we can file a few jiras for any follow-up actions. What do you think?
[GitHub] [kafka] abbccdda commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests
abbccdda commented on a change in pull request #9564: URL: https://github.com/apache/kafka/pull/9564#discussion_r533814137
## File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
## @@ -88,20 +88,33 @@ class AlterIsrManagerImpl(val controllerChannelManager: BrokerToControllerChanne
 private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
 val message = buildRequest(inflightAlterIsrItems)
-def responseHandler(response: ClientResponse): Unit = {
-  try {
-val body = response.responseBody().asInstanceOf[AlterIsrResponse]
-handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems)
-  } finally {
-// Be sure to clear the in-flight flag to allow future AlterIsr requests
-if (!inflightRequest.compareAndSet(true, false)) {
-  throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+
+def clearInflightRequests(): Unit = {
+  // Be sure to clear the in-flight flag to allow future AlterIsr requests
+  if (!inflightRequest.compareAndSet(true, false)) {
+throw new IllegalStateException("AlterIsr response callback called when no requests were in flight")
+  }
+}
+
+class AlterIsrResponseHandler extends BrokerToControllerRequestCompletionHandler {
+  override def onComplete(response: ClientResponse): Unit = {
+try {
+  val body = response.responseBody().asInstanceOf[AlterIsrResponse]
+  handleAlterIsrResponse(body, message.brokerEpoch(), inflightAlterIsrItems)
+} finally {
+  clearInflightRequests()
+}
+  }
+
+  override def onTimeout(): Unit = {
+warn(s"Encountered request when sending AlterIsr to the controller")
Review comment: That's what I decided to do eventually.
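The pattern in the diff above (clear the in-flight flag on both the completion and the timeout path, so a lost response can't block all future requests) can be reduced to a small Java sketch (simplified, hypothetical class; the real code is the Scala above):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified sketch of the completion-handler pattern: whether the request completes
// normally or times out, the in-flight flag is cleared exactly once, allowing the
// next request to be sent. Clearing an already-clear flag is a programming error.
final class InflightGuard {
    private final AtomicBoolean inflightRequest = new AtomicBoolean(false);

    boolean trySend() {
        return inflightRequest.compareAndSet(false, true);  // only one request in flight at a time
    }

    private void clearInflightRequest() {
        if (!inflightRequest.compareAndSet(true, false))
            throw new IllegalStateException("Response callback called when no request was in flight");
    }

    void onComplete() { clearInflightRequest(); }  // normal response path
    void onTimeout() { clearInflightRequest(); }   // timeout path must also clear the flag
}
```

Without the `onTimeout` path, a timed-out request would leave the flag set forever and no further AlterIsr requests could be sent.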
[GitHub] [kafka] chia7712 commented on pull request #9666: MINOR: Remove broken `.travis.yml` with system test hooks
chia7712 commented on pull request #9666: URL: https://github.com/apache/kafka/pull/9666#issuecomment-736902424 > guess you were trying to get this working instead? #9652 Removing the Travis test is OK with me.
[GitHub] [kafka] C0urante commented on pull request #9669: KAFKA-10792: Prevent source task shutdown from blocking herder thread
C0urante commented on pull request #9669: URL: https://github.com/apache/kafka/pull/9669#issuecomment-736896160 Thanks, Nigel. @chia7712 @rhauch @kkonstantine would you like to take a look?
[jira] [Resolved] (KAFKA-10729) KIP-482: Bump remaining RPC's to use tagged fields
[ https://issues.apache.org/jira/browse/KAFKA-10729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-10729.
Fix Version/s: 2.8.0
Resolution: Fixed
> KIP-482: Bump remaining RPC's to use tagged fields
> Key: KAFKA-10729
> URL: https://issues.apache.org/jira/browse/KAFKA-10729
> Project: Kafka
> Issue Type: Improvement
> Reporter: Gardner Vickers
> Assignee: Gardner Vickers
> Priority: Major
> Fix For: 2.8.0
>
> With [KIP-482|https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields], the Kafka protocol gained support for tagged fields.
> Not all RPC's were bumped to use flexible versioning and tagged fields. We should bump the remaining RPC's and provide a new IBP to take advantage of tagged fields via the flexible versioning mechanism.
>
> The RPC's which need to be bumped are:
> {code:java}
> AddOffsetsToTxnRequest
> AddOffsetsToTxnResponse
> AddPartitionsToTxnRequest
> AddPartitionsToTxnResponse
> AlterClientQuotasRequest
> AlterClientQuotasResponse
> AlterConfigsRequest
> AlterConfigsResponse
> AlterReplicaLogDirsRequest
> AlterReplicaLogDirsResponse
> DescribeClientQuotasRequest
> DescribeClientQuotasResponse
> DescribeConfigsRequest
> DescribeConfigsResponse
> EndTxnRequest
> EndTxnResponse
> ListOffsetRequest
> ListOffsetResponse
> OffsetForLeaderEpochRequest
> OffsetForLeaderEpochResponse
> ProduceRequest
> ProduceResponse
> WriteTxnMarkersRequest
> WriteTxnMarkersResponse
> {code}
[GitHub] [kafka] hachikuji merged pull request #9601: KAFKA-10729: Bump remaining RPC's to use tagged fields.
hachikuji merged pull request #9601: URL: https://github.com/apache/kafka/pull/9601
[GitHub] [kafka] ncliang commented on a change in pull request #9669: KAFKA-10792: Prevent source task shutdown from blocking herder thread
ncliang commented on a change in pull request #9669: URL: https://github.com/apache/kafka/pull/9669#discussion_r533788764
## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
## @@ -206,16 +203,13 @@ public void cancel() {
 public void stop() {
 super.stop();
 stopRequestedLatch.countDown();
-synchronized (this) {
-if (finishedStart)
-tryStop();
-else
-startedShutdownBeforeStartCompleted = true;
-}
 }
-private synchronized void tryStop() {
-if (!stopped) {
+// Note: This method is not thread-safe
+private void tryStop() {
+// If the task is scheduled for shutdown before we invoke initialize or start on it (which
+// can happy reliably if it's started in the PAUSED state), we don't have to invoke stop on it
Review comment:
```suggestion
// can happen reliably if it's started in the PAUSED state), we don't have to invoke stop on it
```
[GitHub] [kafka] hachikuji commented on a change in pull request #9300: KAFKA-10491: Check authorizations first in KafkaApis
hachikuji commented on a change in pull request #9300: URL: https://github.com/apache/kafka/pull/9300#discussion_r533763099 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -1770,45 +1770,47 @@ class KafkaApis(val requestChannel: RequestChannel, sendResponseMaybeThrottle(controllerMutationQuota, request, createResponse, onComplete = None) } +// be sure to check authorization first, before checking if this is the controller, to avoid leaking +// information about the system (i.e. who is the controller) to principals unauthorized for that information + val createTopicsRequest = request.body[CreateTopicsRequest] val results = new CreatableTopicResultCollection(createTopicsRequest.data.topics.size) -if (!controller.isActive) { - createTopicsRequest.data.topics.forEach { topic => -results.add(new CreatableTopicResult().setName(topic.name) - .setErrorCode(Errors.NOT_CONTROLLER.code)) - } - sendResponseCallback(results) -} else { - createTopicsRequest.data.topics.forEach { topic => -results.add(new CreatableTopicResult().setName(topic.name)) +createTopicsRequest.data.topics.forEach { topic => + results.add(new CreatableTopicResult().setName(topic.name)) +} +val hasClusterAuthorization = authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME, + logIfDenied = false) +val topics = createTopicsRequest.data.topics.asScala.map(_.name) +val authorizedTopics = + if (hasClusterAuthorization) topics.toSet + else filterByAuthorized(request.context, CREATE, TOPIC, topics)(identity) +val authorizedForDescribeConfigs = filterByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC, + topics, logIfDenied = false)(identity).map(name => name -> results.find(name)).toMap + +results.forEach { topic => + if (results.findAll(topic.name).size > 1) { +topic.setErrorCode(Errors.INVALID_REQUEST.code) +topic.setErrorMessage("Found multiple entries for this topic.") + } else if (!authorizedTopics.contains(topic.name)) { +topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) 
+topic.setErrorMessage("Authorization failed.") + } + if (!authorizedForDescribeConfigs.contains(topic.name)) { +topic.setTopicConfigErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) } - val hasClusterAuthorization = authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME, -logIfDenied = false) - val topics = createTopicsRequest.data.topics.asScala.map(_.name) - val authorizedTopics = -if (hasClusterAuthorization) topics.toSet -else filterByAuthorized(request.context, CREATE, TOPIC, topics)(identity) - val authorizedForDescribeConfigs = filterByAuthorized(request.context, DESCRIBE_CONFIGS, TOPIC, -topics, logIfDenied = false)(identity).map(name => name -> results.find(name)).toMap - +} +if (!controller.isActive) { + // Don't provide the information that this node is not the controller unless they were authorized + // to perform at least one of their requests. So only set NOT_CONTROLLER error for anything that so far has a + // success/NONE error code. Keep the existing error codes that we've determined rather than overwriting them + // with NOT_CONTROLLER because that is potentially useful information for the client. results.forEach { topic => -if (results.findAll(topic.name).size > 1) { - topic.setErrorCode(Errors.INVALID_REQUEST.code) - topic.setErrorMessage("Found multiple entries for this topic.") -} else if (!authorizedTopics.contains(topic.name)) { - topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) - topic.setErrorMessage("Authorization failed.") -} -if (!authorizedForDescribeConfigs.contains(topic.name)) { - topic.setTopicConfigErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) -} - } - val toCreate = mutable.Map[String, CreatableTopic]() - createTopicsRequest.data.topics.forEach { topic => -if (results.find(topic.name).errorCode == Errors.NONE.code) { - toCreate += topic.name -> topic +if(topic.errorCode() == Errors.NONE.code()) { Review comment: nit: convention is to add space after `if`. 
There are a few of these in the patch.
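The ordering rule in the diff above (decide authorization errors first; apply NOT_CONTROLLER only to topics that would otherwise succeed, so unauthorized principals can't learn which node is the controller) can be illustrated with a standalone sketch (hypothetical Java class; the real logic is the Scala in KafkaApis):

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative error-assignment ordering for a CreateTopics-style request:
// duplicates and authorization failures are decided first, and NOT_CONTROLLER
// only overwrites NONE, never an authorization error.
final class CreateTopicsErrors {
    enum Error { NONE, INVALID_REQUEST, TOPIC_AUTHORIZATION_FAILED, NOT_CONTROLLER }

    static Map<String, Error> resolve(List<String> topics, Set<String> authorized, boolean isController) {
        Map<String, Error> results = new LinkedHashMap<>();
        for (String topic : topics) {
            if (Collections.frequency(topics, topic) > 1)
                results.put(topic, Error.INVALID_REQUEST);            // duplicate entry
            else if (!authorized.contains(topic))
                results.put(topic, Error.TOPIC_AUTHORIZATION_FAILED); // decided before controller check
            else
                results.put(topic, Error.NONE);
        }
        if (!isController) {
            // Only topics that would otherwise succeed learn that this node
            // is not the controller; existing errors are kept.
            results.replaceAll((t, e) -> e == Error.NONE ? Error.NOT_CONTROLLER : e);
        }
        return results;
    }
}
```

An unauthorized principal thus always sees TOPIC_AUTHORIZATION_FAILED, regardless of whether the receiving broker happens to be the controller.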
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r533765909
## File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
## @@ -69,33 +70,39 @@
 @BenchmarkMode(Mode.AverageTime)
 @OutputTimeUnit(TimeUnit.MILLISECONDS)
 public class AclAuthorizerBenchmark {
-@Param({"1", "5", "20"})
+@Param({"1", "4", "8"})
Review comment: The underlying algorithm of the AuthorizeByResourceType() implementation in AclAuthorizer has several characteristics:
1. If any "allow resource" of the given ACE does not have a dominant "deny resource", the API will return immediately.
2. The complexity is O(n*m), where `n` is the number of "allow resources" of the given ACE and `m` is the number of "deny resources" of the given ACE; it is not related to the number of ACEs in the cluster.

Point 1 means that, given an ACE, suppose `p`% of its "allow resources" have a dominant "deny resource"; if `resourceCount` is `r`, then on average the API will return after checking `r * p * 0.01` "allow resources".
a) If we distribute the "dominant deny resources" evenly, e.g. use (loop index % something) to determine which "allow resource" should have a dominant "deny resource", we end up iterating the same number of "allow resources" and returning from the API call every time, which is `r * p * 0.01`.
b) If we randomly determine which "allow resource" should have a dominant "deny resource", the result will be too noisy: we may iterate only 1 resource or all resources, depending on the randomization algorithm and seed.

Point 2 means that the API time cost is not related to the number of ACEs but grows quadratically as `resourceCount` increases. Under the assumption in (1), the actual complexity would be `r * r * p * 0.01`.

So what I was doing is to directly test the worst case, where p = 100, meaning 100% of the "allow resources" have a dominant "deny resource". The complexity hence would be `r^2`. It's rare that a cluster can have 200k "allow resources" and 200k corresponding "dominant deny resources" for each user, and it's not fair to have a relatively small `aclCount` and a much larger `resourceCount`, as the API optimizes performance by indexing on the ACE.
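The cost model in the comment can be captured as a one-line formula (hypothetical helper; the numbers below are illustrative, not benchmark results):

```java
// Cost model for the scan described above: with r allow resources and fraction p of
// them dominated by a deny resource, each dominated allow resource costs up to r
// deny-resource checks before the scan can return, so the expected work is roughly
// r * p * r checks; the worst case (p = 1, i.e. 100% dominated) is r^2.
final class AuthorizeCostModel {
    static long expectedChecks(long resourceCount, double dominatedFraction) {
        return Math.round(resourceCount * dominatedFraction * resourceCount);
    }
}
```

This is why the benchmark above shrinks the `@Param` resource counts: the worst-case work grows with the square of `resourceCount`, not with `aclCount`.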
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r533765909 ## File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java ## @@ -69,33 +70,39 @@ @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MILLISECONDS) public class AclAuthorizerBenchmark { -@Param({"1", "5", "20"}) +@Param({"1", "4", "8"}) Review comment: The underlying algorithm of AuthorizeByResourceType() implementation in AclAuthorizer has several characteristics: 1. If any "allow resource" of the given ACE does not have a dominant "deny resource", the API will return immediately 2. The complexity is O(n*m) where `n` is the number of "allow resources" of the given ACE, 'm' is the number of "deny resources" of the given ACE, but not related to the number of "ACE". $1 means that, given an ACE, suppose `p%` of its "allow resource" does not have a dominant "deny resource", if `resourceCount` is `r`, on average, after checking `r * p * 0.01` "allow resources", the API will return. a) if we are let the "dominant deny resource" distribute evenly, like use the (loop index % something) to determine which "allow resource" should have a dominant "deny resource", we end up iterating the same amount of the "allow resource" and returning from the API call every time, which is `r*p*0.01` b) if we are determine which "allow resource" should have a dominant "deny resource", the result will be too noisy. We may iterate only 1 resource or iterate all resources based on the randomize algorithm and seed. $2 means that, the API time cost is not related to the number of "ACE" but is hyperbolically increasing when `resourceCount` is increasing. Under the assumption in (1), the actual complexity would be (r * r * p * 0.01) So what I was doing is to directly test the worst case, where p = 1, which means 100% of the "allow resource" will have a dominant "deny resource. The complexity hence would be (r^2). 
It's rare that a cluster can have 200k "allow resources" and 200k corresponding "dominant deny resources" for each user, and it's not fair to have a relatively smaller `aclCount` and huger `resourceCount`, as the API is optimizing the performance by indexing on `ACE`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r533765909 ## File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java ## @@ -69,33 +70,39 @@ @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MILLISECONDS) public class AclAuthorizerBenchmark { -@Param({"1", "5", "20"}) +@Param({"1", "4", "8"}) Review comment: The underlying algorithm of AuthorizeByResourceType() implementation in AclAuthorizer has several characteristics: 1. If any "allow resource" of the given ACE does not have a dominant "deny resource", the API will return immediately 2. The complexity is O(n*m) where `n` is the number of "allow resources" of the given ACE, 'm' is the number of "deny resources" of the given ACE, but not related to the number of "ACE". $1 means that, given an ACE, suppose `p%` of its "allow resource" does not have a dominant "deny resource", if `resourceCount` is `r`, on average, after checking `r * p * 0.01` "allow resources", the API will return. a) if we are let the "dominant deny resource" distribute evenly, like use the (loop index % something) to determine which "allow resource" should have a dominant "deny resource", we end up iterating the same amount of the "allow resource" and returning from the API call every time, which is `r*p*0.01` b) if we are determine which "allow resource" should have a dominant "deny resource", the result will be too noisy. We may iterate only 1 resource or iterate all resources based on the randomize algorithm and seed. $2 means that, the API time cost is not related to the number of "ACE" but is hyperbolically increasing when `resourceCount` is increasing. Under the assumption in (1), the actual complexity would be (r * r * p * 0.01) So what I was doing is to directly test the worst case, where p = 1, which means 100% of the "allow resource" will have a dominant "deny resource. The complexity hence would be (r^2). 
It's rare that a cluster can have 200k "allow resources" and 200k corresponding "dominant deny resources" for each user and it's not fair to have a relatively smaller `aclCount` and huger `resourceCount`, as the API is indexing on `ACE`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r533765909 ## File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java ## @@ -69,33 +70,39 @@ @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MILLISECONDS) public class AclAuthorizerBenchmark { -@Param({"1", "5", "20"}) +@Param({"1", "4", "8"}) Review comment: The underlying algorithm of AuthorizeByResourceType() implementation in AclAuthorizer has several characteristics: 1. If any "allow resource" of the given ACE does not have a dominant "deny resource", the API will return immediately 2. The complexity is O(n*m) where `n` is the number of "allow resources" of the given ACE, 'm' is the number of "deny resources" of the given ACE, but not related to the number of "ACE". $1 means that, given an ACE, suppose `p%` of its "allow resource" does not have a dominant "deny resource", if `resourceCount` is `r`, on average, after checking `r * p * 0.01` "allow resources", the API will return. a) if we are let the "dominant deny resource" distribute evenly, like use the (loop index % something) to determine which "allow resource" should have a dominant "deny resource", we end up iterating the same amount of the "allow resource" and returning from the API call every time, which is `r*p*0.01` b) if we are determine which "allow resource" should have a dominant "deny resource", the result will be too noisy. We may iterate only 1 resource or iterate all resources based on the randomize algorithm and seed. $2 means that, the API time cost is not related to the number of "ACE" but is hyperbolically increasing when `resourceCount` is increasing. Under the assumption in (1), the actual complexity would be (r * r * p * 0.01) So what I was doing is to directly test the worst case, where p = 1, which means 100% of the "allow resource" will have a dominant "deny resource. The complexity hence would be (r^2). 
It's rare for a cluster to have 200k "allow resource" rules per user, and it would not be fair to benchmark with a relatively small `aclCount` and a much larger `resourceCount`, since the API indexes on the ACE. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
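The early-return and worst-case argument above can be illustrated with a toy cost model (an illustrative sketch only, not Kafka's actual implementation; all names here are made up): the API returns ALLOWED as soon as one "allow resource" has no dominant "deny resource", and only pays the full r*r cost when every allow resource is dominated.

```java
// Toy cost model for the argument above (illustrative only; not Kafka code).
final class AuthzCostModel {

    /**
     * Comparisons performed for r allow resources when the first
     * (r * dominatedFraction) of them each have a dominant deny resource.
     */
    static long comparisons(int r, double dominatedFraction) {
        int dominated = (int) Math.round(r * dominatedFraction);
        long total = 0;
        for (int i = 0; i < r; i++) {
            total += r;       // scan all r deny resources for allow resource i
            if (i >= dominated) {
                return total; // allow i is un-dominated: early return (ALLOWED)
            }
        }
        return total;         // every allow dominated: r * r comparisons (DENIED)
    }

    public static void main(String[] args) {
        // Worst case (the benchmark's p = 1): every allow has a dominant deny -> r^2.
        System.out.println(comparisons(200, 1.0)); // 40000
        // Best case: the very first allow is un-dominated -> one scan of the denies.
        System.out.println(comparisons(200, 0.0)); // 200
    }
}
```

This is why the benchmark pins the dominated fraction at 100% rather than randomizing placement: the cost is then deterministic and maximal, instead of varying with the random seed.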
[GitHub] [kafka] hachikuji commented on a change in pull request #9512: KAFKA-10394: generate snapshot
hachikuji commented on a change in pull request #9512: URL: https://github.com/apache/kafka/pull/9512#discussion_r533732789 ## File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java ## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.snapshot; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.text.NumberFormat; +import org.apache.kafka.raft.OffsetAndEpoch; + +final class Snapshots { +private static final String SNAPSHOT_DIR = "snapshots"; +private static final String SUFFIX = ".snapshot"; Review comment: This suffix is used for producer state snapshots already. Maybe we could use `.snap` or something like that. ## File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java ## @@ -149,6 +152,29 @@ default OptionalLong truncateToEndOffset(OffsetAndEpoch endOffset) { return OptionalLong.of(truncationOffset); } +/** + * Create a writable snapshot for the given snapshot id. + * + * See {@link RawSnapshotWriter} for details on how to use this object. 
+ * + * @param snapshotId the end offset and epoch that identifies the snapshot + * @return a writable snapshot + */ +RawSnapshotWriter createSnapshot(OffsetAndEpoch snapshotId) throws IOException; Review comment: Do we also need an api to list snapshots? ## File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java ## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.snapshot; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.nio.file.StandardOpenOption; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.raft.OffsetAndEpoch; + +public final class FileRawSnapshotWriter implements RawSnapshotWriter { +private final Path path; +private final FileChannel channel; +private final OffsetAndEpoch snapshotId; +private boolean frozen = false; + +private FileRawSnapshotWriter( +Path path, +FileChannel channel, +OffsetAndEpoch snapshotId +) { +this.path = path; +this.channel = channel; +this.snapshotId = snapshotId; +} + +@Override +public OffsetAndEpoch snapshotId() { +return snapshotId; +} + +@Override +public long sizeInBytes() throws IOException { +return channel.size(); +} + +@Override +public void append(ByteBuffer buffer) throws IOException { +if (frozen) { +throw new IllegalStateException( +String.format("Append not supported. Snapshot is already frozen: id = %s; path = %s", snapshotId, path) +); +} + +Utils.writeFully(channel, buffer); +} + +@Override +public boolean isFrozen() { +return frozen; +} + +@Override +public void freeze() throws IOException { +channel.close(); +frozen = true; + +// Set readonly and ignore the result +if (!path.toFile().setReadOnly()) { +throw new IOException(String.format("Unable to set file %s as read-only", path)); +} + +Path destination =
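The frozen-writer contract in the quoted `FileRawSnapshotWriter` can be sketched with a minimal in-memory analogue (illustrative only; this class is made up and is not Kafka's actual API): appends succeed until `freeze()` is called, after which further appends throw `IllegalStateException`.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Minimal in-memory analogue of the frozen-writer contract above (illustrative).
final class InMemorySnapshotWriter {
    private final ByteArrayOutputStream out = new ByteArrayOutputStream();
    private boolean frozen = false;

    void append(ByteBuffer buffer) {
        if (frozen) {
            // Mirrors the quoted writer: no mutation is allowed after freeze()
            throw new IllegalStateException("Append not supported. Snapshot is already frozen");
        }
        while (buffer.hasRemaining()) {
            out.write(buffer.get());
        }
    }

    void freeze() {
        frozen = true; // from this point on the snapshot is immutable
    }

    boolean isFrozen() {
        return frozen;
    }

    long sizeInBytes() {
        return out.size();
    }
}
```

The real writer additionally closes the `FileChannel`, marks the file read-only, and renames it to its final path on freeze; the sketch keeps only the state-machine aspect under review.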
[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
d8tltanc commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r533737220 ## File path: core/src/test/scala/unit/kafka/security/authorizer/MockAuthorizer.scala ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.security.authorizer + +import java.{lang, util} +import java.util.concurrent.CompletionStage + +import org.apache.kafka.common.Endpoint +import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter} +import org.apache.kafka.server.authorizer.{AclCreateResult, AclDeleteResult, Action, AuthorizableRequestContext, AuthorizationResult, Authorizer, AuthorizerServerInfo} + +object MockAuthorizer { +val authorizer = new AclAuthorizer +} + +/** + * A mock authorizer for testing the interface default + */ +class MockAuthorizer extends Authorizer { Review comment: Right. DelegatingAuthorizer is more reasonable as a design pattern naming here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rajinisivaram commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic
rajinisivaram commented on a change in pull request #9485: URL: https://github.com/apache/kafka/pull/9485#discussion_r533510477 ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +308,122 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + // TODO: 1. Discuss how to log audit message + // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop but I think + // in most of the cases it would be O(1) because denyDominatePrefixAllow should be rare + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +if (resourceType eq ResourceType.ANY) + throw new IllegalArgumentException("Must specify a non-filter resource type for authorizeByResourceType") + +if (resourceType eq ResourceType.UNKNOWN) + throw new IllegalArgumentException("Unknown resource type") + +if (op eq AclOperation.ANY) + throw new IllegalArgumentException("Must specify a non-filter operation type for authorizeByResourceType") + +if (op eq AclOperation.UNKNOWN) + throw new IllegalArgumentException("Unknown operation type") Review comment: We should probably move this common code to SecurityUtils and use it both here and in the default implementation. ## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala ## @@ -304,6 +308,122 @@ class AclAuthorizer extends Authorizer with Logging { if (zkClient != null) zkClient.close() } + // TODO: 1. Discuss how to log audit message + // TODO: 2. 
Discuss if we need a trie to optimize(mainly for the O(n^2) loop but I think + // in most of the cases it would be O(1) because denyDominatePrefixAllow should be rare + override def authorizeByResourceType(requestContext: AuthorizableRequestContext, + op: AclOperation, + resourceType: ResourceType): AuthorizationResult = { +if (resourceType eq ResourceType.ANY) + throw new IllegalArgumentException("Must specify a non-filter resource type for authorizeByResourceType") + +if (resourceType eq ResourceType.UNKNOWN) + throw new IllegalArgumentException("Unknown resource type") + +if (op eq AclOperation.ANY) + throw new IllegalArgumentException("Must specify a non-filter operation type for authorizeByResourceType") + +if (op eq AclOperation.UNKNOWN) + throw new IllegalArgumentException("Unknown operation type") + +val principal = new KafkaPrincipal( + requestContext.principal().getPrincipalType, + requestContext.principal().getName).toString + +val denyPatterns = matchingPatterns( + principal, + requestContext.clientAddress().getHostAddress, + op, + resourceType, + AclPermissionType.DENY +) + +if (denyAll(denyPatterns)) { + logAuditMessage(requestContext, new Action(op, null,0, true, true), false, false) + return AuthorizationResult.DENIED +} + +if (shouldAllowEveryoneIfNoAclIsFound) { + logAuditMessage(requestContext, new Action(op, null, 0, true, true), true, false) + return AuthorizationResult.ALLOWED +} + +val allowPatterns = matchingPatterns( + principal, + requestContext.clientAddress().getHostAddress, + op, + resourceType, + AclPermissionType.ALLOW +) + +if (allowAny(allowPatterns, denyPatterns)) { + logAuditMessage(requestContext, new Action(op,null, 0, true, true), true, false) + return AuthorizationResult.ALLOWED +} + +logAuditMessage(requestContext, new Action(op, null, 0, true, true), false, false) +AuthorizationResult.DENIED + } + + def matchingPatterns(principal: String, host: String, op: AclOperation, + resourceType: ResourceType, + permission: 
AclPermissionType): Set[ResourcePattern] = { +var resources = Set[ResourcePattern]() +for (p <- Set(principal, AclEntry.WildcardPrincipal.toString)) { + for (h <- Set(host, AclEntry.WildcardHost)) { +for (o <- Set(op, AclOperation.ALL)) { + val ace = new AccessControlEntry(p, h, o, permission) + resourceCache.get(ace) match { +case Some(r) => resources ++= r.filter(r => r.resourceType() == resourceType) +case None => + } +} + } +} +resources + } + + private def denyAll(denyResources: Set[ResourcePattern]): Boolean = +denyResources.exists(rp => denyAll(rp)) + + private def denyAll(rp: ResourcePattern): Boolean = +rp.patternType() == PatternType.LITERAL && rp.name() == ResourcePattern.WILDCARD_RESOURCE + + private def allowAny(allowPatterns: Set[ResourcePattern], denyPatterns:
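The nested loops in the quoted `matchingPatterns` probe the ACE cache with every combination of {principal, wildcard principal} x {host, wildcard host} x {op, ALL}. A hedged Java sketch of that key expansion (the key format and class name here are invented for illustration; only the 2x2x2 lookup structure comes from the quoted Scala):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the candidate-key expansion in matchingPatterns (illustrative only).
final class AceKeyExpansion {
    static List<String> candidateKeys(String principal, String host, String op) {
        List<String> keys = new ArrayList<>();
        for (String p : new String[] {principal, "User:*"}) {   // wildcard principal
            for (String h : new String[] {host, "*"}) {         // wildcard host
                for (String o : new String[] {op, "ALL"}) {     // AclOperation.ALL
                    keys.add(p + "|" + h + "|" + o);            // one resourceCache lookup
                }
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        // 2 x 2 x 2 = 8 cache lookups per authorizeByResourceType call
        System.out.println(candidateKeys("User:alice", "10.0.0.1", "WRITE").size()); // 8
    }
}
```

This constant fan-out is why the per-call cost is dominated by the allow/deny pattern matching rather than by the cache probes themselves.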
[GitHub] [kafka] JimGalasyn opened a new pull request #9670: DOCS-6076: Clarify config names for EOS versions 1 and 2
JimGalasyn opened a new pull request #9670: URL: https://github.com/apache/kafka/pull/9670 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9601: KAFKA-10729: Bump remaining RPC's to use tagged fields.
hachikuji commented on a change in pull request #9601: URL: https://github.com/apache/kafka/pull/9601#discussion_r533719708 ## File path: core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala ## @@ -437,8 +437,8 @@ class ReplicaFetcherThreadTest { thread.doWork() assertEquals(2, mockNetwork.epochFetchCount) assertEquals(1, mockNetwork.fetchCount) -assertEquals("OffsetsForLeaderEpochRequest version.", - 3, mockNetwork.lastUsedOffsetForLeaderEpochVersion) +assertTrue("OffsetsForLeaderEpochRequest version.", + mockNetwork.lastUsedOffsetForLeaderEpochVersion >= 3) Review comment: I think this was originally using `1` in order to ensure that we were using a version which included the epoch in the response. Since then it looks like it has been updated blindly every time we've bumped the protocol. I'm ok leaving this as is, but we could probably also get rid of it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10772) java.lang.IllegalStateException: There are insufficient bytes available to read assignment from the sync-group response (actual byte size 0)
[ https://issues.apache.org/jira/browse/KAFKA-10772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241865#comment-17241865 ] Levani Kokhreidze commented on KAFKA-10772: --- Maybe this helps – we noticed that this problem happens mostly when there's leadership change in the cluster. > java.lang.IllegalStateException: There are insufficient bytes available to > read assignment from the sync-group response (actual byte size 0) > > > Key: KAFKA-10772 > URL: https://issues.apache.org/jira/browse/KAFKA-10772 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Levani Kokhreidze >Priority: Major > Attachments: KAFKA-10772.log > > > From time to time we encounter the following exception that results in Kafka > Streams threads dying. > Broker version 2.4.1, Client version 2.6.0 > {code:java} > Nov 27 00:59:53.681 streaming-app service: prod | streaming-app-2 | > stream-client [cluster1-profile-stats-pipeline-client-id] State transition > from REBALANCING to ERROR Nov 27 00:59:53.681 streaming-app service: prod | > streaming-app-2 | stream-client [cluster1-profile-stats-pipeline-client-id] > State transition from REBALANCING to ERROR Nov 27 00:59:53.682 streaming-app > service: prod | streaming-app-2 | 2020-11-27 00:59:53.681 ERROR 105 --- > [-StreamThread-1] .KafkaStreamsBasedStreamProcessingEngine : Stream > processing pipeline: [profile-stats] encountered unrecoverable exception. > Thread: [cluster1-profile-stats-pipeline-client-id-StreamThread-1] is > completely dead. If all worker threads die, Kafka Streams will be moved to > permanent ERROR state. Nov 27 00:59:53.682 streaming-app service: prod | > streaming-app-2 | Stream processing pipeline: [profile-stats] encountered > unrecoverable exception. Thread: > [cluster1-profile-stats-pipeline-client-id-StreamThread-1] is completely > dead. If all worker threads die, Kafka Streams will be moved to permanent > ERROR state. 
java.lang.IllegalStateException: There are insufficient bytes > available to read assignment from the sync-group response (actual byte size > 0) , this is not expected; it is possible that the leader's assign function > is buggy and did not return any assignment for this member, or because static > member is configured and the protocol is buggy hence did not get the > assignment for this member at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:367) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) > at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:766) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:624) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10772) java.lang.IllegalStateException: There are insufficient bytes available to read assignment from the sync-group response (actual byte size 0)
[ https://issues.apache.org/jira/browse/KAFKA-10772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241863#comment-17241863 ] Levani Kokhreidze commented on KAFKA-10772: --- Hi [~ableegoldman], yes we are using static membership. I've attached logs for the incident.[^KAFKA-10772.log] > java.lang.IllegalStateException: There are insufficient bytes available to > read assignment from the sync-group response (actual byte size 0) > > > Key: KAFKA-10772 > URL: https://issues.apache.org/jira/browse/KAFKA-10772 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Levani Kokhreidze >Priority: Major > Attachments: KAFKA-10772.log > > > From time to time we encounter the following exception that results in Kafka > Streams threads dying. > Broker version 2.4.1, Client version 2.6.0 > {code:java} > Nov 27 00:59:53.681 streaming-app service: prod | streaming-app-2 | > stream-client [cluster1-profile-stats-pipeline-client-id] State transition > from REBALANCING to ERROR Nov 27 00:59:53.681 streaming-app service: prod | > streaming-app-2 | stream-client [cluster1-profile-stats-pipeline-client-id] > State transition from REBALANCING to ERROR Nov 27 00:59:53.682 streaming-app > service: prod | streaming-app-2 | 2020-11-27 00:59:53.681 ERROR 105 --- > [-StreamThread-1] .KafkaStreamsBasedStreamProcessingEngine : Stream > processing pipeline: [profile-stats] encountered unrecoverable exception. > Thread: [cluster1-profile-stats-pipeline-client-id-StreamThread-1] is > completely dead. If all worker threads die, Kafka Streams will be moved to > permanent ERROR state. Nov 27 00:59:53.682 streaming-app service: prod | > streaming-app-2 | Stream processing pipeline: [profile-stats] encountered > unrecoverable exception. Thread: > [cluster1-profile-stats-pipeline-client-id-StreamThread-1] is completely > dead. If all worker threads die, Kafka Streams will be moved to permanent > ERROR state. 
java.lang.IllegalStateException: There are insufficient bytes > available to read assignment from the sync-group response (actual byte size > 0) , this is not expected; it is possible that the leader's assign function > is buggy and did not return any assignment for this member, or because static > member is configured and the protocol is buggy hence did not get the > assignment for this member at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:367) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) > at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:766) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:624) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-10772) java.lang.IllegalStateException: There are insufficient bytes available to read assignment from the sync-group response (actual byte size 0)
[ https://issues.apache.org/jira/browse/KAFKA-10772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241863#comment-17241863 ] Levani Kokhreidze edited comment on KAFKA-10772 at 12/1/20, 8:50 PM: - Hi [~ableegoldman], yes we are using static membership. I've attached logs for the incident. [^KAFKA-10772.log] was (Author: lkokhreidze): Hi [~ableegoldman], yes we are using static membership. I've attached logs for the incident.[^KAFKA-10772.log] > java.lang.IllegalStateException: There are insufficient bytes available to > read assignment from the sync-group response (actual byte size 0) > > > Key: KAFKA-10772 > URL: https://issues.apache.org/jira/browse/KAFKA-10772 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Levani Kokhreidze >Priority: Major > Attachments: KAFKA-10772.log > > > From time to time we encounter the following exception that results in Kafka > Streams threads dying. > Broker version 2.4.1, Client version 2.6.0 > {code:java} > Nov 27 00:59:53.681 streaming-app service: prod | streaming-app-2 | > stream-client [cluster1-profile-stats-pipeline-client-id] State transition > from REBALANCING to ERROR Nov 27 00:59:53.681 streaming-app service: prod | > streaming-app-2 | stream-client [cluster1-profile-stats-pipeline-client-id] > State transition from REBALANCING to ERROR Nov 27 00:59:53.682 streaming-app > service: prod | streaming-app-2 | 2020-11-27 00:59:53.681 ERROR 105 --- > [-StreamThread-1] .KafkaStreamsBasedStreamProcessingEngine : Stream > processing pipeline: [profile-stats] encountered unrecoverable exception. > Thread: [cluster1-profile-stats-pipeline-client-id-StreamThread-1] is > completely dead. If all worker threads die, Kafka Streams will be moved to > permanent ERROR state. Nov 27 00:59:53.682 streaming-app service: prod | > streaming-app-2 | Stream processing pipeline: [profile-stats] encountered > unrecoverable exception. 
Thread: > [cluster1-profile-stats-pipeline-client-id-StreamThread-1] is completely > dead. If all worker threads die, Kafka Streams will be moved to permanent > ERROR state. java.lang.IllegalStateException: There are insufficient bytes > available to read assignment from the sync-group response (actual byte size > 0) , this is not expected; it is possible that the leader's assign function > is buggy and did not return any assignment for this member, or because static > member is configured and the protocol is buggy hence did not get the > assignment for this member at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:367) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) > at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:766) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:624) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10772) java.lang.IllegalStateException: There are insufficient bytes available to read assignment from the sync-group response (actual byte size 0)
[ https://issues.apache.org/jira/browse/KAFKA-10772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Levani Kokhreidze updated KAFKA-10772: Attachment: KAFKA-10772.log

> (quoted issue description and stack trace identical to the previous message)
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option
wcarlson5 commented on a change in pull request #9615: URL: https://github.com/apache/kafka/pull/9615#discussion_r533707640 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider); stateDirCleaner = setupStateDirCleaner(); -oldHandler = false; maybeWarnAboutCodeInRocksDBConfigSetter(log, config); rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config); } +private StreamThread createStreamThread(final long cacheSizePerThread, final int threadIdx) { +final StreamThread streamThread = StreamThread.create( +internalTopologyBuilder, +config, +clientSupplier, +adminClient, +processId, +clientId, +streamsMetrics, +time, +streamsMetadataState, +cacheSizePerThread, +stateDirectory, +delegatingStateRestoreListener, +threadIdx, +KafkaStreams.this::closeToError, +streamsUncaughtExceptionHandler +); +threads.add(streamThread); +threadState.put(streamThread.getId(), streamThread.state()); +storeProviders.add(new StreamThreadStateStoreProvider(streamThread)); +return streamThread; +} + +/** + * Adds and starts a stream thread in addition to the stream threads that are already running in this + * Kafka Streams client. + * + * Since the number of stream threads increases, the sizes of the caches in the new stream thread + * and the existing stream threads are adapted so that the sum of the cache sizes over all stream + * threads does not exceed the total cache size specified in configuration + * {@code cache.max.bytes.buffering}. + * + * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING. 
+ * + * @return name of the added stream thread or empty if a new stream thread could not be added + */ +public Optional addStreamThread() { +if (isRunningOrRebalancing()) { +final int threadIdx = getNextThreadIndex(); +final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1); +resizeThreadCache(cacheSizePerThread); +final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx); +streamThread.setStateListener(streamStateListener); +synchronized (stateLock) { +if (isRunningOrRebalancing()) { +streamThread.start();

Review comment: I think these changes should be made wherever a state transition is changed or added. Removing from the thread list is low cost, as is increasing the size of the cache, so it won't be expensive to make these changes for all cases. I think the two good options we have are that we can move the cache resize and thread creation into the stateLock, or we can undo the changes we made if we have to abort starting the new thread.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response
hachikuji commented on a change in pull request #9382: URL: https://github.com/apache/kafka/pull/9382#discussion_r533648722

## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ## @@ -426,21 +451,34 @@ abstract class AbstractFetcherThread(name: String, warn(s"Partition $topicPartition marked as failed") } - def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]): Set[TopicPartition] = { + /** + * Returns initial partition fetch state based on current state and the provided `initialFetchState`. + * From IBP 2.7 onwards, we can rely on truncation based on diverging data returned in fetch responses. + * For older versions, we can skip the truncation step iff the leader epoch matches the existing epoch. + */ + private def partitionFetchState(tp: TopicPartition, initialFetchState: InitialFetchState, currentState: PartitionFetchState): PartitionFetchState = { +if (currentState != null && currentState.currentLeaderEpoch == initialFetchState.currentLeaderEpoch) { + currentState +} else if (isTruncationOnFetchSupported && initialFetchState.initOffset >= 0 && initialFetchState.lastFetchedEpoch.nonEmpty && + (currentState == null || currentState.state == Fetching)) { + PartitionFetchState(initialFetchState.initOffset, None, initialFetchState.currentLeaderEpoch, + state = Fetching, initialFetchState.lastFetchedEpoch)

Review comment: This check is still a little hard to follow. I think we expect that if `initOffset` is negative, then `lastFetchedEpoch` will be empty and we will hit the `fetchOffsetAndTruncate` case below. Is that right? On the other hand, if `lastFetchedEpoch` is empty, then `initOffset` could still be non-negative if we have an old message format, which means we need to enter `Truncating` so that we can truncate to the high watermark. One case that is not so clear is when `currentState` is non-null. Then we will enter the `Truncating` state below regardless of whether `isTruncationOnFetchSupported` is set or not. Is that what we want?

## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ## @@ -669,11 +714,18 @@ abstract class AbstractFetcherThread(name: String, Option(partitionStates.stateValue(topicPartition)) } + /** + * Returns current fetch state for each partition assigned to this thread. This is used to reassign + * partitions when thread pool is resized. We return `lastFetchedEpoch=None` to ensure we go through

Review comment: This is probably ok. I guess an alternative would be to not take the initial last fetched epoch from `InitialFetchState`, but instead use `latestEpoch`.

## File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala ## @@ -102,6 +103,7 @@ class ReplicaFetcherThread(name: String, private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes private val fetchSize = brokerConfig.replicaFetchMaxBytes private val brokerSupportsLeaderEpochRequest = brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV2 + private val brokerSupportsTruncationOnFetch = ApiVersion.isTruncationOnFetchSupported(brokerConfig.interBrokerProtocolVersion)

Review comment: nit: I don't think we need this. We can override `isTruncationOnFetchSupported` with a `val`.
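The branching the reviewer is walking through can be restated as a small decision function. Below is a hedged, JDK-only Java sketch with hypothetical names (the real logic is Scala in `AbstractFetcherThread.partitionFetchState`); it models only the cases discussed in the comment, with the final fall-through standing in for the `Truncating`/`fetchOffsetAndTruncate` paths:

```java
import java.util.Optional;

// Simplified Java restatement of the quoted Scala decision (hypothetical
// types; not the actual broker code). Given the current fetch state and the
// new initial fetch state, decide whether to keep the current state, enter
// Fetching directly (truncation-on-fetch), or fall back to Truncating.
class FetchStateSketch {
    enum Kind { FETCHING, TRUNCATING }

    static final class State {
        final int leaderEpoch;
        final Kind kind;
        State(final int leaderEpoch, final Kind kind) {
            this.leaderEpoch = leaderEpoch;
            this.kind = kind;
        }
    }

    static State decide(final State current,
                        final long initOffset,
                        final int leaderEpoch,
                        final Optional<Integer> lastFetchedEpoch,
                        final boolean truncationOnFetchSupported) {
        // Same leader epoch as the existing state: keep it and skip truncation.
        if (current != null && current.leaderEpoch == leaderEpoch) {
            return current;
        }
        // From IBP 2.7 onwards: enter Fetching directly and let diverging-epoch
        // information in fetch responses drive truncation.
        if (truncationOnFetchSupported && initOffset >= 0 && lastFetchedEpoch.isPresent()
                && (current == null || current.kind == Kind.FETCHING)) {
            return new State(leaderEpoch, Kind.FETCHING);
        }
        // Otherwise fall back to the pre-2.7 path: truncate first.
        return new State(leaderEpoch, Kind.TRUNCATING);
    }
}
```

For example, with truncation-on-fetch supported and a known last fetched epoch, a fresh partition enters `FETCHING` directly; without a last fetched epoch it falls back to `TRUNCATING`, matching the old-message-format case the reviewer describes.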
[GitHub] [kafka] cadonna commented on a change in pull request #9615: KAFKA-10500: Add thread option
cadonna commented on a change in pull request #9615: URL: https://github.com/apache/kafka/pull/9615#discussion_r533687031

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## (same `addStreamThread` hunk as quoted above)

Review comment: Yes, currently this assumption is correct, but if the state transitions change in the future, we would be safe if we do the cleanup. On second thought, we are probably not 100% safe, because if a transition from `NOT_RUNNING` to `RUNNING` is added (or any other transition that goes from the above-mentioned states to `RUNNING` or `REBALANCING`), we would still not do the cleanup.
[GitHub] [kafka] C0urante commented on pull request #9669: KAFKA-10792: Prevent source task shutdown from blocking herder thread
C0urante commented on pull request #9669: URL: https://github.com/apache/kafka/pull/9669#issuecomment-736784767 @tombentley @gharris1727 @ncliang care to take a look?
[GitHub] [kafka] C0urante opened a new pull request #9669: KAFKA-10792: Prevent source task shutdown from blocking herder thread
C0urante opened a new pull request #9669: URL: https://github.com/apache/kafka/pull/9669 [Jira](https://issues.apache.org/jira/browse/KAFKA-10792) The functional changes are simple: change the `WorkerSourceTask` class to only call `SourceTask::stop` from one location, during task shutdown, and only if an attempt has been made to start the task (which will not be the case if it was created in the paused state and then shut down before being started). Unit tests are tweaked where necessary to account for this new logic, which covers some edge cases mentioned in https://github.com/apache/kafka/pull/5020 that were unaddressed up until now. The existing integration tests for blocking connectors are expanded to also include cases for blocking source and sink tasks. Full coverage of every source/sink task method is intentionally omitted from these expanded tests in order to avoid inflating test runtime (each one adds an extra 5 seconds at minimum) and because the tests that are added here were sufficient to reproduce the bug with source task shutdown.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Created] (KAFKA-10792) Source tasks can block herder thread by hanging during stop
Chris Egerton created KAFKA-10792: - Summary: Source tasks can block herder thread by hanging during stop Key: KAFKA-10792 URL: https://issues.apache.org/jira/browse/KAFKA-10792 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 2.5.1, 2.6.0, 2.4.1, 2.5.0, 2.4.0, 2.4.2, 2.7.0 Reporter: Chris Egerton Assignee: Chris Egerton If a source task blocks during its {{stop}} method, the herder thread will also block, which can cause issues with detecting rebalances, reconfiguring connectors, and other vital functions of a Connect worker. This occurs because the call to {{SourceTask::stop}} occurs on the herder's thread, instead of on the source task's own dedicated thread. This can be fixed by moving the call to {{SourceTask::stop}} onto the source task's dedicated thread and aligning with the current approach for {{Connector}}s and {{SinkTask}}s.
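The fix described above amounts to making the herder's stop request asynchronous. A minimal, JDK-only sketch of that idea (hypothetical names; this is not the actual Connect `WorkerSourceTask` code): the herder thread only flips a flag, and the potentially blocking `stop()` call runs on the task's own dedicated thread during its shutdown path.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch: the herder never invokes the (possibly hanging) task.stop() inline;
// it only signals the worker thread, which performs the blocking stop itself.
class WorkerTaskSketch implements Runnable {
    interface SourceTask {
        void stop(); // may block arbitrarily long in a misbehaving connector
    }

    private final SourceTask task;
    private final AtomicBoolean stopping = new AtomicBoolean(false);
    private final CountDownLatch stopped = new CountDownLatch(1);

    WorkerTaskSketch(final SourceTask task) {
        this.task = task;
    }

    // Called from the herder thread: returns immediately instead of calling
    // task.stop() on the herder's thread.
    void triggerStop() {
        stopping.set(true);
    }

    // Herder can wait with a timeout, so a hanging task cannot block it forever.
    boolean awaitStop(final long timeoutMs) throws InterruptedException {
        return stopped.await(timeoutMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public void run() {
        try {
            while (!stopping.get()) {
                // poll the task and dispatch records here
            }
            task.stop(); // blocking stop runs on this thread, not the herder's
        } finally {
            stopped.countDown();
        }
    }
}
```

If `task.stop()` hangs, only the task's dedicated thread is stuck; the herder's `awaitStop` times out and the worker stays responsive.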
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option
wcarlson5 commented on a change in pull request #9615: URL: https://github.com/apache/kafka/pull/9615#discussion_r533676976

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## (same `addStreamThread` hunk as quoted above)

Review comment: From running or rebalancing, aren't those the only states we can get to?
[GitHub] [kafka] cadonna commented on a change in pull request #9615: KAFKA-10500: Add thread option
cadonna commented on a change in pull request #9615: URL: https://github.com/apache/kafka/pull/9615#discussion_r533675429

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## (same `addStreamThread` hunk as quoted above)

Review comment: What about checking the state and doing the clean-up only if the state is not `PENDING_SHUTDOWN`, not `ERROR`, and not `NOT_RUNNING`? That way we are safe against future changes that break our assumptions about state transitions, and we make sure not to do unnecessary work when we are shutting down.
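The pattern this review thread is converging on can be modeled without any Kafka classes. A simplified, JDK-only sketch (hypothetical names, not the actual `KafkaStreams` implementation): do the cache resize and thread registration outside the lock, then re-check the state under `stateLock` and undo the changes if the client is no longer in `RUNNING` or `REBALANCING`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Sketch of "setup outside the lock, re-check and clean up under the lock".
class AddThreadSketch {
    enum State { RUNNING, REBALANCING, PENDING_SHUTDOWN, ERROR, NOT_RUNNING }

    private final Object stateLock = new Object();
    private volatile State state = State.RUNNING;
    private final List<String> threads = new ArrayList<>();
    private final long totalCacheSize;
    private long cacheSizePerThread;

    AddThreadSketch(final long totalCacheSize) {
        this.totalCacheSize = totalCacheSize;
        this.cacheSizePerThread = totalCacheSize;
    }

    private boolean isRunningOrRebalancing() {
        return state == State.RUNNING || state == State.REBALANCING;
    }

    Optional<String> addStreamThread() {
        if (!isRunningOrRebalancing()) {
            return Optional.empty();
        }
        // Setup outside the lock: shrink every thread's cache share and
        // register the new thread.
        cacheSizePerThread = totalCacheSize / (threads.size() + 1);
        final String name = "StreamThread-" + (threads.size() + 1);
        threads.add(name);
        synchronized (stateLock) {
            if (isRunningOrRebalancing()) {
                return Optional.of(name); // safe to start the thread here
            }
            // State changed before we could start: undo the registration and
            // give the cache share back to the remaining threads.
            threads.remove(name);
            cacheSizePerThread =
                threads.isEmpty() ? totalCacheSize : totalCacheSize / threads.size();
            return Optional.empty();
        }
    }

    void shutDown() {
        synchronized (stateLock) {
            state = State.PENDING_SHUTDOWN;
        }
    }

    int threadCount() {
        return threads.size();
    }
}
```

Checking the state explicitly under the lock (rather than assuming only shutdown states are reachable) is exactly the robustness-to-future-transitions argument made in the comment above.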
[GitHub] [kafka] mjsax opened a new pull request #9668: MINOR: add test for repartition/source-topic/changelog optimization
mjsax opened a new pull request #9668: URL: https://github.com/apache/kafka/pull/9668 If topology optimization is enabled, KafkaStreams does not create store changelog topics but re-uses source input topics if possible. However, this optimization should not be applied to internal repartition topics, because those are actively purged. Call for review @ableegoldman
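For context, the source-topic/changelog optimization only applies when topology optimization is explicitly enabled. A minimal sketch of the relevant configuration (literal config strings shown; in the Streams API they are available as `StreamsConfig` constants, and the same properties must also be passed to `StreamsBuilder#build(Properties)` for the optimizer to run):

```java
import java.util.Properties;

// Sketch of a Streams configuration with topology optimization enabled.
// Broker address and application id are placeholders.
class OptimizationConfigSketch {
    static Properties optimizedConfig() {
        final Properties props = new Properties();
        props.put("application.id", "example-app");        // required by Streams
        props.put("bootstrap.servers", "localhost:9092");  // placeholder broker
        props.put("topology.optimization", "all");         // enable optimizations
        return props;
    }
}
```

With optimization disabled (the default, `"none"`), Streams always creates separate changelog topics for source KTables.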
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option
wcarlson5 commented on a change in pull request #9615: URL: https://github.com/apache/kafka/pull/9615#discussion_r533656732

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## (same `addStreamThread` hunk as quoted above)

Review comment: There will be two more cases of remove: in the replace-thread option and in the remove-thread option. I'm not really convinced it is necessary, but I don't see a problem with re-resizing the cache if we do not start the thread. I don't think there will be any side effects, as the client should be shutting down; if we resize, there would be a little extra info in the state and store providers, but it would not get used.

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java ## @@ -76,6 +76,9 @@ public void resize(final long newCacheSizeBytes) { final boolean shrink = newCacheSizeBytes < maxCacheSizeBytes; maxCacheSizeBytes = newCacheSizeBytes; if (shrink) { +if (caches.values().isEmpty()) {

Review comment: Yeah, I didn't realize this was a problem, but when I added more test coverage it showed up.

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## (same `createStreamThread` hunk as quoted above)
[GitHub] [kafka] ableegoldman commented on pull request #9609: KAFKA-6687: restrict DSL to allow only Streams from the same source topics
ableegoldman commented on pull request #9609: URL: https://github.com/apache/kafka/pull/9609#issuecomment-736766888 Merged to trunk
[GitHub] [kafka] ableegoldman merged pull request #9609: KAFKA-6687: restrict DSL to allow only Streams from the same source topics
ableegoldman merged pull request #9609: URL: https://github.com/apache/kafka/pull/9609
[jira] [Commented] (KAFKA-10772) java.lang.IllegalStateException: There are insufficient bytes available to read assignment from the sync-group response (actual byte size 0)
[ https://issues.apache.org/jira/browse/KAFKA-10772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241814#comment-17241814 ] A. Sophie Blee-Goldman commented on KAFKA-10772: Hey [~lkokhreidze], thanks for the report. Definitely looks like a bug – based on the error message, presumably either in the assignment function or with static membership. Are you using static membership? Do you have any client logs from around the time of the exception? Broker logs would be useful as well, but probably only client logs can tell us if "the leader's assign function is buggy and did not return any assignment for this member" or not.

> (quoted issue description and stack trace identical to the earlier messages)
[GitHub] [kafka] hachikuji merged pull request #9663: MINOR: Small cleanups in `AlterIsr` handling logic
hachikuji merged pull request #9663: URL: https://github.com/apache/kafka/pull/9663 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on pull request #9666: MINOR: Remove broken `.travis.yml` with system test hooks
dajac commented on pull request #9666: URL: https://github.com/apache/kafka/pull/9666#issuecomment-736754455 I am +1 on removing it. We can always bring it back to trunk if we can make it work later.
[GitHub] [kafka] ableegoldman commented on pull request #9654: MINOR: Increase unit test coverage of ProcessorTopology#updateSourceTopics()
ableegoldman commented on pull request #9654: URL: https://github.com/apache/kafka/pull/9654#issuecomment-736752649 Merged to trunk
[GitHub] [kafka] ableegoldman merged pull request #9654: MINOR: Increase unit test coverage of ProcessorTopology#updateSourceTopics()
ableegoldman merged pull request #9654: URL: https://github.com/apache/kafka/pull/9654
[GitHub] [kafka] lmr3796 commented on a change in pull request #9435: KAFKA-10606: Disable auto topic creation for fetch-all-topic-metadata request
lmr3796 commented on a change in pull request #9435: URL: https://github.com/apache/kafka/pull/9435#discussion_r533648988

## File path: core/src/main/scala/kafka/server/KafkaApis.scala
## @@ -1250,13 +1251,23 @@ class KafkaApis(val requestChannel: RequestChannel,
         metadataResponseTopic(Errors.INVALID_REPLICATION_FACTOR, topic, true, util.Collections.emptyList())
       else
         topicMetadata
-    } else if (allowAutoTopicCreation && config.autoCreateTopicsEnable) {
+    } else if (!isFetchAllMetadata && allowAutoTopicCreation && config.autoCreateTopicsEnable) {
+      // KAFKA-10606: If this request is to get metadata for all topics, auto topic creation should not be allowed.
+      // The special handling is necessary on the broker side because allowAutoTopicCreation is hard coded to true
+      // for backward compatibility on the client side.
       createTopic(topic, config.numPartitions, config.defaultReplicationFactor)
     } else {
       metadataResponseTopic(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false, util.Collections.emptyList())

Review comment: @ijuma That's a good point. I've updated it.
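The guard being added in that diff can be distilled into a small predicate: auto-creation applies only to metadata requests that name specific topics, never to a fetch-all-topic-metadata request, even when the client's hard-coded allowAutoTopicCreation flag is true. A hypothetical sketch (in Java rather than the broker's Scala, with illustrative names):

```java
public class MetadataAutoCreatePolicy {
    // Hypothetical distillation of the condition in the patch above:
    // a topic is auto-created only when the request is NOT a fetch-all
    // metadata request AND the client allows creation AND the broker's
    // auto.create.topics.enable config is on.
    static boolean shouldAutoCreate(boolean isFetchAllMetadata,
                                    boolean allowAutoTopicCreation,
                                    boolean autoCreateTopicsEnable) {
        return !isFetchAllMetadata && allowAutoTopicCreation && autoCreateTopicsEnable;
    }

    public static void main(String[] args) {
        // A fetch-all request never triggers creation, regardless of the flags.
        System.out.println(shouldAutoCreate(true, true, true));   // false
        // A targeted metadata request still can, when both flags are set.
        System.out.println(shouldAutoCreate(false, true, true));  // true
    }
}
```

The broker-side check matters because, as the quoted comment notes, older clients always send allowAutoTopicCreation=true for compatibility, so the client flag alone cannot distinguish the fetch-all case.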