[jira] [Commented] (KAFKA-10643) Static membership - repetitive PreparingRebalance with updating metadata for member reason

2020-11-23 Thread Eran Levy (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17237200#comment-17237200
 ] 

Eran Levy commented on KAFKA-10643:
---

[~ableegoldman] It looks like it's RocksDB related. I can't point my finger on 
what exactly, but I'm using a changelog topic to log the RocksDB store, and the 
storage became very large, around 50GB-70GB per stream pod (we have 12 in 
total for this app).

After cleaning up the changelog topics, things went back to much more normal 
behaviour. We are using AWS EBS io1 disks, so I assume that with that large 
amount of state we might have reached their limits.

From your experience, what are the most important things / metrics to look at 
while using a RocksDB state store along with a changelog topic?

> Static membership - repetitive PreparingRebalance with updating metadata for 
> member reason
> --
>
> Key: KAFKA-10643
> URL: https://issues.apache.org/jira/browse/KAFKA-10643
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Eran Levy
>Priority: Major
> Attachments: broker-4-11.csv, client-4-11.csv, 
> client-d-9-11-11-2020.csv
>
>
> Kafka streams 2.6.0, brokers version 2.6.0. Kafka nodes are healthy, kafka 
> streams app is healthy. 
> Configured with static membership. 
> Every 10 minutes (I assume because of topic.metadata.refresh.interval.ms), I 
> see the following group coordinator log for different stream consumers: 
> INFO [GroupCoordinator 2]: Preparing to rebalance group **--**-stream in 
> state PreparingRebalance with old generation 12244 (__consumer_offsets-45) 
> (reason: Updating metadata for member 
> -stream-11-1-013edd56-ed93-4370-b07c-1c29fbe72c9a) 
> (kafka.coordinator.group.GroupCoordinator)
> and right after that the following log: 
> INFO [GroupCoordinator 2]: Assignment received from leader for group 
> **-**-stream for generation 12246 (kafka.coordinator.group.GroupCoordinator)
>  
> Looked a bit at the kafka code and I'm not sure I get why such a thing is 
> happening - does this line describe the situation that happens here re the 
> "reason:"? [https://github.com/apache/kafka/blob/7ca299b8c0f2f3256c40b694078e422350c20d19/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L311]
> I also don't see it happening this often in other kafka streams applications 
> that we have. 
> The only suspicious thing that I see is that, around every hour, different pods of 
> that kafka streams application throw this exception: 
> {"timestamp":"2020-10-25T06:44:20.414Z","level":"INFO","thread":"**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1","logger":"org.apache.kafka.clients.FetchSessionHandler","message":"[Consumer
>  
> clientId=**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1-restore-consumer,
>  groupId=null] Error sending fetch request (sessionId=34683236, epoch=2872) 
> to node 
> 3:","context":"default","exception":"org.apache.kafka.common.errors.DisconnectException:
>  null\n"}
> I came across this strange behaviour after I started to investigate a strange 
> stuck rebalancing state, after one of the members left the group and caused 
> the rebalance to get stuck - the only thing that I found is that maybe, because 
> of these too-frequent PreparingRebalance states, the app might be affected by 
> this bug - KAFKA-9752?
> I don't understand why it happens; it wasn't happening before I applied static 
> membership to that kafka streams application (around 2 weeks ago). 
> Will be happy if you can help.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] viktorsomogyi commented on pull request #9326: KAFKA-10460: ReplicaListValidator format checking is incomplete

2020-11-23 Thread GitBox


viktorsomogyi commented on pull request #9326:
URL: https://github.com/apache/kafka/pull/9326#issuecomment-732010943


   @mimaison would you please quickly review this?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10739) Replace EpochEndOffset with automated protocol

2020-11-23 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-10739:

Description: Follow up of KAFKA-9630. We can avoid extra conversion by 
using the auto-generated data structure instead of using `EpochEndOffset` 
internally.  (was: Follow up of KAFKA-9630. We can avoid extra conversion by 
using the auto-generated data structure instead of using 
`OffsetsForLeaderEpochRequest.PartitionData`.)

> Replace EpochEndOffset with automated protocol
> --
>
> Key: KAFKA-10739
> URL: https://issues.apache.org/jira/browse/KAFKA-10739
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
>
> Follow up of KAFKA-9630. We can avoid extra conversion by using the 
> auto-generated data structure instead of using `EpochEndOffset` internally.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10739) Replace EpochEndOffset with automated protocol

2020-11-23 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-10739:

Summary: Replace EpochEndOffset with automated protocol  (was: 
KafkaApis#handleOffsetForLeaderEpochRequest should use OffsetForLeaderPartition 
directly)

> Replace EpochEndOffset with automated protocol
> --
>
> Key: KAFKA-10739
> URL: https://issues.apache.org/jira/browse/KAFKA-10739
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
>
> Follow up of KAFKA-9630. We can avoid extra conversion by using the 
> auto-generated data structure instead of using 
> `OffsetsForLeaderEpochRequest.PartitionData`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dajac commented on pull request #9630: KAFKA-10739; Replace EpochEndOffset with automated protocol

2020-11-23 Thread GitBox


dajac commented on pull request #9630:
URL: https://github.com/apache/kafka/pull/9630#issuecomment-732013207


   cc @hachikuji @chia7712 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 opened a new pull request #9641: MINOR: Convert connect assignment schemas to use generated protocol

2020-11-23 Thread GitBox


chia7712 opened a new pull request #9641:
URL: https://github.com/apache/kafka/pull/9641


   as title.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10759) ARM support for Kafka

2020-11-23 Thread PengLei (Jira)
PengLei created KAFKA-10759:
---

 Summary: ARM support for Kafka
 Key: KAFKA-10759
 URL: https://issues.apache.org/jira/browse/KAFKA-10759
 Project: Kafka
  Issue Type: Improvement
  Components: build
Reporter: PengLei


ARM support for Kafka.

I tried to deploy a Kafka cluster on an ARM server, but unfortunately I did 
not find an official ARM release of Kafka. I think more and more people will 
try the same thing as I did.

Currently the CI for Kafka (on GitHub) is handled by Jenkins CI, and the tests 
run only on the x86 architecture; ARM is missing. This leads to a problem: we 
have no way to test whether a pull request breaks Kafka deployment on ARM. 
Similarly, we cannot provide an ARM release package without ARM CI.

If the Apache Kafka community is interested, I can help with the integration.

This is the umbrella issue to track the efforts to make Kafka run on ARM 
processors.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rajinisivaram commented on pull request #9605: KAFKA-10727; Handle Kerberos error during re-login as transient failure in clients

2020-11-23 Thread GitBox


rajinisivaram commented on pull request #9605:
URL: https://github.com/apache/kafka/pull/9605#issuecomment-732024151


   @omkreddy  @rondagostino Thanks for the reviews. 
   @rondagostino Yes, I had checked the OAuth code path to see if it had the 
same issue and found that you had already taken care of that :-)
   
   Quota test failure not related, merging to trunk.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10753) check if ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG > 0

2020-11-23 Thread shiqihao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17237237#comment-17237237
 ] 

shiqihao commented on KAFKA-10753:
--

Sorry for not being clear enough. The key point of this issue is: if and only 
if AUTO_COMMIT_INTERVAL_MS_CONFIG == 0, auto-commit will execute continuously, 
so that CPU resources will be exhausted. If AUTO_COMMIT_INTERVAL_MS_CONFIG > 0, 
the program runs well.

I.e., as 
[lqjacklee|https://issues.apache.org/jira/secure/ViewProfile.jspa?name=Jack-Lee]
 shows: if timeoutMs == 0, then startMs == deadlineMs, and the methods that use 
Timer.reset(), e.g. auto-commit or heartbeat, will exhaust CPU resources.
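
For illustration, a minimal sketch of the kind of startup guard this issue asks 
for, written against the public consumer config constants (hypothetical 
validation code, not an actual Kafka patch):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.ConfigException;

public final class AutoCommitIntervalCheck {
    // Hypothetical guard: reject a zero auto-commit interval before the
    // consumer is constructed, so the poll loop cannot spin on Timer.reset().
    static void validate(Properties props) {
        boolean autoCommit = Boolean.parseBoolean(
            props.getProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"));
        long intervalMs = Long.parseLong(
            props.getProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000"));
        if (autoCommit && intervalMs <= 0) {
            throw new ConfigException(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
                intervalMs, "must be greater than 0 when auto-commit is enabled");
        }
    }
}
{code}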

> check if ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG > 0 
> ---
>
> Key: KAFKA-10753
> URL: https://issues.apache.org/jira/browse/KAFKA-10753
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: shiqihao
>Assignee: lqjacklee
>Priority: Minor
>
> I accidentally set ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG = 0, CPU 
> running at 100%.
> Could we add a check that ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG > 0 
> when starting the consumer?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rajinisivaram merged pull request #9605: KAFKA-10727; Handle Kerberos error during re-login as transient failure in clients

2020-11-23 Thread GitBox


rajinisivaram merged pull request #9605:
URL: https://github.com/apache/kafka/pull/9605


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-10727) Kafka clients throw AuthenticationException during Kerberos re-login

2020-11-23 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-10727.

Fix Version/s: 2.8.0
 Reviewer: Manikumar
   Resolution: Fixed

> Kafka clients throw AuthenticationException during Kerberos re-login
> 
>
> Key: KAFKA-10727
> URL: https://issues.apache.org/jira/browse/KAFKA-10727
> Project: Kafka
>  Issue Type: Bug
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.8.0
>
>
> During Kerberos re-login, we log out and login again. There is a timing issue 
> where the principal in the Subject has been cleared, but a new one hasn't 
> been populated yet. We need to ensure that we don't throw 
> AuthenticationException in this case to avoid Kafka clients 
> (consumer/producer etc.) failing instead of retrying.
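
As an illustration of the timing window described above, a minimal sketch of 
the kind of guard involved (hypothetical code, not the actual patch from PR 
#9605):

{code:java}
import javax.security.auth.Subject;

public final class ReLoginWindowCheck {
    // During re-login there is a short window in which the Subject has been
    // logged out but not yet re-populated. An empty principal set in that
    // window should be treated as transient (retry) rather than as a fatal
    // authentication failure.
    static boolean principalTemporarilyMissing(Subject subject) {
        return subject != null && subject.getPrincipals().isEmpty();
    }
}
{code}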



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on a change in pull request #9630: KAFKA-10739; Replace EpochEndOffset with automated protocol

2020-11-23 Thread GitBox


chia7712 commented on a change in pull request #9630:
URL: https://github.com/apache/kafka/pull/9630#discussion_r528545588



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1862,23 +1864,51 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
-  def lastOffsetForLeaderEpoch(requestedEpochInfo: Map[TopicPartition, 
OffsetsForLeaderEpochRequest.PartitionData]): Map[TopicPartition, 
EpochEndOffset] = {
-requestedEpochInfo.map { case (tp, partitionData) =>
-  val epochEndOffset = getPartition(tp) match {
-case HostedPartition.Online(partition) =>
-  partition.lastOffsetForLeaderEpoch(partitionData.currentLeaderEpoch, 
partitionData.leaderEpoch,
-fetchOnlyFromLeader = true)
-
-case HostedPartition.Offline =>
-  new EpochEndOffset(Errors.KAFKA_STORAGE_ERROR, UNDEFINED_EPOCH, 
UNDEFINED_EPOCH_OFFSET)
-
-case HostedPartition.None if metadataCache.contains(tp) =>
-  new EpochEndOffset(Errors.NOT_LEADER_OR_FOLLOWER, UNDEFINED_EPOCH, 
UNDEFINED_EPOCH_OFFSET)
-
-case HostedPartition.None =>
-  new EpochEndOffset(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
+  def lastOffsetForLeaderEpoch(
+requestedEpochInfo: Seq[OffsetForLeaderTopic]
+  ): Seq[OffsetForLeaderTopicResult] = {
+requestedEpochInfo.map { offsetForLeaderTopic =>
+  val partitions = offsetForLeaderTopic.partitions.asScala.map { 
offsetForLeaderPartition =>

Review comment:
   Could we avoid the duplicate conversion between Scala and Java? It could be 
rewritten with the Java stream APIs so the  ```asScala```  can be avoided.
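
   For example, a rough Java-stream shape of the mapping (a sketch only; 
`resolveEndOffset` is a hypothetical helper standing in for the per-partition 
lookup in the diff above, and imports of the generated message classes are 
omitted):

   ```java
   import java.util.List;
   import java.util.stream.Collectors;

   // Map each requested topic to a result while staying in Java collections.
   List<OffsetForLeaderTopicResult> results = requestedEpochInfo.stream()
       .map(topic -> {
           OffsetForLeaderTopicResult result =
               new OffsetForLeaderTopicResult().setTopic(topic.topic());
           topic.partitions().forEach(partition ->
               result.partitions().add(resolveEndOffset(topic.topic(), partition)));
           return result;
       })
       .collect(Collectors.toList());
   ```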

##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -2592,25 +2596,39 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleOffsetForLeaderEpochRequest(request: RequestChannel.Request): Unit 
= {
 val offsetForLeaderEpoch = request.body[OffsetsForLeaderEpochRequest]
-val requestInfo = offsetForLeaderEpoch.epochsByTopicPartition.asScala
+val topics = offsetForLeaderEpoch.data.topics.asScala.toSeq
 
 // The OffsetsForLeaderEpoch API was initially only used for inter-broker 
communication and required
 // cluster permission. With KIP-320, the consumer now also uses this API 
to check for log truncation
 // following a leader change, so we also allow topic describe permission.
-val (authorizedPartitions, unauthorizedPartitions) =
+val (authorizedTopics, unauthorizedTopics) =
   if (authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME, 
logIfDenied = false))
-(requestInfo, Map.empty[TopicPartition, 
OffsetsForLeaderEpochRequest.PartitionData])
-  else partitionMapByAuthorized(request.context, DESCRIBE, TOPIC, 
requestInfo)(_.topic)
+(topics, Seq.empty[OffsetForLeaderTopic])
+  else partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, 
topics)(_.topic)
+
+val endOffsetsForAuthorizedPartitions = 
replicaManager.lastOffsetForLeaderEpoch(authorizedTopics)
+val endOffsetsForUnauthorizedPartitions = unauthorizedTopics.map { 
offsetForLeaderTopic =>
+  val partitions = offsetForLeaderTopic.partitions.asScala.map { 
offsetForLeaderPartition =>

Review comment:
   ditto

##
File path: 
core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
##
@@ -78,7 +80,19 @@ class ReplicaFetcherMockBlockingSend(offsets: 
java.util.Map[TopicPartition, Epoc
 callback.foreach(_.apply())
 epochFetchCount += 1
 lastUsedOffsetForLeaderEpochVersion = 
requestBuilder.latestAllowedVersion()
-new OffsetsForLeaderEpochResponse(currentOffsets)
+
+val data = new OffsetForLeaderEpochResponseData()
+currentOffsets.forEach((tp, offsetForLeaderPartition) => {
+  var topic = data.topics.find(tp.topic)
+  if (topic == null) {
+topic = new OffsetForLeaderTopicResult()
+  .setTopic(tp.topic)
+data.topics.add(topic)
+  }
+  
topic.partitions.add(offsetForLeaderPartition.setPartition(tp.partition))

Review comment:
   Should it be 
```topic.partitions.add(offsetForLeaderPartition.duplicate())```?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9435: KAFKA-10606: Disable auto topic creation for fetch-all-topic-metadata request

2020-11-23 Thread GitBox


chia7712 commented on pull request #9435:
URL: https://github.com/apache/kafka/pull/9435#issuecomment-732051836


   +1 to the last commit. Will merge it to trunk tomorrow if there is no objection.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9435: KAFKA-10606: Disable auto topic creation for fetch-all-topic-metadata request

2020-11-23 Thread GitBox


chia7712 commented on a change in pull request #9435:
URL: https://github.com/apache/kafka/pull/9435#discussion_r528585768



##
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##
@@ -107,6 +108,11 @@ class KafkaApisTest {
   private val time = new MockTime
   private val clientId = ""
 
+  @Before
+  def setUp(): Unit = {

Review comment:
   Could we remove this ```@Before``` code? Changing the modifier of 
```metadataCache``` from "val" to "var" would be enough.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10643) Static membership - repetitive PreparingRebalance with updating metadata for member reason

2020-11-23 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17237265#comment-17237265
 ] 

Bruno Cadonna commented on KAFKA-10643:
---

[~eran-levy] Just to avoid misunderstandings: What do you mean exactly with 
"the storage became very large around 50GB-70GB per stream pod"? Do you mean 
the size used locally by the RocksDB state store or the size of the changelog 
topic on the Kafka brokers?

The size of the changelog topic on the Kafka brokers is independent of the 
state store used. It depends solely on the data your application writes to the 
state store without any state store specific overhead. Also the RocksDB metrics 
are independent of the changelog topic.

To identify write stalls, you could look at {{write-stall-duration-[avg | 
total]}} in the RocksDB metrics. For more RocksDB metrics, see 
[https://docs.confluent.io/platform/current/streams/monitoring.html#rocksdb-metrics]
 .

From 2.7 on, there will be even more RocksDB metrics. See 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-607%3A+Add+Metrics+to+Kafka+Streams+to+Report+Properties+of+RocksDB]
 with which you can also monitor the size of RocksDB's sst files and the 
number of pending compactions.
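
As an illustration, one way to read those metrics from a running application 
(a sketch; it assumes the KIP-471 metric names, e.g. 
{{write-stall-duration-total}} in the {{stream-state-metrics}} group):

{code:java}
import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

public final class WriteStallMetrics {
    // Print all RocksDB write-stall metrics of a running KafkaStreams instance.
    static void print(KafkaStreams streams) {
        for (Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
            MetricName name = entry.getKey();
            if (name.group().equals("stream-state-metrics")
                    && name.name().startsWith("write-stall-duration")) {
                System.out.println(name.name() + " " + name.tags() + " = "
                    + entry.getValue().metricValue());
            }
        }
    }
}
{code}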

> Static membership - repetitive PreparingRebalance with updating metadata for 
> member reason
> --
>
> Key: KAFKA-10643
> URL: https://issues.apache.org/jira/browse/KAFKA-10643
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Eran Levy
>Priority: Major
> Attachments: broker-4-11.csv, client-4-11.csv, 
> client-d-9-11-11-2020.csv
>
>
> Kafka streams 2.6.0, brokers version 2.6.0. Kafka nodes are healthy, kafka 
> streams app is healthy. 
> Configured with static membership. 
> Every 10 minutes (I assume because of topic.metadata.refresh.interval.ms), I 
> see the following group coordinator log for different stream consumers: 
> INFO [GroupCoordinator 2]: Preparing to rebalance group **--**-stream in 
> state PreparingRebalance with old generation 12244 (__consumer_offsets-45) 
> (reason: Updating metadata for member 
> -stream-11-1-013edd56-ed93-4370-b07c-1c29fbe72c9a) 
> (kafka.coordinator.group.GroupCoordinator)
> and right after that the following log: 
> INFO [GroupCoordinator 2]: Assignment received from leader for group 
> **-**-stream for generation 12246 (kafka.coordinator.group.GroupCoordinator)
>  
> Looked a bit at the kafka code and I'm not sure I get why such a thing is 
> happening - does this line describe the situation that happens here re the 
> "reason:"? [https://github.com/apache/kafka/blob/7ca299b8c0f2f3256c40b694078e422350c20d19/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L311]
> I also don't see it happening this often in other kafka streams applications 
> that we have. 
> The only suspicious thing that I see is that, around every hour, different pods of 
> that kafka streams application throw this exception: 
> {"timestamp":"2020-10-25T06:44:20.414Z","level":"INFO","thread":"**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1","logger":"org.apache.kafka.clients.FetchSessionHandler","message":"[Consumer
>  
> clientId=**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1-restore-consumer,
>  groupId=null] Error sending fetch request (sessionId=34683236, epoch=2872) 
> to node 
> 3:","context":"default","exception":"org.apache.kafka.common.errors.DisconnectException:
>  null\n"}
> I came across this strange behaviour after I started to investigate a strange 
> stuck rebalancing state, after one of the members left the group and caused 
> the rebalance to get stuck - the only thing that I found is that maybe, because 
> of these too-frequent PreparingRebalance states, the app might be affected by 
> this bug - KAFKA-9752?
> I don't understand why it happens; it wasn't happening before I applied static 
> membership to that kafka streams application (around 2 weeks ago). 
> Will be happy if you can help.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10643) Static membership - repetitive PreparingRebalance with updating metadata for member reason

2020-11-23 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17237265#comment-17237265
 ] 

Bruno Cadonna edited comment on KAFKA-10643 at 11/23/20, 10:08 AM:
---

[~eran-levy] Just to avoid misunderstandings: What do you mean exactly with 
"the storage became very large around 50GB-70GB per stream pod"? Do you mean 
the size used locally by the RocksDB state store or the size of the changelog 
topic on the Kafka brokers?

The size of the changelog topic on the Kafka brokers is independent of the 
state store used. It depends solely on the data your application writes to the 
state store without any state store specific overhead. Also the RocksDB metrics 
are independent of the changelog topic.

To identify write stalls, you could look at {{write-stall-duration-[avg | 
total]}} in the RocksDB metrics. For more RocksDB metrics, see 
[https://kafka.apache.org/documentation/#kafka_streams_rocksdb_monitoring] .

From 2.7 on, there will be even more RocksDB metrics. See 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-607%3A+Add+Metrics+to+Kafka+Streams+to+Report+Properties+of+RocksDB]
 with which you can also monitor the size of RocksDB's sst files and the 
number of pending compactions.


was (Author: cadonna):
[~eran-levy] Just to avoid misunderstandings: What do you mean exactly with 
"the storage became very large around 50GB-70GB per stream pod"? Do you mean 
the size used locally by the RocksDB state store or the size of the changelog 
topic on the Kafka brokers?

The size of the changelog topic on the Kafka brokers is independent of the 
state store used. It depends solely on the data your application writes to the 
state store without any state store specific overhead. Also the RocksDB metrics 
are independent of the changelog topic.

To identify write stalls, you could look at {{write-stall-duration-[avg | 
total]}} in the RocksDB metrics. For more RocksDB metrics, see 
[https://docs.confluent.io/platform/current/streams/monitoring.html#rocksdb-metrics]
 .

From 2.7 on, there will be even more RocksDB metrics. See 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-607%3A+Add+Metrics+to+Kafka+Streams+to+Report+Properties+of+RocksDB]
 with which you can also monitor the size of RocksDB's sst files and the 
number of pending compactions.

> Static membership - repetitive PreparingRebalance with updating metadata for 
> member reason
> --
>
> Key: KAFKA-10643
> URL: https://issues.apache.org/jira/browse/KAFKA-10643
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Eran Levy
>Priority: Major
> Attachments: broker-4-11.csv, client-4-11.csv, 
> client-d-9-11-11-2020.csv
>
>
> Kafka streams 2.6.0, brokers version 2.6.0. Kafka nodes are healthy, kafka 
> streams app is healthy. 
> Configured with static membership. 
> Every 10 minutes (I assume because of topic.metadata.refresh.interval.ms), I 
> see the following group coordinator log for different stream consumers: 
> INFO [GroupCoordinator 2]: Preparing to rebalance group **--**-stream in 
> state PreparingRebalance with old generation 12244 (__consumer_offsets-45) 
> (reason: Updating metadata for member 
> -stream-11-1-013edd56-ed93-4370-b07c-1c29fbe72c9a) 
> (kafka.coordinator.group.GroupCoordinator)
> and right after that the following log: 
> INFO [GroupCoordinator 2]: Assignment received from leader for group 
> **-**-stream for generation 12246 (kafka.coordinator.group.GroupCoordinator)
>  
> Looked a bit at the kafka code and I'm not sure I get why such a thing is 
> happening - does this line describe the situation that happens here re the 
> "reason:"? [https://github.com/apache/kafka/blob/7ca299b8c0f2f3256c40b694078e422350c20d19/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L311]
> I also don't see it happening this often in other kafka streams applications 
> that we have. 
> The only suspicious thing that I see is that, around every hour, different pods of 
> that kafka streams application throw this exception: 
> {"timestamp":"2020-10-25T06:44:20.414Z","level":"INFO","thread":"**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1","logger":"org.apache.kafka.clients.FetchSessionHandler","message":"[Consumer
>  
> clientId=**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1-restore-consumer,
>  groupId=null] Error sending fetch request (sessionId=34683236, epoch=2872) 
> to node 
> 3:","context":"default","exception":"org.apache.kafka.common.errors.DisconnectException:
>  null\n"}
> I came across this strange behaviour after I started to investigate a strange 
> stuck rebalancing state after one of the members left the 

[jira] [Commented] (KAFKA-10758) Kafka Streams consuming from a pattern goes to PENDING_SHUTDOWN when adding a new topic

2020-11-23 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17237270#comment-17237270
 ] 

Bruno Cadonna commented on KAFKA-10758:
---

[~davideicardi] Thank you for the report!

Could you please also post your call to {{StreamsBuilder#stream(Pattern)}} and 
your topology description that you can get with 
{{topology.describe().toString()}}?

> Kafka Streams consuming from a pattern goes to PENDING_SHUTDOWN when adding a 
> new topic
> ---
>
> Key: KAFKA-10758
> URL: https://issues.apache.org/jira/browse/KAFKA-10758
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Davide Icardi
>Priority: Major
>
> I have a simple Kafka Stream app that consumes from multiple input topics 
> using the _stream_ function that accepts a Pattern 
> ([link|https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/StreamsBuilder.html#stream-java.util.regex.Pattern-]).
>  
> Whenever I add a new topic that matches the pattern, the kafka stream state 
> goes to REBALANCING -> ERROR -> PENDING_SHUTDOWN.
> If I restart the app it correctly starts reading again without problems.
> Is it by design? Should I handle this and simply restart the app?
>  
> Kafka Stream version is 2.6.0.
> The error is the following:
> {code:java}
> ERROR o.a.k.s.p.i.ProcessorTopology - Set of source nodes do not match:
> sourceNodesByName = [KSTREAM-SOURCE-03, KSTREAM-SOURCE-02]
> sourceTopicsByName = [KSTREAM-SOURCE-00, KSTREAM-SOURCE-14, 
> KSTREAM-SOURCE-03, KSTREAM-SOURCE-02]
> org.apache.kafka.common.KafkaException: User rebalance callback throws an 
> error
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:436)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:766)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:624)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
>  Caused by: java.lang.IllegalStateException: Tried to update source topics 
> but source nodes did not match
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorTopology.updateSourceTopics(ProcessorTopology.java:151)
>   at 
> org.apache.kafka.streams.processor.internals.AbstractTask.update(AbstractTask.java:109)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.update(StreamTask.java:514)
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.updateInputPartitionsAndResume(TaskManager.java:397)
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:261)
>   at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1428)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:279)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:421)
>   ... 10 common frames omitted
>  KafkaStream state is ERROR
>  17:28:53.200 [datalake-StreamThread-1] ERROR 
> o.apache.kafka.streams.KafkaStreams - stream-client [datalake] All stream 
> threads have died. The instance will be in error state and should be closed.
>  > User rebalance callback throws an error
>  KafkaStream state is PENDING_SHUTDOWN
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dengziming commented on a change in pull request #9622: KAFKA-10547; add topicId in MetadataResp

2020-11-23 Thread GitBox


dengziming commented on a change in pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#discussion_r528699411



##
File path: clients/src/main/java/org/apache/kafka/common/Cluster.java
##
@@ -327,6 +350,18 @@ public Node controller() {
 return controller;
 }
 
+public Collection<Uuid> topicIds() {
+return topicIds.values();
+}
+
+public Uuid getTopicId(String topic) {
+return topicIds.getOrDefault(topic, Uuid.ZERO_UUID);
+}
+
+public String getTopicName(Uuid topiId) {

Review comment:
   Here, I try to use getOrDefault but don't know what the default value 
is(it seems that "" is not a good default topicName). so just return null and 
let the caller decide, suggestions are welcomed if you have good ideas.
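
   One option, as a sketch only (not code from this PR), is to make the absence 
explicit with an `Optional` instead of returning null:

   ```java
   import java.util.Map;
   import java.util.Optional;
   import org.apache.kafka.common.Uuid;

   // Hypothetical alternative: the caller decides what a missing mapping means.
   public static Optional<String> topicNameFor(Map<Uuid, String> topicNames, Uuid topicId) {
       return Optional.ofNullable(topicNames.get(topicId));
   }
   ```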





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on a change in pull request #9622: KAFKA-10547; add topicId in MetadataResp

2020-11-23 Thread GitBox


dengziming commented on a change in pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#discussion_r528707065



##
File path: clients/src/main/resources/common/message/MetadataRequest.json
##
@@ -31,9 +31,11 @@
 // Starting in version 8, authorized operations can be requested for 
cluster and topic resource.
 //
 // Version 9 is the first flexible version.
+// Version 10 add topicId
 { "name": "Topics", "type": "[]MetadataRequestTopic", "versions": "0+", 
"nullableVersions": "1+",
   "about": "The topics to fetch metadata for.", "fields": [
-  { "name": "Name", "type": "string", "versions": "0+", "entityType": 
"topicName",
+  { "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": 
true, "about": "The topic id." },
+  { "name": "Name", "type": "string", "versions": "0+", "entityType": 
"topicName", "ignorable": true,

Review comment:
   I tried to use nullable but got an exception with an `Unrecognized field 
"nullable"` message.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





[jira] [Commented] (KAFKA-10758) Kafka Streams consuming from a pattern goes to PENDING_SHUTDOWN when adding a new topic

2020-11-23 Thread Davide Icardi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17237407#comment-17237407
 ] 

Davide Icardi commented on KAFKA-10758:
---

[~cadonna] here are the details you requested.

I have this code:

{code:java}
private val inputCommandsStream =
streamsBuilder.stream[Key, 
Envelop[RawDataCommand]](Pattern.compile("^ingestion\\.datalake\\..+\\..+\\.commands$"))
private val inputEventStream =
streamsBuilder.stream[Key, 
Envelop[RawDataEvent]](Pattern.compile("^ingestion\\.datalake\\..+\\..+\\.events"))
{code}
And here is the topology:

{code:java}
Topologies:
   Sub-topology: 0
Source: KSTREAM-SOURCE-00 (topics: [ingestion.datasources.events])
  --> KSTREAM-PROCESSOR-01
Processor: KSTREAM-PROCESSOR-01 (stores: [])
  --> none
  <-- KSTREAM-SOURCE-00  Sub-topology: 1
Source: KSTREAM-SOURCE-02 (topics: 
^ingestion\.datalake\..+\..+\.commands$)
  --> KSTREAM-LEFTJOIN-06
Processor: KSTREAM-LEFTJOIN-06 (stores: 
[ingestion.datalake.store.snapshots])
  --> KSTREAM-MAP-10, KSTREAM-SINK-07
  <-- KSTREAM-SOURCE-02
Source: KSTREAM-SOURCE-03 (topics: 
^ingestion\.datalake\..+\..+\.events)
  --> KSTREAM-FILTER-04
Processor: KSTREAM-FILTER-04 (stores: [])
  --> KSTREAM-AGGREGATE-05
  <-- KSTREAM-SOURCE-03
Processor: KSTREAM-AGGREGATE-05 (stores: 
[ingestion.datalake.store.snapshots])
  --> KTABLE-TOSTREAM-08
  <-- KSTREAM-FILTER-04
Processor: KSTREAM-MAP-10 (stores: [])
  --> KSTREAM-FILTER-13
  <-- KSTREAM-LEFTJOIN-06
Processor: KSTREAM-FILTER-13 (stores: [])
  --> KSTREAM-SINK-12
  <-- KSTREAM-MAP-10
Processor: KTABLE-TOSTREAM-08 (stores: [])
  --> KSTREAM-SINK-09
  <-- KSTREAM-AGGREGATE-05
Sink: KSTREAM-SINK-07 (extractor class: 
service.streaming.EventStreamTopicNameExtractor@20801cbb)
  <-- KSTREAM-LEFTJOIN-06
Sink: KSTREAM-SINK-09 (extractor class: 
service.streaming.SnapshotStreamTopicNameExtractor@1c240cf2)
  <-- KTABLE-TOSTREAM-08
Sink: KSTREAM-SINK-12 (topic: 
KSTREAM-TOTABLE-11-repartition)
  <-- KSTREAM-FILTER-13  Sub-topology: 2
Source: KSTREAM-SOURCE-14 (topics: 
[KSTREAM-TOTABLE-11-repartition])
  --> KSTREAM-TOTABLE-11
Processor: KSTREAM-TOTABLE-11 (stores: 
[ingestion.datalake.store.eventsByMsgId])
  --> none
  <-- KSTREAM-SOURCE-14

{code}
 

It is a work in progress and for sure will be optimized, but I don't understand 
the reason for the error.

 

thanks!

> Kafka Streams consuming from a pattern goes to PENDING_SHUTDOWN when adding a 
> new topic
> ---
>
> Key: KAFKA-10758
> URL: https://issues.apache.org/jira/browse/KAFKA-10758
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Davide Icardi
>Priority: Major
>
> I have a simple Kafka Stream app that consumes from multiple input topics 
> using the _stream_ function that accepts a Pattern 
> ([link|https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/StreamsBuilder.html#stream-java.util.regex.Pattern-]).
>  
> Whenever I add a new topic that matches the pattern, the kafka stream state 
> goes to REBALANCING -> ERROR -> PENDING_SHUTDOWN.
> If I restart the app it correctly starts reading again without problems.
> Is it by design? Should I handle this and simply restart the app?
>  
> Kafka Stream version is 2.6.0.
> The error is the following:
> {code:java}
> ERROR o.a.k.s.p.i.ProcessorTopology - Set of source nodes do not match:
> sourceNodesByName = [KSTREAM-SOURCE-03, KSTREAM-SOURCE-02]
> sourceTopicsByName = [KSTREAM-SOURCE-00, KSTREAM-SOURCE-14, 
> KSTREAM-SOURCE-03, KSTREAM-SOURCE-02]
> org.apache.kafka.common.KafkaException: User rebalance callback throws an 
> error
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:436)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.po

[jira] [Commented] (KAFKA-10758) Kafka Streams consuming from a pattern goes to PENDING_SHUTDOWN when adding a new topic

2020-11-23 Thread Davide Icardi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17237410#comment-17237410
 ] 

Davide Icardi commented on KAFKA-10758:
---

Also, to be more precise, the error doesn't happen exactly when I add the 
topic, but I think it happens when the app tries to refresh the topic 
information - every 5 minutes by default, I think. Right?

> Kafka Streams consuming from a pattern goes to PENDING_SHUTDOWN when adding a 
> new topic
> ---
>
> Key: KAFKA-10758
> URL: https://issues.apache.org/jira/browse/KAFKA-10758
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Davide Icardi
>Priority: Major
>
> I have a simple Kafka Stream app that consumes from multiple input topics 
> using the _stream_ function that accepts a Pattern 
> ([link|https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/StreamsBuilder.html#stream-java.util.regex.Pattern-]).
>  
> Whenever I add a new topic that matches the pattern, the kafka stream state 
> goes to REBALANCING -> ERROR -> PENDING_SHUTDOWN.
> If I restart the app it correctly starts reading again without problems.
> Is it by design? Should I handle this and simply restart the app?
>  
> Kafka Stream version is 2.6.0.
> The error is the following:
> {code:java}
> ERROR o.a.k.s.p.i.ProcessorTopology - Set of source nodes do not match:
> sourceNodesByName = [KSTREAM-SOURCE-03, KSTREAM-SOURCE-02]
> sourceTopicsByName = [KSTREAM-SOURCE-00, KSTREAM-SOURCE-14, 
> KSTREAM-SOURCE-03, KSTREAM-SOURCE-02]
> org.apache.kafka.common.KafkaException: User rebalance callback throws an 
> error
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:436)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:766)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:624)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
>  Caused by: java.lang.IllegalStateException: Tried to update source topics 
> but source nodes did not match
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorTopology.updateSourceTopics(ProcessorTopology.java:151)
>   at 
> org.apache.kafka.streams.processor.internals.AbstractTask.update(AbstractTask.java:109)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.update(StreamTask.java:514)
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.updateInputPartitionsAndResume(TaskManager.java:397)
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:261)
>   at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1428)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:279)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:421)
>   ... 10 common frames omitted
>  KafkaStream state is ERROR
>  17:28:53.200 [datalake-StreamThread-1] ERROR 
> o.apache.kafka.streams.KafkaStreams - stream-client [datalake] All stream 
> threads have died. The instance will be in error state and should be closed.
>  > User rebalance callback throws an error
>  KafkaStream state is PENDING_SHUTDOWN
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] abc863377 opened a new pull request #9642: MINOR: Merging WorkerTask constructor

2020-11-23 Thread GitBox


abc863377 opened a new pull request #9642:
URL: https://github.com/apache/kafka/pull/9642


   Replace the boolean value success with Throwable error.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abc863377 commented on pull request #9642: MINOR: Merging WorkerTask constructor

2020-11-23 Thread GitBox


abc863377 commented on pull request #9642:
URL: https://github.com/apache/kafka/pull/9642#issuecomment-732208251


   @chia7712 Could you review it, please?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10760) In compacted topic with max.compaction.lag.ms, the segments are not rolled until new messages arrive

2020-11-23 Thread Sarwar Bhuiyan (Jira)
Sarwar Bhuiyan created KAFKA-10760:
--

 Summary: In compacted topic with max.compaction.lag.ms, the 
segments are not rolled until new messages arrive
 Key: KAFKA-10760
 URL: https://issues.apache.org/jira/browse/KAFKA-10760
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Sarwar Bhuiyan


Currently, if a compacted topic has min.cleanable.dirty.ratio set to something 
low and max.compaction.lag.ms set to a small time, then according to KIP-354 
([https://cwiki.apache.org/confluence/display/KAFKA/KIP-354]) the expectation 
is that the active segment will be rolled regardless of segment.ms or whether 
new data has come in to "advance" the time. However, in practice, the current 
implementation only rolls the segment when new data arrives, which means there 
are situations where the topic is not fully compacted until new data arrives, 
which may not be until a while later. The implementation can be improved by 
rolling the segment purely based on the max.compaction.lag.ms setting. 
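
For reference, a minimal sketch of configuring a topic as described above, 
using the Admin client (illustrative only; the broker address and topic name 
are assumptions):

{code:java}
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class CompactionLagSetup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        try (Admin admin = Admin.create(props)) {
            ConfigResource topic =
                new ConfigResource(ConfigResource.Type.TOPIC, "my-compacted-topic");
            // A low dirty ratio plus a small max.compaction.lag.ms: per KIP-354
            // the active segment should be rolled even without new data arriving.
            Collection<AlterConfigOp> ops = List.of(
                new AlterConfigOp(new ConfigEntry("cleanup.policy", "compact"),
                    AlterConfigOp.OpType.SET),
                new AlterConfigOp(new ConfigEntry("min.cleanable.dirty.ratio", "0.01"),
                    AlterConfigOp.OpType.SET),
                new AlterConfigOp(new ConfigEntry("max.compaction.lag.ms", "60000"),
                    AlterConfigOp.OpType.SET));
            admin.incrementalAlterConfigs(Map.of(topic, ops)).all().get();
        }
    }
}
{code}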



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10705) Avoid World Readable RocksDB

2020-11-23 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-10705:
---
Fix Version/s: (was: 2.6.2)
   2.6.1

> Avoid World Readable RocksDB
> 
>
> Key: KAFKA-10705
> URL: https://issues.apache.org/jira/browse/KAFKA-10705
> Project: Kafka
>  Issue Type: Bug
>Reporter: Walker Carlson
>Assignee: Leah Thomas
>Priority: Minor
>  Labels: streams
> Fix For: 2.6.1, 2.8.0, 2.7.1
>
>
> The state directory could be protected more restrictive by preventing access 
> to state directory for group and others. At least other should have no 
> readable access



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10755) Should consider commit latency when computing next commit timestamp

2020-11-23 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-10755:
---
Fix Version/s: 2.6.1

> Should consider commit latency when computing next commit timestamp
> ---
>
> Key: KAFKA-10755
> URL: https://issues.apache.org/jira/browse/KAFKA-10755
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Critical
> Fix For: 2.6.1, 2.8.0
>
>
> In 2.6, we reworked the main processing/commit loop in `StreamThread` and 
> introduced a regression, by _not_ updating the current time after committing. 
> This implies that we compute the next commit timestamp too low (ie, too 
> early).
> For small commit intervals and high commit latency (like in EOS), this bug 
> may lead to an increased commit frequency and fewer processed records between 
> two commits, and thus to reduced throughput.
> For example, assume that the commit interval is 100ms and the commit latency 
> is 50ms, and we start the commit at timestamp 10000. The commit finishes at 
> 10050, and the next commit should happen at 10150. However, if we don't 
> update the current timestamp, we incorrectly compute the next commit time as 
> 10100, ie, 50ms too early, and we have only 50ms to process data instead of 
> the intended 100ms.
> In the worst case, if the commit latency is larger than the commit interval, 
> it would imply that we commit after processing a single record per task.
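
A minimal sketch of the arithmetic (illustrative only, not the actual 
StreamThread code; {{Time}} is Kafka's clock abstraction in 
org.apache.kafka.common.utils):

{code:java}
import org.apache.kafka.common.utils.Time;

public final class CommitDeadlineSketch {
    // The next commit deadline must be computed from the time *after* the
    // commit completed, not from the stale pre-commit timestamp.
    static long nextCommitDeadline(Time time, long commitIntervalMs, Runnable commit) {
        long before = time.milliseconds();                    // e.g. 10000
        commit.run();                                         // takes e.g. 50 ms
        long buggy = before + commitIntervalMs;               // 10100: 50 ms too early
        long fixed = time.milliseconds() + commitIntervalMs;  // 10150: full interval
        return fixed;
    }
}
{code}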



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma opened a new pull request #9643: MINOR: Upgrade to Scala 2.13.4 and 2.12.13

2020-11-23 Thread GitBox


ijuma opened a new pull request #9643:
URL: https://github.com/apache/kafka/pull/9643


   Scala 2.12.13 includes general bug fixes and performance improvements
   to the collection libraries and the compiler backported from Scala 2.13.x.
   
   Scala 2.13.4 restores default global `ExecutionContext` to 2.12 behavior
   (to fix a perf regression in some use cases) and improves pattern matching
   (especially exhaustiveness checking). Most of the changes are related
   to the latter as I have enabled the newly introduced 
`-Xlint:strict-unsealed-patmat`.
   
   More details on the code changes:
   * Don't swallow exception in 
`ReassignPartitionsCommand.topicDescriptionFutureToState`.
   * `RequestChannel.Response` should be `sealed`.
   * Introduce sealed ClientQuotaManager.BaseUserEntity to avoid false positive
   exhaustiveness warning.
   * Handle a number of cases where pattern matches were not exhaustive:
   either by marking them with @unchecked or by adding a catch-all clause.
   * Workaround scalac bug related to exhaustiveness warnings in ZooKeeperClient
   * Remove warning suppression annotations related to the optimizer that are no
   longer needed in ConsumerGroupCommand and AclAuthorizer.
   * Use `forKeyValue` in `AclAuthorizer.acls` as the scala bug preventing us 
from
   using it seems to be fixed.
   
   Full release notes:
   https://github.com/scala/scala/releases/tag/v2.13.4
   https://github.com/scala/scala/releases/tag/v2.12.12
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley opened a new pull request #9644: KAFKA-10565: Only print console producer prompt with a tty

2020-11-23 Thread GitBox


tombentley opened a new pull request #9644:
URL: https://github.com/apache/kafka/pull/9644


   This fixes https://issues.apache.org/jira/browse/KAFKA-2955 by using 
`System.console` to determine whether we're connected to a TTY before printing 
the `kafka-console-producer.sh` prompt. That's not a perfect solution (since 
stdin might be connected to the tty but not stdout, or vice versa), but it's 
still better than printing many prompts when reading from a pipe.
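
   A minimal sketch of the approach (illustrative only, not the PR's exact code):

   ```java
   import java.io.BufferedReader;
   import java.io.IOException;
   import java.io.InputStreamReader;

   public class PromptIfTty {
       public static void main(String[] args) throws IOException {
           BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
           boolean interactive = System.console() != null; // null when stdin/stdout is redirected
           String line;
           while (true) {
               if (interactive) System.out.print("> ");
               if ((line = reader.readLine()) == null) break; // EOF: end of piped input
               // hand 'line' off to the producer here
           }
       }
   }
   ```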



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on pull request #9644: KAFKA-10565: Only print console producer prompt with a tty

2020-11-23 Thread GitBox


tombentley commented on pull request #9644:
URL: https://github.com/apache/kafka/pull/9644#issuecomment-732247526


   @chia7712 might you be able to review this? It's trivial, but I couldn't 
find a good way to test it except manually.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10614) Group coordinator onElection/onResignation should guard against leader epoch

2020-11-23 Thread Tom Bentley (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17237451#comment-17237451
 ] 

Tom Bentley commented on KAFKA-10614:
-

[~guozhang] [~hachikuji] any thoughts about the PR I opened for this?

> Group coordinator onElection/onResignation should guard against leader epoch
> 
>
> Key: KAFKA-10614
> URL: https://issues.apache.org/jira/browse/KAFKA-10614
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Guozhang Wang
>Assignee: Tom Bentley
>Priority: Major
>
> When there are a sequence of LeaderAndISR or StopReplica requests sent from 
> different controllers causing the group coordinator to elect / resign, we may 
> re-order the events due to race condition. For example:
> 1) First LeaderAndISR request received from old controller to resign as the 
> group coordinator.
> 2) Second LeaderAndISR request received from new controller to elect as the 
> group coordinator.
> 3) Although threads handling the 1/2) requests are synchronized on the 
> replica manager, their callback {{onLeadershipChange}} would trigger 
> {{onElection/onResignation}} which would schedule the loading / unloading on 
> background threads, and are not synchronized.
> 4) As a result, {{onElection}} may be triggered by the thread first, and 
> then {{onResignation}}. The coordinator would then not recognize itself as 
> the coordinator and hence would respond to any coordinator request with 
> {{NOT_COORDINATOR}}.
> Here are two proposals on top of my head:
> 1) Let the scheduled load / unload function keep the passed-in leader 
> epoch, and also materialize the epoch in memory. Then, when executing the 
> unloading, check against the leader epoch.
> 2) This may be a bit simpler: use a single background thread working on a 
> FIFO queue of loading / unloading jobs. Since the callers are actually 
> synchronized on the replica manager and order is preserved, the enqueued 
> loading / unloading jobs would be correctly ordered as well. In that case we 
> would avoid the reordering (see the sketch below). 
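
For proposal 2), a minimal sketch of the idea (illustrative only; 
loadGroups/unloadGroups are hypothetical stand-ins for the coordinator's 
actual load / unload work):

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// A single-threaded executor is a FIFO queue: jobs run one at a time in
// submission order, so a resignation enqueued before an election can never
// execute after it.
class CoordinatorStateChangeQueue {
    private final ExecutorService queue = Executors.newSingleThreadExecutor();

    void onResignation(int offsetsPartition, int leaderEpoch) {
        queue.submit(() -> unloadGroups(offsetsPartition, leaderEpoch));
    }

    void onElection(int offsetsPartition, int leaderEpoch) {
        queue.submit(() -> loadGroups(offsetsPartition, leaderEpoch));
    }

    private void loadGroups(int offsetsPartition, int leaderEpoch) { /* hypothetical */ }

    private void unloadGroups(int offsetsPartition, int leaderEpoch) { /* hypothetical */ }
}
{code}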



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jolshan commented on a change in pull request #9622: KAFKA-10547; add topicId in MetadataResp

2020-11-23 Thread GitBox


jolshan commented on a change in pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#discussion_r528830838



##
File path: clients/src/main/java/org/apache/kafka/common/Cluster.java
##
@@ -327,6 +350,18 @@ public Node controller() {
 return controller;
 }
 
+public Collection<Uuid> topicIds() {
+return topicIds.values();
+}
+
+public Uuid getTopicId(String topic) {
+return topicIds.getOrDefault(topic, Uuid.ZERO_UUID);
+}
+
+public String getTopicName(Uuid topiId) {

Review comment:
   I see...maybe I missed it, but I did not see the method used. What use 
case would you expect for topic names in Cluster?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-9552) Stream should handle OutOfSequence exception thrown from Producer

2020-11-23 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17037683#comment-17037683
 ] 

Matthias J. Sax edited comment on KAFKA-9552 at 11/23/20, 4:59 PM:
---

I am not sure if we need to re-balance – if we had missed a rebalance 
and lost the task, we would get a `ProducerFencedException`. Hence, on this 
error we should still be part of the consumer group.

From my understanding an `OutOfOrderSequenceException` implies data loss, ie, 
we got an ack back, but on the next send the data is not in the log (this could 
happen if unclean leader election is enabled broker side) – otherwise it should 
indicate a severe bug.

While we could abort the current transaction and reinitialize the task (ie, 
refetch the input topic offsets, clean up the state etc), I am wondering if we 
should do this, as it would mask a bug? Instead, it might be better to not catch 
it and fail fast so that we can report this error?

Btw: In `RecordCollectorImpl` in a recent PR we started to catch 
`OutOfOrderSequenceException` and rethrow `TaskMigratedException` for this case 
– however, I am not sure if we should keep this change or roll it back for the 
same reason.

\cc [~guozhang] [~hachikuji] [~vvcephei]
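
For illustration, a minimal sketch of the classification being debated; the class and enum are hypothetical stand-ins, not the actual `RecordCollectorImpl` code:

{code:java}
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

// Hypothetical policy sketch: fencing means the task moved (rejoin), while an
// out-of-order sequence suggests data loss or a severe bug, so fail fast
// rather than masking it with a reinitialization.
public class SendErrorPolicySketch {
    enum Action { CONTINUE, REJOIN_GROUP, FAIL_FAST }

    static Action classify(Exception exception) {
        if (exception == null)
            return Action.CONTINUE;
        if (exception instanceof ProducerFencedException)
            return Action.REJOIN_GROUP;   // we missed a rebalance and lost the task
        if (exception instanceof OutOfOrderSequenceException)
            return Action.FAIL_FAST;      // possible data loss: surface it
        return Action.FAIL_FAST;          // treat other unknown errors as fatal too
    }

    public Callback callback() {
        return (metadata, exception) -> {
            if (classify(exception) == Action.FAIL_FAST)
                throw new KafkaException("possible data loss or bug", exception);
        };
    }
}
{code}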


was (Author: mjsax):
I am not sure if we need to re-balance – if we had missed a rebalance 
and lost the task, we would get a `ProducerFencedException`. Hence, on this 
error we should still be part of the consumer group.

From my understanding an `OutOfOrderSequenceException` implies data loss, ie, 
we got an ack back, but on the next send the data is not in the log (this could 
happen if unclean leader election is enabled broker side) – otherwise it should 
indicate a severe bug.

While we could abort the current transaction and reinitialize the task (ie, 
refetch the input topic offsets, clean up the state etc), I am wondering if we 
should do this, as it would mask a bug? Instead, it might be better to not catch 
it and fail fast so that we can report this error?

Btw: In `RecordCollectorImpl` in a recent PR we started to catch 
`OutOfOrderSequenceException` and rethrow `TaskMigratedException` for this case 
– however, I am not sure if we should keep this change or roll it back for the 
same reason.

\cc [~guozhang] [~hachikuji] [~vvcephei]

> Stream should handle OutOfSequence exception thrown from Producer
> -
>
> Key: KAFKA-9552
> URL: https://issues.apache.org/jira/browse/KAFKA-9552
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Boyang Chen
>Priority: Major
>
> As of today, the stream thread could die from an OutOfSequence error:
> {code:java}
>  [2020-02-12T07:14:35-08:00] 
> (streams-soak-2-5-eos_soak_i-03f89b1e566ac95cc_streamslog) 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.
>  [2020-02-12T07:14:35-08:00] 
> (streams-soak-2-5-eos_soak_i-03f89b1e566ac95cc_streamslog) [2020-02-12 
> 15:14:35,185] ERROR 
> [stream-soak-test-546f8754-5991-4d62-8565-dbe98d51638e-StreamThread-1] 
> stream-thread 
> [stream-soak-test-546f8754-5991-4d62-8565-dbe98d51638e-StreamThread-1] Failed 
> to commit stream task 3_2 due to the following error: 
> (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks)
>  [2020-02-12T07:14:35-08:00] 
> (streams-soak-2-5-eos_soak_i-03f89b1e566ac95cc_streamslog) 
> org.apache.kafka.streams.errors.StreamsException: task [3_2] Abort sending 
> since an error caught with a previous record (timestamp 1581484094825) to 
> topic stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-49-changelog due 
> to org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:154)
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:214)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1353)
> {code}
>  Although this is a fatal exception for the Producer, streams should treat it as 
> an opportunity to reinitialize by doing a rebalance, instead of killing the 
> computation resource.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10761) Implement SnapshotId for the Fetch RPC

2020-11-23 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-10761:
--

 Summary: Implement SnapshotId for the Fetch RPC
 Key: KAFKA-10761
 URL: https://issues.apache.org/jira/browse/KAFKA-10761
 Project: Kafka
  Issue Type: Sub-task
  Components: replication
Reporter: Jose Armando Garcia Sancio
Assignee: Jose Armando Garcia Sancio






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #9629: KAFKA-10754: fix flaky tests by waiting kafka streams be in running state before assert

2020-11-23 Thread GitBox


ableegoldman commented on a change in pull request #9629:
URL: https://github.com/apache/kafka/pull/9629#discussion_r528943389



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
##
@@ -110,83 +110,45 @@ public void teardown() throws IOException {
 }
 
 @Test
-public void shouldShutdownThreadUsingOldHandler() throws Exception {
+public void shouldShutdownThreadUsingOldHandler() throws 
InterruptedException {
 try (final KafkaStreams kafkaStreams = new 
KafkaStreams(builder.build(), properties)) {
-final CountDownLatch latch = new CountDownLatch(1);
 final AtomicBoolean flag = new AtomicBoolean(false);
 kafkaStreams.setUncaughtExceptionHandler((t, e) -> flag.set(true));
 
 
StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
-
 produceMessages(0L, inputTopic, "A");
-waitForApplicationState(Collections.singletonList(kafkaStreams), 
KafkaStreams.State.ERROR, Duration.ofSeconds(15));
 
 TestUtils.waitForCondition(flag::get, "Handler was called");
-assertThat(processorValueCollector.size(), equalTo(2));

Review comment:
   @wcarlson5 for example, this test probably should have multiple threads, 
right?

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -379,13 +379,12 @@ public void setUncaughtExceptionHandler(final 
Thread.UncaughtExceptionHandler un
 }
 
 /**
- * Set the handler invoked when an {@link 
StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread}
+ * Set the handler invoked when an {@link 
StreamsConfig#NUM_STREAM_THREADS_CONFIG} internal thread

Review comment:
   I think this was actually correct as it was (and ditto for the above). 
One alternative suggestion:
   
   ```suggestion
* Set the handler invoked when an internal {@link 
StreamsConfig#NUM_STREAM_THREADS_CONFIG stream thread}
   ```

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
##
@@ -97,7 +97,7 @@ public void setup() {
 mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers()),
 mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
 mkEntry(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath()),
-mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2),
+mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1),

Review comment:
   Hey @wcarlson5, can you take a look at this? If we change the default 
number of threads to 1, will we be reducing test coverage or not testing the 
correct thing anymore?
   
   FWIW I think for tests where the number of threads doesn't matter, we should 
default to 1. But I'm not sure which tests do/do not rely on using multiple 
stream threads





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10643) Static membership - repetitive PreparingRebalance with updating metadata for member reason

2020-11-23 Thread Eran Levy (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17237650#comment-17237650
 ] 

Eran Levy commented on KAFKA-10643:
---

[~cadonna] Thanks for sharing re the RocksDB metrics. 

Re the data, yes, I meant the data stored locally. 

[~ableegoldman] today the rebalancing got stuck again, even with such low 
storage sizes after my last cleanup. 

It always happens after a restart of one of the pods that has been rescheduled, 
and it happens only sometimes - i.e. today we had a few pod reschedules and only 
2 of them got stuck. So it looks like we are hitting some edge case here. 

What I always see is that right after the restart of a pod, a few seconds 
later other pods start to fail, which causes the whole rebalancing 
process to get stuck. 

It looks very related to static membership; it all started 1 month ago 
after we began using it. I'm thinking of giving up 
the great capabilities that static membership provides :(

Which debug logs would help you? The debug logs of the app or of the kafka cluster?

> Static membership - repetitive PreparingRebalance with updating metadata for 
> member reason
> --
>
> Key: KAFKA-10643
> URL: https://issues.apache.org/jira/browse/KAFKA-10643
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Eran Levy
>Priority: Major
> Attachments: broker-4-11.csv, client-4-11.csv, 
> client-d-9-11-11-2020.csv
>
>
> Kafka streams 2.6.0, brokers version 2.6.0. Kafka nodes are healthy, kafka 
> streams app is healthy. 
> Configured with static membership. 
> Every 10 minutes (I assume cause of topic.metadata.refresh.interval.ms), I 
> see the following group coordinator log for different stream consumers: 
> INFO [GroupCoordinator 2]: Preparing to rebalance group **--**-stream in 
> state PreparingRebalance with old generation 12244 (__consumer_offsets-45) 
> (reason: Updating metadata for member 
> -stream-11-1-013edd56-ed93-4370-b07c-1c29fbe72c9a) 
> (kafka.coordinator.group.GroupCoordinator)
> and right after that the following log: 
> INFO [GroupCoordinator 2]: Assignment received from leader for group 
> **-**-stream for generation 12246 (kafka.coordinator.group.GroupCoordinator)
>  
> Looked a bit on the kafka code and Im not sure that I get why such a thing 
> happening - is this line described the situation that happens here re the 
> "reason:"?[https://github.com/apache/kafka/blob/7ca299b8c0f2f3256c40b694078e422350c20d19/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L311]
> I also dont see it happening too often in other kafka streams applications 
> that we have. 
> The only thing suspicious that I see around every hour that different pods of 
> that kafka streams application throw this exception: 
> {"timestamp":"2020-10-25T06:44:20.414Z","level":"INFO","thread":"**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1","logger":"org.apache.kafka.clients.FetchSessionHandler","message":"[Consumer
>  
> clientId=**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1-restore-consumer,
>  groupId=null] Error sending fetch request (sessionId=34683236, epoch=2872) 
> to node 
> 3:","context":"default","exception":"org.apache.kafka.common.errors.DisconnectException:
>  null\n"}
> I came across this strange behaviour after stated to investigate a strange 
> stuck rebalancing state after one of the members left the group and caused 
> the rebalance to stuck - the only thing that I found is that maybe because 
> that too often preparing to rebalance states, the app might affected of this 
> bug - KAFKA-9752 ?
> I dont understand why it happens, it wasn't before I applied static 
> membership to that kafka streams application (since around 2 weeks ago). 
> Will be happy if you can help me
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10755) Should consider commit latency when computing next commit timestamp

2020-11-23 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10755:

Priority: Blocker  (was: Critical)

> Should consider commit latency when computing next commit timestamp
> ---
>
> Key: KAFKA-10755
> URL: https://issues.apache.org/jira/browse/KAFKA-10755
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Blocker
> Fix For: 2.6.1, 2.8.0
>
>
> In 2.6, we reworked the main processing/commit loop in `StreamThread` and 
> introduced a regression by _not_ updating the current time after committing. 
> This implies that we compute the next commit timestamp too low (ie, too 
> early).
> For small commit intervals and high commit latency (like in EOS), this bug 
> may lead to an increased commit frequency and fewer processed records between 
> two commits, and thus to reduced throughput.
> For example, assume that the commit interval is 100ms and the commit latency 
> is 50ms, and we start the commit at timestamp 10000. The commit finishes at 
> 10050, and the next commit should happen at 10150. However, if we don't 
> update the current timestamp, we incorrectly compute the next commit time as 
> 10100, ie, 50ms too early, and we have only 50ms to process data instead of 
> the intended 100ms.
> In the worst case, if the commit latency is larger than the commit interval, 
> it would imply that we commit after processing a single record per task.
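A compact, illustrative sketch of the loop in question (names are stand-ins, not the real `StreamThread` code); the fix is the single line that refreshes the time after the commit returns:

{code:java}
// With the marked line removed, a 50ms commit latency shrinks the 100ms
// processing window to 50ms, exactly as in the example above.
public class CommitLoopSketch {
    static final long COMMIT_INTERVAL_MS = 100L;

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long nextCommitMs = now + COMMIT_INTERVAL_MS;
        for (int i = 0; i < 10; i++) {               // stand-in for the poll/process loop
            process();
            now = System.currentTimeMillis();
            if (now >= nextCommitMs) {
                commit();                            // may take ~50ms under EOS
                now = System.currentTimeMillis();    // the fix: refresh time after committing
                nextCommitMs = now + COMMIT_INTERVAL_MS;
            }
        }
    }

    static void process() { }

    static void commit() {
        try {
            Thread.sleep(50);                        // simulated commit latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
{code}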



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma opened a new pull request #9645: MINOR: Update build and test dependencies

2020-11-23 Thread GitBox


ijuma opened a new pull request #9645:
URL: https://github.com/apache/kafka/pull/9645


   * gradle (6.7.0 -> 6.7.1): minor fixes.
   * gradle versions plugin (0.29.0 -> 0.36.0): minor fixes.
   * grgit (4.0.2 -> 4.1.0): a few small fixes and dependency bumps.
   * owasp dependency checker plugin (5.3.2.1 -> 6.0.3): improved db
   schema, data and several fixes. 
   * scoverage plugin (4.0.2 -> 5.0.0): support Scala 2.13.
   * shadow plugin (6.0.0 -> 6.1.0): require Java 8, support for Java 16.
   * spotbugs plugin (4.4.4 -> 4.6.0): support SARIF reporting standard.
   * spotbugs (4.0.6 -> 4.1.4): support for Java 16 and various fixes including
   try with resources false positive.
   * spotless plugin (5.1.0 -> 5.8.2):
   * test retry plugin (1.1.6 -> 1.1.9): newer gradle and java version 
compatibility
   fixes.
   * mockito (3.5.7 -> 3.6.0):
   * powermock (2.0.7 -> 2.0.9): minor fixes.
   
   Release notes links:
   * https://docs.gradle.org/6.7.1/release-notes.html
   * https://github.com/spotbugs/spotbugs/blob/4.1.4/CHANGELOG.md
   * https://github.com/scoverage/gradle-scoverage/releases/tag/5.0.0
   * https://github.com/johnrengelman/shadow/releases/tag/6.1.0
   * https://github.com/spotbugs/spotbugs-gradle-plugin/releases/tag/4.6.0
   * https://github.com/spotbugs/spotbugs-gradle-plugin/releases/tag/4.5.0
   * https://github.com/ben-manes/gradle-versions-plugin/releases
   * https://github.com/ajoberstar/grgit/releases/tag/4.1.0
   * 
https://github.com/jeremylong/DependencyCheck/blob/main/RELEASE_NOTES.md#version-603-2020-11-03
   * https://github.com/powermock/powermock/releases/tag/powermock-2.0.8
   * https://github.com/powermock/powermock/releases/tag/powermock-2.0.9
   * https://github.com/gradle/test-retry-gradle-plugin/releases
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #9645: MINOR: Update build and test dependencies

2020-11-23 Thread GitBox


ijuma commented on a change in pull request #9645:
URL: https://github.com/apache/kafka/pull/9645#discussion_r528947251



##
File path: 
clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java
##
@@ -70,7 +70,7 @@ public boolean equals(Object o) {
 
 @Override
 public int hashCode() {
-int result = key() != null ? key().hashCode() : 0;
+int result = key().hashCode();

Review comment:
   cc @chia7712 since you changed related code recently.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10755) Should consider commit latency when computing next commit timestamp

2020-11-23 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17237655#comment-17237655
 ] 

Matthias J. Sax commented on KAFKA-10755:
-

After reconsideration, we should get this into the 2.7.0 release, too.

> Should consider commit latency when computing next commit timestamp
> ---
>
> Key: KAFKA-10755
> URL: https://issues.apache.org/jira/browse/KAFKA-10755
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Blocker
> Fix For: 2.6.1, 2.8.0
>
>
> In 2.6, we reworked the main processing/commit loop in `StreamThread` and 
> introduced a regression by _not_ updating the current time after committing. 
> This implies that we compute the next commit timestamp too low (ie, too 
> early).
> For small commit intervals and high commit latency (like in EOS), this bug 
> may lead to an increased commit frequency and fewer processed records between 
> two commits, and thus to reduced throughput.
> For example, assume that the commit interval is 100ms and the commit latency 
> is 50ms, and we start the commit at timestamp 10000. The commit finishes at 
> 10050, and the next commit should happen at 10150. However, if we don't 
> update the current timestamp, we incorrectly compute the next commit time as 
> 10100, ie, 50ms too early, and we have only 50ms to process data instead of 
> the intended 100ms.
> In the worst case, if the commit latency is larger than the commit interval, 
> it would imply that we commit after processing a single record per task.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma commented on a change in pull request #9645: MINOR: Update build and test dependencies

2020-11-23 Thread GitBox


ijuma commented on a change in pull request #9645:
URL: https://github.com/apache/kafka/pull/9645#discussion_r528947706



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/ScramMechanism.java
##
@@ -49,8 +53,11 @@ public static ScramMechanism fromType(byte type) {
  * Salted Challenge Response Authentication Mechanism (SCRAM) SASL and 
GSS-API Mechanisms, Section 4
  */
 public static ScramMechanism fromMechanismName(String mechanismName) {
-ScramMechanism retvalFoundMechanism = 
ScramMechanism.valueOf(mechanismName.replace('-', '_'));
-return retvalFoundMechanism != null ? retvalFoundMechanism : UNKNOWN;
+String normalizedMechanism = mechanismName.replace('-', '_');
+return Arrays.stream(VALUES)
+.filter(mechanism -> mechanism.name().equals(normalizedMechanism))
+.findFirst()
+.orElse(UNKNOWN);

Review comment:
   @rondagostino looks like the original implementation didn't achieve the 
goal. I added a test that failed without this change.
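   
   The failure mode: `Enum.valueOf` never returns null for an unknown name – it throws `IllegalArgumentException` – so the old null-check fallback was unreachable. A sketch of a test along those lines (not the exact test added in the PR):
   
   ```java
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import org.apache.kafka.clients.admin.ScramMechanism;
import org.junit.jupiter.api.Test;

public class ScramMechanismSketchTest {
    @Test
    public void unknownMechanismNameShouldMapToUnknown() {
        // Passes with the stream-based lookup; with the old valueOf-based code
        // this would throw IllegalArgumentException instead of returning UNKNOWN.
        assertEquals(ScramMechanism.UNKNOWN, ScramMechanism.fromMechanismName("not-a-mechanism"));
    }

    @Test
    public void valueOfThrowsForUnknownNames() {
        assertThrows(IllegalArgumentException.class, () -> ScramMechanism.valueOf("NOT_A_MECHANISM"));
    }
}
   ```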





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-10755) Should consider commit latency when computing next commit timestamp

2020-11-23 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17237655#comment-17237655
 ] 

Matthias J. Sax edited comment on KAFKA-10755 at 11/23/20, 7:33 PM:


After reconsideration, I think we should get this into the 2.7.0 release, too. \cc 
[~bbejeck]


was (Author: mjsax):
After reconsideration, we should get this into the 2.7.0 release, too.

> Should consider commit latency when computing next commit timestamp
> ---
>
> Key: KAFKA-10755
> URL: https://issues.apache.org/jira/browse/KAFKA-10755
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Blocker
> Fix For: 2.6.1, 2.8.0
>
>
> In 2.6, we reworked the main processing/commit loop in `StreamThread` and 
> introduced a regression by _not_ updating the current time after committing. 
> This implies that we compute the next commit timestamp too low (ie, too 
> early).
> For small commit intervals and high commit latency (like in EOS), this bug 
> may lead to an increased commit frequency and fewer processed records between 
> two commits, and thus to reduced throughput.
> For example, assume that the commit interval is 100ms and the commit latency 
> is 50ms, and we start the commit at timestamp 10000. The commit finishes at 
> 10050, and the next commit should happen at 10150. However, if we don't 
> update the current timestamp, we incorrectly compute the next commit time as 
> 10100, ie, 50ms too early, and we have only 50ms to process data instead of 
> the intended 100ms.
> In the worst case, if the commit latency is larger than the commit interval, 
> it would imply that we commit after processing a single record per task.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10643) Static membership - repetitive PreparingRebalance with updating metadata for member reason

2020-11-23 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17237660#comment-17237660
 ] 

A. Sophie Blee-Goldman commented on KAFKA-10643:


Sorry yeah I meant debug logs for the Streams app. That should help us narrow 
down whether the app is getting stuck somewhere or what might possibly be 
happening. I mean it's always possible that this is a broker-side bug, since 
the static membership feature spans both client and broker code. But I think it 
makes sense to start investigating the Streams app/logs since in my experience, 
these sorts of issues are typically client-side.

FWIW I'm not personally an expert in static membership – not sure if [~boyang] 
might have any ideas on what could be causing this, if there are any known bugs 
in static membership that cause members to drop out, etc

> Static membership - repetitive PreparingRebalance with updating metadata for 
> member reason
> --
>
> Key: KAFKA-10643
> URL: https://issues.apache.org/jira/browse/KAFKA-10643
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Eran Levy
>Priority: Major
> Attachments: broker-4-11.csv, client-4-11.csv, 
> client-d-9-11-11-2020.csv
>
>
> Kafka streams 2.6.0, brokers version 2.6.0. Kafka nodes are healthy, kafka 
> streams app is healthy. 
> Configured with static membership. 
> Every 10 minutes (I assume cause of topic.metadata.refresh.interval.ms), I 
> see the following group coordinator log for different stream consumers: 
> INFO [GroupCoordinator 2]: Preparing to rebalance group **--**-stream in 
> state PreparingRebalance with old generation 12244 (__consumer_offsets-45) 
> (reason: Updating metadata for member 
> -stream-11-1-013edd56-ed93-4370-b07c-1c29fbe72c9a) 
> (kafka.coordinator.group.GroupCoordinator)
> and right after that the following log: 
> INFO [GroupCoordinator 2]: Assignment received from leader for group 
> **-**-stream for generation 12246 (kafka.coordinator.group.GroupCoordinator)
>  
> Looked a bit on the kafka code and Im not sure that I get why such a thing 
> happening - is this line described the situation that happens here re the 
> "reason:"?[https://github.com/apache/kafka/blob/7ca299b8c0f2f3256c40b694078e422350c20d19/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L311]
> I also dont see it happening too often in other kafka streams applications 
> that we have. 
> The only thing suspicious that I see around every hour that different pods of 
> that kafka streams application throw this exception: 
> {"timestamp":"2020-10-25T06:44:20.414Z","level":"INFO","thread":"**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1","logger":"org.apache.kafka.clients.FetchSessionHandler","message":"[Consumer
>  
> clientId=**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1-restore-consumer,
>  groupId=null] Error sending fetch request (sessionId=34683236, epoch=2872) 
> to node 
> 3:","context":"default","exception":"org.apache.kafka.common.errors.DisconnectException:
>  null\n"}
> I came across this strange behaviour after stated to investigate a strange 
> stuck rebalancing state after one of the members left the group and caused 
> the rebalance to stuck - the only thing that I found is that maybe because 
> that too often preparing to rebalance states, the app might affected of this 
> bug - KAFKA-9752 ?
> I dont understand why it happens, it wasn't before I applied static 
> membership to that kafka streams application (since around 2 weeks ago). 
> Will be happy if you can help me
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10755) Should consider commit latency when computing next commit timestamp

2020-11-23 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10755:

Fix Version/s: (was: 2.8.0)
   2.7.0

> Should consider commit latency when computing next commit timestamp
> ---
>
> Key: KAFKA-10755
> URL: https://issues.apache.org/jira/browse/KAFKA-10755
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Blocker
> Fix For: 2.7.0, 2.6.1
>
>
> In 2.6, we reworked the main processing/commit loop in `StreamThread` and 
> introduced a regression by _not_ updating the current time after committing. 
> This implies that we compute the next commit timestamp too low (ie, too 
> early).
> For small commit intervals and high commit latency (like in EOS), this bug 
> may lead to an increased commit frequency and fewer processed records between 
> two commits, and thus to reduced throughput.
> For example, assume that the commit interval is 100ms and the commit latency 
> is 50ms, and we start the commit at timestamp 10000. The commit finishes at 
> 10050, and the next commit should happen at 10150. However, if we don't 
> update the current timestamp, we incorrectly compute the next commit time as 
> 10100, ie, 50ms too early, and we have only 50ms to process data instead of 
> the intended 100ms.
> In the worst case, if the commit latency is larger than the commit interval, 
> it would imply that we commit after processing a single record per task.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rondagostino commented on a change in pull request #9645: MINOR: Update build and test dependencies

2020-11-23 Thread GitBox


rondagostino commented on a change in pull request #9645:
URL: https://github.com/apache/kafka/pull/9645#discussion_r528986190



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/ScramMechanism.java
##
@@ -49,8 +53,11 @@ public static ScramMechanism fromType(byte type) {
  * Salted Challenge Response Authentication Mechanism (SCRAM) SASL and 
GSS-API Mechanisms, Section 4
  */
 public static ScramMechanism fromMechanismName(String mechanismName) {
-ScramMechanism retvalFoundMechanism = 
ScramMechanism.valueOf(mechanismName.replace('-', '_'));
-return retvalFoundMechanism != null ? retvalFoundMechanism : UNKNOWN;
+String normalizedMechanism = mechanismName.replace('-', '_');
+return Arrays.stream(VALUES)
+.filter(mechanism -> mechanism.name().equals(normalizedMechanism))
+.findFirst()
+.orElse(UNKNOWN);

Review comment:
   Ah, you are right: `valueOf()` `Throws: IllegalArgumentException - if 
the specified enum type has no constant with the specified name, or the 
specified class object does not represent an enum type.`
   
   Thanks for catching/testing for/fixing it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9629: KAFKA-10754: fix flaky tests by waiting kafka streams be in running state before assert

2020-11-23 Thread GitBox


wcarlson5 commented on a change in pull request #9629:
URL: https://github.com/apache/kafka/pull/9629#discussion_r528992986



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
##
@@ -110,83 +110,45 @@ public void teardown() throws IOException {
 }
 
 @Test
-public void shouldShutdownThreadUsingOldHandler() throws Exception {
+public void shouldShutdownThreadUsingOldHandler() throws 
InterruptedException {
 try (final KafkaStreams kafkaStreams = new 
KafkaStreams(builder.build(), properties)) {
-final CountDownLatch latch = new CountDownLatch(1);
 final AtomicBoolean flag = new AtomicBoolean(false);
 kafkaStreams.setUncaughtExceptionHandler((t, e) -> flag.set(true));
 
 
StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
-
 produceMessages(0L, inputTopic, "A");
-waitForApplicationState(Collections.singletonList(kafkaStreams), 
KafkaStreams.State.ERROR, Duration.ofSeconds(15));
 
 TestUtils.waitForCondition(flag::get, "Handler was called");
-assertThat(processorValueCollector.size(), equalTo(2));

Review comment:
   as above

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
##
@@ -97,7 +97,7 @@ public void setup() {
 mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers()),
 mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
 mkEntry(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath()),
-mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2),
+mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1),

Review comment:
   Yes. Both the old handler test and the close-client test should have 2 
threads. We need to ensure that after a rebalance the old handler has attempted 
to process the record twice and the client has shut down only once. We cannot be 
sure of that with only one thread.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
##
@@ -110,83 +110,45 @@ public void teardown() throws IOException {
 }
 
 @Test
-public void shouldShutdownThreadUsingOldHandler() throws Exception {
+public void shouldShutdownThreadUsingOldHandler() throws 
InterruptedException {
 try (final KafkaStreams kafkaStreams = new 
KafkaStreams(builder.build(), properties)) {
-final CountDownLatch latch = new CountDownLatch(1);
 final AtomicBoolean flag = new AtomicBoolean(false);
 kafkaStreams.setUncaughtExceptionHandler((t, e) -> flag.set(true));
 
 
StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
-
 produceMessages(0L, inputTopic, "A");
-waitForApplicationState(Collections.singletonList(kafkaStreams), 
KafkaStreams.State.ERROR, Duration.ofSeconds(15));
 
 TestUtils.waitForCondition(flag::get, "Handler was called");
+waitForApplicationState(Collections.singletonList(kafkaStreams), 
KafkaStreams.State.ERROR, DEFAULT_DURATION);

Review comment:
   The order is not really that important here, either way works





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-11-23 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r529008307



##
File path: 
clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
##
@@ -139,4 +150,129 @@
  * @return Iterator for ACL bindings, which may be populated lazily.
  */
 Iterable<AclBinding> acls(AclBindingFilter filter);
+
+/**
+ * Check if the caller is authorized to perform the given ACL operation on 
at least one
+ * resource of the given type.
+ *
+ * @param requestContext Request context including request resourceType, 
security protocol, and listener name
+ * @param op The ACL operation to check
+ * @param resourceType   The resource type to check
+ * @return   Return {@link AuthorizationResult#ALLOWED} if the 
caller is authorized to perform the
+ *   given ACL operation on at least one resource of 
the given type.
+ *   Return {@link AuthorizationResult#DENIED} 
otherwise.
+ */
+default AuthorizationResult 
authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation 
op, ResourceType resourceType) {
+if (resourceType == ResourceType.ANY) {
+throw new IllegalArgumentException(
+"Must specify a non-filter resource type for 
authorizeByResourceType");
+}
+
+if (resourceType == ResourceType.UNKNOWN) {
+throw new IllegalArgumentException(
+"Unknown resource type");
+}
+
+if (op == AclOperation.ANY) {
+throw new IllegalArgumentException(
+"Must specify a non-filter operation type for 
authorizeByResourceType");
+}
+
+if (op == AclOperation.UNKNOWN) {
+throw new IllegalArgumentException(
+"Unknown operation type");
+}
+
+ResourcePatternFilter resourceTypeFilter = new ResourcePatternFilter(
+resourceType, null, PatternType.ANY);
+AclBindingFilter aclFilter = new AclBindingFilter(
+resourceTypeFilter, AccessControlEntryFilter.ANY);
+
+Set<String> denyLiterals = new HashSet<>();
+Set<String> denyPrefixes = new HashSet<>();
+Set<String> allowLiterals = new HashSet<>();
+Set<String> allowPrefixes = new HashSet<>();
+boolean hasWildCardAllow = false;
+
+for (AclBinding binding : acls(aclFilter)) {
+if 
(!binding.entry().host().equals(requestContext.clientAddress().getHostAddress())
+&& !binding.entry().host().equals("*"))
+continue;
+
+if 
(!binding.entry().principal().equals(requestContext.principal().toString())

Review comment:
   Yeah, that's right. Construct a KafkaPrincipal instance with params 
taken from principal.getType() and getName().
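   
   A small sketch of that comparison, assuming `SecurityUtils.parseKafkaPrincipal` (already used by AclAuthorizer) as the parsing helper:
   
   ```java
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.SecurityUtils;

// Sketch: compare the request principal against an ACL entry's principal
// string by type and name, rather than by raw toString() equality.
public final class PrincipalMatchSketch {
    public static boolean matches(KafkaPrincipal requestPrincipal, String aclPrincipal) {
        KafkaPrincipal parsed = SecurityUtils.parseKafkaPrincipal(aclPrincipal);
        return parsed.getPrincipalType().equals(requestPrincipal.getPrincipalType())
            && parsed.getName().equals(requestPrincipal.getName());
    }
}
   ```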

##
File path: 
clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
##
@@ -139,4 +150,129 @@
  * @return Iterator for ACL bindings, which may be populated lazily.
  */
 Iterable<AclBinding> acls(AclBindingFilter filter);
+
+/**
+ * Check if the caller is authorized to perform the given ACL operation on 
at least one
+ * resource of the given type.
+ *
+ * @param requestContext Request context including request resourceType, 
security protocol, and listener name
+ * @param op The ACL operation to check
+ * @param resourceType   The resource type to check
+ * @return   Return {@link AuthorizationResult#ALLOWED} if the 
caller is authorized to perform the
+ *   given ACL operation on at least one resource of 
the given type.
+ *   Return {@link AuthorizationResult#DENIED} 
otherwise.
+ */
+default AuthorizationResult 
authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation 
op, ResourceType resourceType) {
+if (resourceType == ResourceType.ANY) {
+throw new IllegalArgumentException(
+"Must specify a non-filter resource type for 
authorizeByResourceType");
+}
+
+if (resourceType == ResourceType.UNKNOWN) {
+throw new IllegalArgumentException(
+"Unknown resource type");
+}
+
+if (op == AclOperation.ANY) {
+throw new IllegalArgumentException(
+"Must specify a non-filter operation type for 
authorizeByResourceType");
+}
+
+if (op == AclOperation.UNKNOWN) {
+throw new IllegalArgumentException(
+"Unknown operation type");
+}
+
+ResourcePatternFilter resourceTypeFilter = new ResourcePatternFilter(
+resourceType, null, PatternType.ANY);
+AclBindingFilter aclFilter = new AclBindingFilter(
+resourceTypeFilter, AccessControlEntryFilter.ANY);
+
+Set<String> denyLiterals = new HashSet<>();
+Set<String> denyPrefixes = new 

[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-11-23 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r529008818



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -307,6 +312,111 @@ class AclAuthorizer extends Authorizer with Logging {
 if (zkClient != null) zkClient.close()
   }
 
+  // TODO: 1. Discuss how to log audit message
+  // TODO: 2. Discuss if we need a trie to optimize(mainly for the O(n^2) loop 
but I think
+  //  in most of the cases it would be O(1) because denyDominatePrefixAllow 
should be rare
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+   op: AclOperation,
+   resourceType: ResourceType): 
AuthorizationResult = {
+if (resourceType eq ResourceType.ANY)
+  throw new IllegalArgumentException("Must specify a non-filter resource 
type for authorizeByResourceType")
+
+if (resourceType eq ResourceType.UNKNOWN)
+  throw new IllegalArgumentException("Unknown resource type")
+
+if (op eq AclOperation.ANY)
+  throw new IllegalArgumentException("Must specify a non-filter operation 
type for authorizeByResourceType")
+
+if (op eq AclOperation.UNKNOWN)
+  throw new IllegalArgumentException("Unknown operation type")
+
+val allowPatterns = matchingPatterns(
+  requestContext.principal().toString,

Review comment:
   Yeah, that's right. Construct a KafkaPrincipal instance with params 
taken from principal.getType() and getName().
   
   commit 89df4d7600cad4e3785d0d95624d0918efce1f44





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-11-23 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r529031043



##
File path: 
clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
##
@@ -139,4 +150,129 @@
  * @return Iterator for ACL bindings, which may be populated lazily.
  */
 Iterable<AclBinding> acls(AclBindingFilter filter);
+
+/**
+ * Check if the caller is authorized to perform the given ACL operation on 
at least one
+ * resource of the given type.
+ *
+ * @param requestContext Request context including request resourceType, 
security protocol, and listener name
+ * @param op The ACL operation to check
+ * @param resourceType   The resource type to check
+ * @return   Return {@link AuthorizationResult#ALLOWED} if the 
caller is authorized to perform the
+ *   given ACL operation on at least one resource of 
the given type.
+ *   Return {@link AuthorizationResult#DENIED} 
otherwise.
+ */
+default AuthorizationResult 
authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation 
op, ResourceType resourceType) {
+if (resourceType == ResourceType.ANY) {
+throw new IllegalArgumentException(
+"Must specify a non-filter resource type for 
authorizeByResourceType");
+}
+
+if (resourceType == ResourceType.UNKNOWN) {
+throw new IllegalArgumentException(
+"Unknown resource type");
+}
+
+if (op == AclOperation.ANY) {
+throw new IllegalArgumentException(
+"Must specify a non-filter operation type for 
authorizeByResourceType");
+}
+
+if (op == AclOperation.UNKNOWN) {
+throw new IllegalArgumentException(
+"Unknown operation type");
+}
+
+ResourcePatternFilter resourceTypeFilter = new ResourcePatternFilter(
+resourceType, null, PatternType.ANY);
+AclBindingFilter aclFilter = new AclBindingFilter(
+resourceTypeFilter, AccessControlEntryFilter.ANY);
+
+Set<String> denyLiterals = new HashSet<>();
+Set<String> denyPrefixes = new HashSet<>();
+Set<String> allowLiterals = new HashSet<>();
+Set<String> allowPrefixes = new HashSet<>();
+boolean hasWildCardAllow = false;
+
+for (AclBinding binding : acls(aclFilter)) {
+if 
(!binding.entry().host().equals(requestContext.clientAddress().getHostAddress())
+&& !binding.entry().host().equals("*"))
+continue;
+
+if 
(!binding.entry().principal().equals(requestContext.principal().toString())
+&& !binding.entry().principal().equals("User:*"))
+continue;
+
+if (binding.entry().operation() != op
+&& binding.entry().operation() != AclOperation.ALL)
+continue;
+
+if (binding.entry().permissionType() == AclPermissionType.DENY) {
+switch (binding.pattern().patternType()) {
+case LITERAL:
+if 
(binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE))
+return AuthorizationResult.DENIED;
+denyLiterals.add(binding.pattern().name());
+break;
+case PREFIXED:
+denyPrefixes.add(binding.pattern().name());
+break;
+}
+continue;
+}
+
+if (binding.entry().permissionType() != AclPermissionType.ALLOW)
+continue;
+
+switch (binding.pattern().patternType()) {
+case LITERAL:
+if 
(binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE)) {
+hasWildCardAllow = true;
+continue;
+}
+allowLiterals.add(binding.pattern().name());
+break;
+case PREFIXED:
+allowPrefixes.add(binding.pattern().name());
+break;
+}
+}
+
+if (hasWildCardAllow) {
+return AuthorizationResult.ALLOWED;
+}
+
+for (String allowPrefix : allowPrefixes) {
+StringBuilder sb = new StringBuilder();
+boolean hasDominatedDeny = false;
+for (char ch : allowPrefix.toCharArray()) {
+sb.append(ch);
+if (denyPrefixes.contains(sb.toString())) {
+hasDominatedDeny = true;
+break;
+}
+}
+if (!hasDominatedDeny)
+return AuthorizationResult.ALLOWED;
+}
+
+for (String allowLiteral : allowLiterals) {
+if (denyLiterals.contains(allowLiteral))
+ 

[GitHub] [kafka] ijuma commented on a change in pull request #9643: MINOR: Upgrade to Scala 2.13.4 and 2.12.13

2020-11-23 Thread GitBox


ijuma commented on a change in pull request #9643:
URL: https://github.com/apache/kafka/pull/9643#discussion_r529043449



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -2418,13 +2418,11 @@ class Log(@volatile private var _dir: File,
   info(s"Replacing overflowed segment $segment with split segments 
$newSegments")
   replaceSegments(newSegments.toList, List(segment))
   newSegments.toList
-} catch {
-  case e: Exception =>
-newSegments.foreach { splitSegment =>
-  splitSegment.close()
-  splitSegment.deleteIfExists()
-}
-throw e
+} finally {
+  newSegments.foreach { splitSegment =>
+splitSegment.close()
+splitSegment.deleteIfExists()
+  }

Review comment:
   @dhruvilshah3 Is there a reason why we had a try/catch with a rethrow 
instead of using `finally`? spotBugs complained about it and hence the question.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma opened a new pull request #9646: MINOR: Update snappy-java to 1.1.8.1

2020-11-23 Thread GitBox


ijuma opened a new pull request #9646:
URL: https://github.com/apache/kafka/pull/9646


   It includes small performance improvements:
   * https://github.com/google/snappy/releases/tag/1.1.8
   * https://github.com/xerial/snappy-java/blob/1.1.8.1/Milestone.md
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10547) Add topic IDs to MetadataResponse

2020-11-23 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan updated KAFKA-10547:
---
Description: Will be able to use TopicDescription to identify the topic ID  
(was: Prevent reads from deleted topics

Will be able to use TopicDescription to identify the topic ID)

> Add topic IDs to MetadataResponse
> -
>
> Key: KAFKA-10547
> URL: https://issues.apache.org/jira/browse/KAFKA-10547
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Assignee: dengziming
>Priority: Major
>
> Will be able to use TopicDescription to identify the topic ID



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10550) Update AdminClient and kafka-topics.sh to support topic IDs

2020-11-23 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17237748#comment-17237748
 ] 

Justine Olshan commented on KAFKA-10550:


[~dengziming] are you also planning on covering the option to describe using 
only the topic ID parameter? I can work on that, but I wanted to check so as 
not to duplicate work.

> Update AdminClient and kafka-topics.sh to support topic IDs
> ---
>
> Key: KAFKA-10550
> URL: https://issues.apache.org/jira/browse/KAFKA-10550
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Assignee: dengziming
>Priority: Major
>
> Change some AdminClient methods to expose and support topic IDs (describe, 
> delete, return the id on create)
>  
>  Make changes to kafka-topics.sh --describe so a user can specify a topic 
> name to describe with the --topic parameter, or alternatively the user can 
> supply a topic ID with the --topic_id parameter



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10550) Update AdminClient and kafka-topics.sh to support topic IDs

2020-11-23 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17237748#comment-17237748
 ] 

Justine Olshan edited comment on KAFKA-10550 at 11/23/20, 11:42 PM:


[~dengziming] are you also planning on covering the option to describe using 
only the topic ID parameter? I can work on that, but I wanted to check so as 
not to duplicate work.

 

Are you also working on the delete topics and create topics commands?


was (Author: jolshan):
[~dengziming] are you also planning on covering the option to describe using 
only the topic ID parameter? I can work on that, but I wanted to check so as 
not to duplicate work.

> Update AdminClient and kafka-topics.sh to support topic IDs
> ---
>
> Key: KAFKA-10550
> URL: https://issues.apache.org/jira/browse/KAFKA-10550
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Assignee: dengziming
>Priority: Major
>
> Change some AdminClient methods to expose and support topic IDs (describe, 
> delete, return the id on create)
>  
>  Make changes to kafka-topics.sh --describe so a user can specify a topic 
> name to describe with the --topic parameter, or alternatively the user can 
> supply a topic ID with the --topic_id parameter



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-10758) Kafka Streams consuming from a pattern goes to PENDING_SHUTDOWN when adding a new topic

2020-11-23 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reassigned KAFKA-10758:
--

Assignee: A. Sophie Blee-Goldman

> Kafka Streams consuming from a pattern goes to PENDING_SHUTDOWN when adding a 
> new topic
> ---
>
> Key: KAFKA-10758
> URL: https://issues.apache.org/jira/browse/KAFKA-10758
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Davide Icardi
>Assignee: A. Sophie Blee-Goldman
>Priority: Major
>
> I have a simple Kafka Stream app that consumes from multiple input topics 
> using the _stream_ function that accepts a Pattern 
> ([link|https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/StreamsBuilder.html#stream-java.util.regex.Pattern-]).
>  
> Whenever I add a new topic that matches the pattern, the kafka stream state 
> goes to REBALANCING -> ERROR -> PENDING_SHUTDOWN.
> If I restart the app, it correctly starts reading again without problems.
> Is this by design? Should I handle this and simply restart the app?
>  
> Kafka Stream version is 2.6.0.
> The error is the following:
> {code:java}
> ERROR o.a.k.s.p.i.ProcessorTopology - Set of source nodes do not match:
> sourceNodesByName = [KSTREAM-SOURCE-03, KSTREAM-SOURCE-02]
> sourceTopicsByName = [KSTREAM-SOURCE-00, KSTREAM-SOURCE-14, 
> KSTREAM-SOURCE-03, KSTREAM-SOURCE-02]
> org.apache.kafka.common.KafkaException: User rebalance callback throws an 
> error
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:436)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:766)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:624)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
>  Caused by: java.lang.IllegalStateException: Tried to update source topics 
> but source nodes did not match
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorTopology.updateSourceTopics(ProcessorTopology.java:151)
>   at 
> org.apache.kafka.streams.processor.internals.AbstractTask.update(AbstractTask.java:109)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.update(StreamTask.java:514)
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.updateInputPartitionsAndResume(TaskManager.java:397)
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:261)
>   at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1428)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:279)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:421)
>   ... 10 common frames omitted
>  KafkaStream state is ERROR
>  17:28:53.200 [datalake-StreamThread-1] ERROR 
> o.apache.kafka.streams.KafkaStreams - stream-client [datalake] All stream 
> threads have died. The instance will be in error state and should be closed.
>  > User rebalance callback throws an error
>  KafkaStream state is PENDING_SHUTDOWN
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] junrao commented on a change in pull request #9631: KAFKA-9672: Leader with ISR as a superset of replicas

2020-11-23 Thread GitBox


junrao commented on a change in pull request #9631:
URL: https://github.com/apache/kafka/pull/9631#discussion_r529070546



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -947,9 +947,10 @@ class Partition(val topicPartition: TopicPartition,
   leaderEndOffset: Long,
   currentTimeMs: Long,
   maxLagMs: Long): Boolean = {
-val followerReplica = getReplicaOrException(replicaId)
-followerReplica.logEndOffset != leaderEndOffset &&
-  (currentTimeMs - followerReplica.lastCaughtUpTimeMs) > maxLagMs
+getReplica(replicaId).fold(true) { followerReplica =>

Review comment:
   For the reassignment case, once the controller shrinks the assigned 
replica set, the next step is for the controller to remove the removed replica 
from the ISR and bump up the leader epoch. The shrunk ISR will then be propagated 
to the leader. With `fold(true)`, we allow the leader to shrink the ISR immediately. 
This might be ok, but it is unnecessary work since the controller will be doing 
that soon.
   
   Another option is to use `fold(false)`. This way, the leader won't remove 
the removed replica from the ISR; only the controller will.
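   
   In Java terms, Scala's `opt.fold(default)(f)` behaves like `opt.map(f).orElse(default)`, so `fold(true)` vs `fold(false)` is just the value returned when the leader no longer tracks the replica. A simplified sketch (the lag check here stands in for the real one):
   
   ```java
import java.util.Optional;

public final class FoldSemanticsSketch {
    // The Optional is empty when the leader no longer tracks the replica;
    // defaultWhenAbsent plays the role of fold(true) vs fold(false) above.
    static boolean isFollowerOutOfSync(Optional<Long> followerEndOffset,
                                       long leaderEndOffset,
                                       boolean defaultWhenAbsent) {
        return followerEndOffset
            .map(endOffset -> endOffset != leaderEndOffset)  // simplified lag check
            .orElse(defaultWhenAbsent);
    }
}
   ```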
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10733) Enforce exception thrown for KafkaProducer txn APIs

2020-11-23 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-10733:

Labels: need-kip  (was: )

> Enforce exception thrown for KafkaProducer txn APIs
> ---
>
> Key: KAFKA-10733
> URL: https://issues.apache.org/jira/browse/KAFKA-10733
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Priority: Major
>  Labels: need-kip
>
> In general, KafkaProducer could throw both fatal and non-fatal errors as 
> KafkaException, which makes exception catching hard. Furthermore, not 
> every single fatal (checked) exception is marked on the function signature 
> yet as of 2.7.
> We should have a general long-term strategy for this matter, such as 
> whether to declare all non-fatal exceptions as wrapped KafkaException while 
> extracting all fatal ones, or to just add a flag to KafkaException indicating 
> fatal or not, to maintain binary compatibility.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10733) Enforce exception thrown for KafkaProducer txn APIs

2020-11-23 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17237764#comment-17237764
 ] 

Boyang Chen commented on KAFKA-10733:
-

Several agreements were made in an offline sync (sketched below):

1. We should catch certain fatal exceptions and throw task migrated, such as 
ProducerFenced
2. We should have a new producer exception type to wrap all non-fatal exceptions 
to let users catch them and rethrow as task corrupted
3. We should still crash the stream thread for certain fatal exceptions, such 
as AuthorizationException
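
A rough Java sketch of that three-way mapping; the enum and class names are hypothetical stand-ins (the real Streams-side types would be along the lines of TaskMigratedException / TaskCorruptedException):

{code:java}
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.ProducerFencedException;

public class TxnErrorMappingSketch {
    enum Handling { TASK_MIGRATED, TASK_CORRUPTED, CRASH_THREAD }

    static Handling classify(KafkaException e) {
        if (e instanceof ProducerFencedException)
            return Handling.TASK_MIGRATED;   // (1) rejoin the group
        if (e instanceof AuthorizationException)
            return Handling.CRASH_THREAD;    // (3) fatal, surface to the handler
        return Handling.TASK_CORRUPTED;      // (2) wrapped non-fatal errors
    }
}
{code}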

> Enforce exception thrown for KafkaProducer txn APIs
> ---
>
> Key: KAFKA-10733
> URL: https://issues.apache.org/jira/browse/KAFKA-10733
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Priority: Major
>  Labels: need-kip
>
> In general, KafkaProducer could throw both fatal and non-fatal errors as 
> KafkaException, which makes exception catching hard. Furthermore, not 
> every single fatal (checked) exception is marked on the function signature 
> yet as of 2.7.
> We should have a general long-term strategy for this matter, such as 
> whether to declare all non-fatal exceptions as wrapped KafkaException while 
> extracting all fatal ones, or to just add a flag to KafkaException indicating 
> fatal or not, to maintain binary compatibility.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] wcarlson5 opened a new pull request #9647: KAFKA-10500: Remove thread

2020-11-23 Thread GitBox


wcarlson5 opened a new pull request #9647:
URL: https://github.com/apache/kafka/pull/9647


   Add the ability to remove running threads
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10733) Enforce exception thrown for KafkaProducer txn APIs

2020-11-23 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-10733:

Affects Version/s: 2.7.0
   2.5.0
   2.6.0

> Enforce exception thrown for KafkaProducer txn APIs
> ---
>
> Key: KAFKA-10733
> URL: https://issues.apache.org/jira/browse/KAFKA-10733
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.5.0, 2.6.0, 2.7.0
>Reporter: Boyang Chen
>Priority: Major
>  Labels: need-kip
>
> In general, KafkaProducer could throw both fatal and non-fatal errors as 
> KafkaException, which makes exception handling hard. Furthermore, as of 2.7, 
> not every fatal (checked) exception is declared on the function signatures.
> We need a long-term strategy for this: either declare all non-fatal 
> exceptions as wrapped KafkaException while extracting all the fatal ones, or 
> add a flag to KafkaException indicating whether it is fatal, to maintain 
> binary compatibility.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10733) Enforce exception thrown for KafkaProducer txn APIs

2020-11-23 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17237765#comment-17237765
 ] 

Boyang Chen commented on KAFKA-10733:
-

Also, we should understand why we have seen a trend of more InvalidPid errors 
since 2.5.

> Enforce exception thrown for KafkaProducer txn APIs
> ---
>
> Key: KAFKA-10733
> URL: https://issues.apache.org/jira/browse/KAFKA-10733
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Priority: Major
>  Labels: need-kip
>
> In general, KafkaProducer could throw both fatal and non-fatal errors as 
> KafkaException, which makes exception handling hard. Furthermore, as of 2.7, 
> not every fatal (checked) exception is declared on the function signatures.
> We need a long-term strategy for this: either declare all non-fatal 
> exceptions as wrapped KafkaException while extracting all the fatal ones, or 
> add a flag to KafkaException indicating whether it is fatal, to maintain 
> binary compatibility.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10733) Enforce exception thrown for KafkaProducer txn APIs

2020-11-23 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-10733:

Component/s: streams
 producer 

> Enforce exception thrown for KafkaProducer txn APIs
> ---
>
> Key: KAFKA-10733
> URL: https://issues.apache.org/jira/browse/KAFKA-10733
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer , streams
>Affects Versions: 2.5.0, 2.6.0, 2.7.0
>Reporter: Boyang Chen
>Priority: Major
>  Labels: need-kip
>
> In general, KafkaProducer could throw both fatal and non-fatal errors as 
> KafkaException, which makes exception handling hard. Furthermore, as of 2.7, 
> not every fatal (checked) exception is declared on the function signatures.
> We need a long-term strategy for this: either declare all non-fatal 
> exceptions as wrapped KafkaException while extracting all the fatal ones, or 
> add a flag to KafkaException indicating whether it is fatal, to maintain 
> binary compatibility.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] wcarlson5 commented on a change in pull request #9647: KAFKA-10500: Remove thread

2020-11-23 Thread GitBox


wcarlson5 commented on a change in pull request #9647:
URL: https://github.com/apache/kafka/pull/9647#discussion_r529087297



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -894,19 +885,126 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 queryableStoreProvider = new QueryableStoreProvider(storeProviders, 
globalStateStoreProvider);
 
 stateDirCleaner = setupStateDirCleaner();
-oldHandler = false;
 maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
 rocksDBMetricsRecordingService = 
maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+private StreamThread createStreamThread(final long cacheSizePerThread, 
final int threadIdx) {
+final StreamThread streamThread = StreamThread.create(
+internalTopologyBuilder,
+config,
+clientSupplier,
+adminClient,
+processId,
+clientId,
+streamsMetrics,
+time,
+streamsMetadataState,
+cacheSizePerThread,
+stateDirectory,
+delegatingStateRestoreListener,
+threadIdx,
+KafkaStreams.this::closeToError,
+streamsUncaughtExceptionHandler
+);
+threads.add(streamThread);
+threadState.put(streamThread.getId(), streamThread.state());
+storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+return streamThread;
+}
+
+/**
+ * Adds and starts a stream thread in addition to the stream threads that 
are already running in this
+ * Kafka Streams client.
+ *
+ * Since the number of stream threads increases, the sizes of the caches 
in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the 
cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@code cache.max.bytes.buffering}.
+ *
+ * Stream threads can only be added if this Kafka Streams client is in 
state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread 
could not be added
+ */
+public Optional<String> addStreamThread() {
+if (isRunningOrRebalancing()) {
+final int threadIdx = getNextThreadIndex();
+final long cacheSizePerThread = 
getCacheSizePerThread(threads.size() + 1);
+resizeThreadCache(cacheSizePerThread);
+final StreamThread streamThread = 
createStreamThread(cacheSizePerThread, threadIdx);
+streamThread.setStateListener(streamStateListener);
+synchronized (stateLock) {
+if (isRunningOrRebalancing()) {
+streamThread.start();
+} else {
+return Optional.empty();
+}
+}
+return Optional.of(streamThread.getName());
+} else {
+return Optional.empty();
+}
+}
+
+/**
+ * Removes one stream thread out of the running stream threads from this 
Kafka Streams client.
+ *
+ * The removed stream thread is gracefully shut down. This method does not 
specify which stream
+ * thread is shut down.
+ *
+ * Since the number of stream threads decreases, the sizes of the caches 
in the remaining stream
+ * threads are adapted so that the sum of the cache sizes over all stream 
threads equals the total
+ * cache size specified in configuration {@code cache.max.bytes.buffering}.
+ *
+ * @return name of the removed stream thread or empty if a stream thread 
could not be removed because
+ * no stream threads are alive
+ */
+public Optional<String> removeStreamThread() {
+if (isRunningOrRebalancing()) {
+for (final StreamThread streamThread : threads) {
+if (streamThread.isAlive()) {
+streamThread.shutdown();
+while (streamThread.isAlive() && 
streamThread.getName().equals(Thread.currentThread().getName())) {

Review comment:
   In the Kip we promise to block until the thread is finished being 
shutdown, I don't think this is best way to do this. However I am not sure we 
will really want to at all. There is also a problem if the calling thread is 
the thread to be shutdown.
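
   As a minimal sketch (placeholder names and types, not the PR code) of one 
way to block on shutdown without deadlocking when the caller is itself a 
candidate: never select the calling thread, and bound the wait with a join 
timeout instead of a spin loop.
   
   ```java
   import java.util.List;
   
   final class RemoveThreadSketch {
       static boolean removeOne(final List<Thread> threads) throws InterruptedException {
           for (final Thread t : threads) {
               // Skipping the calling thread avoids waiting on ourselves forever.
               if (t.isAlive() && t != Thread.currentThread()) {
                   t.interrupt();    // stands in for StreamThread#shutdown()
                   t.join(10_000L);  // bounded wait instead of an unbounded spin
                   return !t.isAlive();
               }
           }
           return false;             // only the calling thread (or none) was alive
       }
   }
   ```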


[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-11-23 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r529089666



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -307,6 +312,111 @@ class AclAuthorizer extends Authorizer with Logging {
 if (zkClient != null) zkClient.close()
   }
 
+  // TODO: 1. Discuss how to log audit message
+  // TODO: 2. Discuss if we need a trie to optimize (mainly for the O(n^2) 
loop, but I think
+  //  in most of the cases it would be O(1) because denyDominatePrefixAllow 
should be rare)
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+   op: AclOperation,
+   resourceType: ResourceType): 
AuthorizationResult = {
+if (resourceType eq ResourceType.ANY)
+  throw new IllegalArgumentException("Must specify a non-filter resource 
type for authorizeByResourceType")
+
+if (resourceType eq ResourceType.UNKNOWN)
+  throw new IllegalArgumentException("Unknown resource type")
+
+if (op eq AclOperation.ANY)
+  throw new IllegalArgumentException("Must specify a non-filter operation 
type for authorizeByResourceType")
+
+if (op eq AclOperation.UNKNOWN)
+  throw new IllegalArgumentException("Unknown operation type")
+
+val allowPatterns = matchingPatterns(
+  requestContext.principal().toString,
+  requestContext.clientAddress().getHostAddress,
+  op,
+  resourceType,
+  AclPermissionType.ALLOW
+)
+
+val denyPatterns = matchingPatterns(
+  requestContext.principal().toString,
+  requestContext.clientAddress().getHostAddress,
+  op,
+  resourceType,
+  AclPermissionType.DENY

Review comment:
   Good point. Deferred the collection generation until we need it. 
   
   commit 3906f978e62255ff266f081bf646a4b3c6b896ad 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-11-23 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r529089823



##
File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala
##
@@ -175,4 +179,69 @@ class AuthorizerWrapper(private[kafka] val baseAuthorizer: 
kafka.security.auth.A
   override def close(): Unit = {
 baseAuthorizer.close()
   }
+
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+   op: AclOperation,
+   resourceType: ResourceType): 
AuthorizationResult = {
+if (resourceType == ResourceType.ANY)
+  throw new IllegalArgumentException("Must specify a non-filter resource 
type for authorizeByResourceType")
+
+if (resourceType == ResourceType.UNKNOWN)
+  throw new IllegalArgumentException("Unknown resource type")
+
+if (op == AclOperation.ANY)
+  throw new IllegalArgumentException("Must specify a non-filter operation 
type for authorizeByResourceType")
+
+if (op == AclOperation.UNKNOWN)
+  throw new IllegalArgumentException("Unknown operation type")
+
+if (shouldAllowEveryoneIfNoAclIsFound && !denyAllResource(requestContext, 
op, resourceType)) {
+  AuthorizationResult.ALLOWED
+} else {
+  super.authorizeByResourceType(requestContext, op, resourceType)
+}
+  }
+
+  def denyAllResource(requestContext: AuthorizableRequestContext,

Review comment:
   Good catch. 
   
   commit 3906f978e62255ff266f081bf646a4b3c6b896ad 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-11-23 Thread GitBox


d8tltanc commented on a change in pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#discussion_r529090156



##
File path: core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala
##
@@ -175,4 +179,69 @@ class AuthorizerWrapper(private[kafka] val baseAuthorizer: 
kafka.security.auth.A
   override def close(): Unit = {
 baseAuthorizer.close()
   }
+
+  override def authorizeByResourceType(requestContext: 
AuthorizableRequestContext,
+   op: AclOperation,
+   resourceType: ResourceType): 
AuthorizationResult = {
+if (resourceType == ResourceType.ANY)
+  throw new IllegalArgumentException("Must specify a non-filter resource 
type for authorizeByResourceType")
+
+if (resourceType == ResourceType.UNKNOWN)
+  throw new IllegalArgumentException("Unknown resource type")
+
+if (op == AclOperation.ANY)
+  throw new IllegalArgumentException("Must specify a non-filter operation 
type for authorizeByResourceType")
+
+if (op == AclOperation.UNKNOWN)
+  throw new IllegalArgumentException("Unknown operation type")
+
+if (shouldAllowEveryoneIfNoAclIsFound && !denyAllResource(requestContext, 
op, resourceType)) {
+  AuthorizationResult.ALLOWED
+} else {
+  super.authorizeByResourceType(requestContext, op, resourceType)
+}
+  }
+
+  def denyAllResource(requestContext: AuthorizableRequestContext,
+  op: AclOperation,
+  resourceType: ResourceType): Boolean = {
+val resourceTypeFilter = new ResourcePatternFilter(
+  resourceType, null, PatternType.ANY)
+val accessControlEntry = new AccessControlEntryFilter(
+  null, null, null, AclPermissionType.DENY)
+val aclFilter = new AclBindingFilter(resourceTypeFilter, 
accessControlEntry)
+
+for (binding <- acls(aclFilter).asScala) {
+  if (aceMatched(requestContext, op, binding) && 
canDenyAll(binding.pattern()))
+return true
+}
+false
+  }
+
+  def aceMatched(requestContext: AuthorizableRequestContext,
+ op: AclOperation,
+ binding: AclBinding): Boolean = {
+(hostMatched(requestContext, binding) && principleMatched(requestContext, 
binding)

Review comment:
   Yeah. But I'd guess that the compiler will optimize for us. 
   
   commit 3906f978e62255ff266f081bf646a4b3c6b896ad 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman opened a new pull request #9648: KAFKA-10758: ProcessorTopology should only consider its own nodes when updating regex source topics

2020-11-23 Thread GitBox


ableegoldman opened a new pull request #9648:
URL: https://github.com/apache/kafka/pull/9648


   The problem is basically just that we compare two incompatible sets in 
ProcessorTopology#updateSourceTopics: the local `sourceNodesByName` map only 
contains nodes that correspond to a particular subtopology whereas the 
passed-in `nodeToSourceTopics` ultimately comes from the 
InternalTopologyBuilder's map, which contains nodes for the entire topology. So 
we would end up hitting the IllegalStateException thrown in 
`#updateSourceTopics` any time we tried to update an application with more 
than one subtopology.
   
   The fix is simple: we just need to ignore any source nodes that aren't part 
of the ProcessorTopology's subtopology (a minimal sketch follows below).
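
   A minimal sketch of the idea with hypothetical shapes (the real change is 
in the diff below; `ownSourceNodes` and the map types here are illustrative):
   
   ```java
   import java.util.List;
   import java.util.Map;
   import java.util.Set;
   
   final class UpdateSourceTopicsSketch {
       // Keep only entries whose node belongs to this subtopology, instead of
       // asserting that the two key sets match exactly.
       static void update(final Set<String> ownSourceNodes,
                          final Map<String, List<String>> allNodeToSourceTopics,
                          final Map<String, String> sourceNodesByTopic) {
           sourceNodesByTopic.clear();
           for (final Map.Entry<String, List<String>> e : allNodeToSourceTopics.entrySet()) {
               if (!ownSourceNodes.contains(e.getKey())) {
                   continue; // node belongs to another subtopology: ignore it
               }
               for (final String topic : e.getValue()) {
                   sourceNodesByTopic.put(topic, e.getKey());
               }
           }
       }
   }
   ```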



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9648: KAFKA-10758: ProcessorTopology should only consider its own nodes when updating regex source topics

2020-11-23 Thread GitBox


ableegoldman commented on a change in pull request #9648:
URL: https://github.com/apache/kafka/pull/9648#discussion_r529101499



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##
@@ -154,9 +154,9 @@ final void transitionTo(final Task.State newState) {
 }
 
 @Override
-public void update(final Set<TopicPartition> topicPartitions, final 
Map<String, List<String>> nodeToSourceTopics) {

Review comment:
   Since the root cause of this bug was basically just confusion over what 
exactly this set contains, a renaming feels in order





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9648: KAFKA-10758: ProcessorTopology should only consider its own nodes when updating regex source topics

2020-11-23 Thread GitBox


ableegoldman commented on a change in pull request #9648:
URL: https://github.com/apache/kafka/pull/9648#discussion_r529104011



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
##
@@ -149,24 +149,30 @@ public boolean hasPersistentGlobalStore() {
 return false;
 }
 
-public void updateSourceTopics(final Map<String, List<String>> 
sourceTopicsByName) {
-if (!sourceTopicsByName.keySet().equals(sourceNodesByName.keySet())) {
-log.error("Set of source nodes do not match: \n" +
-"sourceNodesByName = {}\n" +
-"sourceTopicsByName = {}",
-sourceNodesByName.keySet(), sourceTopicsByName.keySet());
-throw new IllegalStateException("Tried to update source topics but 
source nodes did not match");
-}
+public void updateSourceTopics(final Map<String, List<String>> 
allSourceTopicsByNodeName) {
 sourceNodesByTopic.clear();
-for (final Map.Entry<String, List<String>> sourceEntry : 
sourceTopicsByName.entrySet()) {
-final String nodeName = sourceEntry.getKey();
-for (final String topic : sourceEntry.getValue()) {
+for (final Map.Entry<String, SourceNode> sourceNodeEntry : 
sourceNodesByName.entrySet()) {

Review comment:
   In addition to removing the faulty check, I slightly refactored this loop 
so that we only loop over the source nodes in this particular subtopology. 
Previously we would have added entries for all source nodes across the entire 
topology to our `sourceNodesByTopic` map.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #9631: KAFKA-9672: Leader with ISR as a superset of replicas

2020-11-23 Thread GitBox


jsancio commented on a change in pull request #9631:
URL: https://github.com/apache/kafka/pull/9631#discussion_r529105990



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -947,9 +947,10 @@ class Partition(val topicPartition: TopicPartition,
   leaderEndOffset: Long,
   currentTimeMs: Long,
   maxLagMs: Long): Boolean = {
-val followerReplica = getReplicaOrException(replicaId)
-followerReplica.logEndOffset != leaderEndOffset &&
-  (currentTimeMs - followerReplica.lastCaughtUpTimeMs) > maxLagMs
+getReplica(replicaId).fold(true) { followerReplica =>

Review comment:
   Thanks for the review!
   
   > This might be ok, but it is unnecessary work since the controller will be 
doing that soon.
   
   According to some users and the report from KAFKA-9672, it looks like under 
some conditions the controller writes to ZK that it removed the replica from 
the assignment but not from the ISR. I am unable to reproduce this or to 
convince myself from the code of how this can happen.
   
   I was thinking of defensively letting the leader also remove the replica 
from the ISR so that Kafka can recover from this case. If the leader is not 
allowed to do this, then `acks=all` produce requests will continue to fail.
   
   What do you think @junrao?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9648: KAFKA-10758: ProcessorTopology should only consider its own nodes when updating regex source topics

2020-11-23 Thread GitBox


ableegoldman commented on a change in pull request #9648:
URL: https://github.com/apache/kafka/pull/9648#discussion_r529109355



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
##
@@ -198,9 +200,16 @@ public void 
testRegexRecordsAreProcessedAfterReassignment() throws Exception {
 
 final StreamsBuilder builder = new StreamsBuilder();
 final KStream<String, String> pattern1Stream = 
builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
-pattern1Stream.to(outputTopic, Produced.with(Serdes.String(), 
Serdes.String()));
+final KStream<String, String> otherStream = 
builder.stream(Pattern.compile("not-a-match"));

Review comment:
   We didn't catch the bug in this test for two reasons: it has only one 
subtopology, and it didn't wait for Streams to get to RUNNING before it created 
the new topic. So we weren't even covering the "update source topics" code path 
since all topics existed by the first assignment





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9648: KAFKA-10758: ProcessorTopology should only consider its own nodes when updating regex source topics

2020-11-23 Thread GitBox


ableegoldman commented on a change in pull request #9648:
URL: https://github.com/apache/kafka/pull/9648#discussion_r529109803



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
##
@@ -198,9 +200,16 @@ public void 
testRegexRecordsAreProcessedAfterReassignment() throws Exception {
 
 final StreamsBuilder builder = new StreamsBuilder();
 final KStream<String, String> pattern1Stream = 
builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
-pattern1Stream.to(outputTopic, Produced.with(Serdes.String(), 
Serdes.String()));
+final KStream<String, String> otherStream = 
builder.stream(Pattern.compile("not-a-match"));
+
+pattern1Stream

Review comment:
   Technically it's sufficient to just add the second KStream above for a 
multi-subtopology application, but I felt the test coverage could only stand 
to benefit from (slightly) more complicated examples.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10758) Kafka Streams consuming from a pattern goes to PENDING_SHUTDOWN when adding a new topic

2020-11-23 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17237778#comment-17237778
 ] 

A. Sophie Blee-Goldman commented on KAFKA-10758:


Hey [~davideicardi], thanks for submitting this ticket. Looks like there's a 
bug in the topic update logic. I've opened a PR which we should be able to get 
into the 2.6.1 and 2.7.0 releases.

Just to answer some of your other questions: yes, this error wouldn't appear 
right after the topic was created but only once the Streams app refreshed its 
topic metadata (5min default as you said). Yes, it should be safe to just 
restart the application after hitting this error as a workaround. And no, this 
was not by design :) 

Sorry for the trouble. Obviously I'd recommend upgrading to 2.6.1 to get the 
fix once it's been released, but for now you should be ok to just ignore it and 
start the application up again. You'll only hit this error once, since after 
the restart there's no need to update anything.

> Kafka Streams consuming from a pattern goes to PENDING_SHUTDOWN when adding a 
> new topic
> ---
>
> Key: KAFKA-10758
> URL: https://issues.apache.org/jira/browse/KAFKA-10758
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Davide Icardi
>Assignee: A. Sophie Blee-Goldman
>Priority: Major
>
> I have a simple Kafka Stream app that consumes from multiple input topics 
> using the _stream_ function that accepts a Pattern 
> ([link|https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/StreamsBuilder.html#stream-java.util.regex.Pattern-]).
>  
> Whenever I add a new topic that matches the pattern the kafka stream state 
> goes to REBALANCING -> ERROR -> PENDING_SHUTDOWN .
> If I restart the app it correctly starts reading again without problems.
> It is by design? Should I handle this and simply restart the app?
>  
> Kafka Stream version is 2.6.0.
> The error is the following:
> {code:java}
> ERROR o.a.k.s.p.i.ProcessorTopology - Set of source nodes do not match:
> sourceNodesByName = [KSTREAM-SOURCE-03, KSTREAM-SOURCE-02]
> sourceTopicsByName = [KSTREAM-SOURCE-00, KSTREAM-SOURCE-14, 
> KSTREAM-SOURCE-03, KSTREAM-SOURCE-02]
> org.apache.kafka.common.KafkaException: User rebalance callback throws an 
> error
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:436)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:766)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:624)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
>  Caused by: java.lang.IllegalStateException: Tried to update source topics 
> but source nodes did not match
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorTopology.updateSourceTopics(ProcessorTopology.java:151)
>   at 
> org.apache.kafka.streams.processor.internals.AbstractTask.update(AbstractTask.java:109)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.update(StreamTask.java:514)
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.updateInputPartitionsAndResume(TaskManager.java:397)
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:261)
>   at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1428)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:279)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:421)
>   ... 10 common frames omitted
>  KafkaStream state is ERROR
>  17:28:53.200 [datalake-StreamThread-1] ERROR 
> o.apache.kafka.streams.KafkaStreams - stream-client [datalake] All stream 
> threads have died. The instance will be in error state and should be closed.
>  > User rebalance callback throws an error
>  KafkaStream state is PENDING_SHUTDOWN
> {code}

[jira] [Updated] (KAFKA-10758) Kafka Streams consuming from a pattern goes to PENDING_SHUTDOWN when adding a new topic

2020-11-23 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-10758:
---
Priority: Blocker  (was: Major)

> Kafka Streams consuming from a pattern goes to PENDING_SHUTDOWN when adding a 
> new topic
> ---
>
> Key: KAFKA-10758
> URL: https://issues.apache.org/jira/browse/KAFKA-10758
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Davide Icardi
>Assignee: A. Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 2.7.0, 2.6.1
>
>
> I have a simple Kafka Stream app that consumes from multiple input topics 
> using the _stream_ function that accepts a Pattern 
> ([link|https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/StreamsBuilder.html#stream-java.util.regex.Pattern-]).
>  
> Whenever I add a new topic that matches the pattern the kafka stream state 
> goes to REBALANCING -> ERROR -> PENDING_SHUTDOWN .
> If I restart the app it correctly starts reading again without problems.
> It is by design? Should I handle this and simply restart the app?
>  
> Kafka Stream version is 2.6.0.
> The error is the following:
> {code:java}
> ERROR o.a.k.s.p.i.ProcessorTopology - Set of source nodes do not match:
> sourceNodesByName = [KSTREAM-SOURCE-03, KSTREAM-SOURCE-02]
> sourceTopicsByName = [KSTREAM-SOURCE-00, KSTREAM-SOURCE-14, 
> KSTREAM-SOURCE-03, KSTREAM-SOURCE-02]
> org.apache.kafka.common.KafkaException: User rebalance callback throws an 
> error
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:436)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:766)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:624)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
>  Caused by: java.lang.IllegalStateException: Tried to update source topics 
> but source nodes did not match
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorTopology.updateSourceTopics(ProcessorTopology.java:151)
>   at 
> org.apache.kafka.streams.processor.internals.AbstractTask.update(AbstractTask.java:109)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.update(StreamTask.java:514)
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.updateInputPartitionsAndResume(TaskManager.java:397)
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:261)
>   at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1428)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:279)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:421)
>   ... 10 common frames omitted
>  KafkaStream state is ERROR
>  17:28:53.200 [datalake-StreamThread-1] ERROR 
> o.apache.kafka.streams.KafkaStreams - stream-client [datalake] All stream 
> threads have died. The instance will be in error state and should be closed.
>  > User rebalance callback throws an error
>  KafkaStream state is PENDING_SHUTDOWN
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 closed pull request #9612: MINOR: add sudo to travis.yml to run ducktape

2020-11-23 Thread GitBox


chia7712 closed pull request #9612:
URL: https://github.com/apache/kafka/pull/9612


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10758) Kafka Streams consuming from a pattern goes to PENDING_SHUTDOWN when adding a new topic

2020-11-23 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-10758:
---
Fix Version/s: 2.6.1
   2.7.0

> Kafka Streams consuming from a pattern goes to PENDING_SHUTDOWN when adding a 
> new topic
> ---
>
> Key: KAFKA-10758
> URL: https://issues.apache.org/jira/browse/KAFKA-10758
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Davide Icardi
>Assignee: A. Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.7.0, 2.6.1
>
>
> I have a simple Kafka Stream app that consumes from multiple input topics 
> using the _stream_ function that accepts a Pattern 
> ([link|https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/StreamsBuilder.html#stream-java.util.regex.Pattern-]).
>  
> Whenever I add a new topic that matches the pattern the kafka stream state 
> goes to REBALANCING -> ERROR -> PENDING_SHUTDOWN .
> If I restart the app it correctly starts reading again without problems.
> It is by design? Should I handle this and simply restart the app?
>  
> Kafka Stream version is 2.6.0.
> The error is the following:
> {code:java}
> ERROR o.a.k.s.p.i.ProcessorTopology - Set of source nodes do not match:
> sourceNodesByName = [KSTREAM-SOURCE-03, KSTREAM-SOURCE-02]
> sourceTopicsByName = [KSTREAM-SOURCE-00, KSTREAM-SOURCE-14, 
> KSTREAM-SOURCE-03, KSTREAM-SOURCE-02]
> org.apache.kafka.common.KafkaException: User rebalance callback throws an 
> error
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:436)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:766)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:624)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
>  Caused by: java.lang.IllegalStateException: Tried to update source topics 
> but source nodes did not match
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorTopology.updateSourceTopics(ProcessorTopology.java:151)
>   at 
> org.apache.kafka.streams.processor.internals.AbstractTask.update(AbstractTask.java:109)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.update(StreamTask.java:514)
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.updateInputPartitionsAndResume(TaskManager.java:397)
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:261)
>   at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1428)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:279)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:421)
>   ... 10 common frames omitted
>  KafkaStream state is ERROR
>  17:28:53.200 [datalake-StreamThread-1] ERROR 
> o.apache.kafka.streams.KafkaStreams - stream-client [datalake] All stream 
> threads have died. The instance will be in error state and should be closed.
>  > User rebalance callback throws an error
>  KafkaStream state is PENDING_SHUTDOWN
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on pull request #9642: MINOR: Merging WorkerTask constructor

2020-11-23 Thread GitBox


chia7712 commented on pull request #9642:
URL: https://github.com/apache/kafka/pull/9642#issuecomment-732532490


   @abc863377 Could you correct the title?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9644: KAFKA-10565: Only print console producer prompt with a tty

2020-11-23 Thread GitBox


chia7712 commented on pull request #9644:
URL: https://github.com/apache/kafka/pull/9644#issuecomment-732534529


   > I couldn't find a good way to test it except manually.
   
   @tombentley Could you attach a console screenshot? I will test this PR 
manually later.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on a change in pull request #9622: KAFKA-10547; add topicId in MetadataResp

2020-11-23 Thread GitBox


dengziming commented on a change in pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#discussion_r529122230



##
File path: clients/src/main/java/org/apache/kafka/common/Cluster.java
##
@@ -327,6 +350,18 @@ public Node controller() {
 return controller;
 }
 
+public Collection<Uuid> topicIds() {
+return topicIds.values();
+}
+
+public Uuid getTopicId(String topic) {
+return topicIds.getOrDefault(topic, Uuid.ZERO_UUID);
+}
+
+public String getTopicName(Uuid topiId) {

Review comment:
   This is not used yet, but it will be used when we fetch TopicMetadata by 
topicId, so I just added it in advance.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10550) Update AdminClient and kafka-topics.sh to support topic IDs

2020-11-23 Thread dengziming (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17237791#comment-17237791
 ] 

dengziming commented on KAFKA-10550:


[~jolshan] Yes, I think the option to add topicId to MetadataRequest is similar 
to adding topicId to MetadataResponse, so you can leave this to me.

I haven't started on the delete-topics and create-topics changes; you can 
assign them to yourself if you have spare time. I can also take them on if you 
don't have spare time.

> Update AdminClient and kafka-topics.sh to support topic IDs
> ---
>
> Key: KAFKA-10550
> URL: https://issues.apache.org/jira/browse/KAFKA-10550
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Assignee: dengziming
>Priority: Major
>
> Change some AdminClient methods to expose and support topic IDs (describe, 
> delete, return the id on create)
>  
>  Make changes to kafka-topics.sh --describe so a user can specify a topic 
> name to describe with the --topic parameter, or alternatively the user can 
> supply a topic ID with the --topic_id parameter



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10753) check if ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG > 0

2020-11-23 Thread lqjacklee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lqjacklee updated KAFKA-10753:
--
Attachment: KAFKA-10753.patch

> check if ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG > 0 
> ---
>
> Key: KAFKA-10753
> URL: https://issues.apache.org/jira/browse/KAFKA-10753
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: shiqihao
>Assignee: lqjacklee
>Priority: Minor
> Attachments: KAFKA-10753.patch
>
>
> I accidentally set ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG = 0, and CPU 
> usage went to 100%.
> Could we add a check that ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG > 0 
> when starting the consumer?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10753) check if ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG > 0

2020-11-23 Thread lqjacklee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17237792#comment-17237792
 ] 

lqjacklee commented on KAFKA-10753:
---

[~17hao] I have uploaded a patch to cover the case; unfortunately, however, we 
cannot reproduce the issue. Please help check it, thanks.

> check if ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG > 0 
> ---
>
> Key: KAFKA-10753
> URL: https://issues.apache.org/jira/browse/KAFKA-10753
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: shiqihao
>Assignee: lqjacklee
>Priority: Minor
> Attachments: KAFKA-10753.patch
>
>
> I accidentally set ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG = 0, and CPU 
> usage went to 100%.
> Could we add a check that ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG > 0 
> when starting the consumer?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10753) check if ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG > 0

2020-11-23 Thread dengziming (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17237795#comment-17237795
 ] 

dengziming commented on KAFKA-10753:


[~Jack-Lee] I think we should first reach a consensus on how this should be 
fixed, or else just ignore it.
 # If we just add a simple `time > 0` check, there are many other configs that 
would need to be checked the same way (a sketch of this option follows below).
 # If we alter the backoff in `KafkaConsumer.pollForFetches`, that is a 
non-trivial fix; we can create another Jira to track it.
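
As a sketch of option 1 (an illustration only, not the attached patch; it 
assumes the existing definition in ConsumerConfig uses an atLeast(0) range 
validator):

{code:java}
import org.apache.kafka.common.config.ConfigDef;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;

final class AutoCommitCheckSketch {
    // Tightening the range validator rejects 0 at construction time instead
    // of allowing a hot poll loop at runtime.
    static final ConfigDef DEF = new ConfigDef()
        .define("auto.commit.interval.ms",
                ConfigDef.Type.INT,
                5000,          // the consumer's current default
                atLeast(1),    // reject 0 instead of permitting it
                ConfigDef.Importance.LOW,
                "Frequency in milliseconds that consumer offsets are auto-committed.");
}
{code}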

> check if ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG > 0 
> ---
>
> Key: KAFKA-10753
> URL: https://issues.apache.org/jira/browse/KAFKA-10753
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: shiqihao
>Assignee: lqjacklee
>Priority: Minor
> Attachments: KAFKA-10753.patch
>
>
> I accidentally set ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG = 0, and CPU 
> usage went to 100%.
> Could we add a check that ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG > 0 
> when starting the consumer?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma commented on pull request #9646: MINOR: Update snappy-java to 1.1.8.1

2020-11-23 Thread GitBox


ijuma commented on pull request #9646:
URL: https://github.com/apache/kafka/pull/9646#issuecomment-732631711


   JDK 8 and 11 builds passed, JDK 15 had two unrelated flaky failures:
   
   > Build / JDK 15 / 
org.apache.kafka.streams.integration.StreamTableJoinTopologyOptimizationIntegrationTest.shouldDoStreamTableJoinWithDifferentNumberOfPartitions[Optimization
 = all]
   > Build / JDK 15 / 
org.apache.kafka.streams.integration.StreamsUncaughtExceptionHandlerIntegrationTest.shouldShutdownApplication



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #9646: MINOR: Update snappy-java to 1.1.8.1

2020-11-23 Thread GitBox


ijuma merged pull request #9646:
URL: https://github.com/apache/kafka/pull/9646


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #9645: MINOR: Update build and test dependencies

2020-11-23 Thread GitBox


ijuma commented on a change in pull request #9645:
URL: https://github.com/apache/kafka/pull/9645#discussion_r529180046



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/ScramMechanism.java
##
@@ -49,8 +53,11 @@ public static ScramMechanism fromType(byte type) {
  * Salted Challenge Response Authentication Mechanism (SCRAM) SASL and 
GSS-API Mechanisms, Section 4
  */
 public static ScramMechanism fromMechanismName(String mechanismName) {
-ScramMechanism retvalFoundMechanism = 
ScramMechanism.valueOf(mechanismName.replace('-', '_'));
-return retvalFoundMechanism != null ? retvalFoundMechanism : UNKNOWN;
+String normalizedMechanism = mechanismName.replace('-', '_');
+return Arrays.stream(VALUES)
+.filter(mechanism -> mechanism.name().equals(normalizedMechanism))
+.findFirst()
+.orElse(UNKNOWN);

Review comment:
   @rondagostino do we need to cherry-pick this change to the 2.7 branch? I 
am not sure under which context this method is used and whether it's important 
enough to backport.
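
   For context on why the old code was problematic (this is a Java language 
fact, not something stated in this thread): `Enum.valueOf` throws 
IllegalArgumentException on an unknown name rather than returning null, so the 
removed `!= null` guard was unreachable. A stand-in sketch (not the real 
`ScramMechanism` enum):
   
   ```java
   final class ScramLookupSketch {
       enum Mech { SCRAM_SHA_256, SCRAM_SHA_512, UNKNOWN }
   
       static Mech fromMechanismName(final String mechanismName) {
           final String normalized = mechanismName.replace('-', '_');
           // Scan values() and fall back to UNKNOWN; Mech.valueOf(normalized)
           // would throw for a name like "GSSAPI" instead of returning null.
           for (final Mech m : Mech.values()) {
               if (m.name().equals(normalized)) {
                   return m;
               }
           }
           return Mech.UNKNOWN;
       }
   }
   ```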





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9645: MINOR: Update build and test dependencies

2020-11-23 Thread GitBox


chia7712 commented on a change in pull request #9645:
URL: https://github.com/apache/kafka/pull/9645#discussion_r529180414



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/ScramMechanism.java
##
@@ -49,8 +53,11 @@ public static ScramMechanism fromType(byte type) {
  * Salted Challenge Response Authentication Mechanism (SCRAM) SASL and 
GSS-API Mechanisms, Section 4
  */
 public static ScramMechanism fromMechanismName(String mechanismName) {
-ScramMechanism retvalFoundMechanism = 
ScramMechanism.valueOf(mechanismName.replace('-', '_'));
-return retvalFoundMechanism != null ? retvalFoundMechanism : UNKNOWN;
+String normalizedMechanism = mechanismName.replace('-', '_');

Review comment:
   Could we reuse ```mechanismName``` 
(https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/ScramMechanism.java#L79)
 so we don't need this duplicate replacement.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #9629: KAFKA-10754: fix flaky tests by waiting kafka streams be in running state before assert

2020-11-23 Thread GitBox


showuon commented on a change in pull request #9629:
URL: https://github.com/apache/kafka/pull/9629#discussion_r529182580



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -379,13 +379,12 @@ public void setUncaughtExceptionHandler(final 
Thread.UncaughtExceptionHandler un
 }
 
 /**
- * Set the handler invoked when an {@link 
StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread}
+ * Set the handler invoked when an {@link 
StreamsConfig#NUM_STREAM_THREADS_CONFIG} internal thread

Review comment:
   Updated. Thanks.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #9643: MINOR: Upgrade to Scala 2.13.4 and 2.12.13

2020-11-23 Thread GitBox


ijuma commented on a change in pull request #9643:
URL: https://github.com/apache/kafka/pull/9643#discussion_r529043449



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -2418,13 +2418,11 @@ class Log(@volatile private var _dir: File,
   info(s"Replacing overflowed segment $segment with split segments 
$newSegments")
   replaceSegments(newSegments.toList, List(segment))
   newSegments.toList
-} catch {
-  case e: Exception =>
-newSegments.foreach { splitSegment =>
-  splitSegment.close()
-  splitSegment.deleteIfExists()
-}
-throw e
+} finally {
+  newSegments.foreach { splitSegment =>
+splitSegment.close()
+splitSegment.deleteIfExists()
+  }

Review comment:
   @dhruvilshah3 Is there a reason why we had a try/catch with a rethrow 
instead of using `finally`? spotBugs complained about it and hence the question.
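
   (As a general Java semantics note, not an answer from this thread: the two 
shapes are not interchangeable, since `finally` also runs on success. A 
minimal sketch with placeholder names:)
   
   ```java
   final class CleanupSemanticsSketch {
       static void catchStyle(final Runnable work, final Runnable cleanup) {
           try {
               work.run();
           } catch (final RuntimeException e) {
               cleanup.run();   // cleanup only when work failed
               throw e;
           }
       }
   
       static void finallyStyle(final Runnable work, final Runnable cleanup) {
           try {
               work.run();
           } finally {
               cleanup.run();   // cleanup even when work succeeded
           }
       }
   }
   ```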





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #9629: KAFKA-10754: fix flaky tests by waiting kafka streams be in running state before assert

2020-11-23 Thread GitBox


showuon commented on pull request #9629:
URL: https://github.com/apache/kafka/pull/9629#issuecomment-732646473


   @ableegoldman @wcarlson5, thanks for the comments. Now I know why it sets 
the default threads to 2. So, to make the test more reliable, I'll do:
   1. wait for the state to become RUNNING (as before)
   2. wait for the handler to be called the 1st time in the current thread
   3. wait for the handler to be called the 2nd time after rebalancing
   4. wait for the stream state to turn into ERROR (as before)
   
   What do you think?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon edited a comment on pull request #9629: KAFKA-10754: fix flaky tests by waiting kafka streams be in running state before assert

2020-11-23 Thread GitBox


showuon edited a comment on pull request #9629:
URL: https://github.com/apache/kafka/pull/9629#issuecomment-732646473


   @ableegoldman @wcarlson5, thanks for the comments. Now I know why it sets 
the default threads to 2. So, to make the test more reliable, I'll do:
   1. wait for the state to become RUNNING (as before)
   2. wait for the handler to be called the 1st time in the current thread
   3. wait for the handler to be called the 2nd time after rebalancing
   4. wait for the stream state to turn into ERROR (as before)
   
   What do you think?
   commit: 
https://github.com/apache/kafka/pull/9629/commits/2b6d0a2d285b5b7fc0a9a8474712870f6f7a767e



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters

2020-11-23 Thread Ming Liu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17237861#comment-17237861
 ] 

Ming Liu commented on KAFKA-7918:
-

[~ableegoldman], [~guozhang] [~mjsax], one good benefit of a generics-based 
in-memory store is saving the serialization cost (one great benefit of using 
an in-memory store). In fact, we have applications relying on exactly that 
capability to improve performance. With this change (when we tried upgrading 
from 2.2 to 2.5), a few of our applications can't keep up anymore.

We ended up writing code to create InMemoryKeyValueStore directly (bypassing 
Stores.inMemoryKeyValueStore). I think we should expose a K, V based in-memory 
store directly. Thoughts? (An illustrative sketch of the cost follows below.)
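
As an illustrative sketch of the cost being discussed, with hypothetical store 
shapes (not the real Streams store classes): a bytes store pays a serde 
round-trip on every access, while a generic in-memory store returns the object 
directly.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class SerdeCostSketch<K, V> {
    private final Map<K, V> direct = new HashMap<>();          // K/V store: no serde
    private final Map<String, byte[]> bytes = new HashMap<>(); // bytes store: serde per access

    V getDirect(final K key) {
        return direct.get(key);                                // no copies, no deserialization
    }

    V getViaBytes(final K key,
                  final Function<K, String> keySerializer,
                  final Function<byte[], V> valueDeserializer) {
        // Every read re-serializes the key and re-deserializes the value.
        final byte[] raw = bytes.get(keySerializer.apply(key));
        return raw == null ? null : valueDeserializer.apply(raw);
    }
}
{code}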

> Streams store cleanup: inline byte-store generic parameters
> ---
>
> Key: KAFKA-7918
> URL: https://issues.apache.org/jira/browse/KAFKA-7918
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: A. Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.3.0
>
>
> Currently, the fundamental layer of stores in Streams is the "bytes store".
> The easiest way to identify this is in 
> `org.apache.kafka.streams.state.Stores`: all the `StoreBuilder`s require an 
> `XXBytesStoreSupplier`. 
> We provide several implementations of these bytes stores, typically an 
> in-memory one and a persistent one (aka RocksDB).
> Inside these bytes stores, the key is always `Bytes` and the value is always 
> `byte[]` (serialization happens at a higher level). However, the store 
> implementations are generically typed, just `K` and `V`.
> This is good for flexibility, but it makes the code a little harder to 
> understand. I think that we used to do serialization at a lower level, so the 
> generics are a hold-over from that.
> It would simplify the code if we just inlined the actual k/v types and maybe 
> even renamed the classes from (e.g.) `InMemoryKeyValueStore` to 
> `InMemoryKeyValueBytesStore`, and so forth.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma commented on a change in pull request #9645: MINOR: Update build and test dependencies

2020-11-23 Thread GitBox


ijuma commented on a change in pull request #9645:
URL: https://github.com/apache/kafka/pull/9645#discussion_r529202840



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/ScramMechanism.java
##
@@ -49,8 +53,11 @@ public static ScramMechanism fromType(byte type) {
  * Salted Challenge Response Authentication Mechanism (SCRAM) SASL and 
GSS-API Mechanisms, Section 4
  */
 public static ScramMechanism fromMechanismName(String mechanismName) {
-ScramMechanism retvalFoundMechanism = 
ScramMechanism.valueOf(mechanismName.replace('-', '_'));
-return retvalFoundMechanism != null ? retvalFoundMechanism : UNKNOWN;
+String normalizedMechanism = mechanismName.replace('-', '_');

Review comment:
   Good suggestion, applied it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lmr3796 commented on a change in pull request #9435: KAFKA-10606: Disable auto topic creation for fetch-all-topic-metadata request

2020-11-23 Thread GitBox


lmr3796 commented on a change in pull request #9435:
URL: https://github.com/apache/kafka/pull/9435#discussion_r529203652



##
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##
@@ -107,6 +108,11 @@ class KafkaApisTest {
   private val time = new MockTime
   private val clientId = ""
 
+  @Before
+  def setUp(): Unit = {

Review comment:
   hey @chia7712,
   
   I added this to let each test case have a clean, uncorrupted 
`MetadataCache`. Is there any historical context suggesting we should avoid 
that cleanup?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10762) alter the backoff in `KafkaConsumer.pollForFetches`

2020-11-23 Thread lqjacklee (Jira)
lqjacklee created KAFKA-10762:
-

 Summary: alter the backoff in `KafkaConsumer.pollForFetches`
 Key: KAFKA-10762
 URL: https://issues.apache.org/jira/browse/KAFKA-10762
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.6.0
Reporter: lqjacklee


Once autoCommitIntervalMs is zero, the client polls in a tight loop. Should 
we add a check that autoCommitIntervalMs is not zero?
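A sketch of the guard being proposed, with illustrative names (this is not 
the actual consumer code path):

```java
import org.apache.kafka.common.config.ConfigException;

// Hypothetical startup check: fail fast on a zero interval instead of
// letting the poll loop spin at 100% CPU.
final class AutoCommitConfigCheck {
    static void validate(boolean enableAutoCommit, int autoCommitIntervalMs) {
        if (enableAutoCommit && autoCommitIntervalMs <= 0)
            throw new ConfigException(
                "auto.commit.interval.ms must be > 0 when enable.auto.commit is true, got "
                    + autoCommitIntervalMs);
    }
}
```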



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10753) check if ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG > 0

2020-11-23 Thread lqjacklee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17237888#comment-17237888
 ] 

lqjacklee commented on KAFKA-10753:
---

[~dengziming] In my opinion, in this task we should just focus on how to 
check ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG. 


2. I have created another Jira to track it: 
(https://issues.apache.org/jira/browse/KAFKA-10762) 

> check if ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG > 0 
> ---
>
> Key: KAFKA-10753
> URL: https://issues.apache.org/jira/browse/KAFKA-10753
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: shiqihao
>Assignee: lqjacklee
>Priority: Minor
> Attachments: KAFKA-10753.patch
>
>
> I accidentally set ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG = 0, and 
> the CPU ran at 100%.
> Could we add a check that ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG > 0 
> when starting the consumer?
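An alternative to an imperative check is to tighten the config definition 
itself. A sketch using Kafka's `ConfigDef` validator API follows; the key 
name is real, but the surrounding definition is illustrative, and the actual 
`ConsumerConfig` definition may intentionally allow 0:

```java
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Range;
import org.apache.kafka.common.config.ConfigDef.Type;

// Sketch: declaring the bound once makes every consumer reject the value
// at construction time with a clear error message.
class ConsumerConfigSketch {
    static final ConfigDef DEF = new ConfigDef()
        .define("auto.commit.interval.ms",
                Type.INT,
                5000,                  // default, for illustration
                Range.atLeast(1),      // rejects 0 and negative values
                Importance.LOW,
                "Interval between offset auto-commits; must be positive.");
}
```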



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on a change in pull request #9435: KAFKA-10606: Disable auto topic creation for fetch-all-topic-metadata request

2020-11-23 Thread GitBox


chia7712 commented on a change in pull request #9435:
URL: https://github.com/apache/kafka/pull/9435#discussion_r529240711



##
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##
@@ -107,6 +108,11 @@ class KafkaApisTest {
   private val time = new MockTime
   private val clientId = ""
 
+  @Before
+  def setUp(): Unit = {

Review comment:
   It seems to me the following code is able to give each test case a clean, 
uncorrupted ```MetadataCache```:
   
   ```
   var metadataCache = new MetadataCache(brokerId)
   ```
   
   Also, it is simpler than a ```@Before``` block.
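For what it's worth, the reason a plain field initializer is enough here is 
JUnit 4's per-test instantiation. A self-contained sketch of that semantics, 
using a stand-in object since the real test holds a MetadataCache:

```java
import static org.junit.Assert.assertEquals;
import org.junit.Test;

// Sketch: JUnit 4 creates a new instance of the test class for every test
// method, so a field initializer re-runs each time and every test sees its
// own fresh object -- no @Before block needed.
public class FreshFieldPerTestSketch {
    // Stand-in for MetadataCache: rebuilt for each test method.
    private final StringBuilder cache = new StringBuilder();

    @Test
    public void firstTestMutatesOnlyItsOwnCopy() {
        cache.append("x");
        assertEquals(1, cache.length());  // unaffected by the other test
    }

    @Test
    public void secondTestAlsoStartsClean() {
        assertEquals(0, cache.length());  // fresh instance, empty builder
    }
}
```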





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 merged pull request #9642: MINOR: Remove redundant argument from TaskMetricsGroup#recordCommit

2020-11-23 Thread GitBox


chia7712 merged pull request #9642:
URL: https://github.com/apache/kafka/pull/9642


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org