[GitHub] [kafka] ItherNiT commented on pull request #6329: KAFKA-1194: Fix renaming open files on Windows

2021-03-15 Thread GitBox


ItherNiT commented on pull request #6329:
URL: https://github.com/apache/kafka/pull/6329#issuecomment-799172904


   Any update on when this will be integrated into Kafka?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-7870) Error sending fetch request (sessionId=1578860481, epoch=INITIAL) to node 2: java.io.IOException: Connection to 2 was disconnected before the response was read.

2021-03-15 Thread ShiminHuang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17301470#comment-17301470
 ] 

ShiminHuang commented on KAFKA-7870:


We actually found this in version 2.4.1 as well.

> Error sending fetch request (sessionId=1578860481, epoch=INITIAL) to node 2: 
> java.io.IOException: Connection to 2 was disconnected before the response was 
> read.
> 
>
> Key: KAFKA-7870
> URL: https://issues.apache.org/jira/browse/KAFKA-7870
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.1.0
>Reporter: Chakhsu Lau
>Priority: Blocker
>
> We built a Kafka cluster with 5 brokers, but one of the brokers suddenly stopped 
> running, and it happened twice on the same broker. Here is the log; is this a 
> bug in Kafka?
> {code:java}
> [2019-01-25 12:57:14,686] INFO [ReplicaFetcher replicaId=3, leaderId=2, 
> fetcherId=0] Error sending fetch request (sessionId=1578860481, 
> epoch=INITIAL) to node 2: java.io.IOException: Connection to 2 was 
> disconnected before the response was read. 
> (org.apache.kafka.clients.FetchSessionHandler)
> [2019-01-25 12:57:14,687] WARN [ReplicaFetcher replicaId=3, leaderId=2, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=3, maxWait=500, minBytes=1, maxBytes=10485760, 
> fetchData={api-result-bi-heatmap-8=(offset=0, logStartOffset=0, 
> maxBytes=1048576, currentLeaderEpoch=Optional[4]), 
> api-result-bi-heatmap-save-12=(offset=0, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[4]), api-result-bi-heatmap-task-2=(offset=2, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[4]), 
> api-result-bi-flow-39=(offset=1883206, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[4]), __consumer_offsets-47=(offset=349437, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[4]), 
> api-result-bi-heatmap-track-6=(offset=1039889, logStartOffset=0, 
> maxBytes=1048576, currentLeaderEpoch=Optional[4]), 
> api-result-bi-heatmap-task-17=(offset=0, logStartOffset=0, maxBytes=1048576, 
> currentLeaderEpoch=Optional[4]), __consumer_offsets-2=(offset=0, 
> logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[4]), 
> api-result-bi-heatmap-aggs-19=(offset=1255056, logStartOffset=0, 
> maxBytes=1048576, currentLeaderEpoch=Optional[4])}, 
> isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=1578860481, 
> epoch=INITIAL)) (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 2 was disconnected before the response was 
> read
> at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97)
> at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:97)
> at 
> kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:190)
> at 
> kafka.server.AbstractFetcherThread.kafka$server$AbstractFetcherThread$$processFetchRequest(AbstractFetcherThread.scala:241)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:130)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:129)
> at scala.Option.foreach(Option.scala:257)
> at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
> at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12465) Decide whether inconsistent cluster id error are fatal

2021-03-15 Thread dengziming (Jira)
dengziming created KAFKA-12465:
--

 Summary: Decide whether inconsistent cluster id error are fatal
 Key: KAFKA-12465
 URL: https://issues.apache.org/jira/browse/KAFKA-12465
 Project: Kafka
  Issue Type: Sub-task
Reporter: dengziming


Currently, we just log an error when an inconsistent cluster id occurs. We 
should define a window during startup in which these errors are fatal; after that 
window, we should no longer treat them as fatal.
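
A minimal sketch of the startup-window idea (hypothetical class and names, not the 
actual KafkaRaftClient change; as the review discussion notes, a fixed window 
configuration may not be sufficient):

{code:java}
import java.time.Duration;

public class ClusterIdErrorPolicy {
    private final long startupTimeMs;
    private final long fatalWindowMs;

    public ClusterIdErrorPolicy(long startupTimeMs, Duration fatalWindow) {
        this.startupTimeMs = startupTimeMs;
        this.fatalWindowMs = fatalWindow.toMillis();
    }

    // An inconsistent-cluster-id error observed at nowMs is fatal only while we
    // are still within the configured window after startup.
    public boolean isFatal(long nowMs) {
        return nowMs - startupTimeMs <= fatalWindowMs;
    }
}
{code}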



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dengziming commented on a change in pull request #10289: KAFKA-12440: ClusterId validation for Vote, BeginQuorum, EndQuorum and FetchSnapshot

2021-03-15 Thread GitBox


dengziming commented on a change in pull request #10289:
URL: https://github.com/apache/kafka/pull/10289#discussion_r594127628



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -939,12 +951,27 @@ private FetchResponseData buildEmptyFetchResponse(
 );
 }
 
-private boolean hasValidClusterId(FetchRequestData request) {
+private boolean hasValidClusterId(ApiMessage request) {

Review comment:
   It's a bit difficult to figure out how to add the window; we cannot 
simply rely on a fixed configuration. I added a ticket to track this problem: 
https://issues.apache.org/jira/browse/KAFKA-12465.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12465) Decide whether inconsistent cluster id error are fatal

2021-03-15 Thread dengziming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dengziming updated KAFKA-12465:
---
Description: Currently, we just log an error when an inconsistent 
cluster id occurs. We should define a window during startup in which these errors 
are fatal; after that window, we should no longer treat them as fatal. See 
https://github.com/apache/kafka/pull/10289#discussion_r592853088  (was: 
Currently, we just log an error when an inconsistent cluster id occurs. We 
should define a window during startup in which these errors are fatal; after that 
window, we should no longer treat them as fatal.)

> Decide whether inconsistent cluster id error are fatal
> --
>
> Key: KAFKA-12465
> URL: https://issues.apache.org/jira/browse/KAFKA-12465
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: dengziming
>Priority: Major
>
> Currently, we just log an error when an inconsistent cluster id occurs. We 
> should define a window during startup in which these errors are fatal; after that 
> window, we should no longer treat them as fatal. See 
> https://github.com/apache/kafka/pull/10289#discussion_r592853088



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dajac commented on a change in pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

2021-03-15 Thread GitBox


dajac commented on a change in pull request #10304:
URL: https://github.com/apache/kafka/pull/10304#discussion_r594184162



##
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##
@@ -39,19 +39,31 @@ object LogDirsCommand {
 def describe(args: Array[String], out: PrintStream): Unit = {
 val opts = new LogDirsCommandOptions(args)
 val adminClient = createAdminClient(opts)
-val topicList = 
opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
-val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) 
match {
-case Some(brokerListStr) => 
brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-case None => 
adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
-}
+val topicList = 
opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty)
+var nonExistBrokers: Set[Int] = Set.empty
+try {
+val clusterBrokers: Set[Int] = 
adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet

Review comment:
   nit: We can remove specifying `Set[Int]`.

##
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##
@@ -39,19 +39,31 @@ object LogDirsCommand {
 def describe(args: Array[String], out: PrintStream): Unit = {
 val opts = new LogDirsCommandOptions(args)
 val adminClient = createAdminClient(opts)
-val topicList = 
opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
-val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) 
match {
-case Some(brokerListStr) => 
brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-case None => 
adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
-}
+val topicList = 
opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty)
+var nonExistBrokers: Set[Int] = Set.empty
+try {
+val clusterBrokers: Set[Int] = 
adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet
+val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) 
match {
+case Some(brokerListStr) =>
+val inputBrokers: Set[Int] = 
brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet

Review comment:
   ditto.

##
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##
@@ -39,19 +39,31 @@ object LogDirsCommand {
 def describe(args: Array[String], out: PrintStream): Unit = {
 val opts = new LogDirsCommandOptions(args)
 val adminClient = createAdminClient(opts)
-val topicList = 
opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
-val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) 
match {
-case Some(brokerListStr) => 
brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-case None => 
adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
-}
+val topicList = 
opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty)
+var nonExistBrokers: Set[Int] = Set.empty
+try {
+val clusterBrokers: Set[Int] = 
adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet
+val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) 
match {
+case Some(brokerListStr) =>
+val inputBrokers: Set[Int] = 
brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet
+nonExistBrokers = inputBrokers.diff(clusterBrokers)
+inputBrokers
+case None => clusterBrokers
+}

Review comment:
   nit: We usually avoid using mutable variables unless really 
necessary. In this case, I would rather return the `nonExistingBrokers` when 
the argument is processed. Something like this:
   
   ```
   val (existingBrokers, nonExistingBrokers) = Option(opts.options.valueOf(opts.brokerListOpt)) match {
       case Some(brokerListStr) =>
           val inputBrokers: Set[Int] = brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet
           (inputBrokers, inputBrokers.diff(clusterBrokers))
       case None => (clusterBrokers, Set.empty)
   }
   ```

##
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##
@@ -39,19 +39,31 @@ object LogDirsCommand {
 def describe(args: Array[String], out: PrintStream): Unit = {
 val opts = new LogDirsCommandOptions(args)
 val adminClient = createAdminClient(opts)
-val topicList = 
opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
-val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) 
match {
-case Some(brokerListStr) => 
brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-case None => 
adminClient.descr

[GitHub] [kafka] ikdekker edited a comment on pull request #10313: KAFKA-12456: Log "Listeners are not identical across brokers" message at WARN/INFO instead of ERROR

2021-03-15 Thread GitBox


ikdekker edited a comment on pull request #10313:
URL: https://github.com/apache/kafka/pull/10313#issuecomment-798355603


   Hello Kafka Committers,
   this contribution is part of a university course. We would appreciate any 
kind of feedback. This initial PR is purposefully a very simple one for us to 
get familiar with the process of PRs in this repo.
   
   We attempted to run the tests (by executing `./gradlew tests`), but no reports 
were generated, even though the test output indicated a report had been written 
to the build directory. Is this a known issue, or expected behaviour? We followed 
the README as described on the contributing page 
(https://github.com/apache/kafka/blob/trunk/README.md).
   
   On the [PR guideline 
page](https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes),
 it says we should update the status of 
https://issues.apache.org/jira/browse/KAFKA-12456 to submit a patch. It seems 
we do not have the rights to do this; is that correct?
   
   Thanks for your time!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison merged pull request #10308: MINOR: Update year in NOTICE

2021-03-15 Thread GitBox


mimaison merged pull request #10308:
URL: https://github.com/apache/kafka/pull/10308


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna opened a new pull request #10317: KAFKA-10357: Add setup method to internal topics

2021-03-15 Thread GitBox


cadonna opened a new pull request #10317:
URL: https://github.com/apache/kafka/pull/10317


   For KIP-698, we need a way to set up internal topics without
   validating them. This PR adds a setup method to the
   InternalTopicManager for that purpose.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #10317: KAFKA-10357: Add setup method to internal topics

2021-03-15 Thread GitBox


cadonna commented on a change in pull request #10317:
URL: https://github.com/apache/kafka/pull/10317#discussion_r594287719



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##
@@ -653,56 +798,22 @@ public void 
shouldReportMisconfigurationsOfCleanupPolicyForWindowedChangelogTopi
 @Test
 public void 
shouldReportMisconfigurationsOfCleanupPolicyForRepartitionTopics() {
 final long retentionMs = 1000;
-mockAdminClient.addTopic(

Review comment:
   Just refactorings from here to the end.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #10317: KAFKA-10357: Add setup method to internal topics

2021-03-15 Thread GitBox


cadonna commented on pull request #10317:
URL: https://github.com/apache/kafka/pull/10317#issuecomment-799376130


   Call for review: @rodesai



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12464) Enhance constrained sticky Assign algorithm

2021-03-15 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-12464:
--
Description: 
In KAFKA-9987, we made a great improvement for the case where all consumers are 
subscribed to the same set of topics. The algorithm contains 4 phases:
 # Reassign as many previously owned partitions as possible, up to the maxQuota
 # Fill remaining members up to minQuota
 # If we ran out of unassigned partitions before filling all consumers, we need 
to start stealing partitions from the over-full consumers at max capacity
 # Otherwise we may have run out of unfilled consumers before assigning all 
partitions, in which case we should just distribute one partition each to all 
consumers at min capacity

 

Take an example for better understanding:

*example:*

Current status: 2 consumers (C0, C1), and 10 topic partitions: t1p0, t1p1, ... 
t1p9

Suppose, current assignment is:

_C0: t1p0, t1p1, t1p2, t1p3, t1p4_
 _C1: t1p5, t1p6, t1p7, t1p8, t1p9_

Now, new consumer added: C2, so we'll do:
 # Reassign as many previously owned partitions as possible, up to the maxQuota
 After this phase, the assignment will be: (maxQuota will be 4)

_C0: t1p0, t1p1, t1p2, t1p3_
 _C1: t1p5, t1p6, t1p7, t1p8_
 # Fill remaining members up to minQuota
 After this phase, the assignment will be:

_C0: t1p0, t1p1, t1p2, t1p3_
 _C1: t1p5, t1p6, t1p7, t1p8_
 _C2: t1p4, t1p9_
 # If we ran out of unassigned partitions before filling all consumers, we need 
to start stealing partitions from the over-full consumers at max capacity
 After this phase, the assignment will be:

_C0: t1p0, t1p1, t1p2_ 
 _C1: t1p5, t1p6, t1p7, t1p8_
 _C2: t1p4, t1p9,_ _t1p3_
 # Otherwise we may have run out of unfilled consumers before assigning all 
partitions, in which case we should just distribute one partition each to all 
consumers at min capacity

 

 

As we can see, we need 3 phases to complete the assignment. But we can actually 
complete it with 2 phases. Here's the updated algorithm:
 # Reassign as many previously owned partitions as possible, up to the 
maxQuota, and also considering the numMaxQuota by the remainder of (Partitions 
/ Consumers)
 # Fill remaining members up to maxQuota if current maxQuotaMember < 
numMaxQuota, otherwise, to minQuota

 

By considering the numMaxQuota, the original step 1 won't be too aggressive and 
assign too many partitions to consumers, and step 2 won't be too conservative and 
assign too few, so we no longer need steps 3 and 4 to balance them.
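
As a small numeric illustration of the quota terms used in the pseudo-code below, 
computed for the 10-partition, 3-consumer example above (plain arithmetic only, 
not assignor code):

{code:java}
public class QuotaExample {
    public static void main(String[] args) {
        int partitions = 10, consumers = 3;
        int minQuota = partitions / consumers;                   // C_f = 3
        int maxQuota = (partitions + consumers - 1) / consumers; // C_c = 4
        int allowedMaxQuotaMembers = partitions % consumers;     // C_r = 1: at most one member may keep maxQuota partitions
        System.out.printf("C_f=%d C_c=%d C_r=%d%n", minQuota, maxQuota, allowedMaxQuotaMembers);
    }
}
{code}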

 

{{So, the updated Pseudo-code sketch of the algorithm:}}

C_f := (P/N)_floor, the floor capacity 
 C_c := (P/N)_ceil, the ceiling capacity

*C_r := (P%N) the allowed number of members with C_c partitions assigned*
 *num_max_capacity_members := current number of members with C_c partitions 
assigned (default to 0)*

members := the sorted set of all consumers
 partitions := the set of all partitions
 unassigned_partitions := the set of partitions not yet assigned, initialized 
to be all partitions
 unfilled_members := the set of consumers not yet at capacity, initialized to 
empty
 -max_capacity_members := the set of members with exactly C_c partitions 
assigned, initialized to empty-
 member.owned_partitions := the set of previously owned partitions encoded in 
the Subscription

// Reassign as many previously owned partitions as possible, *by considering 
the num_max_capacity_members*
 for member : members
  remove any partitions that are no longer in the subscription from its 
owned partitions
  remove all owned_partitions if the generation is old
  if member.owned_partitions.size < C_f
  assign all owned partitions to member and remove from 
unassigned_partitions
  add member to unfilled_members
  -else if member.owned_partitions.size == C_f-
  -assign first C_f owned_partitions to member and remove from 
unassigned_partitions-
  else if member.owned_partitions.size >= C_c *&& num_max_capacity_members 
< C_r*
  *assign first C_c owned_partitions to member and remove from 
unassigned_partitions*

 *num_max_capacity_members++*
  -add member to max_capacity_members-

 *else*
   *assign first C_f owned_partitions to member and remove from 
unassigned_partitions*

sort unassigned_partitions in partition order, ie t0_p0, t1_p0, t2_p0, t0_p1, 
t1_p1  (for data parallelism)
 sort unfilled_members by memberId (for determinism)

// Fill remaining members *up to the C_r numbers of C_c, otherwise, to C_f*
 for member : unfilled_members
  compute the remaining capacity as -C = C_f - num_assigned_partitions-

 if num_max_capacity_members < C_r:

    C = C_c - num_assigned_partitions

    num_max_capacity_members++
  else

   C = C_f - num_assigned_partitions
 pop the first C partitions from unassig

[jira] [Commented] (KAFKA-10582) Mirror Maker 2 not replicating new topics until restart

2021-03-15 Thread Ravishankar S R (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17301648#comment-17301648
 ] 

Ravishankar S R commented on KAFKA-10582:
-

Hi,

I'm facing the same problem. I'm using Kafka 2.6.1. In my case, restarting Kafka on 
one instance works if the restart is done within a few minutes of new topic creation. 
However, if the restart happens much later, restarting one instance 
doesn't help and both instances need to be restarted in parallel (stop both 
instances and start both instances).

Please let me know if there is any workaround to overcome this problem.

Best Regards,

Ravi

> Mirror Maker 2 not replicating new topics until restart
> ---
>
> Key: KAFKA-10582
> URL: https://issues.apache.org/jira/browse/KAFKA-10582
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.5.1
> Environment: RHEL 7 Linux.
>Reporter: Robert Martin
>Priority: Minor
>
> We are using Mirror Maker 2 from the 2.5.1 release for replication on some 
> clusters.  Replication is working as expected for existing topics.  When we 
> create a new topic, however, Mirror Maker 2 creates the replicated topic as 
> expected but never starts replicating it.  If we restart Mirror Maker 2 
> within 2-3 minutes the topic starts replicating as expected.  From 
> documentation we have seen, it appears this should start replicating without 
> a restart based on the settings we have.
> *Example:*
> Create topic "mytesttopic" on source cluster
> MirrorMaker 2 creates "source.mytesttopic" on the target cluster with no issue
> MirrorMaker 2 does not replicate "mytesttopic" -> "source.mytesttopic"
> Restart MirrorMaker 2 and now replication works for "mytesttopic" -> 
> "source.mytesttopic"
> *Example config:*
> name = source->target
> group.id = source-to-target
> clusters = source, target
> source.bootstrap.servers = sourcehosts:9092
> target.bootstrap.servers = targethosts:9092
> source->target.enabled = true
> source->target.topics = .*
> target->source = false
> target->source.topics = .*
> replication.factor=3
> checkpoints.topic.replication.factor=3
> heartbeats.topic.replication.factor=3
> offset-syncs.topic.replication.factor=3
> offset.storage.replication.factor=3
> status.storage.replication.factor=3
> config.storage.replication.factor=3
> tasks.max = 16
> refresh.topics.enabled = true
> sync.topic.configs.enabled = true
> refresh.topics.interval.seconds = 300
> refresh.groups.interval.seconds = 300
> readahead.queue.capacity = 100
> emit.checkpoints.enabled = true
> emit.checkpoints.interval.seconds = 5



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dajac opened a new pull request #10318: KAFKA-12330; FetchSessionCache may cause starvation for partitions when FetchResponse is full

2021-03-15 Thread GitBox


dajac opened a new pull request #10318:
URL: https://github.com/apache/kafka/pull/10318


   Incremental FetchSessionCache sessions deprioritize partitions for which a 
response is returned. This may happen if log metadata such as the log start offset, 
hwm, etc. is returned, or if data for that partition is returned.
   
   When a fetch response fills up to maxBytes, data may not be returned for 
partitions even if the fetch offset is lower than the fetch upper bound. 
However, the fetch response will still contain updates to metadata such as the hwm 
if that metadata has changed. This can lead to degenerate behavior where a 
partition's hwm or log start offset is updated, resulting in the next fetch 
being unnecessarily skipped for that partition. At first this appeared to be 
worse, as hwm updates occur frequently, but starvation should result in hwm 
movement becoming blocked, allowing a fetch to go through and the partition to 
become unstuck. However, it still requires one more fetch request than necessary 
to do so. Consumers may be affected more than replica fetchers; however, they often 
remove partitions with fetched data from the next fetch request, and this may be 
helping prevent starvation.
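   
   A simplified, hypothetical illustration of the deprioritization described above 
(the class and method names are invented for the sketch; this is not Kafka's 
FetchSession/FetchSessionCache code). Partitions that appeared in the previous 
response, even with metadata only, are moved to the back of the order used for the 
next response, which is how a partition whose data keeps being cut off by maxBytes 
can be repeatedly pushed back:
   
   ```
   import java.util.ArrayDeque;
   import java.util.Deque;
   import java.util.Set;

   public class SessionOrderSketch {
       private final Deque<String> order = new ArrayDeque<>();

       public SessionOrderSketch(Iterable<String> partitions) {
           partitions.forEach(order::addLast);
       }

       // After a response, move every partition that appeared in it to the back,
       // even if it only carried metadata (e.g. an hwm update) and no records.
       public void onResponse(Set<String> partitionsInResponse) {
           Deque<String> next = new ArrayDeque<>();
           for (String p : order) {
               if (!partitionsInResponse.contains(p)) {
                   next.addLast(p);
               }
           }
           partitionsInResponse.forEach(next::addLast);
           order.clear();
           order.addAll(next);
       }

       public Deque<String> nextFetchOrder() {
           return order;
       }
   }
   ```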
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac opened a new pull request #10319: MINOR; Various code cleanups

2021-03-15 Thread GitBox


dajac opened a new pull request #10319:
URL: https://github.com/apache/kafka/pull/10319


   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wenbingshen commented on pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

2021-03-15 Thread GitBox


wenbingshen commented on pull request #10304:
URL: https://github.com/apache/kafka/pull/10304#issuecomment-799498049


   > @wenbingshen Thanks for the updates. I have left a few more minor comments. 
Also, it seems that the build failed. Could you check it?
   
   Thank you for your review and suggestions. I have submitted the latest code, 
and the code has been tested and compiled successfully. Please help review it 
again, thank you.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-15 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17301327#comment-17301327
 ] 

Chris Egerton edited comment on KAFKA-12463 at 3/15/21, 3:26 PM:
-

Ah, thanks [~ableegoldman], I'd misread the javadocs for the cooperative sticky 
assignor. Updated the description to point to 2.4 instead of 2.3.

 

Re: clarity of the upgrade section in KIP-429: I didn't see a specific section 
for Connect, and both of the sections that were there ("Consumer" and 
"Streams") provided a different procedure than the one I proposed here. It 
seems like an implicit goal of both of them is to arrive at an end state where 
all consumers only provide the cooperative assignor in their list of supported 
assignors, instead of the cooperative assignor first and with the other, older 
assignor behind it. I'm wondering if the lack of that goal is why this 
different approach (which only requires one-step rolling as opposed to two) is 
viable here but not necessarily for other applications?


was (Author: chrisegerton):
Ah, thanks [~ableegoldman], I'd misread the javadocs for the cooperative sticky 
assignor. Updated the description to point to 2.4 instead of 2.3.

 

Re: clarity of the upgrade section in KIP-429: I didn't see a specific section 
for Connect, and both of the sections that were provided ("Consumer" and 
"Streams") provided a different procedure than the one I proposed here. It 
seems like an implicit goal of both of them is to arrive at an end state where 
all consumers only provide the cooperative assignor in their list of supported 
assignors, instead of the cooperative assignor first and with the other, older 
assignor behind it. I'm wondering if the lack of that goal is why this 
different approach (which only requires one-step rolling as opposed to two) is 
viable here but not necessarily for other applications?

> Update default consumer partition assignor for sink tasks
> -
>
> Key: KAFKA-12463
> URL: https://issues.apache.org/jira/browse/KAFKA-12463
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> Kafka consumers have a pluggable [partition assignment 
> interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
>  that comes with several out-of-the-box implementations including the 
> [RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html],
>  
> [RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html],
>  
> [StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html],
>  and 
> [CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html].
> If no partition assignor is configured with a consumer, the {{RangeAssignor}} 
> is used by default. Although there are some benefits to this assignor 
> including stability of assignment across generations and simplicity of 
> design, it comes with a major drawback: the number of active consumers in a 
> group is limited to the number of partitions in the topic(s) with the most 
> partitions. For an example of the worst case, in a consumer group where every 
> member is subscribed to ten topics that each have one partition, only one 
> member of that group will be assigned any topic partitions.
> This can end up producing counterintuitive and even frustrating behavior when 
> a sink connector is brought up with N tasks to read from some collection of 
> topics with a total of N topic partitions, but some tasks end up idling and 
> not processing any data.
>  
> [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
>  introduced the {{CooperativeStickyAssignor}}, which seeks to provide a 
> stable assignment across generations wherever possible, provide the most even 
> assignment possible (taking into account possible differences in 
> subscriptions across consumers in the group), and allow consumers to continue 
> processing data during rebalance. The documentation for the assignor states 
> that "Users should prefer this assignor for newer clusters."
> We should alter the default consumer configuration for sink tasks to use the 
> new {{CooperativeStickyAssignor}}. In order to do this in a 
> backwards-compatible fashion that also enables rolling upgrades, this should 
> be implemented by setting the {{partition.assignment.strategy}} property of 
> sink task consumers to the list 
> {{org.apache.kafka.clients.consumer.CooperativeStickyAssigno

[jira] [Commented] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-15 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17301706#comment-17301706
 ] 

Chris Egerton commented on KAFKA-12463:
---

cc [~rhauch] what do you think about this?

> Update default consumer partition assignor for sink tasks
> -
>
> Key: KAFKA-12463
> URL: https://issues.apache.org/jira/browse/KAFKA-12463
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> Kafka consumers have a pluggable [partition assignment 
> interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
>  that comes with several out-of-the-box implementations including the 
> [RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html],
>  
> [RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html],
>  
> [StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html],
>  and 
> [CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html].
> If no partition assignor is configured with a consumer, the {{RangeAssignor}} 
> is used by default. Although there are some benefits to this assignor 
> including stability of assignment across generations and simplicity of 
> design, it comes with a major drawback: the number of active consumers in a 
> group is limited to the number of partitions in the topic(s) with the most 
> partitions. For an example of the worst case, in a consumer group where every 
> member is subscribed to ten topics that each have one partition, only one 
> member of that group will be assigned any topic partitions.
> This can end up producing counterintuitive and even frustrating behavior when 
> a sink connector is brought up with N tasks to read from some collection of 
> topics with a total of N topic partitions, but some tasks end up idling and 
> not processing any data.
>  
> [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
>  introduced the {{CooperativeStickyAssignor}}, which seeks to provide a 
> stable assignment across generations wherever possible, provide the most even 
> assignment possible (taking into account possible differences in 
> subscriptions across consumers in the group), and allow consumers to continue 
> processing data during rebalance. The documentation for the assignor states 
> that "Users should prefer this assignor for newer clusters."
> We should alter the default consumer configuration for sink tasks to use the 
> new {{CooperativeStickyAssignor}}. In order to do this in a 
> backwards-compatible fashion that also enables rolling upgrades, this should 
> be implemented by setting the {{partition.assignment.strategy}} property of 
> sink task consumers to the list 
> {{org.apache.kafka.clients.consumer.CooperativeStickyAssignor, 
> org.apache.kafka.clients.consumer.RangeAssignor}} when no worker-level or 
> connector-level override is present.
> This way, consumer groups for sink connectors on Connect clusters in the 
> process of being upgraded will continue to use the {{RangeAssignor}} until 
> all workers in the cluster have been upgraded, and then will switch over to 
> the new {{CooperativeStickyAssignor}} automatically.
>  
> This improvement is viable as far back as -2.3- 2.4, when the 
> {{CooperativeStickyAssignor}} was introduced, but given that it is not a bug 
> fix, should only be applied to the Connect framework in an upcoming minor 
> release. This does not preclude users from following the steps outlined here 
> to improve sink connector behavior on existing clusters by modifying their 
> worker configs to use {{consumer.partition.assignment.strategy =}} 
> {{org.apache.kafka.clients.consumer.CooperativeStickyAssignor, 
> org.apache.kafka.clients.consumer.RangeAssignor}}, or doing the same on a 
> per-connector basis using the 
> {{consumer.override.partition.assignment.strategy}} property.
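
As an illustration of the configuration described above, here is a standalone 
consumer sketch (placeholder bootstrap servers and a hypothetical group id; this is 
not the Connect framework change itself) that lists the cooperative assignor first 
with the RangeAssignor as the common fallback:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class SinkConsumerConfigSketch {
    public static KafkaConsumer<byte[], byte[]> build() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "connect-my-sink");         // hypothetical group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        // Cooperative assignor first, RangeAssignor second, so that members of a
        // mixed-version group can still agree on a common assignor during a rolling upgrade.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
            CooperativeStickyAssignor.class.getName() + "," + RangeAssignor.class.getName());
        return new KafkaConsumer<>(props);
    }
}
{code}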



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] abbccdda merged pull request #10135: KAFKA-10348: Share client channel between forwarding and auto creation manager

2021-03-15 Thread GitBox


abbccdda merged pull request #10135:
URL: https://github.com/apache/kafka/pull/10135


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda opened a new pull request #10320: MINOR: revert stream logging level back to ERROR

2021-03-15 Thread GitBox


abbccdda opened a new pull request #10320:
URL: https://github.com/apache/kafka/pull/10320


   https://github.com/apache/kafka/pull/9579 accidentally changed the logging level 
for Streams; this PR corrects it.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

2021-03-15 Thread GitBox


dajac commented on a change in pull request #10304:
URL: https://github.com/apache/kafka/pull/10304#discussion_r594476389



##
File path: core/src/test/scala/unit/kafka/admin/LogDirsCommandTest.scala
##
@@ -0,0 +1,52 @@
+package unit.kafka.admin

Review comment:
   We must add the licence header here. You can copy it from another file.

##
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##
@@ -39,19 +39,29 @@ object LogDirsCommand {
 def describe(args: Array[String], out: PrintStream): Unit = {
 val opts = new LogDirsCommandOptions(args)
 val adminClient = createAdminClient(opts)
-val topicList = 
opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
-val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) 
match {
-case Some(brokerListStr) => 
brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-case None => 
adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
-}
+val topicList = 
opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty)
+try {
+val clusterBrokers = 
adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet
+val (existingBrokers, nonExistingBrokers) = 
Option(opts.options.valueOf(opts.brokerListOpt)) match {
+case Some(brokerListStr) =>
+val inputBrokers = 
brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet
+(inputBrokers, inputBrokers.diff(clusterBrokers))
+case None => (clusterBrokers, Set.empty)
+}
 
-out.println("Querying brokers for log directories information")
-val describeLogDirsResult: DescribeLogDirsResult = 
adminClient.describeLogDirs(brokerList.map(Integer.valueOf).toSeq.asJava)
-val logDirInfosByBroker = 
describeLogDirsResult.allDescriptions.get().asScala.map { case (k, v) => k -> 
v.asScala }
+if (nonExistingBrokers.nonEmpty) {
+out.println(s"ERROR: The given node(s) does not exist from 
broker-list: ${nonExistingBrokers.mkString(",")}. Current cluster exist 
node(s): ${clusterBrokers.mkString(",")}")

Review comment:
   nit: Should we say `--broker-list` instead of `broker-list`? Also, 
should we say `broker(s)` instead of `node(s)` to be consistent with the 
message below?

##
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##
@@ -39,19 +39,29 @@ object LogDirsCommand {
 def describe(args: Array[String], out: PrintStream): Unit = {
 val opts = new LogDirsCommandOptions(args)
 val adminClient = createAdminClient(opts)
-val topicList = 
opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
-val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) 
match {
-case Some(brokerListStr) => 
brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-case None => 
adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
-}
+val topicList = 
opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty)
+try {
+val clusterBrokers = 
adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet
+val (existingBrokers, nonExistingBrokers) = 
Option(opts.options.valueOf(opts.brokerListOpt)) match {
+case Some(brokerListStr) =>
+val inputBrokers = 
brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet
+(inputBrokers, inputBrokers.diff(clusterBrokers))
+case None => (clusterBrokers, Set.empty)
+}
 
-out.println("Querying brokers for log directories information")
-val describeLogDirsResult: DescribeLogDirsResult = 
adminClient.describeLogDirs(brokerList.map(Integer.valueOf).toSeq.asJava)
-val logDirInfosByBroker = 
describeLogDirsResult.allDescriptions.get().asScala.map { case (k, v) => k -> 
v.asScala }
+if (nonExistingBrokers.nonEmpty) {
+out.println(s"ERROR: The given node(s) does not exist from 
broker-list: ${nonExistingBrokers.mkString(",")}. Current cluster exist 
node(s): ${clusterBrokers.mkString(",")}")
+} else {
+out.println("Querying brokers for log directories information")
+val describeLogDirsResult: DescribeLogDirsResult = 
adminClient.describeLogDirs(existingBrokers.map(Integer.valueOf).toSeq.asJava)

Review comment:
   nit: `DescribeLogDirsResult` can be removed.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wenbingshen commented on a change in pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

2021-03-15 Thread GitBox


wenbingshen commented on a change in pull request #10304:
URL: https://github.com/apache/kafka/pull/10304#discussion_r594482210



##
File path: core/src/test/scala/unit/kafka/admin/LogDirsCommandTest.scala
##
@@ -0,0 +1,52 @@
+package unit.kafka.admin

Review comment:
   Sorry, after checking the compilation report, I have realized this 
problem and I have made changes.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wenbingshen commented on a change in pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

2021-03-15 Thread GitBox


wenbingshen commented on a change in pull request #10304:
URL: https://github.com/apache/kafka/pull/10304#discussion_r594483345



##
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##
@@ -39,19 +39,29 @@ object LogDirsCommand {
 def describe(args: Array[String], out: PrintStream): Unit = {
 val opts = new LogDirsCommandOptions(args)
 val adminClient = createAdminClient(opts)
-val topicList = 
opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
-val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) 
match {
-case Some(brokerListStr) => 
brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-case None => 
adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
-}
+val topicList = 
opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty)
+try {
+val clusterBrokers = 
adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet
+val (existingBrokers, nonExistingBrokers) = 
Option(opts.options.valueOf(opts.brokerListOpt)) match {
+case Some(brokerListStr) =>
+val inputBrokers = 
brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet
+(inputBrokers, inputBrokers.diff(clusterBrokers))
+case None => (clusterBrokers, Set.empty)
+}
 
-out.println("Querying brokers for log directories information")
-val describeLogDirsResult: DescribeLogDirsResult = 
adminClient.describeLogDirs(brokerList.map(Integer.valueOf).toSeq.asJava)
-val logDirInfosByBroker = 
describeLogDirsResult.allDescriptions.get().asScala.map { case (k, v) => k -> 
v.asScala }
+if (nonExistingBrokers.nonEmpty) {
+out.println(s"ERROR: The given node(s) does not exist from 
broker-list: ${nonExistingBrokers.mkString(",")}. Current cluster exist 
node(s): ${clusterBrokers.mkString(",")}")

Review comment:
   Good idea. I will act right away.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wenbingshen commented on a change in pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

2021-03-15 Thread GitBox


wenbingshen commented on a change in pull request #10304:
URL: https://github.com/apache/kafka/pull/10304#discussion_r594484169



##
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##
@@ -39,19 +39,29 @@ object LogDirsCommand {
 def describe(args: Array[String], out: PrintStream): Unit = {
 val opts = new LogDirsCommandOptions(args)
 val adminClient = createAdminClient(opts)
-val topicList = 
opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
-val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) 
match {
-case Some(brokerListStr) => 
brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-case None => 
adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
-}
+val topicList = 
opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty)
+try {
+val clusterBrokers = 
adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet
+val (existingBrokers, nonExistingBrokers) = 
Option(opts.options.valueOf(opts.brokerListOpt)) match {
+case Some(brokerListStr) =>
+val inputBrokers = 
brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet
+(inputBrokers, inputBrokers.diff(clusterBrokers))
+case None => (clusterBrokers, Set.empty)
+}
 
-out.println("Querying brokers for log directories information")
-val describeLogDirsResult: DescribeLogDirsResult = 
adminClient.describeLogDirs(brokerList.map(Integer.valueOf).toSeq.asJava)
-val logDirInfosByBroker = 
describeLogDirsResult.allDescriptions.get().asScala.map { case (k, v) => k -> 
v.asScala }
+if (nonExistingBrokers.nonEmpty) {
+out.println(s"ERROR: The given node(s) does not exist from 
broker-list: ${nonExistingBrokers.mkString(",")}. Current cluster exist 
node(s): ${clusterBrokers.mkString(",")}")
+} else {
+out.println("Querying brokers for log directories information")
+val describeLogDirsResult: DescribeLogDirsResult = 
adminClient.describeLogDirs(existingBrokers.map(Integer.valueOf).toSeq.asJava)

Review comment:
   Sorry, I forgot this; I will change it right away.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang merged pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason

2021-03-15 Thread GitBox


guozhangwang merged pull request #10232:
URL: https://github.com/apache/kafka/pull/10232


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10289: KAFKA-12440: ClusterId validation for Vote, BeginQuorum, EndQuorum and FetchSnapshot

2021-03-15 Thread GitBox


jsancio commented on a change in pull request #10289:
URL: https://github.com/apache/kafka/pull/10289#discussion_r594490953



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -939,12 +951,27 @@ private FetchResponseData buildEmptyFetchResponse(
 );
 }
 
-private boolean hasValidClusterId(FetchRequestData request) {
+private boolean hasValidClusterId(ApiMessage request) {

Review comment:
   We can implement that when handling a response: invalid cluster ids are 
fatal unless a previous response contained a valid cluster id.
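   
   A minimal sketch of that rule (hypothetical names, not the KafkaRaftClient 
implementation): an inconsistent cluster id in a response is fatal only until at 
least one response with a valid cluster id has been seen.
   
   ```
   public class ResponseClusterIdCheck {
       private boolean seenValidClusterId = false;

       // Call when a response carries a valid cluster id.
       public void onValidClusterId() {
           seenValidClusterId = true;
       }

       // An inconsistent cluster id is fatal only if no valid one has been seen yet.
       public boolean isInconsistentClusterIdFatal() {
           return !seenValidClusterId;
       }
   }
   ```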





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wenbingshen commented on pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

2021-03-15 Thread GitBox


wenbingshen commented on pull request #10304:
URL: https://github.com/apache/kafka/pull/10304#issuecomment-799585108


   > @wenbingshen Thanks for the updates. I left a few more minor comments.
   
   Thank you for your comments. I submitted the latest code; please review it, 
thank you!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10276: KAFKA-12253: Add tests that cover all of the cases for ReplicatedLog's validateOffsetAndEpoch

2021-03-15 Thread GitBox


jsancio commented on a change in pull request #10276:
URL: https://github.com/apache/kafka/pull/10276#discussion_r594509680



##
File path: raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
##
@@ -732,13 +760,17 @@ public int hashCode() {
 }
 
private void appendAsLeader(Collection<SimpleRecord> records, int epoch) {
+appendAsLeader(records, epoch, log.endOffset().offset);
+}
+
+private void appendAsLeader(Collection<SimpleRecord> records, int epoch, 
long initialOffset) {
 log.appendAsLeader(
-MemoryRecords.withRecords(
-log.endOffset().offset,
-CompressionType.NONE,
-records.toArray(new SimpleRecord[records.size()])
-),
-epoch
+MemoryRecords.withRecords(
+initialOffset,
+CompressionType.NONE,
+records.toArray(new SimpleRecord[records.size()])
+),
+epoch

Review comment:
   Indentation looks off. We indent 4 spaces.

##
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##
@@ -413,22 +413,53 @@ final class KafkaMetadataLogTest {
 assertTrue(log.deleteBeforeSnapshot(snapshotId))
 
 val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset, epoch)
-assertEquals(ValidOffsetAndEpoch.Type.VALID, 
resultOffsetAndEpoch.getType())
+assertEquals(ValidOffsetAndEpoch.Type.VALID, resultOffsetAndEpoch.getType)
 assertEquals(new OffsetAndEpoch(offset, epoch), 
resultOffsetAndEpoch.offsetAndEpoch())
   }
 
   @Test
-  def testValidateEpochUnknown(): Unit = {
+  def testValidateUnknownEpochLessThanLastKnownGreaterThanOldestSnapshot(): 
Unit = {
+val offset = 10
+val numOfRecords = 5
+
 val log = buildMetadataLog(tempDir, mockTime)
+log.updateHighWatermark(new LogOffsetMetadata(offset))
+val snapshotId = new OffsetAndEpoch(offset, 1)
+TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+  snapshot.freeze()
+}
+log.truncateToLatestSnapshot()
 
-val numberOfRecords = 1
-val epoch = 1
 
-append(log, numberOfRecords, epoch)
+append(log, numOfRecords, epoch = 1, initialOffset = 10)
+append(log, numOfRecords, epoch = 2, initialOffset = 15)
+append(log, numOfRecords, epoch = 4, initialOffset = 20)
 
-val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, 
epoch + 10)
-assertEquals(ValidOffsetAndEpoch.Type.DIVERGING, 
resultOffsetAndEpoch.getType())
-assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), 
resultOffsetAndEpoch.offsetAndEpoch())
+// offset is not equal to oldest snapshot's offset
+val resultOffsetAndEpoch = log.validateOffsetAndEpoch(100, 3)
+assertEquals(ValidOffsetAndEpoch.Type.DIVERGING, 
resultOffsetAndEpoch.getType)
+assertEquals(new OffsetAndEpoch(20, 2), 
resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateUnknownEpochLessThanLeaderGreaterThanOldestSnapshot(): Unit 
= {

Review comment:
   How about `testValidateEpochLessThanFirstEpochInLog`? If you agree, 
let's change it in `MockLogTest` also.

##
File path: raft/src/main/java/org/apache/kafka/raft/ValidOffsetAndEpoch.java
##
@@ -25,7 +27,7 @@
 this.offsetAndEpoch = offsetAndEpoch;
 }
 
-public Type type() {
+public Type getType() {

Review comment:
   By the way we can also just change the name of the type and field to 
something like `public Kind kind()`

##
File path: raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
##
@@ -750,4 +782,13 @@ private void appendBatch(int numRecords, int epoch) {
 
 appendAsLeader(records, epoch);
 }
+
+private void appendBatch(int numRecords, int epoch, long initialOffset) {

Review comment:
   How about `private void appendBatch(int numberOfRecords, int epoch)` and 
always use the LEO like `appendAsLeader(Collection<SimpleRecord> records, int 
epoch)`?

##
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##
@@ -413,22 +413,53 @@ final class KafkaMetadataLogTest {
 assertTrue(log.deleteBeforeSnapshot(snapshotId))
 
 val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset, epoch)
-assertEquals(ValidOffsetAndEpoch.Type.VALID, 
resultOffsetAndEpoch.getType())
+assertEquals(ValidOffsetAndEpoch.Type.VALID, resultOffsetAndEpoch.getType)
 assertEquals(new OffsetAndEpoch(offset, epoch), 
resultOffsetAndEpoch.offsetAndEpoch())
   }
 
   @Test
-  def testValidateEpochUnknown(): Unit = {
+  def testValidateUnknownEpochLessThanLastKnownGreaterThanOldestSnapshot(): 
Unit = {
+val offset = 10
+val numOfRecords = 5
+
 val log = buildMetadataLog(tempDir, mockTime)
+log.updateHighWatermark(new LogOffsetMetadata(offset))
+val snapshotId = new OffsetAndEpoch(offset, 1)
+TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+  snapshot.

[GitHub] [kafka] andrewegel commented on pull request #10056: KAFKA-12293: remove JCenter and Bintray repositories mentions out of build gradle scripts (sunset is announced for those repositories)

2021-03-15 Thread GitBox


andrewegel commented on pull request #10056:
URL: https://github.com/apache/kafka/pull/10056#issuecomment-799593438


   At the time of my comment here, jcenter is throwing 500s to my kafka build 
processes: https://status.bintray.com/incidents/ctv4bdfz08bg
   
   Now is as good a time as any to take this change and remove this project's 
dependence on the jcenter service.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

2021-03-15 Thread GitBox


jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r594552602



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -308,27 +308,29 @@ class Partition(val topicPartition: TopicPartition,
 s"different from the requested log dir $logDir")
 false
   case None =>
-createLogIfNotExists(isNew = false, isFutureReplica = true, 
highWatermarkCheckpoints)
+// not sure if topic ID should be none here, but not sure if we 
have access in ReplicaManager where this is called.

Review comment:
   TODO: remove this when we decide if we want to pass in None or a topicID





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

2021-03-15 Thread GitBox


jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r594553183



##
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
##
@@ -145,7 +145,7 @@ public void setup() {
 OffsetCheckpoints checkpoints = (logDir, topicPartition) -> 
Option.apply(0L);
 for (TopicPartition topicPartition : topicPartitions) {
 final Partition partition = 
this.replicaManager.createPartition(topicPartition);
-partition.createLogIfNotExists(true, false, checkpoints);
+partition.createLogIfNotExists(true, false, checkpoints, 
Option.empty());

Review comment:
   I think we can just not set the topic ID here, but want to confirm.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

2021-03-15 Thread GitBox


jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r594558352



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -349,11 +352,20 @@ class Log(@volatile private var _dir: File,
 
 // Delete partition metadata file if the version does not support topic 
IDs.
 // Recover topic ID if present and topic IDs are supported
+// If we were provided a topic ID when creating the log, partition 
metadata files are supported, and one does not yet exist
+// write to the partition metadata file.
+// Ensure we do not try to assign a provided topicId that is inconsistent 
with the ID on file.
 if (partitionMetadataFile.exists()) {
 if (!keepPartitionMetadataFile)
   partitionMetadataFile.delete()
-else
-  topicId = partitionMetadataFile.read().topicId
+else {
+  val fileTopicId = partitionMetadataFile.read().topicId
+  if (topicId.isDefined && fileTopicId != topicId.get)
+throw new IllegalStateException(s"Tried to assign topic ID 
$topicId to log, but log already contained topicId $fileTopicId")

Review comment:
   I don't know if it is possible to get to this error message. I think in 
most cases, the log should be grabbed if it already exists in the 
makeLeader/makeFollower path. In the log loading path, the topicId should be 
None. I thought it would be good to throw this error to know that something was 
wrong with the code, but maybe there is a better way. (Like maybe if topicId is 
defined in general)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on pull request #10014: KAFKA-12252 and KAFKA-12262: Fix session key rotation when leadership changes

2021-03-15 Thread GitBox


C0urante commented on pull request #10014:
URL: https://github.com/apache/kafka/pull/10014#issuecomment-799623900


   @rhauch ping 🙂 
   This has been waiting for a while and the only concern that's been raised is 
unrelated to the fix at hand. Can you take another look?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda merged pull request #10320: MINOR: revert stream logging level back to ERROR

2021-03-15 Thread GitBox


abbccdda merged pull request #10320:
URL: https://github.com/apache/kafka/pull/10320


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12464) Enhance constrained sticky Assign algorithm

2021-03-15 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17301824#comment-17301824
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12464:


Thanks for the proposal – I think the basic idea makes sense. There is an 
existing scale test in AbstractStickyAssignorTest called  
testLargeAssignmentAndGroupWithUniformSubscription() which you can use to 
measure the improvement. It already has pretty good performance but further 
optimizations are welcome! If this has a significant impact we should also 
scale up the number of consumers and/or partitions in that test. The aim is to 
have it complete in a few seconds when running locally. Right now I believe we 
run it with 1mil partitions and 2,000 consumers; the 1mil partitions is probably already close to (or beyond) the limits of what Kafka can handle for a
single consumer group in general, but I'd be interested in seeing how much we 
can push the size of the consumer group with this enhancement.

Some quick notes about the second enhancement: (1) we need to track the revoked 
partitions as well (this is for the CooperativeStickyAssignor, needed for 
cooperative rebalancing), so we should also have something like
allRevokedPartitions.removeAll(ownedPartitions.subList(maxQuota,ownedPartitions.size()));
and (2), I think the second line should be
unassignedPartitions.removeAll(ownedPartitions.subList(0, maxQuota));
since we assigned the first sublist of the ownedPartitions list, not the second.

(also (3), I'm guessing you just left this out for brevity but we still need to 
assign partitions in the case ownedPartitions.size() < maxQuota. Might be good 
to include this branch in the ticket description for clarity)

 

Looking forward to the PR!
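
For reference, a minimal, hypothetical sketch of the quota arithmetic (C_f, C_c, C_r) described in the quoted ticket below; the class and method names are invented for illustration and are not the AbstractStickyAssignor code:

{code:java}
// Illustrative sketch only: the quota bookkeeping from the ticket, i.e. the floor
// capacity, ceiling capacity, and how many members may hold the ceiling capacity.
public final class QuotaSketch {

    static void printQuotas(int numPartitions, int numConsumers) {
        int minQuota = numPartitions / numConsumers;                       // C_f = floor(P/N)
        int maxQuota = (numPartitions + numConsumers - 1) / numConsumers;  // C_c = ceil(P/N)
        int membersAtMaxQuota = numPartitions % numConsumers;              // C_r = P % N
        System.out.printf("minQuota=%d maxQuota=%d membersAtMaxQuota=%d%n",
            minQuota, maxQuota, membersAtMaxQuota);
    }

    public static void main(String[] args) {
        // The example from the ticket: 10 partitions, 3 consumers after C2 joins.
        printQuotas(10, 3); // prints minQuota=3 maxQuota=4 membersAtMaxQuota=1
    }
}
{code}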

> Enhance constrained sticky Assign algorithm
> ---
>
> Key: KAFKA-12464
> URL: https://issues.apache.org/jira/browse/KAFKA-12464
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.7.0
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> In KAFKA-9987, we did a great improvement for the case when all consumers 
> were subscribed to same set of topics. The algorithm contains 4 phases:
>  # Reassign as many previously owned partitions as possible, up to the 
> maxQuota
>  # Fill remaining members up to minQuota
>  # If we ran out of unassigned partitions before filling all consumers, we 
> need to start stealing partitions from the over-full consumers at max capacity
>  # Otherwise we may have run out of unfilled consumers before assigning all 
> partitions, in which case we should just distribute one partition each to all 
> consumers at min capacity
>  
> Take an example for better understanding:
> *example:*
> Current status: 2 consumers (C0, C1), and 10 topic partitions: t1p0, t1p1, 
> ... t1p9
> Suppose, current assignment is:
> _C0: t1p0, t1p1, t1p2, t1p3, t1p4_
>  _C1: t1p5, t1p6, t1p7, t1p8, t1p9_
> Now, new consumer added: C2, so we'll do:
>  # Reassign as many previously owned partitions as possible, up to the 
> maxQuota
>  After this phase, the assignment will be: (maxQuota will be 4)
> _C0: t1p0, t1p1, t1p2, t1p3_
>  _C1: t1p5, t1p6, t1p7, t1p8_
>  # Fill remaining members up to minQuota
>  After this phase, the assignment will be:
> _C0: t1p0, t1p1, t1p2, t1p3_
>  _C1: t1p5, t1p6, t1p7, t1p8_
>  _C2: t1p4, t1p9_
>  # If we ran out of unassigned partitions before filling all consumers, we 
> need to start stealing partitions from the over-full consumers at max capacity
>  After this phase, the assignment will be:
> _C0: t1p0, t1p1, t1p2_ 
>  _C1: t1p5, t1p6, t1p7, t1p8_
>  _C2: t1p4, t1p9,_ _t1p3_
>  # Otherwise we may have run out of unfilled consumers before assigning all 
> partitions, in which case we should just distribute one partition each to all 
> consumers at min capacity
>  
>  
> As we can see, we need 3 phases to complete the assignment. But we can 
> actually completed with 2 phases. Here's the updated algorithm:
>  # Reassign as many previously owned partitions as possible, up to the 
> maxQuota, and also considering the numMaxQuota by the remainder of 
> (Partitions / Consumers)
>  # Fill remaining members up to maxQuota if current maxQuotaMember < 
> numMaxQuota, otherwise, to minQuota
>  
> By considering the numMaxQuota, the original step 1 won't be too aggressive 
> to assign too many partitions to consumers, and the step 2 won't be too 
> conservative to assign not enough partitions to consumers, so that we don't 
> need step 3 and step 4 to balance them.
>  
> {{So, the updated Pseudo-code sketch of the algorithm:}}
> C_f := (P/N)_floor, the floor capacity 
>  C_c := (P/N)_ceil, the ceiling capacity
> *C_r := (P%N) the allowed number of members with C_c partitions assigned*
>  *num_max_capacity_members :=

[GitHub] [kafka] junrao commented on a change in pull request #10218: KAFKA-12368: Added inmemory implementations for RemoteStorageManager and RemoteLogMetadataManager.

2021-03-15 Thread GitBox


junrao commented on a change in pull request #10218:
URL: https://github.com/apache/kafka/pull/10218#discussion_r594509836



##
File path: 
clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentState.java
##
@@ -87,4 +89,27 @@ public byte id() {
 public static RemoteLogSegmentState forId(byte id) {
 return STATE_TYPES.get(id);
 }
+
+public static boolean isValidTransition(RemoteLogSegmentState srcState, 
RemoteLogSegmentState targetState) {
+Objects.requireNonNull(targetState, "targetState can not be null");
+
+if (srcState == null) {
+// If the source state is null, check the target state as the 
initial state viz DELETE_PARTITION_MARKED

Review comment:
   DELETE_PARTITION_MARKED is not part of RemoteLogSegmentState.

##
File path: 
remote-storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataCache.java
##
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.stream.Collectors;
+
+/**
+ * This class provides an inmemory cache of remote log segment metadata. This 
maintains the lineage of segments
+ * with respect to epoch evolution. It also keeps track of segments which are 
not considered to be copied to remote
+ * storage.
+ */
+public class RemoteLogMetadataCache {
+private static final Logger log = 
LoggerFactory.getLogger(RemoteLogMetadataCache.class);
+
+private final ConcurrentMap<RemoteLogSegmentId, RemoteLogSegmentMetadata> idToSegmentMetadata
+= new ConcurrentHashMap<>();
+
+// It keeps the segments which are not yet reached to 
COPY_SEGMENT_FINISHED state.
+private final Set<RemoteLogSegmentId> remoteLogSegmentIdInProgress = new HashSet<>();
+
+// It will have all the segments except with state as COPY_SEGMENT_STARTED.
+private final ConcurrentMap<Integer, NavigableMap<Long, RemoteLogSegmentId>> leaderEpochToOffsetToId
+= new ConcurrentHashMap<>();
+
+private void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata) {
+log.debug("Adding remote log segment metadata: [{}]", 
remoteLogSegmentMetadata);
+idToSegmentMetadata.put(remoteLogSegmentMetadata.remoteLogSegmentId(), 
remoteLogSegmentMetadata);
+Map<Integer, Long> leaderEpochToOffset = remoteLogSegmentMetadata.segmentLeaderEpochs();
+for (Map.Entry<Integer, Long> entry : leaderEpochToOffset.entrySet()) {
+leaderEpochToOffsetToId.computeIfAbsent(entry.getKey(), k -> new 
ConcurrentSkipListMap<>())
+.put(entry.getValue(), 
remoteLogSegmentMetadata.remoteLogSegmentId());
+}
+}
+
+public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(int leaderEpoch, long offset) {
+NavigableMap<Long, RemoteLogSegmentId> offsetToId = leaderEpochToOffsetToId.get(leaderEpoch);
+if (offsetToId == null || offsetToId.isEmpty()) {
+return Optional.empty();
+}
+
+// look for floor entry as the given offset may exist in this entry.
+Map.Entry<Long, RemoteLogSegmentId> entry = offsetToId.floorEntry(offset);
+if (entry == null) {
+// if the offset is lower than the minimum offset available in 
metadata then return empty.
+return Optional.empty();
+}
+
+RemoteLogSegmentMetadata metadata = 
idToSegmentMetadata.get(entry.getValue());
+// check whether the given offset with leaderEpoch exists in this 
segment.
+// check for epoch's offset boundaries with in this segment.
+//  1. get the next epoch's start offset -1 if exists
+//  2. if no next epoch exists, then segment end offset can be 
considered as epoch's relative end offset.
+Map.Entry<Integer, Long> nextEntry = metadata.segmentLeaderEpochs()
+.higherEntry(leaderEpoch);
+long epochEndOf

[GitHub] [kafka] hachikuji commented on a change in pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas

2021-03-15 Thread GitBox


hachikuji commented on a change in pull request #10309:
URL: https://github.com/apache/kafka/pull/10309#discussion_r594599970



##
File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
##
@@ -401,6 +401,51 @@ private void checkBackToBackLeaderFailures(QuorumConfig 
config) {
 }
 }
 
+@Test
+public void checkSingleNodeCommittedDataLossQuorumSizeThree() {
+checkSingleNodeCommittedDataLoss(new QuorumConfig(3, 0));
+}
+
+private void checkSingleNodeCommittedDataLoss(QuorumConfig config) {
+assertTrue(config.numVoters > 2,
+"This test requires the cluster to be able to recover from one 
failed node");
+
+for (int seed = 0; seed < 100; seed++) {
+// We run this test without the `MonotonicEpoch` and 
`MajorityReachedHighWatermark`
+// invariants since the loss of committed data on one node can 
violate them.
+
+Cluster cluster = new Cluster(config, seed);
+EventScheduler scheduler = new EventScheduler(cluster.random, 
cluster.time);
+scheduler.addInvariant(new MonotonicHighWatermark(cluster));
+scheduler.addInvariant(new SingleLeader(cluster));
+scheduler.addValidation(new ConsistentCommittedData(cluster));
+
+MessageRouter router = new MessageRouter(cluster);
+
+cluster.startAll();
+schedulePolling(scheduler, cluster, 3, 5);
+scheduler.schedule(router::deliverAll, 0, 2, 5);
+scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);
+scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10));
+
+RaftNode node = cluster.randomRunning().orElseThrow(() ->
+new AssertionError("Failed to find running node")
+);
+
+// Kill a random node and drop all of its persistent state. The 
Raft
+// protocol guarantees should still ensure we lose no committed 
data
+// as long as a new leader is elected before the failed node is 
restarted.
+cluster.kill(node.nodeId);
+cluster.deletePersistentState(node.nodeId);
+scheduler.runUntil(() -> !cluster.hasLeader(node.nodeId) && 
cluster.hasConsistentLeader());

Review comment:
   It is checking consistent `ElectionState`, which is basically the same 
as verifying all `quorum-state` files match.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 opened a new pull request #10321: Minor:timeout issue in Remove thread

2021-03-15 Thread GitBox


wcarlson5 opened a new pull request #10321:
URL: https://github.com/apache/kafka/pull/10321


   timeout is a duration, not a point in time
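   
   A minimal illustration of the distinction, using only standard-library types; the 30-second value is just an assumed example:
   
```java
import java.time.Duration;
import java.time.Instant;

// Illustrative only: a timeout is a duration, so the point in time at which to
// give up must be derived as "now + timeout", not treated as a timestamp itself.
public class DeadlineSketch {
    public static void main(String[] args) {
        Duration timeout = Duration.ofSeconds(30);      // assumed example value
        Instant deadline = Instant.now().plus(timeout); // deadline = now + duration
        System.out.println("give up at: " + deadline);
    }
}
```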
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on pull request #10321: Minor:timeout issue in Remove thread

2021-03-15 Thread GitBox


wcarlson5 commented on pull request #10321:
URL: https://github.com/apache/kafka/pull/10321#issuecomment-799685448


   @cadonna @ableegoldman Can I get a look at this?
   
   @vvcephei This will need to be picked to 2.8 as well



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12313) Consider deprecating the default.windowed.serde.inner.class configs

2021-03-15 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17301917#comment-17301917
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12313:


Ah, I didn't notice (or forgot ;)) that we had started throwing an 
IllegalArgumentException instead of setting it to MAX_VALUE when windowSize = 
null during KIP-659. I guess we don't have to worry about that here

In that case we should also do something similar for the innerClass, i.e.:

if (innerClassDeserializer != null && configInnerClassDeserializer != null) {
    throw new IllegalArgumentException("Inner class deserializer should not be set in both the time windowed deserializer constructor and the windowed.deserializer.inner.class config");
}
But that still leaves the door open for users to _only_ set the config for both 
windowSize & innerClassSerde, which is what we want to discourage. That was 
what I was trying to propose in item #3 – that the configs should only be used 
for the console consumer, while for use within Kafka Streams the Serde should 
always be instantiated directly. Does that make sense?

Here's what I'm thinking:

We deprecate both existing configs, and replace them with a single 
windowed.deserializer.inner.class config. Since we technically only need this 
config for the console consumer, but not the producer, we can actually just 
make the config accept a deserializer class directly (rather than the serde 
class). That way it seems especially obvious that this config is not intended 
for use within Kafka Streams, since it's just a deserializer.

WDYT?

Also, Re: your question in #5: you're asking if we should ensure that both the 
innerClassDeserializer and the windowSize parameters are passed in to the 
configs, right? If so, then yes, that makes sense to me.
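
For illustration, a minimal sketch of instantiating the windowed deserializer directly with an explicit window size rather than falling back on the config; the window size and inner type here are arbitrary example values, not a proposal:

{code:java}
import java.time.Duration;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.Windowed;

// Sketch only: passing the inner deserializer and an explicit window size avoids the
// Long.MAX_VALUE fallback that the default-constructor/config path runs into.
public class WindowedDeserializerSketch {
    public static void main(String[] args) {
        long windowSizeMs = Duration.ofMinutes(5).toMillis(); // assumed example size
        Deserializer<Windowed<String>> deserializer =
            new TimeWindowedDeserializer<>(Serdes.String().deserializer(), windowSizeMs);
        System.out.println("configured window size (ms): " + windowSizeMs);
    }
}
{code}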

> Consider deprecating the default.windowed.serde.inner.class configs
> ---
>
> Key: KAFKA-12313
> URL: https://issues.apache.org/jira/browse/KAFKA-12313
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Sagar Rao
>Priority: Major
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> During the discussion of KIP-659 we discussed whether it made sense to have a 
> "default" class for the serdes of windowed inner classes across Streams. 
> Using these configs instead of specifying an actual Serde object can lead to 
> subtle bugs, since the WindowedDeserializer requires a windowSize in addition 
> to the inner class. If the default constructor is invoked, as it will be when 
> falling back on the config, this windowSize defaults to MAX_VALUE. 
> If the downstream program doesn't care about the window end time in the 
> output, then this can go unnoticed and technically there is no problem. But 
> if anything does depend on the end time, or the user just wants to manually 
> read the output for testing purposes, then the MAX_VALUE will result in a 
> garbage timestamp.
> We should consider whether the convenience of specifying a config instead of 
> instantiating a Serde in each operator is really worth the risk of a user 
> accidentally failing to specify a windowSize



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jsancio commented on a change in pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas

2021-03-15 Thread GitBox


jsancio commented on a change in pull request #10309:
URL: https://github.com/apache/kafka/pull/10309#discussion_r594638669



##
File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
##
@@ -401,6 +401,51 @@ private void checkBackToBackLeaderFailures(QuorumConfig 
config) {
 }
 }
 
+@Test
+public void checkSingleNodeCommittedDataLossQuorumSizeThree() {
+checkSingleNodeCommittedDataLoss(new QuorumConfig(3, 0));
+}
+
+private void checkSingleNodeCommittedDataLoss(QuorumConfig config) {
+assertTrue(config.numVoters > 2,
+"This test requires the cluster to be able to recover from one 
failed node");
+
+for (int seed = 0; seed < 100; seed++) {
+// We run this test without the `MonotonicEpoch` and 
`MajorityReachedHighWatermark`
+// invariants since the loss of committed data on one node can 
violate them.
+
+Cluster cluster = new Cluster(config, seed);
+EventScheduler scheduler = new EventScheduler(cluster.random, 
cluster.time);
+scheduler.addInvariant(new MonotonicHighWatermark(cluster));
+scheduler.addInvariant(new SingleLeader(cluster));
+scheduler.addValidation(new ConsistentCommittedData(cluster));
+
+MessageRouter router = new MessageRouter(cluster);
+
+cluster.startAll();
+schedulePolling(scheduler, cluster, 3, 5);
+scheduler.schedule(router::deliverAll, 0, 2, 5);
+scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);
+scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10));
+
+RaftNode node = cluster.randomRunning().orElseThrow(() ->
+new AssertionError("Failed to find running node")
+);
+
+// Kill a random node and drop all of its persistent state. The 
Raft
+// protocol guarantees should still ensure we lose no committed 
data
+// as long as a new leader is elected before the failed node is 
restarted.
+cluster.kill(node.nodeId);
+cluster.deletePersistentState(node.nodeId);
+scheduler.runUntil(() -> !cluster.hasLeader(node.nodeId) && 
cluster.hasConsistentLeader());

Review comment:
   Yes. I meant to say `ElectionState` instead of `LeaderState`. 
`ElectionState` has a field called `votedIdOpt` for which `equals` checks for 
equality. This is not strictly required for having a "consistent" leader. I 
think for having a consistent leader for an epoch, only the `epoch` and 
`leaderIdOpt` need to match for all of the replicas.
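
Roughly, that weaker check could look like the sketch below; `ElectionView` is a made-up stand-in for illustration, not the real `ElectionState` class:

```java
import java.util.Collection;
import java.util.Objects;
import java.util.OptionalInt;

// Hypothetical sketch: replicas have a "consistent leader" for an epoch when epoch
// and leaderId agree across all of them, ignoring votedId. Not the raft module code.
final class ConsistentLeaderSketch {

    static final class ElectionView {
        final int epoch;
        final OptionalInt leaderIdOpt;

        ElectionView(int epoch, OptionalInt leaderIdOpt) {
            this.epoch = epoch;
            this.leaderIdOpt = leaderIdOpt;
        }
    }

    static boolean hasConsistentLeader(Collection<ElectionView> states) {
        ElectionView first = null;
        for (ElectionView state : states) {
            if (first == null) {
                first = state;
            } else if (state.epoch != first.epoch
                    || !Objects.equals(state.leaderIdOpt, first.leaderIdOpt)) {
                return false;
            }
        }
        return true; // vacuously true for an empty collection
    }
}
```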





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10310) Kafka Raft Snapshot

2021-03-15 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio updated KAFKA-10310:
---
Description: 
Tracking issue for KIP-630: Kafka Raft Snapshot

https://cwiki.apache.org/confluence/display/KAFKA/KIP-630%3A+Kafka+Raft+Snapshot

  was:Tracking issue for KIP-630: Kafka Raft Snapshot


> Kafka Raft Snapshot
> ---
>
> Key: KAFKA-10310
> URL: https://issues.apache.org/jira/browse/KAFKA-10310
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Jose Armando Garcia Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>
> Tracking issue for KIP-630: Kafka Raft Snapshot
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-630%3A+Kafka+Raft+Snapshot



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10310) Kafka Raft Snapshot

2021-03-15 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio updated KAFKA-10310:
---
Description: Tracking issue for [KIP-630: Kafka Raft 
Snapshot|https://cwiki.apache.org/confluence/display/KAFKA/KIP-630%3A+Kafka+Raft+Snapshot]
  (was: Tracking issue for KIP-630: Kafka Raft Snapshot

https://cwiki.apache.org/confluence/display/KAFKA/KIP-630%3A+Kafka+Raft+Snapshot)

> Kafka Raft Snapshot
> ---
>
> Key: KAFKA-10310
> URL: https://issues.apache.org/jira/browse/KAFKA-10310
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Jose Armando Garcia Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>
> Tracking issue for [KIP-630: Kafka Raft 
> Snapshot|https://cwiki.apache.org/confluence/display/KAFKA/KIP-630%3A+Kafka+Raft+Snapshot]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12466) Controller and Broker Metadata Snapshots

2021-03-15 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-12466:
--

 Summary: Controller and Broker Metadata Snapshots
 Key: KAFKA-12466
 URL: https://issues.apache.org/jira/browse/KAFKA-12466
 Project: Kafka
  Issue Type: New Feature
Reporter: Jose Armando Garcia Sancio
Assignee: Jose Armando Garcia Sancio






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12467) Disabled implementation for generating controller snapshots

2021-03-15 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-12467:
--

 Summary: Disabled implementation for generating controller 
snapshots
 Key: KAFKA-12467
 URL: https://issues.apache.org/jira/browse/KAFKA-12467
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jose Armando Garcia Sancio
Assignee: Jose Armando Garcia Sancio


Controller implementation for generating snapshots, but the default configuration doesn't generate snapshots. The default configuration won't generate snapshots because at this point neither the Controller nor the Broker knows how to load them.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12455) OffsetValidationTest.test_broker_rolling_bounce failing for Raft quorums

2021-03-15 Thread Ron Dagostino (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17301952#comment-17301952
 ] 

Ron Dagostino commented on KAFKA-12455:
---

I looked at the brokers' metadata caches for the two separate configurations -- 
ZK vs. Raft -- to find out what percentage of the time they showed 1 
alive broker instead of 2.  I was expecting the ZooKeeper configuration to show 
relatively little time with just 1 alive broker since the clients are never 
seeing that metadata situation, and I was expecting the Raft configuration to 
show a much higher percentage of time with just 1 alive broker since the 
clients do see that metadata situation.  I did not find what I was expecting to 
find.

The amounts of time during which the brokers were advertising just 1 alive broker in their metadata cache are as follows:

*ZooKeeper Configuration*:
BrokerId=1: 37 seconds out of 61 seconds of that broker's availability 
during the test, or 61% of the time with just 1 alive broker in metadata cache
BrokerId=2: 39 seconds out of 61 seconds of that broker's availability 
during the test, or 64% of the time with just 1 alive broker in metadata cache

*Raft Configuration*:
BrokerId=1: 37 seconds out of 88 seconds of that broker's availability 
during the test, or 42% of the time with just 1 alive broker in metadata cache
BrokerId=2: 52 seconds out of 88 seconds of that broker's availability 
during the test, or 59% of the time with just 1 alive broker in metadata cache

So the brokers in the ZooKeeper configuration consider just 1 broker to be alive more often than the brokers in the Raft configuration do!

It is still not clear why the consumers never see just a single alive broker in 
the ZooKeeper configuration.  From the above it does not appear to be due to 
any difference in metadata cache population -- if it were just that then we 
would see the test failing in the ZooKeeper configuration since that actually 
advertises a single alive broker more frequently in terms of percentage of test 
time.



> OffsetValidationTest.test_broker_rolling_bounce failing for Raft quorums
> 
>
> Key: KAFKA-12455
> URL: https://issues.apache.org/jira/browse/KAFKA-12455
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0
>Reporter: Ron Dagostino
>Assignee: Ron Dagostino
>Priority: Blocker
>
> OffsetValidationTest.test_broker_rolling_bounce in `consumer_test.py` is 
> failing because the consumer group is rebalancing unexpectedly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12455) OffsetValidationTest.test_broker_rolling_bounce failing for Raft quorums

2021-03-15 Thread Ron Dagostino (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17301956#comment-17301956
 ] 

Ron Dagostino commented on KAFKA-12455:
---

The test is using the default value of metadata.max.age.ms=300000 (5 minutes).  
When I explicitly turn it down to metadata.max.age.ms=5000 (5 seconds) the test 
passes for Raft but then fails for ZK (2 unexpected group rebalances in that 
case).

I increased it to 10 seconds and then the Raft configuration failed with 3 
unexpected rebalances and the ZK configuration failed with 1 unexpected 
rebalance.

I decreased it to a very aggressive 1 second -- and they both passed.

We have historically seen some flakiness in the ZooKeeper version of this test, 
and the fact that the test suddenly failed if we set metadata.max.age.ms to 5 
or 10 seconds indicates that it is just plain luck that the test is passing 
today.

Given that the current client-side code doesn't fall back to the bootstrap 
brokers when it sees no brokers available, I think any test really needs to 
make it *impossible* for the client to see cluster metadata with just a single 
broker. Decreasing the metadata max age decreases the possibility of it 
happening but doesn't make it impossible.

Another experiment was to keep metadata.max.age.ms=300000 but define 
session.timeout.ms = 3 instead of the 1 it was setting before -- this 
is longer than the broker roll time, and in fact this change allows both 
configurations to pass.

A further experiment was to keep metadata.max.age.ms=300000 and 
session.timeout.ms = 1 but expand to 3 brokers instead of just 2.  This 
should fix the issue since there would never be a situation where just 1 broker 
is available, and a METADATA response would always have at least 2 brokers for 
the consumer to use.  Both configurations pass.
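
For reference, the two consumer settings varied in the experiments above map to the following client configuration keys; this is only a sketch, and the concrete values are placeholders rather than the exact values used in the system test:

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

// Sketch of the two knobs discussed above; the values are illustrative placeholders.
public class ConsumerTimeoutSketch {
    public static Properties overrides() {
        Properties props = new Properties();
        // How often the consumer refreshes cluster metadata; a lower value lets it
        // notice sooner when a bounced broker comes back.
        props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "5000");
        // Keeping the session timeout longer than a single broker roll avoids the
        // group coordinator kicking the consumer out and forcing a rebalance.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        return props;
    }
}
{code}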


> OffsetValidationTest.test_broker_rolling_bounce failing for Raft quorums
> 
>
> Key: KAFKA-12455
> URL: https://issues.apache.org/jira/browse/KAFKA-12455
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0
>Reporter: Ron Dagostino
>Assignee: Ron Dagostino
>Priority: Blocker
>
> OffsetValidationTest.test_broker_rolling_bounce in `consumer_test.py` is 
> failing because the consumer group is rebalancing unexpectedly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] bbejeck commented on a change in pull request #10150: KAFKA-3745: Add access to read-only key in value joiner

2021-03-15 Thread GitBox


bbejeck commented on a change in pull request #10150:
URL: https://github.com/apache/kafka/pull/10150#discussion_r594665281



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplValueJoinerWithKeyTest.java
##
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.GlobalKTable;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.StreamJoined;
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.time.Duration.ofMillis;
+import static org.junit.Assert.assertEquals;
+
+public class KStreamImplValueJoinerWithKeyTest {
+
+private KStream leftStream;
+private KStream rightStream;
+private KTable ktable;
+private GlobalKTable globalKTable;
+private StreamsBuilder builder;
+
+private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+private final String leftTopic = "left";
+private final String rightTopic = "right";
+private final String ktableTopic = "ktableTopic";
+private final String globalTopic = "globalTopic";
+private final String outputTopic = "joined-result";
+
+private final ValueJoinerWithKey 
valueJoinerWithKey =
+(key, lv, rv) -> key + ":" + (lv + (rv == null ? 0 : rv));
+private final JoinWindows joinWindows = JoinWindows.of(ofMillis(100));
+private final StreamJoined streamJoined =
+StreamJoined.with(Serdes.String(), Serdes.Integer(), 
Serdes.Integer());
+private final Joined joined =
+Joined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer());
+private final KeyValueMapper keyValueMapper =
+(k, v) -> k;
+
+@Before
+public void setup() {
+builder = new StreamsBuilder();
+leftStream = builder.stream(leftTopic, Consumed.with(Serdes.String(), 
Serdes.Integer()));
+rightStream = builder.stream(rightTopic, 
Consumed.with(Serdes.String(), Serdes.Integer()));
+ktable = builder.table(ktableTopic, Consumed.with(Serdes.String(), 
Serdes.Integer()));
+globalKTable = builder.globalTable(globalTopic, 
Consumed.with(Serdes.String(), Serdes.Integer()));
+}
+
+@Test
+public void shouldIncludeKeyInStreamSteamJoinResults() {
+leftStream.join(
+rightStream,
+valueJoinerWithKey,
+joinWindows,
+streamJoined
+).to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+// Left KV A, 3, Right KV A, 5
+runJoinTopology(builder,
+Collections.singletonList(KeyValue.pair("A", "A:5")),
+false,
+rightTopic
+);
+}
+
+@Test
+public void shouldIncludeKeyInStreamLeftJoinResults() {
+leftStream.leftJoin(
+rightStream,
+valueJoinerWithKey,
+joinWindows,
+streamJoined
+).to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+// Left KV A, 3, Right KV A, 5
+// TTD pipes reco

[GitHub] [kafka] bbejeck commented on a change in pull request #10150: KAFKA-3745: Add access to read-only key in value joiner

2021-03-15 Thread GitBox


bbejeck commented on a change in pull request #10150:
URL: https://github.com/apache/kafka/pull/10150#discussion_r594665349



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplValueJoinerWithKeyTest.java
##
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.GlobalKTable;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.StreamJoined;
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.time.Duration.ofMillis;
+import static org.junit.Assert.assertEquals;
+
+public class KStreamImplValueJoinerWithKeyTest {
+
+private KStream leftStream;
+private KStream rightStream;
+private KTable ktable;
+private GlobalKTable globalKTable;
+private StreamsBuilder builder;
+
+private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+private final String leftTopic = "left";
+private final String rightTopic = "right";
+private final String ktableTopic = "ktableTopic";
+private final String globalTopic = "globalTopic";
+private final String outputTopic = "joined-result";
+
+private final ValueJoinerWithKey 
valueJoinerWithKey =
+(key, lv, rv) -> key + ":" + (lv + (rv == null ? 0 : rv));
+private final JoinWindows joinWindows = JoinWindows.of(ofMillis(100));
+private final StreamJoined streamJoined =
+StreamJoined.with(Serdes.String(), Serdes.Integer(), 
Serdes.Integer());
+private final Joined joined =
+Joined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer());
+private final KeyValueMapper keyValueMapper =
+(k, v) -> k;
+
+@Before
+public void setup() {
+builder = new StreamsBuilder();
+leftStream = builder.stream(leftTopic, Consumed.with(Serdes.String(), 
Serdes.Integer()));
+rightStream = builder.stream(rightTopic, 
Consumed.with(Serdes.String(), Serdes.Integer()));
+ktable = builder.table(ktableTopic, Consumed.with(Serdes.String(), 
Serdes.Integer()));
+globalKTable = builder.globalTable(globalTopic, 
Consumed.with(Serdes.String(), Serdes.Integer()));
+}
+
+@Test
+public void shouldIncludeKeyInStreamSteamJoinResults() {
+leftStream.join(
+rightStream,
+valueJoinerWithKey,
+joinWindows,
+streamJoined
+).to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+// Left KV A, 3, Right KV A, 5
+runJoinTopology(builder,
+Collections.singletonList(KeyValue.pair("A", "A:5")),
+false,
+rightTopic
+);
+}
+
+@Test
+public void shouldIncludeKeyInStreamLeftJoinResults() {
+leftStream.leftJoin(
+rightStream,
+valueJoinerWithKey,
+joinWindows,
+streamJoined
+).to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+// Left KV A, 3, Right KV A, 5
+// TTD pipes reco

[jira] [Created] (KAFKA-12468) Initial offsets are copied from source to target cluster

2021-03-15 Thread Bart De Neuter (Jira)
Bart De Neuter created KAFKA-12468:
--

 Summary: Initial offsets are copied from source to target cluster
 Key: KAFKA-12468
 URL: https://issues.apache.org/jira/browse/KAFKA-12468
 Project: Kafka
  Issue Type: Bug
  Components: mirrormaker
Affects Versions: 2.7.0
Reporter: Bart De Neuter


We have an active-passive setup where  the 3 connectors from mirror maker 2 
(heartbeat, checkpoint and source) are running on a dedicated Kafka connect 
cluster on the target cluster.

Offset syncing is enabled as specified by KIP-545. But when activated, it seems 
the offsets from the source cluster are initially copied to the target cluster 
without translation. This causes a negative lag for all synced consumer groups. 
Only when we reset the offsets for each topic/partition on the target cluster 
and produce a record on the topic/partition in the source does the sync start working correctly. 

I would expect that the consumer groups are synced but that the current offsets 
of the source cluster are not copied to the target cluster.

This is the configuration we are currently using:

Heartbeat connector

 
{code:xml}
{
  "name": "mm2-mirror-heartbeat",
  "config": {
"name": "mm2-mirror-heartbeat",
"connector.class": 
"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
"source.cluster.alias": "eventador",
"target.cluster.alias": "msk",
"source.cluster.bootstrap.servers": "",
"target.cluster.bootstrap.servers": "",
"topics": ".*",
"groups": ".*",
"tasks.max": "1",
"replication.policy.class": "CustomReplicationPolicy",
"sync.group.offsets.enabled": "true",
"sync.group.offsets.interval.seconds": "5",
"emit.checkpoints.enabled": "true",
"emit.checkpoints.interval.seconds": "30",
"emit.heartbeats.interval.seconds": "30",
"key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
{code}
Checkpoint connector:
{code:xml}
{
  "name": "mm2-mirror-checkpoint",
  "config": {
"name": "mm2-mirror-checkpoint",
"connector.class": 
"org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"source.cluster.alias": "eventador",
"target.cluster.alias": "msk",
"source.cluster.bootstrap.servers": "",
"target.cluster.bootstrap.servers": "",
"topics": ".*",
"groups": ".*",
"tasks.max": "40",
"replication.policy.class": "CustomReplicationPolicy",
"sync.group.offsets.enabled": "true",
"sync.group.offsets.interval.seconds": "5",
"emit.checkpoints.enabled": "true",
"emit.checkpoints.interval.seconds": "30",
"emit.heartbeats.interval.seconds": "30",
"key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
{code}
 Source connector:
{code:xml}
{
  "name": "mm2-mirror-source",
  "config": {
"name": "mm2-mirror-source",
"connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"source.cluster.alias": "eventador",
"target.cluster.alias": "msk",
"source.cluster.bootstrap.servers": "",
"target.cluster.bootstrap.servers": "",
"topics": ".*",
"groups": ".*",
"tasks.max": "40",
"replication.policy.class": "CustomReplicationPolicy",
"sync.group.offsets.enabled": "true",
"sync.group.offsets.interval.seconds": "5",
"emit.checkpoints.enabled": "true",
"emit.checkpoints.interval.seconds": "30",
"emit.heartbeats.interval.seconds": "30",
"key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}
{code}
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kowshik commented on pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.

2021-03-15 Thread GitBox


kowshik commented on pull request #10271:
URL: https://github.com/apache/kafka/pull/10271#issuecomment-799747349


   @satishd should this PR be reviewed after #10218 has been reviewed and 
merged? It seems like this PR contains some of the code that's seen in #10218, 
ex: `InmemoryRemoteLogMetadataManager.java`.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on a change in pull request #10150: KAFKA-3745: Add access to read-only key in value joiner

2021-03-15 Thread GitBox


bbejeck commented on a change in pull request #10150:
URL: https://github.com/apache/kafka/pull/10150#discussion_r594678897



##
File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
##
@@ -1315,15 +1315,15 @@ void to(final TopicNameExtractor topicExtractor,
  final JoinWindows windows);
 
 /**
- * Join records of this stream with another {@code KStream}'s records 
using windowed inner equi join using the

Review comment:
   >I guess you actually inserted this a copy from above with slight 
modification only.
   
   Yes, I wanted to keep everything in the same order.  I'm not sure how to 
handle how the diffs get rendered in this situation





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12469) The topic names in the metrics do not retain their format when extracting through JMX.

2021-03-15 Thread Jira
Rafał Chmielewski created KAFKA-12469:
-

 Summary: The topic names in the metrics do not retain their format 
when extracting through JMX.
 Key: KAFKA-12469
 URL: https://issues.apache.org/jira/browse/KAFKA-12469
 Project: Kafka
  Issue Type: Bug
  Components: metrics
Reporter: Rafał Chmielewski


I have topic names that have a period in the name:


product.order
product.offering.price

 

However, for the metrics issued by JMX by a program that is a consumer of Kafka 
messages, the dots are replaced with an underscore:

kafka.consumer<client-id=consumer-export-4, topic=product_offering_price, partition=1><>records-lead

 

But for the producer, this problem doesn't occur:

kafka.producer<client-id=bss.data.verification.pi_1, topic=product.offering.price><>record-send-total

 

As a consumer I use Akka Alpakka. But I think it's using Apache library to 
connect to Kafka and report metrics via JMX.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12470) The topic names in the metrics do not retain their format when extracting through JMX.

2021-03-15 Thread Jira
Rafał Chmielewski created KAFKA-12470:
-

 Summary: The topic names in the metrics do not retain their format 
when extracting through JMX.
 Key: KAFKA-12470
 URL: https://issues.apache.org/jira/browse/KAFKA-12470
 Project: Kafka
  Issue Type: Bug
  Components: metrics
Reporter: Rafał Chmielewski


I have topic names that have a period in the name:


product.order
product.offering.price

 

However, for the metrics issued by JMX by a program that is a consumer of Kafka 
messages, the dots are replaced with an underscore:

kafka.consumer<client-id=consumer-export-4, topic=product_offering_price, partition=1><>records-lead

 

But for the producer, this problem doesn't occur:

kafka.producer<client-id=bss.data.verification.pi_1, topic=product.offering.price><>record-send-total

 

As a consumer I use Akka Alpakka. But I think it's using Apache library to 
connect to Kafka and report metrics via JMX.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10666) Kafka doesn't use keystore / key / truststore passwords for named SSL connections

2021-03-15 Thread Jason (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302015#comment-17302015
 ] 

Jason commented on KAFKA-10666:
---

This might have been user error. In revisiting my code, it looks like my 
settings may have been missing the .ssl. Sorry :D

> Kafka doesn't use keystore / key / truststore passwords for named SSL 
> connections
> -
>
> Key: KAFKA-10666
> URL: https://issues.apache.org/jira/browse/KAFKA-10666
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 2.5.0, 2.6.0
> Environment: kafka in an openjdk-11 docker container, the client java 
> application is in an alpine container. zookeeper in a separate container. 
>Reporter: Jason
>Priority: Minor
>
> When configuring named listener SSL connections with ssl key and keystore 
> with passwords including listener.name.ourname.ssl.key.password, 
> listener.name.ourname.ssl.keystore.password, and 
> listener.name.ourname.ssl.truststore.password via via the AdminClient the 
> settings are not used and the setting is not accepted if the default 
> ssl.key.password or ssl.keystore.password are not set.  We configure all 
> keystore and truststore values for the named listener in a single batch using 
> incrementalAlterConfigs. Additionally, when ssl.keystore.password is set to 
> the value of our keystore password the keystore is loaded for SSL 
> communication without issue, however if ssl.keystore.password is incorrect 
> and listener.name.ourname.keystore.password is correct, we are unable to load 
> the keystore with bad password errors.  It appears that only the default 
> ssl.xxx.password settings are used. This setting is immutable as when we 
> attempt to set it we get an error indicating that the listener.name. setting 
> can be set. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12470) The topic names in the metrics do not retain their format when extracting through JMX.

2021-03-15 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-12470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rafał Chmielewski resolved KAFKA-12470.
---
Resolution: Duplicate

> The topic names in the metrics do not retain their format when extracting 
> through JMX.
> --
>
> Key: KAFKA-12470
> URL: https://issues.apache.org/jira/browse/KAFKA-12470
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Reporter: Rafał Chmielewski
>Priority: Major
>
> I have topic names that have a period in the name:
> product.order
> product.offering.price
>  
> However, for the metrics issued by JMX by a program that is a consumer of 
> Kafka messages, the dots are replaced with an underscore:
> kafka.consumer client-id=consumer-export-4, topic=product_offering_price, 
> partition=1><>records-lead
>  
> But for the producer, this problem doesn't occur:
> kafka.producer client-id=bss.data.verification.pi_1, 
> topic=product.offering.price><>record-send-total
>  
> As a consumer I use Akka Alpakka. But I think it's using Apache library to 
> connect to Kafka and report metrics via JMX.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jolshan commented on pull request #10313: KAFKA-12456: Log "Listeners are not identical across brokers" message at WARN/INFO instead of ERROR

2021-03-15 Thread GitBox


jolshan commented on pull request #10313:
URL: https://github.com/apache/kafka/pull/10313#issuecomment-799770551


   Hi @ikdekker. I'm not a committer, but I can answer some of your questions. 
   The report you mentioned was generated to the builds directory. If you copy 
and paste the file path into a web browser, you should be able to see the 
result. 
   
   As for the JIRA ticket, the `KAFKA-12456` at the start of the PR title will 
automatically link this PR to the ticket. If you'd like access to editing the 
ticket, you can create an Apache JIRA account. Then simply ask for JIRA 
permissions by sending your JIRA username to d...@kafka.apache.org
   
   Hope this helps!
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan edited a comment on pull request #10313: KAFKA-12456: Log "Listeners are not identical across brokers" message at WARN/INFO instead of ERROR

2021-03-15 Thread GitBox


jolshan edited a comment on pull request #10313:
URL: https://github.com/apache/kafka/pull/10313#issuecomment-799770551


   Hi @ikdekker. I'm not a committer, but I can answer some of your questions. 
   The report you mentioned was generated to the builds directory. If you copy 
and paste the file path into a web browser, you should be able to see the 
result. 
   
   As for the JIRA ticket, the `KAFKA-12456` at the start of the PR title will 
automatically link this PR to the ticket. If you'd like access to editing the 
ticket, you can create an Apache JIRA account. Then simply ask for JIRA 
permissions by sending your JIRA username to d...@kafka.apache.org
   
   More info on mailing lists here: http://kafka.apache.org/contact
   
   Hope this helps!
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino opened a new pull request #10322: KAFKA-12455: OffsetValidationTest.test_broker_rolling_bounce fail: Raft

2021-03-15 Thread GitBox


rondagostino opened a new pull request #10322:
URL: https://github.com/apache/kafka/pull/10322


   OffsetValidationTest.test_broker_rolling_bounce was failing when used with a 
Raft-based metadata quorum but succeeding with a ZooKeeper-based quorum.  The 
problem was that in the Raft case the consumer was sometimes receiving a 
METADATA response with just 1 alive broker, and then when that broker rolled 
the consumer wouldn't know about any alive nodes.  It would have to wait until 
the broker returned before it could reconnect, and by that time the group 
coordinator on the second broker would have timed-out the client and initiated 
a group rebalance.  The test explicitly checks that no rebalances occur, so the 
test would fail.  It turns out that the reason why the ZooKeeper configuration 
wasn't seeing rebalances was just plain luck.  The brokers' metadata caches in 
the ZooKeeper configuration show 1 alive broker even more frequently than the 
Raft configuration does.  If we tweak the metadata.max.age.ms value on the 
consumers we can easily get the ZooKeeper test to fail, and in fact
  this system test has historically been flaky for the ZooKeeper configuration.  
We can get the test to pass by setting session.timeout.ms=3 (which is 
longer than the roll time of any broker), or we can increase the broker count 
so that the client never sees a METADATA response with just a single alive 
broker and therefore never loses contact with the cluster for an extended 
period of time.
   
   This patch increases the broker count for this particular system test from 2 
to 3.
   
   This patch also fixes a minor logging bug that was discovered in 
`RaftReplicaManager` that was discovered during the debugging of this issue, 
and it adds an extra logging statement when a single metadata batch is applied 
to mirror the same logging statement that occurs when deferred metadata changes 
are applied.
   
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on pull request #10322: KAFKA-12455: OffsetValidationTest.test_broker_rolling_bounce fail: Raft

2021-03-15 Thread GitBox


rondagostino commented on pull request #10322:
URL: https://github.com/apache/kafka/pull/10322#issuecomment-799777289


   This patch needs to be cherry-picked to 2.8



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on pull request #10322: KAFKA-12455: OffsetValidationTest.test_broker_rolling_bounce fail: Raft

2021-03-15 Thread GitBox


rondagostino commented on pull request #10322:
URL: https://github.com/apache/kafka/pull/10322#issuecomment-799781073


   As per an offline conversation, since 2 brokers is a supported cluster size, 
we would prefer that this system test keep 2 brokers instead of bumping it to 3 
-- we have lots of tests that run with 3 brokers already.  So I will change the 
test to use the `session.timeout.ms=3` solution instead.
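
For illustration, a minimal sketch in Java of the idea behind that fix (the
exact timeout value is elided in the archive, so the numbers below are
hypothetical): the consumer's session timeout just needs to outlast a single
broker roll so the group coordinator does not evict the member mid-bounce.

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class RollingBounceConsumerConfig {
    // Hypothetical helper: build a consumer whose session timeout outlasts one broker roll.
    public static KafkaConsumer<String, String> build(String bootstrapServers) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "offset-validation-group"); // hypothetical group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Assumed numbers: if a single broker roll takes well under 30 seconds, a 30s
        // session timeout (with heartbeats at a third of that) survives the bounce.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
        return new KafkaConsumer<>(props);
    }
}
```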



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12469) The topic names in the metrics do not retain their format when extracting through JMX.

2021-03-15 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-12469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rafał Chmielewski updated KAFKA-12469:
--
Description: 
I have topic names that have a period in the name:

product.order
 product.offering.price

 

However, for the metrics issued by JMX by a program that is a consumer of Kafka 
messages, the dots are replaced with an underscore:

kafka.consumer<client-id=consumer-export-4, topic=product_offering_price, partition=1><>records-lead

 

This creates a problem if I want to calculate the customer's lag in relation to 
the number of messages on Kafka.

 

But for the producer, this problem doesn't occur:

kafka.producer<client-id=bss.data.verification.pi_1, topic=product.offering.price><>record-send-total

 

As a consumer I use Akka Alpakka. But I think it's using Apache library to 
connect to Kafka and report metrics via JMX.

  was:
I have topic names that have a period in the name:


product.order
product.offering.price

 

However, for the metrics issued by JMX by a program that is a consumer of Kafka 
messages, the dots are replaced with an underscore:

kafka.consumer<client-id=consumer-export-4, topic=product_offering_price, partition=1><>records-lead

 

But for the producer, this problem doesn't occur:

kafka.producer<client-id=bss.data.verification.pi_1, topic=product.offering.price><>record-send-total

 

As a consumer I use Akka Alpakka. But I think it's using Apache library to 
connect to Kafka and report metrics via JMX.


> The topic names in the metrics do not retain their format when extracting 
> through JMX.
> --
>
> Key: KAFKA-12469
> URL: https://issues.apache.org/jira/browse/KAFKA-12469
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Reporter: Rafał Chmielewski
>Priority: Major
>
> I have topic names that have a period in the name:
> product.order
>  product.offering.price
>  
> However, for the metrics issued by JMX by a program that is a consumer of 
> Kafka messages, the dots are replaced with an underscore:
> kafka.consumer client-id=consumer-export-4, topic=product_offering_price, 
> partition=1><>records-lead
>  
> This creates a problem if I want to calculate the customer's lag in relation 
> to the number of messages on Kafka.
>  
> But for the producer, this problem doesn't occur:
> kafka.producer client-id=bss.data.verification.pi_1, 
> topic=product.offering.price><>record-send-total
>  
> As a consumer I use Akka Alpakka. But I think it's using Apache library to 
> connect to Kafka and report metrics via JMX.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12469) The topic names in the metrics do not retain their format when extracting through JMX.

2021-03-15 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-12469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rafał Chmielewski updated KAFKA-12469:
--
Component/s: consumer

> The topic names in the metrics do not retain their format when extracting 
> through JMX.
> --
>
> Key: KAFKA-12469
> URL: https://issues.apache.org/jira/browse/KAFKA-12469
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, metrics
>Reporter: Rafał Chmielewski
>Priority: Major
>
> I have topic names that have a period in the name:
> product.order
>  product.offering.price
>  
> However, for the metrics issued by JMX by a program that is a consumer of 
> Kafka messages, the dots are replaced with an underscore:
> kafka.consumer client-id=consumer-export-4, topic=product_offering_price, 
> partition=1><>records-lead
>  
> This creates a problem if I want to calculate the customer's lag in relation 
> to the number of messages on Kafka.
>  
> But for the producer, this problem doesn't occur:
> kafka.producer client-id=bss.data.verification.pi_1, 
> topic=product.offering.price><>record-send-total
>  
> As a consumer I use Akka Alpakka. But I think it's using Apache library to 
> connect to Kafka and report metrics via JMX.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma commented on pull request #10056: KAFKA-12293: remove JCenter and Bintray repositories mentions out of build gradle scripts (sunset is announced for those repositories)

2021-03-15 Thread GitBox


ijuma commented on pull request #10056:
URL: https://github.com/apache/kafka/pull/10056#issuecomment-799785567


   @dejan2609 can you please resolve the conflict?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12471) Implement createPartitions in KIP-500 mode

2021-03-15 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-12471:


 Summary: Implement createPartitions in KIP-500 mode
 Key: KAFKA-12471
 URL: https://issues.apache.org/jira/browse/KAFKA-12471
 Project: Kafka
  Issue Type: New Feature
Reporter: Colin McCabe






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-12471) Implement createPartitions in KIP-500 mode

2021-03-15 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe reassigned KAFKA-12471:


Assignee: Colin McCabe
  Labels: kip-500  (was: )

> Implement createPartitions in KIP-500 mode
> --
>
> Key: KAFKA-12471
> URL: https://issues.apache.org/jira/browse/KAFKA-12471
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
>  Labels: kip-500
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas

2021-03-15 Thread GitBox


hachikuji commented on a change in pull request #10309:
URL: https://github.com/apache/kafka/pull/10309#discussion_r594725779



##
File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
##
@@ -401,6 +401,51 @@ private void checkBackToBackLeaderFailures(QuorumConfig 
config) {
 }
 }
 
+@Test
+public void checkSingleNodeCommittedDataLossQuorumSizeThree() {
+checkSingleNodeCommittedDataLoss(new QuorumConfig(3, 0));
+}
+
+private void checkSingleNodeCommittedDataLoss(QuorumConfig config) {
+assertTrue(config.numVoters > 2,
+"This test requires the cluster to be able to recover from one 
failed node");
+
+for (int seed = 0; seed < 100; seed++) {
+// We run this test without the `MonotonicEpoch` and 
`MajorityReachedHighWatermark`
+// invariants since the loss of committed data on one node can 
violate them.
+
+Cluster cluster = new Cluster(config, seed);
+EventScheduler scheduler = new EventScheduler(cluster.random, 
cluster.time);
+scheduler.addInvariant(new MonotonicHighWatermark(cluster));
+scheduler.addInvariant(new SingleLeader(cluster));
+scheduler.addValidation(new ConsistentCommittedData(cluster));
+
+MessageRouter router = new MessageRouter(cluster);
+
+cluster.startAll();
+schedulePolling(scheduler, cluster, 3, 5);
+scheduler.schedule(router::deliverAll, 0, 2, 5);
+scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);
+scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10));
+
+RaftNode node = cluster.randomRunning().orElseThrow(() ->
+new AssertionError("Failed to find running node")
+);
+
+// Kill a random node and drop all of its persistent state. The 
Raft
+// protocol guarantees should still ensure we lose no committed 
data
+// as long as a new leader is elected before the failed node is 
restarted.
+cluster.kill(node.nodeId);
+cluster.deletePersistentState(node.nodeId);
+scheduler.runUntil(() -> !cluster.hasLeader(node.nodeId) && 
cluster.hasConsistentLeader());

Review comment:
   It amounts to the same thing because `votedIdOpt` is only set when the 
election outcome has not been determined.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ikdekker commented on pull request #10313: KAFKA-12456: Log "Listeners are not identical across brokers" message at WARN/INFO instead of ERROR

2021-03-15 Thread GitBox


ikdekker commented on pull request #10313:
URL: https://github.com/apache/kafka/pull/10313#issuecomment-799798100


   Hi @jolshan,
   
   I have looked into my build directories (the ones that Gradle outputs after 
running tests). These do not contain the reports. For example, neither the 
clients nor the streams module contains an html file: 
   
![image](https://user-images.githubusercontent.com/9272424/111229557-63e73080-85e6-11eb-8d2b-0be54a6ffbe8.png)
   
   




This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ikdekker edited a comment on pull request #10313: KAFKA-12456: Log "Listeners are not identical across brokers" message at WARN/INFO instead of ERROR

2021-03-15 Thread GitBox


ikdekker edited a comment on pull request #10313:
URL: https://github.com/apache/kafka/pull/10313#issuecomment-799798100


   Hi @jolshan,
   
   Thanks for the response!
   
   I have looked into my build directories (the ones that Gradle outputs after 
running tests). These do not contain the reports. For example, neither the 
clients nor the streams module contains an html file: 
   
![image](https://user-images.githubusercontent.com/9272424/111229557-63e73080-85e6-11eb-8d2b-0be54a6ffbe8.png)
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on pull request #10313: KAFKA-12456: Log "Listeners are not identical across brokers" message at WARN/INFO instead of ERROR

2021-03-15 Thread GitBox


jolshan commented on pull request #10313:
URL: https://github.com/apache/kafka/pull/10313#issuecomment-799800389


   @ikdekker Hmm. When you run another test, old files are deleted. Try copying 
and pasting the file path directly after running a test.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10518) Consumer fetches could be inefficient when lags are unbalanced

2021-03-15 Thread Colin McCabe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302075#comment-17302075
 ] 

Colin McCabe commented on KAFKA-10518:
--

One simple workaround is to increase max.poll.records so that you get more 
records per fetch for the partition that has a high rate of new records.

I do wonder whether there's any useful purpose served by "explicitly 
exclud[ing] partitions for which the consumer received data in the previous 
round".  It seems a bit like an implementation hack based on how we do 
buffering in the consumer, but I could be missing something.
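
A short sketch of that kind of tuning, assuming only the defaults mentioned in
the ticket below (max.poll.records defaults to 500 and fetch.max.wait.ms to
500 ms); the larger poll batch is the suggested workaround, and the lower fetch
wait is an optional extra knob aimed at the blocking the ticket describes:

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

public class UnbalancedLagWorkaround {
    // Sketch of the suggested tuning: a larger poll batch lets the application drain
    // the lagging partition faster between fetches. Lowering fetch.max.wait.ms is an
    // additional, optional knob aimed at the blocking described in the ticket.
    public static Properties tune(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2000); // default is 500
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 100); // default is 500 ms
        return props;
    }
}
```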

> Consumer fetches could be inefficient when lags are unbalanced
> --
>
> Key: KAFKA-10518
> URL: https://issues.apache.org/jira/browse/KAFKA-10518
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dhruvil Shah
>Priority: Major
>
> Consumer fetches are inefficient when lags are imbalanced across partitions, 
> due to head of the line blocking and the behavior of blocking for 
> `max.wait.ms` until data is available.
> When the consumer receives a fetch response, it prepares the next fetch 
> request and sends it out. The caveat is that the subsequent fetch request 
> would explicitly exclude partitions for which the consumer received data in 
> the previous round. This is to allow the consumer application to drain the 
> data for those partitions, until the consumer fetches the other partitions it 
> is subscribed to.
> This behavior does not play out too well if the consumer is consuming when 
> the lag is unbalanced, because it would receive data for the partitions it is 
> lagging on, and then it would send a fetch request for partitions that do not 
> have any data (or have little data). The latter will end up blocking for 
> fetch.max.wait.ms on the broker before an empty response is sent back. This 
> slows down the consumer’s overall consumption throughput since 
> fetch.max.wait.ms is 500ms by default.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] abbccdda commented on pull request #10320: MINOR: revert stream logging level back to ERROR

2021-03-15 Thread GitBox


abbccdda commented on pull request #10320:
URL: https://github.com/apache/kafka/pull/10320#issuecomment-799811099


   Cherry-picked to 2.8



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12453) Guidance on whether a topology is eligible for optimisation

2021-03-15 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302087#comment-17302087
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12453:


[~mjsax] I thought we don't enable the changelog optimization for repartition 
topics, only "true" source topics? Did we make an exception for the new 
toTable() operator, or am I just misinterpreting the context of this ticket?

> Guidance on whether a topology is eligible for optimisation
> ---
>
> Key: KAFKA-12453
> URL: https://issues.apache.org/jira/browse/KAFKA-12453
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Patrick O'Keeffe
>Priority: Major
>
> Since the introduction of KStream.toTable() in Kafka 2.6.x, the decision 
> about whether a topology is eligible for optimisation is no longer a simple 
> one, and is related to whether toTable() operations are preceded by key 
> changing operators.
> This decision requires expert level knowledge, and there are serious 
> implications associated with getting it wrong in terms of fault tolerance
> Some ideas spring to mind around how to guide developers to make the correct 
> decision:
>  # Topology.describe() could indicate whether this topology is eligible for 
> optimisation
>  # Topologies could be automatically optimised - note this may have an impact 
> at deployment time, in that an application reset may be required. The 
> developer would need to be made aware of this and adjust the deployment plan 
> accordingly
>  
>  
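
For context, a minimal sketch of how an application currently opts in to
optimization and inspects the result with Topology#describe(), which is the
output idea 1 above would extend. Topic names and the application id are
hypothetical; the config key shown is the current StreamsConfig constant.

```java
import java.util.Properties;

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

public class OptimizationCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "optimization-check"); // hypothetical app id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream("input-topic"); // hypothetical topic
        // A key-changing operation before toTable() is what makes the eligibility decision tricky.
        stream.selectKey((k, v) -> v)
              .toTable()
              .toStream()
              .to("output-topic"); // hypothetical topic

        // Passing the props into build() applies the optimization rules; describe() shows the result.
        Topology topology = builder.build(props);
        System.out.println(topology.describe());
    }
}
```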



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ikdekker commented on pull request #10313: KAFKA-12456: Log "Listeners are not identical across brokers" message at WARN/INFO instead of ERROR

2021-03-15 Thread GitBox


ikdekker commented on pull request #10313:
URL: https://github.com/apache/kafka/pull/10313#issuecomment-799813229


   Directly after running (all tests), I see only a clients index.html; streams 
and core do not generate a report, even when running `./gradlew streams:test` 
or `./gradlew core:test` (which would be relevant for this PR).
   
   The streams tests seem to take a very long time and many fail due to 
connection timeouts. Is it necessary to start Zookeeper for these tests? The 
command at https://kafka.apache.org/quickstart throws an error, which may 
require some Zookeeper setup:
   `Error: Could not find or load main class 
org.apache.zookeeper.server.quorum.QuorumPeerMain
   `



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji opened a new pull request #10323: KAFKA-12459; Use property testing library for raft event simulation tests

2021-03-15 Thread GitBox


hachikuji opened a new pull request #10323:
URL: https://github.com/apache/kafka/pull/10323


   This patch changes the raft simulation tests to use jqwik, which is a 
property testing library. This provides two main benefits:
   
   - It simplifies the randomization of test parameters. Currently the tests 
use a fixed set of `Random` seeds, which means that most builds are doing 
redundant work. We get a bigger benefit from allowing each build to test 
different parameterizations.
   - It makes it easier to reproduce failures. Whenever a test fails, jqwik 
will report the random seed that failed. A developer can then modify the 
`@Property` annotation to use that specific seed in order to reproduce the 
failure.
   
   Note that I have resisted making logical changes to the tests themselves. 
The only difference is the way the parameters are specified.
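
As a rough illustration of the jqwik pattern being described (the property and
parameter names here are hypothetical, not taken from the patch): parameters
are generated freshly per run, and a failing seed reported by jqwik can be
pinned in the @Property annotation to reproduce the failure.

```java
import net.jqwik.api.ForAll;
import net.jqwik.api.Property;
import net.jqwik.api.constraints.IntRange;

class RaftSimulationPropertiesSketch {

    // Each build exercises freshly generated (voters, seed) combinations.
    @Property(tries = 50)
    void quorumMakesProgress(@ForAll @IntRange(min = 1, max = 5) int voters,
                             @ForAll int seed) {
        // ... drive the simulated cluster with this seed and assert invariants ...
    }

    // To reproduce a reported failure, pin the seed jqwik printed in the failure message,
    // e.g. @Property(seed = "-4785419675824641575") on the same property method.
}
```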
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on pull request #10313: KAFKA-12456: Log "Listeners are not identical across brokers" message at WARN/INFO instead of ERROR

2021-03-15 Thread GitBox


jolshan commented on pull request #10313:
URL: https://github.com/apache/kafka/pull/10313#issuecomment-799824018


   @ikdekker You don't need to follow any of the quickstart steps to run the 
tests. Simply running `./gradlew test` or any subset of tests should do the 
trick. 
   
   As for `core:test` not generating a report, that may be due to the build 
passing (no failed tests). If it says "BUILD SUCCESSFUL" you are good to go.
   With streams, I noticed a gradle error in the screenshot above. If the run 
is incomplete, the report file may not be generated.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan edited a comment on pull request #10313: KAFKA-12456: Log "Listeners are not identical across brokers" message at WARN/INFO instead of ERROR

2021-03-15 Thread GitBox


jolshan edited a comment on pull request #10313:
URL: https://github.com/apache/kafka/pull/10313#issuecomment-799824018


   @ikdekker You don't need to follow any of the quickstart steps to run the 
tests. Simply running `./gradlew test` or any subset of tests should do the 
trick. 
   
   As for `core:test` not outputting a report, that may be due to the build 
passing (no failed tests). If it says "BUILD SUCCESSFUL" you are good to go.
   With streams, I noticed a gradle error in the screenshot above. If the run 
is incomplete, the report file may not be generated.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan edited a comment on pull request #10313: KAFKA-12456: Log "Listeners are not identical across brokers" message at WARN/INFO instead of ERROR

2021-03-15 Thread GitBox


jolshan edited a comment on pull request #10313:
URL: https://github.com/apache/kafka/pull/10313#issuecomment-799824018


   @ikdekker You don't need to follow any of the quickstart steps to run the 
tests. Simply running `./gradlew test` or any subset of tests should do the 
trick. 
   
   As for `core:test` not outputting a report, that may be due to the build 
passing (no failed tests). If it says "BUILD SUCCESSFUL" you are good to go.
   EDIT: oh hmmm, you are right. It should still output the result to the file. 
I'm not sure why that is not happening, but it should. It shows up for me. 
   
   With streams, I noticed a gradle error in the screenshot above. If the run 
is incomplete, the report file may not be generated.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ikdekker commented on pull request #10313: KAFKA-12456: Log "Listeners are not identical across brokers" message at WARN/INFO instead of ERROR

2021-03-15 Thread GitBox


ikdekker commented on pull request #10313:
URL: https://github.com/apache/kafka/pull/10313#issuecomment-799830566


   Running core:tests takes around 20 minutes and fails at the end. If I simply 
run the Gradle tests as you say, is it expected to have some failures? For 
example, for tests that need authentication.
   
   core:tests output is:
   
   ```
   2479 tests completed, 257 failed, 13 skipped
   
   > Task :core:test FAILED
   
   FAILURE: Build failed with an exception.
   
   * What went wrong:
   Execution failed for task ':core:test'.
   > Process 'Gradle Test Executor 4' finished with non-zero exit value 1
 This problem might be caused by incorrect test process configuration.
 Please refer to the test execution section in the User Manual at 
https://docs.gradle.org/6.8.3/userguide/java_testing.html#sec:test_execution
   
   BUILD FAILED in 22m 13s
   28 actionable tasks: 3 executed, 25 up-to-date
   ```
   
   An example of failure with auth:
   ```
   kafka.api.DescribeAuthorizedOperationsTest > 
testConsumerGroupAuthorizedOperations FAILED
   org.apache.kafka.common.KafkaException: 
javax.security.auth.login.LoginException: Could not login: the client is being 
asked for a password, but the Kafka client code does not currently support 
obtaining a password from the user. not available to garner  authentication 
information from the user
   ```
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ikdekker edited a comment on pull request #10313: KAFKA-12456: Log "Listeners are not identical across brokers" message at WARN/INFO instead of ERROR

2021-03-15 Thread GitBox


ikdekker edited a comment on pull request #10313:
URL: https://github.com/apache/kafka/pull/10313#issuecomment-799830566


   Running core:tests takes around 20 minutes and fails at the end. If I simply 
run the Gradle tests as you say, is it expected to have some failures? For 
example, for tests that need authentication.
   
   core:tests output is:
   
   ```
   2479 tests completed, 257 failed, 13 skipped
   
   > Task :core:test FAILED
   
   FAILURE: Build failed with an exception.
   
   * What went wrong:
   Execution failed for task ':core:test'.
   > Process 'Gradle Test Executor 4' finished with non-zero exit value 1
 This problem might be caused by incorrect test process configuration.
 Please refer to the test execution section in the User Manual at 
https://docs.gradle.org/6.8.3/userguide/java_testing.html#sec:test_execution
   
   BUILD FAILED in 22m 13s
   28 actionable tasks: 3 executed, 25 up-to-date
   ```
   
   An example of failure with auth:
   ```
   kafka.api.DescribeAuthorizedOperationsTest > 
testConsumerGroupAuthorizedOperations FAILED
   org.apache.kafka.common.KafkaException: 
javax.security.auth.login.LoginException: Could not login: the client is being 
asked for a password, but the Kafka client code does not currently support 
obtaining a password from the user. not available to garner  authentication 
information from the user
   ```
   
   By the way, should the trunk branch also have this report generation, or is 
it unstable? And from which branch do you recommend to start the PR, 2.8?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on pull request #10313: KAFKA-12456: Log "Listeners are not identical across brokers" message at WARN/INFO instead of ERROR

2021-03-15 Thread GitBox


jolshan commented on pull request #10313:
URL: https://github.com/apache/kafka/pull/10313#issuecomment-799842000


   @ikdekker Hmm. The best advice I can give here is running `./gradlew clean` 
and trying again.
   I've seen plenty of green runs with core tests, but yes, it usually takes 20 
min or so.
   
   Please develop in trunk. This should be the most up-to-date branch and 
reports should generate.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan edited a comment on pull request #10313: KAFKA-12456: Log "Listeners are not identical across brokers" message at WARN/INFO instead of ERROR

2021-03-15 Thread GitBox


jolshan edited a comment on pull request #10313:
URL: https://github.com/apache/kafka/pull/10313#issuecomment-799842000


   @ikdekker Hmm. The best advice I can give here is running `./gradlew clean` 
and trying again.
   I've seen plenty of green runs with core tests, but yes, it usually takes 20 
min or so. For running shorter tests, try `core:unitTest`
   
   Please develop in trunk. This should be the most up-to-date branch and 
reports should generate.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dejan2609 commented on pull request #10056: KAFKA-12293: remove JCenter and Bintray repositories mentions out of build gradle scripts (sunset is announced for those repositories)

2021-03-15 Thread GitBox


dejan2609 commented on pull request #10056:
URL: https://github.com/apache/kafka/pull/10056#issuecomment-799842860


   @ijuma Done (conflict solved / branch force-pushed).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio opened a new pull request #10324: MINOR: Add a few more benchmark for the timeline map

2021-03-15 Thread GitBox


jsancio opened a new pull request #10324:
URL: https://github.com/apache/kafka/pull/10324


   Improve the benchmark tests for TimelineHashMap by adding benchmarks for 
adding entries, for removing entries, and for Scala's immutable hash map.
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ikdekker commented on pull request #10313: KAFKA-12456: Log "Listeners are not identical across brokers" message at WARN/INFO instead of ERROR

2021-03-15 Thread GitBox


ikdekker commented on pull request #10313:
URL: https://github.com/apache/kafka/pull/10313#issuecomment-799843740


   Alright, I will check some things out. This PR was created from trunk. I 
might try to create some tests myself, which I should be able to get output 
from as a starting point. Thanks again for your time.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on pull request #10324: MINOR: Add a few more benchmark for the timeline map

2021-03-15 Thread GitBox


jsancio commented on pull request #10324:
URL: https://github.com/apache/kafka/pull/10324#issuecomment-799843745


   Benchmark results:
   
   ```
   # Run complete. Total time: 00:26:54
   
   Benchmark                                                  Mode  Cnt    Score     Error  Units
   TimelineHashMapBenchmark.testAddEntriesInHashMap           avgt   10  238.332 ±   4.554  ms/op
   TimelineHashMapBenchmark.testAddEntriesInImmutableMap      avgt   10  366.732 ±   6.463  ms/op
   TimelineHashMapBenchmark.testAddEntriesInTimelineMap       avgt   10  277.197 ±   4.699  ms/op
   TimelineHashMapBenchmark.testAddEntriesWithSnapshots       avgt   10  302.747 ±   4.959  ms/op
   TimelineHashMapBenchmark.testRemoveEntriesInHashMap        avgt   10  201.004 ±   2.675  ms/op
   TimelineHashMapBenchmark.testRemoveEntriesInImmutableMap   avgt   10  479.964 ±   7.254  ms/op
   TimelineHashMapBenchmark.testRemoveEntriesInTimelineMap    avgt   10  195.382 ±   1.917  ms/op
   TimelineHashMapBenchmark.testRemoveEntriesWithSnapshots    avgt   10  427.747 ±  12.865  ms/op
   TimelineHashMapBenchmark.testUpdateEntriesInHashMap        avgt   10  267.895 ±  20.143  ms/op
   TimelineHashMapBenchmark.testUpdateEntriesInImmutableMap   avgt   10  532.843 ±   5.766  ms/op
   TimelineHashMapBenchmark.testUpdateEntriesInTimelineMap    avgt   10  364.766 ±  25.154  ms/op
   TimelineHashMapBenchmark.testUpdateEntriesWithSnapshots    avgt   10  488.308 ±  43.992  ms/op
   JMH benchmarks done
   ```
   
   Benchmark configuration:
   
   ```
   # JMH version: 1.27
   # VM version: JDK 11.0.10, OpenJDK 64-Bit Server VM, 
11.0.10+9-Ubuntu-0ubuntu1.20.10
   # VM invoker: /usr/lib/jvm/java-11-openjdk-amd64/bin/java
   # VM options: 
   # JMH blackhole mode: full blackhole + dont-inline hint
   # Warmup: 3 iterations, 10 s each
   # Measurement: 10 iterations, 10 s each
   # Timeout: 10 min per iteration
   # Threads: 1 thread, will synchronize iterations
   # Benchmark mode: Average time, time/op
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12428) Add a last-heartbeat-seconds-ago metric to Kafka Consumer

2021-03-15 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-12428:
--
Labels: newbie++  (was: newbie)

> Add a last-heartbeat-seconds-ago metric to Kafka Consumer
> -
>
> Key: KAFKA-12428
> URL: https://issues.apache.org/jira/browse/KAFKA-12428
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: newbie++
>
> I have encountered several issues in the past where heartbeat requests are 
> not sent [1,2] (either in time, or ever), and today it is a bit hard to get 
> to that from the logs. I think it is better to add a metric as 
> "last-heartbeat-seconds-ago" where when rebalances were triggered we can 
> immediately find out if this is the root cause.
> 1. https://issues.apache.org/jira/browse/KAFKA-10793
> 2. https://issues.apache.org/jira/browse/KAFKA-10827
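
A minimal sketch, assuming the standard org.apache.kafka.common.metrics API, of
what such a gauge could look like; the metric group and the wiring are
illustrative only, not a decided design.

```java
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Metrics;

public class HeartbeatAgeMetricSketch {
    private volatile long lastHeartbeatMs = System.currentTimeMillis();

    public void register(Metrics metrics) {
        MetricName name = metrics.metricName(
                "last-heartbeat-seconds-ago",    // proposed metric name from the ticket
                "consumer-coordinator-metrics",  // assumed group; a KIP would pin this down
                "Seconds since the last successful heartbeat");
        // Measurable is evaluated on read, so the value is always current.
        metrics.addMetric(name, (Measurable) (config, now) -> (now - lastHeartbeatMs) / 1000.0);
    }

    public void onHeartbeatSuccess(long timestampMs) {
        lastHeartbeatMs = timestampMs;
    }
}
```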



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ikdekker commented on pull request #10313: KAFKA-12456: Log "Listeners are not identical across brokers" message at WARN/INFO instead of ERROR

2021-03-15 Thread GitBox


ikdekker commented on pull request #10313:
URL: https://github.com/apache/kafka/pull/10313#issuecomment-799845091


   By the way, I sent my JIRA username to the dev mail; I'm not sure if I should 
have typed some extra lines there. If so, I might try again 😄 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas

2021-03-15 Thread GitBox


jsancio commented on a change in pull request #10309:
URL: https://github.com/apache/kafka/pull/10309#discussion_r594787145



##
File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
##
@@ -401,6 +401,51 @@ private void checkBackToBackLeaderFailures(QuorumConfig 
config) {
 }
 }
 
+@Test
+public void checkSingleNodeCommittedDataLossQuorumSizeThree() {
+checkSingleNodeCommittedDataLoss(new QuorumConfig(3, 0));
+}
+
+private void checkSingleNodeCommittedDataLoss(QuorumConfig config) {
+assertTrue(config.numVoters > 2,
+"This test requires the cluster to be able to recover from one 
failed node");
+
+for (int seed = 0; seed < 100; seed++) {
+// We run this test without the `MonotonicEpoch` and 
`MajorityReachedHighWatermark`
+// invariants since the loss of committed data on one node can 
violate them.
+
+Cluster cluster = new Cluster(config, seed);
+EventScheduler scheduler = new EventScheduler(cluster.random, 
cluster.time);
+scheduler.addInvariant(new MonotonicHighWatermark(cluster));
+scheduler.addInvariant(new SingleLeader(cluster));
+scheduler.addValidation(new ConsistentCommittedData(cluster));
+
+MessageRouter router = new MessageRouter(cluster);
+
+cluster.startAll();
+schedulePolling(scheduler, cluster, 3, 5);
+scheduler.schedule(router::deliverAll, 0, 2, 5);
+scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);
+scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10));
+
+RaftNode node = cluster.randomRunning().orElseThrow(() ->
+new AssertionError("Failed to find running node")
+);
+
+// Kill a random node and drop all of its persistent state. The 
Raft
+// protocol guarantees should still ensure we lose no committed 
data
+// as long as a new leader is elected before the failed node is 
restarted.
+cluster.kill(node.nodeId);
+cluster.deletePersistentState(node.nodeId);
+scheduler.runUntil(() -> !cluster.hasLeader(node.nodeId) && 
cluster.hasConsistentLeader());

Review comment:
   Got it. I missed that `votedIdOpt` is set to `empty` by the leader and 
the followers.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12472) Add a Consumer / Streams metric to indicate the current rebalance status

2021-03-15 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-12472:
-

 Summary: Add a Consumer / Streams metric to indicate the current 
rebalance status
 Key: KAFKA-12472
 URL: https://issues.apache.org/jira/browse/KAFKA-12472
 Project: Kafka
  Issue Type: Improvement
  Components: consumer, streams
Reporter: Guozhang Wang


Today, to troubleshoot a rebalance issue, operators need to do a lot of manual 
steps: locate the problematic members, search the log entries, and look for 
related metrics. It would be great to add a single metric that covers all these 
manual steps, so operators would only need to check this single signal to find 
the root cause. A concrete idea is to expose two enum gauge metrics, one on the 
consumer and one on streams:

* Consumer level (the order below is by-design, see Streams level for details):
  0. None => there is no rebalance on going.
  1. CoordinatorRequested => any of the coordinator response contains a 
RebalanceInProgress error code.
  2. NewMember => when the join group response has a MemberIdRequired error 
code.
  3. UnknownMember => when any of the coordinator response contains an 
UnknownMember error code, indicating this member is already kicked out of the 
group.
  4. StaleMember => when any of the coordinator response contains an 
IllegalGeneration error code.
  5. DroppedGroup => when hb thread decides to call leaveGroup due to hb 
expired.
  6. UserRequested => when leaveGroup upon the shutdown / unsubscribeAll API, 
as well as upon calling the enforceRebalance API.
  7. MetadataChanged => requestRejoin triggered since metadata has changed.
  8. SubscriptionChanged => requestRejoin triggered since subscription has 
changed.
  9. RetryOnError => when join/syncGroup response contains a retriable error 
which would cause the consumer to backoff and retry.
 10. RevocationNeeded => requestRejoin triggered since revoked partitions is 
not empty.

The transition rule is that a non-zero status code can only transit to zero or 
to a higher code, but not to a lower code (same for streams, see rationales 
below).

* Streams level: today a streams client can have multiple consumers. We 
introduced some new enum states as well as aggregation rules across consumers: 
if there's no streams-layer events as below that transits its status (i.e. 
streams layer think it is 0), then we aggregate across all the embedded 
consumers and take the largest status code value as the streams metric; if 
there are streams-layer events that determines its status should be in 10+, 
then its overrides all embedded consumer layer status code. In addition, when 
create aggregated metric across streams instance within an app, we also follow 
the same aggregation rule, e.g. if there are two streams instance where one 
instance's status code is 1), and the other is 10), then the app's status is 
10).

 10. RevocationNeeded => the definition of this is changed to the original 10) 
defined in consumer above, OR leader decides to revoke either active/standby 
tasks and hence schedule follow-ups.
 11. AssignmentProbing => leader decides to schedule follow-ups since the 
current assignment is unstable.
 12. VersionProbing => leader decides to schedule follow-ups due to version 
probing.
 13. EndpointUpdate => anyone decides to schedule follow-ups due to endpoint 
updates.


The main motivations of the above proposed precedence order are the following:
1. When a rebalance is triggered by one member, all other members would only 
know it is due to CoordinatorRequested from coordinator error codes, and hence 
CoordinatorRequested should be overridden by any other status when aggregating 
across clients.
2. DroppedGroup could cause unknown/stale members that would fail and retry 
immediately, and hence should take higher precedence.
3. Revocation definition is extended in Streams, and hence it needs to take the 
highest precedence among all consumer-only status so that it would not be 
overridden by any of the consumer-only status.
4. In general, more rare events get higher precedence.

Any comments on the precedence rules / categorization are more than welcomed!
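
A minimal sketch, based only on the proposal above, of the enum-with-code shape
and the "take the largest code" aggregation rule (this mirrors the description
and is not an implemented API):

```java
import java.util.Collection;

public enum RebalanceStatus {
    NONE(0),
    COORDINATOR_REQUESTED(1),
    NEW_MEMBER(2),
    UNKNOWN_MEMBER(3),
    STALE_MEMBER(4),
    DROPPED_GROUP(5),
    USER_REQUESTED(6),
    METADATA_CHANGED(7),
    SUBSCRIPTION_CHANGED(8),
    RETRY_ON_ERROR(9),
    REVOCATION_NEEDED(10),
    ASSIGNMENT_PROBING(11),  // streams-level only
    VERSION_PROBING(12),     // streams-level only
    ENDPOINT_UPDATE(13);     // streams-level only

    private final int code;

    RebalanceStatus(int code) {
        this.code = code;
    }

    public int code() {
        return code;
    }

    // Transition rule from the proposal: a non-zero status may only move to NONE or to a higher code.
    public boolean canTransitionTo(RebalanceStatus next) {
        return next == NONE || next.code > this.code;
    }

    // Aggregation rule from the proposal: across embedded consumers (or instances), report the largest code.
    public static RebalanceStatus aggregate(Collection<RebalanceStatus> members) {
        RebalanceStatus result = NONE;
        for (RebalanceStatus status : members) {
            if (status.code > result.code) {
                result = status;
            }
        }
        return result;
    }
}
```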



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12472) Add a Consumer / Streams metric to indicate the current rebalance status

2021-03-15 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-12472:
--
Description: 
Today to trouble shoot a rebalance issue operators need to do a lot of manual 
steps: locating the problematic members, search in the log entries, and look 
for related metrics. It would be great to add a single metric that covers all 
these manual steps and operators would only need to check this single signal to 
check what is the root cause. A concrete idea is to expose two enum gauge 
metrics on consumer and streams, respectively:

* Consumer level (the order below is by-design, see Streams level for details):
  0. None => there is no rebalance on going.
  1. CoordinatorRequested => any of the coordinator response contains a 
RebalanceInProgress error code.
  2. NewMember => when the join group response has a MemberIdRequired error 
code.
  3. UnknownMember => when any of the coordinator response contains an 
UnknownMember error code, indicating this member is already kicked out of the 
group.
  4. StaleMember => when any of the coordinator response contains an 
IllegalGeneration error code.
  5. DroppedGroup => when hb thread decides to call leaveGroup due to hb 
expired.
  6. UserRequested => when leaveGroup upon the shutdown / unsubscribeAll API, 
as well as upon calling the enforceRebalance API.
  7. MetadataChanged => requestRejoin triggered since metadata has changed.
  8. SubscriptionChanged => requestRejoin triggered since subscription has 
changed.
  9. RetryOnError => when join/syncGroup response contains a retriable error 
which would cause the consumer to backoff and retry.
 10. RevocationNeeded => requestRejoin triggered since revoked partitions is 
not empty.

The transition rule is that a non-zero status code can only transit to zero or 
to a higher code, but not to a lower code (same for streams, see rationales 
below).

* Streams level: today a streams client can have multiple consumers. We 
introduced some new enum states as well as aggregation rules across consumers: 
if there's no streams-layer events as below that transits its status (i.e. 
streams layer think it is 0), then we aggregate across all the embedded 
consumers and take the largest status code value as the streams metric; if 
there are streams-layer events that determines its status should be in 10+, 
then its overrides all embedded consumer layer status code. In addition, when 
create aggregated metric across streams instance within an app, we also follow 
the same aggregation rule, e.g. if there are two streams instance where one 
instance's status code is 1), and the other is 10), then the app's status is 
10).
 10. RevocationNeeded => the definition of this is changed to the original 10) 
defined in consumer above, OR leader decides to revoke either active/standby 
tasks and hence schedule follow-ups.
 11. AssignmentProbing => leader decides to schedule follow-ups since the 
current assignment is unstable.
 12. VersionProbing => leader decides to schedule follow-ups due to version 
probing.
 13. EndpointUpdate => anyone decides to schedule follow-ups due to endpoint 
updates.


The main motivations of the above proposed precedence order are the following:
1. When a rebalance is triggered by one member, all other members would only 
know it is due to CoordinatorRequested from coordinator error codes, and hence 
CoordinatorRequested should be overridden by any other status when aggregating 
across clients.
2. DroppedGroup could cause unknown/stale members that would fail and retry 
immediately, and hence should take higher precedence.
3. Revocation definition is extended in Streams, and hence it needs to take the 
highest precedence among all consumer-only status so that it would not be 
overridden by any of the consumer-only status.
4. In general, more rare events get higher precedence.

Any comments on the precedence rules / categorization are more than welcomed!

  was:
Today to trouble shoot a rebalance issue operators need to do a lot of manual 
steps: locating the problematic members, search in the log entries, and look 
for related metrics. It would be great to add a single metric that covers all 
these manual steps and operators would only need to check this single signal to 
check what is the root cause. A concrete idea is to expose two enum gauge 
metrics on consumer and streams, respectively:

* Consumer level (the order below is by-design, see Streams level for details):
  0. None => there is no rebalance on going.
  1. CoordinatorRequested => any of the coordinator response contains a 
RebalanceInProgress error code.
  2. NewMember => when the join group response has a MemberIdRequired error 
code.
  3. UnknownMember => when any of the coordinator response contains an 
UnknownMember error code, indicating this member is already kicked out of the 
group.
  4. StaleMember => when any of the coo

[jira] [Updated] (KAFKA-12472) Add a Consumer / Streams metric to indicate the current rebalance status

2021-03-15 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-12472:
--
Description: 
Today to trouble shoot a rebalance issue operators need to do a lot of manual 
steps: locating the problematic members, search in the log entries, and look 
for related metrics. It would be great to add a single metric that covers all 
these manual steps and operators would only need to check this single signal to 
check what is the root cause. A concrete idea is to expose two enum gauge 
metrics on consumer and streams, respectively:

* Consumer level (the order below is by-design, see Streams level for details):
  0. None => there is no rebalance on going.
  1. *CoordinatorRequested* => any of the coordinator response contains a 
RebalanceInProgress error code.
  2. *NewMember* => when the join group response has a MemberIdRequired error 
code.
  3. *UnknownMember* => when any of the coordinator response contains an 
UnknownMember error code, indicating this member is already kicked out of the 
group.
  4. *StaleMember* => when any of the coordinator response contains an 
IllegalGeneration error code.
  5. *DroppedGroup* => when hb thread decides to call leaveGroup due to hb 
expired.
  6. *UserRequested* => when leaveGroup upon the shutdown / unsubscribeAll API, 
as well as upon calling the enforceRebalance API.
  7. *MetadataChanged* => requestRejoin triggered since metadata has changed.
  8. *SubscriptionChanged* => requestRejoin triggered since subscription has 
changed.
  9. *RetryOnError* => when join/syncGroup response contains a retriable error 
which would cause the consumer to backoff and retry.
 10. *RevocationNeeded* => requestRejoin triggered since revoked partitions is 
not empty.

The transition rule is that a non-zero status code can only transit to zero or 
to a higher code, but not to a lower code (same for streams, see rationales 
below).

* Streams level: today a streams client can have multiple consumers. We 
introduced some new enum states as well as aggregation rules across consumers: 
if there's no streams-layer events as below that transits its status (i.e. 
streams layer think it is 0), then we aggregate across all the embedded 
consumers and take the largest status code value as the streams metric; if 
there are streams-layer events that determines its status should be in 10+, 
then its overrides all embedded consumer layer status code. In addition, when 
create aggregated metric across streams instance within an app, we also follow 
the same aggregation rule, e.g. if there are two streams instance where one 
instance's status code is 1), and the other is 10), then the app's status is 
10).
 10. *RevocationNeeded* => the definition of this is changed to the original 
10) defined in consumer above, OR leader decides to revoke either 
active/standby tasks and hence schedule follow-ups.
 11. *AssignmentProbing* => leader decides to schedule follow-ups since the 
current assignment is unstable.
 12. *VersionProbing* => leader decides to schedule follow-ups due to version 
probing.
 13. *EndpointUpdate* => anyone decides to schedule follow-ups due to endpoint 
updates.


The main motivations of the above proposed precedence order are the following:
1. When a rebalance is triggered by one member, all other members would only 
know it is due to CoordinatorRequested from coordinator error codes, and hence 
CoordinatorRequested should be overridden by any other status when aggregating 
across clients.
2. DroppedGroup could cause unknown/stale members that would fail and retry 
immediately, and hence should take higher precedence.
3. Revocation definition is extended in Streams, and hence it needs to take the 
highest precedence among all consumer-only status so that it would not be 
overridden by any of the consumer-only status.
4. In general, more rare events get higher precedence.

Any comments on the precedence rules / categorization are more than welcomed!

  was:
Today to trouble shoot a rebalance issue operators need to do a lot of manual 
steps: locating the problematic members, search in the log entries, and look 
for related metrics. It would be great to add a single metric that covers all 
these manual steps and operators would only need to check this single signal to 
check what is the root cause. A concrete idea is to expose two enum gauge 
metrics on consumer and streams, respectively:

* Consumer level (the order below is by-design, see Streams level for details):
  0. None => there is no rebalance on going.
  1. CoordinatorRequested => any of the coordinator response contains a 
RebalanceInProgress error code.
  2. NewMember => when the join group response has a MemberIdRequired error 
code.
  3. UnknownMember => when any of the coordinator response contains an 
UnknownMember error code, indicating this member is already kicked out of the 
group.
  4. StaleM

[jira] [Updated] (KAFKA-12472) Add a Consumer / Streams metric to indicate the current rebalance status

2021-03-15 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-12472:
--
Description: 
Today, to troubleshoot a rebalance issue, operators need to perform a lot of manual 
steps: locating the problematic members, searching the log entries, and looking at 
related metrics. It would be great to add a single metric that covers all these 
manual steps, so operators would only need to check this single signal to find the 
root cause. A concrete idea is to expose two enum gauge metrics, on the consumer 
and on streams respectively:

* Consumer level (the order below is by design, see the Streams level for details):
  0. *None* => there is no rebalance ongoing.
  1. *CoordinatorRequested* => when any of the coordinator responses contains a 
RebalanceInProgress error code.
  2. *NewMember* => when the join group response has a MemberIdRequired error 
code.
  3. *UnknownMember* => when any of the coordinator responses contains an 
UnknownMember error code, indicating this member has already been kicked out of 
the group.
  4. *StaleMember* => when any of the coordinator responses contains an 
IllegalGeneration error code.
  5. *DroppedGroup* => when the heartbeat thread decides to call leaveGroup 
because the heartbeat has expired.
  6. *UserRequested* => when leaveGroup is called upon shutdown / the 
unsubscribeAll API, as well as upon calling the enforceRebalance API.
  7. *MetadataChanged* => requestRejoin is triggered because metadata has changed.
  8. *SubscriptionChanged* => requestRejoin is triggered because the subscription 
has changed.
  9. *RetryOnError* => when the join/syncGroup response contains a retriable error 
which causes the consumer to back off and retry.
 10. *RevocationNeeded* => requestRejoin is triggered because the set of revoked 
partitions is not empty.

The transition rule is that a non-zero status code can only transition to zero or 
to a higher code, but not to a lower code (same for streams, see the rationale 
below).

* Streams level: today a streams client can have multiple consumers. We introduce 
some new enum states as well as aggregation rules across consumers: if there is no 
streams-layer event (as listed below) that transitions its status (i.e. the 
streams layer thinks it is 0), then we aggregate across all the embedded consumers 
and take the largest status code value as the streams metric; if there are 
streams-layer events that determine its status should be 10 or above, then it 
overrides all embedded consumer-layer status codes. In addition, when creating an 
aggregated metric across streams instances within an app, we also follow the same 
aggregation rule, e.g. if there are two streams instances where one instance's 
status code is 1) and the other is 10), then the app's status is 10).
 10. *RevocationNeeded* => the definition is extended from the original 10) 
defined for the consumer above, OR the leader decides to revoke either active or 
standby tasks and hence schedules follow-ups.
 11. *AssignmentProbing* => the leader decides to schedule follow-ups because the 
current assignment is unstable.
 12. *VersionProbing* => the leader decides to schedule follow-ups due to version 
probing.
 13. *EndpointUpdate* => any member decides to schedule follow-ups due to endpoint 
updates.


The main motivations for the proposed precedence order are the following:
1. When a rebalance is triggered by one member, all other members would only know 
it as CoordinatorRequested from the coordinator error codes, and hence 
CoordinatorRequested should be overridden by any other status when aggregating 
across clients.
2. DroppedGroup could cause unknown/stale members that would fail and retry 
immediately, and hence should take higher precedence.
3. The revocation definition is extended in Streams, and hence it needs to take 
the highest precedence among all consumer-only statuses so that it would not be 
overridden by any of them.
4. In general, rarer events get higher precedence.

Any comments on the precedence rules / categorization are more than welcome!
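
Purely as an illustration of the proposed codes and the monotonic transition rule, 
a hypothetical sketch (all names below are made up and none of this exists in the 
Kafka clients today):

{code:scala}
// Hypothetical sketch only: the names and codes mirror the proposal above;
// they are not part of the Kafka clients.
object RebalanceStatusSketch {
  sealed abstract class RebalanceStatus(val code: Int)
  case object NoRebalance          extends RebalanceStatus(0)  // "None" above
  case object CoordinatorRequested extends RebalanceStatus(1)
  case object NewMember            extends RebalanceStatus(2)
  case object UnknownMember        extends RebalanceStatus(3)
  case object StaleMember          extends RebalanceStatus(4)
  case object DroppedGroup         extends RebalanceStatus(5)
  case object UserRequested        extends RebalanceStatus(6)
  case object MetadataChanged      extends RebalanceStatus(7)
  case object SubscriptionChanged  extends RebalanceStatus(8)
  case object RetryOnError         extends RebalanceStatus(9)
  case object RevocationNeeded     extends RebalanceStatus(10)

  // Transition rule: a non-zero status may only move back to 0 (rebalance done)
  // or forward to a strictly higher code; lower non-zero codes are ignored.
  def transition(current: RebalanceStatus, proposed: RebalanceStatus): RebalanceStatus =
    if (proposed.code == 0 || proposed.code > current.code) proposed else current

  def main(args: Array[String]): Unit = {
    val s1 = transition(NoRebalance, CoordinatorRequested) // -> code 1
    val s2 = transition(s1, DroppedGroup)                  // -> code 5
    val s3 = transition(s2, CoordinatorRequested)          // stays at code 5
    println(Seq(s1, s2, s3).map(_.code))                   // List(1, 5, 5)
  }
}
{code}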

  was:
Today to trouble shoot a rebalance issue operators need to do a lot of manual 
steps: locating the problematic members, search in the log entries, and look 
for related metrics. It would be great to add a single metric that covers all 
these manual steps and operators would only need to check this single signal to 
check what is the root cause. A concrete idea is to expose two enum gauge 
metrics on consumer and streams, respectively:

* Consumer level (the order below is by-design, see Streams level for details):
  0. None => there is no rebalance on going.
  1. *CoordinatorRequested* => any of the coordinator response contains a 
RebalanceInProgress error code.
  2. *NewMember* => when the join group response has a MemberIdRequired error 
code.
  3. *UnknownMember* => when any of the coordinator response contains an 
UnknownMember error code, indicating this member is already kicked out of the 
group.
  4

[jira] [Updated] (KAFKA-10518) Consumer fetches could be inefficient when lags are unbalanced

2021-03-15 Thread Tom Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tom Lee updated KAFKA-10518:

Attachment: kafka-slow-consumer-repro.tar.gz

> Consumer fetches could be inefficient when lags are unbalanced
> --
>
> Key: KAFKA-10518
> URL: https://issues.apache.org/jira/browse/KAFKA-10518
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dhruvil Shah
>Priority: Major
> Attachments: kafka-slow-consumer-repro.tar.gz
>
>
> Consumer fetches are inefficient when lags are imbalanced across partitions, 
> due to head-of-line blocking and the behavior of blocking for 
> `fetch.max.wait.ms` until data is available.
> When the consumer receives a fetch response, it prepares the next fetch 
> request and sends it out. The caveat is that the subsequent fetch request 
> explicitly excludes partitions for which the consumer received data in the 
> previous round. This allows the consumer application to drain the data for 
> those partitions while the consumer fetches the other partitions it is 
> subscribed to.
> This behavior does not play out well when the consumer's lag is unbalanced, 
> because the consumer receives data for the partitions it is lagging on and 
> then sends a fetch request for partitions that have no data (or very little 
> data). The latter ends up blocking for fetch.max.wait.ms on the broker before 
> an empty response is sent back. This slows down the consumer's overall 
> consumption throughput, since fetch.max.wait.ms is 500ms by default.
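
Not a fix for the fetch-request behavior described above, but for context, a 
minimal, self-contained consumer sketch showing where the fetch.max.wait.ms and 
fetch.min.bytes knobs mentioned in this ticket are configured. The broker address, 
group id, and topic names below are placeholders:

{code:scala}
import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.ByteArrayDeserializer

// Placeholder sketch: broker address, group id, and topic names are made up.
object LowFetchWaitConsumerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "lag-test")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
    // fetch.max.wait.ms defaults to 500ms; a lower value shortens how long a fetch
    // that happens to exclude the lagging partitions can sit idle on the broker,
    // at the cost of more (possibly empty) fetch requests.
    props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "100")
    props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1")

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    try {
      consumer.subscribe(java.util.Arrays.asList("topic_1", "topic_2"))
      val records = consumer.poll(Duration.ofSeconds(1))
      println(s"fetched ${records.count()} records")
    } finally consumer.close()
  }
}
{code}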



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12472) Add a Consumer / Streams metric to indicate the current rebalance status

2021-03-15 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-12472:
--
Description: 
Today, to troubleshoot a rebalance issue, operators need to perform a lot of manual 
steps: locating the problematic members, searching the log entries, and looking at 
related metrics. It would be great to add a single metric that covers all these 
manual steps, so operators would only need to check this single signal to find the 
root cause. A concrete idea is to expose two enum gauge metrics, on the consumer 
and on streams respectively:

* Consumer level (the order below is by design, see the Streams level for details):
  0. *None* => there is no rebalance ongoing.
  1. *CoordinatorRequested* => when any of the coordinator responses contains a 
RebalanceInProgress error code.
  2. *NewMember* => when the join group response has a MemberIdRequired error 
code.
  3. *UnknownMember* => when any of the coordinator responses contains an 
UnknownMember error code, indicating this member has already been kicked out of 
the group.
  4. *StaleMember* => when any of the coordinator responses contains an 
IllegalGeneration error code.
  5. *DroppedGroup* => when the heartbeat thread decides to call leaveGroup 
because the heartbeat has expired.
  6. *UserRequested* => when leaveGroup is called upon shutdown / the 
unsubscribeAll API, as well as upon calling the enforceRebalance API.
  7. *MetadataChanged* => requestRejoin is triggered because metadata has changed.
  8. *SubscriptionChanged* => requestRejoin is triggered because the subscription 
has changed.
  9. *RetryOnError* => when the join/syncGroup response contains a retriable error 
which causes the consumer to back off and retry.
 10. *RevocationNeeded* => requestRejoin is triggered because the set of revoked 
partitions is not empty.

The transition rule is that a non-zero status code can only transition to zero or 
to a higher code, but not to a lower code (same for streams, see the rationale 
below).

* Streams level: today a streams client can have multiple consumers. We introduce 
some new enum states as well as aggregation rules across consumers: if there is no 
streams-layer event (as listed below) that transitions its status (i.e. the 
streams layer thinks it is 0), then we aggregate across all the embedded consumers 
and take the largest status code value as the streams metric; if there are 
streams-layer events that determine its status should be 10 or above, then it 
ignores all embedded consumer-layer status codes since it has higher precedence. 
In addition, when creating an aggregated metric across streams instances within an 
app, we also follow the same aggregation rule, e.g. if there are two streams 
instances where one instance's status code is 1) and the other is 10), then the 
app's status is 10).
 10. *RevocationNeeded* => the definition is extended from the original 10) 
defined for the consumer above, OR the leader decides to revoke either active or 
standby tasks and hence schedules follow-ups.
 11. *AssignmentProbing* => the leader decides to schedule follow-ups because the 
current assignment is unstable.
 12. *VersionProbing* => the leader decides to schedule follow-ups due to version 
probing.
 13. *EndpointUpdate* => any member decides to schedule follow-ups due to endpoint 
updates.


The main motivations for the proposed precedence order are the following:
1. When a rebalance is triggered by one member, all other members would only know 
it as CoordinatorRequested from the coordinator error codes, and hence 
CoordinatorRequested should be overridden by any other status when aggregating 
across clients.
2. DroppedGroup could cause unknown/stale members that would fail and retry 
immediately, and hence should take higher precedence.
3. The revocation definition is extended in Streams, and hence it needs to take 
the highest precedence among all consumer-only statuses so that it would not be 
overridden by any of them.
4. In general, rarer events get higher precedence.

Any comments on the precedence rules / categorization are more than welcome!
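
As a hypothetical sketch of this aggregation rule only (plain Int codes; nothing 
below exists in the clients): the streams-level value is the maximum over the 
embedded consumers unless a streams-layer event with a code of 10 or above 
overrides it, and the app-level value is again the maximum across instances:

{code:scala}
// Hypothetical sketch of the aggregation rule described above.
object RebalanceStatusAggregationSketch {
  // Streams-level status: a streams-layer code of 10 or above wins; otherwise
  // take the largest code reported by the embedded consumers (0 if none).
  def streamsStatus(streamsLayerCode: Int, consumerCodes: Seq[Int]): Int =
    if (streamsLayerCode >= 10) streamsLayerCode
    else (consumerCodes :+ 0).max

  // App-level status: the same rule across all streams instances within the app.
  def appStatus(instanceCodes: Seq[Int]): Int = (instanceCodes :+ 0).max

  def main(args: Array[String]): Unit = {
    // Instance A: no streams-layer event; its consumers report CoordinatorRequested (1).
    val a = streamsStatus(streamsLayerCode = 0, consumerCodes = Seq(1, 0))
    // Instance B: the streams layer decided on VersionProbing (12), overriding its consumers.
    val b = streamsStatus(streamsLayerCode = 12, consumerCodes = Seq(1, 9))
    println((a, b, appStatus(Seq(a, b)))) // (1,12,12): the app-level status is 12
  }
}
{code}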

  was:
Today to trouble shoot a rebalance issue operators need to do a lot of manual 
steps: locating the problematic members, search in the log entries, and look 
for related metrics. It would be great to add a single metric that covers all 
these manual steps and operators would only need to check this single signal to 
check what is the root cause. A concrete idea is to expose two enum gauge 
metrics on consumer and streams, respectively:

* Consumer level (the order below is by-design, see Streams level for details):
  0. *None* => there is no rebalance on going.
  1. *CoordinatorRequested* => any of the coordinator response contains a 
RebalanceInProgress error code.
  2. *NewMember* => when the join group response has a MemberIdRequired error 
code.
  3. *UnknownMember* => when any of the coordinator response contains an 
UnknownMember error code, indicating this member i

[jira] [Updated] (KAFKA-8127) It may need to import scala.io

2021-03-15 Thread JieFang.He (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

JieFang.He updated KAFKA-8127:
--
Description: 
I get an error when compiling Kafka, which disappears when scala.io is imported.

 
{code:java}
core\src\main\scala\kafka\tools\StateChangeLogMerger.scala:140: object Source 
is not a member of package io
val lineIterators = files.map(io.Source.fromFile(_).getLines)
^
6 warnings found
one error found
:core:compileScala FAILED

FAILURE: Build failed with an exception.

* What went wrong:
Execution failed for task ':core:compileScala'.
> Compilation failed
{code}
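
A minimal, self-contained sketch of the fix described above, assuming the intent 
is simply to bring scala.io.Source into scope for the expression in 
StateChangeLogMerger.scala (the `files` value here just stands in for the tool's 
input file list):

{code:scala}
// Sketch of the reporter's fix: import scala.io.Source explicitly so that the
// Source.fromFile call resolves. Equivalently, `import scala.io` alone lets the
// original `io.Source.fromFile(_)` expression compile.
import scala.io.Source

object SourceImportSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for the list of log files the tool merges.
    val files: Seq[String] = args.toSeq
    val lineIterators = files.map(f => Source.fromFile(f).getLines())
    lineIterators.foreach(it => it.take(1).foreach(println))
  }
}
{code}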

  was:
I get an error when compile kafka,which disappear when import scala.io

 
{code:java}
D:\gerrit\Kafka\core\src\main\scala\kafka\tools\StateChangeLogMerger.scala:140: 
object Source is not a member of package io
val lineIterators = files.map(io.Source.fromFile(_).getLines)
^
6 warnings found
one error found
:core:compileScala FAILED

FAILURE: Build failed with an exception.

* What went wrong:
Execution failed for task ':core:compileScala'.
> Compilation failed
{code}


> It may need to import scala.io
> --
>
> Key: KAFKA-8127
> URL: https://issues.apache.org/jira/browse/KAFKA-8127
> Project: Kafka
>  Issue Type: Improvement
>Reporter: JieFang.He
>Priority: Major
>
> I get an error when compiling Kafka, which disappears when scala.io is imported.
>  
> {code:java}
> core\src\main\scala\kafka\tools\StateChangeLogMerger.scala:140: object Source 
> is not a member of package io
> val lineIterators = files.map(io.Source.fromFile(_).getLines)
> ^
> 6 warnings found
> one error found
> :core:compileScala FAILED
> FAILURE: Build failed with an exception.
> * What went wrong:
> Execution failed for task ':core:compileScala'.
> > Compilation failed
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12472) Add a Consumer / Streams metric to indicate the current rebalance status

2021-03-15 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-12472:
--
Description: 
Today, to troubleshoot a rebalance issue, operators need to perform a lot of manual 
steps: locating the problematic members, searching the log entries, and looking at 
related metrics. It would be great to add a single metric that covers all these 
manual steps, so operators would only need to check this single signal to find the 
root cause. A concrete idea is to expose two enum gauge metrics, on the consumer 
and on streams respectively:

* Consumer level (the order below is by design, see the Streams level for details):
  0. *None* => there is no rebalance ongoing.
  1. *CoordinatorRequested* => when any of the coordinator responses contains a 
RebalanceInProgress error code.
  2. *NewMember* => when the join group response has a MemberIdRequired error 
code.
  3. *UnknownMember* => when any of the coordinator responses contains an 
UnknownMember error code, indicating this member has already been kicked out of 
the group.
  4. *StaleMember* => when any of the coordinator responses contains an 
IllegalGeneration error code.
  5. *DroppedGroup* => when the heartbeat thread decides to call leaveGroup 
because the heartbeat has expired.
  6. *UserRequested* => when leaveGroup is called upon shutdown / the 
unsubscribeAll API, as well as upon calling the enforceRebalance API.
  7. *MetadataChanged* => requestRejoin is triggered because metadata has changed.
  8. *SubscriptionChanged* => requestRejoin is triggered because the subscription 
has changed.
  9. *RetryOnError* => when the join/syncGroup response contains a retriable error 
which causes the consumer to back off and retry.
 10. *RevocationNeeded* => requestRejoin is triggered because the set of revoked 
partitions is not empty.

The transition rule is that a non-zero status code can only transition to zero or 
to a higher code, but not to a lower code (same for streams, see the rationale 
below).

* Streams level: today a streams client can have multiple consumers. We introduce 
some new enum states as well as aggregation rules across consumers: if there is no 
streams-layer event (as listed below) that transitions its status (i.e. the 
streams layer thinks it is 0), then we aggregate across all the embedded consumers 
and take the largest status code value as the streams metric; if there are 
streams-layer events that determine its status should be 10 or above, then it 
ignores all embedded consumer-layer status codes since it has higher precedence. 
In addition, when creating an aggregated metric across streams instances (a.k.a. 
at the app level, which is usually what we would care about and alert on), we also 
follow the same aggregation rule, e.g. if there are two streams instances where 
one instance's status code is 1) and the other is 10), then the app's status is 
10).
 10. *RevocationNeeded* => the definition is extended from the original 10) 
defined for the consumer above, OR the leader decides to revoke either active or 
standby tasks and hence schedules follow-ups.
 11. *AssignmentProbing* => the leader decides to schedule follow-ups because the 
current assignment is unstable.
 12. *VersionProbing* => the leader decides to schedule follow-ups due to version 
probing.
 13. *EndpointUpdate* => any member decides to schedule follow-ups due to endpoint 
updates.


The main motivations for the proposed precedence order are the following:
1. When a rebalance is triggered by one member, all other members would only know 
it as CoordinatorRequested from the coordinator error codes, and hence 
CoordinatorRequested should be overridden by any other status when aggregating 
across clients.
2. DroppedGroup could cause unknown/stale members that would fail and retry 
immediately, and hence should take higher precedence.
3. The revocation definition is extended in Streams, and hence it needs to take 
the highest precedence among all consumer-only statuses so that it would not be 
overridden by any of them.
4. In general, rarer events get higher precedence.

Any comments on the precedence rules / categorization are more than welcome!
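
For illustration only, a sketch of how such a status code could be surfaced as a 
gauge through the client metrics API; the metric and group names here are 
placeholders, not the names this ticket would introduce:

{code:scala}
import java.util.concurrent.atomic.AtomicInteger
import org.apache.kafka.common.metrics.{Gauge, MetricConfig, Metrics}

// Placeholder sketch: registers an Int-valued gauge that reports the current
// rebalance status code; metric/group names are made up for illustration.
object RebalanceStatusGaugeSketch {
  def main(args: Array[String]): Unit = {
    val metrics = new Metrics()
    val statusCode = new AtomicInteger(0) // updated by the coordinator/streams layer

    val name = metrics.metricName(
      "rebalance-status",             // placeholder metric name
      "consumer-coordinator-metrics", // placeholder group
      "Current rebalance status code (0 = no rebalance ongoing)")

    metrics.addMetric(name, new Gauge[Integer] {
      override def value(config: MetricConfig, now: Long): Integer =
        Integer.valueOf(statusCode.get())
    })

    statusCode.set(5) // e.g. DroppedGroup
    println(metrics.metric(name).metricValue()) // prints 5
    metrics.close()
  }
}
{code}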

  was:
Today to trouble shoot a rebalance issue operators need to do a lot of manual 
steps: locating the problematic members, search in the log entries, and look 
for related metrics. It would be great to add a single metric that covers all 
these manual steps and operators would only need to check this single signal to 
check what is the root cause. A concrete idea is to expose two enum gauge 
metrics on consumer and streams, respectively:

* Consumer level (the order below is by-design, see Streams level for details):
  0. *None* => there is no rebalance on going.
  1. *CoordinatorRequested* => any of the coordinator response contains a 
RebalanceInProgress error code.
  2. *NewMember* => when the join group response has a MemberIdRequired error 
code.
  3. *UnknownMember* => when any of the coordinator response c

[jira] [Updated] (KAFKA-12472) Add a Consumer / Streams metric to indicate the current rebalance status

2021-03-15 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-12472:
--
Description: 
Today, to troubleshoot a rebalance issue, operators need to perform a lot of manual 
steps: locating the problematic members, searching the log entries, and looking at 
related metrics. It would be great to add a single metric that covers all these 
manual steps, so operators would only need to check this single signal to find the 
root cause. A concrete idea is to expose two enum gauge metrics, on the consumer 
and on streams respectively:

* Consumer level (the order below is by design, see the Streams level for details):
  0. *None* => there is no rebalance ongoing.
  1. *CoordinatorRequested* => when any of the coordinator responses contains a 
RebalanceInProgress error code.
  2. *NewMember* => when the join group response has a MemberIdRequired error 
code.
  3. *UnknownMember* => when any of the coordinator responses contains an 
UnknownMember error code, indicating this member has already been kicked out of 
the group.
  4. *StaleMember* => when any of the coordinator responses contains an 
IllegalGeneration error code.
  5. *DroppedGroup* => when the heartbeat thread decides to call leaveGroup 
because the heartbeat has expired.
  6. *UserRequested* => when leaveGroup is called upon shutdown / the 
unsubscribeAll API, as well as upon calling the enforceRebalance API.
  7. *MetadataChanged* => requestRejoin is triggered because metadata has changed.
  8. *SubscriptionChanged* => requestRejoin is triggered because the subscription 
has changed.
  9. *RetryOnError* => when the join/syncGroup response contains a retriable error 
which causes the consumer to back off and retry.
 10. *RevocationNeeded* => requestRejoin is triggered because the set of revoked 
partitions is not empty.

The transition rule is that a non-zero status code can only transition to zero or 
to a higher code, but not to a lower code (same for streams, see the rationale 
below).

* Streams level: today a streams client can have multiple consumers. We introduce 
some new enum states as well as aggregation rules across consumers: if there is no 
streams-layer event (as listed below) that transitions its status (i.e. the 
streams layer thinks it is 0), then we aggregate across all the embedded consumers 
and take the largest status code value as the streams metric; if there are 
streams-layer events that determine its status should be 10 or above, then it 
ignores all embedded consumer-layer status codes since it has higher precedence. 
In addition, when creating an aggregated metric across streams instances (a.k.a. 
at the app level, which is usually what we would care about and alert on), we also 
follow the same aggregation rule, e.g. if there are two streams instances where 
one instance's status code is 1) and the other is 10), then the app's status is 
10).
 10. *RevocationNeeded* => the definition is extended from the original 10) 
defined for the consumer above, OR the leader decides to revoke either active or 
standby tasks and hence schedules follow-ups.
 11. *AssignmentProbing* => the leader decides to schedule follow-ups because the 
current assignment is unstable.
 12. *VersionProbing* => the leader decides to schedule follow-ups due to version 
probing.
 13. *EndpointUpdate* => any member decides to schedule follow-ups due to endpoint 
updates.


The main motivations for the proposed precedence order are the following:
1. When a rebalance is triggered by one member, all other members would only know 
it as CoordinatorRequested from the coordinator error codes, and hence 
CoordinatorRequested should be overridden by any other status when aggregating 
across clients.
2. DroppedGroup could cause unknown/stale members that would fail and retry 
immediately, and hence should take higher precedence.
3. The revocation definition is extended in Streams, and hence it needs to take 
the highest precedence among all consumer-only statuses so that it would not be 
overridden by any of them.
4. In general, rarer events get higher precedence.

This is proposed on top of KAFKA-12352. Any comments on the precedence rules / 
categorization are more than welcome!

  was:
Today to trouble shoot a rebalance issue operators need to do a lot of manual 
steps: locating the problematic members, search in the log entries, and look 
for related metrics. It would be great to add a single metric that covers all 
these manual steps and operators would only need to check this single signal to 
check what is the root cause. A concrete idea is to expose two enum gauge 
metrics on consumer and streams, respectively:

* Consumer level (the order below is by-design, see Streams level for details):
  0. *None* => there is no rebalance on going.
  1. *CoordinatorRequested* => any of the coordinator response contains a 
RebalanceInProgress error code.
  2. *NewMember* => when the join group response has a MemberIdRequired error 
code.
  3. *UnknownMember* 

[GitHub] [kafka] dengziming commented on pull request #10312: MINOR: Fix log statement whose placeholders are inconsistent with arguments

2021-03-15 Thread GitBox


dengziming commented on pull request #10312:
URL: https://github.com/apache/kafka/pull/10312#issuecomment-799873639


   Similar to #10262, ping @tombentley to have a look.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10518) Consumer fetches could be inefficient when lags are unbalanced

2021-03-15 Thread Tom Lee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302143#comment-17302143
 ] 

Tom Lee commented on KAFKA-10518:
-

Attaching a contrived repro case. In case it's not obvious: if the repro case is 
run against a multi-broker cluster, the key to the repro is that the partitions 
assigned to the consumer are fetched from the same broker.

Sample output here: 
[https://gist.githubusercontent.com/thomaslee/fa13c9a10466dc35792173c2485ad84b/raw/34c02bfc9f756eced8b952530b1b6378760fd7cd/bug-repro-output]
Note the throughput drop from a ballpark ~2-3M records/sec to less than 200k/sec. 
This is the point at which the _disable_topic_2_ file is created and the producer 
stops writing to topic_2.

Imagine a scenario where a consumer of topic_2 is downstream of another system 
producing to topic_2: if conditions are right, an incident impacting the producer 
could also impact the consumer. The same applies if the producer is decommissioned.

> Consumer fetches could be inefficient when lags are unbalanced
> --
>
> Key: KAFKA-10518
> URL: https://issues.apache.org/jira/browse/KAFKA-10518
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dhruvil Shah
>Priority: Major
> Attachments: kafka-slow-consumer-repro.tar.gz
>
>
> Consumer fetches are inefficient when lags are imbalanced across partitions, 
> due to head-of-line blocking and the behavior of blocking for 
> `fetch.max.wait.ms` until data is available.
> When the consumer receives a fetch response, it prepares the next fetch 
> request and sends it out. The caveat is that the subsequent fetch request 
> explicitly excludes partitions for which the consumer received data in the 
> previous round. This allows the consumer application to drain the data for 
> those partitions while the consumer fetches the other partitions it is 
> subscribed to.
> This behavior does not play out well when the consumer's lag is unbalanced, 
> because the consumer receives data for the partitions it is lagging on and 
> then sends a fetch request for partitions that have no data (or very little 
> data). The latter ends up blocking for fetch.max.wait.ms on the broker before 
> an empty response is sent back. This slows down the consumer's overall 
> consumption throughput, since fetch.max.wait.ms is 500ms by default.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

