[GitHub] [kafka] showuon commented on pull request #9674: KAFKA-10665: close all kafkaStreams before purgeLocalStreamsState

2020-12-01 Thread GitBox


showuon commented on pull request #9674:
URL: https://github.com/apache/kafka/pull/9674#issuecomment-737056995


   @bbejeck @vvcephei @lkokhreidze , please help review this PR. Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10636) Bypass log validation for writes to raft log

2020-12-01 Thread feyman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17242122#comment-17242122
 ] 

feyman commented on KAFKA-10636:


Hi, [~hachikuji]. I would like to take this task; please let me know if it is 
still under discussion or has pending dependencies. I have recently gone through 
the design and implementation of kafka-raft, and will add my understanding of 
this task later to make sure we are on the same page. Thanks!

> Bypass log validation for writes to raft log
> 
>
> Key: KAFKA-10636
> URL: https://issues.apache.org/jira/browse/KAFKA-10636
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Assignee: feyman
>Priority: Major
>
> The raft leader is responsible for creating the records written to the log 
> (including assigning offsets and the epoch), so we can consider bypassing the 
> validation done in `LogValidator`. This lets us skip potentially expensive 
> decompression and the unnecessary recomputation of the CRC.
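
A minimal, hypothetical sketch of the idea (illustrative stand-ins only, not 
Kafka's actual append path): batches the raft leader built itself already carry 
correct offsets and epoch, so the decompression and CRC recomputation applied to 
client-produced batches can be skipped.

{code:java}
// All names below are illustrative stand-ins, not Kafka's real classes.
class RaftAppendSketch {
    enum AppendOrigin { CLIENT, RAFT_LEADER }

    void append(byte[] recordBatch, AppendOrigin origin) {
        if (origin == AppendOrigin.CLIENT) {
            // Client data: decompress, recompute CRCs, validate offsets/epoch.
            validate(recordBatch);
        }
        // Leader-created batches are trusted and written as-is.
        write(recordBatch);
    }

    private void validate(byte[] batch) { /* LogValidator-style checks */ }

    private void write(byte[] batch) { /* append to the log */ }
}
{code}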



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] showuon commented on a change in pull request #9674: KAFKA-10665: close all kafkaStreams before purgeLocalStreamsState

2020-12-01 Thread GitBox


showuon commented on a change in pull request #9674:
URL: https://github.com/apache/kafka/pull/9674#discussion_r533956768



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
##
@@ -221,7 +223,7 @@ private boolean topicExists(final String topic) {
 );
 }
 
-private  void validateReceivedMessages(final String topic,
+private  void validateMessagesReceived(final String topic,

Review comment:
   Rename the method.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #9674: KAFKA-10665: close all kafkaStreams before purgeLocalStreamsState

2020-12-01 Thread GitBox


showuon commented on a change in pull request #9674:
URL: https://github.com/apache/kafka/pull/9674#discussion_r533956676



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
##
@@ -149,9 +153,7 @@ public void 
shouldDoStreamTableJoinWithDifferentNumberOfPartitions() throws Exce
 sendEvents(inputTopic, timestamp, expectedRecords);
 sendEvents(outputTopic, timestamp, expectedRecords);
 
-startStreams(streamsBuilder);

Review comment:
   I don't think we need to start two KafkaStreams instances for this test.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon opened a new pull request #9674: KAFKA-10665: close all kafkaStreams before purgeLocalStreamsState

2020-12-01 Thread GitBox


showuon opened a new pull request #9674:
URL: https://github.com/apache/kafka/pull/9674


   The flaky tests happen because we forgot to close the kafkaStreams instances before 
purgeLocalStreamsState, so tmp files (e.g. `checkpoint.tmp`) are sometimes still being 
created while the streams are running or being removed, which causes a 
`DirectoryNotEmptyException` or `NoSuchFileException` to be thrown. 
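   
   A minimal sketch of the teardown pattern this change moves to (the `streamsToCleanup` 
field and wrapper class are hypothetical; the actual test fields may differ):
   
   ```java
   import java.io.IOException;
   import java.time.Duration;
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Properties;
   import org.apache.kafka.streams.KafkaStreams;
   import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;

   class TeardownSketch {
       private final List<KafkaStreams> streamsToCleanup = new ArrayList<>();
       private final Properties streamsConfiguration = new Properties();

       // Close every KafkaStreams instance first so no more checkpoint files are
       // written, then purge the local state directory.
       void whenShuttingDown() throws IOException {
           for (final KafkaStreams streams : streamsToCleanup) {
               streams.close(Duration.ofSeconds(30));
           }
           IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
       }
   }
   ```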
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-10636) Bypass log validation for writes to raft log

2020-12-01 Thread feyman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

feyman reassigned KAFKA-10636:
--

Assignee: feyman

> Bypass log validation for writes to raft log
> 
>
> Key: KAFKA-10636
> URL: https://issues.apache.org/jira/browse/KAFKA-10636
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Assignee: feyman
>Priority: Major
>
> The raft leader is responsible for creating the records written to the log 
> (including assigning offsets and the epoch), so we can consider bypassing the 
> validation done in `LogValidator`. This lets us skip potentially expensive 
> decompression and the unnecessary recomputation of the CRC.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10794) Replica leader election is too slow in the case of too many partitions

2020-12-01 Thread limeng (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

limeng updated KAFKA-10794:
---
Description: 
There are more than 6000 topics and 300 brokers in my kafka cluster, and we 
frequently run kafka-preferred-replica-election.sh to rebalance the cluster. But 
the rebalance process takes too much time and CPU, as shown in the attached 
screenshots.

We find that the function 'controllerContext.allPartitions' is invoked too many 
times.

!截屏2020-12-02 上午11.43.48.png!

!image-2020-12-02-15-23-43-384.png!

 

  was:
There are more than 6000 topics and 300 brokers in my kafka cluster, and we 
frequently run kafka-preferred-replica-election.sh to rebalance the cluster. But 
the rebalance process takes too much time and CPU, as shown in the attached 
screenshots.

We find that the function 'controllerContext.allPartitions' is invoked too many 
times.

!截屏2020-12-02 上午11.43.48.png!


> Replica leader election is too slow in the case of too many partitions
> --
>
> Key: KAFKA-10794
> URL: https://issues.apache.org/jira/browse/KAFKA-10794
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.6.0, 2.5.1
>Reporter: limeng
>Priority: Major
> Attachments: clipboard_image_1606806581629.png, 
> image-2020-12-02-15-23-43-384.png, 截屏2020-12-01 下午2.36.19.png, 截屏2020-12-02 
> 上午11.43.48.png
>
>
> There are more than 6000 topics and 300 brokers in my kafka cluster, and we 
> frequently run kafka-preferred-replica-election.sh to rebalance the cluster. 
> But the rebalance process takes too much time and CPU, as shown in the 
> attached screenshots.
> We find that the function 'controllerContext.allPartitions' is invoked too 
> many times.
> !截屏2020-12-02 上午11.43.48.png!
> !image-2020-12-02-15-23-43-384.png!
>  
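
A minimal, hypothetical sketch of the general remedy this points at (plain Java 
stand-ins, not the actual controller code): compute the expensive partition set 
once and reuse it across the election loop, rather than invoking the equivalent 
of controllerContext.allPartitions once per partition.

{code:java}
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

class PreferredElectionSketch {
    // electionTargets: partitions to elect; allPartitionsSupplier stands in for
    // the expensive allPartitions lookup.
    void electPreferredLeaders(List<String> electionTargets,
                               Supplier<Set<String>> allPartitionsSupplier) {
        // Fetch the expensive set once, outside the loop.
        final Set<String> allPartitions = allPartitionsSupplier.get();
        for (final String partition : electionTargets) {
            if (allPartitions.contains(partition)) {
                // trigger preferred-leader election for this partition
            }
        }
    }
}
{code}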



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10794) Replica leader election is too slow in the case of too many partitions

2020-12-01 Thread limeng (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

limeng updated KAFKA-10794:
---
Attachment: image-2020-12-02-15-23-43-384.png

> Replica leader election is too slow in the case of too many partitions
> --
>
> Key: KAFKA-10794
> URL: https://issues.apache.org/jira/browse/KAFKA-10794
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.6.0, 2.5.1
>Reporter: limeng
>Priority: Major
> Attachments: clipboard_image_1606806581629.png, 
> image-2020-12-02-15-23-43-384.png, 截屏2020-12-01 下午2.36.19.png, 截屏2020-12-02 
> 上午11.43.48.png
>
>
> There are more than 6000 topics and 300 brokers in my kafka cluster, and we 
> frequently run kafka-preferred-replica-election.sh to rebalance the cluster. 
> But the rebalance process takes too much time and CPU, as shown in the 
> attached screenshots.
> We find that the function 'controllerContext.allPartitions' is invoked too 
> many times.
> !截屏2020-12-02 上午11.43.48.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10794) Replica leader election is too slow in the case of too many partitions

2020-12-01 Thread limeng (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

limeng updated KAFKA-10794:
---
Attachment: 截屏2020-12-01 下午2.36.19.png

> Replica leader election is too slow in the case of too many partitions
> --
>
> Key: KAFKA-10794
> URL: https://issues.apache.org/jira/browse/KAFKA-10794
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.6.0, 2.5.1
>Reporter: limeng
>Priority: Major
> Attachments: clipboard_image_1606806581629.png, 截屏2020-12-01 
> 下午2.36.19.png, 截屏2020-12-02 上午11.43.48.png
>
>
> There are more than 6000 topics and 300 brokers in my kafka cluster, and we 
> frequently run kafka-preferred-replica-election.sh to rebalance the cluster. 
> But the rebalance process takes too much time and CPU, as shown in the 
> attached screenshots.
> We find that the function 'controllerContext.allPartitions' is invoked too 
> many times.
> !截屏2020-12-02 上午11.43.48.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10794) Replica leader election is too slow in the case of too many partitions

2020-12-01 Thread limeng (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

limeng updated KAFKA-10794:
---
Description: 
There are more than 6000 topics and 300 brokers in my kafka cluster, and we 
frequently run kafka-preferred-replica-election.sh to rebalance the cluster. But 
the rebalance process takes too much time and CPU, as shown in the attached 
screenshots.

We find that the function 'controllerContext.allPartitions' is invoked too many 
times.

!截屏2020-12-02 上午11.43.48.png!

  was:
There are more than 6000 topics and 300 brokers in my kafka cluster, and we set 
the config 'auto.leader.rebalance.enable=true'.

The rebalance process takes too much time and CPU.

We find that the function 'controllerContext.allPartitions' is invoked too many 
times.

!截屏2020-12-02 上午11.43.48.png!


> Replica leader election is too slow in the case of too many partitions
> --
>
> Key: KAFKA-10794
> URL: https://issues.apache.org/jira/browse/KAFKA-10794
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.6.0, 2.5.1
>Reporter: limeng
>Priority: Major
> Attachments: clipboard_image_1606806581629.png, 截屏2020-12-02 
> 上午11.43.48.png
>
>
> There are more than 6000 topics and 300 brokers in my kafka cluster, and we 
> frequently run kafka-preferred-replica-election.sh to rebalance the cluster. 
> But the rebalance process takes too much time and CPU, as shown in the 
> attached screenshots.
> We find that the function 'controllerContext.allPartitions' is invoked too 
> many times.
> !截屏2020-12-02 上午11.43.48.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on pull request #8826: KAFKA-10090 Misleading warnings: The configuration was supplied but i…

2020-12-01 Thread GitBox


chia7712 commented on pull request #8826:
URL: https://github.com/apache/kafka/pull/8826#issuecomment-737028662


   > There are 6 failures in the system test run. Are they related to this PR?
   
   They are unrelated to this PR and I have opened a PR to fix them (#9673)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 opened a new pull request #9673: KAFKA-10289 fix failed connect_distributed_test.py (ConnectDistribute…

2020-12-01 Thread GitBox


chia7712 opened a new pull request #9673:
URL: https://github.com/apache/kafka/pull/9673


   issue: https://issues.apache.org/jira/browse/KAFKA-10289
   
   In Python 3, ```filter``` returns an iterator rather than a ```list```, so it 
can be traversed only once. Hence, only the first pass of the following loop sees 
the messages; later iterations see an empty result and the validation fails.
   
   ```python
   src_messages = self.source.committed_messages()  # returns an iterator
   sink_messages = self.sink.flushed_messages()  # returns an iterator
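   # Hypothetical fix sketch: materialize each iterator once so every task in
   # the loop below sees the full message list, e.g.
   #   src_messages = list(self.source.committed_messages())
   #   sink_messages = list(self.sink.flushed_messages())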
   for task in range(num_tasks):
       # only the first task can "see" the result; the following tasks see an
       # empty result
       src_seqnos = [msg['seqno'] for msg in src_messages if msg['task'] == task]
   ```
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10289) fix failed connect_distributed_test.py (ConnectDistributedTest.test_bounce)

2020-12-01 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-10289:
---
Summary: fix failed connect_distributed_test.py 
(ConnectDistributedTest.test_bounce)  (was: fix failed 
connect_distributed_test.py.test_bounce)

> fix failed connect_distributed_test.py (ConnectDistributedTest.test_bounce)
> ---
>
> Key: KAFKA-10289
> URL: https://issues.apache.org/jira/browse/KAFKA-10289
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect, system tests
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> {quote}
> Module: kafkatest.tests.connect.connect_distributed_test
> Class:  ConnectDistributedTest
> Method: test_broker_compatibility
> Arguments:
> {
>   "auto_create_topics": false,
>   "broker_version": "0.10.1.1",
>   "connect_protocol": "compatible",
>   "security_protocol": "PLAINTEXT"
> }
> {quote}
> {quote}
> Module: kafkatest.tests.connect.connect_distributed_test
> Class:  ConnectDistributedTest
> Method: test_broker_compatibility
> Arguments:
> {
>   "auto_create_topics": false,
>   "broker_version": "2.1.1",
>   "connect_protocol": "compatible",
>   "security_protocol": "PLAINTEXT"
> }
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10289) fix failed connect_distributed_test.py.test_bounce

2020-12-01 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-10289:
---
Summary: fix failed connect_distributed_test.py.test_bounce  (was: fix 
flaky connect/connect_distributed_test.py)

> fix failed connect_distributed_test.py.test_bounce
> --
>
> Key: KAFKA-10289
> URL: https://issues.apache.org/jira/browse/KAFKA-10289
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect, system tests
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> {quote}
> Module: kafkatest.tests.connect.connect_distributed_test
> Class:  ConnectDistributedTest
> Method: test_broker_compatibility
> Arguments:
> {
>   "auto_create_topics": false,
>   "broker_version": "0.10.1.1",
>   "connect_protocol": "compatible",
>   "security_protocol": "PLAINTEXT"
> }
> {quote}
> {quote}
> Module: kafkatest.tests.connect.connect_distributed_test
> Class:  ConnectDistributedTest
> Method: test_broker_compatibility
> Arguments:
> {
>   "auto_create_topics": false,
>   "broker_version": "2.1.1",
>   "connect_protocol": "compatible",
>   "security_protocol": "PLAINTEXT"
> }
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-10289) fix flaky connect/connect_distributed_test.py

2020-12-01 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-10289:
--

Assignee: Chia-Ping Tsai

> fix flaky connect/connect_distributed_test.py
> -
>
> Key: KAFKA-10289
> URL: https://issues.apache.org/jira/browse/KAFKA-10289
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect, system tests
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> {quote}
> Module: kafkatest.tests.connect.connect_distributed_test
> Class:  ConnectDistributedTest
> Method: test_broker_compatibility
> Arguments:
> {
>   "auto_create_topics": false,
>   "broker_version": "0.10.1.1",
>   "connect_protocol": "compatible",
>   "security_protocol": "PLAINTEXT"
> }
> {quote}
> {quote}
> Module: kafkatest.tests.connect.connect_distributed_test
> Class:  ConnectDistributedTest
> Method: test_broker_compatibility
> Arguments:
> {
>   "auto_create_topics": false,
>   "broker_version": "2.1.1",
>   "connect_protocol": "compatible",
>   "security_protocol": "PLAINTEXT"
> }
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-01 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r533922526



##
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
##
@@ -115,45 +122,62 @@ private void setFieldValue(Object obj, String fieldName, 
Object value) throws Ex
 
 Set<AclEntry> entries = aclEntries.computeIfAbsent(resource, k -> 
new HashSet<>());
 
-for (int aclId = 0; aclId < aclCount; aclId++) {
-AccessControlEntry ace = new 
AccessControlEntry(principal.toString() + aclId,
-"*", AclOperation.READ, AclPermissionType.ALLOW);
-entries.add(new AclEntry(ace));
+for (int aclId = 0; aclId < aclCount / 2; aclId++) {
+String acePrinciple = principal.toString() + (aclId == 0 ? "" 
: aclId);
+AccessControlEntry allowAce = new AccessControlEntry(
+acePrinciple,

Review comment:
   commit 6ab95d3668b3de27a7f6f58fc171a1e2e8925f69

##
File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala
##
@@ -175,4 +179,73 @@ class AuthorizerWrapper(private[kafka] val baseAuthorizer: 
kafka.security.auth.A
   override def close(): Unit = {
 baseAuthorizer.close()
   }
+
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+   op: AclOperation,
+   resourceType: ResourceType): 
AuthorizationResult = {
+if (resourceType == ResourceType.ANY)
+  throw new IllegalArgumentException("Must specify a non-filter resource 
type for authorizeByResourceType")
+
+if (resourceType == ResourceType.UNKNOWN)
+  throw new IllegalArgumentException("Unknown resource type")
+
+if (op == AclOperation.ANY)
+  throw new IllegalArgumentException("Must specify a non-filter operation 
type for authorizeByResourceType")
+
+if (op == AclOperation.UNKNOWN)
+  throw new IllegalArgumentException("Unknown operation type")
+
+if (shouldAllowEveryoneIfNoAclIsFound && !denyAllResource(requestContext, 
op, resourceType)) {
+  AuthorizationResult.ALLOWED
+} else {
+  super.authorizeByResourceType(requestContext, op, resourceType)
+}
+  }
+
+  private def denyAllResource(requestContext: AuthorizableRequestContext,
+  op: AclOperation,
+  resourceType: ResourceType): Boolean = {
+val resourceTypeFilter = new ResourcePatternFilter(
+  resourceType, null, PatternType.ANY)
+val accessControlEntry = new AccessControlEntryFilter(
+  null, null, null, AclPermissionType.DENY)
+val aclFilter = new AclBindingFilter(resourceTypeFilter, 
accessControlEntry)
+
+for (binding <- acls(aclFilter).asScala) {
+  if (aceMatched(requestContext, op, binding) && 
canDenyAll(binding.pattern()))
+return true
+}
+false
+  }
+
+  @inline
+  private def aceMatched(requestContext: AuthorizableRequestContext,
+ op: AclOperation,
+ binding: AclBinding): Boolean = {
+(hostMatched(requestContext, binding) && principleMatched(requestContext, 
binding)
+  && operationMatched(op, binding))
+  }
+
+  @inline
+  private def hostMatched(requestContext: AuthorizableRequestContext,
+  binding: AclBinding): Boolean =
+
(binding.entry().host().equals(requestContext.clientAddress().getHostAddress)
+  || binding.entry().host().equals(AclEntry.WildcardHost))
+
+  @inline
+  private def principleMatched(requestContext: AuthorizableRequestContext,

Review comment:
   commit 6ab95d3668b3de27a7f6f58fc171a1e2e8925f69





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-01 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r533919425



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -304,6 +308,122 @@ class AclAuthorizer extends Authorizer with Logging {
 if (zkClient != null) zkClient.close()
   }
 
+  // TODO: 1. Discuss how to log audit message
+  // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop 
but I think
+  //  in most of the cases it would be O(1) because denyDominatePrefixAllow 
should be rare
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+   op: AclOperation,
+   resourceType: ResourceType): 
AuthorizationResult = {
+if (resourceType eq ResourceType.ANY)
+  throw new IllegalArgumentException("Must specify a non-filter resource 
type for authorizeByResourceType")
+
+if (resourceType eq ResourceType.UNKNOWN)
+  throw new IllegalArgumentException("Unknown resource type")
+
+if (op eq AclOperation.ANY)
+  throw new IllegalArgumentException("Must specify a non-filter operation 
type for authorizeByResourceType")
+
+if (op eq AclOperation.UNKNOWN)
+  throw new IllegalArgumentException("Unknown operation type")

Review comment:
   commit f6d2a39706998160ebe77a854b8bf64268eec68a





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-01 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r533916889



##
File path: 
clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
##
@@ -139,4 +152,125 @@
  * @return Iterator for ACL bindings, which may be populated lazily.
  */
 Iterable<AclBinding> acls(AclBindingFilter filter);
+
+/**
+ * Check if the caller is authorized to perform the given ACL operation on 
at least one
+ * resource of the given type.
+ *
+ * @param requestContext Request context including request resourceType, 
security protocol, and listener name
+ * @param op The ACL operation to check
+ * @param resourceType   The resource type to check
+ * @return   Return {@link AuthorizationResult#ALLOWED} if the 
caller is authorized to perform the
+ *   given ACL operation on at least one resource of 
the given type.
+ *   Return {@link AuthorizationResult#DENIED} 
otherwise.
+ */
+default AuthorizationResult 
authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation 
op, ResourceType resourceType) {
+if (resourceType == ResourceType.ANY) {
+throw new IllegalArgumentException(
+"Must specify a non-filter resource type for 
authorizeByResourceType");
+}
+
+if (resourceType == ResourceType.UNKNOWN) {
+throw new IllegalArgumentException(
+"Unknown resource type");
+}
+
+if (op == AclOperation.ANY) {
+throw new IllegalArgumentException(
+"Must specify a non-filter operation type for 
authorizeByResourceType");
+}
+
+if (op == AclOperation.UNKNOWN) {
+throw new IllegalArgumentException(
+"Unknown operation type");
+}
+
+ResourcePatternFilter resourceTypeFilter = new ResourcePatternFilter(
+resourceType, null, PatternType.ANY);
+AclBindingFilter aclFilter = new AclBindingFilter(
+resourceTypeFilter, AccessControlEntryFilter.ANY);
+
+final int typeLiteral = 0;
+final int typePrefix = 1;
+
+List<Set<String>> deny = new ArrayList<>(
+Arrays.asList(new HashSet<>(), new HashSet<>()));
+List<Set<String>> allow = new ArrayList<>(
+Arrays.asList(new HashSet<>(), new HashSet<>()));
+
+boolean hasWildCardAllow = false;
+
+for (AclBinding binding : acls(aclFilter)) {
+if 
(!binding.entry().host().equals(requestContext.clientAddress().getHostAddress())

Review comment:
   Right, just as we did for the principal. commit 
29ac8628089ddf1210072bbf52e01a41e123a718
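
   For reference, a minimal self-contained usage sketch of the default method 
quoted above (the wrapper class and method name here are hypothetical):
   
   ```java
   import org.apache.kafka.common.acl.AclOperation;
   import org.apache.kafka.common.resource.ResourceType;
   import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
   import org.apache.kafka.server.authorizer.AuthorizationResult;
   import org.apache.kafka.server.authorizer.Authorizer;

   class AuthorizeByResourceTypeSketch {
       // Does the caller have WRITE on at least one topic? This is what lets an
       // idempotent producer be authorized without naming a specific topic.
       boolean canWriteToSomeTopic(Authorizer authorizer, AuthorizableRequestContext ctx) {
           return authorizer.authorizeByResourceType(ctx, AclOperation.WRITE, ResourceType.TOPIC)
               == AuthorizationResult.ALLOWED;
       }
   }
   ```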





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-01 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r533915261



##
File path: 
clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
##
@@ -139,4 +152,125 @@
  * @return Iterator for ACL bindings, which may be populated lazily.
  */
 Iterable<AclBinding> acls(AclBindingFilter filter);
+
+/**
+ * Check if the caller is authorized to perform the given ACL operation on 
at least one
+ * resource of the given type.
+ *
+ * @param requestContext Request context including request resourceType, 
security protocol, and listener name
+ * @param op The ACL operation to check
+ * @param resourceType   The resource type to check
+ * @return   Return {@link AuthorizationResult#ALLOWED} if the 
caller is authorized to perform the
+ *   given ACL operation on at least one resource of 
the given type.
+ *   Return {@link AuthorizationResult#DENIED} 
otherwise.
+ */
+default AuthorizationResult 
authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation 
op, ResourceType resourceType) {
+if (resourceType == ResourceType.ANY) {
+throw new IllegalArgumentException(
+"Must specify a non-filter resource type for 
authorizeByResourceType");
+}
+
+if (resourceType == ResourceType.UNKNOWN) {
+throw new IllegalArgumentException(
+"Unknown resource type");
+}
+
+if (op == AclOperation.ANY) {
+throw new IllegalArgumentException(
+"Must specify a non-filter operation type for 
authorizeByResourceType");
+}
+
+if (op == AclOperation.UNKNOWN) {
+throw new IllegalArgumentException(
+"Unknown operation type");
+}
+
+ResourcePatternFilter resourceTypeFilter = new ResourcePatternFilter(
+resourceType, null, PatternType.ANY);
+AclBindingFilter aclFilter = new AclBindingFilter(
+resourceTypeFilter, AccessControlEntryFilter.ANY);
+
+final int typeLiteral = 0;
+final int typePrefix = 1;
+
+List<Set<String>> deny = new ArrayList<>(
+Arrays.asList(new HashSet<>(), new HashSet<>()));

Review comment:
   EnumMap makes sense. commit 1a139ce744a279e4424188008ee5158186b0fcbe

##
File path: 
clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
##
@@ -139,4 +152,125 @@
  * @return Iterator for ACL bindings, which may be populated lazily.
  */
 Iterable<AclBinding> acls(AclBindingFilter filter);
+
+/**
+ * Check if the caller is authorized to perform the given ACL operation on 
at least one
+ * resource of the given type.
+ *
+ * @param requestContext Request context including request resourceType, 
security protocol, and listener name
+ * @param op The ACL operation to check
+ * @param resourceType   The resource type to check
+ * @return   Return {@link AuthorizationResult#ALLOWED} if the 
caller is authorized to perform the
+ *   given ACL operation on at least one resource of 
the given type.
+ *   Return {@link AuthorizationResult#DENIED} 
otherwise.
+ */
+default AuthorizationResult 
authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation 
op, ResourceType resourceType) {
+if (resourceType == ResourceType.ANY) {
+throw new IllegalArgumentException(
+"Must specify a non-filter resource type for 
authorizeByResourceType");
+}
+
+if (resourceType == ResourceType.UNKNOWN) {
+throw new IllegalArgumentException(
+"Unknown resource type");
+}
+
+if (op == AclOperation.ANY) {
+throw new IllegalArgumentException(
+"Must specify a non-filter operation type for 
authorizeByResourceType");
+}
+
+if (op == AclOperation.UNKNOWN) {
+throw new IllegalArgumentException(
+"Unknown operation type");
+}
+
+ResourcePatternFilter resourceTypeFilter = new ResourcePatternFilter(
+resourceType, null, PatternType.ANY);
+AclBindingFilter aclFilter = new AclBindingFilter(
+resourceTypeFilter, AccessControlEntryFilter.ANY);
+
+final int typeLiteral = 0;
+final int typePrefix = 1;

Review comment:
   EnumMap makes sense. commit 1a139ce744a279e4424188008ee5158186b0fcbe





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-01 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r533915142



##
File path: 
clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
##
@@ -139,4 +152,125 @@
  * @return Iterator for ACL bindings, which may be populated lazily.
  */
 Iterable<AclBinding> acls(AclBindingFilter filter);
+
+/**
+ * Check if the caller is authorized to perform the given ACL operation on 
at least one
+ * resource of the given type.
+ *
+ * @param requestContext Request context including request resourceType, 
security protocol, and listener name
+ * @param op The ACL operation to check
+ * @param resourceType   The resource type to check
+ * @return   Return {@link AuthorizationResult#ALLOWED} if the 
caller is authorized to perform the
+ *   given ACL operation on at least one resource of 
the given type.
+ *   Return {@link AuthorizationResult#DENIED} 
otherwise.
+ */
+default AuthorizationResult 
authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation 
op, ResourceType resourceType) {
+if (resourceType == ResourceType.ANY) {
+throw new IllegalArgumentException(
+"Must specify a non-filter resource type for 
authorizeByResourceType");
+}
+
+if (resourceType == ResourceType.UNKNOWN) {
+throw new IllegalArgumentException(
+"Unknown resource type");
+}
+
+if (op == AclOperation.ANY) {
+throw new IllegalArgumentException(
+"Must specify a non-filter operation type for 
authorizeByResourceType");
+}
+
+if (op == AclOperation.UNKNOWN) {
+throw new IllegalArgumentException(
+"Unknown operation type");
+}
+
+ResourcePatternFilter resourceTypeFilter = new ResourcePatternFilter(
+resourceType, null, PatternType.ANY);
+AclBindingFilter aclFilter = new AclBindingFilter(
+resourceTypeFilter, AccessControlEntryFilter.ANY);
+
+final int typeLiteral = 0;
+final int typePrefix = 1;
+
+List<Set<String>> deny = new ArrayList<>(
+Arrays.asList(new HashSet<>(), new HashSet<>()));
+List<Set<String>> allow = new ArrayList<>(
+Arrays.asList(new HashSet<>(), new HashSet<>()));
+
+boolean hasWildCardAllow = false;
+
+for (AclBinding binding : acls(aclFilter)) {
+if 
(!binding.entry().host().equals(requestContext.clientAddress().getHostAddress())
+&& !binding.entry().host().equals("*"))
+continue;
+
+KafkaPrincipal principal = new KafkaPrincipal(
+requestContext.principal().getPrincipalType(),
+requestContext.principal().getName());

Review comment:
   Yeah, took it out. commit 1a139ce744a279e4424188008ee5158186b0fcbe





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-01 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r533915036



##
File path: 
clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
##
@@ -139,4 +152,125 @@
  * @return Iterator for ACL bindings, which may be populated lazily.
  */
 Iterable<AclBinding> acls(AclBindingFilter filter);
+
+/**
+ * Check if the caller is authorized to perform the given ACL operation on 
at least one
+ * resource of the given type.
+ *
+ * @param requestContext Request context including request resourceType, 
security protocol, and listener name
+ * @param op The ACL operation to check
+ * @param resourceType   The resource type to check
+ * @return   Return {@link AuthorizationResult#ALLOWED} if the 
caller is authorized to perform the
+ *   given ACL operation on at least one resource of 
the given type.
+ *   Return {@link AuthorizationResult#DENIED} 
otherwise.
+ */
+default AuthorizationResult 
authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation 
op, ResourceType resourceType) {
+if (resourceType == ResourceType.ANY) {
+throw new IllegalArgumentException(
+"Must specify a non-filter resource type for 
authorizeByResourceType");
+}
+
+if (resourceType == ResourceType.UNKNOWN) {
+throw new IllegalArgumentException(
+"Unknown resource type");
+}
+
+if (op == AclOperation.ANY) {
+throw new IllegalArgumentException(
+"Must specify a non-filter operation type for 
authorizeByResourceType");
+}
+
+if (op == AclOperation.UNKNOWN) {
+throw new IllegalArgumentException(
+"Unknown operation type");
+}
+
+ResourcePatternFilter resourceTypeFilter = new ResourcePatternFilter(
+resourceType, null, PatternType.ANY);
+AclBindingFilter aclFilter = new AclBindingFilter(
+resourceTypeFilter, AccessControlEntryFilter.ANY);
+
+final int typeLiteral = 0;
+final int typePrefix = 1;
+
+List<Set<String>> deny = new ArrayList<>(
+Arrays.asList(new HashSet<>(), new HashSet<>()));
+List<Set<String>> allow = new ArrayList<>(
+Arrays.asList(new HashSet<>(), new HashSet<>()));
+
+boolean hasWildCardAllow = false;
+
+for (AclBinding binding : acls(aclFilter)) {
+if 
(!binding.entry().host().equals(requestContext.clientAddress().getHostAddress())
+&& !binding.entry().host().equals("*"))
+continue;
+
+KafkaPrincipal principal = new KafkaPrincipal(
+requestContext.principal().getPrincipalType(),
+requestContext.principal().getName());
+
+if 
(!SecurityUtils.parseKafkaPrincipal(binding.entry().principal()).equals(principal)
+&& !binding.entry().principal().equals("User:*"))
+continue;
+
+if (binding.entry().operation() != op
+&& binding.entry().operation() != AclOperation.ALL)
+continue;
+
+if (binding.entry().permissionType() == AclPermissionType.DENY) {
+switch (binding.pattern().patternType()) {
+case LITERAL:
+if 
(binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE))
+return AuthorizationResult.DENIED;
+deny.get(typeLiteral).add(binding.pattern().name());
+break;
+case PREFIXED:
+deny.get(typePrefix).add(binding.pattern().name());
+break;
+}
+continue;
+}
+
+if (binding.entry().permissionType() != AclPermissionType.ALLOW)
+continue;
+
+switch (binding.pattern().patternType()) {
+case LITERAL:
+if 
(binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE)) {
+hasWildCardAllow = true;
+continue;
+}
+allow.get(typeLiteral).add(binding.pattern().name());
+break;
+case PREFIXED:
+allow.get(typePrefix).add(binding.pattern().name());
+break;
+}
+}
+
+if (hasWildCardAllow) {
+return AuthorizationResult.ALLOWED;
+}
+
+for (int allowType : Arrays.asList(typePrefix, typeLiteral)) {

Review comment:
   EnumMap makes sense. commit 1a139ce744a279e4424188008ee5158186b0fcbe





This is an automated message from the Apache Git Service.
To respond to the 

[jira] [Commented] (KAFKA-10665) Flaky Test StreamTableJoinTopologyOptimizationIntegrationTest.shouldDoStreamTableJoinWithDifferentNumberOfPartitions[Optimization = all]

2020-12-01 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17242062#comment-17242062
 ] 

Luke Chen commented on KAFKA-10665:
---

org.apache.kafka.streams.integration.StreamTableJoinTopologyOptimizationIntegrationTest.shouldDoStreamTableJoinWithDifferentNumberOfPartitions[Optimization
 = none]
Failing for the past 1 build (Since 
[!https://ci-builds.apache.org/static/23d98232/images/16x16/red.png! 
#294|https://ci-builds.apache.org/job/Kafka/job/kafka-trunk-jdk15/294/] )
[Took 4.5 
sec.|https://ci-builds.apache.org/job/Kafka/job/kafka-trunk-jdk15/294/testReport/junit/org.apache.kafka.streams.integration/StreamTableJoinTopologyOptimizationIntegrationTest/shouldDoStreamTableJoinWithDifferentNumberOfPartitions_Optimization___none_/history]
 
h3. Error Message

java.nio.file.NoSuchFileException: 
/tmp/kafka-3251174604229833116/app-StreamTableJoinTopologyOptimizationIntegrationTestshouldDoStreamTableJoinWithDifferentNumberOfPartitions_Optimization___none_/1_1/.checkpoint.tmp
h3. Stacktrace

java.nio.file.NoSuchFileException: 
/tmp/kafka-3251174604229833116/app-StreamTableJoinTopologyOptimizationIntegrationTestshouldDoStreamTableJoinWithDifferentNumberOfPartitions_Optimization___none_/1_1/.checkpoint.tmp
 at 
java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
 at 
java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:106) 
at 
java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111) 
at 
java.base/sun.nio.fs.UnixFileAttributeViews$Basic.readAttributes(UnixFileAttributeViews.java:55)
 at 
java.base/sun.nio.fs.UnixFileSystemProvider.readAttributes(UnixFileSystemProvider.java:148)
 at 
java.base/sun.nio.fs.LinuxFileSystemProvider.readAttributes(LinuxFileSystemProvider.java:99)
 at java.base/java.nio.file.Files.readAttributes(Files.java:1843) at 
java.base/java.nio.file.FileTreeWalker.getAttributes(FileTreeWalker.java:219) 
at java.base/java.nio.file.FileTreeWalker.visit(FileTreeWalker.java:276) at 
java.base/java.nio.file.FileTreeWalker.next(FileTreeWalker.java:373) at 
java.base/java.nio.file.Files.walkFileTree(Files.java:2840) at 
java.base/java.nio.file.Files.walkFileTree(Files.java:2876) at 
org.apache.kafka.common.utils.Utils.delete(Utils.java:841) at 
org.apache.kafka.common.utils.Utils.delete(Utils.java:827) at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState(IntegrationTestUtils.java:151)
 at 
org.apache.kafka.streams.integration.StreamTableJoinTopologyOptimizationIntegrationTest.whenShuttingDown(StreamTableJoinTopologyOptimizationIntegrationTest.java:122)
 at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method) at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64)
 at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.base/java.lang.reflect.Method.invoke(Method.java:564) at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
 at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
 at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
 at 
org.junit.internal.runners.statements.RunAfters.invokeMethod(RunAfters.java:46) 
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33) 
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
 at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
 at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at 
org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at 
org.junit.runners.ParentRunner.run(ParentRunner.java:413) at 
org.junit.runners.Suite.runChild(Suite.java:128) at 
org.junit.runners.Suite.runChild(Suite.java:27) at 
org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at 
org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at 
org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) at 
org.junit.rules.RunRules.evaluate(RunRules.java:20) at 

[jira] [Commented] (KAFKA-10665) Flaky Test StreamTableJoinTopologyOptimizationIntegrationTest.shouldDoStreamTableJoinWithDifferentNumberOfPartitions[Optimization = all]

2020-12-01 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17242063#comment-17242063
 ] 

Luke Chen commented on KAFKA-10665:
---

org.apache.kafka.streams.integration.StreamTableJoinTopologyOptimizationIntegrationTest.shouldDoStreamTableJoinWithDifferentNumberOfPartitions[Optimization
 = all]
Failing for the past 1 build (Since 
[!https://ci-builds.apache.org/static/23d98232/images/16x16/red.png! 
#291|https://ci-builds.apache.org/job/Kafka/job/kafka-trunk-jdk15/291/] )
[Took 4.5 
sec.|https://ci-builds.apache.org/job/Kafka/job/kafka-trunk-jdk15/291/testReport/junit/org.apache.kafka.streams.integration/StreamTableJoinTopologyOptimizationIntegrationTest/shouldDoStreamTableJoinWithDifferentNumberOfPartitions_Optimization___all_/history]
 
h3. Error Message

java.nio.file.DirectoryNotEmptyException: 
/tmp/kafka-12480882490717231293/app-StreamTableJoinTopologyOptimizationIntegrationTestshouldDoStreamTableJoinWithDifferentNumberOfPartitions_Optimization___all_
h3. Stacktrace

java.nio.file.DirectoryNotEmptyException: 
/tmp/kafka-12480882490717231293/app-StreamTableJoinTopologyOptimizationIntegrationTestshouldDoStreamTableJoinWithDifferentNumberOfPartitions_Optimization___all_
 at 
java.base/sun.nio.fs.UnixFileSystemProvider.implDelete(UnixFileSystemProvider.java:246)
 at 
java.base/sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:105)
 at java.base/java.nio.file.Files.delete(Files.java:1146) at 
org.apache.kafka.common.utils.Utils$2.postVisitDirectory(Utils.java:871) at 
org.apache.kafka.common.utils.Utils$2.postVisitDirectory(Utils.java:841) at 
java.base/java.nio.file.Files.walkFileTree(Files.java:2822) at 
java.base/java.nio.file.Files.walkFileTree(Files.java:2876) at 
org.apache.kafka.common.utils.Utils.delete(Utils.java:841) at 
org.apache.kafka.common.utils.Utils.delete(Utils.java:827) at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState(IntegrationTestUtils.java:151)
 at 
org.apache.kafka.streams.integration.StreamTableJoinTopologyOptimizationIntegrationTest.whenShuttingDown(StreamTableJoinTopologyOptimizationIntegrationTest.java:122)
 at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method) at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64)
 at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.base/java.lang.reflect.Method.invoke(Method.java:564) at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
 at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
 at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
 at 
org.junit.internal.runners.statements.RunAfters.invokeMethod(RunAfters.java:46) 
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33) 
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
 at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
 at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at 
org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at 
org.junit.runners.ParentRunner.run(ParentRunner.java:413) at 
org.junit.runners.Suite.runChild(Suite.java:128) at 
org.junit.runners.Suite.runChild(Suite.java:27) at 
org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at 
org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at 
org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) at 
org.junit.rules.RunRules.evaluate(RunRules.java:20) at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at 
org.junit.runners.ParentRunner.run(ParentRunner.java:413) at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
 at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
 at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
 at 

[jira] [Assigned] (KAFKA-10665) Flaky Test StreamTableJoinTopologyOptimizationIntegrationTest.shouldDoStreamTableJoinWithDifferentNumberOfPartitions[Optimization = all]

2020-12-01 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reassigned KAFKA-10665:
-

Assignee: Luke Chen

> Flaky Test 
> StreamTableJoinTopologyOptimizationIntegrationTest.shouldDoStreamTableJoinWithDifferentNumberOfPartitions[Optimization
>  = all]
> 
>
> Key: KAFKA-10665
> URL: https://issues.apache.org/jira/browse/KAFKA-10665
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Luke Chen
>Priority: Major
>  Labels: flaky-test
>
> {code:java}
> java.nio.file.DirectoryNotEmptyException: 
> /tmp/kafka-13241964730537515637/app-StreamTableJoinTopologyOptimizationIntegrationTestshouldDoStreamTableJoinWithDifferentNumberOfPartitions_Optimization___all_
>   at 
> java.base/sun.nio.fs.UnixFileSystemProvider.implDelete(UnixFileSystemProvider.java:246)
>   at 
> java.base/sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:105)
>   at java.base/java.nio.file.Files.delete(Files.java:1146)
>   at 
> org.apache.kafka.common.utils.Utils$2.postVisitDirectory(Utils.java:869)
>   at 
> org.apache.kafka.common.utils.Utils$2.postVisitDirectory(Utils.java:839)
>   at java.base/java.nio.file.Files.walkFileTree(Files.java:2822)
>   at java.base/java.nio.file.Files.walkFileTree(Files.java:2876)
>   at org.apache.kafka.common.utils.Utils.delete(Utils.java:839)
>   at org.apache.kafka.common.utils.Utils.delete(Utils.java:825)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState(IntegrationTestUtils.java:151)
>   at 
> org.apache.kafka.streams.integration.StreamTableJoinTopologyOptimizationIntegrationTest.whenShuttingDown(StreamTableJoinTopologyOptimizationIntegrationTest.java:122)
> {code}
> https://github.com/apache/kafka/pull/9515/checks?check_run_id=1333753280



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-01 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r533765909



##
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
##
@@ -69,33 +70,39 @@
 @BenchmarkMode(Mode.AverageTime)
 @OutputTimeUnit(TimeUnit.MILLISECONDS)
 public class AclAuthorizerBenchmark {
-@Param({"1", "5", "20"})
+@Param({"1", "4", "8"})

Review comment:
   The underlying algorithm of the authorizeByResourceType() implementation in 
AclAuthorizer has several characteristics:
   1. If any "allow resource" of the given ACE does not have a dominant "deny 
resource", the API returns immediately.
   2. The complexity is O(n*m), where `n` is the number of "allow resources" of 
the given ACE and `m` is the number of "deny resources" of the given ACE; it is 
not related to the number of ACEs in the cluster.
   
   $1 means that, given an ACE, suppose `p%` of its "allow resources" do not 
have a dominant "deny resource"; if `resourceCount` is `r`, then on average the 
API will return after checking `r * p * 0.01` "allow resources".
   a) if we let the dominant "deny resources" be distributed evenly, e.g. using 
(loop index % something) to decide which "allow resource" should have a 
dominant "deny resource", we end up iterating over the same number of "allow 
resources" and returning from the API call at the same point every time, which 
is `r*p*0.01`
   b) if we randomly determine which "allow resource" should have a dominant 
"deny resource", the result will be too noisy; we may iterate over only 1 
resource or over all resources, depending on the randomization algorithm and 
seed.
   
   $2 means that the API time cost is not related to the number of ACEs but 
increases hyperbolically as `resourceCount` increases. Under the assumption in 
(1), the actual complexity would be (r * r * p * 0.01).
   
   As a result, we should get an insight into how long the worst case takes, 
say `t`. Then we can estimate some reasonable values of `p` and estimate the 
API cost by `t * p`.
   
   So I was directly testing the worst case, where p = 1, meaning 100% of the 
"allow resources" have a dominant "deny resource". The complexity hence would 
be (r^2). It's rare that a cluster has 200k "allow resources" and 200k 
corresponding "dominant deny resources" for each user, and it's not fair to 
have a relatively smaller `aclCount` and a much larger `resourceCount`, since 
the API optimizes performance by indexing on the ACE.
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-01 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r533765909



##
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
##
@@ -69,33 +70,39 @@
 @BenchmarkMode(Mode.AverageTime)
 @OutputTimeUnit(TimeUnit.MILLISECONDS)
 public class AclAuthorizerBenchmark {
-@Param({"1", "5", "20"})
+@Param({"1", "4", "8"})

Review comment:
   The underlying algorithm of the authorizeByResourceType() implementation in 
AclAuthorizer has several characteristics:
   1. If any "allow resource" of the given ACE does not have a dominant "deny 
resource", the API returns immediately.
   2. The complexity is O(n*m), where `n` is the number of "allow resources" of 
the given ACE and `m` is the number of "deny resources" of the given ACE; it is 
not related to the number of ACEs in the cluster.
   
   $1 means that, given an ACE, suppose `p%` of its "allow resources" do not 
have a dominant "deny resource"; if `resourceCount` is `r`, then on average the 
API will return after checking `r * p * 0.01` "allow resources".
   a) if we let the dominant "deny resources" be distributed evenly, e.g. using 
(loop index % something) to decide which "allow resource" should have a 
dominant "deny resource", we end up iterating over the same number of "allow 
resources" and returning from the API call at the same point every time, which 
is `r*p*0.01`
   b) if we randomly determine which "allow resource" should have a dominant 
"deny resource", the result will be too noisy; we may iterate over only 1 
resource or over all resources, depending on the randomization algorithm and 
seed.
   
   $2 means that the API time cost is not related to the number of ACEs but 
increases hyperbolically as `resourceCount` increases. Under the assumption in 
(1), the actual complexity would be (r * r * p * 0.01).
   
   As a result, we should get an insight into how long the worst case takes, 
say `t`. Then we can estimate some reasonable values of `p` and calculate the 
corresponding API cost by `t * p`.
   
   So I was directly testing the worst case, where p = 1, meaning 100% of the 
"allow resources" have a dominant "deny resource". The complexity hence would 
be (r^2). It's rare that a cluster has 200k "allow resources" and 200k 
corresponding "dominant deny resources" for each user, and it's not fair to 
have a relatively smaller `aclCount` and a much larger `resourceCount`, since 
the API optimizes performance by indexing on the ACE.
   


[GitHub] [kafka] junrao commented on pull request #8826: KAFKA-10090 Misleading warnings: The configuration was supplied but i…

2020-12-01 Thread GitBox


junrao commented on pull request #8826:
URL: https://github.com/apache/kafka/pull/8826#issuecomment-736983541


   @chia7712 :  There are 6 failures in the system test run. Are they related 
to this PR?
   
   
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-12-01--001.1606874236--chia7712--KAFKA-10090--4da062adc/report.html



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10772) java.lang.IllegalStateException: There are insufficient bytes available to read assignment from the sync-group response (actual byte size 0)

2020-12-01 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17242025#comment-17242025
 ] 

A. Sophie Blee-Goldman commented on KAFKA-10772:


Hm, the leadership changes definitely ring a bell – maybe this is another 
symptom of KAFKA-10284? This should be fixed in 2.6.1, which is currently in 
the process of being released. You could try out the RC for that release, or 
just build from source if that's an option.

> java.lang.IllegalStateException: There are insufficient bytes available to 
> read assignment from the sync-group response (actual byte size 0)
> 
>
> Key: KAFKA-10772
> URL: https://issues.apache.org/jira/browse/KAFKA-10772
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Levani Kokhreidze
>Priority: Major
> Attachments: KAFKA-10772.log
>
>
> From time to time we encounter the following exception that results in Kafka 
> Streams threads dying.
> Broker version 2.4.1, Client version 2.6.0
> {code:java}
> Nov 27 00:59:53.681 streaming-app service: prod | streaming-app-2 | 
> stream-client [cluster1-profile-stats-pipeline-client-id] State transition 
> from REBALANCING to ERROR Nov 27 00:59:53.681 streaming-app service: prod | 
> streaming-app-2 | stream-client [cluster1-profile-stats-pipeline-client-id] 
> State transition from REBALANCING to ERROR Nov 27 00:59:53.682 streaming-app 
> service: prod | streaming-app-2 | 2020-11-27 00:59:53.681 ERROR 105 --- 
> [-StreamThread-1] .KafkaStreamsBasedStreamProcessingEngine : Stream 
> processing pipeline: [profile-stats] encountered unrecoverable exception. 
> Thread: [cluster1-profile-stats-pipeline-client-id-StreamThread-1] is 
> completely dead. If all worker threads die, Kafka Streams will be moved to 
> permanent ERROR state. Nov 27 00:59:53.682 streaming-app service: prod | 
> streaming-app-2 | Stream processing pipeline: [profile-stats] encountered 
> unrecoverable exception. Thread: 
> [cluster1-profile-stats-pipeline-client-id-StreamThread-1] is completely 
> dead. If all worker threads die, Kafka Streams will be moved to permanent 
> ERROR state. java.lang.IllegalStateException: There are insufficient bytes 
> available to read assignment from the sync-group response (actual byte size 
> 0) , this is not expected; it is possible that the leader's assign function 
> is buggy and did not return any assignment for this member, or because static 
> member is configured and the protocol is buggy hence did not get the 
> assignment for this member at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:367)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) 
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:766)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:624)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-12-01 Thread GitBox


ableegoldman commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r533878767



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -870,43 +899,73 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 cacheSizePerThread,
 stateDirectory,
 delegatingStateRestoreListener,
-i + 1,
+threadIdx,
 KafkaStreams.this::closeToError,
-this::defaultStreamsUncaughtExceptionHandler
-);
-threads.add(streamThread);
-threadState.put(streamThread.getId(), streamThread.state());
-storeProviders.add(new 
StreamThreadStateStoreProvider(streamThread));
-}
+streamsUncaughtExceptionHandler
+);
+streamThread.setStateListener(streamStateListener);
+threads.add(streamThread);
+threadState.put(streamThread.getId(), streamThread.state());
+storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+return streamThread;
+}
 
-ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, 
(metricsConfig, now) ->
-Math.toIntExact(threads.stream().filter(thread -> 
thread.state().isAlive()).count()));
+/**
+ * Adds and starts a stream thread in addition to the stream threads that 
are already running in this
+ * Kafka Streams client.
+ * 
+ * Since the number of stream threads increases, the sizes of the caches 
in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the 
cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ * 
+ * Stream threads can only be added if this Kafka Streams client is in 
state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread 
could not be added
+ */
+public Optional addStreamThread() {
+if (isRunningOrRebalancing()) {
+final int threadIdx = getNextThreadIndex();
+final long cacheSizePerThread = 
getCacheSizePerThread(threads.size() + 1);
+resizeThreadCache(cacheSizePerThread);
+final StreamThread streamThread = 
createStreamThread(cacheSizePerThread, threadIdx);
+synchronized (stateLock) {
+if (isRunningOrRebalancing()) {
+streamThread.start();
+return Optional.of(streamThread.getName());
+} else {
+threads.remove(streamThread);

Review comment:
   We should also shut down the thread if it doesn't get started, otherwise 
we may leak (consumer or producer) clients. But I'm actually not sure why we 
don't just do everything (resize cache, create thread) inside the synchronized 
block? I'm guessing it would deadlock due to locking on the `stateLock`, but 
can't we just synchronize on something else that wouldn't interfere with the 
StreamThread creation?
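   
   As a rough illustration of the leak concern (a generic toy, not KafkaStreams code; every name below is invented for the example): a worker that allocates resources in its constructor but never gets started still needs an explicit close, otherwise whatever it allocated is leaked when the reference is dropped.
   
{code:java}
import java.util.Optional;

public class AddWorkerSketch {

    // Stands in for a thread-like component that creates its clients eagerly in the constructor.
    static class Worker implements AutoCloseable {
        Worker() { System.out.println("clients created"); }
        void start() { System.out.println("worker started"); }
        @Override
        public void close() { System.out.println("worker closed, clients released"); }
    }

    static volatile boolean runningOrRebalancing = true;

    static Optional<String> addWorker() {
        final Worker worker = new Worker();         // clients already exist from this point on
        synchronized (AddWorkerSketch.class) {
            if (runningOrRebalancing) {
                worker.start();
                return Optional.of("worker-1");
            } else {
                worker.close();                     // the suggested fix: don't just drop the reference
                return Optional.empty();
            }
        }
    }

    public static void main(String[] args) {
        runningOrRebalancing = false;               // simulate the state changing before start()
        System.out.println("added: " + addWorker());
    }
}
{code}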

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.processor.ThreadMetadata;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.hamcrest.CoreMatchers;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import 

[jira] [Updated] (KAFKA-10794) Replica leader election is too slow in the case of too many partitions

2020-12-01 Thread limeng (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

limeng updated KAFKA-10794:
---
Description: 
There are more than 6000 topics and 300 brokers in my Kafka cluster, and we set 
the config 'auto.leader.rebalance.enable=true'. 

The rebalance process spends too much time and CPU resources.

We find that the function 'controllerContext.allPartitions' is invoked too many 
times.

!截屏2020-12-02 上午11.43.48.png!

  was:
There are more than 6000 topics and 300 brokers in my Kafka cluster, and we set 
the config 'auto.leader.rebalance.enable=true'. 

The rebalance process spends too much time and CPU resources.

We find that the function 'controllerContext.allPartitions' is invoked too many 
times.

!clipboard_image_1606806581629.png!


> Replica leader election is too slow in the case of too many partitions
> --
>
> Key: KAFKA-10794
> URL: https://issues.apache.org/jira/browse/KAFKA-10794
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.6.0, 2.5.1
>Reporter: limeng
>Priority: Major
> Attachments: clipboard_image_1606806581629.png, 截屏2020-12-02 
> 上午11.43.48.png
>
>
> There are more than 6000 topics and 300 brokers in my Kafka cluster, and we 
> set the config 'auto.leader.rebalance.enable=true'. 
> The rebalance process spends too much time and CPU resources.
> We find that the function 'controllerContext.allPartitions' is invoked too 
> many times.
> !截屏2020-12-02 上午11.43.48.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10794) Replica leader election is too slow in the case of too many partitions

2020-12-01 Thread limeng (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

limeng updated KAFKA-10794:
---
Attachment: clipboard_image_1606806581629.png

> Replica leader election is too slow in the case of too many partitions
> --
>
> Key: KAFKA-10794
> URL: https://issues.apache.org/jira/browse/KAFKA-10794
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.6.0, 2.5.1
>Reporter: limeng
>Priority: Major
> Attachments: clipboard_image_1606806581629.png
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10794) Replica leader election is too slow in the case of too many partitions

2020-12-01 Thread limeng (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

limeng updated KAFKA-10794:
---
Description: 
There are more than 6000 topics and 300 brokers in my Kafka cluster, and we set 
the config 'auto.leader.rebalance.enable=true'. 

The rebalance process spends too much time and CPU resources.

We find that the function 'controllerContext.allPartitions' is invoked too many 
times.

!clipboard_image_1606806581629.png!

> Replica leader election is too slow in the case of too many partitions
> --
>
> Key: KAFKA-10794
> URL: https://issues.apache.org/jira/browse/KAFKA-10794
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.6.0, 2.5.1
>Reporter: limeng
>Priority: Major
> Attachments: clipboard_image_1606806581629.png
>
>
> There are more than 6000 topics and 300 brokers in my Kafka cluster, and we 
> set the config 'auto.leader.rebalance.enable=true'. 
> The rebalance process spends too much time and CPU resources.
> We find that the function 'controllerContext.allPartitions' is invoked too 
> many times.
> !clipboard_image_1606806581629.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #9609: KAFKA-6687: restrict DSL to allow only Streams from the same source topics

2020-12-01 Thread GitBox


ableegoldman commented on a change in pull request #9609:
URL: https://github.com/apache/kafka/pull/9609#discussion_r533876280



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/SourceGraphNode.java
##
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.graph;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.regex.Pattern;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
+
+abstract public class SourceGraphNode extends StreamsGraphNode {
+
+private Collection topicNames;
+private Pattern topicPattern;
+private final ConsumedInternal consumedInternal;
+
+public SourceGraphNode(final String nodeName,
+final Collection topicNames,
+final ConsumedInternal consumedInternal) {
+super(nodeName);
+
+this.topicNames = topicNames;
+this.consumedInternal = consumedInternal;
+}
+
+public SourceGraphNode(final String nodeName,
+final Pattern topicPattern,
+final ConsumedInternal consumedInternal) {
+
+super(nodeName);
+
+this.topicPattern = topicPattern;
+this.consumedInternal = consumedInternal;
+}
+
+public Set topicNames() {
+return new HashSet<>(topicNames);

Review comment:
   Ah ok you meant making it a Set vs a Collection -- I do agree with the 
principle, and I did push the Set-ification of the topics up one level so that 
the actual class field is a Set. But I don't think it's really worth it to push 
it up another layer and Set-ify the constructor argument. For one thing we 
would just have to do the same conversion to a Set but in more places, and more 
importantly, the actual callers of the constructor don't care at all whether 
it's a Set or any other Collection. So I think it actually does make sense to 
convert to a Set inside the constructor body
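
   As a small, self-contained sketch of that design choice (illustrative only, not the actual SourceGraphNode code; the class and names below are made up): the constructor keeps the broader Collection parameter for callers and does the Set conversion exactly once, inside the constructor body.
   
{code:java}
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SourceNodeSketch {

    private final Set<String> topicNames;

    public SourceNodeSketch(final Collection<String> topicNames) {
        this.topicNames = new HashSet<>(topicNames);   // Set-ification happens in one place
    }

    public Set<String> topicNames() {
        return topicNames;                             // the field is already a Set
    }

    public static void main(String[] args) {
        SourceNodeSketch node = new SourceNodeSketch(List.of("topic-a", "topic-b", "topic-a"));
        System.out.println(node.topicNames());         // duplicates collapse to two distinct topics
    }
}
{code}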





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

2020-12-01 Thread GitBox


ableegoldman commented on pull request #9671:
URL: https://github.com/apache/kafka/pull/9671#issuecomment-736964914


   Waiting to add tests until I get some sanity checks on this proposal



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon opened a new pull request #9672: MINOR: work in progress for Eos test(don't review)

2020-12-01 Thread GitBox


showuon opened a new pull request #9672:
URL: https://github.com/apache/kafka/pull/9672


   work in progress for Eos test(don't review)
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-9263) The new hw is added to incorrect log when ReplicaAlterLogDirsThread is replacing log (fix PlaintextAdminIntegrationTest.testAlterReplicaLogDirs)

2020-12-01 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-9263.
---
Fix Version/s: 2.8.0
   Resolution: Fixed

> The new hw is added to incorrect log when  ReplicaAlterLogDirsThread is 
> replacing log (fix  PlaintextAdminIntegrationTest.testAlterReplicaLogDirs)
> --
>
> Key: KAFKA-9263
> URL: https://issues.apache.org/jira/browse/KAFKA-9263
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.4.0
>Reporter: John Roesler
>Assignee: Chia-Ping Tsai
>Priority: Major
>  Labels: flaky-test
> Fix For: 2.8.0
>
>
> This test has failed for me on 
> https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/9691/testReport/junit/kafka.api/AdminClientIntegrationTest/testAlterReplicaLogDirs/
> {noformat}
> Error Message
> org.scalatest.exceptions.TestFailedException: only 0 messages are produced 
> within timeout after replica movement. Producer future 
> Some(Failure(java.util.concurrent.TimeoutException: Timeout after waiting for 
> 1 ms.))
> Stacktrace
> org.scalatest.exceptions.TestFailedException: only 0 messages are produced 
> within timeout after replica movement. Producer future 
> Some(Failure(java.util.concurrent.TimeoutException: Timeout after waiting for 
> 1 ms.))
>   at 
> org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
>   at 
> org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
>   at 
> org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)
>   at org.scalatest.Assertions.fail(Assertions.scala:1091)
>   at org.scalatest.Assertions.fail$(Assertions.scala:1087)
>   at org.scalatest.Assertions$.fail(Assertions.scala:1389)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:842)
>   at 
> kafka.api.AdminClientIntegrationTest.testAlterReplicaLogDirs(AdminClientIntegrationTest.scala:459)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at java.base/java.lang.Thread.run(Thread.java:834)
> Standard Output
> [2019-12-03 04:54:16,111] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error for partition unclean-test-topic-1-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-12-03 04:54:21,711] ERROR [ReplicaFetcher replicaId=0, leaderId=1, 
> fetcherId=0] Error for partition topic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-12-03 04:54:21,712] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error for partition topic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-12-03 04:54:27,092] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error for partition unclean-test-topic-1-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-12-03 04:54:27,091] ERROR [ReplicaFetcher replicaId=0, leaderId=1, 
> fetcherId=0] Error for partition unclean-test-topic-1-1 at offset 0 
> 

[GitHub] [kafka] chia7712 merged pull request #9423: KAFKA-9263 The new hw is added to incorrect log when ReplicaAlterLogD…

2020-12-01 Thread GitBox


chia7712 merged pull request #9423:
URL: https://github.com/apache/kafka/pull/9423


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9263) Reocurrence: Transient failure in kafka.api.PlaintextAdminIntegrationTest.testLogStartOffsetCheckpoint and kafka.api.PlaintextAdminIntegrationTest.testAlterReplicaLogDi

2020-12-01 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17242016#comment-17242016
 ] 

Chia-Ping Tsai commented on KAFKA-9263:
---

PlaintextAdminIntegrationTest.testLogStartOffsetCheckpoint has not failed 
recently, and I looped it 200 times; all passed.

https://github.com/apache/kafka/pull/9423, which fixes 
kafka.api.PlaintextAdminIntegrationTest.testAlterReplicaLogDirs, is going to be 
merged, so I will revise the title of this issue (i.e. remove 
PlaintextAdminIntegrationTest.testLogStartOffsetCheckpoint).

> Reocurrence: Transient failure in 
> kafka.api.PlaintextAdminIntegrationTest.testLogStartOffsetCheckpoint and 
> kafka.api.PlaintextAdminIntegrationTest.testAlterReplicaLogDirs
> --
>
> Key: KAFKA-9263
> URL: https://issues.apache.org/jira/browse/KAFKA-9263
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.4.0
>Reporter: John Roesler
>Priority: Major
>  Labels: flaky-test
>
> This test has failed for me on 
> https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/9691/testReport/junit/kafka.api/AdminClientIntegrationTest/testAlterReplicaLogDirs/
> {noformat}
> Error Message
> org.scalatest.exceptions.TestFailedException: only 0 messages are produced 
> within timeout after replica movement. Producer future 
> Some(Failure(java.util.concurrent.TimeoutException: Timeout after waiting for 
> 1 ms.))
> Stacktrace
> org.scalatest.exceptions.TestFailedException: only 0 messages are produced 
> within timeout after replica movement. Producer future 
> Some(Failure(java.util.concurrent.TimeoutException: Timeout after waiting for 
> 1 ms.))
>   at 
> org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
>   at 
> org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
>   at 
> org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)
>   at org.scalatest.Assertions.fail(Assertions.scala:1091)
>   at org.scalatest.Assertions.fail$(Assertions.scala:1087)
>   at org.scalatest.Assertions$.fail(Assertions.scala:1389)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:842)
>   at 
> kafka.api.AdminClientIntegrationTest.testAlterReplicaLogDirs(AdminClientIntegrationTest.scala:459)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at java.base/java.lang.Thread.run(Thread.java:834)
> Standard Output
> [2019-12-03 04:54:16,111] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error for partition unclean-test-topic-1-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-12-03 04:54:21,711] ERROR [ReplicaFetcher replicaId=0, leaderId=1, 
> fetcherId=0] Error for partition topic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-12-03 04:54:21,712] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error for partition topic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-12-03 04:54:27,092] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error for partition unclean-test-topic-1-0 at 

[jira] [Assigned] (KAFKA-9263) The new hw is added to incorrect log when ReplicaAlterLogDirsThread is replacing log (fix PlaintextAdminIntegrationTest.testAlterReplicaLogDirs)

2020-12-01 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-9263:
-

Assignee: Chia-Ping Tsai

> The new hw is added to incorrect log when  ReplicaAlterLogDirsThread is 
> replacing log (fix  PlaintextAdminIntegrationTest.testAlterReplicaLogDirs)
> --
>
> Key: KAFKA-9263
> URL: https://issues.apache.org/jira/browse/KAFKA-9263
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.4.0
>Reporter: John Roesler
>Assignee: Chia-Ping Tsai
>Priority: Major
>  Labels: flaky-test
>
> This test has failed for me on 
> https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/9691/testReport/junit/kafka.api/AdminClientIntegrationTest/testAlterReplicaLogDirs/
> {noformat}
> Error Message
> org.scalatest.exceptions.TestFailedException: only 0 messages are produced 
> within timeout after replica movement. Producer future 
> Some(Failure(java.util.concurrent.TimeoutException: Timeout after waiting for 
> 1 ms.))
> Stacktrace
> org.scalatest.exceptions.TestFailedException: only 0 messages are produced 
> within timeout after replica movement. Producer future 
> Some(Failure(java.util.concurrent.TimeoutException: Timeout after waiting for 
> 1 ms.))
>   at 
> org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
>   at 
> org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
>   at 
> org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)
>   at org.scalatest.Assertions.fail(Assertions.scala:1091)
>   at org.scalatest.Assertions.fail$(Assertions.scala:1087)
>   at org.scalatest.Assertions$.fail(Assertions.scala:1389)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:842)
>   at 
> kafka.api.AdminClientIntegrationTest.testAlterReplicaLogDirs(AdminClientIntegrationTest.scala:459)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at java.base/java.lang.Thread.run(Thread.java:834)
> Standard Output
> [2019-12-03 04:54:16,111] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error for partition unclean-test-topic-1-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-12-03 04:54:21,711] ERROR [ReplicaFetcher replicaId=0, leaderId=1, 
> fetcherId=0] Error for partition topic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-12-03 04:54:21,712] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error for partition topic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-12-03 04:54:27,092] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error for partition unclean-test-topic-1-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-12-03 04:54:27,091] ERROR [ReplicaFetcher replicaId=0, leaderId=1, 
> fetcherId=0] Error for partition unclean-test-topic-1-1 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> 

[jira] [Updated] (KAFKA-9263) The new hw is added to incorrect log when ReplicaAlterLogDirsThread is replacing log (fix PlaintextAdminIntegrationTest.testAlterReplicaLogDirs)

2020-12-01 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-9263:
--
Summary: The new hw is added to incorrect log when  
ReplicaAlterLogDirsThread is replacing log (fix  
PlaintextAdminIntegrationTest.testAlterReplicaLogDirs)  (was: Reocurrence: 
Transient failure in 
kafka.api.PlaintextAdminIntegrationTest.testLogStartOffsetCheckpoint and 
kafka.api.PlaintextAdminIntegrationTest.testAlterReplicaLogDirs)

> The new hw is added to incorrect log when  ReplicaAlterLogDirsThread is 
> replacing log (fix  PlaintextAdminIntegrationTest.testAlterReplicaLogDirs)
> --
>
> Key: KAFKA-9263
> URL: https://issues.apache.org/jira/browse/KAFKA-9263
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.4.0
>Reporter: John Roesler
>Priority: Major
>  Labels: flaky-test
>
> This test has failed for me on 
> https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/9691/testReport/junit/kafka.api/AdminClientIntegrationTest/testAlterReplicaLogDirs/
> {noformat}
> Error Message
> org.scalatest.exceptions.TestFailedException: only 0 messages are produced 
> within timeout after replica movement. Producer future 
> Some(Failure(java.util.concurrent.TimeoutException: Timeout after waiting for 
> 1 ms.))
> Stacktrace
> org.scalatest.exceptions.TestFailedException: only 0 messages are produced 
> within timeout after replica movement. Producer future 
> Some(Failure(java.util.concurrent.TimeoutException: Timeout after waiting for 
> 1 ms.))
>   at 
> org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
>   at 
> org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
>   at 
> org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)
>   at org.scalatest.Assertions.fail(Assertions.scala:1091)
>   at org.scalatest.Assertions.fail$(Assertions.scala:1087)
>   at org.scalatest.Assertions$.fail(Assertions.scala:1389)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:842)
>   at 
> kafka.api.AdminClientIntegrationTest.testAlterReplicaLogDirs(AdminClientIntegrationTest.scala:459)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at java.base/java.lang.Thread.run(Thread.java:834)
> Standard Output
> [2019-12-03 04:54:16,111] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error for partition unclean-test-topic-1-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-12-03 04:54:21,711] ERROR [ReplicaFetcher replicaId=0, leaderId=1, 
> fetcherId=0] Error for partition topic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-12-03 04:54:21,712] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error for partition topic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-12-03 04:54:27,092] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error for partition unclean-test-topic-1-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> 

[jira] [Created] (KAFKA-10794) Replica leader election is too slow in the case of too many partitions

2020-12-01 Thread limeng (Jira)
limeng created KAFKA-10794:
--

 Summary: Replica leader election is too slow in the case of too 
many partitions
 Key: KAFKA-10794
 URL: https://issues.apache.org/jira/browse/KAFKA-10794
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 2.5.1, 2.6.0
Reporter: limeng






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] highluck commented on a change in pull request #8965: KAFKA-8147: Add changelog topic configuration to KTable suppress

2020-12-01 Thread GitBox


highluck commented on a change in pull request #8965:
URL: https://github.com/apache/kafka/pull/8965#discussion_r533863847



##
File path: docs/streams/developer-guide/dsl-api.html
##
@@ -3393,6 +3393,10 @@ KTable-KTable 
Foreign-Key
  but this simple example creates a buffer 
with no
  upper bound.

+withLoggingDisabled()
+
+  This configures the suppression operator to disable 
logging for changelog entries.
+

Review comment:
   update code thanks!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman opened a new pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition

2020-12-01 Thread GitBox


ableegoldman opened a new pull request #9671:
URL: https://github.com/apache/kafka/pull/9671


   A race condition between the consumer and hb thread can lead to a failed but 
non-null `findCoordinatorFuture`, causing the AbstractCoordinator to wait 
endlessly on the request which it thinks is still in flight. We should move the 
handling of this future out of the listener callbacks and into the 
`ensureCoordinatorReady()` method where we can check the exception and clear 
the future all in one place.  
   
   See ticket for full analysis.
   
   Also starts logging a warning if the consumer is unable to connect to the 
coordinator for longer than the max poll interval.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9666: MINOR: Remove broken `.travis.yml` with system test hooks

2020-12-01 Thread GitBox


chia7712 commented on pull request #9666:
URL: https://github.com/apache/kafka/pull/9666#issuecomment-736948097


   > Nice that you got it working! I think there is value running a subset of 
system tests automatically. It might even encourage us to write more system 
tests if we can get the feedback more easily. Not having access to logs is 
annoying, but might not be a dealbreaker. It looks like we need a bit of 
initial work to tune the build though. I'm happy to close this and merge #9652. 
Then perhaps we can file a few jiras for any follow-up actions. What do you 
think?
   
   +1 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-10793) Race condition in FindCoordinatorFuture permanently severs connection to group coordinator

2020-12-01 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241976#comment-17241976
 ] 

A. Sophie Blee-Goldman edited comment on KAFKA-10793 at 12/2/20, 2:30 AM:
--

At this point we can only guess, but all signs point to a race condition 
between the main consumer thread and the heartbeat thread. One possibility is 
that when the future failed it just didn't trigger the `onFailure` callback, 
but [~guozhang] & I have both looked through the source code and don't see any 
way for this to occur. Another possibility is that the `onFailure` callback was 
triggered, but it was invoked too soon. If the future was completed before we 
ever assigned it to the _findCoordinatorFuture_ field, then we would never 
actually clear the latest future (we would just set an already-null field to 
null again).

Is this possible? Here's how the AbstractCoordinator builds the request and 
assigns the future:
{code:java}
protected synchronized RequestFuture<Void> lookupCoordinator() {
    ...
    findCoordinatorFuture = sendFindCoordinatorRequest(node);
}
{code}
{code:java}
private RequestFuture<Void> sendFindCoordinatorRequest(Node node) {
    ...
    return client.send(node, requestBuilder)
        .compose(new FindCoordinatorResponseHandler());
}
{code}
Inside #compose we call #addListener, which contains this snippet:
{code:java}
if (failed()) 
fireFailure(); 
{code}
If the request has already failed by the time we reach this, then we'll trigger 
the `onFailure` callback before #compose ever returns – ie before we've 
assigned the future to _findCoordinatorFuture_.

The obvious question now is whether it's possible for the request to be failed 
in another thread while one thread is in the middle of the synchronized 
lookupCoordinator(). The request can be failed by the ConsumerNetworkClient 
when polled, during checkDisconnects(). The heartbeat thread actually 
synchronizes the entire run loop, so it doesn't seem possible for the hb thread 
to fail this request in the background of the main thread during a 
lookupCoordinator().

But the inverse is not true: it's possible for the main consumer thread to fail 
the request while the hb thread is inside of lookupCoordinator(). The 
AbstractCoordinator will poll the network client inside of joinGroupIfNeeded(), 
which is not itself synchronized and may be invoked without any locking through 
a Consumer#poll. 
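
For illustration, here is a minimal toy model of that ordering problem. It is not the real RequestFuture/ConsumerNetworkClient code and every name below is invented; it only shows that if the "clear the field" listener can fire while the future is still being built, the listener runs before the assignment and therefore clears nothing.

{code:java}
import java.util.function.Consumer;

public class FutureRaceSketch {

    static Object findCoordinatorFuture = null;   // the field that should be cleared on failure

    static class ToyFuture {
        final boolean alreadyFailed;

        ToyFuture(boolean alreadyFailed) {
            this.alreadyFailed = alreadyFailed;
        }

        // Mimics compose()/addListener(): if the future has already failed,
        // the failure listener fires immediately, before this call returns.
        ToyFuture addListener(Consumer<ToyFuture> onFailure) {
            if (alreadyFailed) {
                onFailure.accept(this);
            }
            return this;
        }
    }

    static ToyFuture sendFindCoordinatorRequest(boolean failsImmediately) {
        return new ToyFuture(failsImmediately)
                .addListener(f -> findCoordinatorFuture = null);   // the "clear the field" listener
    }

    public static void main(String[] args) {
        // The listener has already run (setting an already-null field to null) by the
        // time this assignment happens, so the failed future is never cleared afterwards.
        findCoordinatorFuture = sendFindCoordinatorRequest(true);
        System.out.println("findCoordinatorFuture = " + findCoordinatorFuture);   // non-null, yet failed
    }
}
{code}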


was (Author: ableegoldman):
At this point we can only guess, but all signs point to a race condition 
between the main consumer thread and the heartbeat thread. One possibility is 
that when the future failed it just didn't trigger the `onFailure` callback, 
but [~guozhang] & I have both looked through the source code and don't see any 
way for this to occur. Another possibility is that the `onFailure` callback was 
triggered, but it was invoked too soon. If the future was completed before we 
ever assigned it to the _findCoordinatorFuture_ field, then we would never 
actually clear the latest future (we would just set an already-null field to 
null again).

Is this possible? Here's how the AbstractCoordinator builds the request and 
assigns the future:
{code:java}
protected synchronized RequestFuture<Void> lookupCoordinator() {
    ...
    findCoordinatorFuture = sendFindCoordinatorRequest(node);
}
{code}
{code:java}
private RequestFuture<Void> sendFindCoordinatorRequest(Node node) {
    ...
    return client.send(node, requestBuilder)
        .compose(new FindCoordinatorResponseHandler());
}
{code}
Inside #compose we call #addListener, which contains this snippet:
{code:java}
if (failed()) 
fireFailure(); 
{code}
If the request has already failed by the time we reach this, then we'll trigger 
the `onFailure` callback before #compose ever returns – ie before we've 
assigned the future to _findCoordinatorFuture_.

The obvious question now is whether it's possible for the request to be failed 
in another thread while one thread is in the middle of the synchronized 
lookupCoordinator(). The request can be failed by the ConsumerNetworkClient 
when polled, during checkDisconnects(). The heartbeat thread actually 
synchronizes the entire run loop, so it doesn't seem possible for the hb thread 
to fail this request in the background of the main thread during a 
lookupCoordinator().

But the inverse is not true: it's possible for the main consumer thread to fail 
the request while the hb thread is inside of lookupCoordinator(). The 
AbstractCoordinator will poll the network client inside of joinGroupIfNeeded(), 
which is not itself synchronized and may be invoked without any locking through 
a Consumer#poll. 

(note that clearFindCoordinatorFuture() is also synchronized, but it's possible 
we trigger this method after leaving the synchronized lookupCoordinator method 
but before assigning the result to the 

[jira] [Comment Edited] (KAFKA-10793) Race condition in FindCoordinatorFuture permanently severs connection to group coordinator

2020-12-01 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241976#comment-17241976
 ] 

A. Sophie Blee-Goldman edited comment on KAFKA-10793 at 12/2/20, 2:26 AM:
--

At this point we can only guess, but all signs point to a race condition 
between the main consumer thread and the heartbeat thread. One possibility is 
that when the future failed it just didn't trigger the `onFailure` callback, 
but [~guozhang] & I have both looked through the source code and don't see any 
way for this to occur. Another possibility is that the `onFailure` callback was 
triggered, but it was invoked too soon. If the future was completed before we 
ever assigned it to the _findCoordinatorFuture_ field, then we would never 
actually clear the latest future (we would just set an already-null field to 
null again).

Is this possible? Here's how the AbstractCoordinator builds the request and 
assigns the future:
{code:java}
protected synchronized RequestFuture<Void> lookupCoordinator() {
    ...
    findCoordinatorFuture = sendFindCoordinatorRequest(node);
}
{code}
{code:java}
private RequestFuture<Void> sendFindCoordinatorRequest(Node node) {
    ...
    return client.send(node, requestBuilder)
        .compose(new FindCoordinatorResponseHandler());
}
{code}
Inside #compose we call #addListener, which contains this snippet:
{code:java}
if (failed()) 
fireFailure(); 
{code}
If the request has already failed by the time we reach this, then we'll trigger 
the `onFailure` callback before #compose ever returns – ie before we've 
assigned the future to _findCoordinatorFuture_.

The obvious question now is whether it's possible for the request to be failed 
in another thread while one thread is in the middle of the synchronized 
lookupCoordinator(). The request can be failed by the ConsumerNetworkClient 
when polled, during checkDisconnects(). The heartbeat thread actually 
synchronizes the entire run loop, so it doesn't seem possible for the hb thread 
to fail this request in the background of the main thread during a 
lookupCoordinator().

But the inverse is not true: it's possible for the main consumer thread to fail 
the request while the hb thread is inside of lookupCoordinator(). The 
AbstractCoordinator will poll the network client inside of joinGroupIfNeeded(), 
which is not itself synchronized and may be invoked without any locking through 
a Consumer#poll. 

(note that clearFindCoordinatorFuture() is also synchronized, but it's possible 
we trigger this method after leaving the synchronized lookupCoordinator method 
but before assigning the result to the _findCoordinatorFuture_


was (Author: ableegoldman):
At this point we can only guess, but all signs point to a race condition 
between the main consumer thread and the heartbeat thread. One possibility is 
that when the future failed it just didn't trigger the `onFailure` callback, 
but [~guozhang] & I have both looked through the source code and don't see any 
way for this to occur. Another possibility is that the `onFailure` callback was 
triggered, but it was invoked too soon. If the future was completed before we 
ever assigned it to the _findCoordinatorFuture_ field, then we would never 
actually clear the latest future (we would just set an already-null field to 
null again).

Is this possible? Here's how the AbstractCoordinator builds the request and 
assigns the future:
{code:java}
protected synchronized RequestFuture<Void> lookupCoordinator() {
    ...
    findCoordinatorFuture = sendFindCoordinatorRequest(node);
}
{code}
{code:java}
private RequestFuture<Void> sendFindCoordinatorRequest(Node node) {
    ...
    return client.send(node, requestBuilder).compose(new FindCoordinatorResponseHandler());
}
{code}
Inside #compose we call #addListener, which contains this snippet:
{code:java}
if (failed()) 
fireFailure(); 
{code}
If the request has already failed by the time we reach this, then we'll trigger 
the `onFailure` callback before #compose ever returns – ie before we've 
assigned the future to _findCoordinatorFuture_.

The obvious question now is whether it's possible for the request to be failed 
in another thread while one thread is in the middle of the synchronized 
lookupCoordinator(). The request can be failed by the ConsumerNetworkClient 
when polled, during checkDisconnects(). The heartbeat thread actually 
synchronizes the entire run loop, so it doesn't seem possible for the hb thread 
to fail this request in the background of the main thread during a 
lookupCoordinator().

But the inverse is not true: it's possible for the main consumer thread to fail 
the request while the hb thread is inside of lookupCoordinator(). The 
AbstractCoordinator will poll the network client inside of joinGroupIfNeeded(), 
which is not itself synchronized and may be invoked without any locking through 
a Consumer#poll. 

> Race 

[jira] [Commented] (KAFKA-10688) Handle accidental truncation of repartition topics as exceptional failure

2020-12-01 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241992#comment-17241992
 ] 

Guozhang Wang commented on KAFKA-10688:
---

Had some more discussions with [~cadonna] about different scenarios, and I 
think we can potentially enlarge the scope of this ticket to include all the 
following cases:

1) When starting the application for the first time, the repartition topic is 
newly created. In this case we should set the starting offset on the 
repartition topics according to the global reset policy.

2) When restarting the application, where the repartition topic already exists 
and may have some data. In this case we would try to read the committed offset 
and start from there.
2.a) If the committed offset is already out of the range --- i.e. a 
truncation happened before restarting the application --- we should treat it as 
a fatal error.
2.b) If there is no committed offset, that indicates either that the 
application was not gracefully shut down before (since otherwise the committed 
offset should be found), or that the committed offset was somehow lost. We 
should treat it as a fatal error.

3) During normal processing, if the consumer suddenly finds itself out of the 
range --- i.e. a truncation happens at the same time --- we should treat it as 
a fatal error.

The challenge today is that we cannot easily distinguish case 1) from cases 2) 
and 3), since the consumer would throw the same invalid offset exception and 
Streams would handle it universally. Instead of relying on the consumer to 
improve (KAFKA-3370), we can do it at the Streams layer only, as follows:

* Whenever we create the repartition topic, we commit an offset of 0 regardless 
of the global offset reset policy, since with either earliest or latest it 
should just be 0.
* Whenever we get an invalid offset exception (note we still keep the 
consumer's configuration as `none`), we check whether it is from a repartition 
topic; if yes, we always treat it as a fatal error; if not, we use the reset 
policy of the corresponding source topic accordingly.
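
A minimal sketch of that per-topic decision, with invented names and without any of the actual Streams plumbing (the consumer's reset policy stays at `none`, and Streams decides what an out-of-range offset means for each topic):

{code:java}
import java.util.Set;

public class InvalidOffsetHandlingSketch {

    enum ResetPolicy { EARLIEST, LATEST }

    static void onInvalidOffset(String topic, Set<String> repartitionTopics, ResetPolicy userPolicy) {
        if (repartitionTopics.contains(topic)) {
            // Repartition topics are fully managed by Streams; an out-of-range offset
            // here means truncation and is treated as fatal.
            throw new IllegalStateException("Unexpected truncation of repartition topic " + topic);
        }
        // External source topics fall back to the user's configured reset policy.
        System.out.println("Resetting " + topic + " according to " + userPolicy);
    }

    public static void main(String[] args) {
        Set<String> repartitionTopics = Set.of("app-KSTREAM-REPARTITION-0000000003-repartition");
        onInvalidOffset("input-topic", repartitionTopics, ResetPolicy.EARLIEST);
        try {
            onInvalidOffset("app-KSTREAM-REPARTITION-0000000003-repartition", repartitionTopics, ResetPolicy.EARLIEST);
        } catch (IllegalStateException fatal) {
            System.out.println("Fatal: " + fatal.getMessage());
        }
    }
}
{code}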

> Handle accidental truncation of repartition topics as exceptional failure
> -
>
> Key: KAFKA-10688
> URL: https://issues.apache.org/jira/browse/KAFKA-10688
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>
> Today we always handle InvalidOffsetException from the main consumer by the 
> resetting policy assuming they are for source topics. But repartition topics 
> are also source topics and should never be truncated and hence cause 
> InvalidOffsetException.
> We should differentiate these repartition topics from external source topics 
> and treat the InvalidOffsetException from repartition topics as fatal and 
> close the whole application.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10793) Race condition in FindCoordinatorFuture permanently severs connection to group coordinator

2020-12-01 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241987#comment-17241987
 ] 

A. Sophie Blee-Goldman commented on KAFKA-10793:


So that's our best guess. We should discuss the solution on the PR, but I'll 
lay out the two possibilities I see here in case anyone has any better ideas.

1) synchronize joinGroupIfNeeded()
2) clear the _findCoordinatorFuture_ when handling the result, rather than in 
the listener callbacks. For example in the main loop of ensureCoordinatorReady()

Personally I think option 2 is better, since it makes a lot more sense to me to 
begin with. By clearing the future in the listener callbacks, we might clear it 
before we ever even get to check on the result, eg the exception if failed. We 
actually seem to already anticipate this particular problem and recently 
implemented a workaround by adding an extra listener which saves the exception 
to a class _findCoordinatorException_ field. If we just wait to clear the 
future then presumably we could remove this workaround as well, and just save 
the exception when we check "if (future.failed())" inside of 
lookupCoordinator().

All that said, I'm not intimately familiar with the technical details of the 
ConsumerNetworkClient and how it handles its RequestFutures, so it's possible 
I'm missing something important.
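
To make option 2 concrete, here is a hedged, self-contained sketch of the shape of the change. It is not the actual AbstractCoordinator code; CompletableFuture stands in for RequestFuture and all names are illustrative. The point is only that the future is cleared (and its exception inspected) in one place, where the result is handled, rather than from listener callbacks.

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class EnsureCoordinatorReadySketch {

    private CompletableFuture<Void> findCoordinatorFuture;

    private synchronized CompletableFuture<Void> lookupCoordinator() {
        if (findCoordinatorFuture == null) {
            findCoordinatorFuture = new CompletableFuture<>();
            // Simulate a lookup request that fails (e.g. due to a disconnect).
            findCoordinatorFuture.completeExceptionally(new RuntimeException("disconnected"));
        }
        return findCoordinatorFuture;
    }

    boolean ensureCoordinatorReady() {
        CompletableFuture<Void> future = lookupCoordinator();
        try {
            future.get();                          // wait for the in-flight lookup
            return true;
        } catch (InterruptedException | ExecutionException e) {
            System.out.println("find-coordinator failed: " + e.getCause());
            return false;
        } finally {
            clearFindCoordinatorFuture();          // cleared exactly once, after the result is checked
        }
    }

    private synchronized void clearFindCoordinatorFuture() {
        findCoordinatorFuture = null;
    }

    public static void main(String[] args) {
        EnsureCoordinatorReadySketch sketch = new EnsureCoordinatorReadySketch();
        System.out.println("ready = " + sketch.ensureCoordinatorReady());
        // Because the future was cleared, a later call can start a fresh lookup
        // instead of waiting forever on the old, failed one.
        System.out.println("ready on retry = " + sketch.ensureCoordinatorReady());
    }
}
{code}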

> Race condition in FindCoordinatorFuture permanently severs connection to 
> group coordinator
> --
>
> Key: KAFKA-10793
> URL: https://issues.apache.org/jira/browse/KAFKA-10793
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, streams
>Affects Versions: 2.5.0
>Reporter: A. Sophie Blee-Goldman
>Priority: Critical
>
> Pretty much as soon as we started actively monitoring the 
> _last-rebalance-seconds-ago_ metric in our Kafka Streams test environment, we 
> started seeing something weird. Every so often one of the StreamThreads (ie a 
> single Consumer instance) would appear to permanently fall out of the group, 
> as evidenced by a monotonically increasing _last-rebalance-seconds-ago._ We 
> inject artificial network failures every few hours at most, so the group 
> rebalances quite often. But the one consumer never rejoins, with no other 
> symptoms (besides a slight drop in throughput since the remaining threads had 
> to take over this member's work). We're confident that the problem exists in 
> the client layer, since the logs confirmed that the unhealthy consumer was 
> still calling poll. It was also calling Consumer#committed in its main poll 
> loop, which was consistently failing with a TimeoutException.
> When I attached a remote debugger to an instance experiencing this issue, the 
> network client's connection to the group coordinator (the one that uses 
> MAX_VALUE - node.id as the coordinator id) was in the DISCONNECTED state. But 
> for some reason it never tried to re-establish this connection, although it 
> did successfully connect to that same broker through the "normal" connection 
> (i.e. the one that just uses node.id).
> The tl;dr is that the AbstractCoordinator's FindCoordinatorRequest has failed 
> (presumably due to a disconnect), but the _findCoordinatorFuture_ is non-null 
> so a new request is never sent. This shouldn't be possible since the 
> FindCoordinatorResponseHandler is supposed to clear the 
> _findCoordinatorFuture_ when the future is completed. But somehow that didn't 
> happen, so the consumer continues to assume there's still a FindCoordinator 
> request in flight and never even notices that it's dropped out of the group.
> These are the only confirmed findings so far, however we have some guesses 
> which I'll leave in the comments. Note that we only noticed this due to the 
> newly added _last-rebalance-seconds-ago_ metric, and there's no reason to 
> believe this bug hasn't been flying under the radar since the Consumer's 
> inception



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10780) Rewrite ControllerZNode struct with auto-generated protocol

2020-12-01 Thread dengziming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dengziming resolved KAFKA-10780.

Resolution: Won't Do

KIP-500 will replace all this code.

>  Rewrite ControllerZNode struct with auto-generated protocol
> 
>
> Key: KAFKA-10780
> URL: https://issues.apache.org/jira/browse/KAFKA-10780
> Project: Kafka
>  Issue Type: Sub-task
>  Components: protocol
>Reporter: dengziming
>Assignee: dengziming
>Priority: Major
>
> Use the auto-generated protocol to rewrite the ZK controller node



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10793) Race condition in FindCoordinatorFuture permanently severs connection to group coordinator

2020-12-01 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241976#comment-17241976
 ] 

A. Sophie Blee-Goldman edited comment on KAFKA-10793 at 12/2/20, 1:37 AM:
--

At this point we can only guess, but all signs point to a race condition 
between the main consumer thread and the heartbeat thread. One possibility is 
that when the future failed it just didn't trigger the `onFailure` callback, 
but [~guozhang] & I have both looked through the source code and don't see any 
way for this to occur. Another possibility is that the `onFailure` callback was 
triggered, but it was invoked too soon. If the future was completed before we 
ever assigned it to the _findCoordinatorFuture_ field, then we would never 
actually clear the latest future (we would just set an already-null field to 
null again).

Is this possible? Here's how the AbstractCoordinator builds the request and 
assigns the future:
{code:java}
protected synchronized RequestFuture lookupCoordinator() {
...
findCoordinatorFuture = sendFindCoordinatorRequest(node);
}
{code}
{code:java}
private RequestFuture sendFindCoordinatorRequest(Node node) {
...
return client.send(node, requestBuilder).compose(new 
FindCoordinatorResponseHandler());
}{code}
Inside #compose we call #addListener, which contains this snippet:
{code:java}
if (failed()) 
fireFailure(); 
{code}
If the request has already failed by the time we reach this, then we'll trigger 
the `onFailure` callback before #compose ever returns – ie before we've 
assigned the future to _findCoordinatorFuture_.

The obvious question now is whether it's possible for the request to be failed 
in another thread while one thread is in the middle of the synchronized 
lookupCoordinator(). The request can be failed by the ConsumerNetworkClient 
when polled, during checkDisconnects(). The heartbeat thread actually 
synchronizes the entire run loop, so it doesn't seem possible for the hb thread 
to fail this request in the background of the main thread during a 
lookupCoordinator().

But the inverse is not true: it's possible for the main consumer thread to fail 
the request while the hb thread is inside of lookupCoordinator(). The 
AbstractCoordinator will poll the network client inside of joinGroupIfNeeded(), 
which is not itself synchronized and may be invoked without any locking through 
a Consumer#poll. 


was (Author: ableegoldman):
At this point we can only guess, but all signs point to a race condition 
between the main consumer thread and the heartbeat thread. One possibility is 
that when the future failed it just didn't trigger the `onFailure` callback, 
but [~guozhang] & I have both looked through the source code and don't see any 
way for this to occur. Another possibility is that the `onFailure` callback was 
triggered, but it was invoked too soon. If the future was completed before we 
ever assigned it to the findCoordinatorFuture field, then we would never 
actually clear the latest future (we would just set an already-null field to 
null again).

Is this possible? Here's how the AbstractCoordinator builds the request and 
assigns the future:
{code:java}
protected synchronized RequestFuture lookupCoordinator() {
...
findCoordinatorFuture = sendFindCoordinatorRequest(node);
}
{code}
{code:java}
private RequestFuture sendFindCoordinatorRequest(Node node) {
...
return client.send(node, requestBuilder).compose(new 
FindCoordinatorResponseHandler());
}{code}
Inside #compose we call #addListener, which contains this snippet:
{code:java}
if (failed()) 
fireFailure(); 
{code}
If the request has already failed by the time we reach this, then we'll trigger 
the `onFailure` callback before #compose ever returns – ie before we've 
assigned the future to _findCoordinatorFuture_.

The obvious question now is whether it's possible for the request to be failed 
in another thread while one thread is in the middle of the synchronized 
lookupCoordinator(). The request can be failed by the ConsumerNetworkClient 
when polled, during checkDisconnects(). The heartbeat thread actually 
synchronizes the entire run loop, so it doesn't seem possible for the hb thread 
to fail this request in the background of the main thread during a 
lookupCoordinator().

But the inverse is not true: it's possible for the main consumer thread to fail 
the request while the hb thread is inside of lookupCoordinator(). The 
AbstractCoordinator will poll the network client inside of joinGroupIfNeeded(), 
which in not itself synchronized and may be invoked without any locking through 
a Consumer#poll. 

> Race condition in FindCoordinatorFuture permanently severs connection to 
> group coordinator
> --
>
> Key: KAFKA-10793
> URL: 

[jira] [Comment Edited] (KAFKA-10793) Race condition in FindCoordinatorFuture permanently severs connection to group coordinator

2020-12-01 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241976#comment-17241976
 ] 

A. Sophie Blee-Goldman edited comment on KAFKA-10793 at 12/2/20, 1:37 AM:
--

At this point we can only guess, but all signs point to a race condition 
between the main consumer thread and the heartbeat thread. One possibility is 
that when the future failed it just didn't trigger the `onFailure` callback, 
but [~guozhang] & I have both looked through the source code and don't see any 
way for this to occur. Another possibility is that the `onFailure` callback was 
triggered, but it was invoked too soon. If the future was completed before we 
ever assigned it to the findCoordinatorFuture field, then we would never 
actually clear the latest future (we would just set an already-null field to 
null again).

Is this possible? Here's how the AbstractCoordinator builds the request and 
assigns the future:
{code:java}
protected synchronized RequestFuture lookupCoordinator() {
...
findCoordinatorFuture = sendFindCoordinatorRequest(node);
}
{code}
{code:java}
private RequestFuture sendFindCoordinatorRequest(Node node) {
...
return client.send(node, requestBuilder).compose(new 
FindCoordinatorResponseHandler());
}{code}
Inside #compose we call #addListener, which contains this snippet:
{code:java}
if (failed()) 
fireFailure(); 
{code}
If the request has already failed by the time we reach this, then we'll trigger 
the `onFailure` callback before #compose ever returns – ie before we've 
assigned the future to _findCoordinatorFuture_.

The obvious question now is whether it's possible for the request to be failed 
in another thread while one thread is in the middle of the synchronized 
lookupCoordinator(). The request can be failed by the ConsumerNetworkClient 
when polled, during checkDisconnects(). The heartbeat thread actually 
synchronizes the entire run loop, so it doesn't seem possible for the hb thread 
to fail this request in the background of the main thread during a 
lookupCoordinator().

But the inverse is not true: it's possible for the main consumer thread to fail 
the request while the hb thread is inside of lookupCoordinator(). The 
AbstractCoordinator will poll the network client inside of joinGroupIfNeeded(), 
which is not itself synchronized and may be invoked without any locking through 
a Consumer#poll. 


was (Author: ableegoldman):
At this point we can only guess, but all signs point to a race condition 
between the main consumer thread and the heartbeat thread. One possibility is 
that when the future failed it just didn't trigger the `onFailure` callback, 
but [~guozhang] & I have both looked through the source code and don't see any 
way for this to occur. Another possibility is that the `onFailure` callback was 
triggered, but it was invoked too soon_. If the future was completed before we 
ever assigned it to the findCoordinatorFuture field, then we would never 
actually clear the latest future (we would just set an already-null field to 
null again)._

Is this possible? Here's how the AbstractCoordinator builds the request and 
assigns the future:
{code:java}
protected synchronized RequestFuture lookupCoordinator() {
...
findCoordinatorFuture = sendFindCoordinatorRequest(node);
}
{code}
{code:java}
private RequestFuture sendFindCoordinatorRequest(Node node) {
...
return client.send(node, requestBuilder).compose(new 
FindCoordinatorResponseHandler());
}{code}
Inside #compose we call #addListener, which contains this snippet:
{code:java}
if (failed()) 
fireFailure(); 
{code}
If the request has already failed by the time we reach this, then we'll trigger 
the `onFailure` callback before #compose ever returns – ie before we've 
assigned the future to _findCoordinatorFuture_.

The obvious question now is whether it's possible for the request to be failed 
in another thread while one thread is in the middle of the synchronized 
lookupCoordinator(). The request can be failed by the ConsumerNetworkClient 
when polled, during checkDisconnects(). The heartbeat thread actually 
synchronizes the entire run loop, so it doesn't seem possible for the hb thread 
to fail this request in the background of the main thread during a 
lookupCoordinator().

But the inverse is not true: it's possible for the main consumer thread to fail 
the request while the hb thread is inside of lookupCoordinator(). The 
AbstractCoordinator will poll the network client inside of joinGroupIfNeeded(), 
which in not itself synchronized and may be invoked without any locking through 
a Consumer#poll. 

> Race condition in FindCoordinatorFuture permanently severs connection to 
> group coordinator
> --
>
> Key: KAFKA-10793
> URL: 

[jira] [Comment Edited] (KAFKA-10793) Race condition in FindCoordinatorFuture permanently severs connection to group coordinator

2020-12-01 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241976#comment-17241976
 ] 

A. Sophie Blee-Goldman edited comment on KAFKA-10793 at 12/2/20, 1:37 AM:
--

At this point we can only guess, but all signs point to a race condition 
between the main consumer thread and the heartbeat thread. One possibility is 
that when the future failed it just didn't trigger the `onFailure` callback, 
but [~guozhang] & I have both looked through the source code and don't see any 
way for this to occur. Another possibility is that the `onFailure` callback was 
triggered, but it was invoked too soon. If the future was completed before we 
ever assigned it to the findCoordinatorFuture field, then we would never 
actually clear the latest future (we would just set an already-null field to 
null again).

Is this possible? Here's how the AbstractCoordinator builds the request and 
assigns the future:
{code:java}
protected synchronized RequestFuture lookupCoordinator() {
...
findCoordinatorFuture = sendFindCoordinatorRequest(node);
}
{code}
{code:java}
private RequestFuture sendFindCoordinatorRequest(Node node) {
...
return client.send(node, requestBuilder).compose(new 
FindCoordinatorResponseHandler());
}{code}
Inside #compose we call #addListener, which contains this snippet:
{code:java}
if (failed()) 
fireFailure(); 
{code}
If the request has already failed by the time we reach this, then we'll trigger 
the `onFailure` callback before #compose ever returns – ie before we've 
assigned the future to _findCoordinatorFuture_.

The obvious question now is whether it's possible for the request to be failed 
in another thread while one thread is in the middle of the synchronized 
lookupCoordinator(). The request can be failed by the ConsumerNetworkClient 
when polled, during checkDisconnects(). The heartbeat thread actually 
synchronizes the entire run loop, so it doesn't seem possible for the hb thread 
to fail this request in the background of the main thread during a 
lookupCoordinator().

But the inverse is not true: it's possible for the main consumer thread to fail 
the request while the hb thread is inside of lookupCoordinator(). The 
AbstractCoordinator will poll the network client inside of joinGroupIfNeeded(), 
which is not itself synchronized and may be invoked without any locking through 
a Consumer#poll. 


was (Author: ableegoldman):
At this point we can only guess, but all signs point to a race condition 
between the main consumer thread and the heartbeat thread. One possibility is 
that when the future failed it just didn't trigger the `onFailure` callback, 
but [~guozhang] & I have both looked through the source code and don't see any 
way for this to occur. Another possibility is that the `onFailure` callback was 
triggered, but it was invoked too soon_. If the future was completed before we 
ever assigned it to the findCoordinatorFuture field, then we would never 
actually clear the latest future (we would just set an already-null field to 
null again)._

Is this possible? Here's how the AbstractCoordinator builds the request and 
assigns the future:
{code:java}
protected synchronized RequestFuture lookupCoordinator() {
...
findCoordinatorFuture = sendFindCoordinatorRequest(node);
}
{code}
 
{code:java}
private RequestFuture sendFindCoordinatorRequest(Node node) {
...
return client.send(node, requestBuilder).compose(new 
FindCoordinatorResponseHandler());
}{code}
Inside #compose we call #addListener, which contains this snippet:

 
{code:java}
if (failed()) 
fireFailure(); 
{code}
If the request has already failed by the time we reach this, then we'll trigger 
the `onFailure` callback before #compose ever returns – ie before we've 
assigned the future to _findCoordinatorFuture_.

The obvious question now is whether it's possible for the request to be failed 
in another thread while one thread is in the middle of the synchronized 
lookupCoordinator(). The request can be failed by the ConsumerNetworkClient 
when polled, during checkDisconnects(). The heartbeat thread actually 
synchronizes the entire run loop, so it doesn't seem possible for the hb thread 
to fail this request in the background of the main thread during a 
lookupCoordinator().

But the inverse is not true: it's possible for the main consumer thread to fail 
the request while the hb thread is inside of lookupCoordinator(). The 
AbstractCoordinator will poll the network client inside of joinGroupIfNeeded(), 
which in not itself synchronized and may be invoked without any locking through 
a Consumer#poll.

 

 

 

> Race condition in FindCoordinatorFuture permanently severs connection to 
> group coordinator
> --
>
> Key: KAFKA-10793
> URL: 

[jira] [Commented] (KAFKA-10793) Race condition in FindCoordinatorFuture permanently severs connection to group coordinator

2020-12-01 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241976#comment-17241976
 ] 

A. Sophie Blee-Goldman commented on KAFKA-10793:


At this point we can only guess, but all signs point to a race condition 
between the main consumer thread and the heartbeat thread. One possibility is 
that when the future failed it just didn't trigger the `onFailure` callback, 
but [~guozhang] & I have both looked through the source code and don't see any 
way for this to occur. Another possibility is that the `onFailure` callback was 
triggered, but it was invoked too soon. If the future was completed before we 
ever assigned it to the findCoordinatorFuture field, then we would never 
actually clear the latest future (we would just set an already-null field to 
null again).

Is this possible? Here's how the AbstractCoordinator builds the request and 
assigns the future:
{code:java}
protected synchronized RequestFuture lookupCoordinator() {
...
findCoordinatorFuture = sendFindCoordinatorRequest(node);
}
{code}
 
{code:java}
private RequestFuture sendFindCoordinatorRequest(Node node) {
...
return client.send(node, requestBuilder).compose(new 
FindCoordinatorResponseHandler());
}{code}
Inside #compose we call #addListener, which contains this snippet:

 
{code:java}
if (failed()) 
fireFailure(); 
{code}
If the request has already failed by the time we reach this, then we'll trigger 
the `onFailure` callback before #compose ever returns – ie before we've 
assigned the future to _findCoordinatorFuture_.

The obvious question now is whether it's possible for the request to be failed 
in another thread while one thread is in the middle of the synchronized 
lookupCoordinator(). The request can be failed by the ConsumerNetworkClient 
when polled, during checkDisconnects(). The heartbeat thread actually 
synchronizes the entire run loop, so it doesn't seem possible for the hb thread 
to fail this request in the background of the main thread during a 
lookupCoordinator().

But the inverse is not true: it's possible for the main consumer thread to fail 
the request while the hb thread is inside of lookupCoordinator(). The 
AbstractCoordinator will poll the network client inside of joinGroupIfNeeded(), 
which is not itself synchronized and may be invoked without any locking through 
a Consumer#poll.
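
As a generic illustration of that ordering hazard (using a plain CompletableFuture 
rather than Kafka's RequestFuture/listener machinery, so this is only an analogy 
and not the actual client code):
{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class ClearBeforeAssignExample {
    // stand-in for findCoordinatorFuture
    private final AtomicReference<CompletableFuture<Void>> inflight = new AtomicReference<>();

    private CompletableFuture<Void> send() {
        final CompletableFuture<Void> future = new CompletableFuture<>();
        // the request fails before the caller ever gets the future back
        future.completeExceptionally(new RuntimeException("disconnect"));
        // the "clear the field" callback fires synchronously right here,
        // i.e. before lookup() below has assigned the future to the field
        future.whenComplete((result, error) -> inflight.set(null));
        return future;
    }

    public void lookup() {
        // the assignment happens after the callback already ran, so a stale,
        // already-failed future is left in the field and never cleared again
        inflight.set(send());
    }
}
{code}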

 

 

 

> Race condition in FindCoordinatorFuture permanently severs connection to 
> group coordinator
> --
>
> Key: KAFKA-10793
> URL: https://issues.apache.org/jira/browse/KAFKA-10793
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, streams
>Affects Versions: 2.5.0
>Reporter: A. Sophie Blee-Goldman
>Priority: Critical
>
> Pretty much as soon as we started actively monitoring the 
> _last-rebalance-seconds-ago_ metric in our Kafka Streams test environment, we 
> started seeing something weird. Every so often one of the StreamThreads (ie a 
> single Consumer instance) would appear to permanently fall out of the group, 
> as evidenced by a monotonically increasing _last-rebalance-seconds-ago._ We 
> inject artificial network failures every few hours at most, so the group 
> rebalances quite often. But the one consumer never rejoins, with no other 
> symptoms (besides a slight drop in throughput since the remaining threads had 
> to take over this member's work). We're confident that the problem exists in 
> the client layer, since the logs confirmed that the unhealthy consumer was 
> still calling poll. It was also calling Consumer#committed in its main poll 
> loop, which was consistently failing with a TimeoutException.
> When I attached a remote debugger to an instance experiencing this issue, the 
> network client's connection to the group coordinator (the one that uses 
> MAX_VALUE - node.id as the coordinator id) was in the DISCONNECTED state. But 
> for some reason it never tried to re-establish this connection, although it 
> did successfully connect to that same broker through the "normal" connection 
> (ie the one that juts uses node.id).
> The tl;dr is that the AbstractCoordinator's FindCoordinatorRequest has failed 
> (presumably due to a disconnect), but the _findCoordinatorFuture_ is non-null 
> so a new request is never sent. This shouldn't be possible since the 
> FindCoordinatorResponseHandler is supposed to clear the 
> _findCoordinatorFuture_ when the future is completed. But somehow that didn't 
> happen, so the consumer continues to assume there's still a FindCoordinator 
> request in flight and never even notices that it's dropped out of the group.
> These are the only confirmed findings so far, however we have some guesses 
> which I'll leave in the comments. Note that we only noticed 

[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-12-01 Thread GitBox


jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r533835857



##
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
##
@@ -149,6 +152,29 @@ default OptionalLong truncateToEndOffset(OffsetAndEpoch 
endOffset) {
 return OptionalLong.of(truncationOffset);
 }
 
+/**
+ * Create a writable snapshot for the given snapshot id.
+ *
+ * See {@link RawSnapshotWriter} for details on how to use this object.
+ *
+ * @param snapshotId the end offset and epoch that identifies the snapshot
+ * @return a writable snapshot
+ */
+RawSnapshotWriter createSnapshot(OffsetAndEpoch snapshotId) throws 
IOException;

Review comment:
   Yeah. I think we will need that when we implement snapshot deletion. Do 
you mind if I add this later?
   
   Also, I think we are going to need a `readLatestSnapshot()` when the state 
machine (controller or broker) needs to load the latest valid snapshot. I was 
planning to add this later once the use case becomes clearer to me.
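   
   For what it's worth, a purely hypothetical usage sketch of the write path (the append()/freeze() names on RawSnapshotWriter and the stateMachineBatches() helper are assumptions, not code from this PR):
   ```java
   // The snapshot id's end offset is exclusive, i.e. one past the last applied record.
   OffsetAndEpoch snapshotId = new OffsetAndEpoch(lastAppliedOffset + 1, lastAppliedEpoch);
   try (RawSnapshotWriter snapshot = log.createSnapshot(snapshotId)) {
       for (MemoryRecords batch : stateMachineBatches()) { // placeholder for the state machine's data
           snapshot.append(batch);                         // assumed append signature
       }
       snapshot.freeze();                                  // assumed: seals the snapshot for readers
   }
   ```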





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-12-01 Thread GitBox


jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r533833428



##
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.text.NumberFormat;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+final class Snapshots {
+private static final String SNAPSHOT_DIR = "snapshots";

Review comment:
   I think this depends on whether we need to scan the snapshot directory. 
Unfortunately, I don't have a concrete answer at the moment. When we implement 
the changes to the rest of the raft client (log truncation, updating the start 
offset and LEO), we may need to scan the snapshot/checkpoint folder to determine 
the greatest log start offset and LEO. @lbradstreet suggested storing them in a 
different directory as part of the KIP-630 review process, as Kafka already has 
a few files in the partition log directory.
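   
   Just to make the trade-off concrete, one possible layout if a dedicated subdirectory is used (the naming scheme and the field access on OffsetAndEpoch are assumptions mirroring the zero-padded log segment convention, not what this PR implements):
   ```java
   static Path snapshotPath(Path partitionLogDir, OffsetAndEpoch snapshotId) {
       NumberFormat fmt = NumberFormat.getInstance();
       fmt.setMinimumIntegerDigits(20);   // zero-pad the end offset like segment file names
       fmt.setGroupingUsed(false);
       String name = fmt.format(snapshotId.offset) + "-" + snapshotId.epoch + ".checkpoint";
       return partitionLogDir.resolve("snapshots").resolve(name);
   }
   ```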





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-12-01 Thread GitBox


jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r533829583



##
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.text.NumberFormat;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+final class Snapshots {
+private static final String SNAPSHOT_DIR = "snapshots";
+private static final String SUFFIX =  ".snapshot";

Review comment:
   Good catch. I mentioned using `.checkpoint` in KIP-630. I forgot to 
change it here. I'll change it to that but let me know if you have a preference.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10793) Race condition in FindCoordinatorFuture permanently severs connection to group coordinator

2020-12-01 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-10793:
--

 Summary: Race condition in FindCoordinatorFuture permanently 
severs connection to group coordinator
 Key: KAFKA-10793
 URL: https://issues.apache.org/jira/browse/KAFKA-10793
 Project: Kafka
  Issue Type: Bug
  Components: consumer, streams
Affects Versions: 2.5.0
Reporter: A. Sophie Blee-Goldman


Pretty much as soon as we started actively monitoring the 
_last-rebalance-seconds-ago_ metric in our Kafka Streams test environment, we 
started seeing something weird. Every so often one of the StreamThreads (ie a 
single Consumer instance) would appear to permanently fall out of the group, as 
evidenced by a monotonically increasing _last-rebalance-seconds-ago._ We inject 
artificial network failures every few hours at most, so the group rebalances 
quite often. But the one consumer never rejoins, with no other symptoms 
(besides a slight drop in throughput since the remaining threads had to take 
over this member's work). We're confident that the problem exists in the client 
layer, since the logs confirmed that the unhealthy consumer was still calling 
poll. It was also calling Consumer#committed in its main poll loop, which was 
consistently failing with a TimeoutException.

When I attached a remote debugger to an instance experiencing this issue, the 
network client's connection to the group coordinator (the one that uses 
MAX_VALUE - node.id as the coordinator id) was in the DISCONNECTED state. But 
for some reason it never tried to re-establish this connection, although it did 
successfully connect to that same broker through the "normal" connection (ie 
the one that just uses node.id).

The tl;dr is that the AbstractCoordinator's FindCoordinatorRequest has failed 
(presumably due to a disconnect), but the _findCoordinatorFuture_ is non-null 
so a new request is never sent. This shouldn't be possible since the 
FindCoordinatorResponseHandler is supposed to clear the _findCoordinatorFuture_ 
when the future is completed. But somehow that didn't happen, so the consumer 
continues to assume there's still a FindCoordinator request in flight and never 
even notices that it's dropped out of the group.

These are the only confirmed findings so far, however we have some guesses 
which I'll leave in the comments. Note that we only noticed this due to the 
newly added _last-rebalance-seconds-ago_ metric, and there's no reason to 
believe this bug hasn't been flying under the radar since the Consumer's 
inception



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-12-01 Thread GitBox


jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r533828329



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##
@@ -100,4 +102,15 @@ default void handleResign() {}
  */
 CompletableFuture shutdown(int timeoutMs);
 
+/**
+ * Create a writable snapshot file for a given offset and epoch.
+ *
+ * The RaftClient assumes that the snapshot return will contain the 
records up to but
+ * not including the end offset in the snapshot id. See {@link 
SnapshotWriter} for
+ * details on how to use this object.
+ *
+ * @param snapshotId the end offset and epoch that identifies the snapshot

Review comment:
   Is there a specific reason why you are asking this? We don't currently 
check for this. I will add a check for this and we can relax this later if we 
need to.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #9666: MINOR: Remove broken `.travis.yml` with system test hooks

2020-12-01 Thread GitBox


hachikuji commented on pull request #9666:
URL: https://github.com/apache/kafka/pull/9666#issuecomment-736910631


   @chia7712 Nice that you got it working! I think there is value running a 
subset of system tests automatically. It might even encourage us to write more 
system tests if we can get the feedback more easily. Not having access to logs 
is annoying, but might not be a dealbreaker. It looks like we need a bit of 
initial work to tune the build though. I'm happy to close this and merge #9652. 
Then perhaps we can file a few jiras for any follow-up actions. What do you 
think?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9564: KAFKA-10667: add timeout for forwarding requests

2020-12-01 Thread GitBox


abbccdda commented on a change in pull request #9564:
URL: https://github.com/apache/kafka/pull/9564#discussion_r533814137



##
File path: core/src/main/scala/kafka/server/AlterIsrManager.scala
##
@@ -88,20 +88,33 @@ class AlterIsrManagerImpl(val controllerChannelManager: 
BrokerToControllerChanne
 
   private def sendRequest(inflightAlterIsrItems: Seq[AlterIsrItem]): Unit = {
 val message = buildRequest(inflightAlterIsrItems)
-def responseHandler(response: ClientResponse): Unit = {
-  try {
-val body = response.responseBody().asInstanceOf[AlterIsrResponse]
-handleAlterIsrResponse(body, message.brokerEpoch(), 
inflightAlterIsrItems)
-  } finally {
-// Be sure to clear the in-flight flag to allow future AlterIsr 
requests
-if (!inflightRequest.compareAndSet(true, false)) {
-  throw new IllegalStateException("AlterIsr response callback called 
when no requests were in flight")
+
+def clearInflightRequests(): Unit = {
+  // Be sure to clear the in-flight flag to allow future AlterIsr requests
+  if (!inflightRequest.compareAndSet(true, false)) {
+throw new IllegalStateException("AlterIsr response callback called 
when no requests were in flight")
+  }
+}
+
+class AlterIsrResponseHandler extends 
BrokerToControllerRequestCompletionHandler {
+  override def onComplete(response: ClientResponse): Unit = {
+try {
+  val body = response.responseBody().asInstanceOf[AlterIsrResponse]
+  handleAlterIsrResponse(body, message.brokerEpoch(), 
inflightAlterIsrItems)
+} finally {
+  clearInflightRequests()
 }
   }
+
+  override def onTimeout(): Unit = {
+warn(s"Encountered request when sending AlterIsr to the controller")

Review comment:
   That's what I decide to do eventually.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9666: MINOR: Remove broken `.travis.yml` with system test hooks

2020-12-01 Thread GitBox


chia7712 commented on pull request #9666:
URL: https://github.com/apache/kafka/pull/9666#issuecomment-736902424


   > guess you were trying to get this working instead? 
   
   #9652
   
   Removing the Travis test is ok with me.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on pull request #9669: KAFKA-10792: Prevent source task shutdown from blocking herder thread

2020-12-01 Thread GitBox


C0urante commented on pull request #9669:
URL: https://github.com/apache/kafka/pull/9669#issuecomment-736896160


   Thanks, Nigel.
   
   @chia7712 @rhauch @kkonstantine would you like to take a look?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-10729) KIP-482: Bump remaining RPC's to use tagged fields

2020-12-01 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-10729.
-
Fix Version/s: 2.8.0
   Resolution: Fixed

> KIP-482: Bump remaining RPC's to use tagged fields
> --
>
> Key: KAFKA-10729
> URL: https://issues.apache.org/jira/browse/KAFKA-10729
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Gardner Vickers
>Assignee: Gardner Vickers
>Priority: Major
> Fix For: 2.8.0
>
>
> With 
> [KIP-482|https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields],
>  the Kafka protocol gained support for tagged fields.
> Not all RPC's were bumped to use flexible versioning and tagged fields. We 
> should bump the remaining RPC's and provide a new IBP to take advantage of 
> tagged fields via the flexible versioning mechanism.
>  
> The RPC's which need to be bumped are:
>  
> {code:java}
> AddOffsetsToTxnRequest
> AddOffsetsToTxnResponse
> AddPartitionsToTxnRequest
> AddPartitionsToTxnResponse
> AlterClientQuotasRequest
> AlterClientQuotasResponse
> AlterConfigsRequest
> AlterConfigsResponse
> AlterReplicaLogDirsRequest
> AlterReplicaLogDirsResponse
> DescribeClientQuotasRequest
> DescribeClientQuotasResponse
> DescribeConfigsRequest
> DescribeConfigsResponse
> EndTxnRequest
> EndTxnResponse
> ListOffsetRequest
> ListOffsetResponse
> OffsetForLeaderEpochRequest
> OffsetForLeaderEpochResponse
> ProduceRequest
> ProduceResponse
> WriteTxnMarkersRequest
> WriteTxnMarkersResponse 
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji merged pull request #9601: KAFKA-10729: Bump remaining RPC's to use tagged fields.

2020-12-01 Thread GitBox


hachikuji merged pull request #9601:
URL: https://github.com/apache/kafka/pull/9601


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ncliang commented on a change in pull request #9669: KAFKA-10792: Prevent source task shutdown from blocking herder thread

2020-12-01 Thread GitBox


ncliang commented on a change in pull request #9669:
URL: https://github.com/apache/kafka/pull/9669#discussion_r533788764



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##
@@ -206,16 +203,13 @@ public void cancel() {
 public void stop() {
 super.stop();
 stopRequestedLatch.countDown();
-synchronized (this) {
-if (finishedStart)
-tryStop();
-else
-startedShutdownBeforeStartCompleted = true;
-}
 }
 
-private synchronized void tryStop() {
-if (!stopped) {
+// Note: This method is not thread-safe
+private void tryStop() {
+// If the task is scheduled for shutdown before we invoke initialize 
or start on it (which
+// can happy reliably if it's started in the PAUSED state), we don't 
have to invoke stop on it

Review comment:
   ```suggestion
   // can happen reliably if it's started in the PAUSED state), we 
don't have to invoke stop on it
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9300: KAFKA-10491: Check authorizations first in KafkaApis

2020-12-01 Thread GitBox


hachikuji commented on a change in pull request #9300:
URL: https://github.com/apache/kafka/pull/9300#discussion_r533763099



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1770,45 +1770,47 @@ class KafkaApis(val requestChannel: RequestChannel,
   sendResponseMaybeThrottle(controllerMutationQuota, request, 
createResponse, onComplete = None)
 }
 
+// be sure to check authorization first, before checking if this is the 
controller, to avoid leaking
+// information about the system (i.e. who is the controller) to principals 
unauthorized for that information
+
 val createTopicsRequest = request.body[CreateTopicsRequest]
 val results = new 
CreatableTopicResultCollection(createTopicsRequest.data.topics.size)
-if (!controller.isActive) {
-  createTopicsRequest.data.topics.forEach { topic =>
-results.add(new CreatableTopicResult().setName(topic.name)
-  .setErrorCode(Errors.NOT_CONTROLLER.code))
-  }
-  sendResponseCallback(results)
-} else {
-  createTopicsRequest.data.topics.forEach { topic =>
-results.add(new CreatableTopicResult().setName(topic.name))
+createTopicsRequest.data.topics.forEach { topic =>
+  results.add(new CreatableTopicResult().setName(topic.name))
+}
+val hasClusterAuthorization = authorize(request.context, CREATE, CLUSTER, 
CLUSTER_NAME,
+  logIfDenied = false)
+val topics = createTopicsRequest.data.topics.asScala.map(_.name)
+val authorizedTopics =
+  if (hasClusterAuthorization) topics.toSet
+  else filterByAuthorized(request.context, CREATE, TOPIC, topics)(identity)
+val authorizedForDescribeConfigs = filterByAuthorized(request.context, 
DESCRIBE_CONFIGS, TOPIC,
+  topics, logIfDenied = false)(identity).map(name => name -> 
results.find(name)).toMap
+
+results.forEach { topic =>
+  if (results.findAll(topic.name).size > 1) {
+topic.setErrorCode(Errors.INVALID_REQUEST.code)
+topic.setErrorMessage("Found multiple entries for this topic.")
+  } else if (!authorizedTopics.contains(topic.name)) {
+topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+topic.setErrorMessage("Authorization failed.")
+  }
+  if (!authorizedForDescribeConfigs.contains(topic.name)) {
+topic.setTopicConfigErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
   }
-  val hasClusterAuthorization = authorize(request.context, CREATE, 
CLUSTER, CLUSTER_NAME,
-logIfDenied = false)
-  val topics = createTopicsRequest.data.topics.asScala.map(_.name)
-  val authorizedTopics =
-if (hasClusterAuthorization) topics.toSet
-else filterByAuthorized(request.context, CREATE, TOPIC, 
topics)(identity)
-  val authorizedForDescribeConfigs = filterByAuthorized(request.context, 
DESCRIBE_CONFIGS, TOPIC,
-topics, logIfDenied = false)(identity).map(name => name -> 
results.find(name)).toMap
-
+}
+if (!controller.isActive) {
+  // Don't provide the information that this node is not the controller 
unless they were authorized
+  // to perform at least one of their requests.  So only set 
NOT_CONTROLLER error for anything that so far has a
+  // success/NONE error code.  Keep the existing error codes that we've 
determined rather than overwriting them
+  // with NOT_CONTROLLER because that is potentially useful information 
for the client.
   results.forEach { topic =>
-if (results.findAll(topic.name).size > 1) {
-  topic.setErrorCode(Errors.INVALID_REQUEST.code)
-  topic.setErrorMessage("Found multiple entries for this topic.")
-} else if (!authorizedTopics.contains(topic.name)) {
-  topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
-  topic.setErrorMessage("Authorization failed.")
-}
-if (!authorizedForDescribeConfigs.contains(topic.name)) {
-  topic.setTopicConfigErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
-}
-  }
-  val toCreate = mutable.Map[String, CreatableTopic]()
-  createTopicsRequest.data.topics.forEach { topic =>
-if (results.find(topic.name).errorCode == Errors.NONE.code) {
-  toCreate += topic.name -> topic
+if(topic.errorCode() == Errors.NONE.code()) {

Review comment:
   nit: convention is to add space after `if`. There are a few of these in 
the patch

##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1770,45 +1770,47 @@ class KafkaApis(val requestChannel: RequestChannel,
   sendResponseMaybeThrottle(controllerMutationQuota, request, 
createResponse, onComplete = None)
 }
 
+// be sure to check authorization first, before checking if this is the 
controller, to avoid leaking
+// information about the system (i.e. who is the controller) to principals 
unauthorized for that information
+
 val createTopicsRequest = 

[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-01 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r533765909



##
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
##
@@ -69,33 +70,39 @@
 @BenchmarkMode(Mode.AverageTime)
 @OutputTimeUnit(TimeUnit.MILLISECONDS)
 public class AclAuthorizerBenchmark {
-@Param({"1", "5", "20"})
+@Param({"1", "4", "8"})

Review comment:
   The underlying algorithm of the authorizeByResourceType() implementation in 
AclAuthorizer has several characteristics:
   1. If any "allow resource" of the given ACE does not have a dominant "deny 
resource", the API will return immediately.
   2. The complexity is O(n*m), where `n` is the number of "allow resources" of 
the given ACE and `m` is the number of "deny resources" of the given ACE; it is 
not related to the number of ACEs in the cluster.
   
   Point 1 means that, given an ACE, suppose `p%` of its "allow resources" do 
not have a dominant "deny resource"; if `resourceCount` is `r`, then on average 
the API will return after checking `r * p * 0.01` "allow resources".
   a) if we let the dominant "deny resources" be distributed evenly, e.g. use 
(loop index % something) to determine which "allow resource" should have a 
dominant "deny resource", we end up iterating over the same number of "allow 
resources" and returning from the API call at the same point every time, which 
is `r*p*0.01`
   b) if we randomly determine which "allow resource" should have a dominant 
"deny resource", the result will be too noisy; we may iterate over only 1 
resource or over all resources depending on the randomization algorithm and seed.
   
   Point 2 means that the API time cost is not related to the number of ACEs 
but grows quadratically as `resourceCount` increases. Under the assumption in 
(1), the actual cost would be (r * r * p * 0.01).
   
   
   So what I was doing is directly testing the worst case, where p = 1, which 
means 100% of the "allow resources" will have a dominant "deny resource" (see 
the sketch below). The complexity is then (r^2). It's rare that a cluster has 
200k "allow resources" and 200k corresponding "dominant deny resources" for 
each user, and it's not fair to have a relatively smaller `aclCount` and a huge 
`resourceCount`, since the API optimizes performance by indexing on the ACE.
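   
   As a sketch of that worst-case setup (hypothetical setup code, not the benchmark's actual implementation): every literal ALLOW topic rule gets a prefixed DENY rule that dominates it, so authorizeByResourceType() can never return early, i.e. p = 1:
   ```java
   List<AclBinding> acls = new ArrayList<>();
   AccessControlEntry allow = new AccessControlEntry("User:test", "*", AclOperation.WRITE, AclPermissionType.ALLOW);
   AccessControlEntry deny  = new AccessControlEntry("User:test", "*", AclOperation.WRITE, AclPermissionType.DENY);
   for (int i = 0; i < resourceCount; i++) {
       String topic = "topic-" + i;
       // the literal ALLOW rule ...
       acls.add(new AclBinding(new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL), allow));
       // ... and a prefixed DENY rule with the same name, which dominates it
       acls.add(new AclBinding(new ResourcePattern(ResourceType.TOPIC, topic, PatternType.PREFIXED), deny));
   }
   ```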
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-01 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r533765909



##
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
##
@@ -69,33 +70,39 @@
 @BenchmarkMode(Mode.AverageTime)
 @OutputTimeUnit(TimeUnit.MILLISECONDS)
 public class AclAuthorizerBenchmark {
-@Param({"1", "5", "20"})
+@Param({"1", "4", "8"})

Review comment:
   The underlying algorithm of AuthorizeByResourceType() implementation in 
AclAuthorizer has several characteristics:
   1. If any "allow resource" of the given ACE does not have a dominant "deny 
resource", the API will return immediately
   2. The complexity is O(n*m) where `n` is the number of "allow resources" of 
the given ACE, 'm' is the number of "deny resources" of the given ACE, but not 
related to the number of "ACE".
   
   $1 means that, given an ACE,  suppose `p%` of its "allow resource" does not 
have a dominant "deny resource", if `resourceCount` is `r`, on average, after 
checking `r * p * 0.01` "allow resources", the API will return. 
   a) if we are let the "dominant deny resource" distribute evenly, like use 
the (loop index % something) to determine which "allow resource" should have a 
dominant "deny resource", we end up iterating the same amount of the "allow 
resource" and returning from the API call every time, which is `r*p*0.01`
   b) if we are determine which "allow resource" should have a dominant "deny 
resource", the result will be too noisy. We may iterate only 1 resource or 
iterate all resources based on the randomize algorithm and seed.
   
   $2 means that, the API time cost is not related to the number of "ACE" but 
is hyperbolically increasing when `resourceCount` is increasing. Under the 
assumption in (1), the actual complexity would be (r * r * p * 0.01)
   
   
   So what I was doing is to directly test the worst case, where p = 1, which 
means 100% of the "allow resource" will have a dominant "deny resource. The 
complexity hence would be (r^2). It's rare that a cluster can have 200k "allow 
resources" and 200k corresponding "dominant deny resources" for each user, and 
it's not fair to have a relatively smaller `aclCount` and huger 
`resourceCount`, as the API is optimizing the performance by indexing on `ACE`.
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-01 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r533765909



##
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
##
@@ -69,33 +70,39 @@
 @BenchmarkMode(Mode.AverageTime)
 @OutputTimeUnit(TimeUnit.MILLISECONDS)
 public class AclAuthorizerBenchmark {
-@Param({"1", "5", "20"})
+@Param({"1", "4", "8"})

Review comment:
   The underlying algorithm of AuthorizeByResourceType() implementation in 
AclAuthorizer has several characteristics:
   1. If any "allow resource" of the given ACE does not have a dominant "deny 
resource", the API will return immediately
   2. The complexity is O(n*m) where `n` is the number of "allow resources" of 
the given ACE, 'm' is the number of "deny resources" of the given ACE, but not 
related to the number of "ACE".
   
   $1 means that, given an ACE,  suppose `p%` of its "allow resource" does not 
have a dominant "deny resource", if `resourceCount` is `r`, on average, after 
checking `r * p * 0.01` "allow resources", the API will return. 
   a) if we are let the "dominant deny resource" distribute evenly, like use 
the (loop index % something) to determine which "allow resource" should have a 
dominant "deny resource", we end up iterating the same amount of the "allow 
resource" and returning from the API call every time, which is `r*p*0.01`
   b) if we are determine which "allow resource" should have a dominant "deny 
resource", the result will be too noisy. We may iterate only 1 resource or 
iterate all resources based on the randomize algorithm and seed.
   
   $2 means that, the API time cost is not related to the number of "ACE" but 
is hyperbolically increasing when `resourceCount` is increasing. Under the 
assumption in (1), the actual complexity would be (r * r * p * 0.01)
   
   
   So what I was doing is to directly test the worst case, where p = 1, which 
means 100% of the "allow resource" will have a dominant "deny resource. The 
complexity hence would be (r^2). It's rare that a cluster can have 200k "allow 
resources" and 200k corresponding "dominant deny resources" for each user and 
it's not fair to have a relatively smaller `aclCount` and huger 
`resourceCount`, as the API is indexing on `ACE`.
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-01 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r533765909



##
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AclAuthorizerBenchmark.java
##
@@ -69,33 +70,39 @@
 @BenchmarkMode(Mode.AverageTime)
 @OutputTimeUnit(TimeUnit.MILLISECONDS)
 public class AclAuthorizerBenchmark {
-@Param({"1", "5", "20"})
+@Param({"1", "4", "8"})

Review comment:
   The underlying algorithm of AuthorizeByResourceType() implementation in 
AclAuthorizer has several characteristics:
   1. If any "allow resource" of the given ACE does not have a dominant "deny 
resource", the API will return immediately
   2. The complexity is O(n*m) where `n` is the number of "allow resources" of 
the given ACE, 'm' is the number of "deny resources" of the given ACE, but not 
related to the number of "ACE".
   
   $1 means that, given an ACE,  suppose `p%` of its "allow resource" does not 
have a dominant "deny resource", if `resourceCount` is `r`, on average, after 
checking `r * p * 0.01` "allow resources", the API will return. 
   a) if we are let the "dominant deny resource" distribute evenly, like use 
the (loop index % something) to determine which "allow resource" should have a 
dominant "deny resource", we end up iterating the same amount of the "allow 
resource" and returning from the API call every time, which is `r*p*0.01`
   b) if we are determine which "allow resource" should have a dominant "deny 
resource", the result will be too noisy. We may iterate only 1 resource or 
iterate all resources based on the randomize algorithm and seed.
   
   $2 means that, the API time cost is not related to the number of "ACE" but 
is hyperbolically increasing when `resourceCount` is increasing. Under the 
assumption in (1), the actual complexity would be (r * r * p * 0.01)
   
   
   So what I was doing is to directly test the worst case, where p = 1, which 
means 100% of the "allow resource" will have a dominant "deny resource. The 
complexity hence would be (r^2). It's rare that a cluster can have 200k "allow 
resources" rules for each user and it's not fair to have a relatively smaller 
`aclCount` and huger `resourceCount`, as the API is indexing on `ACE`.
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-12-01 Thread GitBox


hachikuji commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r533732789



##
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.text.NumberFormat;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+final class Snapshots {
+private static final String SNAPSHOT_DIR = "snapshots";
+private static final String SUFFIX =  ".snapshot";

Review comment:
   This suffix is used for producer state snapshots already. Maybe we could 
use `.snap` or something like that.

##
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
##
@@ -149,6 +152,29 @@ default OptionalLong truncateToEndOffset(OffsetAndEpoch 
endOffset) {
 return OptionalLong.of(truncationOffset);
 }
 
+/**
+ * Create a writable snapshot for the given snapshot id.
+ *
+ * See {@link RawSnapshotWriter} for details on how to use this object.
+ *
+ * @param snapshotId the end offset and epoch that identifies the snapshot
+ * @return a writable snapshot
+ */
+RawSnapshotWriter createSnapshot(OffsetAndEpoch snapshotId) throws 
IOException;

Review comment:
   Do we also need an api to list snapshots?

##
File path: 
raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
##
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+public final class FileRawSnapshotWriter implements RawSnapshotWriter {
+private final Path path;
+private final FileChannel channel;
+private final OffsetAndEpoch snapshotId;
+private boolean frozen = false;
+
+private FileRawSnapshotWriter(
+Path path,
+FileChannel channel,
+OffsetAndEpoch snapshotId
+) {
+this.path = path;
+this.channel = channel;
+this.snapshotId = snapshotId;
+}
+
+@Override
+public OffsetAndEpoch snapshotId() {
+return snapshotId;
+}
+
+@Override
+public long sizeInBytes() throws IOException {
+return channel.size();
+}
+
+@Override
+public void append(ByteBuffer buffer) throws IOException {
+if (frozen) {
+throw new IllegalStateException(
+String.format("Append not supported. Snapshot is already 
frozen: id = %s; path = %s", snapshotId, path)
+);
+}
+
+Utils.writeFully(channel, buffer);
+}
+
+@Override
+public boolean isFrozen() {
+return frozen;
+}
+
+@Override
+public void freeze() throws IOException {
+channel.close();
+frozen = true;
+
+// Set readonly and ignore the result
+if (!path.toFile().setReadOnly()) {
+throw new IOException(String.format("Unable to set file %s as 
read-only", path));
+}
+
+Path destination = 

[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-01 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r533737220



##
File path: 
core/src/test/scala/unit/kafka/security/authorizer/MockAuthorizer.scala
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.security.authorizer
+
+import java.{lang, util}
+import java.util.concurrent.CompletionStage
+
+import org.apache.kafka.common.Endpoint
+import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter}
+import org.apache.kafka.server.authorizer.{AclCreateResult, AclDeleteResult, 
Action, AuthorizableRequestContext, AuthorizationResult, Authorizer, 
AuthorizerServerInfo}
+
+object MockAuthorizer {
+val authorizer = new AclAuthorizer
+}
+
+/**
+ * A mock authorizer for testing the interface default
+ */
+class MockAuthorizer extends Authorizer {

Review comment:
   Right. DelegatingAuthorizer is a more reasonable name for the design 
pattern used here.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-01 Thread GitBox


rajinisivaram commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r533510477



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -304,6 +308,122 @@ class AclAuthorizer extends Authorizer with Logging {
 if (zkClient != null) zkClient.close()
   }
 
+  // TODO: 1. Discuss how to log audit message
+  // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop 
but I think
+  //  in most of the cases it would be O(1) because denyDominatePrefixAllow 
should be rare
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+   op: AclOperation,
+   resourceType: ResourceType): 
AuthorizationResult = {
+if (resourceType eq ResourceType.ANY)
+  throw new IllegalArgumentException("Must specify a non-filter resource 
type for authorizeByResourceType")
+
+if (resourceType eq ResourceType.UNKNOWN)
+  throw new IllegalArgumentException("Unknown resource type")
+
+if (op eq AclOperation.ANY)
+  throw new IllegalArgumentException("Must specify a non-filter operation 
type for authorizeByResourceType")
+
+if (op eq AclOperation.UNKNOWN)
+  throw new IllegalArgumentException("Unknown operation type")

Review comment:
   We should probably move this common code to SecurityUtils and use it 
both here and in the default implementation.
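   
   A rough sketch of what such a shared helper could look like (the class name, 
method name, and exact placement are hypothetical, not what this PR actually 
adds; the checks mirror the Scala code quoted above):
   
```java
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.resource.ResourceType;

public final class AuthorizeByResourceTypeChecks {

    private AuthorizeByResourceTypeChecks() { }

    // Shared argument validation for authorizeByResourceType, callable from
    // both AclAuthorizer and the Authorizer interface default implementation.
    public static void validateArgs(AclOperation op, ResourceType resourceType) {
        if (resourceType == ResourceType.ANY)
            throw new IllegalArgumentException(
                "Must specify a non-filter resource type for authorizeByResourceType");
        if (resourceType == ResourceType.UNKNOWN)
            throw new IllegalArgumentException("Unknown resource type");
        if (op == AclOperation.ANY)
            throw new IllegalArgumentException(
                "Must specify a non-filter operation type for authorizeByResourceType");
        if (op == AclOperation.UNKNOWN)
            throw new IllegalArgumentException("Unknown operation type");
    }
}
```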

##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -304,6 +308,122 @@ class AclAuthorizer extends Authorizer with Logging {
 if (zkClient != null) zkClient.close()
   }
 
+  // TODO: 1. Discuss how to log audit message
+  // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop 
but I think
+  //  in most of the cases it would be O(1) because denyDominatePrefixAllow 
should be rare
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+   op: AclOperation,
+   resourceType: ResourceType): 
AuthorizationResult = {
+if (resourceType eq ResourceType.ANY)
+  throw new IllegalArgumentException("Must specify a non-filter resource 
type for authorizeByResourceType")
+
+if (resourceType eq ResourceType.UNKNOWN)
+  throw new IllegalArgumentException("Unknown resource type")
+
+if (op eq AclOperation.ANY)
+  throw new IllegalArgumentException("Must specify a non-filter operation 
type for authorizeByResourceType")
+
+if (op eq AclOperation.UNKNOWN)
+  throw new IllegalArgumentException("Unknown operation type")
+
+val principal = new KafkaPrincipal(
+  requestContext.principal().getPrincipalType,
+  requestContext.principal().getName).toString
+
+val denyPatterns = matchingPatterns(
+  principal,
+  requestContext.clientAddress().getHostAddress,
+  op,
+  resourceType,
+  AclPermissionType.DENY
+)
+
+if (denyAll(denyPatterns)) {
+  logAuditMessage(requestContext, new Action(op, null,0, true, true), 
false, false)
+  return AuthorizationResult.DENIED
+}
+
+if (shouldAllowEveryoneIfNoAclIsFound) {
+  logAuditMessage(requestContext, new Action(op, null, 0, true, true), 
true, false)
+  return AuthorizationResult.ALLOWED
+}
+
+val allowPatterns = matchingPatterns(
+  principal,
+  requestContext.clientAddress().getHostAddress,
+  op,
+  resourceType,
+  AclPermissionType.ALLOW
+)
+
+if (allowAny(allowPatterns, denyPatterns)) {
+  logAuditMessage(requestContext, new Action(op,null, 0, true, true), 
true, false)
+  return AuthorizationResult.ALLOWED
+}
+
+logAuditMessage(requestContext, new Action(op, null, 0, true, true), 
false, false)
+AuthorizationResult.DENIED
+  }
+
+  def matchingPatterns(principal: String, host: String, op: AclOperation,
+   resourceType: ResourceType,
+   permission: AclPermissionType): Set[ResourcePattern] = {
+var resources = Set[ResourcePattern]()
+for (p <- Set(principal, AclEntry.WildcardPrincipal.toString)) {
+  for (h <- Set(host, AclEntry.WildcardHost)) {
+for (o <- Set(op, AclOperation.ALL)) {
+  val ace = new AccessControlEntry(p, h, o, permission)
+  resourceCache.get(ace) match {
+case Some(r) => resources ++= r.filter(r => r.resourceType() == 
resourceType)
+case None =>
+  }
+}
+  }
+}
+resources
+  }
+
+  private def denyAll(denyResources: Set[ResourcePattern]): Boolean =
+denyResources.exists(rp => denyAll(rp))
+
+  private def denyAll(rp: ResourcePattern): Boolean =
+rp.patternType() == PatternType.LITERAL && rp.name() == 
ResourcePattern.WILDCARD_RESOURCE
+
+  private def allowAny(allowPatterns: Set[ResourcePattern], denyPatterns: 

[GitHub] [kafka] JimGalasyn opened a new pull request #9670: DOCS-6076: Clarify config names for EOS versions 1 and 2

2020-12-01 Thread GitBox


JimGalasyn opened a new pull request #9670:
URL: https://github.com/apache/kafka/pull/9670


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9601: KAFKA-10729: Bump remaining RPC's to use tagged fields.

2020-12-01 Thread GitBox


hachikuji commented on a change in pull request #9601:
URL: https://github.com/apache/kafka/pull/9601#discussion_r533719708



##
File path: core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
##
@@ -437,8 +437,8 @@ class ReplicaFetcherThreadTest {
 thread.doWork()
 assertEquals(2, mockNetwork.epochFetchCount)
 assertEquals(1, mockNetwork.fetchCount)
-assertEquals("OffsetsForLeaderEpochRequest version.",
-  3, mockNetwork.lastUsedOffsetForLeaderEpochVersion)
+assertTrue("OffsetsForLeaderEpochRequest version.",
+  mockNetwork.lastUsedOffsetForLeaderEpochVersion >= 3)

Review comment:
   I think this was originally using `1` in order to ensure that we were 
using a version which included the epoch in the response. Since then it looks 
like it has been updated blindly every time we've bumped the protocol. I'm ok 
leaving this as is, but we could probably also get rid of it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10772) java.lang.IllegalStateException: There are insufficient bytes available to read assignment from the sync-group response (actual byte size 0)

2020-12-01 Thread Levani Kokhreidze (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241865#comment-17241865
 ] 

Levani Kokhreidze commented on KAFKA-10772:
---

Maybe this helps – we noticed that this problem happens mostly when there's a 
leadership change in the cluster.

> java.lang.IllegalStateException: There are insufficient bytes available to 
> read assignment from the sync-group response (actual byte size 0)
> 
>
> Key: KAFKA-10772
> URL: https://issues.apache.org/jira/browse/KAFKA-10772
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Levani Kokhreidze
>Priority: Major
> Attachments: KAFKA-10772.log
>
>
> From time to time we encounter the following exception that results in Kafka 
> Streams threads dying.
> Broker version 2.4.1, Client version 2.6.0
> {code:java}
> Nov 27 00:59:53.681 streaming-app service: prod | streaming-app-2 | 
> stream-client [cluster1-profile-stats-pipeline-client-id] State transition 
> from REBALANCING to ERROR Nov 27 00:59:53.681 streaming-app service: prod | 
> streaming-app-2 | stream-client [cluster1-profile-stats-pipeline-client-id] 
> State transition from REBALANCING to ERROR Nov 27 00:59:53.682 streaming-app 
> service: prod | streaming-app-2 | 2020-11-27 00:59:53.681 ERROR 105 --- 
> [-StreamThread-1] .KafkaStreamsBasedStreamProcessingEngine : Stream 
> processing pipeline: [profile-stats] encountered unrecoverable exception. 
> Thread: [cluster1-profile-stats-pipeline-client-id-StreamThread-1] is 
> completely dead. If all worker threads die, Kafka Streams will be moved to 
> permanent ERROR state. Nov 27 00:59:53.682 streaming-app service: prod | 
> streaming-app-2 | Stream processing pipeline: [profile-stats] encountered 
> unrecoverable exception. Thread: 
> [cluster1-profile-stats-pipeline-client-id-StreamThread-1] is completely 
> dead. If all worker threads die, Kafka Streams will be moved to permanent 
> ERROR state. java.lang.IllegalStateException: There are insufficient bytes 
> available to read assignment from the sync-group response (actual byte size 
> 0) , this is not expected; it is possible that the leader's assign function 
> is buggy and did not return any assignment for this member, or because static 
> member is configured and the protocol is buggy hence did not get the 
> assignment for this member at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:367)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) 
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:766)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:624)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10772) java.lang.IllegalStateException: There are insufficient bytes available to read assignment from the sync-group response (actual byte size 0)

2020-12-01 Thread Levani Kokhreidze (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241863#comment-17241863
 ] 

Levani Kokhreidze commented on KAFKA-10772:
---

Hi [~ableegoldman], yes we are using static membership. 

I've attached logs for the incident.[^KAFKA-10772.log]

> java.lang.IllegalStateException: There are insufficient bytes available to 
> read assignment from the sync-group response (actual byte size 0)
> 
>
> Key: KAFKA-10772
> URL: https://issues.apache.org/jira/browse/KAFKA-10772
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Levani Kokhreidze
>Priority: Major
> Attachments: KAFKA-10772.log
>
>
> From time to time we encounter the following exception that results in Kafka 
> Streams threads dying.
> Broker version 2.4.1, Client version 2.6.0
> {code:java}
> Nov 27 00:59:53.681 streaming-app service: prod | streaming-app-2 | 
> stream-client [cluster1-profile-stats-pipeline-client-id] State transition 
> from REBALANCING to ERROR Nov 27 00:59:53.681 streaming-app service: prod | 
> streaming-app-2 | stream-client [cluster1-profile-stats-pipeline-client-id] 
> State transition from REBALANCING to ERROR Nov 27 00:59:53.682 streaming-app 
> service: prod | streaming-app-2 | 2020-11-27 00:59:53.681 ERROR 105 --- 
> [-StreamThread-1] .KafkaStreamsBasedStreamProcessingEngine : Stream 
> processing pipeline: [profile-stats] encountered unrecoverable exception. 
> Thread: [cluster1-profile-stats-pipeline-client-id-StreamThread-1] is 
> completely dead. If all worker threads die, Kafka Streams will be moved to 
> permanent ERROR state. Nov 27 00:59:53.682 streaming-app service: prod | 
> streaming-app-2 | Stream processing pipeline: [profile-stats] encountered 
> unrecoverable exception. Thread: 
> [cluster1-profile-stats-pipeline-client-id-StreamThread-1] is completely 
> dead. If all worker threads die, Kafka Streams will be moved to permanent 
> ERROR state. java.lang.IllegalStateException: There are insufficient bytes 
> available to read assignment from the sync-group response (actual byte size 
> 0) , this is not expected; it is possible that the leader's assign function 
> is buggy and did not return any assignment for this member, or because static 
> member is configured and the protocol is buggy hence did not get the 
> assignment for this member at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:367)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) 
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:766)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:624)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10772) java.lang.IllegalStateException: There are insufficient bytes available to read assignment from the sync-group response (actual byte size 0)

2020-12-01 Thread Levani Kokhreidze (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241863#comment-17241863
 ] 

Levani Kokhreidze edited comment on KAFKA-10772 at 12/1/20, 8:50 PM:
-

Hi [~ableegoldman], yes we are using static membership. 

I've attached logs for the incident.
[^KAFKA-10772.log]


was (Author: lkokhreidze):
Hi [~ableegoldman], yes we are using static membership. 

I've attached logs for the incident.[^KAFKA-10772.log]

> java.lang.IllegalStateException: There are insufficient bytes available to 
> read assignment from the sync-group response (actual byte size 0)
> 
>
> Key: KAFKA-10772
> URL: https://issues.apache.org/jira/browse/KAFKA-10772
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Levani Kokhreidze
>Priority: Major
> Attachments: KAFKA-10772.log
>
>
> From time to time we encounter the following exception that results in Kafka 
> Streams threads dying.
> Broker version 2.4.1, Client version 2.6.0
> {code:java}
> Nov 27 00:59:53.681 streaming-app service: prod | streaming-app-2 | 
> stream-client [cluster1-profile-stats-pipeline-client-id] State transition 
> from REBALANCING to ERROR Nov 27 00:59:53.681 streaming-app service: prod | 
> streaming-app-2 | stream-client [cluster1-profile-stats-pipeline-client-id] 
> State transition from REBALANCING to ERROR Nov 27 00:59:53.682 streaming-app 
> service: prod | streaming-app-2 | 2020-11-27 00:59:53.681 ERROR 105 --- 
> [-StreamThread-1] .KafkaStreamsBasedStreamProcessingEngine : Stream 
> processing pipeline: [profile-stats] encountered unrecoverable exception. 
> Thread: [cluster1-profile-stats-pipeline-client-id-StreamThread-1] is 
> completely dead. If all worker threads die, Kafka Streams will be moved to 
> permanent ERROR state. Nov 27 00:59:53.682 streaming-app service: prod | 
> streaming-app-2 | Stream processing pipeline: [profile-stats] encountered 
> unrecoverable exception. Thread: 
> [cluster1-profile-stats-pipeline-client-id-StreamThread-1] is completely 
> dead. If all worker threads die, Kafka Streams will be moved to permanent 
> ERROR state. java.lang.IllegalStateException: There are insufficient bytes 
> available to read assignment from the sync-group response (actual byte size 
> 0) , this is not expected; it is possible that the leader's assign function 
> is buggy and did not return any assignment for this member, or because static 
> member is configured and the protocol is buggy hence did not get the 
> assignment for this member at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:367)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) 
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:766)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:624)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10772) java.lang.IllegalStateException: There are insufficient bytes available to read assignment from the sync-group response (actual byte size 0)

2020-12-01 Thread Levani Kokhreidze (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Levani Kokhreidze updated KAFKA-10772:
--
Attachment: KAFKA-10772.log

> java.lang.IllegalStateException: There are insufficient bytes available to 
> read assignment from the sync-group response (actual byte size 0)
> 
>
> Key: KAFKA-10772
> URL: https://issues.apache.org/jira/browse/KAFKA-10772
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Levani Kokhreidze
>Priority: Major
> Attachments: KAFKA-10772.log
>
>
> From time to time we encounter the following exception that results in Kafka 
> Streams threads dying.
> Broker version 2.4.1, Client version 2.6.0
> {code:java}
> Nov 27 00:59:53.681 streaming-app service: prod | streaming-app-2 | 
> stream-client [cluster1-profile-stats-pipeline-client-id] State transition 
> from REBALANCING to ERROR Nov 27 00:59:53.681 streaming-app service: prod | 
> streaming-app-2 | stream-client [cluster1-profile-stats-pipeline-client-id] 
> State transition from REBALANCING to ERROR Nov 27 00:59:53.682 streaming-app 
> service: prod | streaming-app-2 | 2020-11-27 00:59:53.681 ERROR 105 --- 
> [-StreamThread-1] .KafkaStreamsBasedStreamProcessingEngine : Stream 
> processing pipeline: [profile-stats] encountered unrecoverable exception. 
> Thread: [cluster1-profile-stats-pipeline-client-id-StreamThread-1] is 
> completely dead. If all worker threads die, Kafka Streams will be moved to 
> permanent ERROR state. Nov 27 00:59:53.682 streaming-app service: prod | 
> streaming-app-2 | Stream processing pipeline: [profile-stats] encountered 
> unrecoverable exception. Thread: 
> [cluster1-profile-stats-pipeline-client-id-StreamThread-1] is completely 
> dead. If all worker threads die, Kafka Streams will be moved to permanent 
> ERROR state. java.lang.IllegalStateException: There are insufficient bytes 
> available to read assignment from the sync-group response (actual byte size 
> 0) , this is not expected; it is possible that the leader's assign function 
> is buggy and did not return any assignment for this member, or because static 
> member is configured and the protocol is buggy hence did not get the 
> assignment for this member at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:367)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) 
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:766)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:624)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-12-01 Thread GitBox


wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r533707640



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 queryableStoreProvider = new QueryableStoreProvider(storeProviders, 
globalStateStoreProvider);
 
 stateDirCleaner = setupStateDirCleaner();
-oldHandler = false;
 maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
 rocksDBMetricsRecordingService = 
maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+private StreamThread createStreamThread(final long cacheSizePerThread, 
final int threadIdx) {
+final StreamThread streamThread = StreamThread.create(
+internalTopologyBuilder,
+config,
+clientSupplier,
+adminClient,
+processId,
+clientId,
+streamsMetrics,
+time,
+streamsMetadataState,
+cacheSizePerThread,
+stateDirectory,
+delegatingStateRestoreListener,
+threadIdx,
+KafkaStreams.this::closeToError,
+streamsUncaughtExceptionHandler
+);
+threads.add(streamThread);
+threadState.put(streamThread.getId(), streamThread.state());
+storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+return streamThread;
+}
+
+/**
+ * Adds and starts a stream thread in addition to the stream threads that 
are already running in this
+ * Kafka Streams client.
+ *
+ * Since the number of stream threads increases, the sizes of the caches 
in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the 
cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@code cache.max.bytes.buffering}.
+ *
+ * Stream threads can only be added if this Kafka Streams client is in 
state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread 
could not be added
+ */
+public Optional addStreamThread() {
+if (isRunningOrRebalancing()) {
+final int threadIdx = getNextThreadIndex();
+final long cacheSizePerThread = 
getCacheSizePerThread(threads.size() + 1);
+resizeThreadCache(cacheSizePerThread);
+final StreamThread streamThread = 
createStreamThread(cacheSizePerThread, threadIdx);
+streamThread.setStateListener(streamStateListener);
+synchronized (stateLock) {
+if (isRunningOrRebalancing()) {
+streamThread.start();

Review comment:
   I think these changes should be made when a state transition is changed or 
added. Removing from the thread list is low cost, as is increasing the size of 
the cache, so it won't be expensive to make these changes for all cases.
   
   I think the two good options we have are to move the cache resize and thread 
creation into the stateLock, or to undo the changes we made if we have to abort 
starting the new thread.
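   
   To make the trade-off concrete, here is a rough sketch of the second option 
(a fragment of KafkaStreams#addStreamThread, reusing the names from the diff 
above; the undo steps are illustrative, not the final patch):
   
```java
// Fragment of KafkaStreams#addStreamThread, relying on the fields shown in
// the diff above (threads, stateLock, streamStateListener, ...).
public Optional<String> addStreamThread() {
    if (isRunningOrRebalancing()) {
        final int threadIdx = getNextThreadIndex();
        final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
        resizeThreadCache(cacheSizePerThread);
        final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx);
        streamThread.setStateListener(streamStateListener);
        synchronized (stateLock) {
            if (isRunningOrRebalancing()) {
                streamThread.start();
                return Optional.of(streamThread.getName());
            }
        }
        // Abort path: the client left RUNNING/REBALANCING between the two
        // checks, so undo the bookkeeping done above before giving up.
        threads.remove(streamThread);
        resizeThreadCache(getCacheSizePerThread(threads.size()));
    }
    return Optional.empty();
}
```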





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response

2020-12-01 Thread GitBox


hachikuji commented on a change in pull request #9382:
URL: https://github.com/apache/kafka/pull/9382#discussion_r533648722



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -426,21 +451,34 @@ abstract class AbstractFetcherThread(name: String,
 warn(s"Partition $topicPartition marked as failed")
   }
 
-  def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]): 
Set[TopicPartition] = {
+  /**
+   * Returns initial partition fetch state based on current state and the 
provided `initialFetchState`.
+   * From IBP 2.7 onwards, we can rely on truncation based on diverging data 
returned in fetch responses.
+   * For older versions, we can skip the truncation step iff the leader epoch 
matches the existing epoch.
+   */
+  private def partitionFetchState(tp: TopicPartition, initialFetchState: 
InitialFetchState, currentState: PartitionFetchState): PartitionFetchState = {
+if (currentState != null && currentState.currentLeaderEpoch == 
initialFetchState.currentLeaderEpoch) {
+  currentState
+} else if (isTruncationOnFetchSupported && initialFetchState.initOffset >= 
0 && initialFetchState.lastFetchedEpoch.nonEmpty &&
+  (currentState == null || currentState.state == Fetching)) {
+  PartitionFetchState(initialFetchState.initOffset, None, 
initialFetchState.currentLeaderEpoch,
+  state = Fetching, initialFetchState.lastFetchedEpoch)

Review comment:
   This check is still a little hard to follow. I think we expect that if 
`initOffset` is negative, then `lastFetchedEpoch` will be empty and we will hit 
the `fetchOffsetAndTruncate` case below. Is that right? On the other hand, if 
`lastFetchedEpoch` is empty, then `initOffset` could still be non-negative if 
we have an old message format, which means we need to enter `Truncating` so 
that we can truncate to the high watermark.
   
   One case that is not so clear is when `currentState` is non-null. Then we 
will enter the `Truncating` state below regardless of whether 
`isTruncationOnFetchSupported` is set or not. Is that what we want?

##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -669,11 +714,18 @@ abstract class AbstractFetcherThread(name: String,
 Option(partitionStates.stateValue(topicPartition))
   }
 
+  /**
+   * Returns current fetch state for each partition assigned to this thread. 
This is used to reassign
+   * partitions when thread pool is resized. We return `lastFetchedEpoch=None` 
to ensure we go through

Review comment:
   This is probably ok. I guess an alternative would be to not take the 
initial last fetched epoch from `InitialFetchState`, but instead use 
`latestEpoch`.

##
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##
@@ -102,6 +103,7 @@ class ReplicaFetcherThread(name: String,
   private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes
   private val fetchSize = brokerConfig.replicaFetchMaxBytes
   private val brokerSupportsLeaderEpochRequest = 
brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV2
+  private val brokerSupportsTruncationOnFetch = 
ApiVersion.isTruncationOnFetchSupported(brokerConfig.interBrokerProtocolVersion)

Review comment:
   nit: I don't think we need this. We can override 
`isTruncationOnFetchSupported` with a `val`





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-12-01 Thread GitBox


cadonna commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r533687031



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 queryableStoreProvider = new QueryableStoreProvider(storeProviders, 
globalStateStoreProvider);
 
 stateDirCleaner = setupStateDirCleaner();
-oldHandler = false;
 maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
 rocksDBMetricsRecordingService = 
maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+private StreamThread createStreamThread(final long cacheSizePerThread, 
final int threadIdx) {
+final StreamThread streamThread = StreamThread.create(
+internalTopologyBuilder,
+config,
+clientSupplier,
+adminClient,
+processId,
+clientId,
+streamsMetrics,
+time,
+streamsMetadataState,
+cacheSizePerThread,
+stateDirectory,
+delegatingStateRestoreListener,
+threadIdx,
+KafkaStreams.this::closeToError,
+streamsUncaughtExceptionHandler
+);
+threads.add(streamThread);
+threadState.put(streamThread.getId(), streamThread.state());
+storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+return streamThread;
+}
+
+/**
+ * Adds and starts a stream thread in addition to the stream threads that 
are already running in this
+ * Kafka Streams client.
+ *
+ * Since the number of stream threads increases, the sizes of the caches 
in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the 
cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@code cache.max.bytes.buffering}.
+ *
+ * Stream threads can only be added if this Kafka Streams client is in 
state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread 
could not be added
+ */
+public Optional addStreamThread() {
+if (isRunningOrRebalancing()) {
+final int threadIdx = getNextThreadIndex();
+final long cacheSizePerThread = 
getCacheSizePerThread(threads.size() + 1);
+resizeThreadCache(cacheSizePerThread);
+final StreamThread streamThread = 
createStreamThread(cacheSizePerThread, threadIdx);
+streamThread.setStateListener(streamStateListener);
+synchronized (stateLock) {
+if (isRunningOrRebalancing()) {
+streamThread.start();

Review comment:
   Yes, currently this assumption is correct, but if the state transitions 
change in the future, we would be safe if we do the cleanup.
   On second thought, we are probably not 100% safe, because if a transition 
from `NOT_RUNNING` to `RUNNING` is added (or any other transition that goes 
from the above-mentioned states to `RUNNING` or `REBALANCING`), we would still 
not do the cleanup.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on pull request #9669: KAFKA-10792: Prevent source task shutdown from blocking herder thread

2020-12-01 Thread GitBox


C0urante commented on pull request #9669:
URL: https://github.com/apache/kafka/pull/9669#issuecomment-736784767


   @tombentley @gharris1727 @ncliang care to take a look?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante opened a new pull request #9669: KAFKA-10792: Prevent source task shutdown from blocking herder thread

2020-12-01 Thread GitBox


C0urante opened a new pull request #9669:
URL: https://github.com/apache/kafka/pull/9669


   [Jira](https://issues.apache.org/jira/browse/KAFKA-10792)
   
   The functional changes are simple: change the `WorkerSourceTask` class to 
only call `SourceTask::stop` from one location, during task shutdown, and only 
if an attempt has been made to start the task (which will not be the case if it 
was created in the paused state and then shut down before being started).
   
   Unit tests are tweaked where necessary to account for this new logic, which 
covers some edge cases mentioned in https://github.com/apache/kafka/pull/5020 
that were unaddressed up until now.
   
   The existing integration tests for blocking connectors are expanded to also 
include cases for blocking source and sink tasks. Full coverage of every 
source/sink task method is intentionally omitted from these expanded tests in 
order to avoid inflating test runtime (each one adds an extra 5 seconds at 
minimum) and because the tests that are added here were sufficient to reproduce 
the bug with source task shutdown.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10792) Source tasks can block herder thread by hanging during stop

2020-12-01 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-10792:
-

 Summary: Source tasks can block herder thread by hanging during 
stop
 Key: KAFKA-10792
 URL: https://issues.apache.org/jira/browse/KAFKA-10792
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.5.1, 2.6.0, 2.4.1, 2.5.0, 2.4.0, 2.4.2, 2.7.0
Reporter: Chris Egerton
Assignee: Chris Egerton


If a source task blocks during its {{stop}} method, the herder thread will also 
block, which can cause issues with detecting rebalances, reconfiguring 
connectors, and other vital functions of a Connect worker.

This occurs because the call to {{SourceTask::stop}} occurs on the herder's 
thread, instead of on the source task's own dedicated thread. This can be fixed 
by moving the call to {{SourceTask::stop}} onto the source task's dedicated 
thread and aligning with the current approach for {{Connector}}s and 
{{SinkTask}}s.
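
A rough, simplified illustration of the idea (not the actual {{WorkerSourceTask}} 
code; the class and field names below are made up): the herder thread only 
signals the stop, and {{SourceTask::stop}} is invoked on the task's dedicated 
thread, and only if the task was actually started.

{code:java}
import java.util.Collections;
import org.apache.kafka.connect.source.SourceTask;

// Hypothetical, simplified runner sketching the fix described above.
public class SourceTaskRunner implements Runnable {
    private final SourceTask task;           // connector-provided task
    private volatile boolean stopping = false;
    private boolean started = false;         // only touched by the task thread

    public SourceTaskRunner(SourceTask task) {
        this.task = task;
    }

    // Called from the herder thread: must never block on connector code.
    public void triggerStop() {
        stopping = true;
    }

    @Override
    public void run() {
        try {
            task.start(Collections.emptyMap());   // task config elided
            started = true;
            while (!stopping) {
                // task.poll() and dispatch records to the producer ...
            }
        } finally {
            // stop() runs only on this dedicated thread, and only if start()
            // was attempted, so a hanging stop() cannot block the herder.
            if (started) {
                task.stop();
            }
        }
    }
}
{code}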



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-12-01 Thread GitBox


wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r533676976



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 queryableStoreProvider = new QueryableStoreProvider(storeProviders, 
globalStateStoreProvider);
 
 stateDirCleaner = setupStateDirCleaner();
-oldHandler = false;
 maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
 rocksDBMetricsRecordingService = 
maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+private StreamThread createStreamThread(final long cacheSizePerThread, 
final int threadIdx) {
+final StreamThread streamThread = StreamThread.create(
+internalTopologyBuilder,
+config,
+clientSupplier,
+adminClient,
+processId,
+clientId,
+streamsMetrics,
+time,
+streamsMetadataState,
+cacheSizePerThread,
+stateDirectory,
+delegatingStateRestoreListener,
+threadIdx,
+KafkaStreams.this::closeToError,
+streamsUncaughtExceptionHandler
+);
+threads.add(streamThread);
+threadState.put(streamThread.getId(), streamThread.state());
+storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+return streamThread;
+}
+
+/**
+ * Adds and starts a stream thread in addition to the stream threads that 
are already running in this
+ * Kafka Streams client.
+ *
+ * Since the number of stream threads increases, the sizes of the caches 
in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the 
cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@code cache.max.bytes.buffering}.
+ *
+ * Stream threads can only be added if this Kafka Streams client is in 
state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread 
could not be added
+ */
+public Optional addStreamThread() {
+if (isRunningOrRebalancing()) {
+final int threadIdx = getNextThreadIndex();
+final long cacheSizePerThread = 
getCacheSizePerThread(threads.size() + 1);
+resizeThreadCache(cacheSizePerThread);
+final StreamThread streamThread = 
createStreamThread(cacheSizePerThread, threadIdx);
+streamThread.setStateListener(streamStateListener);
+synchronized (stateLock) {
+if (isRunningOrRebalancing()) {
+streamThread.start();

Review comment:
   From RUNNING or REBALANCING, aren't those the only states we can get to?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-12-01 Thread GitBox


cadonna commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r533675429



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 queryableStoreProvider = new QueryableStoreProvider(storeProviders, 
globalStateStoreProvider);
 
 stateDirCleaner = setupStateDirCleaner();
-oldHandler = false;
 maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
 rocksDBMetricsRecordingService = 
maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+private StreamThread createStreamThread(final long cacheSizePerThread, 
final int threadIdx) {
+final StreamThread streamThread = StreamThread.create(
+internalTopologyBuilder,
+config,
+clientSupplier,
+adminClient,
+processId,
+clientId,
+streamsMetrics,
+time,
+streamsMetadataState,
+cacheSizePerThread,
+stateDirectory,
+delegatingStateRestoreListener,
+threadIdx,
+KafkaStreams.this::closeToError,
+streamsUncaughtExceptionHandler
+);
+threads.add(streamThread);
+threadState.put(streamThread.getId(), streamThread.state());
+storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+return streamThread;
+}
+
+/**
+ * Adds and starts a stream thread in addition to the stream threads that 
are already running in this
+ * Kafka Streams client.
+ *
+ * Since the number of stream threads increases, the sizes of the caches 
in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the 
cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@code cache.max.bytes.buffering}.
+ *
+ * Stream threads can only be added if this Kafka Streams client is in 
state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread 
could not be added
+ */
+public Optional addStreamThread() {
+if (isRunningOrRebalancing()) {
+final int threadIdx = getNextThreadIndex();
+final long cacheSizePerThread = 
getCacheSizePerThread(threads.size() + 1);
+resizeThreadCache(cacheSizePerThread);
+final StreamThread streamThread = 
createStreamThread(cacheSizePerThread, threadIdx);
+streamThread.setStateListener(streamStateListener);
+synchronized (stateLock) {
+if (isRunningOrRebalancing()) {
+streamThread.start();

Review comment:
   What about checking the state and doing the clean-up only if the state is 
not `PENDING_SHUTDOWN`, not `ERROR`, and not `NOT_RUNNING`? That way we are 
safe against future changes that break our assumption about state transitions, 
and we make sure not to do unnecessary work when we are shutting down.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax opened a new pull request #9668: MINOR: add test for repartition/source-topic/changelog optimization

2020-12-01 Thread GitBox


mjsax opened a new pull request #9668:
URL: https://github.com/apache/kafka/pull/9668


   If topology optimization is enabled, KafkaStreams does not create store
   changelog topics but re-uses source input topics if possible. However,
   this optimization should not be applied to internal repartition topics,
   because those are actively purged.
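   
   For context, a minimal, hypothetical example of the optimization in question 
(topic and store names are made up; this is not code from the PR):
   
```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Materialized;

public class OptimizedTopologyExample {

    public static Topology build() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "optimization-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Enable topology optimization: a table read directly from a source
        // topic may re-use that topic as its changelog.
        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);

        final StreamsBuilder builder = new StreamsBuilder();
        // "input-topic" can serve as the changelog for "input-store"; a store
        // fed by an internal repartition topic must not do this, because
        // repartition topics are purged after consumption.
        builder.table("input-topic", Materialized.as("input-store"));
        return builder.build(props);
    }
}
```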
   
   Call for review @ableegoldman 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-12-01 Thread GitBox


wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r533656732



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 queryableStoreProvider = new QueryableStoreProvider(storeProviders, 
globalStateStoreProvider);
 
 stateDirCleaner = setupStateDirCleaner();
-oldHandler = false;
 maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
 rocksDBMetricsRecordingService = 
maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+private StreamThread createStreamThread(final long cacheSizePerThread, 
final int threadIdx) {
+final StreamThread streamThread = StreamThread.create(
+internalTopologyBuilder,
+config,
+clientSupplier,
+adminClient,
+processId,
+clientId,
+streamsMetrics,
+time,
+streamsMetadataState,
+cacheSizePerThread,
+stateDirectory,
+delegatingStateRestoreListener,
+threadIdx,
+KafkaStreams.this::closeToError,
+streamsUncaughtExceptionHandler
+);
+threads.add(streamThread);
+threadState.put(streamThread.getId(), streamThread.state());
+storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+return streamThread;
+}
+
+/**
+ * Adds and starts a stream thread in addition to the stream threads that 
are already running in this
+ * Kafka Streams client.
+ *
+ * Since the number of stream threads increases, the sizes of the caches 
in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the 
cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@code cache.max.bytes.buffering}.
+ *
+ * Stream threads can only be added if this Kafka Streams client is in 
state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread 
could not be added
+ */
+public Optional<String> addStreamThread() {
+if (isRunningOrRebalancing()) {
+final int threadIdx = getNextThreadIndex();
+final long cacheSizePerThread = 
getCacheSizePerThread(threads.size() + 1);
+resizeThreadCache(cacheSizePerThread);
+final StreamThread streamThread = 
createStreamThread(cacheSizePerThread, threadIdx);
+streamThread.setStateListener(streamStateListener);
+synchronized (stateLock) {
+if (isRunningOrRebalancing()) {
+streamThread.start();

Review comment:
   There will be two more cases of removal: in the replace-thread option and in the remove-thread option.
   
   I'm not really convinced it is necessary, but I don't see a problem with re-resizing the cache if we do not start the thread. I don't think there will be any side effects, since the client should be shutting down; if we do resize, there will be a little extra info in the state and store providers, but it would not get used.
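   As a side note, a hedged usage sketch (not from the PR, and assuming `topology` and `props` are already defined) of how an application would call the new API once it lands; the Optional is empty when the client is not in RUNNING or REBALANCING, so callers should handle both outcomes:

{code:java}
final KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();

// addStreamThread() resizes the existing thread caches and spins up one more thread.
final Optional<String> added = streams.addStreamThread();
if (added.isPresent()) {
    System.out.println("Started additional stream thread " + added.get());
} else {
    System.out.println("Could not add a stream thread; client is not running or rebalancing");
}
{code}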

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
##
@@ -76,6 +76,9 @@ public void resize(final long newCacheSizeBytes) {
 final boolean shrink = newCacheSizeBytes < maxCacheSizeBytes;
 maxCacheSizeBytes = newCacheSizeBytes;
 if (shrink) {
+if (caches.values().isEmpty()) {

Review comment:
   yeah, I didn't realize this was a problem, but when I added more test 
coverage it showed up
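   A rough sketch of the guard under discussion, assuming the shrink path simply returns early when no caches exist yet (nothing to evict and no per-cache share to compute); the eviction details are elided and illustrative only:

{code:java}
public void resize(final long newCacheSizeBytes) {
    final boolean shrink = newCacheSizeBytes < maxCacheSizeBytes;
    maxCacheSizeBytes = newCacheSizeBytes;
    if (shrink) {
        if (caches.values().isEmpty()) {
            // Nothing has been cached yet, so there is nothing to shrink or evict.
            return;
        }
        // ... evict from the existing named caches until the new limit is respected ...
    }
}
{code}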

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -894,19 +885,88 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 queryableStoreProvider = new QueryableStoreProvider(storeProviders, 
globalStateStoreProvider);
 
 stateDirCleaner = setupStateDirCleaner();
-oldHandler = false;
 maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
 rocksDBMetricsRecordingService = 
maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+private StreamThread createStreamThread(final long cacheSizePerThread, 
final int threadIdx) {
+final StreamThread streamThread = StreamThread.create(
+internalTopologyBuilder,
+config,
+clientSupplier,
+adminClient,
+processId,
+clientId,
+streamsMetrics,
+time,
+streamsMetadataState,
+cacheSizePerThread,
+stateDirectory,
+delegatingStateRestoreListener,
+

[GitHub] [kafka] ableegoldman commented on pull request #9609: KAFKA-6687: restrict DSL to allow only Streams from the same source topics

2020-12-01 Thread GitBox


ableegoldman commented on pull request #9609:
URL: https://github.com/apache/kafka/pull/9609#issuecomment-736766888


   Merged to trunk



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman merged pull request #9609: KAFKA-6687: restrict DSL to allow only Streams from the same source topics

2020-12-01 Thread GitBox


ableegoldman merged pull request #9609:
URL: https://github.com/apache/kafka/pull/9609


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10772) java.lang.IllegalStateException: There are insufficient bytes available to read assignment from the sync-group response (actual byte size 0)

2020-12-01 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241814#comment-17241814
 ] 

A. Sophie Blee-Goldman commented on KAFKA-10772:


Hey [~lkokhreidze], thanks for the report. Definitely looks like a bug – based 
on the error message, presumably either in the assignment function or with 
static membership. Are you using static membership?

Do you have any client logs from around the time of the exception? Broker logs 
would be useful as well, but probably only client logs can tell us if "the 
leader's assign function is buggy and did not return any assignment for this 
member" or not

> java.lang.IllegalStateException: There are insufficient bytes available to 
> read assignment from the sync-group response (actual byte size 0)
> 
>
> Key: KAFKA-10772
> URL: https://issues.apache.org/jira/browse/KAFKA-10772
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Levani Kokhreidze
>Priority: Major
>
> From time to time we encounter the following exception that results in Kafka 
> Streams threads dying.
> Broker version 2.4.1, Client version 2.6.0
> {code:java}
> Nov 27 00:59:53.681 streaming-app service: prod | streaming-app-2 | 
> stream-client [cluster1-profile-stats-pipeline-client-id] State transition 
> from REBALANCING to ERROR Nov 27 00:59:53.681 streaming-app service: prod | 
> streaming-app-2 | stream-client [cluster1-profile-stats-pipeline-client-id] 
> State transition from REBALANCING to ERROR Nov 27 00:59:53.682 streaming-app 
> service: prod | streaming-app-2 | 2020-11-27 00:59:53.681 ERROR 105 --- 
> [-StreamThread-1] .KafkaStreamsBasedStreamProcessingEngine : Stream 
> processing pipeline: [profile-stats] encountered unrecoverable exception. 
> Thread: [cluster1-profile-stats-pipeline-client-id-StreamThread-1] is 
> completely dead. If all worker threads die, Kafka Streams will be moved to 
> permanent ERROR state. Nov 27 00:59:53.682 streaming-app service: prod | 
> streaming-app-2 | Stream processing pipeline: [profile-stats] encountered 
> unrecoverable exception. Thread: 
> [cluster1-profile-stats-pipeline-client-id-StreamThread-1] is completely 
> dead. If all worker threads die, Kafka Streams will be moved to permanent 
> ERROR state. java.lang.IllegalStateException: There are insufficient bytes 
> available to read assignment from the sync-group response (actual byte size 
> 0) , this is not expected; it is possible that the leader's assign function 
> is buggy and did not return any assignment for this member, or because static 
> member is configured and the protocol is buggy hence did not get the 
> assignment for this member at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:367)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) 
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:766)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:624)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji merged pull request #9663: MINOR: Small cleanups in `AlterIsr` handling logic

2020-12-01 Thread GitBox


hachikuji merged pull request #9663:
URL: https://github.com/apache/kafka/pull/9663


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #9666: MINOR: Remove broken `.travis.yml` with system test hooks

2020-12-01 Thread GitBox


dajac commented on pull request #9666:
URL: https://github.com/apache/kafka/pull/9666#issuecomment-736754455


   I am +1 on removing it. We can always bring it back to trunk if we can make 
it work later.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #9654: MINOR: Increase unit test coverage of ProcessorTopology#updateSourceTopics()

2020-12-01 Thread GitBox


ableegoldman commented on pull request #9654:
URL: https://github.com/apache/kafka/pull/9654#issuecomment-736752649


   Merged to trunk



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman merged pull request #9654: MINOR: Increase unit test coverage of ProcessorTopology#updateSourceTopics()

2020-12-01 Thread GitBox


ableegoldman merged pull request #9654:
URL: https://github.com/apache/kafka/pull/9654


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lmr3796 commented on a change in pull request #9435: KAFKA-10606: Disable auto topic creation for fetch-all-topic-metadata request

2020-12-01 Thread GitBox


lmr3796 commented on a change in pull request #9435:
URL: https://github.com/apache/kafka/pull/9435#discussion_r533648988



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1250,13 +1251,23 @@ class KafkaApis(val requestChannel: RequestChannel,
 metadataResponseTopic(Errors.INVALID_REPLICATION_FACTOR, topic, 
true, util.Collections.emptyList())
   else
 topicMetadata
-} else if (allowAutoTopicCreation && config.autoCreateTopicsEnable) {
+} else if (!isFetchAllMetadata && allowAutoTopicCreation && 
config.autoCreateTopicsEnable) {
+  // KAFKA-10606: If this request is to get metadata for all topics, 
auto topic creation should not be allowed
+  // The special handling is necessary on broker side because 
allowAutoTopicCreation is hard coded to true
+  // for backward compatibility on client side.
   createTopic(topic, config.numPartitions, 
config.defaultReplicationFactor)
 } else {
   metadataResponseTopic(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, 
false, util.Collections.emptyList())

Review comment:
   @ijuma That's a good point.  I've updated it.
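   For context, one client-side path that triggers such a fetch-all-topics metadata request is pattern subscription: the consumer has to learn about every topic to find matches, and as the comment above notes, allowAutoTopicCreation is hard coded to true on the client side for backward compatibility. A hedged sketch, not taken from the PR:

{code:java}
import java.time.Duration;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PatternSubscriber {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "pattern-subscriber");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Subscribing by pattern forces the client to fetch metadata for all topics.
            consumer.subscribe(Pattern.compile("events-.*"));
            consumer.poll(Duration.ofSeconds(1));
        }
    }
}
{code}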





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



