[GitHub] [kafka] showuon commented on a change in pull request #11173: KAFKA-13509: Support max timestamp in GetOffsetShell

2022-03-16 Thread GitBox


showuon commented on a change in pull request #11173:
URL: https://github.com/apache/kafka/pull/11173#discussion_r827678933



##
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##
@@ -103,59 +104,77 @@ object GetOffsetShell {
   throw new IllegalArgumentException("--topic-partitions cannot be used 
with --topic or --partitions")
 }
 
-val listOffsetsTimestamp = options.valueOf(timeOpt).longValue
+val offsetSpec = parseOffsetSpec(options.valueOf(timeOpt))
 
 val topicPartitionFilter = if (options.has(topicPartitionsOpt)) {
-  
createTopicPartitionFilterWithPatternList(options.valueOf(topicPartitionsOpt), 
excludeInternalTopics)
+  
createTopicPartitionFilterWithPatternList(options.valueOf(topicPartitionsOpt))
 } else {
-  val partitionIdsRequested = 
createPartitionSet(options.valueOf(partitionsOpt))
-
   createTopicPartitionFilterWithTopicAndPartitionPattern(
 if (options.has(topicOpt)) Some(options.valueOf(topicOpt)) else None,
-excludeInternalTopics,
-partitionIdsRequested
+options.valueOf(partitionsOpt)
   )
 }
 
 val config = if (options.has(commandConfigOpt))
   Utils.loadProps(options.valueOf(commandConfigOpt))
 else
   new Properties
-config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-config.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId)
-val consumer = new KafkaConsumer(config, new ByteArrayDeserializer, new 
ByteArrayDeserializer)
+config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+config.setProperty(AdminClientConfig.CLIENT_ID_CONFIG, clientId)
+val client = Admin.create(config)
 
 try {
-  val partitionInfos = listPartitionInfos(consumer, topicPartitionFilter)
+  val partitionInfos = listPartitionInfos(client, topicPartitionFilter, 
excludeInternalTopics)
 
   if (partitionInfos.isEmpty) {
 throw new IllegalArgumentException("Could not match any 
topic-partitions with the specified filters")
   }
 
-  val topicPartitions = partitionInfos.flatMap { p =>
-if (p.leader == null) {
-  System.err.println(s"Error: topic-partition 
${p.topic}:${p.partition} does not have a leader. Skip getting offsets")
-  None
-} else
-  Some(new TopicPartition(p.topic, p.partition))
-  }
+  val timestampsToSearch = partitionInfos.map(tp => tp -> 
offsetSpec).toMap.asJava
 
-  /* Note that the value of the map can be null */
-  val partitionOffsets: collection.Map[TopicPartition, java.lang.Long] = 
listOffsetsTimestamp match {
-case ListOffsetsRequest.EARLIEST_TIMESTAMP => 
consumer.beginningOffsets(topicPartitions.asJava).asScala
-case ListOffsetsRequest.LATEST_TIMESTAMP => 
consumer.endOffsets(topicPartitions.asJava).asScala
-case _ =>
-  val timestampsToSearch = topicPartitions.map(tp => tp -> 
(listOffsetsTimestamp: java.lang.Long)).toMap.asJava
-  consumer.offsetsForTimes(timestampsToSearch).asScala.map { case (k, 
x) =>
-if (x == null) (k, null) else (k, x.offset: java.lang.Long)
-  }
+  val listOffsetsResult = client.listOffsets(timestampsToSearch)
+  val partitionOffsets = partitionInfos.flatMap { tp =>
+try {
+  val partitionInfo = listOffsetsResult.partitionResult(tp).get
+  Some((tp, partitionInfo.offset))

Review comment:
   additional bracket?

##
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##
@@ -103,59 +104,77 @@ object GetOffsetShell {
   throw new IllegalArgumentException("--topic-partitions cannot be used 
with --topic or --partitions")
 }
 
-val listOffsetsTimestamp = options.valueOf(timeOpt).longValue
+val offsetSpec = parseOffsetSpec(options.valueOf(timeOpt))
 
 val topicPartitionFilter = if (options.has(topicPartitionsOpt)) {
-  
createTopicPartitionFilterWithPatternList(options.valueOf(topicPartitionsOpt), 
excludeInternalTopics)
+  
createTopicPartitionFilterWithPatternList(options.valueOf(topicPartitionsOpt))
 } else {
-  val partitionIdsRequested = 
createPartitionSet(options.valueOf(partitionsOpt))
-
   createTopicPartitionFilterWithTopicAndPartitionPattern(
 if (options.has(topicOpt)) Some(options.valueOf(topicOpt)) else None,
-excludeInternalTopics,
-partitionIdsRequested
+options.valueOf(partitionsOpt)
   )
 }
 
 val config = if (options.has(commandConfigOpt))
   Utils.loadProps(options.valueOf(commandConfigOpt))
 else
   new Properties
-config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-config.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId)
-val consumer = new KafkaConsumer(config, new ByteArrayDeserializer, new 
ByteArrayDeserializer)
+config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+config.setPro

[GitHub] [kafka] lkokhreidze commented on pull request #11837: KAFKA-6718 / Add rack awareness configurations to StreamsConfig

2022-03-16 Thread GitBox


lkokhreidze commented on pull request #11837:
URL: https://github.com/apache/kafka/pull/11837#issuecomment-1068813143


   Thanks @showuon for the feedback. Should be better now.
   Also reduced test timeout to 1.5 minutes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11173: KAFKA-13509: Support max timestamp in GetOffsetShell

2022-03-16 Thread GitBox


showuon commented on a change in pull request #11173:
URL: https://github.com/apache/kafka/pull/11173#discussion_r827697359



##
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##
@@ -103,59 +104,77 @@ object GetOffsetShell {
   throw new IllegalArgumentException("--topic-partitions cannot be used 
with --topic or --partitions")
 }
 
-val listOffsetsTimestamp = options.valueOf(timeOpt).longValue
+val offsetSpec = parseOffsetSpec(options.valueOf(timeOpt))
 
 val topicPartitionFilter = if (options.has(topicPartitionsOpt)) {
-  
createTopicPartitionFilterWithPatternList(options.valueOf(topicPartitionsOpt), 
excludeInternalTopics)
+  
createTopicPartitionFilterWithPatternList(options.valueOf(topicPartitionsOpt))
 } else {
-  val partitionIdsRequested = 
createPartitionSet(options.valueOf(partitionsOpt))
-
   createTopicPartitionFilterWithTopicAndPartitionPattern(
 if (options.has(topicOpt)) Some(options.valueOf(topicOpt)) else None,
-excludeInternalTopics,
-partitionIdsRequested
+options.valueOf(partitionsOpt)
   )
 }
 
 val config = if (options.has(commandConfigOpt))
   Utils.loadProps(options.valueOf(commandConfigOpt))
 else
   new Properties
-config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-config.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId)
-val consumer = new KafkaConsumer(config, new ByteArrayDeserializer, new 
ByteArrayDeserializer)
+config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+config.setProperty(AdminClientConfig.CLIENT_ID_CONFIG, clientId)
+val client = Admin.create(config)
 
 try {
-  val partitionInfos = listPartitionInfos(consumer, topicPartitionFilter)
+  val partitionInfos = listPartitionInfos(client, topicPartitionFilter, 
excludeInternalTopics)
 
   if (partitionInfos.isEmpty) {
 throw new IllegalArgumentException("Could not match any 
topic-partitions with the specified filters")
   }
 
-  val topicPartitions = partitionInfos.flatMap { p =>
-if (p.leader == null) {
-  System.err.println(s"Error: topic-partition 
${p.topic}:${p.partition} does not have a leader. Skip getting offsets")
-  None
-} else
-  Some(new TopicPartition(p.topic, p.partition))
-  }
+  val timestampsToSearch = partitionInfos.map(tp => tp -> 
offsetSpec).toMap.asJava
 
-  /* Note that the value of the map can be null */
-  val partitionOffsets: collection.Map[TopicPartition, java.lang.Long] = 
listOffsetsTimestamp match {
-case ListOffsetsRequest.EARLIEST_TIMESTAMP => 
consumer.beginningOffsets(topicPartitions.asJava).asScala
-case ListOffsetsRequest.LATEST_TIMESTAMP => 
consumer.endOffsets(topicPartitions.asJava).asScala
-case _ =>
-  val timestampsToSearch = topicPartitions.map(tp => tp -> 
(listOffsetsTimestamp: java.lang.Long)).toMap.asJava
-  consumer.offsetsForTimes(timestampsToSearch).asScala.map { case (k, 
x) =>
-if (x == null) (k, null) else (k, x.offset: java.lang.Long)
-  }
+  val listOffsetsResult = client.listOffsets(timestampsToSearch)
+  val partitionOffsets = partitionInfos.flatMap { tp =>
+try {
+  val partitionInfo = listOffsetsResult.partitionResult(tp).get
+  Some((tp, partitionInfo.offset))
+} catch {
+  case e: ExecutionException =>
+e.getCause match {
+  case _: LeaderNotAvailableException =>
+System.err.println(s"Error: topic-partition 
${tp.topic}:${tp.partition} does not have a leader. Skip getting offsets")

Review comment:
   nit: Skipping getting offsets for this topic-partition: 
${tp.topic}:${tp.partition} since it does not have a leader.








[GitHub] [kafka] showuon commented on a change in pull request #11173: KAFKA-13509: Support max timestamp in GetOffsetShell

2022-03-16 Thread GitBox


showuon commented on a change in pull request #11173:
URL: https://github.com/apache/kafka/pull/11173#discussion_r827697359



##
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##
@@ -103,59 +104,77 @@ object GetOffsetShell {
   throw new IllegalArgumentException("--topic-partitions cannot be used 
with --topic or --partitions")
 }
 
-val listOffsetsTimestamp = options.valueOf(timeOpt).longValue
+val offsetSpec = parseOffsetSpec(options.valueOf(timeOpt))
 
 val topicPartitionFilter = if (options.has(topicPartitionsOpt)) {
-  
createTopicPartitionFilterWithPatternList(options.valueOf(topicPartitionsOpt), 
excludeInternalTopics)
+  
createTopicPartitionFilterWithPatternList(options.valueOf(topicPartitionsOpt))
 } else {
-  val partitionIdsRequested = 
createPartitionSet(options.valueOf(partitionsOpt))
-
   createTopicPartitionFilterWithTopicAndPartitionPattern(
 if (options.has(topicOpt)) Some(options.valueOf(topicOpt)) else None,
-excludeInternalTopics,
-partitionIdsRequested
+options.valueOf(partitionsOpt)
   )
 }
 
 val config = if (options.has(commandConfigOpt))
   Utils.loadProps(options.valueOf(commandConfigOpt))
 else
   new Properties
-config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-config.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId)
-val consumer = new KafkaConsumer(config, new ByteArrayDeserializer, new 
ByteArrayDeserializer)
+config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+config.setProperty(AdminClientConfig.CLIENT_ID_CONFIG, clientId)
+val client = Admin.create(config)
 
 try {
-  val partitionInfos = listPartitionInfos(consumer, topicPartitionFilter)
+  val partitionInfos = listPartitionInfos(client, topicPartitionFilter, 
excludeInternalTopics)
 
   if (partitionInfos.isEmpty) {
 throw new IllegalArgumentException("Could not match any 
topic-partitions with the specified filters")
   }
 
-  val topicPartitions = partitionInfos.flatMap { p =>
-if (p.leader == null) {
-  System.err.println(s"Error: topic-partition 
${p.topic}:${p.partition} does not have a leader. Skip getting offsets")
-  None
-} else
-  Some(new TopicPartition(p.topic, p.partition))
-  }
+  val timestampsToSearch = partitionInfos.map(tp => tp -> 
offsetSpec).toMap.asJava
 
-  /* Note that the value of the map can be null */
-  val partitionOffsets: collection.Map[TopicPartition, java.lang.Long] = 
listOffsetsTimestamp match {
-case ListOffsetsRequest.EARLIEST_TIMESTAMP => 
consumer.beginningOffsets(topicPartitions.asJava).asScala
-case ListOffsetsRequest.LATEST_TIMESTAMP => 
consumer.endOffsets(topicPartitions.asJava).asScala
-case _ =>
-  val timestampsToSearch = topicPartitions.map(tp => tp -> 
(listOffsetsTimestamp: java.lang.Long)).toMap.asJava
-  consumer.offsetsForTimes(timestampsToSearch).asScala.map { case (k, 
x) =>
-if (x == null) (k, null) else (k, x.offset: java.lang.Long)
-  }
+  val listOffsetsResult = client.listOffsets(timestampsToSearch)
+  val partitionOffsets = partitionInfos.flatMap { tp =>
+try {
+  val partitionInfo = listOffsetsResult.partitionResult(tp).get
+  Some((tp, partitionInfo.offset))
+} catch {
+  case e: ExecutionException =>
+e.getCause match {
+  case _: LeaderNotAvailableException =>
+System.err.println(s"Error: topic-partition 
${tp.topic}:${tp.partition} does not have a leader. Skip getting offsets")

Review comment:
   nit: Skipping getting offsets for this topic-partition: 
${tp.topic}:${tp.partition} since it does not have a leader right now.








[GitHub] [kafka] cadonna commented on a change in pull request #11837: KAFKA-6718 / Add rack awareness configurations to StreamsConfig

2022-03-16 Thread GitBox


cadonna commented on a change in pull request #11837:
URL: https://github.com/apache/kafka/pull/11837#discussion_r827776021



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java
##
@@ -108,55 +117,122 @@ public void cleanup() throws IOException {
 kafkaStreamsInstances.clear();
 }
 
+@Test
+public void 
shouldThrowConfigExceptionWhenRackAwareAssignmentTagsExceedTheLimit() {

Review comment:
   I see. I asked some other people and apparently the size of the 
subscription is not that much of a concern currently. I would still propose to 
keep the test with 100 instead of 200 tasks. With 200 tasks, the test takes ~10 
s on my laptop whereas with 100 tasks it takes ~6 s. I think 6 s is acceptable 
and might also reduce flakiness. @lkokhreidze and @showuon WDYT?








[GitHub] [kafka] lkokhreidze commented on a change in pull request #11837: KAFKA-6718 / Add rack awareness configurations to StreamsConfig

2022-03-16 Thread GitBox


lkokhreidze commented on a change in pull request #11837:
URL: https://github.com/apache/kafka/pull/11837#discussion_r82681



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java
##
@@ -108,55 +117,122 @@ public void cleanup() throws IOException {
 kafkaStreamsInstances.clear();
 }
 
+@Test
+public void 
shouldThrowConfigExceptionWhenRackAwareAssignmentTagsExceedTheLimit() {

Review comment:
   Sounds good to me. Will push changes with 100 stateful tasks for the 
rebalancing test.








[GitHub] [kafka] lkokhreidze commented on a change in pull request #11837: KAFKA-6718 / Add rack awareness configurations to StreamsConfig

2022-03-16 Thread GitBox


lkokhreidze commented on a change in pull request #11837:
URL: https://github.com/apache/kafka/pull/11837#discussion_r827786084



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java
##
@@ -108,55 +117,122 @@ public void cleanup() throws IOException {
 kafkaStreamsInstances.clear();
 }
 
+@Test
+public void 
shouldThrowConfigExceptionWhenRackAwareAssignmentTagsExceedTheLimit() {

Review comment:
   Pushed changes to reduce number of stateful tasks to 100
   
https://github.com/apache/kafka/pull/11837/commits/e6f717229acb4850aec1f33e13cd88cdc3ccb4bb








[GitHub] [kafka] showuon commented on a change in pull request #11837: KAFKA-6718 / Add rack awareness configurations to StreamsConfig

2022-03-16 Thread GitBox


showuon commented on a change in pull request #11837:
URL: https://github.com/apache/kafka/pull/11837#discussion_r827825740



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java
##
@@ -108,55 +117,122 @@ public void cleanup() throws IOException {
 kafkaStreamsInstances.clear();
 }
 
+@Test
+public void 
shouldThrowConfigExceptionWhenRackAwareAssignmentTagsExceedTheLimit() {

Review comment:
   +1 for 100 tasks. Thanks.








[GitHub] [kafka] mimaison commented on pull request #11885: MINOR: Bump latest 3.0 version to 3.0.1

2022-03-16 Thread GitBox


mimaison commented on pull request #11885:
URL: https://github.com/apache/kafka/pull/11885#issuecomment-1068980774


   I confirmed the artifacts are there, merging 






[GitHub] [kafka] mimaison edited a comment on pull request #11885: MINOR: Bump latest 3.0 version to 3.0.1

2022-03-16 Thread GitBox


mimaison edited a comment on pull request #11885:
URL: https://github.com/apache/kafka/pull/11885#issuecomment-1068980774


   Thanks @mjsax, I confirmed the artifacts are there, merging 






[GitHub] [kafka] mimaison merged pull request #11885: MINOR: Bump latest 3.0 version to 3.0.1

2022-03-16 Thread GitBox


mimaison merged pull request #11885:
URL: https://github.com/apache/kafka/pull/11885


   






[GitHub] [kafka] mimaison opened a new pull request #11906: MINOR: Doc updates for Kafka 3.0.1

2022-03-16 Thread GitBox


mimaison opened a new pull request #11906:
URL: https://github.com/apache/kafka/pull/11906


   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] lkokhreidze commented on a change in pull request #11837: KAFKA-6718 / Add rack awareness configurations to StreamsConfig

2022-03-16 Thread GitBox


lkokhreidze commented on a change in pull request #11837:
URL: https://github.com/apache/kafka/pull/11837#discussion_r827942693



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java
##
@@ -108,55 +117,122 @@ public void cleanup() throws IOException {
 kafkaStreamsInstances.clear();
 }
 
+@Test
+public void 
shouldThrowConfigExceptionWhenRackAwareAssignmentTagsExceedTheLimit() {

Review comment:
   Even with 100 tasks, the tests on CI failed with a timeout. I've bumped the 
timeout to 1.5 minutes in the latest commit.








[GitHub] [kafka] showuon commented on pull request #11837: KAFKA-6718 / Add rack awareness configurations to StreamsConfig

2022-03-16 Thread GitBox


showuon commented on pull request #11837:
URL: https://github.com/apache/kafka/pull/11837#issuecomment-1069076197


   @lkokhreidze, let's make sure the tests are stable before we merge this final 
piece of the PRs. Thanks.






[GitHub] [kafka] cadonna commented on a change in pull request #11837: KAFKA-6718 / Add rack awareness configurations to StreamsConfig

2022-03-16 Thread GitBox


cadonna commented on a change in pull request #11837:
URL: https://github.com/apache/kafka/pull/11837#discussion_r827955917



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java
##
@@ -108,55 +117,122 @@ public void cleanup() throws IOException {
 kafkaStreamsInstances.clear();
 }
 
+@Test
+public void 
shouldThrowConfigExceptionWhenRackAwareAssignmentTagsExceedTheLimit() {

Review comment:
   If after your PR the tests are still flaky, let's just test with 6 tasks 
and merge this PR. You can then add a system test for the scenario with larger 
subscriptions in a follow-up PR.








[GitHub] [kafka] lkokhreidze commented on a change in pull request #11837: KAFKA-6718 / Add rack awareness configurations to StreamsConfig

2022-03-16 Thread GitBox


lkokhreidze commented on a change in pull request #11837:
URL: https://github.com/apache/kafka/pull/11837#discussion_r827960149



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java
##
@@ -108,55 +117,122 @@ public void cleanup() throws IOException {
 kafkaStreamsInstances.clear();
 }
 
+@Test
+public void 
shouldThrowConfigExceptionWhenRackAwareAssignmentTagsExceedTheLimit() {

Review comment:
   Thanks @cadonna 
   If no objections, I will do that now. The fact that it times out in 1 minute 
doesn't give much confidence that it won't be flaky.
   I will follow up with the system test.
   fyi @showuon 








[GitHub] [kafka] lkokhreidze commented on pull request #11837: KAFKA-6718 / Add rack awareness configurations to StreamsConfig

2022-03-16 Thread GitBox


lkokhreidze commented on pull request #11837:
URL: https://github.com/apache/kafka/pull/11837#issuecomment-1069087407


   @showuon 
   I've reduced the number of stateful tasks to 9. This should eliminate the 
flakiness. 
   I will do the system test in the follow-up PR.






[GitHub] [kafka] showuon commented on pull request #10367: KAFKA-12495: allow consecutive revoke in incremental cooperative assignor in connector

2022-03-16 Thread GitBox


showuon commented on pull request #10367:
URL: https://github.com/apache/kafka/pull/10367#issuecomment-1069090526


   @C0urante, thanks for your comments. TBH, I need some time to revisit the 
code (since it was a long time ago...), and I will answer your comments later. 
Thank you.






[GitHub] [kafka] cadonna commented on pull request #11837: KAFKA-6718 / Add rack awareness configurations to StreamsConfig

2022-03-16 Thread GitBox


cadonna commented on pull request #11837:
URL: https://github.com/apache/kafka/pull/11837#issuecomment-1069103952


   @lkokhreidze We also need some updates to the docs. The sections to update 
are 
https://kafka.apache.org/31/documentation/streams/architecture#streams_architecture_recovery
 and maybe also 
https://kafka.apache.org/31/documentation/streams/developer-guide/config-streams.html.






[GitHub] [kafka] cadonna edited a comment on pull request #11837: KAFKA-6718 / Add rack awareness configurations to StreamsConfig

2022-03-16 Thread GitBox


cadonna edited a comment on pull request #11837:
URL: https://github.com/apache/kafka/pull/11837#issuecomment-1069103952


   @lkokhreidze We also need some updates to the docs. The sections to update 
are 
https://kafka.apache.org/31/documentation/streams/architecture#streams_architecture_recovery
 and maybe also 
https://kafka.apache.org/31/documentation/streams/developer-guide/config-streams.html.
 Not in this PR, please; open a new PR for the docs.






[GitHub] [kafka] lkokhreidze commented on pull request #11837: KAFKA-6718 / Add rack awareness configurations to StreamsConfig

2022-03-16 Thread GitBox


lkokhreidze commented on pull request #11837:
URL: https://github.com/apache/kafka/pull/11837#issuecomment-1069108584


   Thanks @cadonna ,
   I assume it's okay to do it in a follow-up PR?






[GitHub] [kafka] cadonna commented on pull request #11837: KAFKA-6718 / Add rack awareness configurations to StreamsConfig

2022-03-16 Thread GitBox


cadonna commented on pull request #11837:
URL: https://github.com/apache/kafka/pull/11837#issuecomment-1069115152


   Yes, doing it in a follow-up PR is totally fine!






[GitHub] [kafka] mimaison commented on a change in pull request #11471: MINOR: Replace EasyMock with Mockito in connect:file

2022-03-16 Thread GitBox


mimaison commented on a change in pull request #11471:
URL: https://github.com/apache/kafka/pull/11471#discussion_r828078337



##
File path: 
connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
##
@@ -220,17 +224,22 @@ public void testMissingFile() throws InterruptedException 
{
 task.stop();
 }
 
+@Test
 public void testInvalidFile() throws InterruptedException {
 config.put(FileStreamSourceConnector.FILE_CONFIG, "bogusfilename");
 task.start(config);
 // Currently the task retries indefinitely if the file isn't found, 
but shouldn't return any data.
-for (int i = 0; i < 100; i++)
+for (int i = 0; i < 3; i++)
 assertNull(task.poll());
 }
 
-
 private void expectOffsetLookupReturnNone() {
-
EasyMock.expect(context.offsetStorageReader()).andReturn(offsetStorageReader);
-EasyMock.expect(offsetStorageReader.offset(EasyMock.>anyObject())).andReturn(null);
+when(context.offsetStorageReader()).thenReturn(offsetStorageReader);
+when(offsetStorageReader.offset(anyMap())).thenReturn(null);
+}
+
+private void verifyAll() {

Review comment:
   Most tests end up calling this method twice, once explicitly and once 
via `teardown()`. Let's pick one way and stick with it.








[GitHub] [kafka] mimaison commented on a change in pull request #11471: MINOR: Replace EasyMock with Mockito in connect:file

2022-03-16 Thread GitBox


mimaison commented on a change in pull request #11471:
URL: https://github.com/apache/kafka/pull/11471#discussion_r828080748



##
File path: 
connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
##
@@ -204,8 +210,6 @@ private void writeTimesAndFlush(OutputStream os, int times, 
byte[] line) throws
 
 @Test
 public void testMissingFile() throws InterruptedException {

Review comment:
   Can we use this opportunity to improve the name of this test? 
`testMissingFile` sounds like we're testing an error case, while this test 
actually covers reading from stdin instead of a file.








[jira] [Commented] (KAFKA-13739) Sliding window without grace not working

2022-03-16 Thread Leah Thomas (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17507674#comment-17507674
 ] 

Leah Thomas commented on KAFKA-13739:
-

I think your analysis makes sense, and changing the check to `windowEnd >= 
closeTime` makes sense to me _if_ we want to support a grace period of 0. The 
alternative would be to strictly prohibit a grace period of 0 and require 
something larger.

I can't think of any reason to prevent a grace period of 0, so the change above 
seems good to me. It would be worth making that change and running the full 
suite of tests to make sure there aren't any assumptions that I'm missing 
right now.

> Sliding window without grace not working
> 
>
> Key: KAFKA-13739
> URL: https://issues.apache.org/jira/browse/KAFKA-13739
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0
>Reporter: bounkong khamphousone
>Priority: Minor
>
> Hi everyone! I would like to understand why the Kafka Streams DSL offers the 
> ability to define a SlidingWindow with no grace period when that does not 
> seem to work. [Confluent's 
> site|https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#sliding-time-windows]
>  states that a grace period is required, and with the deprecated method it 
> defaults to 24 hours.
> With a basic sliding window and a count, setting the grace period to 1 ms 
> produces the expected output. According to the sliding window documentation, 
> the lower and upper bounds are inclusive.
> If I set the grace period to 0 ms, I can see that the record is not skipped at 
> KStreamSlidingWindowAggregate (l.126), but when we try to create the window and 
> push the event in KStreamSlidingWindowAggregate#createWindows, we call the 
> method updateWindowAndForward (l.417). This method (l.468) checks that 
> {{windowEnd > closeTime}}.
> closeTime is defined as {{observedStreamTime - window.gracePeriodMs}} 
> (sliding window configuration).
> windowEnd is defined as {{inputRecordTimestamp}}.
>  
> For a first event with a record timestamp, we can assume that 
> observedStreamTime is equal to inputRecordTimestamp.
>  
> Therefore, closeTime is {{inputRecordTimestamp - 0}} (gracePeriodMs), which 
> results in {{inputRecordTimestamp}}.
> Going back to the check in the {{updateWindowAndForward}} method, we then 
> have inputRecordTimestamp > inputRecordTimestamp, which is always false. The 
> record is therefore skipped for its own window.
> Since the lower and upper bounds are stated to be inclusive, I would have 
> expected the event to be pushed to the store and forwarded. Hence, the check 
> should be {{windowEnd >= closeTime}}.
>  
> Is this a bug or is it intended?
> Thanks in advance for your explanations!
> Best regards!
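
The arithmetic in the report can be reproduced in isolation. The following is only a minimal sketch, not the actual KStreamSlidingWindowAggregate code: the class and method names are hypothetical, and it assumes, as the report does, that windowEnd equals the record timestamp and that closeTime is observedStreamTime minus the grace period.

```java
// Hypothetical stand-alone model of the window-close check described in the report.
final class SlidingWindowCloseCheck {

    // closeTime as defined in the report: observedStreamTime - gracePeriodMs.
    static long closeTime(long observedStreamTime, long gracePeriodMs) {
        return observedStreamTime - gracePeriodMs;
    }

    // The check as quoted in the report: strict comparison.
    static boolean acceptedStrict(long windowEnd, long closeTime) {
        return windowEnd > closeTime;
    }

    // The check proposed in the report: inclusive comparison.
    static boolean acceptedInclusive(long windowEnd, long closeTime) {
        return windowEnd >= closeTime;
    }

    public static void main(String[] args) {
        // First record: observed stream time equals the record timestamp.
        long ts = 1_000L;
        System.out.println("grace=0, strict:    " + acceptedStrict(ts, closeTime(ts, 0L)));
        System.out.println("grace=1, strict:    " + acceptedStrict(ts, closeTime(ts, 1L)));
        System.out.println("grace=0, inclusive: " + acceptedInclusive(ts, closeTime(ts, 0L)));
    }
}
```

With a grace period of 0 the strict comparison reduces to `ts > ts`, which is why the record's own window is never updated; the inclusive comparison accepts it.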



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] C0urante edited a comment on pull request #11894: KAFKA-13613: Remove hard dependency on HmacSHA256 algorithm for Connect

2022-03-16 Thread GitBox


C0urante edited a comment on pull request #11894:
URL: https://github.com/apache/kafka/pull/11894#issuecomment-1067359447


   Note that, while this aims to enable Connect workers to be brought up on 
JVMs that do not provide the `HmacSHA256` algorithm, it does not add support 
for Connect to be built from source on these JVMs.






[GitHub] [kafka] sarat-kakarla opened a new pull request #11907: Kip 714 process client telemetry data

2022-03-16 Thread GitBox


sarat-kakarla opened a new pull request #11907:
URL: https://github.com/apache/kafka/pull/11907








[GitHub] [kafka] sarat-kakarla closed pull request #11907: Kip 714 process client telemetry data

2022-03-16 Thread GitBox


sarat-kakarla closed pull request #11907:
URL: https://github.com/apache/kafka/pull/11907


   






[jira] [Commented] (KAFKA-13730) OAuth access token validation fails if it does not contain the "sub" claim

2022-03-16 Thread Daniel Fonai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13730?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17507682#comment-17507682
 ] 

Daniel Fonai commented on KAFKA-13730:
--

[~kirktrue] thank you for the quick response. Please let me know if you need 
anything from my side.

> OAuth access token validation fails if it does not contain the "sub" claim
> --
>
> Key: KAFKA-13730
> URL: https://issues.apache.org/jira/browse/KAFKA-13730
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.1.0
>Reporter: Daniel Fonai
>Assignee: Kirk True
>Priority: Minor
>
> Client authentication fails when the client is configured to use OAuth and 
> the JWT access token does *not contain the sub claim*. This issue was 
> discovered while testing Kafka integration with the Ping Identity OAuth 
> server. According to Ping's 
> [documentation|https://apidocs.pingidentity.com/pingone/devguide/v1/api/#access-tokens-and-id-tokens]:
> {quote}sub – A string that specifies the identifier for the authenticated 
> user. This claim is not present for client_credentials tokens.
> {quote}
> In this case the Kafka broker rejects the token regardless of the 
> [sasl.oauthbearer.sub.claim.name|https://kafka.apache.org/documentation/#brokerconfigs_sasl.oauthbearer.sub.claim.name]
>  property value.
>  
> 
>  
> Steps to reproduce:
> 1. Client configuration:
> {noformat}
> security.protocol=SASL_PLAINTEXT
> sasl.mechanism=OAUTHBEARER
> sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
> sasl.oauthbearer.token.endpoint.url=https://oauth.server.fqdn/token/endpoint
> sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
>  required\
>  clientId="kafka-client"\
>  clientSecret="kafka-client-secret";
> sasl.oauthbearer.sub.claim.name=client_id # claim name for the principal to 
> be extracted from, needed for client side validation too
> {noformat}
> 2. Broker configuration:
> {noformat}
> sasl.enabled.mechanisms=...,OAUTHBEARER
> listener.name.sasl_plaintext.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
>  required;
> listener.name.sasl_plaintext.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler
> sasl.oauthbearer.jwks.endpoint.url=https://oauth.server.fqdn/jwks/endpoint
> sasl.oauthbearer.expected.audience=oauth-audience # based on OAuth server 
> setup
> sasl.oauthbearer.sub.claim.name=client_id # claim name for the principal to 
> be extracted from
> {noformat}
> 3. Try to perform some client operation:
> {noformat}
> kafka-topics --bootstrap-server `hostname`:9092 --list --command-config 
> oauth-client.properties
> {noformat}
> Result:
> Client authentication fails due to invalid access token.
>  - client log:
> {noformat}
> [2022-03-11 16:21:20,461] ERROR [AdminClient clientId=adminclient-1] 
> Connection to node -1 (localhost/127.0.0.1:9092) failed authentication due 
> to: {"status":"invalid_token"} (org.apache.kafka.clients.NetworkClient)
> [2022-03-11 16:21:20,463] WARN [AdminClient clientId=adminclient-1] Metadata 
> update failed due to authentication error 
> (org.apache.kafka.clients.admin.internals.AdminMetadataManager)
> org.apache.kafka.common.errors.SaslAuthenticationException: 
> {"status":"invalid_token"}
> Error while executing topic command : {"status":"invalid_token"}
> [2022-03-11 16:21:20,468] ERROR 
> org.apache.kafka.common.errors.SaslAuthenticationException: 
> {"status":"invalid_token"}
>  (kafka.admin.TopicCommand$)
> {noformat}
>  - broker log:
> {noformat}
> [2022-03-11 16:21:20,150] WARN Could not validate the access token: JWT 
> (claims->{"client_id":"...","iss":"...","iat":1647012079,"exp":1647015679,"aud":[...],"env":"...","org":"..."})
>  rejected due to invalid claims or other invalid content. Additional details: 
> [[14] No Subject (sub) claim is present.] 
> (org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler)
> org.apache.kafka.common.security.oauthbearer.secured.ValidateException: Could 
> not validate the access token: JWT 
> (claims->{"client_id":"...","iss":"...","iat":1647012079,"exp":1647015679,"aud":[...],"env":"...","org":"..."})
>  rejected due to invalid claims or other invalid content. Additional details: 
> [[14] No Subject (sub) claim is present.]
>   at 
> org.apache.kafka.common.security.oauthbearer.secured.ValidatorAccessTokenValidator.validate(ValidatorAccessTokenValidator.java:159)
>   at 
> org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler.handleValidatorCallback(OAuthBearerValidatorCallbackHandler.java:184)
> 

[GitHub] [kafka] mumrah commented on a change in pull request #11893: KAFKA-13682; KRaft Controller auto preferred leader election

2022-03-16 Thread GitBox


mumrah commented on a change in pull request #11893:
URL: https://github.com/apache/kafka/pull/11893#discussion_r828108377



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -147,6 +147,7 @@
 private int defaultNumPartitions = 1;
 private ReplicaPlacer replicaPlacer = new StripedReplicaPlacer(new Random());
 private long snapshotMaxNewRecordBytes = Long.MAX_VALUE;
+private long leaderImbalanceCheckIntervalNs = -1;

Review comment:
   We use `Optional` for a few other things in QuorumController; maybe we 
can use `OptionalLong` here instead of -1?
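
As a rough illustration of the `OptionalLong` suggestion, here is a minimal sketch. `LeaderBalanceConfig` is a hypothetical stand-in, not the real `QuorumController.Builder`; only the replacement of the -1 sentinel with an empty `OptionalLong` is the point.

```java
import java.util.OptionalLong;

// Hypothetical builder fragment: "disabled" is an empty OptionalLong, not -1.
final class LeaderBalanceConfig {
    private OptionalLong leaderImbalanceCheckIntervalNs = OptionalLong.empty();

    LeaderBalanceConfig setLeaderImbalanceCheckIntervalNs(long intervalNs) {
        this.leaderImbalanceCheckIntervalNs = OptionalLong.of(intervalNs);
        return this;
    }

    // Callers test for presence instead of comparing against a magic value.
    boolean balancingEnabled() {
        return leaderImbalanceCheckIntervalNs.isPresent();
    }

    OptionalLong leaderImbalanceCheckIntervalNs() {
        return leaderImbalanceCheckIntervalNs;
    }
}
```

The scheduling condition would then read `isPresent()` rather than `>= 0`, so the "feature off" case cannot be confused with a real interval.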

##
File path: core/src/main/scala/kafka/server/ControllerServer.scala
##
@@ -158,20 +158,30 @@ class ControllerServer(
   alterConfigPolicy = Option(config.
 getConfiguredInstance(AlterConfigPolicyClassNameProp, 
classOf[AlterConfigPolicy]))
 
-  val controllerBuilder = new QuorumController.Builder(config.nodeId, 
metaProperties.clusterId).
-setTime(time).
-setThreadNamePrefix(threadNamePrefixAsString).
-setConfigSchema(configSchema).
-setRaftClient(raftManager.client).
-setDefaultReplicationFactor(config.defaultReplicationFactor.toShort).
-setDefaultNumPartitions(config.numPartitions.intValue()).
-
setSessionTimeoutNs(TimeUnit.NANOSECONDS.convert(config.brokerSessionTimeoutMs.longValue(),
-  TimeUnit.MILLISECONDS)).
-setSnapshotMaxNewRecordBytes(config.metadataSnapshotMaxNewRecordBytes).
-setMetrics(new 
QuorumControllerMetrics(KafkaYammerMetrics.defaultRegistry())).
-setCreateTopicPolicy(createTopicPolicy.asJava).
-setAlterConfigPolicy(alterConfigPolicy.asJava).
-setConfigurationValidator(new ControllerConfigurationValidator())
+  val controllerBuilder = {
+val leaderImbalanceCheckIntervalNs = if 
(config.autoLeaderRebalanceEnable) {

Review comment:
   We should probably guard against a user setting this to zero. Otherwise, 
we'll thrash on the event queue and maybe even have a live-lock.
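
A guard along these lines could look roughly as follows. This is a sketch under assumptions: the helper name, the seconds-based input, and the choice to throw rather than clamp are all hypothetical and not taken from the PR.

```java
import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: converts the configured interval and rejects non-positive values.
final class LeaderBalanceInterval {

    static OptionalLong checkIntervalNs(boolean autoRebalanceEnabled, long intervalSeconds) {
        if (!autoRebalanceEnabled) {
            // Feature disabled: no interval at all.
            return OptionalLong.empty();
        }
        if (intervalSeconds <= 0) {
            // A zero interval would reschedule the check back-to-back on the event queue.
            throw new IllegalArgumentException(
                "leader imbalance check interval must be positive, got " + intervalSeconds);
        }
        return OptionalLong.of(TimeUnit.SECONDS.toNanos(intervalSeconds));
    }
}
```

Failing fast at startup is one option; clamping to a minimum interval would be another, at the cost of silently ignoring the configured value.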

##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -953,6 +971,48 @@ private void cancelMaybeFenceReplicas() {
 queue.cancelDeferred(MAYBE_FENCE_REPLICAS);
 }
 
+private static final int MAX_ELECTIONS_PER_IMBALANCE = 1_000;
+
+private void maybeScheduleNextBalancePartitionLeaders() {
+final String maybeBalancePartitionLeaders = 
"maybeBalancePartitionLeaders";
+
+if (imbalancedScheduled != ImbalanceSchedule.SCHEDULED &&
+leaderImbalanceCheckIntervalNs >= 0 &&
+replicationControl.arePartitionLeadersImbalanced()) {
+
+log.debug(
+"Scheduling deferred event for {} because scheduled ({}), 
checkIntervalNs ({}) and isImbalanced ({})",
+maybeBalancePartitionLeaders,
+imbalancedScheduled,
+leaderImbalanceCheckIntervalNs,
+replicationControl.arePartitionLeadersImbalanced()
+);
+long delayNs = time.nanoseconds();
+if (imbalancedScheduled == ImbalanceSchedule.DEFERRED) {
+delayNs += leaderImbalanceCheckIntervalNs;
+}
+scheduleDeferredWriteEvent(maybeBalancePartitionLeaders, delayNs, 
() -> {

Review comment:
   Looking down in the event code, I see this comment when we are handling 
deferred events.
   
   ```
   // The deferred event is ready to run.  Prepend it to the
   // queue.  (The value for deferred events is a schedule time
   // rather than a timeout.)
   ```
   
   In the case of the IMMEDIATE state where we set deadline to the current 
time, this might mean that it gets prepended to the queue and run before other 
waiting write events. Is this what we want here? Maybe we should schedule a 
regular write event for the IMMEDIATE case.
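
To make the ordering concern concrete, here is a toy model. It is not the real controller event queue: `ToyEventQueue` and its methods are hypothetical and only mimic the behaviour stated in the quoted comment, namely that a deferred event whose schedule time has passed is prepended, while a regular event is appended.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical toy queue modelling only the prepend/append difference.
final class ToyEventQueue {
    private final Deque<String> queue = new ArrayDeque<>();

    // Regular event: goes to the back, behind anything already waiting.
    void append(String event) {
        queue.addLast(event);
    }

    // Deferred event that is already due: goes to the front of the queue.
    void runDueDeferred(String event) {
        queue.addFirst(event);
    }

    // Returns the next event to run, or null if the queue is empty.
    String poll() {
        return queue.pollFirst();
    }
}
```

In this model, a balance event scheduled as "deferred, due now" runs before write events that were already waiting, whereas appending it as a regular event keeps arrival order.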

##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -1197,6 +1257,25 @@ private void resetState() {
  */
 private long newBytesSinceLastSnapshot = 0;
 
+/**
+ * How long to delay partition leader balancing operations.
+ */
+private final long leaderImbalanceCheckIntervalNs;
+
+private static enum ImbalanceSchedule {
+// Leader balancing has been scheduled
+SCHEDULED,
+// Leader balancing should be scheduled in the future
+DEFERRED,

Review comment:
   If I understand correctly, DEFERRED is like our "idle" state here. That 
is, if no automatic preferred leader election is happening, we'll be in the 
DEFERRED state until an imbalance occurs. Is that right?

##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -1197,6 +1257,25 @@ private v

[GitHub] [kafka] mumrah merged pull request #11901: KAFKA-13741 Don't generate Uuid with a leading "-"

2022-03-16 Thread GitBox


mumrah merged pull request #11901:
URL: https://github.com/apache/kafka/pull/11901


   






[GitHub] [kafka] mumrah commented on a change in pull request #11893: KAFKA-13682; KRaft Controller auto preferred leader election

2022-03-16 Thread GitBox


mumrah commented on a change in pull request #11893:
URL: https://github.com/apache/kafka/pull/11893#discussion_r828167249



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -341,22 +359,25 @@ public void replay(RemoveTopicRecord record) {
 // Delete the configurations associated with this topic.
 configurationControl.deleteTopicConfigs(topic.name);
 
-// Remove the entries for this topic in brokersToIsrs.
-for (PartitionRegistration partition : topic.parts.values()) {
+for (Map.Entry<Integer, PartitionRegistration> entry : topic.parts.entrySet()) {
+int partitionId = entry.getKey();
+PartitionRegistration partition = entry.getValue();
+
+// Remove the entries for this topic in brokersToIsrs.
 for (int i = 0; i < partition.isr.length; i++) {
 brokersToIsrs.removeTopicEntryForBroker(topic.id, 
partition.isr[i]);
 }
-if (partition.leader != partition.preferredReplica()) {
-preferredReplicaImbalanceCount.decrement();
-}
+
+imbalancedPartitions.remove(new TopicIdPartition(record.topicId(), 
partitionId));
+

Review comment:
   nit: whitespace

##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -1016,6 +1042,55 @@ ApiError electLeader(String topic, int partitionId, 
ElectionType electionType,
 return ControllerResult.of(records, null);
 }
 
+boolean arePartitionLeadersImbalanced() {
+return !imbalancedPartitions.isEmpty();
+}
+
+/**
+ * Attempt to elect a preferred leader for all topic partitions that a 
leader that is not the preferred replica.
+ *
+ * The response() method in the return object is true if this method 
returned without electing all possible preferred replicas.
+ * The quorum controlller should reschedule this operation immediately if 
it is true.
+ *
+ * @return All of the election records and if there may be more available 
preferred replicas to elect as leader
+ */
+ControllerResult maybeBalancePartitionLeaders() {
+List records = new ArrayList<>();
+
+boolean rescheduleImmidiately = false;
+for (TopicIdPartition topicPartition : imbalancedPartitions) {
+if (records.size() >= maxElectionsPerImbalance) {
+rescheduleImmidiately = true;
+break;
+}
+
+TopicControlInfo topic = topics.get(topicPartition.topicId());
+if (topic == null) {
+log.error("Skipping unknown imbalanced topic {}", 
topicPartition);
+continue;
+}
+
+PartitionRegistration partition = 
topic.parts.get(topicPartition.partitionId());
+if (partition == null) {
+log.error("Skipping unknown imbalanced partition {}", 
topicPartition);
+continue;
+}

Review comment:
   Hmm, I think if we hit one of these cases (which seems unlikely), we'll 
never remove the item from `imbalancedPartitions`, since we won't get a 
PartitionChangeRecord. I'm not really sure how we can properly handle this, 
though, since we don't want to mutate `imbalancedPartitions` outside of a 
"replay" method. WDYT?








[GitHub] [kafka] mjsax commented on pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…

2022-03-16 Thread GitBox


mjsax commented on pull request #11211:
URL: https://github.com/apache/kafka/pull/11211#issuecomment-1069342863


   > the computation is done again on the cached data
   
   What computation do you mean? In the end, we just iterate. In both cases 
(including with the cache) we need an iterator for the inner store, because 
the cache might not hold all the data. Thus, we need to "peek" into both 
iterators to see what the next key is so that we can "merge" them correctly: 
if a key exists in both the underlying store and the cache, we return the data 
from the cache, but we still need to skip over that key in the underlying 
store to ensure the merge is done correctly. (Not sure if your question was 
referring to this part of the "merge logic"?)
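
The merge described above can be sketched as follows. This is a simplified illustration over two sorted maps, not the actual Kafka Streams merged-iterator classes; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: merge a sorted cache and a sorted store, cache wins on equal keys.
final class MergedIteratorSketch {

    static List<String> merge(TreeMap<String, String> cache, TreeMap<String, String> store) {
        Iterator<Map.Entry<String, String>> c = cache.entrySet().iterator();
        Iterator<Map.Entry<String, String>> s = store.entrySet().iterator();
        // "Peeked" heads of both iterators; null means that side is exhausted.
        Map.Entry<String, String> nextCache = c.hasNext() ? c.next() : null;
        Map.Entry<String, String> nextStore = s.hasNext() ? s.next() : null;

        List<String> out = new ArrayList<>();
        while (nextCache != null || nextStore != null) {
            int cmp;
            if (nextCache == null) {
                cmp = 1;   // only the store has entries left
            } else if (nextStore == null) {
                cmp = -1;  // only the cache has entries left
            } else {
                cmp = nextCache.getKey().compareTo(nextStore.getKey());
            }

            if (cmp < 0) {
                out.add(nextCache.getKey() + "=" + nextCache.getValue());
                nextCache = c.hasNext() ? c.next() : null;
            } else if (cmp > 0) {
                out.add(nextStore.getKey() + "=" + nextStore.getValue());
                nextStore = s.hasNext() ? s.next() : null;
            } else {
                // Same key on both sides: return the cached value, but also
                // advance the store iterator so the key is not emitted twice.
                out.add(nextCache.getKey() + "=" + nextCache.getValue());
                nextCache = c.hasNext() ? c.next() : null;
                nextStore = s.hasNext() ? s.next() : null;
            }
        }
        return out;
    }
}
```

The equal-key branch is the step the comment calls out: the store entry is skipped even though its value is not used.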






[GitHub] [kafka] lkokhreidze commented on pull request #11837: KAFKA-6718 / Add rack awareness configurations to StreamsConfig

2022-03-16 Thread GitBox


lkokhreidze commented on pull request #11837:
URL: https://github.com/apache/kafka/pull/11837#issuecomment-1069347833


   FYI, if I'm reading the test results correctly, the latest failures seem 
unrelated to this change.
   
   ```
   Build / JDK 8 and Scala 2.12 / shouldRestoreState – org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest
   1m 3s
   Build / JDK 17 and Scala 2.13 / [1] Type=Raft, Name=testElectionResultOutput, Security=PLAINTEXT – kafka.admin.LeaderElectionCommandTest
   1m 10s
   Build / JDK 17 and Scala 2.13 / shouldRestoreState – org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest
   1m 2s
   Build / JDK 17 and Scala 2.13 / shouldRestoreState – org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest
   1m 3s
   ```






[GitHub] [kafka] guozhangwang commented on pull request #11796: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2022-03-16 Thread GitBox


guozhangwang commented on pull request #11796:
URL: https://github.com/apache/kafka/pull/11796#issuecomment-1069361318


   @vamossagar12 could you resolve the conflicts before I re-trigger Jenkins?






[GitHub] [kafka] cadonna merged pull request #11837: KAFKA-6718 / Add rack awareness configurations to StreamsConfig

2022-03-16 Thread GitBox


cadonna merged pull request #11837:
URL: https://github.com/apache/kafka/pull/11837


   






[jira] [Commented] (KAFKA-7509) Kafka Connect logs unnecessary warnings about unused configurations

2022-03-16 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17507768#comment-17507768
 ] 

Chris Egerton commented on KAFKA-7509:
--

Bumping this again as the problem continues to persist for Connect (and 
possibly other projects).

 

While I agree that there's value in logging warnings about unrecognized values 
in some contexts, that value is lost when the false positive rate for these 
warnings becomes too high, which appears to be the case for Connect. As 
[~rhauch] has noted with his exhaustive research into this problem space, it's 
basically impossible at the moment for Connect to preemptively filter out 
irrelevant values for some of the Kafka clients that it creates. And, as 
[~mjsax] has noted, it's not an option to unconditionally demote these log 
messages to {{DEBUG}} level for all Kafka clients, since there are plenty of 
projects that do not generate these kinds of false positives, in which case 
these messages do serve as effective warnings that there may be a typo in the 
client's config.

 

One compromise could be to do the following:
 # Add a new {{AbstractConfig::logUnused}} variant that operates at {{DEBUG}} 
level instead of {{WARN}} (exact details negotiable; this could be an 
overloading of the existing {{logUnused}} method that accepts a boolean or some 
other parameter to toggle the log level used, or a separate {{debugUnused}} 
method that's hardcoded to use {{DEBUG}}-level logging, or something else 
entirely).
 # Add a new {{warn.on.unused}} boolean property to the producer, consumer, and 
admin client API, with a default of {{true}}. When enabled, the current 
behavior of logging unrecognized values at {{WARN}} level will take place. When 
disabled (i.e., set to {{false}}), unrecognized values will be instead 
logged at {{DEBUG}} level, using the new {{AbstractConfig}} API from part 1.
 # Modify Connect to set {{warn.on.unused}} to {{false}} by default for all 
Kafka clients that it uses for its internal topics (and possibly in other 
contexts where the rate of false positives is also high). Allow users to 
override this default by explicitly specifying a value for the property in the 
Connect worker config.

This would be completely backwards compatible outside of Connect but also allow 
for other projects to opt in to a reduction in {{WARN}}-level log spam, and 
within Connect, would greatly reduce the false positive rate for 
{{WARN}}-level messages.

 

Obviously, it would require a KIP, but I'm happy to write one if this seems 
like a reasonable high-level approach. Thoughts?
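
For illustration only, the level selection in the proposal above might be modelled like this. Nothing here exists in Kafka today: {{warn.on.unused}} is the property proposed in this comment, and the class, enum, and method names are hypothetical.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of the proposal: pick the log level for unused configs
// from a "warn.on.unused" property that defaults to true.
final class UnusedConfigLogging {

    enum Level { WARN, DEBUG }

    // Property missing or "true" keeps today's WARN behaviour; "false" demotes to DEBUG.
    static Level levelFor(Map<String, ?> config) {
        Object value = config.get("warn.on.unused");
        boolean warn = value == null || Boolean.parseBoolean(value.toString());
        return warn ? Level.WARN : Level.DEBUG;
    }

    // Keys that were supplied but are not part of the known config definition.
    static Set<String> unused(Map<String, ?> supplied, Set<String> known) {
        Set<String> result = new TreeSet<>(supplied.keySet());
        result.removeAll(known);
        return result;
    }
}
```

Connect would then pass {{warn.on.unused=false}} to the clients it creates for internal topics, while every other user keeps the current default.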

> Kafka Connect logs unnecessary warnings about unused configurations
> ---
>
> Key: KAFKA-7509
> URL: https://issues.apache.org/jira/browse/KAFKA-7509
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Randall Hauch
>Assignee: Randall Hauch
>Priority: Major
>
> When running Connect, the logs contain quite a few warnings about "The 
> configuration '{}' was supplied but isn't a known config." This occurs when 
> Connect creates producers, consumers, and admin clients, because the 
> AbstractConfig is logging unused configuration properties upon construction. 
> It's complicated by the fact that the Producer, Consumer, and AdminClient all 
> create their own AbstractConfig instances within the constructor, so we can't 
> even call its {{ignore(String key)}} method.
> See also KAFKA-6793 for a similar issue with Streams.
> There are no arguments in the Producer, Consumer, or AdminClient constructors 
> to control  whether the configs log these warnings, so a simpler workaround 
> is to only pass those configuration properties to the Producer, Consumer, and 
> AdminClient that the ProducerConfig, ConsumerConfig, and AdminClientConfig 
> configdefs know about.





[jira] [Comment Edited] (KAFKA-7509) Kafka Connect logs unnecessary warnings about unused configurations

2022-03-16 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17507768#comment-17507768
 ] 

Chris Egerton edited comment on KAFKA-7509 at 3/16/22, 5:41 PM:


Bumping this again as the problem continues to persist for Connect (and 
possibly other projects).

While I agree that there's value in logging warnings about unrecognized values 
in some contexts, that value is lost when the false positive rate for these 
warnings becomes too high, which appears to be the case for Connect. As 
[~rhauch] has noted with his exhaustive research into this problem space, it's 
basically impossible at the moment for Connect to preemptively filter out 
irrelevant values for some of the Kafka clients that it creates. And, as 
[~mjsax] has noted, it's not an option to unconditionally demote these log 
messages to {{DEBUG}} level for all Kafka clients, since there are plenty of 
projects that do not generate these kinds of false positives, in which case 
these messages do serve as effective warnings that there may be a typo in the 
client's config.

One compromise could be to do the following:
 # Add a new {{AbstractConfig::logUnused}} variant that operates at {{DEBUG}} 
level instead of {{WARN}} (exact details negotiable; this could be an 
overloading of the existing {{logUnused}} method that accepts a boolean or some 
other parameter to toggle the log level used, or a separate {{debugUnused}} 
method that's hardcoded to use {{DEBUG}}-level logging, or something else 
entirely).
 # Add a new {{warn.on.unused}} boolean property to the producer, consumer, and 
admin client API, with a default of {{true}}. When enabled, the current 
behavior of logging unrecognized values at {{WARN}} level will take place. When 
disabled (i.e., set to {{false}}), unrecognized values will be instead 
logged at {{DEBUG}} level, using the new {{AbstractConfig}} API from part 1.
 # Modify Connect to set {{warn.on.unused}} to {{false}} by default for all 
Kafka clients that it uses for its internal topics (and possibly in other 
contexts where the rate of false positives is also high). Allow users to 
override this default by explicitly specifying a value for the property in the 
Connect worker config.

This would be completely backwards compatible outside of Connect but also allow 
for other projects to opt in to a reduction in {{WARN}}-level log spam, and 
within Connect, would greatly reduce the false positive rate for 
{{WARN}}-level messages.

Obviously, it would require a KIP, but I'm happy to write one if this seems 
like a reasonable high-level approach. Thoughts?



[jira] [Assigned] (KAFKA-7509) Kafka Connect logs unnecessary warnings about unused configurations

2022-03-16 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch reassigned KAFKA-7509:


Assignee: (was: Randall Hauch)

> Kafka Connect logs unnecessary warnings about unused configurations
> ---
>
> Key: KAFKA-7509
> URL: https://issues.apache.org/jira/browse/KAFKA-7509
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Randall Hauch
>Priority: Major
>
> When running Connect, the logs contain quite a few warnings about "The 
> configuration '{}' was supplied but isn't a known config." This occurs when 
> Connect creates producers, consumers, and admin clients, because the 
> AbstractConfig is logging unused configuration properties upon construction. 
> It's complicated by the fact that the Producer, Consumer, and AdminClient all 
> create their own AbstractConfig instances within the constructor, so we can't 
> even call its {{ignore(String key)}} method.
> See also KAFKA-6793 for a similar issue with Streams.
> There are no arguments in the Producer, Consumer, or AdminClient constructors 
> to control  whether the configs log these warnings, so a simpler workaround 
> is to only pass those configuration properties to the Producer, Consumer, and 
> AdminClient that the ProducerConfig, ConsumerConfig, and AdminClientConfig 
> configdefs know about.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] lkokhreidze commented on pull request #11837: KAFKA-6718 / Add rack awareness configurations to StreamsConfig

2022-03-16 Thread GitBox


lkokhreidze commented on pull request #11837:
URL: https://github.com/apache/kafka/pull/11837#issuecomment-1069360043


   Big thank you @cadonna and @showuon for all the reviews.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-7509) Kafka Connect logs unnecessary warnings about unused configurations

2022-03-16 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17507768#comment-17507768
 ] 

Chris Egerton edited comment on KAFKA-7509 at 3/16/22, 6:15 PM:


Bumping this again as the problem continues to persist for Connect (and 
possibly other projects).

While I agree that there's value in logging warnings about unrecognized values 
in some contexts, that value is lost when the false positive rate for these 
warnings becomes too high, which appears to be the case for Connect. As 
[~rhauch] has noted with his exhaustive research into this problem space, it's 
basically impossible at the moment for Connect to preemptively filter out 
irrelevant values for some of the Kafka clients that it creates. And, as 
[~mjsax] has noted, it's not an option to unconditionally demote these log 
messages to {{DEBUG}} level for all Kafka clients, since there are plenty of 
projects that do not generate these kinds of false positives, in which case 
these messages do serve as effective warnings that there may be a typo in the 
client's config.

One compromise could be to do the following:
 # Add a new {{AbstractConfig::logUnused}} variant that operates at {{DEBUG}} 
level instead of {{WARN}} (exact details negotiable; this could be an 
overloading of the existing {{logUnused}} method that accepts a boolean or some 
other parameter to toggle the log level used, or a separate {{debugUnused}} 
method that's hardcoded to use {{DEBUG}}-level logging, or something else 
entirely).
 # Add a new {{warn.on.unused}} boolean property to the producer, consumer, and 
admin client API, with a default of {{true}}. When enabled, the current 
behavior of logging unrecognized values at {{WARN}} level will take place. When 
disabled (i.e., set to {{false}}), unrecognized values will instead be 
logged at {{DEBUG}} level, using the new {{AbstractConfig}} API from part 1.
 # Modify Connect to set {{warn.on.unused}} to {{false}} by default for all 
Kafka clients that it uses for its internal topics (and possibly in other 
contexts where the rate of false positives is also high). Allow users to 
override this default by explicitly specifying a value for the property in the 
Connect worker (and possibly connector) config.

This would be completely backwards compatible outside of Connect but also allow 
for other projects to opt in to a reduction in {{WARN}}-level log spam, and 
within Connect, would greatly reduce the false positive rate for 
{{WARN}}-level messages.

Obviously, it would require a KIP, but I'm happy to write one if this seems 
like a reasonable high-level approach. Thoughts?
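
To make the proposal concrete, here is a minimal sketch of how the log level could be chosen from the proposed {{warn.on.unused}} property. The class and method names are hypothetical and exist only for illustration; this is not the existing {{AbstractConfig}} API, and the property itself does not exist today.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical illustration of the proposal; not part of Kafka. */
final class UnusedConfigLogging {
    // Property name taken from the proposal above; it is not a real Kafka config.
    static final String WARN_ON_UNUSED = "warn.on.unused";

    /** Returns "WARN" unless the property is explicitly set to false, then "DEBUG". */
    static String unusedLogLevel(Map<String, ?> originals) {
        Object raw = originals.get(WARN_ON_UNUSED);
        // A missing value defaults to true, which preserves the current behavior.
        boolean warn = raw == null || Boolean.parseBoolean(raw.toString());
        return warn ? "WARN" : "DEBUG";
    }

    /** Keys that were supplied but are not among the known config names. */
    static Set<String> unusedKeys(Map<String, ?> originals, Set<String> known) {
        Set<String> unused = new TreeSet<>(originals.keySet());
        unused.removeAll(known);
        unused.remove(WARN_ON_UNUSED);
        return unused;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("offset.storage.topic", "connect-offsets"); // worker-only key
        props.put(WARN_ON_UNUSED, "false");                    // what Connect would set
        Set<String> known = Set.of("bootstrap.servers", "client.id");
        for (String key : unusedKeys(props, known)) {
            System.out.println(unusedLogLevel(props) + " The configuration '" + key
                + "' was supplied but isn't a known config.");
        }
    }
}
```

With the property left unset, the same message would be reported at WARN, so clients outside Connect would see no change.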



[GitHub] [kafka] cadonna commented on pull request #11837: KAFKA-6718 / Add rack awareness configurations to StreamsConfig

2022-03-16 Thread GitBox


cadonna commented on pull request #11837:
URL: https://github.com/apache/kafka/pull/11837#issuecomment-1069358982


   Merged!
   @lkokhreidze Congrats to this milestone! Nice work! 🥳 






[GitHub] [kafka] mumrah commented on a change in pull request #11893: KAFKA-13682; KRaft Controller auto preferred leader election

2022-03-16 Thread GitBox


mumrah commented on a change in pull request #11893:
URL: https://github.com/apache/kafka/pull/11893#discussion_r828173718



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -1016,6 +1042,55 @@ ApiError electLeader(String topic, int partitionId, ElectionType electionType,
 return ControllerResult.of(records, null);
 }
 
+boolean arePartitionLeadersImbalanced() {
+return !imbalancedPartitions.isEmpty();
+}
+
+/**
+ * Attempt to elect a preferred leader for all topic partitions that a leader that is not the preferred replica.
+ *
+ * The response() method in the return object is true if this method returned without electing all possible preferred replicas.
+ * The quorum controlller should reschedule this operation immediately if it is true.
+ *
+ * @return All of the election records and if there may be more available preferred replicas to elect as leader
+ */
+ControllerResult maybeBalancePartitionLeaders() {
+List records = new ArrayList<>();
+
+boolean rescheduleImmidiately = false;
+for (TopicIdPartition topicPartition : imbalancedPartitions) {
+if (records.size() >= maxElectionsPerImbalance) {
+rescheduleImmidiately = true;
+break;
+}
+
+TopicControlInfo topic = topics.get(topicPartition.topicId());
+if (topic == null) {
+log.error("Skipping unknown imbalanced topic {}", topicPartition);
+continue;
+}
+
+PartitionRegistration partition = topic.parts.get(topicPartition.partitionId());
+if (partition == null) {
+log.error("Skipping unknown imbalanced partition {}", topicPartition);
+continue;
+}

Review comment:
   Hmm, I think if we hit one of these cases (which seems unlikely), we'll 
never remove the item from `imbalancedPartitions` since we won't get a 
PartitionChangeRecord. I'm not really sure how we can properly handle this 
though since we don't want to mutate `imbalancedPartitions` outside of a 
"replay" method. WDYT?








[GitHub] [kafka] jsancio commented on pull request #11893: KAFKA-13682; KRaft Controller auto preferred leader election

2022-03-16 Thread GitBox


jsancio commented on pull request #11893:
URL: https://github.com/apache/kafka/pull/11893#issuecomment-1069422793


   Thanks for the reviews @junrao and @mumrah. I uploaded a commit that changes 
the election algorithm for `PartitionChangeBuilder`: 
https://github.com/apache/kafka/pull/11893/commits/5a7fc90c0672b45a4f25616eb094c9cfd682afb7.
   
   Pushed it as a separate commit so that it is easier to review. I am working 
through your other comments. Let me know what you think.






[jira] [Created] (KAFKA-13748) Do not include file stream connectors in Connect's CLASSPATH and plugin.path by default

2022-03-16 Thread Konstantine Karantasis (Jira)
Konstantine Karantasis created KAFKA-13748:
--

 Summary: Do not include file stream connectors in Connect's 
CLASSPATH and plugin.path by default
 Key: KAFKA-13748
 URL: https://issues.apache.org/jira/browse/KAFKA-13748
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Konstantine Karantasis
Assignee: Konstantine Karantasis
 Fix For: 3.2.0, 3.1.1, 3.0.2


File stream connectors have been included with Kafka Connect distributions from 
the very beginning. These simple connectors were included to showcase 
connector implementation but were never meant to be used in production; they 
have been available only for the straightforward demonstration of Connect's 
capabilities through our quick start guides. 

Given that these connectors are not production ready and yet they offer access 
to the local filesystem, with this ticket I propose to remove them from our 
deployments by default by excluding these connectors from the {{CLASSPATH}} and 
the default {{plugin.path}}. 

The impact will be minimal. Quick start guides will require a single 
additional step of editing the {{plugin.path}} to include the single package 
that contains these connectors. Production deployments will remain unaffected 
because these are not production-grade connectors. 





[GitHub] [kafka] kkonstantine opened a new pull request #11908: KAFKA-13748: Do not include file stream connectors in Connect's CLASSPATH and plugin.path by default

2022-03-16 Thread GitBox


kkonstantine opened a new pull request #11908:
URL: https://github.com/apache/kafka/pull/11908


   The purpose of this PR is to stop including, by default, the non-production-grade 
connectors that are meant for demos and quick starts in the CLASSPATH and 
plugin.path of Connect deployments. The package containing these connectors 
will still be shipped with the Apache Kafka distribution and will be 
available for explicit inclusion. 
   
   The changes will be tested through the system tests.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[jira] [Updated] (KAFKA-13739) Sliding window without grace not working

2022-03-16 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13739:

Labels: beginner newbie  (was: )

> Sliding window without grace not working
> 
>
> Key: KAFKA-13739
> URL: https://issues.apache.org/jira/browse/KAFKA-13739
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0
>Reporter: bounkong khamphousone
>Priority: Minor
>  Labels: beginner, newbie
>
> Hi everyone! I would like to understand why the Kafka Streams DSL offers the 
> ability to define a SlidingWindow with no grace period when it does not seem 
> to work. [Confluent's 
> site|https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#sliding-time-windows]
>  states that a grace period is required and that, with the deprecated method, 
> it defaults to 24 hours.
> With a basic sliding window and a count, if I set the grace period to 1 ms, I 
> get the expected output. According to the sliding window documentation, the 
> lower and upper bounds are inclusive.
> If I set the grace period to 0 ms, I can see that the record is not skipped at 
> KStreamSlidingWindowAggregate(l.126), but when we try to create the window and 
> push the event in KStreamSlidingWindowAggregate#createWindows, we call the 
> method updateWindowAndForward(l.417). This method (l.468) checks that 
> {{windowEnd > closeTime}}.
> closeTime is defined as {{observedStreamTime - window.gracePeriodMs}} 
> (sliding window configuration).
> windowEnd is defined as {{inputRecordTimestamp}}.
>  
> For a first event with a record timestamp, we can assume that 
> observedStreamTime is equal to inputRecordTimestamp.
>  
> Therefore, closeTime is {{inputRecordTimestamp - 0}} (gracePeriodMs), which 
> results in {{inputRecordTimestamp}}.
> If we go back to the check done in the {{updateWindowAndForward}} method, we 
> have inputRecordTimestamp > inputRecordTimestamp, which is always false. The 
> record is then skipped for its own window.
> Since the lower and upper bounds are stated to be inclusive, I would have 
> expected the event to be pushed to the store and forwarded. Hence, the check 
> would be {{windowEnd >= closeTime}}.
>  
> Is this a bug, or is it intended?
> Thanks in advance for your explanations!
> Best regards!
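
The arithmetic in the report can be checked with a small standalone sketch. It only mirrors the comparison as described above; the class and method names are hypothetical and this is not the actual KStreamSlidingWindowAggregate code.

```java
/** Hypothetical illustration of the close-time check described in the report. */
final class GraceCheck {
    /** Strict comparison, as the report describes the current check. */
    static boolean acceptedStrict(long windowEnd, long observedStreamTime, long gracePeriodMs) {
        long closeTime = observedStreamTime - gracePeriodMs;
        return windowEnd > closeTime;
    }

    /** Inclusive comparison, as the report suggests. */
    static boolean acceptedInclusive(long windowEnd, long observedStreamTime, long gracePeriodMs) {
        long closeTime = observedStreamTime - gracePeriodMs;
        return windowEnd >= closeTime;
    }

    public static void main(String[] args) {
        long ts = 1_000L; // first record: windowEnd == observedStreamTime == ts
        System.out.println("grace=1ms strict:    " + acceptedStrict(ts, ts, 1L));
        System.out.println("grace=0ms strict:    " + acceptedStrict(ts, ts, 0L));
        System.out.println("grace=0ms inclusive: " + acceptedInclusive(ts, ts, 0L));
    }
}
```

With a grace period of 0 ms the strict check rejects the record's own window, while the inclusive check accepts it, which matches the behavior described in the report.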





[jira] [Commented] (KAFKA-13739) Sliding window without grace not working

2022-03-16 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17507825#comment-17507825
 ] 

Matthias J. Sax commented on KAFKA-13739:
-

Thanks for chiming in [~lct45]
{quote}_if_ we want to support a grace period of 
{quote}
Given that we support `SlidingWindows.ofTimeDifferenceWithNoGrace()` we should 
support a 0 ms grace period :) 



[jira] [Created] (KAFKA-13749) TopicConfigs and ErrorCode are not set in createTopics response in KRaft

2022-03-16 Thread Akhilesh Chaganti (Jira)
Akhilesh Chaganti created KAFKA-13749:
-

 Summary: TopicConfigs and ErrorCode are not set in createTopics 
response in KRaft
 Key: KAFKA-13749
 URL: https://issues.apache.org/jira/browse/KAFKA-13749
 Project: Kafka
  Issue Type: Bug
  Components: kraft
Reporter: Akhilesh Chaganti


Once the createTopics request is processed in KRaft, the `CreatableTopicResult` 
is not populated with the appropriate topic configs and error code, which breaks KIP-525.





[jira] [Commented] (KAFKA-7509) Kafka Connect logs unnecessary warnings about unused configurations

2022-03-16 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17507833#comment-17507833
 ] 

Matthias J. Sax commented on KAFKA-7509:


Personally, I think the best way would be to tackle this with a KIP across the 
board, not just for Connect. The idea would be to allow users to add a prefix to 
the configs they want to "forward" to "inner" classes. For example, if an 
interceptor needs a config, we could use `interceptor.foo.bar` instead of 
`foo.bar`; this way, we know that the consumer/producer can ignore this 
config, and when we forward the config into the interceptor, we remove the 
`interceptor.` prefix.

The `interceptor` prefix would of course be quite specific (and it only works 
because interceptors are a Kafka concept). We could also generalize it using a 
prefix `forward.`.

In Kafka Streams, we are already using `main.consumer.`, `restore.consumer.`, 
and `producer.` as prefixes to allow users to configure individual clients and 
to prevent `StreamsConfig` from logging warnings about "unknown" client configs.

The advantage of the prefix approach is that it is much more fine-grained 
than a single config, and it can also be applied recursively. For example, in 
Kafka Streams we use an `AdminClient` inside the `ConsumerClient`. If we want 
to configure the admin client, we can provide 
`main.consumer.forward.actualAdminConfigName` and prevent the consumer from 
logging a warning about an unknown config.
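
As a rough sketch of the prefix idea, the helper below keeps only the entries that start with a given prefix and strips that prefix before handing them to the inner component. The class, method, and the `forward.` prefix are hypothetical and only illustrate the recursive application described above; `AbstractConfig#originalsWithPrefix` already offers comparable prefix stripping for a single level.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration of forwarding prefixed configs; not part of Kafka. */
final class PrefixForwarding {
    /** Returns the entries whose key starts with the prefix, with the prefix removed. */
    static Map<String, Object> withPrefixStripped(Map<String, ?> configs, String prefix) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, ?> entry : configs.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                result.put(key.substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> streamsProps = new HashMap<>();
        streamsProps.put("application.id", "demo");
        streamsProps.put("main.consumer.max.poll.records", 100);
        streamsProps.put("main.consumer.forward.retries", 5); // meant for the nested admin client

        // First level: what the main consumer receives.
        Map<String, Object> consumerProps = withPrefixStripped(streamsProps, "main.consumer.");
        // Second level: what the consumer would pass on to its inner admin client.
        Map<String, Object> adminProps = withPrefixStripped(consumerProps, "forward.");

        System.out.println(consumerProps.keySet());
        System.out.println(adminProps);
    }
}
```

Because the consumer can recognize the `forward.` prefix, it would know not to warn about those keys, and the admin client would see only `retries`.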



[jira] [Commented] (KAFKA-7509) Kafka Connect logs unnecessary warnings about unused configurations

2022-03-16 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17507868#comment-17507868
 ] 

Chris Egerton commented on KAFKA-7509:
--

Thanks [~mjsax]--I think that's an acceptable approach, but it would come at a 
few costs:
 # Breaking backwards compatibility for a lot of Kafka components 
(producer/consumer interceptors, Connect REST extensions, etc.)
 # Complicating the configuration behavior for applications with this type of 
"nested" configuration surface
 # Making it impossible to have sub-configured components that are aware of 
their parent's configuration (for example, you could no longer have a Connect 
REST extension that's aware of the entire Connect worker's config without 
duplicating those same properties into the sub-config for that REST extension)

I personally hesitate to inflict a breaking change on users just for the sake 
of cleaning up log messages, but the second point actually provides a bit of an 
opportunity to clean up some of the uglier parts of Connect (and possibly other 
projects as well).

Right now, if you want to configure a Connect worker (or connector) with 
credentials to use for its connectors' Kafka clients, you have to specify the 
same credentials in up to three different places: one for each type of Kafka 
client. (This isn't a hypothetical scenario; it's required if you want to run a 
sink connector that uses the DLQ feature.)

The ability to define a single "forward." (or possibly just "clients.") prefix 
for properties to be passed to all nested configurations (or all nested 
configurations used exclusively for Kafka clients) would help with situations 
like this by eliminating the need to duplicate properties within a single 
configuration.

 

Ultimately, this still seems a little too aggressive of a change for the 
problem that it's trying to solve. If we were redesigning these APIs from the 
ground up, it would certainly be beneficial, but considering how much has been 
built on top of these APIs already and how much work it'd take for users to 
adjust to the proposed changes, it doesn't seem like a friendly tradeoff. Plus, 
for some situations (like the example with REST extensions), it's unclear how 
we'd want to proceed.



[GitHub] [kafka] jsancio commented on a change in pull request #11893: KAFKA-13682; KRaft Controller auto preferred leader election

2022-03-16 Thread GitBox


jsancio commented on a change in pull request #11893:
URL: https://github.com/apache/kafka/pull/11893#discussion_r828471282



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -673,10 +679,19 @@ public void run() throws Exception {
 replay(message.message(), Optional.empty(), offset);
 }
 snapshotRegistry.getOrCreateSnapshot(offset);
+
 log.debug("Read-write operation {} will be completed when the log " +
 "reaches offset {}.", this, resultAndOffset.offset());
 }
-purgatory.add(resultAndOffset.offset(), this);
+
+// After every controller write event, schedule a leader rebalance if there are any topic partition
+// with leader that is not the preferred leader.
+maybeScheduleNextBalancePartitionLeaders();
+
+// Remember the latest offset and future if it is not already completed
+if (!future.isDone()) {

Review comment:
   We need this check. The old code returned early and completed the future 
when the event didn't have any records to append.
   
   For a write event there are three cases:
   1. The write operation generated records. In this case we append them to the log, wait for the returned offset to be committed, and finally complete the future when the committed offset is reached.
   2. The write operation doesn't generate any records but we have uncommitted state in memory. This case is similar to 1, but instead of waiting for the appended offset we wait for the highest offset in the purgatory.
   3. The write operation doesn't generate any records and there is no uncommitted state. In this case we complete the future immediately and don't store it in the purgatory.
   
   I had to add this check because I removed the early `return` that was in the previous code.
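
   The three cases above can be sketched with a simplified stand-in for the purgatory. Everything here is hypothetical (class name, methods, and the use of "purgatory is non-empty" as a proxy for "there is uncommitted state"); it is not the `QuorumController` implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified model of the three write-event cases. */
final class WriteEventSketch {
    // Completions parked until the offset they are keyed by has been committed.
    private final TreeMap<Long, List<Runnable>> purgatory = new TreeMap<>();
    private long nextOffset = 0L;

    CompletableFuture<String> handleWrite(int recordCount, String response) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (recordCount > 0) {
            // Case 1: records were appended; wait for the last appended offset.
            long lastOffset = nextOffset + recordCount - 1;
            nextOffset = lastOffset + 1;
            park(lastOffset, future, response);
        } else if (!purgatory.isEmpty()) {
            // Case 2: no records, but uncommitted state exists; wait for the highest pending offset.
            park(purgatory.lastKey(), future, response);
        } else {
            // Case 3: nothing to wait for; complete immediately and skip the purgatory.
            future.complete(response);
        }
        return future;
    }

    private void park(long offset, CompletableFuture<String> future, String response) {
        purgatory.computeIfAbsent(offset, k -> new ArrayList<>())
                 .add(() -> future.complete(response));
    }

    /** Completes every parked future whose offset is at or below the committed offset. */
    void commitUpTo(long committedOffset) {
        Map<Long, List<Runnable>> ready = purgatory.headMap(committedOffset, true);
        ready.values().forEach(waiters -> waiters.forEach(Runnable::run));
        ready.clear();
    }

    public static void main(String[] args) {
        WriteEventSketch controller = new WriteEventSketch();
        System.out.println(controller.handleWrite(0, "noop").isDone());        // case 3
        CompletableFuture<String> a = controller.handleWrite(2, "created");    // case 1
        CompletableFuture<String> b = controller.handleWrite(0, "no-change");  // case 2
        System.out.println(a.isDone() + " " + b.isDone());
        controller.commitUpTo(1L);
        System.out.println(a.isDone() + " " + b.isDone());
    }
}
```

   In this model, a check like `!future.isDone()` is what would separate case 3 from the other two before anything is added to the purgatory.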








[jira] [Assigned] (KAFKA-13749) TopicConfigs and ErrorCode are not set in createTopics response in KRaft

2022-03-16 Thread Akhilesh Chaganti (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13749?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Akhilesh Chaganti reassigned KAFKA-13749:
-

Assignee: Akhilesh Chaganti

> TopicConfigs and ErrorCode are not set in createTopics response in KRaft
> 
>
> Key: KAFKA-13749
> URL: https://issues.apache.org/jira/browse/KAFKA-13749
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Reporter: Akhilesh Chaganti
>Assignee: Akhilesh Chaganti
>Priority: Major
>
> Once the createTopics request is processed in KRaft, the `CreatableTopicResult` 
> is not populated with the appropriate topic configs and error code, which breaks 
> KIP-525





[GitHub] [kafka] jsancio commented on a change in pull request #11893: KAFKA-13682; KRaft Controller auto preferred leader election

2022-03-16 Thread GitBox


jsancio commented on a change in pull request #11893:
URL: https://github.com/apache/kafka/pull/11893#discussion_r828505982



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -861,6 +875,10 @@ public void handleLeaderChange(LeaderAndEpoch newLeader) {
 // required because the active controller assumes that there is always an in-memory snapshot at the
 // last committed offset.
 snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
+
+// When becoming the active controller, schedule a leader rebalance if there are any topic partition
+// with leader that is not the preferred leader.
+maybeScheduleNextBalancePartitionLeaders();

Review comment:
   I think there are two cases here. Let me cover them in turn. First, `imbalancedPartitions` is a `TimelineHashSet`, which means that the "latest" value/snapshot for this data structure is consistent with `writeOffset` in the active controller and `lastCommittedOffset` in the inactive controllers.
   
   Just like the other timeline data structures, the quorum controller updates it by replaying records from the metadata log.
   
   For inactive controllers, `imbalancedPartitions` gets updated as they replay `PartitionRecords` and `PartitionChangeRecords`. Partitions that have the preferred replica as leader are removed from the set; otherwise they are added to the set. In other words, for inactive controllers `imbalancedPartitions` will always contain the topic partitions that don't have a preferred leader at `lastCommittedOffset`. Lastly, inactive controllers never schedule or perform a `ControllerWriteEvent`, hence inactive controllers never schedule `maybeBalancePartitionLeaders` events.
   
   Inactive controllers only become active after they have caught up to the committed state and have been elected leader. This is guaranteed by `KafkaRaftClient` and `handleLeaderChange`.
   
   When becoming leader we want to schedule a rebalance operation if there are imbalanced partitions.
   
   You are correct. We need to cancel the rebalance operation if we renounce as the active controller. Let me implement that.
   
   For other write operations this code will throw
   ```
 @Override
 public void run() throws Exception {
 long now = time.nanoseconds();
 controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs));
 int controllerEpoch = curClaimEpoch;
 if (controllerEpoch == -1) {
 throw newNotControllerException();
 }
   ```
   and handled here:
   ```
 } else if (e instanceof NotControllerException) {
 log.debug("Cancelling deferred write event {} because this controller " +
 "is no longer active.", name);
 return null;
 }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #11893: KAFKA-13682; KRaft Controller auto preferred leader election

2022-03-16 Thread GitBox


jsancio commented on a change in pull request #11893:
URL: https://github.com/apache/kafka/pull/11893#discussion_r828510590



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -1745,7 +1745,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
   val deleteRecordsPurgatoryPurgeIntervalRequests = getInt(KafkaConfig.DeleteRecordsPurgatoryPurgeIntervalRequestsProp)
   val autoLeaderRebalanceEnable = getBoolean(KafkaConfig.AutoLeaderRebalanceEnableProp)
   val leaderImbalancePerBrokerPercentage = getInt(KafkaConfig.LeaderImbalancePerBrokerPercentageProp)
-  val leaderImbalanceCheckIntervalSeconds = getLong(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp)
+  val leaderImbalanceCheckIntervalSeconds: Long = getLong(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp)

Review comment:
   We don't have any plans to implement it for KRaft in the near future. I 
am in favor of deprecating the configuration properties for both 
implementations. Let me start a KIP discussion regarding this change.








[jira] [Created] (KAFKA-13750) Client Compatability KafkaTest uses invalid idempotency configs

2022-03-16 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-13750:
--

 Summary: Client Compatability KafkaTest uses invalid idempotency 
configs
 Key: KAFKA-13750
 URL: https://issues.apache.org/jira/browse/KAFKA-13750
 Project: Kafka
  Issue Type: Bug
Reporter: Justine Olshan


With the switch to idempotency as a default, some of our tests broke, including 
ClientCompatibilityFeaturesTest.run_compatibility_test for versions prior to 
0.11, the release where EOS was enabled. We need to configure the producer 
correctly for these earlier versions.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (KAFKA-13750) Client Compatability KafkaTest uses invalid idempotency configs

2022-03-16 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan reassigned KAFKA-13750:
--

Assignee: Justine Olshan

> Client Compatability KafkaTest uses invalid idempotency configs
> ---
>
> Key: KAFKA-13750
> URL: https://issues.apache.org/jira/browse/KAFKA-13750
> Project: Kafka
>  Issue Type: Bug
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> With the switch to idempotency as a default, some of our tests broke 
> including 
> ClientCompatibilityFeaturesTest.run_compatibility_test for versions prior to 
> 0.11 where EOS was enabled. We need to configure the producer correctly for 
> these earlier versions.





[GitHub] [kafka] jsancio commented on a change in pull request #11893: KAFKA-13682; KRaft Controller auto preferred leader election

2022-03-16 Thread GitBox


jsancio commented on a change in pull request #11893:
URL: https://github.com/apache/kafka/pull/11893#discussion_r828541824



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -953,6 +971,48 @@ private void cancelMaybeFenceReplicas() {
 queue.cancelDeferred(MAYBE_FENCE_REPLICAS);
 }
 
+private static final int MAX_ELECTIONS_PER_IMBALANCE = 1_000;
+
+private void maybeScheduleNextBalancePartitionLeaders() {
+final String maybeBalancePartitionLeaders = 
"maybeBalancePartitionLeaders";
+
+if (imbalancedScheduled != ImbalanceSchedule.SCHEDULED &&
+leaderImbalanceCheckIntervalNs >= 0 &&
+replicationControl.arePartitionLeadersImbalanced()) {
+
+log.debug(
+"Scheduling deferred event for {} because scheduled ({}), 
checkIntervalNs ({}) and isImbalanced ({})",
+maybeBalancePartitionLeaders,
+imbalancedScheduled,
+leaderImbalanceCheckIntervalNs,
+replicationControl.arePartitionLeadersImbalanced()
+);
+long delayNs = time.nanoseconds();
+if (imbalancedScheduled == ImbalanceSchedule.DEFERRED) {
+delayNs += leaderImbalanceCheckIntervalNs;
+}
+scheduleDeferredWriteEvent(maybeBalancePartitionLeaders, delayNs, 
() -> {

Review comment:
   Hmm. Yeah, I think that `KafkaEventQueue` is unfair if we schedule 
deferred events faster than we can process non-deferred events. We have a 
similar problem with snapshot generation. Unfortunately, we can only cancel 
events that have been deferred.
   
   I think we need to fix this issue in `KafkaEventQueue`.








[GitHub] [kafka] jsancio commented on a change in pull request #11893: KAFKA-13682; KRaft Controller auto preferred leader election

2022-03-16 Thread GitBox


jsancio commented on a change in pull request #11893:
URL: https://github.com/apache/kafka/pull/11893#discussion_r828541824



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -953,6 +971,48 @@ private void cancelMaybeFenceReplicas() {
 queue.cancelDeferred(MAYBE_FENCE_REPLICAS);
 }
 
+private static final int MAX_ELECTIONS_PER_IMBALANCE = 1_000;
+
+private void maybeScheduleNextBalancePartitionLeaders() {
+final String maybeBalancePartitionLeaders = 
"maybeBalancePartitionLeaders";
+
+if (imbalancedScheduled != ImbalanceSchedule.SCHEDULED &&
+leaderImbalanceCheckIntervalNs >= 0 &&
+replicationControl.arePartitionLeadersImbalanced()) {
+
+log.debug(
+"Scheduling deferred event for {} because scheduled ({}), 
checkIntervalNs ({}) and isImbalanced ({})",
+maybeBalancePartitionLeaders,
+imbalancedScheduled,
+leaderImbalanceCheckIntervalNs,
+replicationControl.arePartitionLeadersImbalanced()
+);
+long delayNs = time.nanoseconds();
+if (imbalancedScheduled == ImbalanceSchedule.DEFERRED) {
+delayNs += leaderImbalanceCheckIntervalNs;
+}
+scheduleDeferredWriteEvent(maybeBalancePartitionLeaders, delayNs, 
() -> {

Review comment:
   Hmm. Yeah, I think that `KafkaEventQueue` is unfair if we schedule 
deferred events faster than we can process non-deferred events. We have a 
similar problem with snapshot generation. Unfortunately, we can only cancel 
events that have been deferred.
   
   I think we need to fix this issue in `KafkaEventQueue`. I think this is 
beyond the scope of this PR. Do you mind if I fix this issue in a future PR?








[GitHub] [kafka] jsancio commented on a change in pull request #11893: KAFKA-13682; KRaft Controller auto preferred leader election

2022-03-16 Thread GitBox


jsancio commented on a change in pull request #11893:
URL: https://github.com/apache/kafka/pull/11893#discussion_r828542616



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -341,22 +359,25 @@ public void replay(RemoveTopicRecord record) {
 // Delete the configurations associated with this topic.
 configurationControl.deleteTopicConfigs(topic.name);
 
-// Remove the entries for this topic in brokersToIsrs.
-for (PartitionRegistration partition : topic.parts.values()) {
+for (Map.Entry entry : 
topic.parts.entrySet()) {
+int partitionId = entry.getKey();
+PartitionRegistration partition = entry.getValue();
+
+// Remove the entries for this topic in brokersToIsrs.
 for (int i = 0; i < partition.isr.length; i++) {
 brokersToIsrs.removeTopicEntryForBroker(topic.id, 
partition.isr[i]);
 }
-if (partition.leader != partition.preferredReplica()) {
-preferredReplicaImbalanceCount.decrement();
-}
+
+imbalancedPartitions.remove(new TopicIdPartition(record.topicId(), 
partitionId));
+

Review comment:
   What do you mean?








[GitHub] [kafka] jolshan opened a new pull request #11909: KAFKA-13750: Client Compatability KafkaTest uses invalid idempotency configs

2022-03-16 Thread GitBox


jolshan opened a new pull request #11909:
URL: https://github.com/apache/kafka/pull/11909


   With the switch to idempotency as a default, 
ClientCompatibilityFeaturesTest.run_compatibility_test broke for versions prior 
to 0.11, the release where EOS was enabled. This PR disables idempotency for 
Kafka versions prior to 0.11.
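   
   As a rough illustration of the idea (not the code in this PR), a test 
client could switch idempotence off when the target version does not support 
it. The class and method names below are hypothetical; `bootstrap.servers` 
and `enable.idempotence` are the standard producer configuration keys.

```java
import java.util.Properties;

// Hypothetical helper: builds producer properties for a compatibility test.
final class CompatProducerConfigSketch {
    static Properties producerProps(String bootstrapServers,
                                    boolean idempotentProducerSupported) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        if (!idempotentProducerSupported) {
            // Brokers older than 0.11 cannot serve an idempotent producer,
            // so override the new default explicitly.
            props.setProperty("enable.idempotence", "false");
        }
        return props;
    }
}
```

   Leaving the key unset for newer versions keeps the client default, so only 
the old-version runs deviate from the standard configuration.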
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] jsancio commented on a change in pull request #11893: KAFKA-13682; KRaft Controller auto preferred leader election

2022-03-16 Thread GitBox


jsancio commented on a change in pull request #11893:
URL: https://github.com/apache/kafka/pull/11893#discussion_r828543830



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -1016,6 +1042,55 @@ ApiError electLeader(String topic, int partitionId, 
ElectionType electionType,
 return ControllerResult.of(records, null);
 }
 
+boolean arePartitionLeadersImbalanced() {
+return !imbalancedPartitions.isEmpty();
+}
+
+/**
+ * Attempt to elect a preferred leader for all topic partitions that a 
leader that is not the preferred replica.
+ *
+ * The response() method in the return object is true if this method 
returned without electing all possible preferred replicas.
+ * The quorum controlller should reschedule this operation immediately if 
it is true.
+ *
+ * @return All of the election records and if there may be more available 
preferred replicas to elect as leader
+ */
+ControllerResult maybeBalancePartitionLeaders() {
+List records = new ArrayList<>();
+
+boolean rescheduleImmidiately = false;
+for (TopicIdPartition topicPartition : imbalancedPartitions) {
+if (records.size() >= maxElectionsPerImbalance) {
+rescheduleImmidiately = true;
+break;
+}
+
+TopicControlInfo topic = topics.get(topicPartition.topicId());
+if (topic == null) {
+log.error("Skipping unknown imbalanced topic {}", 
topicPartition);
+continue;
+}
+
+PartitionRegistration partition = 
topic.parts.get(topicPartition.partitionId());
+if (partition == null) {
+log.error("Skipping unknown imbalanced partition {}", 
topicPartition);
+continue;
+}

Review comment:
   This case means that there is a bug in the controller implementation. We 
can either skip the topic partition or renounce the active controller. I 
decided to skip it for now.








[GitHub] [kafka] jsancio commented on a change in pull request #11893: KAFKA-13682; KRaft Controller auto preferred leader election

2022-03-16 Thread GitBox


jsancio commented on a change in pull request #11893:
URL: https://github.com/apache/kafka/pull/11893#discussion_r828545014



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -147,6 +147,7 @@
 private int defaultNumPartitions = 1;
 private ReplicaPlacer replicaPlacer = new StripedReplicaPlacer(new 
Random());
 private long snapshotMaxNewRecordBytes = Long.MAX_VALUE;
+private long leaderImbalanceCheckIntervalNs = -1;

Review comment:
   Decided to use `OptionalLong`.
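   
   A minimal sketch of what replacing the `-1` sentinel with `OptionalLong` 
might look like. The class and method names are hypothetical, and deriving the 
value from an enable flag plus an interval in seconds is an assumption made 
for illustration, not the PR's actual code.

```java
import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: an empty OptionalLong means "balancing disabled",
// which replaces the magic value -1.
final class ImbalanceIntervalSketch {
    static OptionalLong checkIntervalNs(boolean autoLeaderRebalanceEnable,
                                        long checkIntervalSeconds) {
        if (!autoLeaderRebalanceEnable) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(TimeUnit.SECONDS.toNanos(checkIntervalSeconds));
    }
}
```

   Callers then test `isPresent()` instead of comparing against `-1`, so a 
disabled interval cannot be mistaken for a real delay.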








[GitHub] [kafka] jsancio commented on a change in pull request #11893: KAFKA-13682; KRaft Controller auto preferred leader election

2022-03-16 Thread GitBox


jsancio commented on a change in pull request #11893:
URL: https://github.com/apache/kafka/pull/11893#discussion_r828545292



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -1016,6 +1042,55 @@ ApiError electLeader(String topic, int partitionId, 
ElectionType electionType,
 return ControllerResult.of(records, null);
 }
 
+boolean arePartitionLeadersImbalanced() {
+return !imbalancedPartitions.isEmpty();
+}
+
+/**
+ * Attempt to elect a preferred leader for all topic partitions that a 
leader that is not the preferred replica.
+ *
+ * The response() method in the return object is true if this method 
returned without electing all possible preferred replicas.
+ * The quorum controlller should reschedule this operation immediately if 
it is true.
+ *
+ * @return All of the election records and if there may be more available 
preferred replicas to elect as leader
+ */
+ControllerResult maybeBalancePartitionLeaders() {
+List records = new ArrayList<>();
+
+boolean rescheduleImmidiately = false;
+for (TopicIdPartition topicPartition : imbalancedPartitions) {
+if (records.size() >= maxElectionsPerImbalance) {
+rescheduleImmidiately = true;
+break;
+}
+
+TopicControlInfo topic = topics.get(topicPartition.topicId());
+if (topic == null) {
+log.error("Skipping unknown imbalanced topic {}", 
topicPartition);
+continue;
+}
+
+PartitionRegistration partition = 
topic.parts.get(topicPartition.partitionId());
+if (partition == null) {
+log.error("Skipping unknown imbalanced partition {}", 
topicPartition);
+continue;
+}
+
+// Attempt to perform a preferred leader election
+PartitionChangeBuilder builder = new PartitionChangeBuilder(
+partition,
+topicPartition.topicId(),
+topicPartition.partitionId(),
+r -> clusterControl.unfenced(r),
+() -> false
+);
+builder.setAlwaysElectPreferredIfPossible(true);
+builder.build().ifPresent(records::add);

Review comment:
   Agree. Added this comment: 
https://github.com/apache/kafka/pull/11893#issuecomment-1069422793








[GitHub] [kafka] jsancio commented on a change in pull request #11893: KAFKA-13682; KRaft Controller auto preferred leader election

2022-03-16 Thread GitBox


jsancio commented on a change in pull request #11893:
URL: https://github.com/apache/kafka/pull/11893#discussion_r828545811



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -1197,6 +1257,25 @@ private void resetState() {
  */
 private long newBytesSinceLastSnapshot = 0;
 
+/**
+ * How long to delay partition leader balancing operations.
+ */
+private final long leaderImbalanceCheckIntervalNs;
+
+private static enum ImbalanceSchedule {
+// Leader balancing has been scheduled
+SCHEDULED,
+// Leader balancing should be scheduled in the future
+DEFERRED,

Review comment:
   Yes. I fixed the documentation for these values to make this clear.








[GitHub] [kafka] jsancio commented on a change in pull request #11893: KAFKA-13682; KRaft Controller auto preferred leader election

2022-03-16 Thread GitBox


jsancio commented on a change in pull request #11893:
URL: https://github.com/apache/kafka/pull/11893#discussion_r828546042



##
File path: core/src/main/scala/kafka/server/ControllerServer.scala
##
@@ -158,20 +158,30 @@ class ControllerServer(
   alterConfigPolicy = Option(config.
 getConfiguredInstance(AlterConfigPolicyClassNameProp, 
classOf[AlterConfigPolicy]))
 
-  val controllerBuilder = new QuorumController.Builder(config.nodeId, 
metaProperties.clusterId).
-setTime(time).
-setThreadNamePrefix(threadNamePrefixAsString).
-setConfigSchema(configSchema).
-setRaftClient(raftManager.client).
-setDefaultReplicationFactor(config.defaultReplicationFactor.toShort).
-setDefaultNumPartitions(config.numPartitions.intValue()).
-
setSessionTimeoutNs(TimeUnit.NANOSECONDS.convert(config.brokerSessionTimeoutMs.longValue(),
-  TimeUnit.MILLISECONDS)).
-setSnapshotMaxNewRecordBytes(config.metadataSnapshotMaxNewRecordBytes).
-setMetrics(new 
QuorumControllerMetrics(KafkaYammerMetrics.defaultRegistry())).
-setCreateTopicPolicy(createTopicPolicy.asJava).
-setAlterConfigPolicy(alterConfigPolicy.asJava).
-setConfigurationValidator(new ControllerConfigurationValidator())
+  val controllerBuilder = {
+val leaderImbalanceCheckIntervalNs = if 
(config.autoLeaderRebalanceEnable) {

Review comment:
   I added this check to `KafkaConfig`. :crossed_fingers: 








[GitHub] [kafka] jsancio commented on a change in pull request #11893: KAFKA-13682; KRaft Controller auto preferred leader election

2022-03-16 Thread GitBox


jsancio commented on a change in pull request #11893:
URL: https://github.com/apache/kafka/pull/11893#discussion_r828546402



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -953,6 +971,48 @@ private void cancelMaybeFenceReplicas() {
 queue.cancelDeferred(MAYBE_FENCE_REPLICAS);
 }
 
+private static final int MAX_ELECTIONS_PER_IMBALANCE = 1_000;
+
+private void maybeScheduleNextBalancePartitionLeaders() {
+final String maybeBalancePartitionLeaders = 
"maybeBalancePartitionLeaders";
+
+if (imbalancedScheduled != ImbalanceSchedule.SCHEDULED &&
+leaderImbalanceCheckIntervalNs >= 0 &&
+replicationControl.arePartitionLeadersImbalanced()) {
+
+log.debug(
+"Scheduling deferred event for {} because scheduled ({}), 
checkIntervalNs ({}) and isImbalanced ({})",

Review comment:
   Yes. Technically it is a deferred event that has a deadline of now.
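   
   The deadline logic can be sketched as follows. This is a simplified model 
of the scheduling decision discussed in this PR, not the controller code: the 
class and method names are hypothetical, and the `IMMEDIATELY` enum value is 
an assumption, since only `SCHEDULED` and `DEFERRED` appear in this thread.

```java
// Simplified model: the event is always "deferred", and only the deadline
// differs between an immediate run and a delayed one.
final class BalanceDeadlineSketch {
    // IMMEDIATELY is assumed here for illustration.
    enum ImbalanceSchedule { SCHEDULED, DEFERRED, IMMEDIATELY }

    static long deadlineNs(long nowNs, ImbalanceSchedule schedule, long checkIntervalNs) {
        long deadlineNs = nowNs; // a deadline of "now" runs as soon as possible
        if (schedule == ImbalanceSchedule.DEFERRED) {
            deadlineNs += checkIntervalNs; // wait one full check interval
        }
        return deadlineNs;
    }
}
```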








[jira] [Commented] (KAFKA-13463) Improvement: KafkaConsumer pause(Collection partitions)

2022-03-16 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17507913#comment-17507913
 ] 

RivenSun commented on KAFKA-13463:
--

Hi [~showuon] [~guozhang] 
I did not receive any email notification for the replies in the discussion 
thread below, so I overlooked them. 
[https://lists.apache.org/thread/7269n0hncj0glyk3nfqdb8rybgm0xmoz]

Sorry for not replying until now, and thank you for your suggestions and 
discussion.
I will create a PR to fix this issue.
Thanks.

> Improvement: KafkaConsumer pause(Collection partitions)
> ---
>
> Key: KAFKA-13463
> URL: https://issues.apache.org/jira/browse/KAFKA-13463
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Affects Versions: 3.0.0
>Reporter: RivenSun
>Priority: Major
>  Labels: new-consumer-threading-should-fix
>
> h1. 1.Background
> When users call the kafkaConsumer#pause(...) method, they may overlook that 
> the pause can stop taking effect, which can lead to data loss.
> For example, consider the following simple code:
> {code:java}
> while (true) {
>     try {
>         kafkaConsumer.pause(kafkaConsumer.assignment());
>         ConsumerRecords records = kafkaConsumer.poll(Duration.ofSeconds(2));
>         if (!records.isEmpty()) {
>             log.error("kafka poll for rebalance discard some record!");
>         }
>     } catch (Exception e) {
>         log.error("maintain poll for rebalance with error:{}", e.getMessage(), e);
>     }
> }{code}
> Even if you call pause(assignment) before the poll method every time, the 
> poll method may still return messages.
>  
> h1. 2. RootCause:
> In short, during a group rebalance, 
> ConsumerCoordinator#invokePartitionsRevoked(...) clears the paused mark 
> on the partitions previously held by the kafkaConsumer. However, the records 
> already buffered in memory (Fetcher.completedFetches) for those paused 
> partitions are not cleared, so Fetcher#fetchedRecords() still picks them up 
> and returns them to the caller.
> For a more detailed analysis, see 
> https://issues.apache.org/jira/browse/KAFKA-13425 
> Looking forward to your reply.
>  
> h1. 3.Discuss : Can KafkaConsumer support a pause method that is not 
> affected by groupRebalance?
> The design of the KafkaConsumer#pause method stated one point from the 
> beginning:
>  * Rebalance does not preserve pause/resume state.
> link:https://issues.apache.org/jira/browse/KAFKA-2350
> Unfortunately, this is not mentioned in the documentation comments of the 
> KafkaConsumer#pause(...) method. In addition, 
> ConsumerCoordinator#invokePartitionsRevoked does not log anything when it 
> clears the paused mark. I believe this causes many users to use 
> the KafkaConsumer#pause(...) method incorrectly.
> But I think it is necessary for KafkaConsumer to provide a pause method that 
> is not affected by groupRebalance.
>  
> h1. 4. Suggestions
> The existing pause method can be improved from several different 
> perspectives, or some new {{pause}} methods can be provided. Each point 
> below is an independent solution.
> h2. 1)ConsumerCoordinator#invokePartitionsRevoked should also trigger Fetcher 
> to clean up the in-memory records of revokedAndPausedPartitions when clearing 
> the paused mark
> This prevents the Fetcher#fetchedRecords() method from mistakenly treating 
> revokedAndPausedPartitions as legal and returning their records. The 
> fetchedRecords method already performs various checks on the partition.
> The price is that if the user does not call the pause(...) method 
> before the next call to poll, a new FetchMessage request may be 
> initiated, which causes additional network transmission.
>  
> h2. 2)Efforts to maintain the old paused mark on the KafkaConsumer side
> <1>In the ConsumerCoordinator#onJoinPrepare(...) method, record all 
> pausedTopicPartitions from the current assignment of KafkaConsumer;
>  <2> In the ConsumerCoordinator#onJoinComplete(...) method, use 
> pausedTopicPartitions to render the latest assignment and restore the paused 
> marks of the partitions that are still in the latest assignment.
> {*}Note{*}: If the new assignment of kafkaConsumer no longer contains 
> topicPartitions that have been paused before rebalance, the paused mark of 
> these topicPartitions will be lost forever on the kafkaConsumer side, even if 
> in a future rebalance, the kafkaConsumer will hold these partitions again.
> At the end of the Jira KAFKA-13425 I mentioned above, I gave a draft code 
> suggestion on this point
> <3> In fact, for consumers who use the RebalanceProtocol.COOPERATIVE protocol
> For example, consumers who use the currently supp

[GitHub] [kafka] dongjinleekr commented on pull request #11430: KAFKA-13352: Kafka Client does not support passwords starting with number in jaas config

2022-03-16 Thread GitBox


dongjinleekr commented on pull request #11430:
URL: https://github.com/apache/kafka/pull/11430#issuecomment-1069811650


   Rebased onto the latest trunk. cc/ @rajinisivaram






[GitHub] [kafka] RivenSun2 commented on pull request #11819: MINOR: Optimize the generateFieldToString method of MessageDataGenerator

2022-03-16 Thread GitBox


RivenSun2 commented on pull request #11819:
URL: https://github.com/apache/kafka/pull/11819#issuecomment-1069812919


   Hi @dajac @cmccabe
   Could you help to review the PR ?
   Thanks.






[GitHub] [kafka] showuon merged pull request #11705: KAFKA-9847: add config to set default store type (KIP-591)

2022-03-16 Thread GitBox


showuon merged pull request #11705:
URL: https://github.com/apache/kafka/pull/11705


   






[GitHub] [kafka] showuon commented on pull request #11705: KAFKA-9847: add config to set default store type (KIP-591)

2022-03-16 Thread GitBox


showuon commented on pull request #11705:
URL: https://github.com/apache/kafka/pull/11705#issuecomment-1069884347


   Thanks for all your reviews!






[GitHub] [kafka] showuon commented on pull request #11837: KAFKA-6718 / Add rack awareness configurations to StreamsConfig

2022-03-16 Thread GitBox


showuon commented on pull request #11837:
URL: https://github.com/apache/kafka/pull/11837#issuecomment-1069887108


   Nice work @lkokhreidze ! And nice team work, @cadonna ! :)






[GitHub] [kafka] showuon commented on pull request #11905: MINOR: Fix incorrect log for out-of-order KTable

2022-03-16 Thread GitBox


showuon commented on pull request #11905:
URL: https://github.com/apache/kafka/pull/11905#issuecomment-1069914952


   @tchiotludo, a test is failing due to this change. Please help fix it.
   ```
   
org.apache.kafka.streams.kstream.internals.KTableSourceTest.kTableShouldLogOnOutOfOrder
   ```
   
   Thanks.






[GitHub] [kafka] showuon commented on a change in pull request #11909: KAFKA-13750: Client Compatability KafkaTest uses invalid idempotency configs

2022-03-16 Thread GitBox


showuon commented on a change in pull request #11909:
URL: https://github.com/apache/kafka/pull/11909#discussion_r828704440



##
File path: 
tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
##
@@ -172,6 +174,13 @@ public static void main(String[] args) throws Exception {
 .dest("describeConfigsSupported")
 .metavar("DESCRIBE_CONFIGS_SUPPORTED")
 .help("Whether describeConfigs is supported in the AdminClient.");
+parser.addArgument("--idempotent-producer-supported")
+.action(store())
+.required(true)
+.type(Boolean.class)
+.dest("idempotentProducerSupported")
+.metavar("IDEMPOTENT_PRODUCER_SUPPORTED")
+.help("Whether the producer supports idempotency.");

Review comment:
   The indentation is not aligned with the lines above.








[jira] [Updated] (KAFKA-10000) Atomic commit of source connector records and offsets

2022-03-16 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-10000:
--
Fix Version/s: 3.3.0
   (was: 3.2.0)

> Atomic commit of source connector records and offsets
> -
>
> Key: KAFKA-10000
> URL: https://issues.apache.org/jira/browse/KAFKA-10000
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>  Labels: needs-kip
> Fix For: 3.3.0
>
>
> It'd be nice to be able to configure source connectors such that their 
> offsets are committed if and only if all records up to that point have been 
> ack'd by the producer. This would go a long way towards EOS for source 
> connectors.
>  
> This differs from https://issues.apache.org/jira/browse/KAFKA-6079, which is 
> marked as {{WONTFIX}} since it only concerns enabling the idempotent producer 
> for source connectors and is not concerned with source connector offsets.
> This also differs from https://issues.apache.org/jira/browse/KAFKA-6080, 
> which had a lot of discussion around allowing connector-defined transaction 
> boundaries. The suggestion in this ticket is to only use source connector 
> offset commits as the transaction boundaries for connectors; allowing 
> connector-specified transaction boundaries can be addressed separately.





[jira] [Commented] (KAFKA-10000) Atomic commit of source connector records and offsets

2022-03-16 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-1?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17507963#comment-17507963
 ] 

Chris Egerton commented on KAFKA-1:
---

Bumping target release version as feature freeze for 3.2.0 has passed.

> Atomic commit of source connector records and offsets
> -
>
> Key: KAFKA-1
> URL: https://issues.apache.org/jira/browse/KAFKA-1
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>  Labels: needs-kip
> Fix For: 3.2.0
>
>
> It'd be nice to be able to configure source connectors such that their 
> offsets are committed if and only if all records up to that point have been 
> ack'd by the producer. This would go a long way towards EOS for source 
> connectors.
>  
> This differs from https://issues.apache.org/jira/browse/KAFKA-6079, which is 
> marked as {{WONTFIX}} since it only concerns enabling the idempotent producer 
> for source connectors and is not concerned with source connector offsets.
> This also differs from https://issues.apache.org/jira/browse/KAFKA-6080, 
> which had a lot of discussion around allowing connector-defined transaction 
> boundaries. The suggestion in this ticket is to only use source connector 
> offset commits as the transaction boundaries for connectors; allowing 
> connector-specified transaction boundaries can be addressed separately.





[GitHub] [kafka] C0urante commented on a change in pull request #11196: KAFKA-13185 Clear message batch in rewind

2022-03-16 Thread GitBox


C0urante commented on a change in pull request #11196:
URL: https://github.com/apache/kafka/pull/11196#discussion_r828716597



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##
@@ -396,6 +396,8 @@ private void commitOffsets(long now, boolean closing) {
 log.debug("{} Rewinding topic partition {} to offset {}", 
this, entry.getKey(), entry.getValue().offset());
 consumer.seek(entry.getKey(), entry.getValue().offset());
 }
+// Clear messages that might have been left in messageBatch 
after last poll
+messageBatch.clear();

Review comment:
   Do we need to also adjust `pausedForRedelivery` and resume the topic 
partitions here?








[jira] [Commented] (KAFKA-13740) kafka-stream-client-shutdown

2022-03-16 Thread Prashanth Joseph Babu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17507965#comment-17507965
 ] 

Prashanth Joseph Babu commented on KAFKA-13740:
---

[~cadonna] Thank you so much for your response. Yes, I had set 
{{setUncaughtExceptionHandler}} and had returned SHUTDOWN_APPLICATION, 
which is what caused the behavior I was noticing. I changed it to 
SHUTDOWN_CLIENT and got the behavior I was expecting.

 

Thanks once again!

> kafka-stream-client-shutdown
> 
>
> Key: KAFKA-13740
> URL: https://issues.apache.org/jira/browse/KAFKA-13740
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0
>Reporter: Prashanth Joseph Babu
>Priority: Major
>
> I have an Apache Kafka Streams application. I notice that it sometimes 
> shuts down when a rebalance occurs, with no apparent reason for the shutdown. It 
> doesn't even throw an exception.
> Here are some logs from such an occurrence:
> {code:java}
> [2022-03-08 17:13:37,024] INFO [Consumer 
> clientId=svc-stream-collector-StreamThread-1-consumer, 
> groupId=svc-stream-collector] Adding newly assigned partitions:  
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> [2022-03-08 17:13:37,024] ERROR stream-thread 
> [svc-stream-collector-StreamThread-1] A Kafka Streams client in this Kafka 
> Streams application is requesting to shutdown the application 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> [2022-03-08 17:13:37,030] INFO stream-client [svc-stream-collector] State 
> transition from REBALANCING to PENDING_ERROR 
> (org.apache.kafka.streams.KafkaStreams)
> old state:REBALANCING new state:PENDING_ERROR
> [2022-03-08 17:13:37,031] INFO [Consumer 
> clientId=svc-stream-collector-StreamThread-1-consumer, 
> groupId=svc-stream-collector] (Re-)joining group 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2022-03-08 17:13:37,032] INFO stream-thread 
> [svc-stream-collector-StreamThread-1] Informed to shut down 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> [2022-03-08 17:13:37,032] INFO stream-thread 
> [svc-stream-collector-StreamThread-1] State transition from 
> PARTITIONS_REVOKED to PENDING_SHUTDOWN 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> [2022-03-08 17:13:37,067] INFO stream-thread 
> [svc-stream-collector-StreamThread-1] Thread state is already 
> PENDING_SHUTDOWN, skipping the run once call after poll request 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> [2022-03-08 17:13:37,067] WARN stream-thread 
> [svc-stream-collector-StreamThread-1] Detected that shutdown was requested. 
> All clients in this app will now begin to shutdown 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> {code}
> I suspect it's because there are no newly assigned partitions in the log 
> below:
> {code:java}
> [2022-03-08 17:13:37,024] INFO [Consumer 
> clientId=svc-stream-collector-StreamThread-1-consumer, 
> groupId=svc-stream-collector] Adding newly assigned partitions:  
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> {code}
> However, I'm not exactly sure why this error occurs. Any help would be 
> appreciated.





[jira] [Resolved] (KAFKA-13740) kafka-stream-client-shutdown

2022-03-16 Thread Prashanth Joseph Babu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Prashanth Joseph Babu resolved KAFKA-13740.
---
Resolution: Fixed






[GitHub] [kafka] dongjinleekr commented on pull request #11579: KAFKA-13518: Update gson dependency

2022-03-16 Thread GitBox


dongjinleekr commented on pull request #11579:
URL: https://github.com/apache/kafka/pull/11579#issuecomment-1070139215


   Rebased onto the latest trunk. cc/ @ijuma






[GitHub] [kafka] dengziming commented on a change in pull request #11471: MINOR: Replace EasyMock with Mockito in connect:file

2022-03-16 Thread GitBox


dengziming commented on a change in pull request #11471:
URL: https://github.com/apache/kafka/pull/11471#discussion_r828724024



##
File path: 
connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
##
@@ -204,8 +210,6 @@ private void writeTimesAndFlush(OutputStream os, int times, 
byte[] line) throws
 
 @Test
 public void testMissingFile() throws InterruptedException {

Review comment:
   Yeah, done!








[GitHub] [kafka] dengziming commented on a change in pull request #11471: MINOR: Replace EasyMock with Mockito in connect:file

2022-03-16 Thread GitBox


dengziming commented on a change in pull request #11471:
URL: https://github.com/apache/kafka/pull/11471#discussion_r828724247



##
File path: 
connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
##
@@ -220,17 +224,22 @@ public void testMissingFile() throws InterruptedException 
{
 task.stop();
 }
 
+@Test
 public void testInvalidFile() throws InterruptedException {
 config.put(FileStreamSourceConnector.FILE_CONFIG, "bogusfilename");
 task.start(config);
 // Currently the task retries indefinitely if the file isn't found, 
but shouldn't return any data.
-for (int i = 0; i < 100; i++)
+for (int i = 0; i < 3; i++)
 assertNull(task.poll());
 }
 
-
 private void expectOffsetLookupReturnNone() {
-
EasyMock.expect(context.offsetStorageReader()).andReturn(offsetStorageReader);
-EasyMock.expect(offsetStorageReader.offset(EasyMock.>anyObject())).andReturn(null);
+when(context.offsetStorageReader()).thenReturn(offsetStorageReader);
+when(offsetStorageReader.offset(anyMap())).thenReturn(null);
+}
+
+private void verifyAll() {

Review comment:
   Oh, I forgot about this; I removed the extra call after each method.








[jira] [Commented] (KAFKA-13136) kafka-connect task.max : active task in consumer group is limited by the bigger topic to consume

2022-03-16 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17507968#comment-17507968
 ] 

Chris Egerton commented on KAFKA-13136:
---

This is not a bug in Kafka Connect per se, but rather a side effect of the 
default consumer partition assignor, which at the moment is the 
[RangeAssignor|https://github.com/apache/kafka/blob/fbe7fb941173c0907792a8b48e8e9122aabecbd8/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java].
 That assignor gives the first partition of every topic to a single consumer, 
the second partition of every topic to a single consumer, etc. This makes it 
particularly ill-suited for consumer groups (or multi-task sink connectors) 
that read from a large number of small topics, or more generally, any situation 
where the number of consumers (or sink tasks) is greater than the maximum 
number of partitions in any single topic being consumed, and there are multiple 
consumers in the group (or sink tasks for the connector).
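
To make the effect concrete, here is a minimal sketch of per-topic range assignment. It is a simplified model, not the actual RangeAssignor source: it assumes every consumer (sink task) subscribes to every topic and that consumers are indexed in sorted member order. The class and method names ({{RangeAssignmentSketch}}, {{partitionsFor}}, {{activeConsumers}}) are hypothetical.

```java
// Simplified model of range assignment; names here are illustrative only.
final class RangeAssignmentSketch {

    // Number of partitions of ONE topic that the consumer at the given index receives.
    // Each topic is split independently: the first (numPartitions % numConsumers)
    // consumers get one extra partition.
    static int partitionsFor(int consumerIndex, int numConsumers, int numPartitions) {
        int perConsumer = numPartitions / numConsumers;
        int withExtra = numPartitions % numConsumers;
        return perConsumer + (consumerIndex < withExtra ? 1 : 0);
    }

    // Number of consumers that end up with at least one partition across all topics.
    static int activeConsumers(int[] partitionsPerTopic, int numConsumers) {
        int active = 0;
        for (int c = 0; c < numConsumers; c++) {
            int total = 0;
            for (int partitions : partitionsPerTopic) {
                total += partitionsFor(c, numConsumers, partitions);
            }
            if (total > 0) {
                active++;
            }
        }
        return active;
    }

    public static void main(String[] args) {
        System.out.println(activeConsumers(new int[] {10, 10}, 12));     // prints 10
        System.out.println(activeConsumers(new int[] {10, 10, 15}, 12)); // prints 12
        System.out.println(activeConsumers(new int[] {10, 10, 15}, 17)); // prints 15
    }
}
```

Because the same low-index consumers are served first for every topic, the number of active consumers is capped by the largest single-topic partition count, which matches the 10, 12 and 15 active members reported in this ticket.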

This should be automatically addressed once 
[KIP-726|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177048248]/KAFKA-12473
 get merged, as that will cause the default partition assignor to be updated to 
the 
[CooperativeStickyAssignor|https://github.com/apache/kafka/blob/fbe7fb941173c0907792a8b48e8e9122aabecbd8/clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java],
 which has more intelligent assignment logic.

Until then, or if running on older versions of Kafka Connect that in turn use 
older versions of the Kafka clients library which still use the 
{{RangeAssignor}} as the default, there are a few options to work around this 
problem:
 # You can use a new default partition assignor for every connector on the 
worker by setting the {{consumer.partition.assignment.strategy}} property in 
your Kafka Connect worker config file.
 # You can configure the partition assignor on a per-connector basis by setting 
the {{consumer.override.partition.assignment.strategy}} property in your 
connector config (as long as the worker is configured with a connector client 
override policy that permits this, which should be possible with the default 
override policy as of 3.0.0).

As far as which assignor to use goes--if all you need is a guarantee that the 
spread of partitions across sink tasks is as even as possible, then you can use 
the 
[RoundRobinAssignor|https://github.com/apache/kafka/blob/fbe7fb941173c0907792a8b48e8e9122aabecbd8/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java].
 Beyond that, it's probably beyond the scope of this ticket to make 
recommendations, but you can do some research of your own starting with the 
[docs for the consumer partition.assignment.strategy 
property|https://kafka.apache.org/31/documentation.html#consumerconfigs_partition.assignment.strategy].
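
As an illustration of the two workarounds listed above, using the RoundRobinAssignor, the settings might look like the fragment below. This is a sketch in properties form; the file name in the comment is only an example, and a connector submitted over the REST API would carry the second property as a JSON key instead.

```properties
# Option 1: worker config (e.g. connect-distributed.properties).
# Applies to every sink connector on the worker.
consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor

# Option 2: connector config. Applies to a single sink connector and requires
# a connector client override policy on the worker that permits the override.
consumer.override.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
```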

 

Given that there is a known workaround for this issue and an approved, 
permanent fix in the works for an upcoming release, this may be safe to close. 
[~raphaelauv] thoughts?

> kafka-connect task.max : active task in consumer group is limited by the 
> bigger topic to consume
> 
>
> Key: KAFKA-13136
> URL: https://issues.apache.org/jira/browse/KAFKA-13136
> Project: Kafka
>  Issue Type: Bug
>Reporter: raphaelauv
>Priority: Major
>
> In kafka-connect 2.7
> *The maximum number of active tasks for a sink connector is equal to the 
> partition count of the topic with the most partitions to consume.*
> An active task is a task that has partitions assigned in the consumer group of 
> the sink connector.
> Example:
> With 2 topics that each have 10 partitions (20 partitions in total), the 
> maximum number of active tasks is 10. If I set tasks.max to 12, 10 members of 
> the consumer group consume partitions and 2 members have no partitions to 
> consume.
> If I add a third topic with 15 partitions to the connector config, then all 12 
> members of the consumer group consume partitions; if I then set tasks.max to 
> 17, only 15 members are active in the consumer group.





[GitHub] [kafka] guozhangwang commented on pull request #11705: KAFKA-9847: add config to set default store type (KIP-591)

2022-03-16 Thread GitBox


guozhangwang commented on pull request #11705:
URL: https://github.com/apache/kafka/pull/11705#issuecomment-1070221651


   +1, thanks @showuon for the contribution! And congrats for it finally being 
merged.






[jira] [Created] (KAFKA-13751) On the broker side, OAUTHBEARER is not compatible with other SASL mechanisms

2022-03-16 Thread RivenSun (Jira)
RivenSun created KAFKA-13751:


 Summary: On the broker side, OAUTHBEARER is not compatible with 
other SASL mechanisms
 Key: KAFKA-13751
 URL: https://issues.apache.org/jira/browse/KAFKA-13751
 Project: Kafka
  Issue Type: Bug
  Components: security
Affects Versions: 3.0.1
Reporter: RivenSun


h1. Phenomenon:

 SASL/OAUTHBEARER, whether implemented by default or customized by the user, is 
not compatible with other SASL mechanisms.
h3. case1:

kafka_server_jaas_oauth.conf
{code:java}
KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin"
  user_admin="admin"
  user_alice="alice"; 

   org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;

   org.apache.kafka.common.security.scram.ScramLoginModule required
  username="admin"
  password="admin_scram";
}; {code}
 server.properties
{code:java}
advertised.listeners=SASL_PLAINTEXT://publicIp:8779,SASL_SSL://publicIp:8889,OAUTH://publicIp:8669
 
listener.security.protocol.map=INTERNAL_SSL:SASL_SSL,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,OAUTH:SASL_SSL
sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER{code}
Error when starting kafka:
server.log
{code:java}
[2022-03-16 13:18:42,658] ERROR [KafkaServer id=1] Fatal error during 
KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: 
Must supply exactly 1 non-null JAAS mechanism configuration (size was 3)
        at 
org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:172)
        at kafka.network.Processor.<init>(SocketServer.scala:724)
        at kafka.network.SocketServer.newProcessor(SocketServer.scala:367)
        at 
kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:252)
        at 
kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:251)
        at 
kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:214)
        at 
kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:211)
        at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at 
kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:211)
        at kafka.network.SocketServer.startup(SocketServer.scala:122)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:266)
        at 
kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
        at kafka.Kafka$.main(Kafka.scala:82)
        at kafka.Kafka.main(Kafka.scala)
Caused by: java.lang.IllegalArgumentException: Must supply exactly 1 non-null 
JAAS mechanism configuration (size was 3)
        at 
org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredValidatorCallbackHandler.configure(OAuthBearerUnsecuredValidatorCallbackHandler.java:117)
        at 
org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:139)
        ... 17 more
[2022-03-16 13:18:42,662] INFO [KafkaServer id=1] shutting down 
(kafka.server.KafkaServer)
[2022-03-16 13:18:42,664] INFO [SocketServer brokerId=1] Stopping socket server 
request processors (kafka.network.SocketServer) {code}
The default implementation class of oauthbearer's 
`sasl.server.callback.handler.class` is 
OAuthBearerUnsecuredValidatorCallbackHandler. 
In the OAuthBearerUnsecuredValidatorCallbackHandler#configure(...) method, the 
jaasConfigEntries parameter is validated.
To be clear, {*}the validation logic here is completely reasonable{*}; the 
problem is that the jaasConfigEntries passed in from the upper layer should 
not contain the AppConfigurationEntry of other login modules. Several other 
places in the code perform the same check and fail with the same message, 
*"Must supply exactly 1 non-null JAAS mechanism configuration".*

The root cause is elaborated later.
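
The failure mode in case1 can be reproduced in isolation with the sketch below. It is not Kafka's code: the class {{JaasEntryCheckSketch}} and its methods are hypothetical, and only {{AppConfigurationEntry}} (from the JDK) and the error message (from the log above) are real. The {{entriesFor}} helper illustrates the assumption that the caller should hand each handler only the entries of its own login module.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;

// Hypothetical sketch; the class and method names are not Kafka APIs.
final class JaasEntryCheckSketch {
    static final String PLAIN = "org.apache.kafka.common.security.plain.PlainLoginModule";
    static final String OAUTH = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule";
    static final String SCRAM = "org.apache.kafka.common.security.scram.ScramLoginModule";

    // Same rule as in the reported error: exactly one non-null entry is accepted.
    static void requireSingleEntry(List<AppConfigurationEntry> entries) {
        if (entries == null || entries.size() != 1 || entries.get(0) == null) {
            int size = entries == null ? 0 : entries.size();
            throw new IllegalArgumentException(
                "Must supply exactly 1 non-null JAAS mechanism configuration (size was " + size + ")");
        }
    }

    // Assumed fix direction: keep only the entries that belong to one login module.
    static List<AppConfigurationEntry> entriesFor(String loginModule, List<AppConfigurationEntry> all) {
        List<AppConfigurationEntry> matching = new ArrayList<>();
        for (AppConfigurationEntry e : all) {
            if (loginModule.equals(e.getLoginModuleName())) {
                matching.add(e);
            }
        }
        return matching;
    }

    // Builds a "required" entry with no options, like the lines in the JAAS file.
    static AppConfigurationEntry entry(String loginModule) {
        return new AppConfigurationEntry(
            loginModule, LoginModuleControlFlag.REQUIRED, Collections.<String, Object>emptyMap());
    }

    public static void main(String[] args) {
        List<AppConfigurationEntry> all = Arrays.asList(entry(PLAIN), entry(OAUTH), entry(SCRAM));
        try {
            requireSingleEntry(all); // all three entries of the KafkaServer section: rejected
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
        requireSingleEntry(entriesFor(OAUTH, all)); // filtered to one entry: accepted
        System.out.println("filtered entries accepted");
    }
}
```

Note that filtering by login module alone would still reject a JAAS file that lists the same login module twice, since two entries would remain after filtering.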


By the way, at present, KafkaServer allows {*}the same LoginModule to be 
configured multiple times in kafkaJaasConfigFile{*}, which will also lead to 
the phenomenon of case1.

kafka_server_jaas_oauth.conf eg:
{code:java}
KafkaServer {
   org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;

  org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
 
}; {code}
 




h3. case2:
On the basis of case1, modify the default implementation of oauthbearer's 
`sasl.server.callback.handler.class`



 server.properties add new configuration
{code:java}
listener.name.sasl_plaintext.oauthbearer.sasl.server.callback.handler.class=us.zoom.mq.security.oauth.AsyncMQOAuthCallbackHandle

[jira] [Commented] (KAFKA-13751) On the broker side, OAUTHBEARER is not compatible with other SASL mechanisms

2022-03-16 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17507973#comment-17507973
 ] 

RivenSun commented on KAFKA-13751:
--

Hi [~showuon] [~guozhang] 
I think this is a serious bug and we should fix it as soon as possible.
KAFKA-13422, which has the same cause, can be fixed together with it.

In KAFKA-13422, you can see that I have asked the author of this module for 
advice and help many times, but they seem to be busy. Could you help push these 
two issues toward resolution as soon as possible?
Thanks a lot.

And [~rsivaram]  [~ijuma], could you give me some advice when you have a chance?
Thanks.

> On the broker side, OAUTHBEARER is not compatible with other SASL mechanisms
> 
>
> Key: KAFKA-13751
> URL: https://issues.apache.org/jira/browse/KAFKA-13751
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Affects Versions: 3.0.1
>Reporter: RivenSun
>Priority: Critical
>
> h1. Phenomenon:
>  SASL/OAUTHBEARER, whether implemented by default or customized by the user, 
> is not compatible with other SASL mechanisms.
> h3. case1:
> kafka_server_jaas_oauth.conf
> {code:java}
> KafkaServer {
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   username="admin"
>   password="admin"
>   user_admin="admin"
>   user_alice="alice"; 
>org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule 
> required;
>org.apache.kafka.common.security.scram.ScramLoginModule required
>   username="admin"
>   password="admin_scram";
> }; {code}
>  server.properties
> {code:java}
> advertised.listeners=SASL_PLAINTEXT://publicIp:8779,SASL_SSL://publicIp:8889,OAUTH://publicIp:8669
>  
> listener.security.protocol.map=INTERNAL_SSL:SASL_SSL,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,OAUTH:SASL_SSL
> sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER{code}
> Error when starting kafka:
> server.log
> {code:java}
> [2022-03-16 13:18:42,658] ERROR [KafkaServer id=1] Fatal error during 
> KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
> org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: 
> Must supply exactly 1 non-null JAAS mechanism configuration (size was 3)
>         at 
> org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:172)
>         at kafka.network.Processor.<init>(SocketServer.scala:724)
>         at kafka.network.SocketServer.newProcessor(SocketServer.scala:367)
>         at 
> kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:252)
>         at 
> kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:251)
>         at 
> kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:214)
>         at 
> kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:211)
>         at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>         at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>         at 
> kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:211)
>         at kafka.network.SocketServer.startup(SocketServer.scala:122)
>         at kafka.server.KafkaServer.startup(KafkaServer.scala:266)
>         at 
> kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
>         at kafka.Kafka$.main(Kafka.scala:82)
>         at kafka.Kafka.main(Kafka.scala)
> Caused by: java.lang.IllegalArgumentException: Must supply exactly 1 non-null 
> JAAS mechanism configuration (size was 3)
>         at 
> org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredValidatorCallbackHandler.configure(OAuthBearerUnsecuredValidatorCallbackHandler.java:117)
>         at 
> org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:139)
>         ... 17 more
> [2022-03-16 13:18:42,662] INFO [KafkaServer id=1] shutting down 
> (kafka.server.KafkaServer)
> [2022-03-16 13:18:42,664] INFO [SocketServer brokerId=1] Stopping socket 
> server request processors (kafka.network.SocketServer) {code}
> The default implementation class of oauthbearer's 
> `sasl.server.callback.handler.class` is 
> OAuthBearerUnsecuredValidatorCallbackHandler. 
> In the OAuthBearerUnsecuredValidatorCallbackHandler#configure(...) method, 
> the jaasConfigEntries parameter is verified.
> What I want to say is that {*}the verification logic here is completely 
> reasonable{*}, but the jaasConfigEntries passed in from the upper layer 
> should not contain the AppConfigurationEntry of other loginModules. 

[jira] [Commented] (KAFKA-13749) TopicConfigs and ErrorCode are not set in createTopics response in KRaft

2022-03-16 Thread dengziming (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17507974#comment-17507974
 ] 

dengziming commented on KAFKA-13749:


Thank you for reporting this. Can you provide more details on the bug or show 
an example?

> TopicConfigs and ErrorCode are not set in createTopics response in KRaft
> 
>
> Key: KAFKA-13749
> URL: https://issues.apache.org/jira/browse/KAFKA-13749
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Reporter: Akhilesh Chaganti
>Assignee: Akhilesh Chaganti
>Priority: Major
>
> Once the createTopics request is processed in KRaft, the `CreatableTopicResult` 
> is not set with the appropriate topic configs and error code, and this breaks 
> KIP-525





[jira] [Updated] (KAFKA-13751) On the broker side, OAUTHBEARER is not compatible with other SASL mechanisms

2022-03-16 Thread RivenSun (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

RivenSun updated KAFKA-13751:
-
Description: 
h1. Phenomenon:

 SASL/OAUTHBEARER, whether implemented by default or customized by the user, is 
not compatible with other SASL mechanisms.
h3. case1:

kafka_server_jaas_oauth.conf
{code:java}
KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin"
  user_admin="admin"
  user_alice="alice"; 

   org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;

   org.apache.kafka.common.security.scram.ScramLoginModule required
  username="admin"
  password="admin_scram";
}; {code}
 server.properties
{code:java}
advertised.listeners=SASL_PLAINTEXT://publicIp:8779,SASL_SSL://publicIp:8889,OAUTH://publicIp:8669
 
listener.security.protocol.map=INTERNAL_SSL:SASL_SSL,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,OAUTH:SASL_SSL
sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER{code}
Error when starting kafka:
server.log
{code:java}
[2022-03-16 13:18:42,658] ERROR [KafkaServer id=1] Fatal error during 
KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: 
Must supply exactly 1 non-null JAAS mechanism configuration (size was 3)
        at 
org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:172)
        at kafka.network.Processor.<init>(SocketServer.scala:724)
        at kafka.network.SocketServer.newProcessor(SocketServer.scala:367)
        at 
kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:252)
        at 
kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:251)
        at 
kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:214)
        at 
kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:211)
        at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at 
kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:211)
        at kafka.network.SocketServer.startup(SocketServer.scala:122)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:266)
        at 
kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
        at kafka.Kafka$.main(Kafka.scala:82)
        at kafka.Kafka.main(Kafka.scala)
Caused by: java.lang.IllegalArgumentException: Must supply exactly 1 non-null 
JAAS mechanism configuration (size was 3)
        at 
org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredValidatorCallbackHandler.configure(OAuthBearerUnsecuredValidatorCallbackHandler.java:117)
        at 
org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:139)
        ... 17 more
[2022-03-16 13:18:42,662] INFO [KafkaServer id=1] shutting down 
(kafka.server.KafkaServer)
[2022-03-16 13:18:42,664] INFO [SocketServer brokerId=1] Stopping socket server 
request processors (kafka.network.SocketServer) {code}
For oauthbearer, the default implementation class of 
`sasl.server.callback.handler.class` is 
OAuthBearerUnsecuredValidatorCallbackHandler. 
Its OAuthBearerUnsecuredValidatorCallbackHandler#configure(...) method 
validates the jaasConfigEntries parameter.
My point is that {*}the validation logic here is entirely reasonable{*}, but 
the jaasConfigEntries passed in from the upper layer should not contain the 
AppConfigurationEntry of other login modules. Several other places in the code 
perform the same check, with the same message *"Must supply exactly 1 non-null 
JAAS mechanism configuration".*
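
The check described above can be sketched in plain Java. This is a minimal illustration, not a copy of the handler's source: the class name `JaasEntryCheckSketch` and the helper `entry` are hypothetical, and the three login module names in `main` are assumed only to reproduce the "size was 3" message from the log.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import javax.security.auth.login.AppConfigurationEntry;

class JaasEntryCheckSketch {
    // Accepts the list only when it holds exactly one non-null entry.
    static AppConfigurationEntry requireSingleEntry(List<AppConfigurationEntry> jaasConfigEntries) {
        Objects.requireNonNull(jaasConfigEntries, "jaasConfigEntries");
        if (jaasConfigEntries.size() != 1 || jaasConfigEntries.get(0) == null) {
            throw new IllegalArgumentException(String.format(
                "Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)",
                jaasConfigEntries.size()));
        }
        return jaasConfigEntries.get(0);
    }

    // Hypothetical helper: a "required" entry with no options.
    // AppConfigurationEntry only stores the name; the class is never loaded here.
    static AppConfigurationEntry entry(String loginModuleName) {
        return new AppConfigurationEntry(
            loginModuleName,
            AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
            Collections.<String, Object>emptyMap());
    }

    public static void main(String[] args) {
        // Assumed module mix: three different login modules in one KafkaServer section.
        List<AppConfigurationEntry> entries = Arrays.asList(
            entry("org.apache.kafka.common.security.plain.PlainLoginModule"),
            entry("org.apache.kafka.common.security.scram.ScramLoginModule"),
            entry("org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule"));
        try {
            requireSingleEntry(entries);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With three entries the sketch fails with the same message as the broker log, which is why the content of the list handed to the handler matters more than the check itself.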

The root cause is explained later.

By the way, KafkaServer currently allows {*}the same LoginModule to be 
configured multiple times in kafkaJaasConfigFile{*}, which also triggers 
the failure seen in case1.

Example kafka_server_jaas_oauth.conf:
{code:java}
KafkaServer {
   org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;

  org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
 
}; {code}
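
One way to picture the suggestion that the upper layer should pass only the entries belonging to the mechanism's own login module is the filter below. It is a sketch under stated assumptions, not Kafka's implementation: `JaasEntryFilterSketch`, `entriesFor`, and `entry` are hypothetical names, and the Plain and Scram module names are used only as examples of "other" modules.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import javax.security.auth.login.AppConfigurationEntry;

class JaasEntryFilterSketch {
    static final String OAUTH_MODULE =
        "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule";

    // Keeps only the entries whose login module name matches the requested one.
    static List<AppConfigurationEntry> entriesFor(String loginModuleName,
                                                  List<AppConfigurationEntry> all) {
        List<AppConfigurationEntry> matching = new ArrayList<>();
        for (AppConfigurationEntry e : all) {
            if (e != null && loginModuleName.equals(e.getLoginModuleName())) {
                matching.add(e);
            }
        }
        return matching;
    }

    // Hypothetical helper: a "required" entry with no options.
    static AppConfigurationEntry entry(String loginModuleName) {
        return new AppConfigurationEntry(
            loginModuleName,
            AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
            Collections.<String, Object>emptyMap());
    }

    public static void main(String[] args) {
        // Mixed modules (assumed names): filtering leaves one OAuth entry.
        List<AppConfigurationEntry> mixed = Arrays.asList(
            entry("org.apache.kafka.common.security.plain.PlainLoginModule"),
            entry("org.apache.kafka.common.security.scram.ScramLoginModule"),
            entry(OAUTH_MODULE));
        System.out.println(entriesFor(OAUTH_MODULE, mixed).size()); // prints 1

        // Same module listed twice, as in the example file: filtering cannot help.
        List<AppConfigurationEntry> duplicated = Arrays.asList(entry(OAUTH_MODULE), entry(OAUTH_MODULE));
        System.out.println(entriesFor(OAUTH_MODULE, duplicated).size()); // prints 2
    }
}
```

Filtering by module name would satisfy the "exactly 1" check when different modules share a section, but a duplicated module still yields two entries, so that case would need separate validation.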
 
h3. case2:

Building on case1, override the default implementation of oauthbearer's 
`sasl.server.callback.handler.class`.

Add the following configuration to server.properties:
{code:java}
listener.name.sasl_plaintext.oauthbearer.sasl.server.callback.handler.class=us.zoom.mq.security.oauth.AsyncMQOAuthCallbackHandler
listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class=us.zoom.mq.security.oauth.AsyncMQOAuthCallbackHandler
listener.name.oauth.oauthbearer.sasl.server.callback.handler.class=us.zoom.mq.security

[GitHub] [kafka] dengziming commented on a change in pull request #11173: KAFKA-13509: Support max timestamp in GetOffsetShell

2022-03-16 Thread GitBox


dengziming commented on a change in pull request #11173:
URL: https://github.com/apache/kafka/pull/11173#discussion_r828767212



##
File path: core/src/test/scala/kafka/tools/GetOffsetShellTest.scala
##
@@ -109,6 +111,46 @@ class GetOffsetShellTest extends KafkaServerTestHarness with Logging {
     )
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("-1", "latest"))
+  def testGetLatestOffsets(time: String): Unit = {
+    val offsets = executeAndParse(Array("--topic-partitions", "topic.*:0", "--time", time))
+    assertEquals(
+      List(
+        ("topic1", 0, Some(1)),
+        ("topic2", 0, Some(2)),
+        ("topic3", 0, Some(3)),
+        ("topic4", 0, Some(4))
+      ),
+      offsets
+    )
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("-2", "earliest"))
+  def testGetEarliestOffsets(time: String): Unit = {
+    val offsets = executeAndParse(Array("--topic-partitions", "topic.*:0", "--time", time))
+    assertEquals(
+      List(
+        ("topic1", 0, Some(0)),
+        ("topic2", 0, Some(0)),
+        ("topic3", 0, Some(0)),
+        ("topic4", 0, Some(0))
+      ),
+      offsets
+    )
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("-3", "max-timestamp"))
+  def testGetOffsetsByMaxTimestamp(time: String): Unit = {

Review comment:
   I added a test for this case. However, we can't get a deterministic 
result for a specific timestamp here, so I just set a relatively small 
timestamp. I think these cases are already tested in AdminClientTest.
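
The parameterized tests in the hunk above pair each numeric sentinel with a symbolic name ("-1"/"latest", "-2"/"earliest", "-3"/"max-timestamp"). A rough Java sketch of that mapping follows. `TimeSpecSketch`, `Kind`, and `TimeSpec` are hypothetical stand-ins for whatever the tool's `parseOffsetSpec` returns; treating every other number as a plain timestamp is an assumption, not behaviour confirmed by the PR.

```java
class TimeSpecSketch {
    enum Kind { LATEST, EARLIEST, MAX_TIMESTAMP, TIMESTAMP }

    // Hypothetical result type; `timestamp` is only meaningful for Kind.TIMESTAMP.
    static final class TimeSpec {
        final Kind kind;
        final long timestamp;

        TimeSpec(Kind kind, long timestamp) {
            this.kind = kind;
            this.timestamp = timestamp;
        }
    }

    // Maps a --time argument (symbolic name or number) to a TimeSpec.
    static TimeSpec parse(String time) {
        switch (time) {
            case "latest":
                return new TimeSpec(Kind.LATEST, -1L);
            case "earliest":
                return new TimeSpec(Kind.EARLIEST, -2L);
            case "max-timestamp":
                return new TimeSpec(Kind.MAX_TIMESTAMP, -3L);
            default:
                break;
        }
        long value;
        try {
            value = Long.parseLong(time);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Malformed time argument: " + time, e);
        }
        if (value == -1L) {
            return new TimeSpec(Kind.LATEST, value);
        } else if (value == -2L) {
            return new TimeSpec(Kind.EARLIEST, value);
        } else if (value == -3L) {
            return new TimeSpec(Kind.MAX_TIMESTAMP, value);
        }
        // Assumption: any other number is treated as a timestamp in milliseconds.
        return new TimeSpec(Kind.TIMESTAMP, value);
    }

    public static void main(String[] args) {
        System.out.println(parse("max-timestamp").kind); // prints MAX_TIMESTAMP
        System.out.println(parse("-3").kind);            // prints MAX_TIMESTAMP
    }
}
```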




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on a change in pull request #11173: KAFKA-13509: Support max timestamp in GetOffsetShell

2022-03-16 Thread GitBox


dengziming commented on a change in pull request #11173:
URL: https://github.com/apache/kafka/pull/11173#discussion_r828767349



##
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##
@@ -224,9 +252,81 @@ object GetOffsetShell {
   /**
    * Return the partition infos. Filter them with topicPartitionFilter.
    */
-  private def listPartitionInfos(consumer: KafkaConsumer[_, _], topicPartitionFilter: PartitionInfo => Boolean): Seq[PartitionInfo] = {
-    consumer.listTopics.asScala.values.flatMap { partitions =>
-      partitions.asScala.filter(topicPartitionFilter)
+  private def listPartitionInfos(
+    client: Admin,
+    topicPartitionFilter: TopicPartitionFilter,
+    excludeInternalTopics: Boolean
+  ): Seq[TopicPartition] = {
+    val listTopicsOptions = new ListTopicsOptions().listInternal(!excludeInternalTopics)
+    val topics = client.listTopics(listTopicsOptions).names.get
+    val filteredTopics = topics.asScala.filter(topicPartitionFilter.isTopicAllowed)
+
+    client.describeTopics(filteredTopics.asJava).allTopicNames.get.asScala.flatMap { case (topic, description) =>
+      description
+        .partitions
+        .asScala
+        .map(tp => new TopicPartition(topic, tp.partition))
+        .filter(topicPartitionFilter.isTopicPartitionAllowed)
     }.toBuffer
   }
 }
+
+/**
+ * Used to filter partitions after describing them

Review comment:
   Yeah, I reworded these annotations.
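
The new `listPartitionInfos` in the hunk above filters in two stages: topic names first, then individual partitions after the topics are described. The Java sketch below shows that shape with an in-memory map in place of the Admin client. Only `TopicPartitionFilter`, `isTopicAllowed`, and `isTopicPartitionAllowed` come from the diff; the nested `TopicPartition` class, `topicPatternAndPartition`, and `listPartitions` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

class PartitionFilterSketch {
    // Minimal stand-in for Kafka's TopicPartition (an assumption, not the real class).
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    interface TopicPartitionFilter {
        boolean isTopicAllowed(String topic);
        boolean isTopicPartitionAllowed(TopicPartition tp);
    }

    // Hypothetical filter for a pattern such as "topic.*:0".
    static TopicPartitionFilter topicPatternAndPartition(String topicRegex, final int partition) {
        final Pattern pattern = Pattern.compile(topicRegex);
        return new TopicPartitionFilter() {
            @Override
            public boolean isTopicAllowed(String topic) {
                return pattern.matcher(topic).matches();
            }

            @Override
            public boolean isTopicPartitionAllowed(TopicPartition tp) {
                return isTopicAllowed(tp.topic) && tp.partition == partition;
            }
        };
    }

    // partitionCounts maps topic name to partition count, standing in for cluster metadata.
    static List<TopicPartition> listPartitions(Map<String, Integer> partitionCounts,
                                               TopicPartitionFilter filter) {
        List<TopicPartition> result = new ArrayList<>();
        for (Map.Entry<String, Integer> e : partitionCounts.entrySet()) {
            if (!filter.isTopicAllowed(e.getKey())) {
                continue; // stage 1: skip topics before "describing" them
            }
            for (int p = 0; p < e.getValue(); p++) {
                TopicPartition tp = new TopicPartition(e.getKey(), p);
                if (filter.isTopicPartitionAllowed(tp)) { // stage 2: per-partition check
                    result.add(tp);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> cluster = new TreeMap<>();
        cluster.put("topic1", 1);
        cluster.put("topic2", 2);
        cluster.put("other", 3);
        System.out.println(listPartitions(cluster, topicPatternAndPartition("topic.*", 0)));
        // prints [topic1-0, topic2-0]
    }
}
```

Filtering topic names before describing them keeps the describe step limited to topics that can still match, which appears to be the motivation for splitting the filter into two methods.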








[GitHub] [kafka] dengziming commented on pull request #11173: KAFKA-13509: Support max timestamp in GetOffsetShell

2022-03-16 Thread GitBox


dengziming commented on pull request #11173:
URL: https://github.com/apache/kafka/pull/11173#issuecomment-1070342480


   Thank you @showuon , PTAL again.






[jira] [Assigned] (KAFKA-13743) kraft controller should prevent topics with conflicting metrics names from being created

2022-03-16 Thread dengziming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13743?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dengziming reassigned KAFKA-13743:
--

Assignee: dengziming

> kraft controller should prevent topics with conflicting metrics names from 
> being created
> 
>
> Key: KAFKA-13743
> URL: https://issues.apache.org/jira/browse/KAFKA-13743
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Assignee: dengziming
>Priority: Major
>  Labels: kip-500
>
> The kraft controller should prevent topics with conflicting metrics names 
> from being created, like the zk code does.
> Example:
> {code}
> [cmccabe@zeratul kafka1]$ ./bin/kafka-topics.sh --create --topic f.oo --bootstrap-server localhost:9092
> WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
> Created topic f.oo.
> [cmccabe@zeratul kafka1]$ ./bin/kafka-topics.sh --create --topic f_oo --bootstrap-server localhost:9092
> WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
> Error while executing topic command : Topic 'f_oo' collides with existing topics: f.oo
> [2022-03-15 09:48:49,563] ERROR org.apache.kafka.common.errors.InvalidTopicException: Topic 'f_oo' collides with existing topics: f.oo
>  (kafka.admin.TopicCommand$)
> {code}
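
The collision in the quoted example arises because a period and an underscore end up as the same character in metric names, so 'f.oo' and 'f_oo' are indistinguishable there. A minimal Java sketch of such a check follows. The class and method names are hypothetical, and the normalization rule (replace '.' with '_' before comparing) is inferred from the warning text rather than taken from the controller code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

class TopicCollisionSketch {
    // Normalizes the two characters that the warning says can collide.
    static String unify(String topic) {
        return topic.replace('.', '_');
    }

    // Two topic names collide when they are equal after normalization.
    static boolean collides(String a, String b) {
        return unify(a).equals(unify(b));
    }

    // Returns the existing topics that the new name would collide with.
    static List<String> collisions(String newTopic, Collection<String> existing) {
        List<String> hits = new ArrayList<>();
        for (String topic : existing) {
            if (collides(newTopic, topic)) {
                hits.add(topic);
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        System.out.println(collisions("f_oo", Arrays.asList("f.oo", "bar"))); // prints [f.oo]
    }
}
```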



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] showuon commented on pull request #11173: KAFKA-13509: Support max timestamp in GetOffsetShell

2022-03-16 Thread GitBox


showuon commented on pull request #11173:
URL: https://github.com/apache/kafka/pull/11173#issuecomment-1070375723


   @dajac @jolshan, would you like to take another look at this PR so it can 
make it into v3.2.0?

