Re: [VOTE] KIP-794: Strictly Uniform Sticky Partitioner

2022-03-26 Thread Lucas Bradstreet
Hi Artem,

Thank you for all the work on this. I think it'll be quite impactful.

+1 non-binding from me.

Lucas

On Wed, Mar 23, 2022 at 8:27 PM Luke Chen  wrote:

> Hi Artem,
>
> Thanks for the KIP and the patience during discussion!
> +1 binding from me.
>
> Luke
>
> On Thu, Mar 24, 2022 at 3:43 AM Ismael Juma  wrote:
>
> > Thanks for the KIP and for taking the time to address all the feedback.
> +1
> > (binding)
> >
> > Ismael
> >
> > On Mon, Mar 21, 2022 at 4:52 PM Artem Livshits
> >  wrote:
> >
> > > Hi all,
> > >
> > > I'd like to start a vote on
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner
> > > .
> > >
> > > -Artem
> > >
> >
>


Re: [ANNOUNCE] New committer: Luke Chen

2022-02-10 Thread Lucas Bradstreet
Congrats Luke :)

On Thu, Feb 10, 2022 at 3:35 AM Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:

> Congratulations Luke!
>
> On Thu, 10 Feb 2022 at 09:20, Bruno Cadonna  wrote:
>
> > Congrats, Luke! Very well deserved!
> >
> > Best,
> > Bruno
> >
> > On 10.02.22 09:20, Manikumar wrote:
> > > Congrats Luke!
> > >
> > > On Thu, Feb 10, 2022 at 1:36 PM Mickael Maison <
> mickael.mai...@gmail.com
> > >
> > > wrote:
> > >
> > >> Congratulations Luke!
> > >>
> > >> On Thu, Feb 10, 2022 at 8:54 AM Tom Bentley 
> > wrote:
> > >>>
> > >>> Congratulations Luke!
> > >>>
> > >>> On Thu, 10 Feb 2022 at 06:41, Josep Prat  >
> > >>> wrote:
> > >>>
> >  Congrats Luke!
> > 
> >  ———
> >  Josep Prat
> > 
> >  Aiven Deutschland GmbH
> > 
> >  Immanuelkirchstraße 26, 10405 Berlin
> > 
> >  Amtsgericht Charlottenburg, HRB 209739 B
> > 
> >  Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
> > 
> >  m: +491715557497
> > 
> >  w: aiven.io
> > 
> >  e: josep.p...@aiven.io
> > 
> >  On Thu, Feb 10, 2022, 07:07 Randall Hauch  wrote:
> > 
> > > Congratulations, Luke!
> > >
> > > On Wed, Feb 9, 2022 at 11:02 PM Matthias J. Sax 
> >  wrote:
> > >
> > >> Congratulations! Glad to have you onboard, Luke!
> > >>
> > >> -Matthias
> > >>
> > >> On 2/9/22 16:37, Bill Bejeck wrote:
> > >>> Congrats Luke! Well deserved.
> > >>>
> > >>> -Bill
> > >>>
> > >>> On Wed, Feb 9, 2022 at 7:25 PM Israel Ekpo  > >>>
> > > wrote:
> > >>>
> >  Congratulations Luke!
> > 
> >  Thank you for your service
> > 
> >  On Wed, Feb 9, 2022 at 6:22 PM Guozhang Wang <
> > >> wangg...@gmail.com>
> > >> wrote:
> > 
> > > The PMC for Apache Kafka has invited Luke Chen (showuon) as a
> > > committer
> >  and
> > > we are pleased to announce that he has accepted!
> > >
> > > Luke has been actively contributing to Kafka since early 2020.
> > >> He
> >  has
> > > made more than 120 commits on various components of Kafka, with
> > > notable
> > > contributions to the rebalance protocol in Consumer and Streams
> > >> (KIP-766,
> > > KIP-726, KIP-591, KAFKA-12675 and KAFKA-12464, to just name a
> > >> few),
> >  as
> >  well
> > > as making an impact on improving test stability of the project.
> >  Aside
> >  from
> > > all his code contributions, Luke has been a great participant
> > >> in
> > > discussions across the board, a very active and helpful
> > >> reviewer of
> > >> other
> > > contributors' works, all of which are super valuable and highly
> >  appreciated
> > > by the community.
> > >
> > >
> > > Thanks for all of your contributions Luke. Congratulations!
> > >
> > > -- Guozhang, on behalf of the Apache Kafka PMC
> > >
> >  --
> >  Israel Ekpo
> >  Lead Instructor, IzzyAcademy.com
> >  https://www.youtube.com/c/izzyacademy
> >  https://izzyacademy.com/
> > 
> > >>>
> > >>
> > >
> > 
> > >>
> > >
> >
>


Re: [DISCUSS] KIP-782: Expandable batch size in producer

2021-12-10 Thread Lucas Bradstreet
Hi Jun,

One difference compared to increasing the default batch size is that users
may actually prefer smaller batches, but it makes much less sense to
accumulate many small batches if a batch is already sending.

For example, imagine a user that prefers 16KB batches with a 5ms linger.
Everything is functioning normally and 16KB batches are being sent. Then
there's a 500ms blip for that broker. Do we want to continue to accumulate
16KB batches, each of which requires a round trip, or would we prefer to
accumulate larger batches while sending is blocked?
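
To make that concrete, here is a rough sketch of the settings involved (the
values are illustrative, and batch.max.size is the config proposed in this
KIP rather than something that exists in released clients):

    Properties props = new Properties();
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);  // "ready" threshold
    props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
    props.put("batch.max.size", 256 * 1024);                 // proposed hard cap per batch

With something like this the producer still sends ~16KB batches while the
broker is healthy, but while sending is blocked it can keep filling a ready
batch up to 256KB instead of queueing many 16KB batches that each require
their own round trip.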

I'm not hugely against increasing the default batch.size in general, but
batch.max.size does seem to have some nice properties.

Thanks,

Lucas

On Fri, Dec 10, 2021 at 9:42 AM Jun Rao  wrote:

> Hi, Artem, Luke,
>
> Thanks for the reply.
>
> 11. If we get rid of batch.max.size and increase the default batch.size,
> it's true the behavior is slightly different than before. However, does
> that difference matter to most of our users? In your example, if a user
> sets linger.ms to 100ms and thinks 256KB is good for throughput, does it
> matter to deliver any batch smaller than 256KB before 100ms? I also find it
> a bit hard to explain to our users these 3 different settings related to
> batch size.
>
> Thanks,
>
> Jun
>
> On Thu, Dec 9, 2021 at 5:47 AM Luke Chen  wrote:
>
> > Hi Jun,
> >
> > 11. In addition to Artem's comment, I think the reason to have additional
> > "batch.max.size" is to have more flexibility to users.
> > For example:
> > With linger.ms=100ms, batch.size=16KB, now, we have 20KB of data coming
> to
> > a partition within 50ms. Now, sender is ready to pick up the batch to
> send.
> > In current design, we send 16KB data to broker, and keep the remaining
> 4KB
> > in the producer, to keep accumulating data.
> > But after this KIP, user can send the whole 20KB of data together. That
> is,
> > user can decide if they want to accumulate more data before the sender is
> > ready, and send them together, to have higher throughput. The
> > "batch.size=16KB" in the proposal, is more like a soft limit, (and
> > "batch.max.size" is like a hard limit), or it's like a switch to enable
> the
> > batch to become ready. Before sender is ready, we can still accumulate
> more
> > data, and wrap them together to send to broker.
> >
> > User can increase "batch.size" to 20KB to achieve the same goal in the
> > current design, of course. But you can imagine, if the data within 100ms
> is
> > just 18KB, then the batch of data will wait for 100ms passed to be sent
> > out. This "batch.max.size" config will allow more flexible for user
> config.
> >
> > Does that make sense?
> >
> > Thank you.
> > Luke
> >
> > On Thu, Dec 9, 2021 at 7:53 AM Artem Livshits
> >  wrote:
> >
> > > Hi Jun,
> > >
> > > 11. That was my initial thinking as well, but in a discussion some
> people
> > > pointed out the change of behavior in some scenarios.  E.g. if someone
> > for
> > > some reason really wants batches to be at least 16KB and sets large
> > > linger.ms, and most of the time the batches are filled quickly enough
> > and
> > > they observe a certain latency.  Then they upgrade their client with a
> > > default size 256KB and the latency increases.  This could be seen as a
> > > regression.  It could be fixed by just reducing linger.ms to specify
> the
> > > expected latency, but still could be seen as a disruption by some
> users.
> > > The other reason to have 2 sizes is to avoid allocating large buffers
> > > upfront.
> > >
> > > -Artem
> > >
> > > On Wed, Dec 8, 2021 at 3:07 PM Jun Rao 
> wrote:
> > >
> > > > Hi, Artem,
> > > >
> > > > Thanks for the reply.
> > > >
> > > > 11. Got it. To me, batch.size is really used for throughput and not
> for
> > > > latency guarantees. There is no guarantee when 16KB will be
> > accumulated.
> > > > So, if users want any latency guarantee, they will need to specify
> > > > linger.ms accordingly.
> > > > Then, batch.size can just be used to tune for throughput.
> > > >
> > > > 20. Could we also describe the unit of compression? Is
> > > > it batch.initial.size, batch.size or batch.max.size?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Wed, Dec 8, 2021 at 9:58 AM Artem Livshits
> > > >  wrote:
> > > >
> > > > > Hi Jun,
> > > > >
> > > > > 10. My understanding is that MemoryRecords would under the covers
> be
> > > > > allocated in chunks, so logically it still would be one
> MemoryRecords
> > > > > object, it's just instead of allocating one large chunk upfront,
> > > smaller
> > > > > chunks are allocated as needed to grow the batch and linked into a
> > > list.
> > > > >
> > > > > 11. The reason for 2 sizes is to avoid change of behavior when
> > > triggering
> > > > > batch send with large linger.ms.  Currently, a batch send is
> > triggered
> > > > > once
> > > > > the batch reaches 16KB by default, if we just raise the default to
> > > 256KB,
> > > > > then the batch send will be delayed.  Using a separate value would

[jira] [Resolved] (KAFKA-12791) ConcurrentModificationException in KafkaProducer constructor

2021-11-26 Thread Lucas Bradstreet (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Bradstreet resolved KAFKA-12791.
--
Resolution: Fixed

> ConcurrentModificationException in KafkaProducer constructor
> 
>
> Key: KAFKA-12791
> URL: https://issues.apache.org/jira/browse/KAFKA-12791
> Project: Kafka
>  Issue Type: Bug
>    Reporter: Lucas Bradstreet
>Priority: Minor
> Fix For: 3.0.0
>
>
> Recently we have noticed multiple instances where KafkaProducers have failed 
> to construct due to the following exception:
> {noformat}
> org.apache.kafka.common.KafkaException: Failed to construct kafka producer at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:440)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318)
>  java.base/java.lang.Thread.run(Thread.java:832) Caused by: 
> java.util.ConcurrentModificationException at 
> java.base/java.util.HashMap$HashIterator.nextNode(HashMap.java:1584) at 
> java.base/java.util.HashMap$KeyIterator.next(HashMap.java:1607) at 
> java.base/java.util.AbstractSet.removeAll(AbstractSet.java:171) at 
> org.apache.kafka.common.config.AbstractConfig.unused(AbstractConfig.java:221) 
> at 
> org.apache.kafka.common.config.AbstractConfig.logUnused(AbstractConfig.java:379)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:433)
>  ... 9 more exception.class:org.apache.kafka.common.KafkaException 
> exception.message:Failed to construct kafka producer
> {noformat}
> It appears that this is due to the fact that `used` below is a synchronized 
> set:
>  
> {code:java}
> public Set<String> unused() {
>  Set<String> keys = new HashSet<>(originals.keySet());
>  keys.removeAll(used);
>  return keys;
> }{code}
> It appears that `used` is being modified while removeAll is being called. 
> This may be due to the way that keys are added to it when used:
> {code:java}
> protected Object get(String key) {
>  if (!values.containsKey(key))
>  throw new ConfigException(String.format("Unknown configuration '%s'", key));
>  used.add(key);
>  return values.get(key);
> }{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [VOTE] KIP-764 Configurable backlog size for creating Acceptor

2021-10-18 Thread Lucas Bradstreet
The other related kernel config that I can recall is
net.ipv4.tcp_max_syn_backlog.

On Mon, Oct 18, 2021 at 4:32 PM Lucas Bradstreet  wrote:

> Thank you for the KIP. +1 (non-binding)
>
> For the implementation can we ensure that any kernel parameters that may
> need to also be adjusted are documented in the config documentation
> (e.g. net.core.somaxconn)?
>
>
> On Mon, Oct 18, 2021 at 4:23 PM Haruki Okada  wrote:
>
>> Hi Luke.
>>
>> Thank you for the vote.
>> Updated KIP to link to ServerSocket#bind javadoc.
>>
>>
>> 2021年10月18日(月) 17:00 Luke Chen :
>>
>> > Hi Okada,
>> > Thanks for the KIP.
>> > +1 (non-binding)
>> >
>> > One thing to add is that you should add ServerSocket#bind java doc link
>> > into the KIP.
>> > I don't think everyone is familiar with the definition of the method
>> > parameters.
>> >
>> > Thank you.
>> > Luke
>> >
>> > On Mon, Oct 18, 2021 at 3:43 PM Haruki Okada 
>> wrote:
>> >
>> > > Hi Kafka.
>> > >
>> > > Let me bump this VOTE thread for the KIP.
>> > > We applied proposed changes in the KIP to our large Kafka cluster by
>> > > building patched Kafka internally and confirmed it's working well.
>> > >
>> > > Please feel free to give your feedback if there's any points to be
>> > > clarified in the KIP.
>> > >
>> > > Thanks,
>> > >
>> > > 2021年8月9日(月) 11:25 Haruki Okada :
>> > >
>> > > > Thanks for your comment LI-san.
>> > > >
>> > > > Could anyone else review and vote for the KIP?
>> > > >
>> > > > I think the situation described in the KIP's motivation can happen
>> in
>> > any
>> > > > large-scale Kafka deployment, so may be helpful for many users while
>> > the
>> > > > proposed changes are small enough.
>> > > >
>> > > >
>> > > > Thanks,
>> > > >
>> > > > 2021年8月3日(火) 15:49 Xiangyuan LI :
>> > > >
>> > > >> Hi Haruki Okada:
>> > > >>   i read your comment, thx for your detail explain!
>> > > >>   add backlog parameter is a useful suggestion, hope it could
>> added to
>> > > >> kafka.
>> > > >>
>> > > >> Haruki Okada  于2021年8月2日周一 上午7:43写道:
>> > > >>
>> > > >> > Hi, Kafka.
>> > > >> >
>> > > >> > I would like to start a vote on KIP that makes SocketServer
>> > acceptor's
>> > > >> > backlog size configurable.
>> > > >> >
>> > > >> > KIP:
>> > > >> >
>> > > >> >
>> > > >>
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-764%3A+Configurable+backlog+size+for+creating+Acceptor
>> > > >> >
>> > > >> > Discussion thread:
>> > > >> >
>> > > >> >
>> > > >>
>> > >
>> >
>> https://lists.apache.org/thread.html/rd77469b7de0190d601dd37bd6894e1352a674d08038bcfe7ff68a1e0%40%3Cdev.kafka.apache.org%3E
>> > > >> >
>> > > >> > Thanks,
>> > > >> >
>> > > >> > --
>> > > >> > 
>> > > >> > Okada Haruki
>> > > >> > ocadar...@gmail.com
>> > > >> > 
>> > > >> >
>> > > >>
>> > > >
>> > > >
>> > > > --
>> > > > 
>> > > > Okada Haruki
>> > > > ocadar...@gmail.com
>> > > > 
>> > > >
>> > >
>> > >
>> > > --
>> > > 
>> > > Okada Haruki
>> > > ocadar...@gmail.com
>> > > 
>> > >
>> >
>>
>>
>> --
>> 
>> Okada Haruki
>> ocadar...@gmail.com
>> 
>>
>


Re: [VOTE] KIP-764 Configurable backlog size for creating Acceptor

2021-10-18 Thread Lucas Bradstreet
Thank you for the KIP. +1 (non-binding)

For the implementation can we ensure that any kernel parameters that may
need to also be adjusted are documented in the config documentation
(e.g. net.core.somaxconn)?


On Mon, Oct 18, 2021 at 4:23 PM Haruki Okada  wrote:

> Hi Luke.
>
> Thank you for the vote.
> Updated KIP to link to ServerSocket#bind javadoc.
>
>
> 2021年10月18日(月) 17:00 Luke Chen :
>
> > Hi Okada,
> > Thanks for the KIP.
> > +1 (non-binding)
> >
> > One thing to add is that you should add ServerSocket#bind java doc link
> > into the KIP.
> > I don't think everyone is familiar with the definition of the method
> > parameters.
> >
> > Thank you.
> > Luke
> >
> > On Mon, Oct 18, 2021 at 3:43 PM Haruki Okada 
> wrote:
> >
> > > Hi Kafka.
> > >
> > > Let me bump this VOTE thread for the KIP.
> > > We applied proposed changes in the KIP to our large Kafka cluster by
> > > building patched Kafka internally and confirmed it's working well.
> > >
> > > Please feel free to give your feedback if there's any points to be
> > > clarified in the KIP.
> > >
> > > Thanks,
> > >
> > > 2021年8月9日(月) 11:25 Haruki Okada :
> > >
> > > > Thanks for your comment LI-san.
> > > >
> > > > Could anyone else review and vote for the KIP?
> > > >
> > > > I think the situation described in the KIP's motivation can happen in
> > any
> > > > large-scale Kafka deployment, so may be helpful for many users while
> > the
> > > > proposed changes are small enough.
> > > >
> > > >
> > > > Thanks,
> > > >
> > > > 2021年8月3日(火) 15:49 Xiangyuan LI :
> > > >
> > > >> Hi Haruki Okada:
> > > >>   i read your comment, thx for your detail explain!
> > > >>   add backlog parameter is a useful suggestion, hope it could added
> to
> > > >> kafka.
> > > >>
> > > >> Haruki Okada  于2021年8月2日周一 上午7:43写道:
> > > >>
> > > >> > Hi, Kafka.
> > > >> >
> > > >> > I would like to start a vote on KIP that makes SocketServer
> > acceptor's
> > > >> > backlog size configurable.
> > > >> >
> > > >> > KIP:
> > > >> >
> > > >> >
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-764%3A+Configurable+backlog+size+for+creating+Acceptor
> > > >> >
> > > >> > Discussion thread:
> > > >> >
> > > >> >
> > > >>
> > >
> >
> https://lists.apache.org/thread.html/rd77469b7de0190d601dd37bd6894e1352a674d08038bcfe7ff68a1e0%40%3Cdev.kafka.apache.org%3E
> > > >> >
> > > >> > Thanks,
> > > >> >
> > > >> > --
> > > >> > 
> > > >> > Okada Haruki
> > > >> > ocadar...@gmail.com
> > > >> > 
> > > >> >
> > > >>
> > > >
> > > >
> > > > --
> > > > 
> > > > Okada Haruki
> > > > ocadar...@gmail.com
> > > > 
> > > >
> > >
> > >
> > > --
> > > 
> > > Okada Haruki
> > > ocadar...@gmail.com
> > > 
> > >
> >
>
>
> --
> 
> Okada Haruki
> ocadar...@gmail.com
> 
>


[jira] [Created] (KAFKA-13342) LISR sent for topic queued for deletion in controller

2021-10-03 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-13342:


 Summary: LISR sent for topic queued for deletion in controller
 Key: KAFKA-13342
 URL: https://issues.apache.org/jira/browse/KAFKA-13342
 Project: Kafka
  Issue Type: Bug
Reporter: Lucas Bradstreet


Under certain conditions in some system tests a broker will be hard killed 
during a topic deletion and before its replica has moved to OfflineReplica 
state. When the broker comes back up, the controller will send it a 
LeaderAndIsrRequest containing the partition, causing it to recreate the 
partition locally even though it is in a deleting state in the controller.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13194) LogCleaner may clean past highwatermark

2021-08-11 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-13194:


 Summary: LogCleaner may clean past highwatermark
 Key: KAFKA-13194
 URL: https://issues.apache.org/jira/browse/KAFKA-13194
 Project: Kafka
  Issue Type: Bug
Reporter: Lucas Bradstreet


Here the cleaning point is bounded by the active segment base offset and the 
first unstable offset, which makes sense:
{noformat}
// find first segment that cannot be cleaned
// neither the active segment, nor segments with any messages closer to the
// head of the log than the minimum compaction lag time may be cleaned
val firstUncleanableDirtyOffset: Long = Seq(

  // we do not clean beyond the first unstable offset
  log.firstUnstableOffset,

  // the active segment is always uncleanable
  Option(log.activeSegment.baseOffset),

  // the first segment whose largest message timestamp is within a minimum
  // time lag from now
  if (minCompactionLagMs > 0) {
    // dirty log segments
    val dirtyNonActiveSegments = log.localNonActiveLogSegmentsFrom(firstDirtyOffset)
    dirtyNonActiveSegments.find { s =>
      val isUncleanable = s.largestTimestamp > now - minCompactionLagMs
      debug(s"Checking if log segment may be cleaned: log='${log.name}' segment.baseOffset=${s.baseOffset} " +
        s"segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - minCompactionLagMs}; " +
        s"is uncleanable=$isUncleanable")
      isUncleanable
    }.map(_.baseOffset)
  } else None
).flatten.min
{noformat}
But the LSO starts out as None:
{noformat}
  @volatile private var firstUnstableOffsetMetadata: Option[LogOffsetMetadata] = None

  private[log] def firstUnstableOffset: Option[Long] =
    firstUnstableOffsetMetadata.map(_.messageOffset)
{noformat}
For most code depending on the LSO, fetchLastStableOffsetMetadata is used to 
default it to the hwm if it's not set:
{noformat}
  private def fetchLastStableOffsetMetadata: LogOffsetMetadata = {
    checkIfMemoryMappedBufferClosed()

    // cache the current high watermark to avoid a concurrent update
    // invalidating the range check
    val highWatermarkMetadata = fetchHighWatermarkMetadata

    firstUnstableOffsetMetadata match {
      case Some(offsetMetadata) if offsetMetadata.messageOffset < highWatermarkMetadata.messageOffset =>
        if (offsetMetadata.messageOffsetOnly) {
          lock synchronized {
            val fullOffset = convertToOffsetMetadataOrThrow(offsetMetadata.messageOffset)
            if (firstUnstableOffsetMetadata.contains(offsetMetadata))
              firstUnstableOffsetMetadata = Some(fullOffset)
            fullOffset
          }
        } else {
          offsetMetadata
        }
      case _ => highWatermarkMetadata
    }
  }
{noformat}
This means that in the case where the hwm is prior to the active segment base, 
the log cleaner may clean past the hwm. This is most likely to occur after a 
broker restart, when the log cleaner may start cleaning prior to replication 
becoming active.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12896) Group rebalance loop caused by repeated group leader JoinGroups

2021-06-04 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-12896:


 Summary: Group rebalance loop caused by repeated group leader 
JoinGroups
 Key: KAFKA-12896
 URL: https://issues.apache.org/jira/browse/KAFKA-12896
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.6.0
Reporter: Lucas Bradstreet


We encountered a strange case of a rebalance loop with the "cooperative-sticky" 
assignor. The logs show the following for several hours:

 

{{Apr 7, 2021 @ 03:58:36.040[GroupCoordinator 7]: Stabilized group mygroup 
generation 19830137 (__consumer_offsets-7)}}

{{Apr 7, 2021 @ 03:58:35.992[GroupCoordinator 7]: Preparing to rebalance 
group mygroup in state PreparingRebalance with old generation 19830136 
(__consumer_offsets-7) (reason: Updating metadata for member 
mygroup-1-7ad27e07-3784-4588-97e1-d796a74d4ecc during CompletingRebalance)}}

{{Apr 7, 2021 @ 03:58:35.988[GroupCoordinator 7]: Stabilized group mygroup 
generation 19830136 (__consumer_offsets-7)}}

{{Apr 7, 2021 @ 03:58:35.972[GroupCoordinator 7]: Preparing to rebalance 
group mygroup in state PreparingRebalance with old generation 19830135 
(__consumer_offsets-7) (reason: Updating metadata for member mygroup during 
CompletingRebalance)}}

{{Apr 7, 2021 @ 03:58:35.965[GroupCoordinator 7]: Stabilized group mygroup 
generation 19830135 (__consumer_offsets-7)}}

{{Apr 7, 2021 @ 03:58:35.953[GroupCoordinator 7]: Preparing to rebalance 
group mygroup in state PreparingRebalance with old generation 19830134 
(__consumer_offsets-7) (reason: Updating metadata for member 
mygroup-7ad27e07-3784-4588-97e1-d796a74d4ecc during CompletingRebalance)}}

{{Apr 7, 2021 @ 03:58:35.941[GroupCoordinator 7]: Stabilized group mygroup 
generation 19830134 (__consumer_offsets-7)}}

{{Apr 7, 2021 @ 03:58:35.926[GroupCoordinator 7]: Preparing to rebalance 
group mygroup in state PreparingRebalance with old generation 19830133 
(__consumer_offsets-7) (reason: Updating metadata for member mygroup during 
CompletingRebalance)}}

Every single time, it was the same member that triggered the JoinGroup and it 
was always the leader of the group.

The leader has the privilege of being able to trigger a rebalance by sending 
`JoinGroup` even if its subscription metadata has not changed. But why would it 
do so?

It is possible that this is due to the same issue or a similar bug to 
https://issues.apache.org/jira/browse/KAFKA-12890.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12791) ConcurrentModificationException in KafkaProducer constructor

2021-05-15 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-12791:


 Summary: ConcurrentModificationException in KafkaProducer 
constructor
 Key: KAFKA-12791
 URL: https://issues.apache.org/jira/browse/KAFKA-12791
 Project: Kafka
  Issue Type: Bug
Reporter: Lucas Bradstreet


Recently we have noticed multiple instances where KafkaProducers have failed to 
construct due to the following exception:
{noformat}
org.apache.kafka.common.KafkaException: Failed to construct kafka producer at 
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:440) 
at 
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291) 
at 
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318) 
java.base/java.lang.Thread.run(Thread.java:832) Caused by: 
java.util.ConcurrentModificationException at 
java.base/java.util.HashMap$HashIterator.nextNode(HashMap.java:1584) at 
java.base/java.util.HashMap$KeyIterator.next(HashMap.java:1607) at 
java.base/java.util.AbstractSet.removeAll(AbstractSet.java:171) at 
org.apache.kafka.common.config.AbstractConfig.unused(AbstractConfig.java:221) 
at 
org.apache.kafka.common.config.AbstractConfig.logUnused(AbstractConfig.java:379)
 at 
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:433) 
... 9 more exception.class:org.apache.kafka.common.KafkaException 
exception.message:Failed to construct kafka producer
{noformat}

It appears that this is due to the fact that `used` below is a synchronized set:



 
{code:java}
public Set<String> unused() {
 Set<String> keys = new HashSet<>(originals.keySet());
 keys.removeAll(used);
 return keys;
}{code}
It appears that `used` is being modified while removeAll is being called. This 
may be due to the way that keys are added to it when used:


{code:java}
protected Object get(String key) {
 if (!values.containsKey(key))
 throw new ConfigException(String.format("Unknown configuration '%s'", key));
 used.add(key);
 return values.get(key);
}{code}
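
One possible mitigation (a sketch only, not necessarily the fix that was
ultimately applied) is to hold the synchronized set's own monitor while
computing the difference, since all writers to a Collections.synchronizedSet
lock on the set itself. This assumes `used` really is a plain
Collections.synchronizedSet with the default mutex:
{code:java}
// Sketch of a change to AbstractConfig.unused()
public Set<String> unused() {
    Set<String> keys = new HashSet<>(originals.keySet());
    // AbstractSet.removeAll may iterate over `used` when it is the larger
    // collection, so hold the synchronized set's monitor to block concurrent
    // used.add(key) calls from get() while we iterate.
    synchronized (used) {
        keys.removeAll(used);
    }
    return keys;
}{code}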
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12736) KafkaProducer.flush holds onto completed ProducerBatch(s) until flush completed

2021-04-30 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-12736:


 Summary: KafkaProducer.flush holds onto completed ProducerBatch(s) 
until flush completed
 Key: KAFKA-12736
 URL: https://issues.apache.org/jira/browse/KAFKA-12736
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Bradstreet


When flush is called, a copy of the incomplete batches is made. This means that 
the full ProducerBatch(s) are held in memory until the flush has completed. For 
batches where the existing memory pool is used this is not as wasteful, as the 
memory will already be returned to the pool, but for non-pool memory it can 
only be GC'd after the flush has completed. Rather than use copyAll we can make 
a new array with only the produceFuture(s) and await on those.

 
{code:java}
/**
 * Mark all partitions as ready to send and block until the send is complete
 */
public void awaitFlushCompletion() throws InterruptedException {
 try {
 for (ProducerBatch batch : this.incomplete.copyAll())
 batch.produceFuture.await();
 } finally {
 this.flushesInProgress.decrementAndGet();
 }
}

{code}
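
A sketch of the alternative described above. The names are assumptions:
`requestResults()` stands in for a hypothetical accessor on the incomplete
batch tracker that copies only the ProduceRequestResult futures, so the
completed ProducerBatch objects themselves are not retained while we wait.
{code:java}
/**
 * Mark all partitions as ready to send and block until the send is complete,
 * without holding a reference to the full ProducerBatch objects.
 */
public void awaitFlushCompletion() throws InterruptedException {
    try {
        // Copy only the lightweight futures; completed batches (and any
        // non-pool buffers they hold) can then be GC'd during the flush.
        for (ProduceRequestResult result : this.incomplete.requestResults())
            result.await();
    } finally {
        this.flushesInProgress.decrementAndGet();
    }
}{code}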
 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [ANNOUNCE] New Kafka PMC Member: Chia-Ping Tsai

2021-03-12 Thread Lucas Bradstreet
Congrats Chia-Ping! Thanks for all of your contributions.

On Sat, 13 Mar 2021 at 10:27, Bill Bejeck  wrote:

> Congratulations Chia-Ping!
> On Fri, Mar 12, 2021 at 9:03 PM Luke Chen  wrote:
>
> > Congratulations Chia-Ping!
> > 恭喜大大!!
> >
> > Luke
> >
> > deng ziming  於 2021年3月13日 週六 上午8:52 寫道:
> >
> > > Congratulations Chia-Ping!
> > >
> > > > On Mar 13, 2021, at 05:39, Sophie Blee-Goldman
> > >  wrote:
> > > >
> > > > Congrats Chia-Ping! Thanks for all your contributions
> > > >
> > > > On Fri, Mar 12, 2021 at 12:24 PM Mickael Maison <
> > > mickael.mai...@gmail.com>
> > > > wrote:
> > > >
> > > >> Congratulations Chia-Ping!
> > > >>
> > > >> On Fri, Mar 12, 2021 at 7:54 PM Israel Ekpo 
> > > wrote:
> > > >>>
> > > >>> Congrats @Chia-Ping!
> > > >>>
> > > >>> On Fri, Mar 12, 2021 at 2:23 PM Guozhang Wang 
> > > >> wrote:
> > > >>>
> > >  Congratulations Chia-Ping! Really glad to have you on the PMC.
> > > 
> > > 
> > >  Guozhang
> > > 
> > >  On Fri, Mar 12, 2021 at 11:14 AM Jun Rao  >
> > > >> wrote:
> > > 
> > > > Hi, Everyone,
> > > >
> > > > Chia-Ping Tsai has been a Kafka committer since Oct. 15,  2020.
> He
> > > >> has
> > >  been
> > > > very instrumental to the community since becoming a committer.
> It's
> > > >> my
> > > > pleasure to announce that Chia-Ping  is now a member of Kafka
> PMC.
> > > >
> > > > Congratulations Chia-Ping!
> > > >
> > > > Jun
> > > > on behalf of Apache Kafka PMC
> > > >
> > > 
> > > 
> > >  --
> > >  -- Guozhang
> > > 
> > > >>
> > >
> > >
> >
>


[jira] [Created] (KAFKA-12330) FetchSessionCache may cause starvation for partitions when FetchResponse is full

2021-02-16 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-12330:


 Summary: FetchSessionCache may cause starvation for partitions 
when FetchResponse is full
 Key: KAFKA-12330
 URL: https://issues.apache.org/jira/browse/KAFKA-12330
 Project: Kafka
  Issue Type: Bug
Reporter: Lucas Bradstreet


The incremental FetchSessionCache deprioritizes partitions for which a 
response is returned. This may happen if log metadata such as log start offset, 
hwm, etc. is returned, or if data for that partition is returned.

When a fetch response fills to maxBytes, data may not be returned for 
partitions where it's available. However, the fetch response will still contain 
updates to metadata such as hwm if that metadata has changed. This can lead to 
degenerate behavior where a partition's hwm or log start offset is updated 
resulting in the next fetch being unnecessarily skipped for that partition. At 
first this appeared to be worse, as hwm updates occur frequently, but 
starvation should result in hwm movement becoming blocked, allowing a fetch to 
go through and then becoming unstuck. However, it'll still require one more 
fetch request than necessary to do so.

I believe we should only reorder the partition fetch priority if data is 
actually returned for a partition.

 
{noformat}
private class PartitionIterator(val iter: FetchSession.RESP_MAP_ITER,
                                val updateFetchContextAndRemoveUnselected: Boolean)
  extends FetchSession.RESP_MAP_ITER {
  var nextElement: util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]] = null

  override def hasNext: Boolean = {
    while ((nextElement == null) && iter.hasNext) {
      val element = iter.next()
      val topicPart = element.getKey
      val respData = element.getValue
      val cachedPart = session.partitionMap.find(new CachedPartition(topicPart))
      val mustRespond = cachedPart.maybeUpdateResponseData(respData, updateFetchContextAndRemoveUnselected)
      if (mustRespond) {
        nextElement = element
        // Don't move partition to end of queue if we didn't actually fetch data
        // This should help avoid starvation even when we are filling the fetch
        // response fully while returning metadata for these partitions
        if (updateFetchContextAndRemoveUnselected && respData.records != null && respData.records.sizeInBytes > 0) {
          session.partitionMap.remove(cachedPart)
          session.partitionMap.mustAdd(cachedPart)
        }
      } else {
        if (updateFetchContextAndRemoveUnselected) {
          iter.remove()
        }
      }
    }
    nextElement != null
  }{noformat}
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12178) Improve guard rails for consumer commit when using EOS

2021-01-11 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-12178:


 Summary: Improve guard rails for consumer commit when using EOS
 Key: KAFKA-12178
 URL: https://issues.apache.org/jira/browse/KAFKA-12178
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Bradstreet


When EOS is in use, offsets are committed via the producer using the 
sendOffsetsToTransaction API. This is what ensures that a transaction is 
committed atomically along with the consumer offsets. Unfortunately this does 
not prevent the consumer from committing on its own, making it easy to end up with 
non-EOS characteristics by accident. enable.auto.commit = true is the default setting 
for consumers. If this is not set to false, or if commitSync/commitAsync are 
called manually, offsets will no longer be committed correctly for EOS semantics.

We need more guard rails to prevent consumers from being incorrectly used in 
this way. Currently the consumers have no knowledge that a producer is even 
committing offsets, which makes this difficult to achieve.
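
For reference, a minimal sketch of the intended EOS pattern (imports omitted;
topic, group, and transactional id names are illustrative): the consumer
disables auto-commit and offsets are only ever committed through the
transactional producer.
{code:java}
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
// Must be false for EOS; commitSync/commitAsync must not be called either.
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, String> consumer =
    new KafkaConsumer<>(consumerProps, new StringDeserializer(), new StringDeserializer());
consumer.subscribe(Collections.singletonList("input"));

Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
KafkaProducer<String, String> producer =
    new KafkaProducer<>(producerProps, new StringSerializer(), new StringSerializer());
producer.initTransactions();

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    if (records.isEmpty())
        continue;
    producer.beginTransaction();
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    for (ConsumerRecord<String, String> record : records) {
        producer.send(new ProducerRecord<>("output", record.key(), record.value()));
        offsets.put(new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1));
    }
    // Offsets are committed atomically with the produced records.
    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
    producer.commitTransaction();
}{code}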



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12177) Retention is not idempotent

2021-01-11 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-12177:


 Summary: Retention is not idempotent
 Key: KAFKA-12177
 URL: https://issues.apache.org/jira/browse/KAFKA-12177
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Bradstreet


Kafka today applies retention in the following order:
 # Time
 # Size
 # Log start offset


Today it is possible for a segment with offsets less than the log start offset 
to contain data that is not deletable due to time retention. This means that 
it's possible for log start offset retention to unblock further deletions as a 
result of time-based retention. Note that this does require a case where the 
max timestamp for each segment increases, decreases, and then increases again. 
Even so, it would be nice to make retention idempotent by applying log start 
offset retention first, followed by size and time. This would also be 
potentially cheaper to perform, as neither log start offset nor size retention 
requires the maxTimestamp for a segment to be loaded from disk after a broker 
restart.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10839) Improve consumer group coordinator unavailable message

2020-12-10 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-10839:


 Summary: Improve consumer group coordinator unavailable message
 Key: KAFKA-10839
 URL: https://issues.apache.org/jira/browse/KAFKA-10839
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Bradstreet


When a consumer encounters an issue that triggers marking a coordinator as 
unknown, the error message it prints does not give context about the error that 
triggered it.
{noformat}
log.info("Group coordinator {} is unavailable or invalid, will attempt 
rediscovery", this.coordinator);{noformat}
These may be triggered by response errors or the coordinator becoming 
disconnected. We should improve this error message to make the cause clear.
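
For example, the message could carry the triggering error (a sketch only; the
`cause` argument here is hypothetical and would need to be threaded through
from the response or disconnect handling):
{noformat}
log.info("Group coordinator {} is unavailable or invalid due to cause: {}, will attempt rediscovery",
    this.coordinator, cause);{noformat}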



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] KIP-673: Emit JSONs with new auto-generated schema

2020-10-29 Thread Lucas Bradstreet
Hi David,

That is a good point that we should clarify. I think we should not commit
to guaranteeing the names or the format of the particular request and
response schemas themselves, though we should guarantee that they are
parseable as JSON. The pre-existing trace logging did not guarantee this
either and it was more difficult to parse as it was not well structured. I
believe for most structured logging use cases where we are analyzing
cluster behavior this should be sufficient.

Thanks,

Lucas

On Thu, Oct 29, 2020 at 3:47 AM David Jacot  wrote:

> Hi folks,
>
> I have looked at Anastasia's PR to implement this KIP and I was wondering
> how far we want to go with the backward compatibility of this in the
> future.
> Now that we rely on the auto-generated protocol, the outputted requests and
> responses use the name of the fields defined in the schema. Until now, we
> have been renaming fields rather easily in the schemas as they were purely
> internal. With this KIP, renaming a field will break the structured request
> log.
>
> Does this imply that we will now consider the schemas as part of our public
> API? We don't discuss this point in the KIP so it is subject to
> interpretation.
>
> I think that we should be clear on that point and either commit to only
> making
> the request log parsable while not guaranteeing the format of the requests
> and
> the responses; or commit to making the schemas part of our public API.
>
> Personally, I lean towards the former at the moment. That is
> probably sufficient
> for the targeted use cases. Breaking changes are annoying but as this is
> intended to be used for debugging purposes, that may be OK.
>
> What do you guys think?
>
> Best,
> David
>
>
> On Thu, Oct 8, 2020 at 7:19 PM Anastasia Vela  wrote:
>
> > Thanks everyone for the votes. In summary, the vote passed and the KIP
> was
> > accepted with 3 binding and 3 non-binding +1s.
> >
> > Best,
> > Anastasia
> >
> > On Mon, Oct 5, 2020 at 2:50 PM Jason Gustafson 
> wrote:
> >
> > > +1 Thanks for the KIP!
> > >
> > > On Fri, Oct 2, 2020 at 4:37 PM Colin McCabe 
> wrote:
> > >
> > > > Thanks, Anastasia!  This will be a lot easier to maintain.
> > > >
> > > > +1 (binding)
> > > >
> > > > best,
> > > > Colin
> > > >
> > > > On Wed, Sep 30, 2020, at 23:57, David Jacot wrote:
> > > > > Thanks for the KIP, Anastasia.
> > > > >
> > > > > +1 (non-binding)
> > > > >
> > > > > On Thu, Oct 1, 2020 at 8:06 AM Tom Bentley 
> > > wrote:
> > > > >
> > > > > > Thanks Anastasia,
> > > > > >
> > > > > > +1 (non-binding)
> > > > > >
> > > > > >
> > > > > > On Thu, Oct 1, 2020 at 6:30 AM Gwen Shapira 
> > > wrote:
> > > > > >
> > > > > > > Thank you, this will be quite helpful.
> > > > > > >
> > > > > > > +1 (binding)
> > > > > > >
> > > > > > > On Wed, Sep 30, 2020 at 11:19 AM Anastasia Vela <
> > > av...@confluent.io>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > Hi everyone,
> > > > > > > >
> > > > > > > > Thanks again for the discussion. I'd like to start the vote
> for
> > > > this
> > > > > > KIP.
> > > > > > > >
> > > > > > >
> > > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-673%3A+Emit+JSONs+with+new+auto-generated+schema
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Anastasia
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > Gwen Shapira
> > > > > > > Engineering Manager | Confluent
> > > > > > > 650.450.2760 | @gwenshap
> > > > > > > Follow us: Twitter | blog
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: [ANNOUNCE] New committer: David Jacot

2020-10-16 Thread Lucas Bradstreet
Grats David!

On Fri, Oct 16, 2020 at 9:25 AM Tom Bentley  wrote:

> Congratulations David!
>
> On Fri, Oct 16, 2020 at 5:10 PM Bill Bejeck  wrote:
>
> > Congrats David! Well deserved.
> >
> > -Bill
> >
> > On Fri, Oct 16, 2020 at 12:01 PM Gwen Shapira  wrote:
> >
> > > The PMC for Apache Kafka has invited David Jacot as a committer, and
> > > we are excited to say that he accepted!
> > >
> > > David Jacot has been contributing to Apache Kafka since July 2015 (!)
> > > and has been very active since August 2019. He contributed several
> > > notable KIPs:
> > >
> > > KIP-511: Collect and Expose Client Name and Version in Brokers
> > > KIP-559: Make the Kafka Protocol Friendlier with L7 Proxies:
> > > KIP-570: Add leader epoch in StopReplicaReques
> > > KIP-599: Throttle Create Topic, Create Partition and Delete Topic
> > > Operations
> > > KIP-496 Added an API for the deletion of consumer offsets
> > >
> > > In addition, David Jacot reviewed many community contributions and
> > > showed great technical and architectural taste. Great reviews are hard
> > > and often thankless work - but this is what makes Kafka a great
> > > product and helps us grow our community.
> > >
> > > Thanks for all the contributions, David! Looking forward to more
> > > collaboration in the Apache Kafka community.
> > >
> > > --
> > > Gwen Shapira
> > >
> >
>


Re: [VOTE] KIP-516: Topic Identifiers

2020-10-15 Thread Lucas Bradstreet
Hi Justine,

+1 (non-binding). Thanks for all your hard work on this KIP!

Lucas

On Wed, Oct 14, 2020 at 8:59 AM Jun Rao  wrote:

> Hi, Justine,
>
> Thanks for the updated KIP. +1 from me.
>
> Jun
>
> On Tue, Oct 13, 2020 at 2:38 PM Jun Rao  wrote:
>
> > Hi, Justine,
> >
> > Thanks for starting the vote. Just a few minor comments.
> >
> > 1. It seems that we should remove the topic field from the
> > StopReplicaResponse below?
> > StopReplica Response (Version: 4) => error_code [topics]
> >   error_code => INT16
> > topics => topic topic_id* [partitions]
> >
> > 2. "After controller election, upon receiving the result, assign the
> > metadata topic its unique topic ID". Will the UUID for the metadata topic
> > be written to the metadata topic itself?
> >
> > 3. The vote request is designed to support multiple topics, each of them
> > may require a different sentinel ID. Should we reserve more than one
> > sentinel ID for future usage?
> >
> > 4. UUID.randomUUID(): Could we clarify whether this method returns any
> > sentinel ID? Also, how do we expect the user to use it?
> >
> > Thanks,
> >
> > Jun
> >
> > On Mon, Oct 12, 2020 at 9:54 AM Justine Olshan 
> > wrote:
> >
> >> Hi all,
> >>
> >> After further discussion and changes to this KIP, I think we are ready
> to
> >> restart this vote.
> >>
> >> Again, here is the KIP:
> >>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers
> >>
> >> The discussion thread is here:
> >>
> >>
> https://lists.apache.org/thread.html/7efa8cd169cadc7dc9cf86a7c0dbbab1836ddb5024d310fcebacf80c@%3Cdev.kafka.apache.org%3E
> >>
> >> Please take a look and vote if you have a chance.
> >>
> >> Thanks,
> >> Justine
> >>
> >> On Tue, Sep 22, 2020 at 8:52 AM Justine Olshan 
> >> wrote:
> >>
> >> > Hi all,
> >> >
> >> > I'd like to call a vote on KIP-516: Topic Identifiers. Here is the
> KIP:
> >> >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers
> >> >
> >> > The discussion thread is here:
> >> >
> >> >
> >>
> https://lists.apache.org/thread.html/7efa8cd169cadc7dc9cf86a7c0dbbab1836ddb5024d310fcebacf80c@%3Cdev.kafka.apache.org%3E
> >> >
> >> > Please take a look and vote if you have a chance.
> >> >
> >> > Thank you,
> >> > Justine
> >> >
> >>
> >
>


Re: [VOTE] KIP-630: Kafka Raft Snapshot

2020-10-02 Thread Lucas Bradstreet
Thanks for the KIP! Non-binding +1

On Fri, Oct 2, 2020 at 3:30 PM Guozhang Wang  wrote:

> Thanks Jose! +1 from me.
>
> On Fri, Oct 2, 2020 at 3:18 PM Jose Garcia Sancio 
> wrote:
>
> > Hi all,
> >
> > I would like to start a vote on KIP-630.
> >
> > KIP: https://cwiki.apache.org/confluence/x/exV4CQ
> > Discussion Thread:
> >
> >
> https://lists.apache.org/thread.html/r9468d1f276385695a2d6d48f6dfbdc504c445fc5745aaa606d138fed%40%3Cdev.kafka.apache.org%3E
> >
> > Thank you
> > --
> > -Jose
> >
>
>
> --
> -- Guozhang
>


Re: [VOTE] KIP-673: Emit JSONs with new auto-generated schema

2020-09-30 Thread Lucas Bradstreet
Hi Anastasia,

This looks like a great change that will make debugging cluster behavior
much easier.

+1 non binding.

Lucas

On Wed, Sep 30, 2020 at 11:19 AM Anastasia Vela  wrote:

> Hi everyone,
>
> Thanks again for the discussion. I'd like to start the vote for this KIP.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-673%3A+Emit+JSONs+with+new+auto-generated+schema
>
> Thanks,
> Anastasia
>


Re: [DISCUSS] KIP-516: Topic Identifiers

2020-09-25 Thread Lucas Bradstreet
Hi Ismael,

If you do not store it in a metadata file or in the directory structure,
would you then
require the LeaderAndIsrRequest following startup to give you some notion
of
topic name in memory? We would still need this information for the older
protocols, but
perhaps this is what's meant by tech debt.

Once we're free of the old non-topicID requests then I think you wouldn't
need to retain the topic name.
I think the ability to easily look up topic names associated with partition
directories would still be missed
when diagnosing issues, though maybe it wouldn't be a deal breaker with the
right tooling.

Thanks,

Lucas

On Fri, Sep 25, 2020 at 7:55 AM Ismael Juma  wrote:

> Hi Lucas,
>
> Why would you include the name and id? I think you'd want to remove the
> name from the directory name right? Jason's suggestion was that if you
> remove the name from the directory, then why would you need the id name
> mapping file?
>
> Ismael
>
> On Thu, Sep 24, 2020 at 4:24 PM Lucas Bradstreet 
> wrote:
>
> > > 2. Part of the usage of the file is to have persistent storage of the
> > topic
> > ID and use it to compare with the ID supplied in the LeaderAndIsr
> Request.
> > There is some discussion in the KIP about changes to the directory
> > structure, but I believe directory changes were considered to be out of
> > scope when the KIP was written.
> >
> >
> > Yeah, I was hoping to get a better understanding of why it was taken out
> of
> > scope. Perhaps Lucas Bradstreet might have more insight about the
> decision.
> > Basically my point is that we have to create additional infrastructure
> here
> > to support the name/id mapping, so I wanted to understand if we just
> > consider this a sort of tech debt. It would be useful to cover how we
> > handle the case when this file gets corrupted. Seems like we just have to
> > trust that it matches whatever the controller tells us and rewrite it?
> >
> >
> > Hi Jason, Justine,
> >
> > My thought process is that we will likely need the metadata file whether
> we
> > rename the directories or not.
> > Linux supports filenames of up to 255 bytes and that would not be enough
> to
> > support a directory name
> >  including both the name and topic ID. Given that fact, it seemed
> > reasonable to add the metadata file
> > and leave the directory rename until some time in the future (possibly
> > never).
> >
> > If the file does get corrupted I think you're right. We would either have
> > to trust it matches what the controller tells us
> >  or error out and let an administrator resolve it by checking across
> > replicas for consistency.
> >
> > Lucas
> >
> >
> > On Thu, Sep 24, 2020 at 3:41 PM Jason Gustafson 
> > wrote:
> >
> > > Thanks Justine. Responses below:
> > >
> > > > 1. Yes, the directory will still be based on the topic names.
> > > LeaderAndIsrRequest is one of the few requests that will still contain
> > the
> > > topic name. So I think we have this covered. Sorry for confusion.
> > >
> > > Ah, you're right. My eyes passed right over the field.
> > >
> > > > 2. Part of the usage of the file is to have persistent storage of the
> > > topic
> > > ID and use it to compare with the ID supplied in the LeaderAndIsr
> > Request.
> > > There is some discussion in the KIP about changes to the directory
> > > structure, but I believe directory changes were considered to be out of
> > > scope when the KIP was written.
> > >
> > > Yeah, I was hoping to get a better understanding of why it was taken
> out
> > of
> > > scope. Perhaps Lucas Bradstreet might have more insight about the
> > decision.
> > > Basically my point is that we have to create additional infrastructure
> > here
> > > to support the name/id mapping, so I wanted to understand if we just
> > > consider this a sort of tech debt. It would be useful to cover how we
> > > handle the case when this file gets corrupted. Seems like we just have
> to
> > > trust that it matches whatever the controller tells us and rewrite it?
> > >
> > > > 3. I think this is a good point, but I again I wonder about the scope
> > of
> > > the KIP. Most of the changes mentioned in the KIP are for supporting
> > topic
> > > deletion and I believe that is why the produce request was listed under
> > > future work.
> > >
> > > That's fair. I brought it up since `Fetch` is already included. If
>

Re: [DISCUSS] KIP-516: Topic Identifiers

2020-09-24 Thread Lucas Bradstreet
> 2. Part of the usage of the file is to have persistent storage of the
topic
ID and use it to compare with the ID supplied in the LeaderAndIsr Request.
There is some discussion in the KIP about changes to the directory
structure, but I believe directory changes were considered to be out of
scope when the KIP was written.


Yeah, I was hoping to get a better understanding of why it was taken out of
scope. Perhaps Lucas Bradstreet might have more insight about the decision.
Basically my point is that we have to create additional infrastructure here
to support the name/id mapping, so I wanted to understand if we just
consider this a sort of tech debt. It would be useful to cover how we
handle the case when this file gets corrupted. Seems like we just have to
trust that it matches whatever the controller tells us and rewrite it?


Hi Jason, Justine,

My thought process is that we will likely need the metadata file whether we
rename the directories or not.
Linux supports filenames of up to 255 bytes and that would not be enough to
support a directory name
 including both the name and topic ID. Given that fact, it seemed
reasonable to add the metadata file
and leave the directory rename until some time in the future (possibly
never).
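
To make the arithmetic concrete: topic names can be up to 249 characters, the
partition directory is named <topic>-<partition>, and a string-encoded UUID is
36 characters, so a hypothetical <topic>-<partition>.<topicId> layout with a
five-digit partition number could need roughly 249 + 1 + 5 + 1 + 36 = 292
characters in the worst case, well over the 255-byte limit.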

If the file does get corrupted I think you're right. We would either have
to trust it matches what the controller tells us
 or error out and let an administrator resolve it by checking across
replicas for consistency.

Lucas


On Thu, Sep 24, 2020 at 3:41 PM Jason Gustafson  wrote:

> Thanks Justine. Responses below:
>
> > 1. Yes, the directory will still be based on the topic names.
> LeaderAndIsrRequest is one of the few requests that will still contain the
> topic name. So I think we have this covered. Sorry for confusion.
>
> Ah, you're right. My eyes passed right over the field.
>
> > 2. Part of the usage of the file is to have persistent storage of the
> topic
> ID and use it to compare with the ID supplied in the LeaderAndIsr Request.
> There is some discussion in the KIP about changes to the directory
> structure, but I believe directory changes were considered to be out of
> scope when the KIP was written.
>
> Yeah, I was hoping to get a better understanding of why it was taken out of
> scope. Perhaps Lucas Bradstreet might have more insight about the decision.
> Basically my point is that we have to create additional infrastructure here
> to support the name/id mapping, so I wanted to understand if we just
> consider this a sort of tech debt. It would be useful to cover how we
> handle the case when this file gets corrupted. Seems like we just have to
> trust that it matches whatever the controller tells us and rewrite it?
>
> > 3. I think this is a good point, but I again I wonder about the scope of
> the KIP. Most of the changes mentioned in the KIP are for supporting topic
> deletion and I believe that is why the produce request was listed under
> future work.
>
> That's fair. I brought it up since `Fetch` is already included. If we've
> got `Metadata` and `Fetch`, seems we may as well do `Produce` and save an
> extra kip. No strong objection though if you want to leave it out.
>
>
> -Jason
>
>
> On Thu, Sep 24, 2020 at 3:26 PM Justine Olshan 
> wrote:
>
> > Hi Jason,
> >
> > Thanks for your comments.
> >
> > 1. Yes, the directory will still be based on the topic names.
> > LeaderAndIsrRequest is one of the few requests that will still contain
> the
> > topic name. So I think we have this covered. Sorry for confusion.
> >
> > 2. Part of the usage of the file is to have persistent storage of the
> topic
> > ID and use it to compare with the ID supplied in the LeaderAndIsr
> Request.
> > There is some discussion in the KIP about changes to the directory
> > structure, but I believe directory changes were considered to be out of
> > scope when the KIP was written.
> >
> > 3. I think this is a good point, but I again I wonder about the scope of
> > the KIP. Most of the changes mentioned in the KIP are for supporting
> topic
> > deletion and I believe that is why the produce request was listed under
> > future work.
> >
> > 4. This sounds like it might be a good solution, but I will need to
> discuss
> > more with KIP-500 folks to get the details right.
> >
> > Thanks,
> > Justine
> >
> > On Thu, Sep 24, 2020 at 12:30 PM Jason Gustafson 
> > wrote:
> >
> > > Hi Justine,
> > >
> > > Thanks for picking up this work. I have a few questions/comments:
> > >
> > > 1. It sounds like the directory structure is still going to be based on
> > > topic names. Do I have that right? One complication is tha

Re: [DISCUSS] KIP-664: Provide tooling to detect and abort hanging transactions

2020-08-27 Thread Lucas Bradstreet
>> Would it be worth returning transactional.id.expiration.ms in the
DescribeProducersResponse?

> That's an interesting thought as well. Are you trying to avoid the need to
specify it through the command line? The tool could also query the value
with DescribeConfigs I suppose.

Basically. I'm not sure how useful this will be in practice, though it
might help when debugging.

Lucas

On Thu, Aug 27, 2020 at 11:00 AM Jason Gustafson  wrote:

> Hey Lucas,
>
> Thanks for the comments. Responses below:
>
> > Given that it's possible for replica producer states to diverge from each
> other, it would be very useful if DescribeProducers(Request,Response) and
> tooling is able to query all partition replicas for their producers
>
> Yes, it makes sense to me to let DescribeProducers work on both followers
> and leaders. In fact, I'm encouraged that there are use cases for this work
> other than detecting hanging transactions. That was indeed the hope, but I
> didn't have anything specific in mind. I will update the proposal.
>
> > Would it be worth returning transactional.id.expiration.ms in the
> DescribeProducersResponse?
>
> That's an interesting thought as well. Are you trying to avoid the need to
> specify it through the command line? The tool could also query the value
> with DescribeConfigs I suppose.
>
> Thanks,
> Jason
>
> On Thu, Aug 27, 2020 at 10:48 AM Lucas Bradstreet 
> wrote:
>
> > Hi Jason,
> >
> > This looks like a very useful tool, thanks for writing it up.
> >
> > Given that it's possible for replica producer states to diverge from each
> > other, it would be very useful if DescribeProducers(Request,Response) and
> > tooling is able to query all partition replicas for their producers. One
> > way I can see this being used immediately is in kafka's system tests,
> > especially the ones that inject failures. At the end of the test we can
> > query all replicas and make sure that their states have not diverged. I
> can
> > also see it being useful when debugging production clusters too.
> >
> > Would it be worth returning transactional.id.expiration.ms in the
> > DescribeProducersResponse?
> >
> > Cheers,
> >
> > Lucas
> >
> >
> >
> > On Wed, Aug 26, 2020 at 12:12 PM Ron Dagostino 
> wrote:
> >
> > > Yes, that definitely sounds reasonable.  Thanks, Jason!
> > >
> > > Ron
> > >
> > > On Wed, Aug 26, 2020 at 3:03 PM Jason Gustafson 
> > > wrote:
> > >
> > > > Hey Ron,
> > > >
> > > > We do not typically backport new APIs to older versions. I think we
> can
> > > > however make the --abort command compatible with older versions. It
> > would
> > > > require a user to do some analysis on their own to identify a hanging
> > > > transaction, but then they can use the tool from a new release to
> > > recover.
> > > > For example, users could detect a hanging transaction through the
> > > existing
> > > > "LastStableOffsetLag" metric and then collect the needed information
> > > from a
> > > > dump of the log (or producer snapshot). It's more work, but at least
> > it's
> > > > possible. Does that sound fair?
> > > >
> > > > Thanks,
> > > > Jason
> > > >
> > > > On Wed, Aug 26, 2020 at 11:51 AM Ron Dagostino 
> > > wrote:
> > > >
> > > > > Hi Jason.  Thanks for the excellently-written KIP.
> > > > >
> > > > > Will the implementation be backported to prior Kafka versions?  The
> > > > reason
> > > > > I ask is because if it is not backported and similar functionality
> is
> > > not
> > > > > otherwise made available for older versions, then the only recourse
> > > > (aside
> > > > > from deleting and recreating the topic as you pointed out) may be
> to
> > > > > upgrade to 2.7 (or whatever version ends up getting this
> > > functionality).
> > > > > Such an upgrade may not be desirable, especially if the number of
> > > > > intermediate versions is considerable. I understand the mantra of
> > > "never
> > > > > fall too many versions behind" but the reality of it is that it
> isn't
> > > > > always the case.  Even if the version is relatively recent, an
> > upgrade
> > > > may
> > > > > still not be possible for some time, and a quicker resolution may
> be
> > > > > necessary.
> > > > >
> > > > > Ron
> > > > >
> > > > > On Wed, Aug 26, 2020 at 2:33 PM Jason Gustafson <
> ja...@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > Hi All,
> > > > > >
> > > > > > I've added a proposal to handle the problem of hanging
> > transactions:
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions
> > > > > > .
> > > > > > In theory, this should never happen. In practice, we have hit one
> > bug
> > > > > where
> > > > > > it was possible and there are few good options today to recover.
> > > Take a
> > > > > > look and let me know what you think.
> > > > > >
> > > > > > Thanks,
> > > > > > Jason
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: [DISCUSS] KIP-664: Provide tooling to detect and abort hanging transactions

2020-08-27 Thread Lucas Bradstreet
Hi Jason,

This looks like a very useful tool, thanks for writing it up.

Given that it's possible for replica producer states to diverge from each
other, it would be very useful if DescribeProducers(Request,Response) and
tooling is able to query all partition replicas for their producers. One
way I can see this being used immediately is in kafka's system tests,
especially the ones that inject failures. At the end of the test we can
query all replicas and make sure that their states have not diverged. I can
also see it being useful when debugging production clusters too.

Would it be worth returning transactional.id.expiration.ms in the
DescribeProducersResponse?

Cheers,

Lucas



On Wed, Aug 26, 2020 at 12:12 PM Ron Dagostino  wrote:

> Yes, that definitely sounds reasonable.  Thanks, Jason!
>
> Ron
>
> On Wed, Aug 26, 2020 at 3:03 PM Jason Gustafson 
> wrote:
>
> > Hey Ron,
> >
> > We do not typically backport new APIs to older versions. I think we can
> > however make the --abort command compatible with older versions. It would
> > require a user to do some analysis on their own to identify a hanging
> > transaction, but then they can use the tool from a new release to
> recover.
> > For example, users could detect a hanging transaction through the
> existing
> > "LastStableOffsetLag" metric and then collect the needed information
> from a
> > dump of the log (or producer snapshot). It's more work, but at least it's
> > possible. Does that sound fair?
> >
> > Thanks,
> > Jason
> >
> > On Wed, Aug 26, 2020 at 11:51 AM Ron Dagostino 
> wrote:
> >
> > > Hi Jason.  Thanks for the excellently-written KIP.
> > >
> > > Will the implementation be backported to prior Kafka versions?  The
> > reason
> > > I ask is because if it is not backported and similar functionality is
> not
> > > otherwise made available for older versions, then the only recourse
> > (aside
> > > from deleting and recreating the topic as you pointed out) may be to
> > > upgrade to 2.7 (or whatever version ends up getting this
> functionality).
> > > Such an upgrade may not be desirable, especially if the number of
> > > intermediate versions is considerable. I understand the mantra of
> "never
> > > fall too many versions behind" but the reality of it is that it isn't
> > > always the case.  Even if the version is relatively recent, an upgrade
> > may
> > > still not be possible for some time, and a quicker resolution may be
> > > necessary.
> > >
> > > Ron
> > >
> > > On Wed, Aug 26, 2020 at 2:33 PM Jason Gustafson 
> > > wrote:
> > >
> > > > Hi All,
> > > >
> > > > I've added a proposal to handle the problem of hanging transactions:
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions
> > > > .
> > > > In theory, this should never happen. In practice, we have hit one bug
> > > where
> > > > it was possible and there are few good options today to recover.
> Take a
> > > > look and let me know what you think.
> > > >
> > > > Thanks,
> > > > Jason
> > > >
> > >
> >
>


[jira] [Created] (KAFKA-10432) LeaderEpochCache is incorrectly recovered on segment recovery for epoch 0

2020-08-25 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-10432:


 Summary: LeaderEpochCache is incorrectly recovered on segment 
recovery for epoch 0
 Key: KAFKA-10432
 URL: https://issues.apache.org/jira/browse/KAFKA-10432
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.6.0, 2.5.0, 2.4.0, 2.3.0
Reporter: Lucas Bradstreet


I added some functionality to the system tests to compare epoch cache lineages 
([https://github.com/apache/kafka/pull/9213]), and I found a bug in leader 
epoch cache recovery.

The test hard-kills a broker before the epoch cache has been flushed; the broker 
then starts up and goes through log recovery. After recovery there is divergence in 
the epoch caches for epoch 0:
{noformat}
AssertionError: leader epochs for output-topic-1 didn't match
 [{0: 9393L, 2: 9441L, 4: 42656L},
  {0: 0L, 2: 9441L, 4: 42656L},
  {0: 0L, 2: 9441L, 4: 42656L}]
{noformat}
The cache is supposed to include the offset for epoch 0 but in recovery it 
skips it 
[https://github.com/apache/kafka/blob/487b3682ebe0eefde3445b37ee72956451a9d15e/core/src/main/scala/kafka/log/LogSegment.scala#L364]
 due to 
[https://github.com/apache/kafka/commit/d152989f26f51b9004b881397db818ad6eaf0392].
 Then it stamps the epoch with a later offset when fetching from the leader.

I'm not sure why the recovery code includes the condition 
`batch.partitionLeaderEpoch > 0`. I discussed this with Jason Gustafson and he 
believes it may have been intended to avoid assigning negative epochs but is 
not sure why it was added. None of the tests fail with this check removed.
{noformat}
  leaderEpochCache.foreach { cache =>
if (batch.partitionLeaderEpoch > 0 && 
cache.latestEpoch.forall(batch.partitionLeaderEpoch > _))
  cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
  }
{noformat}
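
A minimal sketch of the adjusted check, assuming the intent of the condition was only to guard against unassigned (negative) epochs rather than to skip epoch 0:
{code:scala}
leaderEpochCache.foreach { cache =>
  // >= 0 still avoids assigning negative (unassigned) epochs, but no longer
  // skips recovery of the epoch 0 entry
  if (batch.partitionLeaderEpoch >= 0 &&
      cache.latestEpoch.forall(batch.partitionLeaderEpoch > _))
    cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
}
{code}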



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10399) Producer and consumer clients could log IP addresses for brokers to ease debugging

2020-08-13 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-10399:


 Summary: Producer and consumer clients could log IP addresses for 
brokers to ease debugging
 Key: KAFKA-10399
 URL: https://issues.apache.org/jira/browse/KAFKA-10399
 Project: Kafka
  Issue Type: Improvement
  Components: consumer, producer 
Reporter: Lucas Bradstreet


Lag in DNS updates and resolution can cause connectivity problems in clients. 
Setting client.dns.lookup = "use_all_dns_ips" helps reduce the incidence of such 
issues, but it's still possible for DNS issues to cause real problems with clients.

The ZK client helpfully logs IP addresses with DNS addresses. We could do the 
same thing in the Kafka clients, e.g.
{noformat}
Group coordinator broker3.my.kafka.cluster.com/52.32.14.201:9092 (id: 3738382 
rack: null) is unavailable or invalid{noformat}
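
A rough sketch of how such a line could be built, purely for illustration (not the actual client change):
{code:scala}
import java.net.InetAddress

object NodeLogging {
  // Resolve the broker host and include its IP next to the hostname, similar
  // to how the ZooKeeper client logs its endpoints.
  def describeNode(host: String, port: Int, id: Int, rack: String): String = {
    val ip =
      try InetAddress.getByName(host).getHostAddress
      catch { case _: java.net.UnknownHostException => "unresolved" }
    s"$host/$ip:$port (id: $id rack: $rack)"
  }
}
{code}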



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10390) kafka-server-stop lookup is not specific enough and may kill other processes

2020-08-11 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-10390:


 Summary: kafka-server-stop lookup is not specific enough and may 
kill other processes
 Key: KAFKA-10390
 URL: https://issues.apache.org/jira/browse/KAFKA-10390
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Lucas Bradstreet


kafka-server-stop.sh picks out kafka processes by:


 
{noformat}
PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print 
$1}'){noformat}
 

This is not specific enough and may match unintended processes, e.g. a java 
process whose command line merely includes a dependency matching *.kafka.kafka.*

A better match would be:
{noformat}
PIDS=$(ps ax | grep ' kafka\.Kafka ' | grep java | grep -v grep | awk '{print 
$1}')
{noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9946) KAFKA-9539/StopReplicaRequest deletePartition changes may cause premature topic deletion handling in controller

2020-05-01 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-9946:
---

 Summary: KAFKA-9539/StopReplicaRequest deletePartition changes may 
cause premature topic deletion handling in controller
 Key: KAFKA-9946
 URL: https://issues.apache.org/jira/browse/KAFKA-9946
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.6.0
Reporter: Lucas Bradstreet


It seems like 
[https://github.com/apache/kafka/commit/7c7d55dbd8d42f6378d13ba02d62633366a7ede8]
 does not correctly handle a StopReplicaRequest where deletePartition(s) is set to 
false when another delete topic request is outstanding at the time the response is 
received.

In the failing code it seems like two StopReplicaRequest(s) are sent, one with 
the delete flag set on partitions, and one without. It seems like the request 
without the delete flag set on any partitions is prematurely triggering the 
controller to believe that the topic was deleted successfully.

We previously didn't set a callback if the StopReplicaRequest was not a delete 
request 
([https://github.com/apache/kafka/commit/7c7d55dbd8d42f6378d13ba02d62633366a7ede8#diff-987fef43991384a3ebec5fb55e53b577L570]).
 Now we set it unconditionally, but the callback does not distinguish between the 
partition states where a delete was being performed and where it was not. This 
happens on all IBP 
versions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9864) Avoid expensive QuotaViolationException usage

2020-04-14 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-9864:
---

 Summary: Avoid expensive QuotaViolationException usage
 Key: KAFKA-9864
 URL: https://issues.apache.org/jira/browse/KAFKA-9864
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Lucas Bradstreet


QuotaViolationException generates stack traces and uses String.format in 
exception generation. QuotaViolationException is used for control flow and 
these costs add up even though the exception contents are ignored.
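
For illustration, a minimal sketch of the kind of cheap exception this suggests (an assumed shape, not the actual change): skip stack trace capture entirely and defer message formatting until getMessage is actually called.
{code:scala}
// Hypothetical example: an exception used purely for control flow that skips
// fillInStackTrace() and formats its message lazily.
class CheapQuotaViolationException(metricName: String, value: Double, bound: Double)
  // (message, cause, enableSuppression, writableStackTrace)
  extends RuntimeException(null, null, false, false) {

  override def getMessage: String =
    s"'$metricName' violated quota: value $value exceeds bound $bound"
}
{code}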



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9820) validateMessagesAndAssignOffsetsCompressed allocates batch iterator which is not used

2020-04-03 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-9820:
---

 Summary: validateMessagesAndAssignOffsetsCompressed allocates 
batch iterator which is not used
 Key: KAFKA-9820
 URL: https://issues.apache.org/jira/browse/KAFKA-9820
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Lucas Bradstreet


KAFKA-8106 added a new skip key/value iterator that reduces allocations 
[https://github.com/apache/kafka/commit/3e9d1c1411c5268de382f9dfcc95bdf66d0063a0].

Unfortunately, LogValidator creates that iterator but never uses it, which is 
quite expensive in terms of allocations.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-8963) Benchmark and optimize incremental fetch session handler

2020-02-28 Thread Lucas Bradstreet (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Bradstreet resolved KAFKA-8963.
-
Fix Version/s: 2.5.0
   Resolution: Fixed

> Benchmark and optimize incremental fetch session handler
> 
>
> Key: KAFKA-8963
> URL: https://issues.apache.org/jira/browse/KAFKA-8963
> Project: Kafka
>  Issue Type: Task
>    Reporter: Lucas Bradstreet
>    Assignee: Lucas Bradstreet
>Priority: Major
> Fix For: 2.5.0
>
>
> The FetchSessionHandler is a cause of high CPU usage in the replica fetcher 
> for brokers with high partition counts. A jmh benchmark should be added and 
> the incremental fetch session handling should be measured and optimized.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9577) Client encountering SASL_HANDSHAKE protocol version errors on 2.5 / trunk

2020-02-19 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-9577:
---

 Summary: Client encountering SASL_HANDSHAKE protocol version 
errors on 2.5 / trunk
 Key: KAFKA-9577
 URL: https://issues.apache.org/jira/browse/KAFKA-9577
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.5.0
Reporter: Lucas Bradstreet


I am trying 2.5.0 with SASL turned on and my consumer clients receive:
{noformat}
org.apache.kafka.common.errors.UnsupportedVersionException: The SASL_HANDSHAKE 
protocol does not support version 2
{noformat}
I believe this is due to 
[https://github.com/apache/kafka/commit/0a2569e2b9907a1217dd50ccbc320f8ad0b42fd0]
 which added flexible version support and bumped the protocol version.

It appears that the SaslClientAuthenticator uses the max version for 
SASL_HANDSHAKE returned by the broker's ApiVersions response, and then uses 
that version even though the client itself may not support it. See 
[https://github.com/apache/kafka/blob/eb09efa9ac79efa484307bdcf03ac8eb8a3a94e2/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java#L290].
 

This may make it hard to ever evolve this schema. In the short term I suggest 
we roll back the version bump and flexible schema until we figure out a path 
forward.
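
For illustration, the usual remedy is to negotiate down to the highest version both sides support, rather than blindly adopting the broker's advertised maximum (a sketch, not the actual SaslClientAuthenticator logic):
{code:scala}
object HandshakeVersion {
  // Use the highest SASL_HANDSHAKE version supported by both client and broker.
  def negotiate(clientMaxVersion: Short, brokerMaxVersion: Short): Short =
    math.min(clientMaxVersion.toInt, brokerMaxVersion.toInt).toShort

  // e.g. a client that only implements up to version 1, talking to a broker
  // advertising version 2, would use version 1:
  // negotiate(1, 2) == 1
}
{code}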



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9137) Maintenance of FetchSession cache causing FETCH_SESSION_ID_NOT_FOUND in live sessions

2020-02-15 Thread Lucas Bradstreet (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Bradstreet resolved KAFKA-9137.
-
Resolution: Fixed

Closed by [https://github.com/apache/kafka/pull/7640]

> Maintenance of FetchSession cache causing FETCH_SESSION_ID_NOT_FOUND in live 
> sessions
> -
>
> Key: KAFKA-9137
> URL: https://issues.apache.org/jira/browse/KAFKA-9137
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Lucas Bradstreet
>Priority: Major
>
> We have recently seen cases where brokers end up in a bad state where fetch 
> session evictions occur at a high rate (> 16 per second) after a roll. This 
> increase in eviction rate included the following pattern in our logs:
>  
> {noformat}
> broker 6: October 31st 2019, 17:52:45.496 Created a new incremental 
> FetchContext for session id 2046264334, epoch 9790: added (), updated (), 
> removed ()
> broker 6: October 31st 2019, 17:52:45.496 Created a new incremental 
> FetchContext for session id 2046264334, epoch 9791: added (), updated (), 
> removed () broker 6: October 31st 2019, 17:52:45.500 Created a new 
> incremental FetchContext for session id 2046264334, epoch 9792: added (), 
> updated (lkc-7nv6o_tenant_soak_topic_144p-67), removed () 
> broker 6: October 31st 2019, 17:52:45.501 Created a new incremental 
> FetchContext for session id 2046264334, epoch 9793: added (), updated 
> (lkc-7nv6o_tenant_soak_topic_144p-59, lkc-7nv6o_tenant_soak_topic_144p-123, 
> lkc-7nv6o_tenant_soak_topic_144p-11, lkc-7nv6o_tenant_soak_topic_144p-3, 
> lkc-7nv6o_tenant_soak_topic_144p-67, lkc-7nv6o_tenant_soak_topic_144p-115), 
> removed () 
> broker 6: October 31st 2019, 17:52:45.501 Evicting stale FetchSession 
> 2046264334. 
> broker 6: October 31st 2019, 17:52:45.502 Session error for 2046264334: no 
> such session ID found. 
> broker 4: October 31st 2019, 17:52:45.813 [ReplicaFetcher replicaId=4, 
> leaderId=6, fetcherId=0] Node 6 was unable to process the fetch request with 
> (sessionId=2046264334, epoch=9793): FETCH_SESSION_ID_NOT_FOUND.  
> {noformat}
> This pattern appears to be problematic for two reasons. Firstly, the replica 
> fetcher for broker 4 was clearly able to send multiple incremental fetch 
> requests to broker 6, and receive replies, and did so right up to the point 
> where broker 6 evicted its fetch session within milliseconds of multiple 
> fetch requests. The second problem is that replica fetchers are considered 
> privileged for the fetch session cache, and should not be evicted by consumer 
> fetch sessions. This cluster only has 12 brokers and 1000 fetch session cache 
> slots (the default for max.incremental.fetch.session.cache.slots), and it is 
> thus very unlikely that this session should have been evicted by another 
> replica fetcher session.
> This cluster also appears to be causing cycles of fetch session evictions 
> where the cluster never stabilizes into a state where fetch sessions are not 
> evicted. The above logs are the best example I could find of a case where a 
> session clearly should not have been evicted.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-501 Avoid out-of-sync or offline partitions when follower fetch requests not processed in time

2020-02-10 Thread Lucas Bradstreet
Hi Harsha,

Is the problem you'd like addressed the following?

Assume 3 replicas, L and F1 and F2.

1. F1 and F2 are alive and sending fetch requests to L.
2. L starts encountering disk issues, any requests being processed by
the request handler threads become blocked.
3. L's zookeeper connection is still alive so it remains the leader
for the partition.
4. Given that F1 and F2 have not successfully fetched, L shrinks the
ISR to itself.

While KIP-501 may help prevent a shrink in partitions where a replica
fetch request has started processing, any fetch requests in the
request queue will have no effect. Generally when these slow/failing
disk issues occur, all of the request handler threads end up blocked
and requests queue up in the request queue. For example, all of the
request handler threads may end up stuck in
KafkaApis.handleProduceRequest handling produce requests, at which
point all of the replica fetcher fetch requests remain queued in the
request queue. If this happens, there will be no tracked fetch
requests to prevent a shrink.

Solving this shrinking issue is tricky. It would be better if L
resigns leadership when it enters a degraded state rather than
avoiding a shrink. If L is no longer the leader in this situation, it
will eventually become blocked fetching from the new leader and the
new leader will shrink the ISR, kicking out L.

Cheers,

Lucas


[jira] [Created] (KAFKA-9513) Failed GroupMetadataManager loadGroupAndOffsets will consider groups as loaded

2020-02-05 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-9513:
---

 Summary: Failed GroupMetadataManager loadGroupAndOffsets will 
consider groups as loaded
 Key: KAFKA-9513
 URL: https://issues.apache.org/jira/browse/KAFKA-9513
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Bradstreet


 

Bugs in group loading such as https://issues.apache.org/jira/browse/KAFKA-8896 
may cause errors loading offsets. loadGroupsAndOffsets's finally block adds the 
offsets partition to ownedPartitions and removes it from loadingPartitions even 
if this process does not succeed.
{code:java}
private[group] def loadGroupsAndOffsets(topicPartition: TopicPartition, 
onGroupLoaded: GroupMetadata => Unit): Unit = {
  try {
val startMs = time.milliseconds()
doLoadGroupsAndOffsets(topicPartition, onGroupLoaded)
val endMs = time.milliseconds()
val timeLapse = endMs - startMs
partitionLoadSensor.record(timeLapse, endMs, false)
info(s"Finished loading offsets and group metadata from $topicPartition in 
$timeLapse milliseconds.")
  } catch {
case t: Throwable => error(s"Error loading offsets from $topicPartition", t)
  } finally {
inLock(partitionLock) {
  ownedPartitions.add(topicPartition.partition)
  loadingPartitions.remove(topicPartition.partition)
}
  }
}
{code}
This means that the group is no longer considered to be loading by:
{code:java}
def isGroupLoading(groupId: String): Boolean = 
isPartitionLoading(partitionFor(groupId))
{code}
 

As a result, consumers may be able to load the wrong offsets.

We should consider whether we should be more defensive and instead mark the 
partition as failed.
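
A rough sketch of that more defensive shape (the failedPartitions set below is hypothetical, not an existing GroupMetadataManager field):
{code:scala}
private[group] def loadGroupsAndOffsets(topicPartition: TopicPartition, 
onGroupLoaded: GroupMetadata => Unit): Unit = {
  try {
    val startMs = time.milliseconds()
    doLoadGroupsAndOffsets(topicPartition, onGroupLoaded)
    val endMs = time.milliseconds()
    val timeLapse = endMs - startMs
    partitionLoadSensor.record(timeLapse, endMs, false)
    info(s"Finished loading offsets and group metadata from $topicPartition in $timeLapse milliseconds.")
    inLock(partitionLock) {
      // only mark the partition owned once loading has actually succeeded
      ownedPartitions.add(topicPartition.partition)
    }
  } catch {
    case t: Throwable =>
      error(s"Error loading offsets from $topicPartition", t)
      inLock(partitionLock) {
        // hypothetical: remember the failure instead of pretending the load completed
        failedPartitions.add(topicPartition.partition)
      }
  } finally {
    inLock(partitionLock) {
      loadingPartitions.remove(topicPartition.partition)
    }
  }
}
{code}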



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9401) High lock contention for kafka.server.FetchManager.newContext

2020-01-11 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-9401:
---

 Summary: High lock contention for 
kafka.server.FetchManager.newContext
 Key: KAFKA-9401
 URL: https://issues.apache.org/jira/browse/KAFKA-9401
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Lucas Bradstreet


kafka.server.FetchManager.newContext takes out what is essentially a global 
fetch lock on kafka.server.FetchSessionCache, not only for updates to the 
FetchSessionCache itself but also for updates to the fetch sessions stored within it. 
This causes a high amount of lock contention for fetches, as every fetch 
request must go through this lock.

I have taken an async-profiler lock profile on a high throughput cluster, and I 
see around 25s of waiting on this lock for a sixty second profile.
--- 25818577497 ns (20.84%), 5805 samples
  [ 0] kafka.server.FetchSessionCache
  [ 1] kafka.server.FetchManager.newContext
  [ 2] kafka.server.KafkaApis.handleFetchRequest
  [ 3] kafka.server.KafkaApis.handle
  [ 4] kafka.server.KafkaRequestHandler.run
  [ 5] java.lang.Thread.run



 
{code:java}

  cache.synchronized {
cache.get(reqMetadata.sessionId) match {
  case None => {
debug(s"Session error for ${reqMetadata.sessionId}: no such session ID 
found.")
new SessionErrorContext(Errors.FETCH_SESSION_ID_NOT_FOUND, reqMetadata)
  }
  case Some(session) => session.synchronized {
if (session.epoch != reqMetadata.epoch) {
  debug(s"Session error for ${reqMetadata.sessionId}: expected epoch " +
s"${session.epoch}, but got ${reqMetadata.epoch} instead.");
  new SessionErrorContext(Errors.INVALID_FETCH_SESSION_EPOCH, 
reqMetadata)
} else {
  val (added, updated, removed) = session.update(fetchData, toForget, 
reqMetadata)
  if (session.isEmpty) {
debug(s"Created a new sessionless FetchContext and closing session 
id ${session.id}, " +
  s"epoch ${session.epoch}: after removing 
${partitionsToLogString(removed)}, " +
  s"there are no more partitions left.")
cache.remove(session)
new SessionlessFetchContext(fetchData)
  } else {
cache.touch(session, time.milliseconds())
session.epoch = JFetchMetadata.nextEpoch(session.epoch)
debug(s"Created a new incremental FetchContext for session id 
${session.id}, " +
  s"epoch ${session.epoch}: added ${partitionsToLogString(added)}, 
" +
  s"updated ${partitionsToLogString(updated)}, " +
  s"removed ${partitionsToLogString(removed)}")
new IncrementalFetchContext(time, reqMetadata, session)
  }
}
  }
}
  }

{code}
Contention has been made worse by the solution for "KAFKA-9137: Fix incorrect 
FetchSessionCache eviction logic" 
([https://github.com/apache/kafka/pull/7640]), as the cache is correctly 
touched now, whereas previously the touch was being skipped.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] KIP-551: Expose disk read and write metrics

2020-01-11 Thread Lucas Bradstreet
+1 (non binding)

On Sat, 11 Jan 2020 at 02:32, M. Manna  wrote:

> Hey Colin,
>
> +1 - Really useful for folks managing a cluster by themselves.
>
> M. MAnna
>
> On Fri, 10 Jan 2020 at 22:35, Jose Garcia Sancio 
> wrote:
>
> > +1, LGTM.
> >
> > On Fri, Jan 10, 2020 at 2:19 PM Gwen Shapira  wrote:
> >
> > > +1, thanks for driving this
> > >
> > > On Fri, Jan 10, 2020 at 2:17 PM Colin McCabe 
> wrote:
> > > >
> > > > Hi all,
> > > >
> > > > I'd like to start the vote on KIP-551: Expose disk read and write
> > > metrics.
> > > >
> > > > KIP:  https://cwiki.apache.org/confluence/x/sotSC
> > > >
> > > > Discussion thread:
> > > >
> > >
> >
> https://lists.apache.org/thread.html/cfaac4426455406abe890464a7f4ae23a5c69a39afde66fe6eb3d696%40%3Cdev.kafka.apache.org%3E
> > > >
> > > > cheers,
> > > > Colin
> > >
> >
> >
> > --
> > -Jose
> >
>


Re: [DISCUSS] KIP-551: Expose disk read and write metrics

2020-01-09 Thread Lucas Bradstreet
Hi Colin,

This is a great idea, as it is very useful to have these metrics in
addition to the usual Kafka metrics given the impact of hitting disk
outside of page cache. Describing it as a gauge did initially strike me as
odd, but given the way this works it makes sense to me.

/proc/[pid]/io appears to only be supported as of kernel 2.6.20. Given that
was released back in 2007, maybe it's safe enough to assume it exists, but
I thought I would mention that anyway.
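
For reference, a rough sketch of reading those per-process counters on Linux
(assuming /proc/self/io is available):

object ProcIo {
  import scala.io.Source

  // Parse /proc/self/io into counter name -> value, e.g. "read_bytes"
  // and "write_bytes".
  def counters(): Map[String, Long] = {
    val source = Source.fromFile("/proc/self/io")
    try {
      source.getLines()
        .map(_.split(":\\s+", 2))
        .collect { case Array(key, value) => key -> value.trim.toLong }
        .toMap
    } finally source.close()
  }
}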

Without bikeshedding the metric names, would including a "Total" in the
name be better e.g. kafka.server:type=KafkaServer,name=DiskReadBytesTotal?

Cheers,

Lucas


On Mon, Jan 6, 2020 at 5:28 PM Colin McCabe  wrote:

> On Tue, Dec 10, 2019, at 11:10, Magnus Edenhill wrote:
> > Hi Colin,
> >
>
> Hi Magnus,
>
> Thanks for taking a look.
>
> > aren't those counters (ever increasing), rather than gauges
> (fluctuating)?
>
> Since this is in the Kafka broker, we're using Yammer.  This might be
> confusing, but Yammer's concept of a "counter" is not actually monotonic.
> It can decrease as well as increase.
>
> In general Yammer counters require you to call inc(amount) or dec(amount)
> on them.  This doesn't match up with what we need to do here, which is to
> (essentially) make a callback into the kernel by reading from /proc.
>
> The counter/gauge dichotomy doesn't affect the JMX, (I think?), so it's
> really kind of an implementation detail.
>
> >
> > You also mention CPU usage as a side note, you could use getrusage(2)'s
> > ru_utime (user) and ru_stime (sys)
> > to allow the broker to monitor its own CPU usage.
> >
>
> Interesting idea.  It might be better to save that for a future KIP,
> though, to avoid scope creep.
>
> best,
> Colin
>
> > /Magnus
> >
> > Den tis 10 dec. 2019 kl 19:33 skrev Colin McCabe :
> >
> > > Hi all,
> > >
> > > I wrote KIP about adding support for exposing disk read and write
> > > metrics.  Check it out here:
> > >
> > > https://cwiki.apache.org/confluence/x/sotSC
> > >
> > > best,
> > > Colin
> > >
> >
>


[jira] [Created] (KAFKA-9393) DeleteRecords triggers extreme lock contention for large partition directories

2020-01-09 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-9393:
---

 Summary: DeleteRecords triggers extreme lock contention for large 
partition directories
 Key: KAFKA-9393
 URL: https://issues.apache.org/jira/browse/KAFKA-9393
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 2.4.0, 2.3.0, 2.2.0
Reporter: Lucas Bradstreet


DeleteRecords, frequently used by KStreams, triggers a 
Log.maybeIncrementLogStartOffset call, which calls 
kafka.log.ProducerStateManager.listSnapshotFiles, which in turn calls 
java.io.File.listFiles on the partition dir. The time taken to list this 
directory can be extreme for partitions with many small segments (e.g. 2), 
taking multiple seconds to finish. This causes lock contention for the log, and 
if produce requests are also occurring for the same log, it can cause a majority of 
request handler threads to become blocked waiting for the DeleteRecords call to 
finish.

I believe this is a problem going back to the initial implementation of the 
transactional producer, but I need to confirm how far back it goes.

One possible solution is to maintain a producer state snapshot aligned to the 
log segment, and simply delete it whenever we delete a segment. This would 
ensure that we never have to perform a directory scan.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2020-01-06 Thread Lucas Bradstreet
+1 (non-binding)

On Thu, Jan 2, 2020 at 11:15 AM Brian Byrne  wrote:

> Hello all,
>
> After further discussion and improvements, I'd like to reinstate the voting
> process.
>
> The updated KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-526
> %3A+Reduce+Producer+Metadata+Lookups+for+Large+Number+of+Topics
> 
>
> The continued discussion:
>
> https://lists.apache.org/thread.html/b2f8f830ef04587144cf0840c7d4811bbf0a14f3c459723dbc5acf9e@%3Cdev.kafka.apache.org%3E
>
> I'd be happy to address any further comments/feedback.
>
> Thanks,
> Brian
>
> On Mon, Dec 9, 2019 at 11:02 PM Guozhang Wang  wrote:
>
> > With the concluded summary on the other discussion thread, I'm +1 on the
> > proposal.
> >
> > Thanks Brian!
> >
> > On Tue, Nov 19, 2019 at 8:00 PM deng ziming 
> > wrote:
> >
> > > >
> > > > For new (uncached) topics, one problem here is that we don't know
> which
> > > > partition to map a record to in the event that it has a key or custom
> > > > partitioner, so the RecordAccumulator wouldn't know which
> batch/broker
> > it
> > > > belongs. We'd need an intermediate record queue that subsequently
> moved
> > > the
> > > > records into RecordAccumulators once metadata resolution was
> complete.
> > > For
> > > > known topics, we don't currently block at all in waitOnMetadata.
> > > >
> > >
> > > You are right, I forget this fact, and the intermediate record queue
> will
> > > help, but I have some questions
> > >
> > > if we add an intermediate record queue in KafkaProducer, when should we
> > > move the records into RecordAccumulators?
> > > only NetworkClient is aware of the MetadataResponse, here is the
> > > hierarchical structure of the related classes:
> > > KafkaProducer
> > > Accumulator
> > > Sender
> > > NetworkClient
> > > metadataUpdater.handleCompletedMetadataResponse
> > >
> > > so
> > > 1. we should also add a metadataUpdater to KafkaProducer?
> > > 2. if the topic really does not exists? the intermediate record queue
> > will
> > > become too large?
> > > 3. and should we `block` when the intermediate record queue is too
> large?
> > > and this will again bring the blocking problem?
> > >
> > >
> > >
> > > On Wed, Nov 20, 2019 at 12:40 AM Brian Byrne 
> > wrote:
> > >
> > > > Hi Deng,
> > > >
> > > > Thanks for the feedback.
> > > >
> > > > On Mon, Nov 18, 2019 at 6:56 PM deng ziming <
> dengziming1...@gmail.com>
> > > > wrote:
> > > >
> > > > > hi, I reviewed the current code, the ProduceMetadata maintains an
> > > expiry
> > > > > threshold for every topic, every time when we write to a topic we
> > will
> > > > set
> > > > > the expiry time to -1 to indicate it should be updated, this does
> > work
> > > to
> > > > > reduce the size of the topic working set, but the producer will
> > > continue
> > > > > fetching metadata for these topics in every metadata request for
> the
> > > full
> > > > > expiry duration.
> > > > >
> > > >
> > > > Indeed, you are correct, I terribly misread the code here.
> Fortunately
> > > this
> > > > was only a minor optimization in the KIP that's no longer necessary.
> > > >
> > > >
> > > > and we can improve the situation by 2 means:
> > > > > 1. we maintain a refresh threshold for every topic which is for
> > > > example
> > > > > 0.8 * expiry_threshold, and when we send `MetadataRequest` to
> brokers
> > > we
> > > > > just request unknownLeaderTopics + unknownPartitionTopics + topics
> > > > > reach refresh threshold.
> > > > >
> > > >
> > > > Right, this is similar to what I suggested, with a larger window on
> the
> > > > "staleness" that permits for batching to an appropriate size (except
> if
> > > > there's any unknown topics, you'd want to issue the request
> > immediately).
> > > >
> > > >
> > > >
> > > > > 2. we don't invoke KafkaProducer#waitOnMetadata when we call
> > > > > KafkaProducer#send because of we just send data to
> RecordAccumulator,
> > > and
> > > > > before we send data to brokers we will invoke
> > > RecordAccumulator#ready(),
> > > > so
> > > > > we can only invoke waitOnMetadata to block when (number topics
> > > > > reach refresh threshold)>(number of all known topics)*0.2.
> > > > >
> > > >
> > > > For new (uncached) topics, one problem here is that we don't know
> which
> > > > partition to map a record to in the event that it has a key or custom
> > > > partitioner, so the RecordAccumulator wouldn't know which
> batch/broker
> > it
> > > > belongs. We'd need an intermediate record queue that subsequently
> moved
> > > the
> > > > records into RecordAccumulators once metadata resolution was
> complete.
> > > For
> > > > known topics, we don't currently block at all in waitOnMetadata.
> > > >
> > > > The last major point of minimizing producer startup metadata RPCs may
> > > still
> > > > need to be improved, but this would be a large improvement on the
> > cu

[jira] [Created] (KAFKA-9359) Controller does not handle requests while broker is being shutdown

2020-01-02 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-9359:
---

 Summary: Controller does not handle requests while broker is being 
shutdown
 Key: KAFKA-9359
 URL: https://issues.apache.org/jira/browse/KAFKA-9359
 Project: Kafka
  Issue Type: Improvement
  Components: controller, core
Reporter: Lucas Bradstreet


When a broker is shut down it stops accepting requests, as the socket server and 
request handler pools are shut down immediately. It does so before shutting down the 
controller and/or closing the log manager, and these steps may take some time to 
complete. During this time it will remain the controller, as the zkClient has 
not been closed. We should improve the shutdown process such that a broker does 
not remain the controller while it is unable to accept the requests expected of a 
controller.

See also https://issues.apache.org/jira/browse/KAFKA-9358



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9358) Explicitly resign controller leadership and broker znode

2020-01-02 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-9358:
---

 Summary: Explicitly resign controller leadership and broker znode
 Key: KAFKA-9358
 URL: https://issues.apache.org/jira/browse/KAFKA-9358
 Project: Kafka
  Issue Type: Improvement
  Components: controller, core
Reporter: Lucas Bradstreet
Assignee: Lucas Bradstreet


When shutting down, the broker shuts down the controller and then 
closes the zookeeper connection. Closing the zookeeper connection results in 
ephemeral nodes being removed. It is currently critical that the zkClient is 
closed after the controller is shut down, otherwise a controller election will 
not occur if the broker being shut down is currently the controller.

We should consider resigning leadership explicitly in the controller rather 
than relying on the zookeeper client being closed. This would ensure that any 
changes in shutdown order cannot lead to periods where a broker's controller 
component is stopped while also maintaining leadership until the zkClient is 
closed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9338) Incremental fetch sessions do not maintain or use leader epoch for fencing purposes

2019-12-27 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-9338:
---

 Summary: Incremental fetch sessions do not maintain or use leader 
epoch for fencing purposes
 Key: KAFKA-9338
 URL: https://issues.apache.org/jira/browse/KAFKA-9338
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.4.0, 2.3.0, 2.2.0, 2.1.0
Reporter: Lucas Bradstreet


KIP-320 adds the ability to fence replicas by detecting stale leader epochs 
from followers, and helping consumers handle unclean truncation.

Unfortunately the incremental fetch session handling does not maintain or use 
the leader epoch in the fetch session cache. As a result, it does not appear 
that the leader epoch is used for fencing a majority of the time. I'm not sure 
if this is only the case after incremental fetch sessions are established - it 
may be the case that the first "full" fetch session is safe.

Optional.empty is returned for the FetchRequest.PartitionData here:

[https://github.com/apache/kafka/blob/a4cbdc6a7b3140ccbcd0e2339e28c048b434974e/core/src/main/scala/kafka/server/FetchSession.scala#L111]

I believe this affects brokers from 2.1.0 when fencing was improved on the 
replica fetcher side, and 2.3.0 and above for consumers, which is when client 
side truncation detection was added on the consumer side.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9312) KafkaProducer flush behavior does not guarantee send completion under record batch splitting

2019-12-17 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-9312:
---

 Summary: KafkaProducer flush behavior does not guarantee send 
completion under record batch splitting
 Key: KAFKA-9312
 URL: https://issues.apache.org/jira/browse/KAFKA-9312
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.4.0, 2.3.0, 2.2.0, 2.1.0
Reporter: Lucas Bradstreet


The KafkaProducer flush call guarantees that all records that have been sent at 
the time of the flush call will either be sent successfully or will result in an 
error.

The KafkaProducer will split record batches upon receiving a MESSAGE_TOO_LARGE 
error from the broker. However the flush behavior relies on the accumulator 
checking incomplete sends that exist at the time of the flush call.
{code:java}
public void awaitFlushCompletion() throws InterruptedException {
try {
for (ProducerBatch batch : this.incomplete.copyAll())
batch.produceFuture.await();
} finally {
this.flushesInProgress.decrementAndGet();
}
}{code}
When large record batches are split, the batch's produceFuture in question is 
completed, and new batches are added to the incomplete list of record batches. This 
breaks the flush guarantee, as awaitFlushCompletion will finish without 
awaiting the corresponding new batches.

This is demonstrated in a test case that can be found at 
[https://github.com/lbradstreet/kafka/commit/733a683273c31823df354d0a785cb2c24365735a#diff-0b8da0c7ceecaa1f00486dadb53208b1R2339]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9200) ListOffsetRequest missing error response for v5

2019-11-17 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-9200:
---

 Summary: ListOffsetRequest missing error response for v5
 Key: KAFKA-9200
 URL: https://issues.apache.org/jira/browse/KAFKA-9200
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.3.0
Reporter: Lucas Bradstreet
Assignee: Lucas Bradstreet


It seems ListOffsetRequest.getErrorResponse is missing a case for version 5. 
I'd have hoped that this kind of case would be picked up by KafkaApisTest.
{noformat}
java.lang.IllegalArgumentException: Version 5 is not valid. Valid versions for ListOffsetRequest are 0 to 5
        at org.apache.kafka.common.requests.ListOffsetRequest.getErrorResponse(ListOffsetRequest.java:282)
        at kafka.server.KafkaApis.sendErrorOrCloseConnection(KafkaApis.scala:3062)
        at kafka.server.KafkaApis.sendErrorResponseMaybeThrottle(KafkaApis.scala:3045)
        at kafka.server.KafkaApis.handleError(KafkaApis.scala:3027)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:209)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:78)
        at java.lang.Thread.run(Thread.java:748)
{noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9193) org.apache.kafka.common.utils.Timer should use monotonic clock

2019-11-14 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-9193:
---

 Summary: org.apache.kafka.common.utils.Timer should use monotonic 
clock
 Key: KAFKA-9193
 URL: https://issues.apache.org/jira/browse/KAFKA-9193
 Project: Kafka
  Issue Type: Bug
Reporter: Lucas Bradstreet


utils.Timer uses System.currentTimeMillis to implement blocking methods with 
timeouts. We should not rely on a non-monotonic clock and should instead switch 
this to Time.hiResClockMs() (which uses System.nanoTime).

When we do so we should revert [https://github.com/apache/kafka/pull/7683], 
which was needed because of inaccuracies in our current approach (the test was good, 
the code is bad).
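
A minimal sketch of the deadline handling this implies (illustrative only, not the actual utils.Timer API):
{code:scala}
import java.util.concurrent.TimeUnit

// Deadlines computed from System.nanoTime are immune to wall-clock jumps
// (NTP corrections, manual clock changes), unlike System.currentTimeMillis.
final class MonotonicDeadline(timeoutMs: Long) {
  private val deadlineNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs)

  def remainingMs: Long =
    math.max(0L, TimeUnit.NANOSECONDS.toMillis(deadlineNs - System.nanoTime()))

  def isExpired: Boolean = System.nanoTime() - deadlineNs >= 0
}
{code}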



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-531: Drop support for Scala 2.11 in Kafka 2.5

2019-11-08 Thread Lucas Bradstreet
This would be great. Maintaining Scala 2.11 support is reasonably painful.
Dropping support in 2.5 sounds good to me.

On Thu, 7 Nov 2019 at 17:35, Ismael Juma  wrote:

> Hi all,
>
> I think it's time to simplify our development environment by dropping
> support for Scala 2.11. Please take a look at the proposal and
> provide feedback:
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-531%3A+Drop+support+for+Scala+2.11+in+Kafka+2.5
>
> Ismael
>


[jira] [Created] (KAFKA-9137) Maintenance of FetchSession cache causing FETCH_SESSION_ID_NOT_FOUND in live sessions

2019-11-04 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-9137:
---

 Summary: Maintenance of FetchSession cache causing 
FETCH_SESSION_ID_NOT_FOUND in live sessions
 Key: KAFKA-9137
 URL: https://issues.apache.org/jira/browse/KAFKA-9137
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Lucas Bradstreet


We have recently seen cases where brokers end up in a bad state where fetch 
session evictions occur at a high rate (> 16 per second) after a roll. This 
increase in eviction rate included the following pattern in our logs:

 
{noformat}
broker 6: October 31st 2019, 17:52:45.496 Created a new incremental 
FetchContext for session id 2046264334, epoch 9790: added (), updated (), 
removed ()

broker 6: October 31st 2019, 17:52:45.496 Created a new incremental 
FetchContext for session id 2046264334, epoch 9791: added (), updated (), 
removed () broker 6: October 31st 2019, 17:52:45.500 Created a new incremental 
FetchContext for session id 2046264334, epoch 9792: added (), updated 
(lkc-7nv6o_tenant_soak_topic_144p-67), removed () 

broker 6: October 31st 2019, 17:52:45.501 Created a new incremental 
FetchContext for session id 2046264334, epoch 9793: added (), updated 
(lkc-7nv6o_tenant_soak_topic_144p-59, lkc-7nv6o_tenant_soak_topic_144p-123, 
lkc-7nv6o_tenant_soak_topic_144p-11, lkc-7nv6o_tenant_soak_topic_144p-3, 
lkc-7nv6o_tenant_soak_topic_144p-67, lkc-7nv6o_tenant_soak_topic_144p-115), 
removed () 

broker 6: October 31st 2019, 17:52:45.501 Evicting stale FetchSession 
2046264334. 

broker 6: October 31st 2019, 17:52:45.502 Session error for 2046264334: no such 
session ID found. 

broker 4: October 31st 2019, 17:52:45.813 [ReplicaFetcher replicaId=4, 
leaderId=6, fetcherId=0] Node 6 was unable to process the fetch request with 
(sessionId=2046264334, epoch=9793): FETCH_SESSION_ID_NOT_FOUND.  
{noformat}
This pattern appears to be problematic for two reasons. Firstly, the replica 
fetcher for broker 4 was clearly able to send multiple incremental fetch 
requests to broker 6, and receive replies, and did so right up to the point 
where broker 6 evicted its fetch session within milliseconds of multiple fetch 
requests. The second problem is that replica fetchers are considered privileged 
for the fetch session cache, and should not be evicted by consumer fetch 
sessions. This cluster only has 12 brokers and 1000 fetch session cache slots 
(the default for max.incremental.fetch.session.cache.slots), and it is thus very 
unlikely that this session should have been evicted by another replica fetcher 
session.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9048) Improve partition scalability in replica fetcher

2019-10-15 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-9048:
---

 Summary: Improve partition scalability in replica fetcher
 Key: KAFKA-9048
 URL: https://issues.apache.org/jira/browse/KAFKA-9048
 Project: Kafka
  Issue Type: Task
  Components: core
Reporter: Lucas Bradstreet


https://issues.apache.org/jira/browse/KAFKA-9039 
([https://github.com/apache/kafka/pull/7443]) improves the performance of the 
replica fetcher (at both small and large numbers of partitions), but it does 
not improve its complexity or scalability in the number of partitions.

I took a profile using async-profiler for the 1000 partition JMH replica 
fetcher benchmark. The big remaining culprits are:
 * ~18% looking up logStartOffset
 * ~45% FetchSessionHandler$Builder.add
 * ~19% FetchSessionHandler$Builder.build

*Suggestions*
 #  The logStartOffset is looked up for every partition on each doWork pass. 
This requires a hashmap lookup even though the logStartOffset changes rarely. 
If the replica fetcher could be notified of updates to the logStartOffset, then 
we could reduce the overhead to a function of the number of updates to the 
logStartOffset instead of O(n) on each pass.
 #  The use of FetchSessionHandler means that we maintain a partitionStates 
hashmap in the replica fetcher, and a sessionPartitions hashmap in the 
FetchSessionHandler. On each incremental fetch session pass, we need to 
reconcile these two hashmaps to determine which partitions were added/updated 
and which partitions were removed. This reconciliation process is especially 
expensive, requiring multiple passes over the fetching partitions, and hashmap 
remove and puts for most partitions. The replica fetcher could be smarter by 
maintaining the fetch session *updated* hashmap containing 
FetchRequest.PartitionData(s) directly, as well as a *removed* partitions list, so 
that these do not need to be generated by reconciliation on each fetch pass.
 #  maybeTruncate requires an O(n) pass over the elements in partitionStates 
even if there are no partitions in truncating state. If we can maintain some 
additional state about whether truncating partitions exist in partitionStates, 
or if we could separate these states into a separate data structure, we would 
not need to iterate across all partitions on every doWork pass. I’ve seen 
clusters where this work takes about 0.5%-1% of CPU, which is minor but will 
become more substantial as the number of partitions increases.
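
A rough sketch of the bookkeeping in suggestion 3 (the names are illustrative, not existing replica fetcher fields):
{code:scala}
// Keep a cheap count of partitions in Truncating state so the truncation pass
// can be skipped entirely when there are none.
class TruncationTracker {
  private var truncating = 0

  def markTruncating(): Unit = truncating += 1
  def markTruncationComplete(): Unit = truncating = math.max(0, truncating - 1)
  def hasTruncatingPartitions: Boolean = truncating > 0
}

// doWork() would then only call maybeTruncate() when hasTruncatingPartitions is true.
{code}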



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2019-10-03 Thread Lucas Bradstreet
Hi Brian,

This looks great, and should help reduce blocking and high metadata request
volumes when the producer is sending to large numbers of topics, especially
at low volumes. I think the approach to make metadata fetching asynchronous
and batch metadata requests together will help significantly.

The only other approach I can think of is to allow users to supply the
producer with the expected topics upfront, allowing the producer to perform
a single initial metadata request before any sends occur. I see no real
advantages to this approach compared to the async method you’ve proposed,
but maybe we could add it to the rejected alternatives section?

Thanks,

Lucas

On Fri, 20 Sep 2019 at 11:46, Brian Byrne  wrote:

> I've updated the 'Proposed Changes' to include two new producer
> configuration variables: topic.expiry.ms and topic.refresh.ms. Please take
> a look.
>
> Thanks,
> Brian
>
> On Tue, Sep 17, 2019 at 12:59 PM Brian Byrne  wrote:
>
> > Dev team,
> >
> > Requesting discussion for improvement to the producer when dealing with a
> > large number of topics.
> >
> > KIP:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-526%3A+Reduce+Producer+Metadata+Lookups+for+Large+Number+of+Topics
> >
> > JIRA: https://issues.apache.org/jira/browse/KAFKA-8904
> >
> > Thoughts and feedback would be appreciated.
> >
> > Thanks,
> > Brian
> >
>


[jira] [Created] (KAFKA-8963) Benchmark and optimize incremental fetch session handler

2019-09-30 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-8963:
---

 Summary: Benchmark and optimize incremental fetch session handler
 Key: KAFKA-8963
 URL: https://issues.apache.org/jira/browse/KAFKA-8963
 Project: Kafka
  Issue Type: Task
Reporter: Lucas Bradstreet


The FetchSessionHandler is a cause of high CPU usage in the replica fetcher for 
brokers with high partition counts. We should add a jmh benchmark and optimize 
the incremental fetch session building.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-8899) Optimize Partition.maybeIncrementLeaderHW

2019-09-12 Thread Lucas Bradstreet (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8899?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Bradstreet resolved KAFKA-8899.
-
Resolution: Duplicate

Duplicate of https://issues.apache.org/jira/browse/KAFKA-8841

> Optimize Partition.maybeIncrementLeaderHW
> -
>
> Key: KAFKA-8899
> URL: https://issues.apache.org/jira/browse/KAFKA-8899
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Affects Versions: 2.3.0, 2.2.1
>Reporter: Lucas Bradstreet
>Priority: Major
>
> Partition.maybeIncrementLeaderHW is in the hot path for 
> ReplicaManager.updateFollowerFetchState. When replicating between brokers 
> with high partition counts, maybeIncrementLeaderHW becomes expensive, with 
> much of the time going to calling Partition.remoteReplicas which performs a 
> toSet conversion. maybeIncrementLeaderHW should avoid generating any 
> intermediate collections when calculating the new HWM.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (KAFKA-8899) Optimize Partition.maybeIncrementLeaderHW

2019-09-11 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-8899:
---

 Summary: Optimize Partition.maybeIncrementLeaderHW
 Key: KAFKA-8899
 URL: https://issues.apache.org/jira/browse/KAFKA-8899
 Project: Kafka
  Issue Type: Task
  Components: core
Affects Versions: 2.2.1, 2.3.0
Reporter: Lucas Bradstreet


Partition.maybeIncrementLeaderHW is in the hot path for 
ReplicaManager.updateFollowerFetchState. When replicating between brokers with 
high partition counts, maybeIncrementLeaderHW becomes expensive, with much of 
the time going to calling Partition.remoteReplicas which performs a toSet 
conversion. maybeIncrementLeaderHW should avoid generating any intermediate 
collections when calculating the new HWM.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[DISCUSS] KIP-516: Topic Identifiers

2019-09-04 Thread Lucas Bradstreet
Hi all,

I would like to kick off discussion of KIP-516, an implementation of topic
IDs for Kafka. Topic IDs aim to solve topic uniqueness problems in Kafka,
where referring to a topic by name alone is insufficient. Such cases
include when a topic has been deleted and recreated with the same name.

Unique identifiers will help simplify and improve Kafka's topic deletion
process, as well as prevent cases where brokers may incorrectly interact
with stale versions of topics.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers

Looking forward to your thoughts.

Lucas


[jira] [Created] (KAFKA-8872) Improvements to controller "deleting" state / topic Identifiers

2019-09-04 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-8872:
---

 Summary: Improvements to controller "deleting" state /  topic 
Identifiers
 Key: KAFKA-8872
 URL: https://issues.apache.org/jira/browse/KAFKA-8872
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Bradstreet


Kafka currently uniquely identifies a topic by its name. This is generally 
sufficient, but there are flaws in this scheme if a topic is deleted and 
recreated with the same name. As a result, Kafka attempts to prevent these 
classes of issues by ensuring a topic is deleted from all replicas before 
completing a deletion. This solution is not perfect, as it is possible for 
partitions to be reassigned from brokers while they are down, and there are no 
guarantees that this state will ever be cleaned up and will not cause issues in 
the future.

As the controller must wait for all replicas to delete their local partitions, 
deletes can also become blocked, preventing topics from being created with the 
same name until the deletion is complete on all replicas. This can mean that 
downtime for a single broker can effectively cause a complete outage for 
everyone producing/consuming to that topic name, as the topic cannot be 
recreated without manual intervention.

Unique topic IDs could help address this issue by associating a unique ID with 
each topic, ensuring a newly created topic with a previously used name cannot 
be confused with a previous topic with that name.

 



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Request for permission to create KIP

2019-09-03 Thread Lucas Bradstreet
Hi,

Could I please be given permission to add a KIP to
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals?
My username is lucasbradstreet.

Thanks


[jira] [Created] (KAFKA-8499) Ducker missing java commands in path for ducker user on openjdk docker images

2019-06-06 Thread Lucas Bradstreet (JIRA)
Lucas Bradstreet created KAFKA-8499:
---

 Summary: Ducker missing java commands in path for ducker user on 
openjdk docker images
 Key: KAFKA-8499
 URL: https://issues.apache.org/jira/browse/KAFKA-8499
 Project: Kafka
  Issue Type: Improvement
  Components: system tests
Affects Versions: 2.2.0, 2.3.0
Reporter: Lucas Bradstreet


openjdk:8/openjdk:11 used to include java and other java programs in /usr/bin. 
These have since been moved to ```/usr/local/openjdk-VERSION/bin```, which will 
cause problems when the system tests invoke java itself or any java-related 
utility, if the user is using a later image with the same tag. The openjdk 
images have been updated under the same tag, so this can happen suddenly, without 
any other code changes, if the new version is pulled.

We need to ensure that the ducker user created in the Dockerfile has the new 
java install location included in its PATH.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8125) Check for topic existence in CreateTopicsRequest prior to creating replica assignment

2019-03-18 Thread Lucas Bradstreet (JIRA)
Lucas Bradstreet created KAFKA-8125:
---

 Summary: Check for topic existence in CreateTopicsRequest prior to 
creating replica assignment
 Key: KAFKA-8125
 URL: https://issues.apache.org/jira/browse/KAFKA-8125
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 2.1.1
Reporter: Lucas Bradstreet


Imagine the following pattern to ensure topic creation in an application:
 # Attempt to create a topic with # partitions P and replication factor R.
 #  If topic creation fails with TopicExistsException, continue. If topic 
creation succeeds, continue, the topic now exists.

This normally works fine. However, if the topic has already been created and 
the number of live brokers < R, then the topic creation will fail with an 
org.apache.kafka.common.errors.InvalidReplicationFactorException, even though 
the topic already exists.

This could be avoided if we check whether the topic exists prior to calling 
AdminUtils.assignReplicasToBrokers.
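
For reference, a sketch of the creation pattern described above, using the Java AdminClient (error handling trimmed):
{code:scala}
import java.util.Collections
import java.util.concurrent.ExecutionException
import org.apache.kafka.clients.admin.{AdminClient, NewTopic}
import org.apache.kafka.common.errors.TopicExistsException

object EnsureTopic {
  def ensure(admin: AdminClient, name: String, partitions: Int, replicationFactor: Short): Unit = {
    try {
      admin.createTopics(Collections.singleton(new NewTopic(name, partitions, replicationFactor)))
        .all().get()
    } catch {
      case e: ExecutionException if e.getCause.isInstanceOf[TopicExistsException] =>
        // the topic already exists, which is fine for this pattern
        ()
    }
  }
}
{code}
With the proposed server-side existence check, step 2 would see TopicExistsException rather than InvalidReplicationFactorException even when live brokers < R.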

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7410) Rack aware partitions assignment create unbalanced broker assignments on unbalanced racks

2018-09-13 Thread Lucas Bradstreet (JIRA)
Lucas Bradstreet created KAFKA-7410:
---

 Summary: Rack aware partitions assignment create unbalanced broker 
assignments on unbalanced racks
 Key: KAFKA-7410
 URL: https://issues.apache.org/jira/browse/KAFKA-7410
 Project: Kafka
  Issue Type: Bug
  Components: admin
Affects Versions: 1.1.1
Reporter: Lucas Bradstreet
 Attachments: AdminUtilsTest.scala

AdminUtils creates a bad partition assignment when the number of brokers on 
each rack is unbalanced, e.g. 80 brokers rack A, 20 brokers rack B, 15 brokers 
rack C. Under such a scenario, a single broker from rack C may be assigned over 
and over again, when more balanced allocations exist.

kafka.admin.AdminUtils.getRackAlternatedBrokerList is supposed to create a list 
of brokers alternating by rack, however once it runs out of brokers on the 
racks with fewer brokers, it ends up placing a run of brokers from the same 
rack together as rackIterator.hasNext will return false for the other racks.
{code:java}
while (result.size < brokerRackMap.size) {
  val rackIterator = brokersIteratorByRack(racks(rackIndex))
  if (rackIterator.hasNext)
result += rackIterator.next()
  rackIndex = (rackIndex + 1) % racks.length
}{code}
Once assignReplicasToBrokersRackAware hits the run of brokers from the same 
rack, when choosing the replicas to go along with the leader on the rack with 
the most brokers e.g. C, it will skip all of the C brokers until it wraps 
around to the first broker in the alternated list, and choose the first broker 
in the alternated list.

 
{code:java}
if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size == numRacks)
&& (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size == 
numBrokers)) {
replicaBuffer += broker
racksWithReplicas += rack
brokersWithReplicas += broker
done = true
}
k += 1
{code}
It does so for each of the remaining brokers for C, choosing the first broker 
in the alternated list until it's allocated all of the partitions.

See the attached sample code for more details.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)