Streams Processing meetup on Wednesday, February 5, 2020 at LinkedIn, Sunnyvale

2020-01-27 Thread Joel Koshy
*[bcc: (users,dev)@kafka.apache.org ]*

Hello,

The Streams Infra team invites you to attend the Streams Processing meetup
to be held on Wednesday, February 5, 2020. This meetup will focus on Apache
Kafka, Apache Samza and related streaming technologies.

*Where*: Unify Conference room, 950 W Maude Ave, Sunnyvale

*When*: 5:00 - 8:00 PM

*RSVP*: Please RSVP here (only if attending in person)
https://www.meetup.com/Stream-Processing-Meetup-LinkedIn/events/267283444/
A streaming link will be posted approximately 30 minutes prior to the event.

*Agenda:*

5:00 PM: Doors open and catered food available

5:00 - 6:00 PM: Networking

6:00 - 6:30 PM:
*High-performance data replication at Salesforce with Mirus; Paul Davidson,
Salesforce*

At Salesforce we manage high-volume Apache Kafka clusters in a growing
number of data centers around the globe. In the past we relied on Kafka's
Mirror Maker tool for cross-data center replication but, as the volume and
variety of data increased, we needed a new solution to maintain a high
standard of service reliability. In this talk, we will describe Mirus, our
open-source data replication tool based on Kafka Connect. Mirus was
designed for reliable, high-performance data replication at scale. It
successfully replaced MirrorMaker at Salesforce and has now been running
reliably in production for more than a year. We will give an overview of
the Mirus design and discuss the lessons we learned deploying, tuning, and
operating Mirus in a high-volume production environment.

6:30 - 7:00 PM: *Defending Users from Abuse Using Stream Processing at
LinkedIn; Bhargav Golla, LinkedIn*

When there are more than half a billion users, how can one effectively,
reliably and scalably classify them as good and bad users? This talk will
highlight how the Anti-Abuse team at LinkedIn leverages Streams Processing
technologies like Samza and Brooklin to keep the good users in a trusted
environment devoid of bad actors.

7:00 - 7:30 PM: *Enabling Mission-critical Stateful Stream Processing with
Samza; Ray Manpreet Singh Matharu, LinkedIn*

Samza powers a variety of large-scale business-critical stateful stream
processing applications at LinkedIn. Their scale necessitates using
persistent and replicated local state. Unfortunately, hard failures can
cause a loss of this local state, and re-caching it can incur downtime
ranging from a few minutes to hours! In this talk, we describe the systems
and protocols we've devised to bound the downtime to a few seconds.
We detail the tradeoffs our approach brings and how we tackle them in
production at LinkedIn.

7:30 - 8:00 PM: Additional networking and Q&A

Hope to see you there!

*[Streams Infra team @ LinkedIn]*


Re: [VOTE] KIP-514 Add a bounded flush() API to Kafka Producer

2019-10-25 Thread Joel Koshy
+1

On Thu, Oct 24, 2019 at 9:33 PM radai  wrote:

> Hello,
>
> I'd like to initiate a vote on KIP-514.
>
> links:
> the kip -
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-514%3A+Add+a+bounded+flush%28%29+API+to+Kafka+Producer
> the PR - https://github.com/apache/kafka/pull/7569
>
> Thank you
>
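For readers skimming the archive: the KIP linked above proposes a flush() variant that gives up after a deadline instead of blocking indefinitely. A minimal Python sketch of those semantics follows — a toy class with illustrative names, not the actual Kafka producer API.

```python
import threading
import time

class SketchProducer:
    """Toy producer illustrating the bounded-flush semantics proposed in
    KIP-514. Names are illustrative, not the real KafkaProducer API."""

    def __init__(self):
        self._pending = []  # one threading.Event per in-flight record

    def send(self):
        done = threading.Event()
        self._pending.append(done)
        return done  # an I/O thread would set this when the broker acks

    def flush(self, timeout_ms=None):
        """Wait for all in-flight sends; with a timeout, give up at the
        deadline instead of blocking forever (the gap KIP-514 closes)."""
        deadline = (None if timeout_ms is None
                    else time.monotonic() + timeout_ms / 1000.0)
        for done in list(self._pending):
            remaining = (None if deadline is None
                         else max(0.0, deadline - time.monotonic()))
            if not done.wait(remaining):
                raise TimeoutError("flush() did not complete before the timeout")
        self._pending.clear()

producer = SketchProducer()
ack = producer.send()
ack.set()                        # simulate the broker acking the record
producer.flush(timeout_ms=100)   # completes promptly

stuck = producer.send()          # never acked
try:
    producer.flush(timeout_ms=50)
except TimeoutError as e:
    print("bounded:", e)
```

The point of the API change is the second call: today's unbounded flush() would hang there forever.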


Streams meetup at LinkedIn Sunnyvale, 6pm, Thursday, October 3, 2019

2019-09-26 Thread Joel Koshy
*[bcc: (users,dev)@kafka.apache.org ]*

Hi everyone,

The Streams Infra team invites you to attend a Streams Processing meetup on
Thursday, October 3, 2019 at LinkedIn's Sunnyvale campus. (This meetup focuses
on Apache Kafka, Apache Samza, and related streaming technologies.)

As always, there will be food, drinks, and time for socializing before and
after the talks.

We would be happy to have you join us in person, but the event will also be
live streamed and recorded for those who are unable to attend.

*RSVP:*
Please RSVP *only* if you plan to attend in person.
A streaming link will be posted approximately one hour prior to the event.
https://www.meetup.com/Stream-Processing-Meetup-LinkedIn/events/264589317/

*When:*
6:00 - 9:00 PM, Thursday, October 3, 2019

*Where:*
Unify Conference room, LinkedIn Sunnyvale campus
950 W Maude Ave, Sunnyvale, CA 94085

*Summary:*
We have three talks lined up. Our first speaker is from the Azure Streaming
team. The talk will cover Azure Stream Analytics, its architecture,
capabilities and use cases.  The second talk is about Samza’s recently
built capability to do Stream processing using Python. Samza-Python opens
up multiple possibilities including the ability to use Python libraries for
ML applications. Finally, the Kafka team will talk about the development
workflow that allows them to handle LinkedIn’s scale by building a LinkedIn
version of Kafka while also contributing back to the open source community.
Additional details are below.

*Agenda:*
   5:30 PM - Doors open

5:30 - 6:00 PM - Networking

6:00 - 6:30 PM - *Azure Stream Analytics; Sasha Alperovich & Sid Ramadoss,
Microsoft*

Azure Stream Analytics (ASA) is a fully managed near real-time data
processing service on Azure. In this talk we will highlight the unique
value propositions that ASA brings to the table, and show a demo of how
Azure customers can utilize the power of ASA to gain insights in near
real-time with the NYC taxi scenario. We will then dive deeper into how the
service is built, covering resiliency, dataflow and other technical aspects
of the ASA runtime. We will also discuss how ASA’s unique design choices
compare and contrast with other streaming technologies, namely Spark
Structured Streaming and Flink.

6:30 - 7:00 PM - *Stream Processing in Python using Apache Samza; Hai Lu,
LinkedIn*

Apache Samza is the streaming engine used at LinkedIn, where it processes
roughly two trillion messages daily. A while back we announced Samza's
integration with Apache Beam, a success that led to our Samza Beam API. Now
our APIs have taken another step forward: we support stream processing in
Python! This work has made stream processing more accessible and enabled
many interesting use cases, particularly in the area of machine learning.
The Python API builds on our Samza runner for Apache Beam. In this talk, we
will quickly review our work on the Samza runner, and then show how we
extended it to support portability in Beam (Python specifically). In
addition to technical and architectural details, we will also talk about
how we bridged the Python and Java ecosystems at LinkedIn with the Python
API, along with different use cases.

7:00 - 7:30 PM - *Apache Kafka and LinkedIn: How LinkedIn customizes Kafka
to work at the trillion scale; Jon Lee & Wesley Wu, LinkedIn*

At LinkedIn, we operate thousands of brokers to handle trillions of
messages per day. Running at such a large scale constantly raises various
scalability and operability challenges for the Kafka ecosystem. While we
try to maintain our internal releases as close as possible to upstream, we
maintain a version of Kafka which includes patches for addressing our
production and feature requirements. In this presentation we will share the
Kafka release that LinkedIn runs in production, the workflow we follow to
develop new patches, the way we upstream the changes we make, some of the
patches we maintain in our branch, and how we generate releases.

7:30 - 8:30 PM - Additional networking and Q&A

Hope to see you there!


Streams Meetup at LinkedIn Sunnyvale, 6pm, Wednesday, March 20, 2019

2019-03-07 Thread Joel Koshy
*[bcc: (users,dev)@kafka.apache.org ]*

Hi everyone,

The Streams Infrastructure team at LinkedIn invites you to attend a Streams
Processing meetup on Wednesday, March 20 at LinkedIn’s Sunnyvale campus.
(This meetup focuses on Apache Kafka, Apache Samza, and related streaming
technologies.)

As always, there will be food, drinks, and time for socializing before and
after the talks.

We would be happy to have you join us in person, but the event will also be
live streamed and recorded for those who are unable to attend.

*RSVP*
https://www.meetup.com/Stream-Processing-Meetup-LinkedIn/events/259437388/

*Date*
6pm, Wednesday, March 20, 2019

*Location*
LinkedIn Sunnyvale campus - 950 W Maude Ave, Sunnyvale, CA 94085

*Agenda*
6:00-6:35 PM - Networking
6:35-7:10 PM - Apache Samza 1.0: Recent Advances and our plans for future
in Stream Processing (Prateek Maheshwari, LinkedIn)
7:15-7:50 PM - How and why we moved away from Kafka Mirror Maker to
Brooklin - LinkedIn's story (Shun-ping Chiu, LinkedIn)
7:55-8:30 PM - Puma - Stream Processing in Facebook (Rithin Shetty,
Facebook)
8:30-9:00 PM - Additional networking and Q&A

See you there!


Re: [VOTE] KIP-354 Time-based log compaction policy

2018-12-06 Thread Joel Koshy
+1 on the updated KIP.

On Wed, Dec 5, 2018 at 11:56 AM Dong Lin  wrote:

> Thanks for the update. +1 (binding)
>
> On Wed, Dec 5, 2018 at 8:19 AM Colin McCabe  wrote:
>
> > Thanks, Xiongqi Wu.  +1 (binding)
> >
> > regards,
> > Colin
> >
> >
> > On Tue, Dec 4, 2018, at 20:58, xiongqi (wesley) wu wrote:
> > > Colin,
> > >
> > > Thanks for comments.
> > > Out-of-order message timestamps are a very good point.
> > > We can combine max.compaction.lag.ms with
> > > log.message.timestamp.difference.max.ms to achieve what we want in an
> > > environment where message timestamps can be shifted a lot.
> > >
> > > There are similar discussions regarding log.retention.ms and
> > > log.message.timestamp.difference.max.ms  in KAFKA-4340.
> > >
> > > I agree that we can always use first message timestamp not the
> > maxTimestamp
> > > of the previous log segment to make it simple.
> > >
> > > I have updated the KIP.
> > >
> > > Xiongqi (wesley) Wu
> > >
> > >
> > > On Tue, Dec 4, 2018 at 5:13 PM Colin McCabe 
> wrote:
> > >
> > > > Hi Xiongqi,
> > > >
> > > > Thinking about this a little bit more, it seems like we don't have
> any
> > > > guarantees just by looking at the timestamp of the first message in a
> > log
> > > > segment.  Similarly, we don't have any guarantees just by looking at
> > the
> > > > maxTimestamp of the previous log segment.  Old data could appear
> > anywhere--
> > > > you could put data that was years old in the middle of a segment from
> > 2018.
> > > >
> > > > However, if log.message.timestamp.difference.max.ms is set, then we
> > can
> > > > make some actual guarantees that old data gets purged-- which is what
> > the
> > > > GDPR requires, of course.
> > > >
> > > > Overall, maybe we can make KIP-354 simpler by just always looking at
> > the
> > > > timestamp of the first log message.  I don't think looking at the
> > > > maxTimestamp of the previous segment is any more accurate.  Aside
> from
> > > > that, looks good, since we can get what we need with the combination
> of
> > > > this and log.message.timestamp.difference.max.ms.
> > > >
> > > > best,
> > > > Colin
> > > >
> > > >
> > > > On Mon, Nov 26, 2018, at 13:10, xiongqi wu wrote:
> > > > > Thanks for binding and non-binding votes.
> > > > > Can I get one more binding vote?
> > > > >
> > > > > Thanks in advance!
> > > > >
> > > > > Xiongqi (Wesley) Wu
> > > > >
> > > > >
> > > > > On Wed, Nov 14, 2018 at 7:29 PM Matt Farmer  wrote:
> > > > >
> > > > > > I'm a +1 (non-binding) — This looks like it would have saved us a
> > lot
> > > > of
> > > > > > pain in an issue we had to debug recently. I can't go into
> > details, but
> > > > > > figuring out how to achieve this effect gave me quite a headache.
> > :)
> > > > > >
> > > > > > On Mon, Nov 12, 2018 at 1:00 PM xiongqi wu 
> > > > wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > Can I have one more vote on this KIP?
> > > > > > > Any comment is appreciated.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > >
> > > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-354%3A+Add+a+Maximum+Log+Compaction+Lag
> > > > > > >
> > > > > > >
> > > > > > > Xiongqi (Wesley) Wu
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Nov 9, 2018 at 7:56 PM xiongqi wu  >
> > > > wrote:
> > > > > > >
> > > > > > > > Thanks Dong.
> > > > > > > > I have updated the KIP.
> > > > > > > >
> > > > > > > > Xiongqi (Wesley) Wu
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, Nov 9, 2018 at 5:31 PM Dong Lin  >
> > > > wrote:
> > > > > > > >
> > > > > 
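A small Python sketch of the rule this thread converges on: compaction eligibility is judged from the first record's timestamp alone, and log.message.timestamp.difference.max.ms bounds how far a record's timestamp can trail its append time, which is what turns the per-segment check into an actual purge guarantee. Function names are illustrative, not broker code.

```python
def compaction_due(first_record_ts_ms, now_ms, max_compaction_lag_ms):
    """A segment becomes compaction-eligible once the *first* record's
    timestamp is older than max.compaction.lag.ms -- the simplification
    suggested above (no peeking at the previous segment's maxTimestamp)."""
    return now_ms - first_record_ts_ms > max_compaction_lag_ms

def worst_case_eligibility_delay(max_compaction_lag_ms, max_ts_diff_ms):
    """Upper bound on how long after *append* a segment can stay ineligible:
    with log.message.timestamp.difference.max.ms in force, an accepted
    record's timestamp trails its append time by at most max_ts_diff_ms,
    so the two configs combined bound the time to compaction."""
    return max_compaction_lag_ms + max_ts_diff_ms

# e.g. a 1-day compaction lag plus a 1-hour allowed timestamp skew
assert compaction_due(first_record_ts_ms=0, now_ms=10_000,
                      max_compaction_lag_ms=5_000)
print(worst_case_eligibility_delay(86_400_000, 3_600_000), "ms worst case")
```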

Re: [VOTE] KIP-354 Time-based log compaction policy

2018-11-09 Thread Joel Koshy
+1 with one suggestion on the proposed metric. You should probably include
the unit. So for e.g., max-compaction-delay-secs.

Joel

On Tue, Nov 6, 2018 at 5:30 PM xiongqi wu  wrote:

> bump
> Xiongqi (Wesley) Wu
>
>
> On Thu, Sep 27, 2018 at 4:20 PM xiongqi wu  wrote:
>
> >
> > Thanks Eno, Brett, Dong, Guozhang, Colin,  and Xiaohe for feedback.
> > Can I have more feedback or VOTE on this KIP?
> >
> >
> > Xiongqi (Wesley) Wu
> >
> >
> > On Wed, Sep 19, 2018 at 10:52 AM xiongqi wu  wrote:
> >
> >> Any other votes or comments?
> >>
> >> Xiongqi (Wesley) Wu
> >>
> >>
> >> On Tue, Sep 11, 2018 at 11:45 AM xiongqi wu 
> wrote:
> >>
> >>> Yes, more votes and code review.
> >>>
> >>> Xiongqi (Wesley) Wu
> >>>
> >>>
> >>> On Mon, Sep 10, 2018 at 11:37 PM Brett Rann  >
> >>> wrote:
> >>>
>  +1 (non binding) from on 0 then, and on the KIP.
> 
>  Where do we go from here? More votes?
> 
>  On Tue, Sep 11, 2018 at 5:34 AM Colin McCabe 
>  wrote:
> 
>  > On Mon, Sep 10, 2018, at 11:44, xiongqi wu wrote:
>  > > Thank you for comments. I will use '0' for now.
>  > >
>  > > If we create topics through admin client in the future, we might
>  perform
>  > > some useful checks. (but the assumption is all brokers in the same
>  > cluster
>  > > have the same default configurations value, otherwise,it might
>  still be
>  > > tricky to do such cross validation check.)
>  >
>  > This isn't something that we might do in the future-- this is
>  something we
>  > are doing now. We already have Create Topic policies which are
>  enforced by
>  > the broker. Check KIP-108 and KIP-170 for details. This is one of
> the
>  > motivations for getting rid of direct ZK access-- making sure that
>  these
>  > policies are applied.
>  >
>  > I agree that having different configurations on different brokers
> can
>  be
>  > confusing and frustrating . That's why more configurations are being
>  made
>  > dynamic using KIP-226. Dynamic configurations are stored centrally
> in
>  ZK,
>  > so they are the same on all brokers (modulo propagation delays). In
>  any
>  > case, this is a general issue, not specific to "create topics".
>  >
>  > cheers,
>  > Colin
>  >
>  >
>  > >
>  > >
>  > > Xiongqi (Wesley) Wu
>  > >
>  > >
>  > > On Mon, Sep 10, 2018 at 11:15 AM Colin McCabe  >
>  > wrote:
>  > >
>  > > > I don't have a strong opinion. But I think we should probably be
>  > > > consistent with how segment.ms works, and just use 0.
>  > > >
>  > > > best,
>  > > > Colin
>  > > >
>  > > >
>  > > > On Wed, Sep 5, 2018, at 21:19, Brett Rann wrote:
>  > > > > OK thanks for that clarification. I see why you're
> uncomfortable
>  > with 0
>  > > > now.
>  > > > >
>  > > > > I'm not really fussed. I just prefer consistency in
>  configuration
>  > > > options.
>  > > > >
>  > > > > Personally I lean towards treating 0 and 1 similarly in that
>  > scenario,
>  > > > > because it favours the person thinking about setting the
>  > configurations,
>  > > > > and a person doesn't care about a 1ms edge case especially
> when
>  the
>  > > > context
>  > > > > is the true minimum is tied to the log cleaner cadence.
>  > > > >
>  > > > > Introducing 0 to mean "disabled" because there is some
>  uniquness in
>  > > > > segment.ms not being able to be set to 0, reduces
> configuration
>  > > > consistency
>  > > > > in favour of capturing a MS gap in an edge case that nobody
>  would
>  > ever
>  > > > > notice. For someone to understand why everywhere else -1 is
>  used to
>  > > > > disable, but here 0 is used, they would need to learn about
>  > segment.ms
>  > > > > having a 1ms minimum and then after learning would think "who
>  cares
>  > about
>  > > > > 1ms?" in this context. I would anyway :)
>  > > > >
>  > > > > my 2c anyway. Will again defer to majority. Curious which way
>  Colin
>  > falls
>  > > > > now.
>  > > > >
>  > > > > Don't want to spend more time on this though, It's well into
>  > > > bikeshedding
>  > > > > territory now. :)
>  > > > >
>  > > > >
>  > > > >
>  > > > > On Thu, Sep 6, 2018 at 1:31 PM xiongqi wu <
> xiongq...@gmail.com>
>  > wrote:
>  > > > >
>  > > > > > I want to honor the minimum value of segment.ms (which is
>  1ms) to
>  > > > force
>  > > > > > roll an active segment.
>  > > > > > So if we set "max.compaction.lag.ms" any value > 0, the
>  minimum of
>  > > > > > max.compaction.lag.ms and segment.ms will be used to seal
> an
>  > active
>  > > > > > segment.
>  > > > > >
>  > > > > > If we set max.compaction.lag.ms to 0, the current
>  implementation
>  > 
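The segment-sealing rule quoted above (honor segment.ms's 1 ms floor; when max.compaction.lag.ms is positive, seal the active segment at the smaller of the two) can be sketched as follows. This is illustrative only, not the broker's log-layer implementation.

```python
def active_segment_roll_ms(segment_ms, max_compaction_lag_ms):
    """Effective time after which an active segment is sealed.

    Mirrors the rule in the thread above: segment.ms has a 1 ms minimum,
    and a positive max.compaction.lag.ms can only shorten (never extend)
    the roll interval, so old records become compactable in time."""
    segment_ms = max(segment_ms, 1)  # honor segment.ms's 1 ms floor
    if max_compaction_lag_ms > 0:
        return min(segment_ms, max_compaction_lag_ms)
    return segment_ms  # lag disabled: segment.ms alone applies

# a 7-day segment.ms with a 1-day compaction lag rolls after 1 day
print(active_segment_roll_ms(604_800_000, 86_400_000))
```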

Re: [VOTE] KIP-291: Have separate queues for control requests and data requests

2018-10-09 Thread Joel Koshy
> > > > > > >> >> > inter.broker.listener.name?
>> > > > > > > >> >> > That seems limiting.
>> > > > > > > >> >> >
>> > > > > > > >> >> > 101. The KIP says that advertised.listeners and
>> listeners
>> > > > will
>> > > > > > now
>> > > > > > > >> have
>> > > > > > > >> >> a
>> > > > > > > >> >> > different default value including controller. Could
>> you
>> > > > > document
>> > > > > > > what
>> > > > > > > >> >> the
>> > > > > > > >> >> > default value looks like?
>> > > > > > > >> >> >
>> > > > > > > >> >> > 102. About removing the the following configs. How
>> does
>> > > that
>> > > > > > affect
>> > > > > > > >> the
>> > > > > > > >> >> > upgrade path? Do we now expect a user to add a new
>> config
>> > > > when
>> > > > > > > >> upgrading
>> > > > > > > >> >> > from an old version to a new one?
>> > > > > > > >> >> > host
>> > > > > > > >> >> > port
>> > > > > > > >> >> > advertised.host
>> > > > > > > >> >> > advertised.port
>> > > > > > > >> >> >
>> > > > > > > >> >> > Thanks,
>> > > > > > > >> >> >
>> > > > > > > >> >> > Jun
>> > > > > > > >> >> >
>> > > > > > > >> >> >
>> > > > > > > >> >> > On Thu, Sep 6, 2018 at 5:14 PM, Lucas Wang <
>> > > > > > lucasatu...@gmail.com>
>> > > > > > > >> >> wrote:
>> > > > > > > >> >> >
>> > > > > > > >> >> > > @Jun Rao 
>> > > > > > > >> >> > >
>> > > > > > > >> >> > > One clarification, currently on the selector level,
>> we
>> > > have
>> > > > > the
>> > > > > > > >> >> > > io-wait-ratio metric.
>> > > > > > > >> >> > > For the new controller *network* thread, we can use
>> it
>> > > > > directly
>> > > > > > > for
>> > > > > > > >> >> > > IdlePct, instead of using 1- io-ratio,
>> > > > > > > >> >> > > so that the logic is similar to the current average
>> > > IdlePct
>> > > > > for
>> > > > > > > >> >> network
>> > > > > > > >> >> > > threads. Is that correct?
>> > > > > > > >> >> > >
>> > > > > > > >> >> > > I've revised the KIP by adding two new metrics for
>> > > > measuring
>> > > > > > the
>> > > > > > > >> >> IdlePct
>> > > > > > > >> >> > > for the two additional threads.
>> > > > > > > >> >> > > Please take a look again. Thanks!
>> > > > > > > >> >> > >
>> > > > > > > >> >> > > Lucas
>> > > > > > > >> >> > >
>> > > > > > > >> >> > >
>> > > > > > > >> >> > >
>> > > > > > > >> >> > >
>> > > > > > > >> >> > >
>> > > > > > > >> >> > > On Wed, Sep 5, 2018 at 5:01 PM Jun Rao <
>> > j...@confluent.io
>> > > >
>> > > > > > wrote:
>> > > > > > > >> >> > >
>> > > > > > > >> >> > > > Hi, Lucas,
>> > > > > > > >> >> > > >
>> 

Re: [DISCUSS] KIP-291: Have separate queues for control requests and data requests

2018-08-24 Thread Joel Koshy
I had some offline discussions with Lucas on this KIP. While it is much
more work than the original proposals, separating the control plane
entirely removes any interference with the data plane as summarized under
the rejected alternatives section.

Just a few minor comments:

   - Can you update the link to the discussion thread and vote thread?
   - The idle ratio metrics are fairly important for monitoring. I think we
   agreed that these would only apply to the data plane (otherwise there will
   always be some skew due to the controller plane). If so, can you clarify
   that somewhere in the doc?
   - Personally, I prefer the term CONTROL to CONTROLLER in the configs.
   CONTROLLER makes it sound like it is a special listener on the controller.
   CONTROL clarifies that this is a listener for receiving control plane
   requests from the controller.


Thanks,

Joel
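For illustration, a broker config in the shape discussed here might look like the sketch below. The listener name follows the CONTROL suggestion above; `control.plane.listener.name` is, to my understanding, the property that eventually shipped with KIP-291, but the final KIP page is the authority on naming.

```properties
# Sketch of a dedicated control-plane listener (hypothetical host/ports).
listeners=CONTROL://broker1:9090,INTERNAL://broker1:9091,EXTERNAL://broker1:9092
listener.security.protocol.map=CONTROL:SSL,INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Controller-to-broker requests use their own listener, threads, and queue...
control.plane.listener.name=CONTROL
# ...while inter-broker data traffic stays on the existing listener.
inter.broker.listener.name=INTERNAL
```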

On Wed, Aug 22, 2018 at 12:45 AM, Eno Thereska 
wrote:

> Ok thanks, if you guys are seeing this at LinkedIn then the motivation
> makes more sense.
>
> Eno
>
> On Tue, Aug 21, 2018 at 5:39 PM, Becket Qin  wrote:
>
> > Hi Eno,
> >
> > Thanks for the comments. This KIP is not really about improving the
> > performance in general. It is about ensuring the cluster state can still
> be
> > updated quickly even if the brokers are under heavy load.
> >
> > We have seen quite often that it took dozens of seconds for a broker to
> > process the requests sent by the controller when the cluster is under
> heavy
> > load. This leads to the issues Lucas mentioned in the motivation part.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > > On Aug 20, 2018, at 11:33 PM, Eno Thereska 
> > wrote:
> > >
> > > Hi folks,
> > >
> > > I looked at the previous numbers that Lucas provided (thanks!) but it's
> > > still not clear to me whether the performance benefits justify the
> added
> > > complexity. I'm looking for some intuition here (a graph would be great
> > but
> > > not required): for a small/medium/large cluster, what are the expected
> > > percentage of control requests today that will benefit from the change?
> > > It's a bit hard to go through this level of detail without knowing the
> > > expected end-to-end benefit. The best folks to answer this might be
> ones
> > > running such clusters, and ideally should pitch in with some data.
> > >
> > > Thanks
> > > Eno
> > >
> > > On Mon, Aug 20, 2018 at 7:29 AM, Becket Qin 
> > wrote:
> > >
> > >> Hi Lucas,
> > >>
> > >> In KIP-103, we introduced a convention to define and look up the
> > listeners.
> > >> So it would be good if the later KIPs can follow the same convention.
> > >>
> > >> From what I understand, the advertised.listeners is actually designed
> > for
> > >> our purpose, i.e. providing a list of listeners that can be used in
> > >> different cases. In KIP-103 it was used to separate internal traffic
> > from
> > >> the external traffic. It is not just for the user traffic or data
> > >> only. So adding
> > >> a controller listener is not repurposing the config. Also, ZK
> structure
> > is
> > >> only visible to brokers, the clients will still only see the listeners
> > they
> > >> are seeing today.
> > >>
> > >> For this KIP, we are essentially trying to separate the controller
> > traffic
> > >> from the inter-broker data traffic. So adding a new
> > >> listener.name.for.controller config seems reasonable. The behavior
> would
> > >> be:
> > >> 1. If the listener.name.for.controller is set, the broker-controller
> > >> communication will go through that listener.
> > >> 2. Otherwise, the controller traffic falls back to use
> > >> inter.broker.listener.name or inter.broker.security.protocol, which
> is
> > the
> > >> current behavior.
> > >>
> > >> Regarding updating the security protocol with one line change v.s
> > two-lines
> > >> change, I am a little confused, can you elaborate?
> > >>
> > >> Regarding the possibility of hurry and misreading. It is the system
> > admin's
> > >> responsibility to configure the right listener to ensure that
> different
> > >> kinds of traffic are using the correct endpoints. So I think it is
> > better
> > >> that we always follow the same of convention instead of doing it in
> > >> different ways.
> > >>
> > >> Thanks,
> > >>
> > >> Jiangjie (Becket) Qin
> > >>
> > >>
> > >>
> > >> On Fri, Aug 17, 2018 at 4:34 AM, Lucas Wang 
> > wrote:
> > >>
> > >>> Thanks for the review, Becket.
> > >>>
> > >>> (1) After comparing the two approaches, I still feel the current
> > writeup
> > >> is
> > >>> a little better.
> > >>> a. The current writeup asks for an explicit endpoint while reusing
> the
> > >>> existing "inter.broker.listener.name" with the exactly same
> semantic,
> > >>> and your proposed change asks for a new listener name for controller
> > >> while
> > >>> reusing the existing "advertised.listeners" config with a slight
> > semantic
> > >>> change since a new controller endpoint needs to be added to it.
> > >>> Hence conceptually the current 

Re: [VOTE] KIP-340: Allow kafka-reassign-partitions.sh and kafka-log-dirs.sh to take admin client property file

2018-07-21 Thread Joel Koshy
+1

On Tue, Jul 17, 2018 at 1:42 PM, Ted Yu  wrote:

> +1
>
> On Tue, Jul 17, 2018 at 1:40 PM Jason Gustafson 
> wrote:
>
> > +1. This is useful (though the naming inconsistencies in the tools are
> > vexing, as always).
> >
> > -Jason
> >
> > On Tue, Jul 17, 2018 at 12:24 PM, Dong Lin  wrote:
> >
> > > Hi all,
> > >
> > > It seems that there is no further concern with the KIP-304. I guess it
> is
> > > time to start the voting thread.
> > >
> > > The KIP can be found at
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-340%
> > > 3A+Allow+kafka-reassign-partitions.sh+and+kafka-log-dirs.sh+
> > > to+take+admin+client+property+file
> > > . This KIP provides a way to allow kafka-reassign-partitions.sh and
> > > kafka-log-dirs.sh to talk to broker over SSL.
> > >
> > > Cheers,
> > > Dong
> > >
> >
>
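For reference, the client property file the KIP enables might look like the fragment below. The property names are standard Kafka client SSL settings; the paths and password are placeholders. The tools then accept this file via the new command-line option described on the KIP page.

```properties
# admin.properties -- lets kafka-reassign-partitions.sh / kafka-log-dirs.sh
# reach brokers over SSL (placeholder paths and credentials)
security.protocol=SSL
ssl.truststore.location=/path/to/truststore.jks
ssl.truststore.password=changeit
```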


Re: [DISCUSS] KIP-291: Have separate queues for control requests and data requests

2018-07-19 Thread Joel Koshy
Good example. I think this scenario can occur in the current code as well
but with even lower probability given that there are other non-controller
requests interleaved. It is still sketchy though and I think a safer
approach would be separate queues and pinning controller request handling
to one handler thread.
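The deque idea discussed in the quoted thread below (normal requests appended at the tail, controller requests inserted at the head) can be sketched with Python's collections.deque. The sketch also makes the reordering hazard concrete: if a reconnect ever lets two controller requests queue up, head-insertion reverses their processing order.

```python
from collections import deque

request_queue = deque()

def enqueue(request, is_controller):
    # Proposed scheme: controller requests jump to the head of the
    # shared queue; everything else queues at the tail.
    if is_controller:
        request_queue.appendleft(request)
    else:
        request_queue.append(request)

def next_request():
    # Handler threads always take from the head.
    return request_queue.popleft()

# Normal case: a single outstanding controller request is prioritized.
enqueue("produce-1", False)
enqueue("R1", True)
assert next_request() == "R1"
assert next_request() == "produce-1"

# The hazard: two controller requests queued (R1 sent, connection lost,
# R2 sent on reconnect) are processed in reverse send order.
enqueue("R1", True)
enqueue("R2", True)
assert next_request() == "R2"   # R2 processed before R1 -- out of order
```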

On Wed, Jul 18, 2018 at 11:12 PM, Dong Lin  wrote:

> Hey Becket,
>
> I think you are right that there may be out-of-order processing. However,
> it seems that out-of-order processing may also happen even if we use a
> separate queue.
>
> Here is the example:
>
> - Controller sends R1 and got disconnected before receiving response. Then
> it reconnects and sends R2. Both requests now stay in the controller
> request queue in the order they are sent.
> - thread1 takes R1_a from the request queue and then thread2 takes R2 from
> the request queue almost at the same time.
> - So R1_a and R2 are processed in parallel. There is chance that R2's
> processing is completed before R1.
>
> If out-of-order processing can happen for both approaches with very low
> probability, it may not be worthwhile to add the extra queue. What do you
> think?
>
> Thanks,
> Dong
>
>
> On Wed, Jul 18, 2018 at 6:17 PM, Becket Qin  wrote:
>
> > Hi Mayuresh/Joel,
> >
> > Using the request channel as a dequeue was bright up some time ago when
> we
> > initially thinking of prioritizing the request. The concern was that the
> > controller requests are supposed to be processed in order. If we can
> ensure
> > that there is one controller request in the request channel, the order is
> > not a concern. But in cases that there are more than one controller
> request
> > inserted into the queue, the controller request order may change and
> cause
> > problem. For example, think about the following sequence:
> > 1. Controller successfully sent a request R1 to broker
> > 2. Broker receives R1 and put the request to the head of the request
> queue.
> > 3. Controller to broker connection failed and the controller reconnected
> to
> > the broker.
> > 4. Controller sends a request R2 to the broker
> > 5. Broker receives R2 and add it to the head of the request queue.
> > Now on the broker side, R2 will be processed before R1 is processed,
> which
> > may cause problem.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> >
> > On Thu, Jul 19, 2018 at 3:23 AM, Joel Koshy  wrote:
> >
> > > @Mayuresh - I like your idea. It appears to be a simpler less invasive
> > > alternative and it should work. Jun/Becket/others, do you see any
> > pitfalls
> > > with this approach?
> > >
> > > On Wed, Jul 18, 2018 at 12:03 PM, Lucas Wang 
> > > wrote:
> > >
> > > > @Mayuresh,
> > > > That's a very interesting idea that I haven't thought before.
> > > > It seems to solve our problem at hand pretty well, and also
> > > > avoids the need to have a new size metric and capacity config
> > > > for the controller request queue. In fact, if we were to adopt
> > > > this design, there is no public interface change, and we
> > > > probably don't need a KIP.
> > > > Also implementation wise, it seems
> > > > the java class LinkedBlockingQueue can readily satisfy the
> requirement
> > > > by supporting a capacity, and also allowing inserting at both ends.
> > > >
> > > > My only concern is that this design is tied to the coincidence that
> > > > we have two request priorities and there are two ends to a deque.
> > > > Hence by using the proposed design, it seems the network layer is
> > > > more tightly coupled with upper layer logic, e.g. if we were to add
> > > > an extra priority level in the future for some reason, we would
> > probably
> > > > need to go back to the design of separate queues, one for each
> priority
> > > > level.
> > > >
> > > > In summary, I'm ok with both designs and lean toward your suggested
> > > > approach.
> > > > Let's hear what others think.
> > > >
> > > > @Becket,
> > > > In light of Mayuresh's suggested new design, I'm answering your
> > question
> > > > only in the context
> > > > of the current KIP design: I think your suggestion makes sense, and
> I'm
> > > ok
> > > > with removing the capacity config and
> > > > just relying on the default value of 20 being sufficient enough.
> > > >
> > > > Thanks,
> > > 

Re: [DISCUSS] KIP-291: Have separate queues for control requests and data requests

2018-07-18 Thread Joel Koshy
@Mayuresh - I like your idea. It appears to be a simpler less invasive
alternative and it should work. Jun/Becket/others, do you see any pitfalls
with this approach?

On Wed, Jul 18, 2018 at 12:03 PM, Lucas Wang  wrote:

> @Mayuresh,
> That's a very interesting idea that I haven't thought before.
> It seems to solve our problem at hand pretty well, and also
> avoids the need to have a new size metric and capacity config
> for the controller request queue. In fact, if we were to adopt
> this design, there is no public interface change, and we
> probably don't need a KIP.
> Also implementation wise, it seems
> the java class LinkedBlockingQueue can readily satisfy the requirement
> by supporting a capacity, and also allowing inserting at both ends.
>
> My only concern is that this design is tied to the coincidence that
> we have two request priorities and there are two ends to a deque.
> Hence by using the proposed design, it seems the network layer is
> more tightly coupled with upper layer logic, e.g. if we were to add
> an extra priority level in the future for some reason, we would probably
> need to go back to the design of separate queues, one for each priority
> level.
>
> In summary, I'm ok with both designs and lean toward your suggested
> approach.
> Let's hear what others think.
>
> @Becket,
> In light of Mayuresh's suggested new design, I'm answering your question
> only in the context
> of the current KIP design: I think your suggestion makes sense, and I'm ok
> with removing the capacity config and
> just relying on the default value of 20 being sufficient enough.
>
> Thanks,
> Lucas
>
>
>
>
>
>
>
>
>
>
> On Wed, Jul 18, 2018 at 9:57 AM, Mayuresh Gharat <
> gharatmayures...@gmail.com
> > wrote:
>
> > Hi Lucas,
> >
> > Seems like the main intent here is to prioritize the controller request
> > over any other requests.
> > In that case, we can change the request queue to a deque, where you
> > always insert the normal requests (produce, consume, etc.) at the tail of
> > the deque, but if it's a controller request, you insert it at the head of
> > the deque. This ensures that the controller request will be given higher
> > priority over other requests.
> >
> > Also, since we read only one request from the socket at a time (muting
> > the channel and unmuting it only after the request is handled), this
> > would ensure that we don't handle controller requests out of order.
> >
> > With this approach we can avoid the second queue and the additional
> config
> > for the size of the queue.
> >
> > What do you think ?
> >
> > Thanks,
> >
> > Mayuresh
> >
> >
> > On Wed, Jul 18, 2018 at 3:05 AM Becket Qin  wrote:
> >
> > > Hey Joel,
> > >
> > > Thanks for the detailed explanation. I agree the current design makes
> > > sense.
> > > My confusion is about whether the new config for the controller queue
> > > capacity is necessary. I cannot think of a case in which users would
> > change
> > > it.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Wed, Jul 18, 2018 at 6:00 PM, Becket Qin 
> > wrote:
> > >
> > > > Hi Lucas,
> > > >
> > > > I guess my question can be rephrased to "do we expect user to ever
> > change
> > > > the controller request queue capacity"? If we agree that 20 is
> already
> > a
> > > > very generous default number and we do not expect user to change it,
> is
> > > it
> > > > still necessary to expose this as a config?
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > > > On Wed, Jul 18, 2018 at 2:29 AM, Lucas Wang 
> > > wrote:
> > > >
> > > >> @Becket
> > > >> 1. Thanks for the comment. You are right that normally there should
> be
> > > >> just
> > > >> one controller request because of muting,
> > > >> and I had NOT intended to say there would be many enqueued
> controller
> > > >> requests.
> > > >> I went through the KIP again, and I'm not sure which part conveys
> that
> > > >> info.
> > > >> I'd be happy to revise if you point out the section.
> > > >>
> > > >> 2. Though it should not happen in normal conditions, the current
> > design
> > > >> does not preclude multiple controllers running
> > > >> at the same time, hence if we don't have the controller queue
> capacity
> > > >> config and simply make its capacity to be 1,
> > > >> network threads handling requests from different controllers will be
> > > >> blocked during those troublesome times,
> > > >> which is probably not what we want. On the other hand, adding the
> > extra
> > > >> config with a default value, say 20, guards us from issues in those
> > > >> troublesome times, and IMO there isn't much downside of adding the
> > extra
> > > >> config.
> > > >>
> > > >> @Mayuresh
> > > >> Good catch, this sentence is an obsolete statement based on a
> previous
> > > >> design. I've revised the wording in the KIP.
> > > >>
> > > >> Thanks,
> > > >> Lucas
> > > >>
> > > >> On Tue, Jul 17, 2018 at 10:33 AM, Mayuresh Gharat <
> > > >> gharatmayures...@gmail.com> wrote:
> > > >>
> > > >> > Hi Lucas,
> > > 
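[Editor's note] Mayuresh's deque suggestion above — insert controller requests at the head, data requests at the tail, consume from the head — could be sketched roughly as follows. This is a hypothetical illustration of the idea under discussion, not the actual Kafka request channel (and not necessarily what was ultimately implemented):

```java
import java.util.concurrent.LinkedBlockingDeque;

public class PrioritizedRequestQueue {
    private final LinkedBlockingDeque<String> deque;

    public PrioritizedRequestQueue(int capacity) {
        this.deque = new LinkedBlockingDeque<>(capacity);
    }

    // Controller requests jump to the head; all other requests
    // (produce, fetch, etc.) go to the tail.
    public void send(String request, boolean isControllerRequest) {
        boolean accepted = isControllerRequest
                ? deque.offerFirst(request)
                : deque.offerLast(request);
        if (!accepted) {
            throw new IllegalStateException("request queue full");
        }
    }

    // Handler threads always take from the head, so a queued controller
    // request is picked up before the backlog of data requests.
    public String receive() {
        return deque.pollFirst(); // null if empty
    }
}
```

Note that inserting at the head would reverse the relative order of multiple queued controller requests; as discussed above, this is only safe because channel muting keeps at most one controller request in flight per controller.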

[jira] [Resolved] (KAFKA-5098) KafkaProducer.send() blocks and generates TimeoutException if topic name has illegal char

2018-07-17 Thread Joel Koshy (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy resolved KAFKA-5098.
---
Resolution: Fixed

> KafkaProducer.send() blocks and generates TimeoutException if topic name has 
> illegal char
> -
>
> Key: KAFKA-5098
> URL: https://issues.apache.org/jira/browse/KAFKA-5098
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.10.2.0
> Environment: Java client running against server using 
> wurstmeister/kafka Docker image.
>Reporter: Jeff Larsen
>Assignee: Ahmed Al-Mehdi
>Priority: Major
> Fix For: 2.1.0
>
>
> The server is running with auto create enabled. If we try to publish to a 
> topic with a forward slash in the name, the call blocks and we get a 
> TimeoutException in the Callback. I would expect it to return immediately 
> with an InvalidTopicException.
> There are other blocking issues that have been reported which may be related 
> to some degree, but this particular cause seems unrelated.
> Sample code:
> {code}
> import org.apache.kafka.clients.producer.*;
> import java.util.*;
> public class KafkaProducerUnexpectedBlockingAndTimeoutException {
>   public static void main(String[] args) {
> Properties props = new Properties();
> props.put("bootstrap.servers", "kafka.example.com:9092");
> props.put("key.serializer", 
> "org.apache.kafka.common.serialization.StringSerializer");
> props.put("value.serializer", 
> "org.apache.kafka.common.serialization.StringSerializer");
> props.put("max.block.ms", 10000); // 10 seconds should illustrate our
> point
> String separator = "/";
> //String separator = "_";
> try (Producer<String, String> producer = new KafkaProducer<>(props)) {
>   System.out.println("Calling KafkaProducer.send() at " + new Date());
>   producer.send(
>   new ProducerRecord<String, String>("abc" + separator + 
> "someStreamName",
>   "Not expecting a TimeoutException here"),
>   new Callback() {
> @Override
> public void onCompletion(RecordMetadata metadata, Exception e) {
>   if (e != null) {
> System.out.println(e.toString());
>   }
> }
>   });
>   System.out.println("KafkaProducer.send() completed at " + new Date());
> }
>   }
> }
> {code}
> Switching to the underscore separator in the above example works as expected.
> Mea culpa: We neglected to research allowed chars in a topic name, but the 
> TimeoutException we encountered did not help point us in the right direction.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
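[Editor's note] Until the fix landed, client code could pre-validate topic names before calling send(). The sketch below mirrors Kafka's legal topic characters (letters, digits, '.', '_', '-') and the 249-character limit; treat the exact limits as an assumption to verify against your broker version:

```java
import java.util.regex.Pattern;

public class TopicNameValidator {
    // Assumed legal character set and max length, based on Kafka's
    // topic-name rules; verify against the broker version in use.
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");
    private static final int MAX_LENGTH = 249;

    public static boolean isValid(String topic) {
        return !topic.isEmpty()
                && topic.length() <= MAX_LENGTH
                && !topic.equals(".")
                && !topic.equals("..")
                && LEGAL.matcher(topic).matches();
    }
}
```

Checking isValid() before producer.send() would turn the silent 10-second block in the report above into an immediate, actionable error on the client side.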


Re: [VOTE] KIP-291: Have separate queues for control requests and data requests

2018-07-17 Thread Joel Koshy
+1 on the KIP.

(I'm not sure it's actually necessary to introduce the condition variables
for the concern that Jun raised, but that's an implementation detail that we
can defer to a discussion in the PR.)

On Sat, Jul 14, 2018 at 10:45 PM, Lucas Wang  wrote:

> Hi Jun,
>
> I agree by using the conditional variables, there is no need to add such a
> new config.
> Also thanks for approving this KIP.
>
> Lucas
>


Re: [DISCUSS] KIP-291: Have separate queues for control requests and data requests

2018-07-17 Thread Joel Koshy
Hey Becket - good point. Lucas and I were talking about this offline last
week. It is true that there is only one request in flight for processing.
However, there may be more during a controller failover but it should not
be very high - basically the maximum number of controller failures that can
occur whilst handling any controller request.

This is in fact the more significant issue that may not have been fully
captured in the KIP motivation. i.e., right now, the server processes one
request at a time. Assuming there is moderate to heavy load on the broker,
while it is handling a controller request, it will accumulate a deluge of
regular client requests that will enter the request queue. After the sole
controller request is handled it will read the next controller request into
the request queue. So we end up with a single controller request, then a
potentially large number of regular requests, then a single controller
request, then a large number of regular requests and so on. This can become
especially problematic when you have many small controller requests (say if
you are steadily moving a few partitions at a time) spread over a short
span of time. With the prioritized queue this changes to: handle a
controller request, handle a vector of regular requests (where the vector
size is the number of request handler threads), handle the next controller
request, and so on. The maximum time between handling adjacent controller
requests will be within min(local time) of the vector of regular requests.
So it helps significantly. We also considered the possibility of NOT
muting/unmuting the controller socket to help address this. This would also
mean we would need to pin the handling of all controller requests to one
specific request handler thread in order to ensure order. That change is
probably not worth the effort and we expect the current proposal to be
adequate.

Thanks,

Joel

On Tue, Jul 17, 2018 at 5:06 AM, Becket Qin  wrote:

> Thanks for the KIP, Lucas. Separating the control plane from the data plane
> makes a lot of sense.
>
> In the KIP you mentioned that the controller request queue may have many
> requests in it. Will this be a common case? The controller requests still
> go through the SocketServer. The SocketServer will mute the channel once
> a request is read and put into the request channel. So assuming there is
> only one connection between controller and each broker, on the broker side,
> there should be only one controller request in the controller request queue
> at any given time. If that is the case, do we need a separate controller
> request queue capacity config? The default value of 20 means that we expect
> up to 20 controller switches to happen in a short period of time. I am
> not sure whether someone should increase the controller request queue
> capacity to handle such case, as it seems indicating something very wrong
> has happened.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
> On Fri, Jul 13, 2018 at 1:10 PM, Dong Lin  wrote:
>
> > Thanks for the update Lucas.
> >
> > I think the motivation section is intuitive. It will be good to learn
> more
> > about the comments from other reviewers.
> >
> > On Thu, Jul 12, 2018 at 9:48 PM, Lucas Wang 
> wrote:
> >
> > > Hi Dong,
> > >
> > > I've updated the motivation section of the KIP by explaining the cases
> > that
> > > would have user impacts.
> > > Please take a look and let me know your comments.
> > >
> > > Thanks,
> > > Lucas
> > >
> > > On Mon, Jul 9, 2018 at 5:53 PM, Lucas Wang 
> > wrote:
> > >
> > > > Hi Dong,
> > > >
> > > > The simulation of disk being slow is merely for me to easily
> construct
> > a
> > > > testing scenario
> > > > with a backlog of produce requests. In production, other than the
> disk
> > > > being slow, a backlog of
> > > > produce requests may also be caused by high produce QPS.
> > > > In that case, we may not want to kill the broker and that's when this
> > KIP
> > > > can be useful, both for JBOD
> > > > and non-JBOD setup.
> > > >
> > > > Going back to your previous question about each ProduceRequest
> covering
> > > 20
> > > > partitions that are randomly
> > > > distributed, let's say a LeaderAndIsr request is enqueued that tries
> to
> > > > switch the current broker, say broker0, from leader to follower
> > > > *for one of the partitions*, say *test-0*. For the sake of argument,
> > > > let's also assume the other brokers, say broker1, have *stopped*
> > fetching
> > > > from
> > > > the current broker, i.e. broker0.
> > > > 1. If the enqueued produce requests have acks =  -1 (ALL)
> > > >   1.1 without this KIP, the ProduceRequests ahead of LeaderAndISR
> will
> > be
> > > > put into the purgatory,
> > > > and since they'll never be replicated to other brokers
> (because
> > > of
> > > > the assumption made above), they will
> > > > be completed either when the LeaderAndISR request is
> processed
> > or
> > > > when the timeout happens.
> > > >   1.2 With this KIP, 

[ANNOUNCE] New Kafka PMC member: Jiangjie (Becket) Qin

2017-08-23 Thread Joel Koshy
Hi everyone,

Jiangjie (Becket) Qin became a Kafka committer in October 2016 and has
contributed significantly to several major patches, reviews and discussions
since. I am glad to announce that Becket is now a member of the Apache
Kafka PMC.

Congratulations Becket!

Joel


Re: [VOTE] KIP-164 Add unavailablePartitionCount and per-partition Unavailable metrics

2017-07-25 Thread Joel Koshy
+1

On Thu, Jul 20, 2017 at 10:30 AM, Becket Qin  wrote:

> +1, Thanks for the KIP.
>
> On Thu, Jul 20, 2017 at 7:08 AM, Ismael Juma  wrote:
>
> > Thanks for the KIP, +1 (binding).
> >
> > On Thu, Jun 1, 2017 at 9:44 AM, Dong Lin  wrote:
> >
> > > Hi all,
> > >
> > > Can you please vote for KIP-164? The KIP can be found at
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-164-+Add+
> > > UnderMinIsrPartitionCount+and+per-partition+UnderMinIsr+metrics
> > > .
> > >
> > > Thanks,
> > > Dong
> > >
> >
>


Re: [VOTE] KIP-168: Add TotalTopicCount metric per cluster

2017-07-13 Thread Joel Koshy
+1

On Thu, Jul 13, 2017 at 12:24 PM, Vahid S Hashemian <
vahidhashem...@us.ibm.com> wrote:

> +1 (non-binding)
>
>
>
>
> From:   Dong Lin 
> To: dev@kafka.apache.org
> Date:   07/12/2017 10:43 AM
> Subject:Re: [VOTE] KIP-168: Add TotalTopicCount metric per cluster
>
>
>
> +1 (non-binding)
>
> On Wed, Jul 12, 2017 at 10:04 AM, Abhishek Mendhekar <
> abhishek.mendhe...@gmail.com> wrote:
>
> > Hello Kafka Dev,
> >
> > I would like to get votes on KIP-168. Here is the updated proposal based
> on
> > the discussion so far.
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 168%3A+Add+GlobalTopicCount+metric+per+cluster
> >
> > Thanks,
> > Abhishek
> >
> > Email Thread - http://mail-archives.apache.org/mod_mbox/kafka-dev/201706
> .
> > mbox/%3CCAMcwe-ugep-UiSn9TkKEMwwTM%3DAzGC4jPro9LnyYRezyZg_NKA%
> > 40mail.gmail.com%3E
> >
> > On Fri, Jun 23, 2017 at 5:16 AM, Mickael Maison
> 
> > wrote:
> >
> > > +1 (non-binding)
> > > Thanks
> > >
> > > On Thu, Jun 22, 2017 at 6:07 PM, Onur Karaman
> > >  wrote:
> > > > +1
> > > >
> > > > On Thu, Jun 22, 2017 at 10:05 AM, Dong Lin 
> > wrote:
> > > >
> > > >> Thanks for the KIP. +1 (non-binding)
> > > >>
> > > >> On Wed, Jun 21, 2017 at 1:17 PM, Abhishek Mendhekar <
> > > >> abhishek.mendhe...@gmail.com> wrote:
> > > >>
> > > >> > Hi Kafka Dev,
> > > >> >
> > > >> > I did like to start the voting on -
> > > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > >> > 168%3A+Add+TotalTopicCount+metric+per+cluster
> > > >> >
> > > >> > Discussions will continue on -
> > > >> > http://mail-archives.apache.org/mod_mbox/kafka-dev/201706.
> > > >> > mbox/%3CCAMcwe-ugep-UiSn9TkKEMwwTM%3DAzGC4jPro9LnyYRezyZg_NKA%
> > > >> > 40mail.gmail.com%3E
> > > >> >
> > > >> > Thanks,
> > > >> > Abhishek
> > > >> >
> > > >>
> > >
> >
> >
> >
> > --
> > Abhishek Mendhekar
> > abhishek.mendhe...@gmail.com | 818.263.7030
> >
>
>
>
>
>


Re: [DISCUSS] KIP-168: Add TotalTopicCount metric per cluster

2017-06-26 Thread Joel Koshy
+1 on the original KIP
I actually prefer TotalTopicCount because it makes it clearer that it is a
cluster-wide count. OfflinePartitionsCount is global to the cluster (but it
is fairly clear that the controller is SoT on that). TopicCount on the
other hand could be misread as a local count since PartitinCount, URP, are
all local counts.

On Thu, Jun 22, 2017 at 9:20 AM, Abhishek Mendhekar <
abhishek.mendhe...@gmail.com> wrote:

> Hi Kafka Dev,
>
> Below is the link to the update KIP proposal.
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 168%3A+Add+TopicCount+metric+per+cluster
>
> Thanks,
> Abhishek
>
> On Wed, Jun 21, 2017 at 3:55 PM, Abhishek Mendhekar <
> abhishek.mendhe...@gmail.com> wrote:
>
> > Hi Dong,
> >
> > Thanks for the suggestion!
> >
> > I think TopicCount sounds reasonable to me and it definitely seems
> > consistent with the other metric names. I will update the proposal to
> > reflect this change.
> >
> > Thanks,
> > Abhishek
> >
> > On Wed, Jun 21, 2017 at 2:17 PM, Dong Lin  wrote:
> >
> >> Hey Abhishek,
> >>
> >> I think the metric is useful. Sorry for being late on this. I am
> wondering
> >> if TopicCount is a better name than TotalTopicCount, given that we
> >> currently have metric with names OfflinePartitionsCount, LeaderCount,
> >> PartitionCount etc.
> >>
> >> Thanks,
> >> Dong
> >>
> >> On Fri, Jun 16, 2017 at 9:09 AM, Abhishek Mendhekar <
> >> abhishek.mendhe...@gmail.com> wrote:
> >>
> >> > Hi Kafka Dev,
> >> >
> >> > I created KIP-168 to propose adding a metric to emit total topic count
> >> > in a cluster. The metric will be emitted by the controller.
> >> >
> >> > The KIP can be found here
> >> > (https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> > 168%3A+Add+TotalTopicCount+metric+per+cluster)
> >> > and the associated JIRA improvement is KAFKA-5461
> >> > (https://issues.apache.org/jira/browse/KAFKA-5461)
> >> >
> >> > Appreciate all the comments.
> >> >
> >> > Best,
> >> >
> >> > Abhishek
> >> >
> >>
> >
> >
> >
> > --
> > Abhishek Mendhekar
> > abhishek.mendhe...@gmail.com | 818.263.7030 <(818)%20263-7030>
> >
>
>
>
> --
> Abhishek Mendhekar
> abhishek.mendhe...@gmail.com | 818.263.7030
>


Re: [VOTE] KIP-113 - Support replicas movement between log directories

2017-06-03 Thread Joel Koshy
+1

Few additional comments (most of which we discussed offline):

   - This was summarized in the “discuss” thread, but it is worth recording
     in the KIP itself that the LEO in DescribeDirsResponse is useful to
     measure progress of the move.
   - num.replica.move.threads defaults to the number of log directories -
     perhaps note that we typically expect a 1-1 mapping to disks for this
     to work well.
   - Can you clarify in the KIP whether intra.broker.throttled.rate is
     per-broker or per-thread?
   - Reassignment JSON: can log_dirs be made optional? i.e., its absence
     would mean “any”.
   - Can you also explicitly state somewhere that “any” translates to
     round-robin assignment today?
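[Editor's note] To illustrate the last two points, a KIP-113-style reassignment JSON pairs each replica with a log directory, where "any" (or, if made optional, an absent log_dirs field) leaves placement to the broker's round-robin assignment. The directory paths below are illustrative, not authoritative:

```json
{
  "version": 1,
  "partitions": [
    {
      "topic": "foo",
      "partition": 0,
      "replicas": [1, 2],
      "log_dirs": ["/data/kafka-logs-1", "any"]
    }
  ]
}
```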


On Mon, Apr 3, 2017 at 9:49 AM, Dong Lin  wrote:

> Hi all,
>
> It seems that there is no further concern with the KIP-113. We would like
> to start the voting process. The KIP can be found at
> *https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 113%3A+Support+replicas+movement+between+log+directories
>  113%3A+Support+replicas+movement+between+log+directories>.*
>
> Thanks,
> Dong
>


Re: [VOTE] KIP-144: Exponential backoff for broker reconnect attempts

2017-05-08 Thread Joel Koshy
+1

On Mon, May 8, 2017 at 11:07 AM, Colin McCabe  wrote:

> +1 (non-binding)
>
>
>
> On Sat, May 6, 2017, at 11:13, Dana Powers wrote:
> > +1 !
> >
> > On May 6, 2017 4:49 AM, "Edoardo Comar"  wrote:
> >
> > > +1 (non binding)
> > > thanks
> > > --
> > > Edoardo Comar
> > > IBM MessageHub
> > > eco...@uk.ibm.com
> > > IBM UK Ltd, Hursley Park, SO21 2JN
> > >
> > > IBM United Kingdom Limited Registered in England and Wales with number
> > > 741598 Registered office: PO Box 41, North Harbour, Portsmouth, Hants.
> PO6
> > > 3AU
> > >
> > >
> > >
> > > From:   Jay Kreps 
> > > To: dev@kafka.apache.org
> > > Date:   05/05/2017 23:19
> > > Subject:Re: [VOTE] KIP-144: Exponential backoff for broker
> > > reconnect attempts
> > >
> > >
> > >
> > > +1
> > > On Fri, May 5, 2017 at 7:29 PM Sriram Subramanian 
> > > wrote:
> > >
> > > > +1
> > > >
> > > > On Fri, May 5, 2017 at 6:04 PM, Gwen Shapira 
> wrote:
> > > >
> > > > > +1
> > > > >
> > > > > On Fri, May 5, 2017 at 3:32 PM, Ismael Juma 
> wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > Given the simple and non controversial nature of the KIP, I would
> > > like
> > > > to
> > > > > > start the voting process for KIP-144: Exponential backoff for
> broker
> > > > > > reconnect attempts:
> > > > > >
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > 144%3A+Exponential+
> > > > > > backoff+for+broker+reconnect+attempts
> > > > > >
> > > > > > The vote will run for a minimum of 72 hours.
> > > > > >
> > > > > > Thanks,
> > > > > > Ismael
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > *Gwen Shapira*
> > > > > Product Manager | Confluent
> > > > > 650.450.2760 | @gwenshap
> > > > > Follow us: Twitter  | blog
> > > > > 
> > > > >
> > > >
> > >
> > >
> > >
> > > Unless stated otherwise above:
> > > IBM United Kingdom Limited - Registered in England and Wales with
> number
> > > 741598.
> > > Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6
> 3AU
> > >
>
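[Editor's note] For context on what was voted on: the client doubles its reconnect backoff on each consecutive failure up to a configured maximum, with randomized jitter to avoid reconnect storms. A rough sketch of the computation — the config names reconnect.backoff.ms and reconnect.backoff.max.ms come from the KIP, while the 20% jitter figure is an assumption to verify there:

```java
import java.util.concurrent.ThreadLocalRandom;

public class ReconnectBackoff {
    // baseMs ~ reconnect.backoff.ms, maxMs ~ reconnect.backoff.max.ms
    public static long backoffMs(long baseMs, long maxMs, int consecutiveFailures) {
        // Exponential growth, capped at the configured maximum.
        double exp = Math.min((double) maxMs, baseMs * Math.pow(2, consecutiveFailures));
        // +/-20% jitter so many clients don't reconnect in lockstep
        // (jitter percentage is an assumption, not from the KIP text here).
        double jitter = 0.8 + 0.4 * ThreadLocalRandom.current().nextDouble();
        return (long) (exp * jitter);
    }
}
```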


Re: [DISCUSS] KIP-143: Controller Health Metrics

2017-05-08 Thread Joel Koshy
Hi Ismael,


> > What about a broker that is not the controller? Would you need a separate
> > idle-not-controller state?
>
>
> Do we need a separate state or can users just use the ActiveControllerCount
> metric to check if the broker is the controller?
>

Sure - the ACC metric should be sufficient.


> > Given that most of the state changes are short
> > we would just see blips in the best case and nothing in the worst case
> > (depending on how often metrics get sampled). It would only help if you
> > want to visually detect any transitions that are taking an inordinate
> > duration.
> >
>
> Right. Do you think this is not worth it?
>

Not sure - I felt the RateAndTime sensors are sufficient and would give
more intuitive results. E.g., users would emit the state metric as a gauge
but the underlying metric reporter would need to sample the metric
frequently enough to capture state changes. i.e., for most metrics backends
you would likely see a flat line even through a series of (fast) state
changes.


>
> Ok - my thought was since we are already using kafka-metrics for quotas and
> > selector metrics we could just do the same for this (and any *new*
> metrics
> > on the broker).
> >
>
> I don't have a strong opinion either way as I think both options have pros
> and cons. It seems like we don't have the concept of a Gauge in Kafka
> Metrics at the moment, so it seems like it would be easier to do it via
> Yammer metrics while we discuss what's the best way to unify all the
> metrics again.
>

Sounds good.


Re: [VOTE] KIP-153 (separating replication traffic from BytesOutPerSec metric)

2017-05-08 Thread Joel Koshy
+1

On Mon, May 8, 2017 at 8:30 AM, Roger Hoover  wrote:

> +1
>
> Sent from my iPhone
>
> > On May 8, 2017, at 5:00 AM, Edoardo Comar  wrote:
> >
> > +1
> > Many thanks Jun
> > --
> > Edoardo Comar
> > IBM MessageHub
> > eco...@uk.ibm.com
> > IBM UK Ltd, Hursley Park, SO21 2JN
> >
> > IBM United Kingdom Limited Registered in England and Wales with number
> > 741598 Registered office: PO Box 41, North Harbour, Portsmouth, Hants.
> PO6
> > 3AU
> >
> >
> >
> > From:   Jun Rao 
> > To: "dev@kafka.apache.org" 
> > Date:   07/05/2017 22:40
> > Subject:[VOTE] KIP-153 (separating replication traffic from
> > BytesOutPerSec metric)
> >
> >
> >
> > Hi, Everyone,
> >
> > Since this is a relatively simple change, I would like to start the
> voting
> > process for KIP-153 : Include only client traffic in BytesOutPerSec
> > metric.
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-153+%
> > 3A+Include+only+client+traffic+in+BytesOutPerSec+metric
> >
> > The vote will run for a minimum of 72 hours.
> >
> > Thanks,
> >
> > Jun
> >
> >
> >
> > Unless stated otherwise above:
> > IBM United Kingdom Limited - Registered in England and Wales with number
> > 741598.
> > Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6
> 3AU
>


Re: [VOTE] KIP-126 - Allow KafkaProducer to split and resend oversized batches.

2017-05-04 Thread Joel Koshy
+1

On Thu, May 4, 2017 at 7:00 PM Becket Qin  wrote:

> Bump.
>
> On Tue, Apr 25, 2017 at 3:17 PM, Bill Bejeck  wrote:
>
> > +1
> >
> > On Tue, Apr 25, 2017 at 4:43 PM, Dong Lin  wrote:
> >
> > > +1 (non-binding)
> > >
> > > On Tue, Apr 25, 2017 at 12:33 PM, Becket Qin 
> > wrote:
> > >
> > > > Hi,
> > > >
> > > > I would like to start the voting on KIP-126. The KIP is intended to
> > solve
> > > > the problem that RecordTooLargeExceptions are thrown from the
> producer
> > > due
> > > > to inaccurate estimation of the compression ratio. The solution is to
> > > split
> > > > and resend the over sized batches if possible. A new metric is
> > introduced
> > > > to the producer to show the batch split rate.
> > > >
> > > > The KIP wiki is following:
> > > > *https://cwiki.apache.org/confluence/pages/viewpage.
> > > action?pageId=68715855
> > > >  > > action?pageId=68715855
> > > > >*
> > > >
> > > > We have been running a producer with this patch for some time in our
> > > mirror
> > > > maker and it looks working fine.
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > >
> >
>
-- 
Sent from Gmail Mobile
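[Editor's note] The idea Becket describes — the producer estimates a compression ratio up front, and when the estimate proves too optimistic and the compressed batch exceeds the size limit, it splits the batch and resends the halves — might be sketched like this. This is a hypothetical helper for illustration, not the actual producer internals:

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSplitter {
    // If the batch compressed to more than maxSize bytes, split it in two
    // so each half can be re-compressed and resent; otherwise keep it whole.
    public static List<List<byte[]>> splitIfOversized(
            List<byte[]> batch, long actualCompressedSize, long maxSize) {
        List<List<byte[]>> result = new ArrayList<>();
        if (actualCompressedSize <= maxSize || batch.size() <= 1) {
            result.add(batch); // fits, or cannot be split further
        } else {
            int mid = batch.size() / 2;
            result.add(new ArrayList<>(batch.subList(0, mid)));
            result.add(new ArrayList<>(batch.subList(mid, batch.size())));
        }
        return result;
    }
}
```

In the real producer a split would also update the running compression-ratio estimate (tracked by the new batch-split-rate metric the KIP introduces).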


Re: [DISCUSS] KIP-143: Controller Health Metrics

2017-05-03 Thread Joel Koshy
On Wed, May 3, 2017 at 10:54 AM, Onur Karaman <onurkaraman.apa...@gmail.com>
wrote:

> Regarding the ControllerState and the potential for overlap, I think it
> depends on our definition of controller state. While KAFKA-5028 allows only
> a single ControllerEvent to be processed at a time, it still allows
> interleavings for long-lasting actions like partition reassignment and
> topic deletion. For instance, a topic can get created while another topic
> is undergoing partition reassignment. In that sense, there is overlap.
> However, in the sense of the ControllerEvents being processed, there can be
> no overlap.
>

Yes - that is roughly what I was thinking (although deletes are no longer
long running). Also, what is the "steady-state" controller state? Idle?
What about a broker that is not the controller? Would you need a separate
idle-not-controller state? Given that most of the state changes are short
we would just see blips in the best case and nothing in the worst case
(depending on how often metrics get sampled). It would only help if you
want to visually detect any transitions that are taking an inordinate
duration.



> > 1. Yes, the long term goal is to migrate the metrics on the broker to
>> > kafka-metrics. Since many people are using Yammer reporters, we probably
>> > need to support a few popular ones in kafka-metrics before migrating.
>> Until
>> > that happens, we probably want to stick with the Yammer metrics on the
>> > server side unless we depend on features from kafka-metrics (e.g,
>> quota).
>>
>
Ok - my thought was since we are already using kafka-metrics for quotas and
selector metrics we could just do the same for this (and any *new* metrics
on the broker).


> 4. Metrics #2 and #3. The issue with relying on metric #1 is that the
>> > latter is sensitive to the frequency of metric collection. For example,
>> if
>> > the starting of the controller takes 30 secs and the metric is only
>> > collected once a minute, one may not know the latency with just metric
>> #1,
>> > but will know the latency with metrics #2 and #3. Are you concerned
>> about
>> > the memory overhead of histograms? It doesn't seem that a couple of more
>> > histograms will hurt.
>>
>
No I don't have concerns about the histograms - just wondering if it is
useful enough to have these in the first place, but your summary makes
sense.

Joel


> >
>> > Hi, Isamel,
>> >
>> > Thanks the for proposal. A couple of more comments.,
>> >
>> > 10. It would be useful to add a new metrics for the controller queue
>> size.
>> > kafka.controller:type=ControllerStats,name=QueueSize
>> >
>> > 11. It would also be useful to know how long an event is waiting in the
>> > controller queue before being processing. Perhaps, we can add a
>> histogram
>> > metric like the following.
>> > kafka.controller:type=ControllerStats,name=QueueTimeMs
>> >
>> > Jun
>> >
>> > On Thu, Apr 27, 2017 at 11:39 AM, Joel Koshy <jjkosh...@gmail.com>
>> wrote:
>> >
>> > > Thanks for the KIP - couple of comments:
>> > > - Do you intend to actually use yammer metrics? or use kafka-metrics
>> and
>> > > split the timer into an explicit rate and time? I think long term we
>> > ought
>> > > to move off yammer and use kafka-metrics only. Actually either is
>> fine,
>> > but
>> > > we should ideally use only one in the long term - and I thought the
>> plan
>> > > was to use kafka-metrics.
>> > > - metric #9 appears to be redundant since we already have per-API
>> request
>> > > rate and time metrics.
>> > > - Same for metric #4, #5 (as there are request stats for
>> > > DeleteTopicRequest - although it is possible for users to trigger
>> deletes
>> > > via ZK)
>> > > - metric #2, #3 are potentially useful, but a bit overkill for a
>> > > histogram. Alternative is to stick to last known value, but that
>> doesn't
>> > > play well with alerts if a high value isn't reset/decayed. Perhaps
>> metric
>> > > #1 would be sufficient to gauge slow start/resignation transitions.
>> > > - metric #1 - some of the states may actually overlap
>> > > - I don't actually understand the semantics of metric #6. Is it rate
>> of
>> > > partition reassignment triggers? does the number of partitions matter?
>> > >
>> > > Joel
>> > >
>> > > On Thu, Apr 27, 2017 at 8:04 AM, Tom Crayford <tcrayf...@h

Re: [DISCUSS] KIP-143: Controller Health Metrics

2017-04-27 Thread Joel Koshy
Thanks for the KIP - a couple of comments:
- Do you intend to actually use yammer metrics? or use kafka-metrics and
split the timer into an explicit rate and time? I think long term we ought
to move off yammer and use kafka-metrics only. Actually either is fine, but
we should ideally use only one in the long term - and I thought the plan
was to use kafka-metrics.
- metric #9 appears to be redundant since we already have per-API request
rate and time metrics.
- Same for metric #4, #5 (as there are request stats for DeleteTopicRequest
- although it is possible for users to trigger deletes via ZK)
- metric #2, #3 are potentially useful, but a bit overkill for a histogram.
Alternative is to stick to last known value, but that doesn't play well
with alerts if a high value isn't reset/decayed. Perhaps metric #1 would be
sufficient to gauge slow start/resignation transitions.
- metric #1 - some of the states may actually overlap
- I don't actually understand the semantics of metric #6. Is it rate of
partition reassignment triggers? does the number of partitions matter?

Joel

On Thu, Apr 27, 2017 at 8:04 AM, Tom Crayford  wrote:

> Ismael,
>
> Great, that sounds lovely.
>
> I'd like a `Timer` (using yammer metrics parlance) over how long it took to
> process the event, so we can get at p99 and max times spent processing
> things. Maybe we could even do a log at warning level if event processing
> takes over some timeout?
>
> Thanks
>
> Tom
>
> On Thu, Apr 27, 2017 at 3:59 PM, Ismael Juma  wrote:
>
> > Hi Tom,
> >
> > Yes, the plan is to merge KAFKA-5028 first and then use a lock-free
> > approach for the new  metrics. I considered mentioning that in the KIP
> > given KAFKA-5120, but didn't in the end. I'll add it to make it clear.
> >
> > Regarding locks, they are removed by KAFKA-5028, as you say. So, if I
> > understand correctly, you are suggesting an event processing rate metric
> > with event type as a tag? Onur and Jun, what do you think?
> >
> > Ismael
> >
> > On Thu, Apr 27, 2017 at 3:47 PM, Tom Crayford 
> > wrote:
> >
> > > Hi,
> > >
> > > We (Heroku) are very excited about this KIP, as we've struggled a bit
> > with
> > > controller stability recently. Having these additional metrics would be
> > > wonderful.
> > >
> > > I'd like to ensure polling these metrics *doesn't* hold any locks etc,
> > > because, as noted in https://issues.apache.org/jira/browse/KAFKA-5120,
> > > that
> > > lock can be held for quite some time. This may become not an issue as
> of
> > > KAFKA-5028 though.
> > >
> > > Lastly, I'd love to see some metrics around how long the controller
> > spends
> > > inside its lock. We've been tracking an issue (
> > > https://issues.apache.org/jira/browse/KAFKA-5116) where it can hold
> the
> > > lock for many, many minutes in a zk client listener thread when
> > responding
> > > to a single request. I'm not sure how that plays into
> > > https://issues.apache.org/jira/browse/KAFKA-5028 (which I assume will
> > land
> > > before this metrics patch), but it feels like there will be equivalent
> > > problems ("how long does it spend processing any individual message
> from
> > > the queue, broken down by message type").
> > >
> > > These are minor improvements though, the addition of more metrics to
> the
> > > controller is already going to be very helpful.
> > >
> > > Thanks
> > >
> > > Tom Crayford
> > > Heroku Kafka
> > >
> > > On Thu, Apr 27, 2017 at 3:10 PM, Ismael Juma 
> wrote:
> > >
> > > > Hi all,
> > > >
> > > > We've posted "KIP-143: Controller Health Metrics" for discussion:
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > 143%3A+Controller+Health+Metrics
> > > >
> > > > Please take a look. Your feedback is appreciated.
> > > >
> > > > Thanks,
> > > > Ismael
> > > >
> > >
> >
>


Re: [VOTE] KIP-112 - Handle disk failure for JBOD

2017-04-26 Thread Joel Koshy
+1

Discussed a few edits/improvements with Dong.

- Rather than a blanket (Error != None) condition for detecting offline
replicas, you probably want a storage-exception-specific error code.

- Definitely in favor of improvement #7 and it shouldn’t be too hard to do.
When bouncing with a log directory on a faulty disk, the condition may be
detected while loading logs and you may not have the full list of local
replicas. So a subsequent L request would recreate the replica on the
good disks (which may or may not be what the user wants).

- Another improvement worth investigating is how best to support partition
reassignments even with a bad disk. The wiki hints that this is unnecessary
because reassignments being disallowed with an offline replica is similar
to the current state of handling an offline broker. With JBOD though the
broker with a bad disk does not have to be offline anymore so it should be
possible to support reassignments even with offline replicas. I'm not
suggesting this is trivial, but would better leverage JBOD.

On Wed, Apr 5, 2017 at 5:46 PM, Becket Qin  wrote:

> +1
>
> Thanks for the KIP. Made a pass and had some minor changes.
>
> On Mon, Apr 3, 2017 at 3:16 PM, radai  wrote:
>
> > +1, LGTM
> >
> > On Mon, Apr 3, 2017 at 9:49 AM, Dong Lin  wrote:
> >
> > > Hi all,
> > >
> > > It seems that there is no further concern with the KIP-112. We would
> like
> > > to start the voting process. The KIP can be found at
> > > *https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 112%3A+Handle+disk+failure+for+JBOD.*
> > >
> > > Thanks,
> > > Dong
> > >
> >
>


[jira] [Commented] (KAFKA-5011) Replica fetchers may need to down-convert messages during a selective message format upgrade

2017-04-11 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15964994#comment-15964994
 ] 

Joel Koshy commented on KAFKA-5011:
---

Yes, that is correct - it is rare. I think it's reasonable to close this as
"won't fix". Not sure if we need to mention it in the docs given that it is
extremely rare.

> Replica fetchers may need to down-convert messages during a selective message 
> format upgrade
> 
>
> Key: KAFKA-5011
> URL: https://issues.apache.org/jira/browse/KAFKA-5011
> Project: Kafka
>  Issue Type: Bug
>    Reporter: Joel Koshy
>Assignee: Jiangjie Qin
> Fix For: 0.11.0.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-5011) Replica fetchers may need to down-convert messages during a selective message format upgrade

2017-04-04 Thread Joel Koshy (JIRA)
Joel Koshy created KAFKA-5011:
-

 Summary: Replica fetchers may need to down-convert messages during 
a selective message format upgrade
 Key: KAFKA-5011
 URL: https://issues.apache.org/jira/browse/KAFKA-5011
 Project: Kafka
  Issue Type: Bug
Reporter: Joel Koshy
 Fix For: 0.11.0.0








Re: [VOTE] KIP-82 Add Record Headers

2017-03-22 Thread Joel Koshy
+1

On Tue, Mar 21, 2017 at 5:01 PM, Jason Gustafson  wrote:

> Thanks for the KIP! +1 (binding) from me. Just one nit: can we change
> `Headers.header(key)` to be `Headers.lastHeader(key)`? It's not a
> deal-breaker, but I think it's better to let the name reflect the actual
> behavior as clearly as possible.
>
> -Jason
>
> On Wed, Feb 15, 2017 at 6:10 AM, Jeroen van Disseldorp 
> wrote:
>
> > +1 on introducing the concept of headers, neutral on specific
> > implementation.
> >
> >
> >
> > On 14/02/2017 22:34, Jay Kreps wrote:
> >
> >> Couple of things I think we still need to work out:
> >>
> >> 1. I think we agree about the key, but I think we haven't talked
> about
> >> the value yet. I think if our goal is an open ecosystem of these
> >> header
> >> spread across many plugins from many systems we should consider
> >> making this
> >> a string as well so it can be printed, set via a UI, set in config,
> >> etc.
> >> Basically encouraging pluggable serialization formats here will lead
> >> to a
> >> bit of a tower of babel.
> >> 2. This proposal still includes a pretty big change to our
> >> serialization
> >> and protocol definition layer. Essentially it is introducing an
> >> optional
> >> type, where the format is data dependent. I think this is actually a
> >> big
> >> change though it doesn't seem like it. It means you can no longer
> >> specify
> >> this type with our type definition DSL, and likewise it requires
> >> custom
> >> handling in client libs. This isn't a huge thing, since the Record
> >> definition is custom anyway, but I think this kind of protocol
> >> inconsistency is very non-desirable and ties you to hand-coding
> >> things. I
> > >> think the type should instead be [Key Value] in our BNF, where key
> and
> >> value are both short strings as used elsewhere. This brings it in
> >> line with
> >> the rest of the protocol.
> >> 3. Could we get more specific about the exact Java API change to
> >> ProducerRecord, ConsumerRecord, Record, etc?
> >>
> >> -Jay
> >>
> >> On Tue, Feb 14, 2017 at 9:42 AM, Michael Pearce 
> >> wrote:
> >>
> >> Hi all,
> >>>
> >>> We would like to start the voting process for KIP-82 – Add record
> >>> headers.
> >>> The KIP can be found
> >>> at
> >>>
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >>> 82+-+Add+Record+Headers
> >>>
> >>> Discussion thread(s) can be found here:
> >>>
> >>> http://search-hadoop.com/m/Kafka/uyzND1nSTOHTvj81?subj=
> >>> Re+DISCUSS+KIP+82+Add+Record+Headers
> >>> http://search-hadoop.com/m/Kafka/uyzND1Arxt22Tvj81?subj=
> >>> Re+DISCUSS+KIP+82+Add+Record+Headers
> >>> http://search-hadoop.com/?project=Kafka=KIP-82
> >>>
> >>>
> >>>
> >>> Thanks,
> >>> Mike
> >>>
> >>>
> >
>


Re: [VOTE] KIP-111 Kafka should preserve the Principal generated by the PrincipalBuilder while processing the request received on socket channel, on the broker.

2017-02-28 Thread Joel Koshy
If we deprecate KafkaPrincipal, then the Authorizer interface will also
need to change - i.e., deprecate the getAcls(KafkaPrincipal) method.

On Tue, Feb 28, 2017 at 10:11 AM, Mayuresh Gharat <
gharatmayures...@gmail.com> wrote:

> Hi Jun/Ismael,
>
> Thanks for the comments.
>
> I agree.
> What I was thinking was: we get the KIP passed now and wait till a major
> kafka version release. We can then make this change, but for now we can
> wait. Does that work?
>
> If there are concerns, we can add an extra field of type
> Principal to Session and then deprecate the KafkaPrincipal later.
>
> I am fine either ways. What do you think?
>
> Thanks,
>
> Mayuresh
>
> On Tue, Feb 28, 2017 at 9:53 AM, Jun Rao  wrote:
>
> > Hi, Ismael,
> >
> > Good point on compatibility.
> >
> > Hi, Mayuresh,
> >
> > Given that, it seems that it's better to just add the raw principal as a
> > new field in Session for now and deprecate the KafkaPrincipal field in
> the
> > future if needed?
> >
> > Thanks,
> >
> > Jun
> >
> > On Mon, Feb 27, 2017 at 5:05 PM, Ismael Juma  wrote:
> >
> > > Breaking clients without a deprecation period is something we only do
> as
> > a
> > > last resort. Is there strong justification for doing it here?
> > >
> > > Ismael
> > >
> > > On Mon, Feb 27, 2017 at 11:28 PM, Mayuresh Gharat <
> > > gharatmayures...@gmail.com> wrote:
> > >
> > > > Hi Ismael,
> > > >
> > > > Yeah. I agree that it might break the clients if the user is using
> > > > the KafkaPrincipal directly. But since KafkaPrincipal is also a Java
> > > > Principal, I think it would be the right thing to replace the
> > > > KafkaPrincipal with the Java Principal at this stage rather than later.
> > > >
> > > > We can mention in the KIP that it would break clients that are using
> > > > the KafkaPrincipal directly: since the principal type only ever takes
> > > > one value, they can use the name from the Principal directly, or
> > > > create a KafkaPrincipal from the Java Principal as we are doing in
> > > > SimpleAclAuthorizer with this KIP.
> > > >
> > > > Thanks,
> > > >
> > > > Mayuresh
> > > >
> > > >
> > > >
> > > > On Mon, Feb 27, 2017 at 10:56 AM, Ismael Juma 
> > wrote:
> > > >
> > > > > Hi Mayuresh,
> > > > >
> > > > > Sorry for the delay. The updated KIP states that there is no
> > > > compatibility
> > > > > impact, but that doesn't seem right. The fact that we changed the
> > type
> > > of
> > > > > Session.principal to `Principal` means that any code that expects
> it
> > to
> > > > be
> > > > > `KafkaPrincipal` will break. Either because of declared types
> > (likely)
> > > or
> > > > > if it accesses `getPrincipalType` (unlikely since the value is
> always
> > > the
> > > > > same). It's a bit annoying, but we should add a new field to
> > `Session`
> > > > with
> > > > > the original principal. We can potentially deprecate the existing
> > one,
> > > if
> > > > > we're sure we don't need it (or we can leave it for now).
> > > > >
> > > > > Ismael
> > > > >
> > > > > On Mon, Feb 27, 2017 at 6:40 PM, Mayuresh Gharat <
> > > > > gharatmayures...@gmail.com
> > > > > > wrote:
> > > > >
> > > > > > Hi Ismael, Joel, Becket
> > > > > >
> > > > > > Would you mind taking a look at this. We require 2 more binding
> > votes
> > > > for
> > > > > > the KIP to pass.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Mayuresh
> > > > > >
> > > > > > On Thu, Feb 23, 2017 at 10:57 AM, Dong Lin 
> > > > wrote:
> > > > > >
> > > > > > > +1 (non-binding)
> > > > > > >
> > > > > > > On Wed, Feb 22, 2017 at 10:52 PM, Manikumar <
> > > > manikumar.re...@gmail.com
> > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > +1 (non-binding)
> > > > > > > >
> > > > > > > > On Thu, Feb 23, 2017 at 3:27 AM, Mayuresh Gharat <
> > > > > > > > gharatmayures...@gmail.com
> > > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Jun,
> > > > > > > > >
> > > > > > > > > Thanks a lot for the comments and reviews.
> > > > > > > > > I agree we should log the username.
> > > > > > > > > What I meant by creating KafkaPrincipal was, after this KIP
> > we
> > > > > would
> > > > > > > not
> > > > > > > > be
> > > > > > > > > required to create KafkaPrincipal and if we want to
> maintain
> > > the
> > > > > old
> > > > > > > > > logging, we will have to create it as we do today.
> > > > > > > > > I will take care that we specify the Principal name in the
> > log.
> > > > > > > > >
> > > > > > > > > Thanks again for all the reviews.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > > Mayuresh
> > > > > > > > >
> > > > > > > > > On Wed, Feb 22, 2017 at 1:45 PM, Jun Rao  >
> > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi, Mayuresh,
> > > > > > > > > >
> > > > > > > > > > For logging the user name, we could do either way. We
> just
> > > need
> > > > > to
> 

Re: [DISCUSS] KIP-126 - Allow KafkaProducer to batch based on uncompressed size

2017-02-27 Thread Joel Koshy
>
> Let's say we sent the batch over the wire and received a
> RecordTooLargeException; how do we split it, as once we add the message to
> the batch we lose the message-level granularity? We will have to
> decompress, do a deep iteration, split, and compress again, right? This
> looks like a performance bottleneck in the case of multi-topic producers
> like mirror maker.
>

Yes, but these should be outliers if we do estimation on a per-topic basis
and if we target a conservative-enough compression ratio. The producer
should also avoid sending over the wire if it can be made aware of the
max-message size limit on the broker, and split if it determines that a
record exceeds the broker's config. Ideally this should be part of topic
metadata but is not - so it could be based on a periodic describe-configs
request (which isn't available yet). This doesn't remove the need to split
and recompress, though.
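
Becket's adaptive-estimation idea downthread (start with a conservative
estimated compression ratio, decay it slowly toward the observed ratio, bump
it quickly on under-estimation, and reset on an oversized batch) can be
sketched roughly as follows. This is a hypothetical illustration with
made-up names, not the actual producer implementation:

```python
class CompressionRatioEstimator:
    """Sketch of a per-topic adaptive compression-ratio estimator."""
    DECAY = 0.001  # small step down when the actual ratio beats the estimate
    BUMP = 0.05    # larger step up when a batch compressed worse than estimated

    def __init__(self):
        self.estimate = 1.0  # start conservative: assume no compression

    def observe(self, actual_ratio):
        # Move the estimate slowly down, or quickly (but boundedly) up.
        if actual_ratio < self.estimate:
            self.estimate = max(actual_ratio, self.estimate - self.DECAY)
        elif actual_ratio > self.estimate:
            self.estimate = min(1.0, self.estimate + self.BUMP)

    def on_record_too_large(self):
        # RecordTooLargeException: reset to the conservative starting point
        # (the caller would also split and resend the batch).
        self.estimate = 1.0

    def estimated_compressed_size(self, uncompressed_bytes):
        return int(uncompressed_bytes * self.estimate)
```

The asymmetric step sizes are what make splits rare: many good batches are
needed to lower the estimate, while a single bad batch raises it sharply.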


> On Mon, Feb 27, 2017 at 10:51 AM, Becket Qin  wrote:
>
> > Hey Mayuresh,
> >
> > 1) The batch would be split when a RecordTooLargeException is received.
> > 2) Not lower the actual compression ratio, but lower the estimated
> > compression ratio "according to" the Actual Compression Ratio (ACR).
> >
> > As an example, let's start with an Estimated Compression Ratio (ECR) of
> > 1.0. Say the ACR is ~0.8: instead of letting the ECR drop to 0.8 very
> > quickly, we only drop it by 0.001 each time ACR < ECR. However, once we
> > see an ACR > ECR, we increment the ECR by 0.05. If a
> > RecordTooLargeException is received, we reset the ECR back to 1.0 and
> > split the batch.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> >
> > On Mon, Feb 27, 2017 at 10:30 AM, Mayuresh Gharat <
> > gharatmayures...@gmail.com> wrote:
> >
> > > Hi Becket,
> > >
> > > Seems like an interesting idea.
> > > I had couple of questions :
> > > 1) How do we decide when the batch should be split?
> > > 2) What do you mean by slowly lowering the "actual" compression ratio?
> > > An example would really help here.
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > > On Fri, Feb 24, 2017 at 3:17 PM, Becket Qin 
> > wrote:
> > >
> > > > Hi Jay,
> > > >
> > > > Yeah, I got your point.
> > > >
> > > > I think there might be a solution which does not require adding a new
> > > > configuration. We can start from a very conservative compression
> ratio
> > > say
> > > > 1.0 and lower it very slowly according to the actual compression
> ratio
> > > > until we hit a point that we have to split a batch. At that point, we
> > > > exponentially back off on the compression ratio. The idea is somewhat
> > > like
> > > > TCP. This should help avoid frequent split.
> > > >
> > > > The upper bound of the batch size is also a little awkward today
> > because
> > > we
> > > > say the batch size is based on compressed size, but users cannot set
> it
> > > to
> > > > the max message size because that will result in oversized messages.
> > With
> > > > this change we will be able to allow the users to set the message
> size
> > to
> > > > close to max message size.
> > > >
> > > > However the downside is that there could be latency spikes in the
> > > > system in this case due to the splitting, especially when there are
> > > > many messages that need to be split at the same time. That could
> > > > potentially be an issue
> > for
> > > > some users.
> > > >
> > > > What do you think about this approach?
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > > >
> > > >
> > > > On Thu, Feb 23, 2017 at 1:31 PM, Jay Kreps  wrote:
> > > >
> > > > > Hey Becket,
> > > > >
> > > > > Yeah that makes sense.
> > > > >
> > > > > I agree that you'd really have to both fix the estimation (i.e.
> make
> > it
> > > > per
> > > > > topic or make it better estimate the high percentiles) AND have the
> > > > > recovery mechanism. If you are underestimating often and then
> paying
> > a
> > > > high
> > > > > recovery price that won't fly.
> > > > >
> > > > > I think you take my main point though, which is just that I hate to
> > > > > expose
> > > > > these super low level options to users because it is so hard to
> > explain
> > > > to
> > > > > people what it means and how they should set it. So if it is
> possible
> > > to
> > > > > make either some combination of better estimation and splitting or
> > > better
> > > > > tolerance of overage, that would be preferable.
> > > > >
> > > > > -Jay
> > > > >
> > > > > On Thu, Feb 23, 2017 at 11:51 AM, Becket Qin  >
> > > > wrote:
> > > > >
> > > > > > @Dong,
> > > > > >
> > > > > > Thanks for the comments. The default behavior of the producer
> won't
> > > > > change.
> > > > > > If the users want to use the uncompressed message 

Re: [VOTE] KIP-98: Exactly Once Delivery and Transactional Messaging

2017-02-24 Thread Joel Koshy
> > > > > Or we keep as is (valuelen removed), and headers are
> > added
> > > > with
> > > > > > headers
> > > > > > > length..
> > > > > > >
> > > > > > > On 21/02/2017, 23:38, "Apurva Mehta" <
> > apu...@confluent.io>
> > > > > wrote:
> > > > > > >
> > > > > > > Right now, we don't need the value length: since it
> > is
> > > > the
> > > > > last
> > > > > > item
> > > > > > > in the
> > > > > > > message, and we have the message length, we can
> > deduce
> > > > the
> > > > > value
> > > > > > > length.
> > > > > > > However, if we are adding record headers to the
> end,
> > we
> > > > > would
> > > > > > need to
> > > > > > > introduce the value length along with that change.
> > > > > > >
> > > > > > > On Tue, Feb 21, 2017 at 3:34 PM, Michael Pearce <
> > > > > > michael.pea...@ig.com
> > > > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > It seems I cannot add comment on the doc.
> > > > > > > >
> > > > > > > > In the section around the message protocol.
> > > > > > > >
> > > > > > > > It has stated:
> > > > > > > >
> > > > > > > > Message =>
> > > > > > > > Length => uintVar
> > > > > > > > Attributes => int8
> > > > > > > > TimestampDelta => intVar
> > > > > > > > OffsetDelta => uintVar
> > > > > > > > KeyLen => uintVar [OPTIONAL]
> > > > > > > > Key => data [OPTIONAL]
> > > > > > > > Value => data [OPTIONAL]
> > > > > > > >
> > > > > > > > Should it not be: (added missing value len)
> > > > > > > >
> > > > > > > > Message =>
> > > > > > > > Length => uintVar
> > > > > > > > Attributes => int8
> > > > > > > > TimestampDelta => intVar
> > > > > > > > OffsetDelta => uintVar
> > > > > > > > KeyLen => uintVar [OPTIONAL]
> > > > > > > > Key => data [OPTIONAL]
> > > > > > > > ValueLen => uintVar [OPTIONAL]
> > > > > > > > Value => data [OPTIONAL]
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On 21/02/2017, 23:07, "Joel Koshy" <
> > > > jjkosh...@gmail.com>
> > > > > > wrote:
> > > > > > > >
> > > > > > > > I left a couple of comments/questions
> directly
> > on
> > > > the
> > > > > > google-doc
> > > > > > > > <https://docs.google.com/document/d/11Jqy_
> > > > > > > > GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8>
> > > > > > > > - I found it much more tractable for a
> proposal
> > > of
> > > > > this
> > > > > > size to
> > > > > > > > discuss in
> > > > > > > > context within the doc. The permissions on
> the
> > > doc
> > > > > don't
> > > > > > let
> > > > > > > everyone
> > > > > > > > view
> > > > > > > > comments, so if there are any material
> changes
> > > that
> > > > > come

Re: [DISCUSS] KIP-125: ZookeeperConsumerConnector to KafkaConsumer Migration and Rollback

2017-02-23 Thread Joel Koshy
Regarding (2) - yes that's a good point. @Onur - I think the KIP should
explicitly call this out.
It is something that we did consider and decided against optimizing for.
i.e., we just wrote that off as a minor caveat of the upgrade path in that
there will be a few duplicates, but not too many given that we expect the
period of duplicate ownership to be minimal. Although it could be addressed
as you described, it does add complexity to an already-rather-complex
migration path. Given that it is a transition state (i.e., migration) we
felt it would be better and sufficient to keep it only as complex as it
needs to be.
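
On the offset-safety point, Onur's reply quoted below explains that an
MEZKCC runs with dual.commit.enabled=true and offsets.storage=kafka, fetching
committed offsets from both zookeeper and kafka and picking the greater of
the two. That reconciliation amounts to a simple max over the two stores; a
rough sketch with illustrative names (-1 standing in for "no committed
offset in this store"):

```python
def resume_offset(zk_offset, kafka_offset):
    """Pick the resume point across zookeeper- and kafka-based offset
    storage: the greater of the two committed offsets, or -1 if neither
    store has a commit."""
    committed = [o for o in (zk_offset, kafka_offset) if o >= 0]
    return max(committed) if committed else -1
```

Taking the max is what prevents losing progress when ownership moves between
consumers that commit to different stores, at the cost of possibly
re-consuming a few records.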

On Mon, Feb 20, 2017 at 4:45 PM, Onur Karaman 
wrote:

> Regarding 1: We won't lose the offset from zookeeper upon partition
> transfer from OZKCC/MDZKCC to MEZKCC because MEZKCC has
> "dual.commit.enabled" set to true as well as "offsets.storage" set to
> kafka. The combination of these configs results in the consumer fetching
> offsets from both kafka and zookeeper and just picking the greater of the
> two.
>
> On Mon, Feb 20, 2017 at 4:33 PM, Dong Lin  wrote:
>
> > Hey Onur,
> >
> > Thanks for the well-written KIP! I have two questions below.
> >
> > 1) In the process of migrating from OZKCCs and MDZKCCs to MEZKCCs, we
> > will have a mix of OZKCCs, MDZKCCs and MEZKCCs. OZKCC and MDZKCC will
> > only commit to zookeeper and MDZKCC will use kafka-based offset storage.
> > Would we lose an offset committed to zookeeper by a MDZKCC if partition
> > ownership is transferred from a MDZKCC to a MEZKCC?
> >
> > 2) Suppose every process in the group is running MEZKCC. Each MEZKCC has
> a
> > zookeeper-based partition assignment and kafka-based partition
> assignment.
> > Is it guaranteed that these two assignments are exactly the same across
> > processes? If not, say the zookeeper-based assignment assigns p1, p2 to
> > process 1, and p3 to process 2. And kafka-based assignment assigns p1, p3
> > to process 1, and p2 to process 2. Say process 1 receives the
> > notification to switch to kafka-based assignment before process 2; it is
> > possible that during a short period of time p3 will be consumed by both
> > processes?
> >
> > This period is probably short and I am not sure how many messages may be
> > duplicated as a result. But it seems possible to avoid this completely
> > according to an idea that Becket suggested in a previous discussion. The
> > znode /consumers//migration/mode can contain a sequence number
> > that increment for each switch. Say the znode is toggled to kafka with
> > sequence number 2, each MEZKCC will commit offset to with number 2 in the
> > metadata for partitions that it currently owns according to the zk-based
> > partition assignment, and then periodically fetches the committed offset
> > and the metadata for the partitions that it should own according to the
> > kafka-based partition assignment. Each MEZKCC only starts consumption
> when
> > the metadata has incremented to the number 2.
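
The sequence-gated switch described above can be sketched as a tiny guard: a
consumer only begins consuming a partition under the new assignment once the
committed-offset metadata for that partition carries the current migration
sequence number (meaning the previous owner has committed and released it).
This is a hypothetical illustration, not code from the KIP:

```python
def may_consume(partition_metadata_seq, current_switch_seq):
    """Gate consumption under the new assignment on the migration sequence.

    partition_metadata_seq: sequence number recorded in the committed
    offset's metadata by the partition's previous owner (None if the
    previous owner has not committed under this switch yet).
    current_switch_seq: sequence number read from the migration-mode znode.
    """
    return (partition_metadata_seq is not None
            and partition_metadata_seq >= current_switch_seq)
```

Waiting for the metadata to catch up is what closes the window in which two
processes could own the same partition under the two assignments.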
> >
> > Thanks,
> > Dong
> >
> >
> >
> >
> >
> >
> >
> >
> > On Mon, Feb 20, 2017 at 12:04 PM, Onur Karaman <
> > onurkaraman.apa...@gmail.com
> > > wrote:
> >
> > > Hey everyone.
> > >
> > > I made a KIP that provides a mechanism for migrating from
> > > ZookeeperConsumerConnector to KafkaConsumer as well as a mechanism for
> > > rolling back from KafkaConsumer to ZookeeperConsumerConnector:
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-125%
> > > 3A+ZookeeperConsumerConnector+to+KafkaConsumer+Migration+and+Rollback
> > >
> > > Comments are welcome.
> > >
> > > - Onur
> > >
> >
>


Re: [VOTE] KIP-98: Exactly Once Delivery and Transactional Messaging

2017-02-21 Thread Joel Koshy
I left a couple of comments/questions directly on the google-doc - I found
it much more tractable for a proposal of this size to discuss in
context within the doc. The permissions on the doc don't let everyone view
comments, so if there are any material changes that come out of the
discussions in those comment threads we can summarize here.

Thanks,

Joel

On Mon, Feb 20, 2017 at 4:08 PM, Becket Qin  wrote:

> Thanks for the explanation, Guozhang. That makes sense.
>
> On Sun, Feb 19, 2017 at 7:28 PM, Guozhang Wang  wrote:
>
> > Thanks Becket.
> >
> > Actually sequence is associated with a message, not a message set. For
> > example if a message set sent by producer contains 100 messages, and the
> > first message's sequence is 5, then the last message's sequence number
> > would be 104, and the next message set's first sequence is expected to be
> > 105.
> >
> >
> > Guozhang
> >
> >
> > On Sun, Feb 19, 2017 at 4:48 PM, Becket Qin 
> wrote:
> >
> > > +1. Thanks for the great work on the KIP!
> > >
> > > I have only one minor question, in the wiki (and the doc) the new
> message
> > > set format has a "FirstSequence" field, should it just be "Sequence" if
> > the
> > > sequence is always associated with a message set?
> > >
> > > On Fri, Feb 17, 2017 at 3:28 AM, Michael Pearce  >
> > > wrote:
> > >
> > > > +0
> > > >
> > > > I think need some unified agreement on the VarInts.
> > > >
> > > > Would this also change in all other area’s of the protocol, e.g.
> value
> > > and
> > > > key length in message protocol, to keep this uniform across all
> > protocols
> > > > going forwards?
> > > >
> > > >
> > > >
> > > > On 17/02/2017, 00:23, "Apurva Mehta"  wrote:
> > > >
> > > > Hi Jun,
> > > >
> > > > Thanks for the reply. Comments inline.
> > > >
> > > > On Thu, Feb 16, 2017 at 2:29 PM, Jun Rao 
> wrote:
> > > >
> > > > > Hi, Apurva,
> > > > >
> > > > > Thanks for the reply. A couple of comment below.
> > > > >
> > > > > On Wed, Feb 15, 2017 at 9:45 PM, Apurva Mehta <
> > apu...@confluent.io
> > > >
> > > > wrote:
> > > > >
> > > > > > Hi Jun,
> > > > > >
> > > > > > Answers inline:
> > > > > >
> > > > > > 210. Pid snapshots: Is the number of pid snapshot
> configurable
> > or
> > > > > hardcoded
> > > > > > > with 2? When do we decide to roll a new snapshot? Based on
> > > time,
> > > > byte,
> > > > > or
> > > > > > > offset? Is that configurable too?
> > > > > > >
> > > > >
> > > >
> > > >
> > > > > When a replica becomes a follower, we do a bit of log truncation.
> > > > Having an
> > > > > older snapshot allows us to recover the PID->sequence mapping
> > much
> > > > quicker
> > > > > than rescanning the whole log.
> > > >
> > > >
> > > > This is a good point. I have updated the doc with a more detailed
> > > > proposal.
> > > > Essentially, snapshots will be created on a periodic basis. A
> > > > reasonable
> > > > period would be every 30 or 60 seconds. We will keep at most 2
> > copies
> > > > of
> > > > the snapshot file. With this setup, we would have to replay at
> most
> > > 60
> > > > or
> > > > 120 seconds of the log in the event of log truncation during
> leader
> > > > failover.
> > > >
> > > > If we need to make any of this configurable, we can expose a
> config
> > > in
> > > > the
> > > > future. It would be easier to add a config we need than remove
> one
> > > with
> > > > marginal utility.
> > > >
> > > >
> > > > >
> > > > > > >
> > > > > > > 211. I am wondering if we should store ExpirationTime in
> the
> > > > producer
> > > > > > > transactionalId mapping message as we do in the producer
> > > > transaction
> > > > > > status
> > > > > > > message. If a producer only calls initTransactions(), but
> > never
> > > > > publishes
> > > > > > > any data, we still want to be able to expire and remove the
> > > > producer
> > > > > > > transactionalId mapping message.
> > > > > > >
> > > > > > >
> > > > > > Actually, the document was inaccurate. The transactionalId
> will
> > > be
> > > > > expired
> > > > > > only if there is no active transaction, and the age of the
> last
> > > > > transaction
> > > > > > with that transactionalId is older than the transactioanlId
> > > > expiration
> > > > > > time. With these semantics, storing the expiration time in
> the
> > > > > > transactionalId mapping message won't be useful, since the
> > > > expiration
> > > > > time
> > > > > > is a moving target based on transaction activity.
> > > > > >
> > > > > > I have updated the doc with a clarification.
> > > > > >
> > > > > >
> > > > > >
> > > > > Currently, the producer 

Re: [DISCUSS] KIP-115: Enforce offsets.topic.replication.factor

2017-01-25 Thread Joel Koshy
Already voted, but one thing worth considering (since this KIP speaks of
*enforcement*) is the desired behavior if the topic already exists and the
configured value != the existing RF.
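
To make the question concrete, here is a hypothetical sketch of the decision
the broker faces for the offsets topic. The "warn" policy shown for the
mismatch case is just one option for illustration, not what the KIP
specifies:

```python
def offsets_topic_action(existing_rf, configured_rf):
    """Decide what to do about the offsets topic given the configured
    offsets.topic.replication.factor (existing_rf is None if the topic
    does not exist yet)."""
    if existing_rf is None:
        # Topic doesn't exist: enforce the config at creation time
        # (and fail creation if there are fewer live brokers than RF,
        # rather than silently creating with a lower RF).
        return "create"
    if existing_rf != configured_rf:
        # One possible policy for the mismatch: keep the existing topic
        # but surface the discrepancy loudly, since changing the RF of a
        # live topic requires a partition reassignment.
        return "warn"
    return "ok"
```

Spelling out the mismatch case matters because enforcement at creation time
alone says nothing about clusters where the topic was already auto-created
with a lower RF.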

On Wed, Jan 25, 2017 at 4:30 PM, Dong Lin  wrote:

> +1
>
> On Wed, Jan 25, 2017 at 4:22 PM, Ismael Juma  wrote:
>
> > An important question is if this needs to wait for a major release or
> not.
> >
> > Ismael
> >
> > On Thu, Jan 26, 2017 at 12:19 AM, Ismael Juma  wrote:
> >
> > > +1 from me too.
> > >
> > > Ismael
> > >
> > > On Thu, Jan 26, 2017 at 12:07 AM, Ewen Cheslack-Postava <
> > e...@confluent.io
> > > > wrote:
> > >
> > >> +1
> > >>
> > >> Since this is an unusual one, I think it's worth pointing out that the
> > KIP
> > >> notes it is really a bug fix, but since it has compatibility
> > implications
> > >> the KIP was worth it. It was a sort of intentional bug, but confusing
> > and
> > >> dangerous.
> > >>
> > >> Seems important to fix this ASAP since people are hitting this in
> > practice
> > >> and would have to go out of their way to set up monitoring to catch
> the
> > >> issue.
> > >>
> > >> -Ewen
> > >>
> > >> On Wed, Jan 25, 2017 at 4:02 PM, Jason Gustafson 
> > >> wrote:
> > >>
> > >> > +1 from me. The current behavior seems both surprising and
> dangerous.
> > >> >
> > >> > -Jason
> > >> >
> > >> > On Wed, Jan 25, 2017 at 3:58 PM, Onur Karaman <
> > >> > onurkaraman.apa...@gmail.com>
> > >> > wrote:
> > >> >
> > >> > > Hey everyone.
> > >> > >
> > >> > > I made a bug-fix KIP-115 to enforce
> > >> > > offsets.topic.replication.factor:
> > >> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > >> > > 115%3A+Enforce+offsets.topic.replication.factor
> > >> > >
> > >> > > Comments are welcome.
> > >> > >
> > >> > > - Onur
> > >> > >
> > >> >
> > >>
> > >
> > >
> >
>


Re: [VOTE] KIP-115: Enforce offsets.topic.replication.factor

2017-01-25 Thread Joel Koshy
+1


On Wed, Jan 25, 2017 at 4:41 PM, Jason Gustafson  wrote:

> +1
>
> On Wed, Jan 25, 2017 at 4:39 PM, Dong Lin  wrote:
>
> > +1
> >
> > On Wed, Jan 25, 2017 at 4:37 PM, Ismael Juma  wrote:
> >
> > > +1 (binding)
> > >
> > > Ismael
> > >
> > > On Thu, Jan 26, 2017 at 12:34 AM, Onur Karaman <
> > > onurkaraman.apa...@gmail.com
> > > > wrote:
> > >
> > > > I'd like to start the vote for KIP-115: Enforce
> > > > offsets.topic.replication.factor
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > 115%3A+Enforce+offsets.topic.replication.factor
> > > >
> > > > - Onur
> > > >
> > >
> >
>


Stream processing meetup at LinkedIn (Sunnyvale) on Thursday, February 16 at 6pm

2017-01-24 Thread Joel Koshy
Hi everyone,

We would like to invite you to a Stream Processing Meetup at LinkedIn’s
Sunnyvale campus on Thursday, February 16 at 6pm.

Please RSVP here (*only if you intend to attend in person*):
https://www.meetup.com/Stream-Processing-Meetup-LinkedIn/events/237171557/


We have the following agenda scheduled:

   - 6PM: Doors open
   - 6-6:35PM: Networking & Welcome
   - 6:35-7:10PM: Asynchronous Processing and Multithreading in Apache Samza
     (Xinyu Liu, LinkedIn)
   - 7:15-7:50PM: SSD Benchmarks for Apache Kafka (Mingmin Chen, Uber)
   - 7:55-8:30PM: Batching to Streaming Analytics at Optimizely
     (Vignesh Sukumar, Optimizely)

Hope to see you there!

Joel


Re: [VOTE] KIP-107 Add purgeDataBefore() API in AdminClient

2017-01-12 Thread Joel Koshy
+1

(For the record, I favor the rejected alternative of not awaiting low
watermarks to move past the purge offset. I realize it offers a weaker
guarantee, but it is still very useful, easier to implement, and yields a
slightly simpler API (no need to return a future); you can still access the
current low watermark via a fetch request, although it would be odd to
include the low watermark in the purge response in this variation.)
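
To make the two variants concrete, here is a toy simulation (plain Python,
not Kafka code; all names are illustrative) of how the partition-level low
watermark trails a purge request until the followers catch up:

```python
class Replica:
    def __init__(self):
        self.low_watermark = 0

class Partition:
    """Toy model: the partition-level low watermark is the minimum
    across replicas (illustrative only, not Kafka's implementation)."""
    def __init__(self, replication_factor):
        self.replicas = [Replica() for _ in range(replication_factor)]

    def purge_before(self, offset):
        # The leader (replica 0) records the purge; followers lag behind.
        leader = self.replicas[0]
        leader.low_watermark = max(leader.low_watermark, offset)

    def replicate(self):
        # Followers eventually advance to the leader's low watermark.
        for r in self.replicas[1:]:
            r.low_watermark = self.replicas[0].low_watermark

    def low_watermark(self):
        return min(r.low_watermark for r in self.replicas)

p = Partition(replication_factor=3)
p.purge_before(50)

# Rejected alternative: return immediately -- the partition-level low
# watermark may still be behind the purge offset at that point.
assert p.low_watermark() == 0

# KIP-107 as voted: the returned future completes only once every
# replica's low watermark has passed the purge offset.
p.replicate()
assert p.low_watermark() == 50
```

The difference is only in when the call is considered complete; the data is
eventually purged either way.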

On Wed, Jan 11, 2017 at 1:01 PM, Dong Lin  wrote:

> Hi all,
>
> It seems that there is no further concern with the KIP-107. At this point
> we would like to start the voting process. The KIP can be found at
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-107
> %3A+Add+purgeDataBefore%28%29+API+in+AdminClient.
>
> Thanks,
> Dong
>


Re: [ANNOUNCE] New committer: Grant Henke

2017-01-12 Thread Joel Koshy
Hey Grant - congrats!

On Thu, Jan 12, 2017 at 10:00 AM, Neha Narkhede  wrote:

> Congratulations, Grant. Well deserved!
>
> On Thu, Jan 12, 2017 at 7:51 AM Grant Henke  wrote:
>
> > Thanks everyone!
> >
> > On Thu, Jan 12, 2017 at 2:58 AM, Damian Guy 
> wrote:
> >
> > > Congratulations!
> > >
> > > On Thu, 12 Jan 2017 at 03:35 Jun Rao  wrote:
> > >
> > > > Grant,
> > > >
> > > > Thanks for all your contribution! Congratulations!
> > > >
> > > > Jun
> > > >
> > > > On Wed, Jan 11, 2017 at 2:51 PM, Gwen Shapira 
> > wrote:
> > > >
> > > > > The PMC for Apache Kafka has invited Grant Henke to join as a
> > > > > committer and we are pleased to announce that he has accepted!
> > > > >
> > > > > Grant contributed 88 patches, 90 code reviews, countless great
> > > > > comments on discussions, a much-needed cleanup to our protocol and
> > the
> > > > > on-going and critical work on the Admin protocol. Throughout this,
> he
> > > > > displayed great technical judgment, high-quality work and
> willingness
> > > > > to contribute where needed to make Apache Kafka awesome.
> > > > >
> > > > > Thank you for your contributions, Grant :)
> > > > >
> > > > > --
> > > > > Gwen Shapira
> > > > > Product Manager | Confluent
> > > > > 650.450.2760 | @gwenshap
> > > > > Follow us: Twitter | blog
> > > > >
> > > >
> > >
> >
> >
> >
> > --
> > Grant Henke
> > Software Engineer | Cloudera
> > gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
> >
> --
> Thanks,
> Neha
>


Re: [DISCUSS] KIP-109: Old Consumer Deprecation

2017-01-10 Thread Joel Koshy
On Tue, Jan 10, 2017 at 12:53 PM, Renu Tewari <tewa...@gmail.com> wrote:

> Hi Ismael,
> What are the expected timelines we are talking about between the major
> releases? At LI we are expecting to have atleast 1 year between the old
> consumer deprecation and removal so we have enough time to upgrade all
> applications. The rollout to new consumer has hit many hurdles so hasn't
> proceeded at the expected pace. What impact would an official deprecation
> have on applications?  Any warnings would be disruptive and would prefer
> that happens when there is a migration plan in place so we have a bound on
> how long it will take. There are too many unknowns at this time.
>
> Thoughts on timelines?
>
> regards
> Renu
>
> On Mon, Jan 9, 2017 at 6:34 PM, Ismael Juma <ism...@juma.me.uk> wrote:
>
> > Hi Joel,
> >
> > Great to hear that LinkedIn is likely to implement KAFKA-4513. :)
> >
> > Yes, the KIP as it stands is a compromise given the situation. We could
> > push the deprecation to the subsequent release: likely to be 0.11.0.0
> since
> > there are a number of KIPs that require message format changes. This
> would
> > mean that the old consumers would not be removed before 0.12.0.0 (the
> major
> > release after 0.11.0.0). Would that work better for you all?
>

It helps, but the main concern is deprecating before implementing the
migration path. That means merging the deprecation PR right after cutting
0.10.2 would also be problematic, since we release off trunk. We can
prioritize working on KAFKA-4513.

@Ewen: good question on message format changes - I agree with Ismael that
for features such as a new compression scheme we can do without a format
change. I don't think we have any formal guidance on the scenarios that you
highlighted at this point so it may help to have a discussion on a separate
thread and codify that in our docs under a new "Kafka message and protocol
versioning" section.

Thanks,

Joel


> >
> > On Tue, Jan 10, 2017 at 12:52 AM, Joel Koshy <jjkosh...@gmail.com>
> wrote:
> >
> > > >
> > > >
> > > > The ideal scenario would be for us to provide a tool for no downtime
> > > > migration as discussed in the original thread (I filed
> > > > https://issues.apache.org/jira/browse/KAFKA-4513 in response to that
> > > > discussion). There are a few issues, however:
> > > >
> > > >- There doesn't seem to be much demand for it (outside of
> LinkedIn,
> > at
> > > >least)
> > > >- No-one is working on it or has indicated that they are planning
> to
> > > >work on it
> > > >- It's a non-trivial change and it requires a good amount of
> testing
> > > to
> > > >ensure it works as expected
> > > >
> > >
> > > For LinkedIn: while there are a few consuming applications for which
> the
> > > current shut-down and restart approach to migration will suffice, I
> doubt
> > > we will be able to do this for majority of services that are outside
> our
> > > direct control. Given that a seamless migration is a pre-req for us to
> > > switch to the new consumer widely (there are a few use-cases already on
> > it)
> > > it is something that we (LinkedIn) will likely implement although we
> > > haven't done further brainstorming/improvements beyond what was
> proposed
> > in
> > > the other deprecation thread.
> > >
> > >
> > > > In the meantime, we have this suboptimal situation where the old
> > > consumers
> > > > are close to unmaintained even though we don't say it outright: they
> > > don't
> > >
> > > get new features (basic things like security are missing) and bug fixes
> > are
> > > > rare. In practice, the old clients have been deprecated a while back,
> > we
> > > >
> > >
> > > Agreed that it is suboptimal, but the reality is that LI and I think a
> > few
> > > other companies are still to a large extent on the old consumer and
> will
> > be
> > > for at least a good part of this year. This does mean that we have the
> > > overhead of maintaining some internal workarounds for the old consumer.
> > >
> > >
> > > > just haven't made it official. This proposal is about rectifying that
> > so
> > > > that we communicate our intentions to our users more clearly. As
> Vahid
> > > > said, this KIP is not about changing how we maintain the existing
> code.
> > > >
>

Re: [DISCUSS] KIP-109: Old Consumer Deprecation

2017-01-09 Thread Joel Koshy
>
>
> The ideal scenario would be for us to provide a tool for no downtime
> migration as discussed in the original thread (I filed
> https://issues.apache.org/jira/browse/KAFKA-4513 in response to that
> discussion). There are a few issues, however:
>
>- There doesn't seem to be much demand for it (outside of LinkedIn, at
>least)
>- No-one is working on it or has indicated that they are planning to
>work on it
>- It's a non-trivial change and it requires a good amount of testing to
>ensure it works as expected
>

For LinkedIn: while there are a few consuming applications for which the
current shut-down-and-restart approach to migration will suffice, I doubt
we will be able to do this for the majority of services that are outside
our direct control. Given that a seamless migration is a pre-req for us to
switch to the new consumer widely (there are a few use-cases already on it),
it is something that we (LinkedIn) will likely implement, although we
haven't done further brainstorming/improvements beyond what was proposed in
the other deprecation thread.


> In the meantime, we have this suboptimal situation where the old consumers
> are close to unmaintained even though we don't say it outright: they don't

get new features (basic things like security are missing) and bug fixes are
> rare. In practice, the old clients have been deprecated a while back, we
>

Agreed that it is suboptimal, but the reality is that LI and I think a few
other companies are still to a large extent on the old consumer and will be
for at least a good part of this year. This does mean that we have the
overhead of maintaining some internal workarounds for the old consumer.


> just haven't made it official. This proposal is about rectifying that so
> that we communicate our intentions to our users more clearly. As Vahid
> said, this KIP is not about changing how we maintain the existing code.
>
> The KIP that proposes the removal of all the old clients will be more
> interesting, but it doesn't exist yet. :)
>

Deprecating *after* providing a sound migration path still seems to be the
right thing to do, but if there isn't much demand for one then maybe this
is a reasonable compromise. I'm still surprised that more users aren't
affected by this; as mentioned earlier, it could be an issue of awareness,
but I'm not sure that deprecating before a migration path is in place would
be considered best practice in raising awareness.

Thanks,

Joel



>
> Ismael
>
> On Fri, Jan 6, 2017 at 3:27 AM, Vahid S Hashemian <
> vahidhashem...@us.ibm.com> wrote:
>
> > One thing that probably needs some clarification is what is implied by
> > "deprecated" in the Kafka project.
> > I googled it a bit and it doesn't seem that deprecation conventionally
> > implies termination of support (or anything that could negatively impact
> > existing users). That's my interpretation too.
> > It would be good to know if Kafka follows a different interpretation of
> > the term.
> >
> > If my understanding of the term is correct, since we are not yet
> targeting
> > a certain major release in which the old consumer will be removed, I
> don't
> > see any harm in marking it as deprecated.
> > There will be enough time to plan and implement the migration, if the
> > community decides that's the way to go, before phasing it out.
> >
> > At the minimum new Kafka users will pick the Java consumer without any
> > confusion. And existing users will know that Kafka is preparing for the
> > old consumer's retirement.
> >
> > --Vahid
> >
> >
> >
> >
> > From:   Joel Koshy <jjkosh...@gmail.com>
> > To: "dev@kafka.apache.org" <dev@kafka.apache.org>
> > Date:   01/05/2017 06:55 PM
> > Subject:Re: [DISCUSS] KIP-109: Old Consumer Deprecation
> >
> >
> >
> > While I realize this only marks the old consumer as deprecated and not a
> > complete removal, I agree that it is somewhat premature to do this prior
> > to
> > having a migration process implemented. Onur has described this in detail
> > in the earlier thread: http://markmail.org/message/ekv352zy7xttco5s and
> > I'm
> > surprised that more companies aren't affected by (or aware of?) the
> issue.
> >
> > On Thu, Jan 5, 2017 at 4:40 PM, radai <radai.rosenbl...@gmail.com>
> wrote:
> >
> > > I cant speak for anyone else, but a rolling upgrade is definitely how
> we
> > > (LinkedIn) will do the migration.
> > >
> > > On Thu, Jan 5, 2017 at 4:28 PM, Gwen Shapira <g...@confluent.io>
> wrote:
> > >
> > > > it sounds good to have
> > > > it, but that's probably not how people will end up migrati
> > > >
> > >
> >
> >
> >
> >
> >
>


Re: [VOTE] KIP-105: Addition of Record Level for Sensors

2017-01-09 Thread Joel Koshy
+1
(although I added a minor comment on the discussion thread)

On Mon, Jan 9, 2017 at 3:43 AM, Michael Noll  wrote:

> +1 (non-binding)
>
> On Fri, Jan 6, 2017 at 6:12 PM, Matthias J. Sax 
> wrote:
>
> > +1
> >
> > On 1/6/17 9:09 AM, Neha Narkhede wrote:
> > > +1
> > >
> > > On Fri, Jan 6, 2017 at 9:04 AM Sriram Subramanian 
> > wrote:
> > >
> > >> +1
> > >>
> > >> On Fri, Jan 6, 2017 at 8:40 AM, Bill Bejeck 
> wrote:
> > >>
> > >>> +1
> > >>>
> > >>> On Fri, Jan 6, 2017 at 11:06 AM, Guozhang Wang 
> > >> wrote:
> > >>>
> >  +1
> > 
> >  On Fri, Jan 6, 2017 at 2:55 AM, Damian Guy 
> > >> wrote:
> > 
> > > +1
> > >
> > > On Fri, 6 Jan 2017 at 10:48 Ismael Juma  wrote:
> > >
> > >> Thanks for the KIP, +1 (binding).
> > >>
> > >> Ismael
> > >>
> > >> On Fri, Jan 6, 2017 at 10:37 AM, Eno Thereska <
> > >>> eno.there...@gmail.com>
> > >> wrote:
> > >>
> > >>> The discussion points for KIP-105 are addressed. At this point
> > >> we'd
> > > like
> > >>> to start the vote for it:
> > >>>
> > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > >>> 105%3A+Addition+of+Record+Level+for+Sensors
> > >>>
> > >>> Thanks
> > >>> Eno and Aarti
> > >>
> > >
> > 
> > 
> > 
> >  --
> >  -- Guozhang
> > 
> > >>>
> > >>
> >
> >
>


Re: [DISCUSS] KIP-105: Addition of Record Level for Sensors

2017-01-09 Thread Joel Koshy
Didn't get a chance to review this earlier, but this LGTM. Minor comments:
- The current name is fine, but I would have preferred RecordingLevel over
RecordLevel - the first thing RecordLevel brings to mind is Kafka records.
- Under future work: it may be useful to allow specifying different
recording levels for different hierarchies of sensors (similar to logging
levels for different loggers).
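
As a sketch of that future-work idea (plain Python, not the KIP-105 API;
configuration and sensor names here are made up): recording levels could be
resolved per sensor hierarchy the way log4j resolves logger levels, with
the most specific configured prefix winning.

```python
ORDER = {"INFO": 0, "DEBUG": 1}

# Hypothetical per-hierarchy configuration: sensors under "consumer.fetch"
# record at DEBUG; everything else falls back to the root level, INFO.
CONFIGURED = {"": "INFO", "consumer.fetch": "DEBUG"}

def recording_level(sensor_name):
    """Walk from the most specific dotted prefix up to the root,
    log4j-style, returning the first configured level found."""
    parts = sensor_name.split(".")
    for i in range(len(parts), -1, -1):
        prefix = ".".join(parts[:i])
        if prefix in CONFIGURED:
            return CONFIGURED[prefix]

def should_record(sensor_name, sensor_level):
    # A sensor declared at DEBUG records only where DEBUG is enabled.
    return ORDER[sensor_level] <= ORDER[recording_level(sensor_name)]

assert recording_level("consumer.fetch.partition-lag") == "DEBUG"
assert recording_level("producer.batch-size") == "INFO"
assert should_record("producer.batch-size", "DEBUG") is False
```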

Thanks,

Joel

On Fri, Jan 6, 2017 at 2:27 AM, Ismael Juma  wrote:

> Thanks Eno, sounds good to me. This is indeed what I was suggesting for the
> initial release. Extending the `JmxReporter` to use the additional
> information is something that can be done later, as you said.
>
> Ismael
>
> On Fri, Jan 6, 2017 at 10:24 AM, Eno Thereska 
> wrote:
>
> > To clarify an earlier point Ismael made, MetricReporter implementations
> > have access to the record level via KafkaMetric.config().recordLevel()
> > and can also retrieve the active config for the record level via the
> > configure() method. However, the built-in JmxReporter will not use that
> > information in the initial release. I've updated the KIP to reflect that.
> >
> > Thanks
> > Eno
> >
> > > On 6 Jan 2017, at 09:47, Eno Thereska  wrote:
> > >
> > > After considering the changes needed for not registering sensors at
> all,
> > coupled with the objective that Jay mentioned to leave open the
> possibility
> > of changing the recording level at runtime, we decided that the current
> way
> > we have approached the solution is the best way to go. The KIP focuses on
> > the main problem we have, which is the overhead of computing metrics. It
> > allows for a subsequent JIRA to have a way to change the levels at
> runtime
> > via JMX. It also allows for a subsequent JIRA to provide more tags to the
> > metrics reporter as Ismael had suggested (e.g., "debug", "info").
> > >
> > > I've adjusted the KIP to reflect the above.
> > >
> > > Thanks
> > > Eno
> > >
> > >> On 5 Jan 2017, at 22:14, Eno Thereska <eno.there...@gmail.com> wrote:
> > >>
> > >> Thanks Jay, will fix the motivation. We have a microbenchmark and perf
> > graph in the PR:
> > >> https://github.com/apache/kafka/pull/1446#issuecomment-268106260
> > >>
> > >> I'll need to think some more about point 3.
> > >>
> > >> Thanks
> > >> Eno
> > >>
> > >>> On 5 Jan 2017, at 19:18, Jay Kreps <j...@confluent.io> wrote:
> > >>>
> > >>> This is great! A couple of quick comments:
> > >>>
> > >>>   1. It'd be good to make the motivation a bit more clear. I think
> the
> > >>>   motivation is "We want to have lots of partition/task/etc metrics
> > but we're
> > >>>   concerned about the performance impact so we want to disable them
> by
> > >>>   default." Currently the motivation section is more about the
> proposed
> > >>>   change and doesn't really make clear why.
> > >>>   2. Do we have a microbenchmark that shows that the performance of
> (1)
> > >>>   enabled metrics, (2) disabled metrics, (3) no metrics? This would
> > help
> > >>>   build the case for needing this extra knob. Obviously if metrics
> are
> > cheap
> > >>>   you would always just leave them enabled and not worry about it. I
> > think
> > >>>   there should be some cost because we are at least taking a lock
> > during the
> > >>>   recording but I'm not sure how material that is.
> > >>>   3. One consideration in how this exposed: we always found the
> > ability to
> > >>>   dynamically change the logging level in JMX for log4j pretty
> useful.
> > I
> > >>>   think if we want to leave the door open to add this ability to
> enable
> > >>>   metrics at runtime it may have some impact on the decision around
> how
> > >>>   metrics are registered/reported.
> > >>>
> > >>> -Jay
> > >>>
> > >>> On Thu, Jan 5, 2017 at 9:59 AM, Guozhang Wang wrote:
> > >>>
> >  I thought about "not registering at all" and left a comment on the
> PR
> > as
> >  well regarding this idea. My concern is that it may be not very
> >  straight-forward to implement though via the MetricsReporter
> > interface, if
> >  Eno and Aarti has a good approach to tackle it I would love it.
> > 
> > 
> >  Guozhang
> > 
> >  On Thu, Jan 5, 2017 at 5:34 AM, Eno Thereska <eno.there...@gmail.com>
> >  wrote:
> > 
> > > Updated KIP for 1. Waiting to hear from Guozhang on 2 and then we
> can
> > > proceed.
> > >
> > > Thanks
> > > Eno
> > >> On 5 Jan 2017, at 12:27, Ismael Juma <ism...@juma.me.uk> wrote:
> > >>
> > >> Thanks Eno. It would be good to update the KIP as well with
> regards
> > to
> >  1.
> > >>
> > >> About 2, I am not sure how adding a field could break existing
> > tools.
> 

Re: [VOTE] KIP-103: Separation of Internal and External traffic

2017-01-09 Thread Joel Koshy
+1

On Fri, Jan 6, 2017 at 4:53 PM, Ismael Juma  wrote:

> Hi all,
>
> Since a few people (including myself) felt that listener name was clearer
> than protocol label, I updated the KIP to use that (as mentioned in the
> discuss thread). Given that this is a minor change, I don't think we need
> to restart the vote. If anyone objects to this change, please let me know.
>
> Thanks,
> Ismael
>
> On Fri, Jan 6, 2017 at 6:58 PM, Colin McCabe  wrote:
>
> > Looks good.  +1 (non-binding).
> >
> > What do you think about changing "protocol label" to "listener key"?
> >
> > best,
> > Colin
> >
> >
> > On Fri, Jan 6, 2017, at 09:23, Neha Narkhede wrote:
> > > +1
> > >
> > > On Fri, Jan 6, 2017 at 9:21 AM Jun Rao  wrote:
> > >
> > > > Hi, Ismael,
> > > >
> > > > Thanks for the KIP. +1
> > > >
> > > > Jun
> > > >
> > > > On Fri, Jan 6, 2017 at 2:51 AM, Ismael Juma 
> wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > As the discussion seems to have settled down, I would like to
> > initiate
> > > > the
> > > > > voting process for KIP-103: Separation of Internal and External
> > traffic:
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > 103%3A+Separation+of+Internal+and+External+traffic
> > > > >
> > > > > The vote will run for a minimum of 72 hours.
> > > > >
> > > > > Thanks,
> > > > > Ismael
> > > > >
> > > >
> > > --
> > > Thanks,
> > > Neha
> >
>


Re: [DISCUSS] KIP-109: Old Consumer Deprecation

2017-01-05 Thread Joel Koshy
While I realize this only marks the old consumer as deprecated and not a
complete removal, I agree that it is somewhat premature to do this prior to
having a migration process implemented. Onur has described this in detail
in the earlier thread: http://markmail.org/message/ekv352zy7xttco5s and I'm
surprised that more companies aren't affected by (or aware of?) the issue.

On Thu, Jan 5, 2017 at 4:40 PM, radai  wrote:

> I cant speak for anyone else, but a rolling upgrade is definitely how we
> (LinkedIn) will do the migration.
>
> On Thu, Jan 5, 2017 at 4:28 PM, Gwen Shapira  wrote:
>
> > it sounds good to have
> > it, but that's probably not how people will end up migrati
> >
>


Re: [VOTE] Vote for KIP-101 - Leader Epochs

2017-01-05 Thread Joel Koshy
(adding the dev list back - as it seems to have gotten dropped earlier in
this thread)
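
For readers landing on this vote thread first, the truncation flow the KIP
proposes can be sketched as a toy (plain Python, not Kafka code; offsets
and epochs below are made up): the leader tracks the start offset of each
leader epoch, a recovering follower asks for the end offset of the last
epoch it knew, and truncates any divergent suffix.

```python
def end_offset_for_epoch(epoch_start_offsets, leader_log_end, epoch):
    """End offset of `epoch` = start offset of the next-higher epoch,
    or the leader's log end offset if `epoch` is the latest."""
    later = [start for e, start in epoch_start_offsets.items() if e > epoch]
    return min(later) if later else leader_log_end

# Leader's view: epoch 1 began at offset 0, epoch 2 began at offset 80,
# and its log end offset is 100.
leader_epochs = {1: 0, 2: 80}

# A follower that was still on epoch 1 wrote up to offset 95 before the
# leadership change; offsets [80, 95) diverge from the new leader.
follower_log_end = 95
truncate_to = min(follower_log_end,
                  end_offset_for_epoch(leader_epochs, 100, epoch=1))
assert truncate_to == 80  # the divergent suffix [80, 95) is discarded
```

High-watermark-based truncation, by contrast, can leave such a divergent
suffix in place, which is the replica-divergence problem the KIP removes.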

On Thu, Jan 5, 2017 at 6:36 PM, Joel Koshy <jjkosh...@gmail.com> wrote:

> +1
>
> This is a very well-written KIP!
> Minor: there is still a mix of terms in the doc that references the
> earlier LeaderGenerationRequest (which I'm assuming is what it was called
> in previous versions of the wiki). The same goes for the diagrams, which
> I'm guessing are a little harder to make consistent with the text.
>
>
>
> On Thu, Jan 5, 2017 at 5:54 PM, Jun Rao <j...@confluent.io> wrote:
>
>> Hi, Ben,
>>
>> Thanks for the updated KIP. +1
>>
>> 1) In OffsetForLeaderEpochResponse, start_offset probably should be
>> end_offset since it's the end offset of that epoch.
>> 3) That's fine. We can fix KAFKA-1120 separately.
>>
>> Jun
>>
>>
>> On Thu, Jan 5, 2017 at 11:11 AM, Ben Stopford <b...@confluent.io> wrote:
>>
>> > Hi Jun
>> >
>> > Thanks for raising these points. Thorough as ever!
>> >
>> > 1) Changes made as requested.
>> > 2) Done.
>> > 3) My plan for handing returning leaders is to simply to force the
>> Leader
>> > Epoch to increment if a leader returns. I don't plan to fix KAFKA-1120
>> as
>> > part of this KIP. It is really a separate issue with wider implications.
>> > I'd be happy to add KAFKA-1120 into the release though if we have time.
>> > 4) Agreed. Not sure exactly how that's going to play out, but I think
>> we're
>> > on the same page.
>> >
>> > Please could
>> >
>> > Cheers
>> > B
>> >
>> > On Thu, Jan 5, 2017 at 12:50 AM Jun Rao <j...@confluent.io> wrote:
>> >
>> > > Hi, Ben,
>> > >
>> > > Thanks for the proposal. Looks good overall. A few comments below.
>> > >
>> > > 1. For LeaderEpochRequest, we need to include topic right? We probably
>> > want
>> > > to follow other requests by nesting partition inside topic? For
>> > > LeaderEpochResponse,
>> > > do we need to return leader_epoch? I was thinking that we could just
>> > return
>> > > an end_offset, which is the next offset of the last message in the
>> > > requested leader generation. Finally, would
>> OffsetForLeaderEpochRequest
>> > be
>> > > a better name?
>> > >
>> > > 2. We should bump up both the produce request and the fetch request
>> > > protocol version since both include the message set.
>> > >
>> > > 3. Extending LeaderEpoch to include Returning Leaders: To support
>> this,
>> > do
>> > > you plan to use the approach that stores  CZXID in the broker
>> > registration
>> > > and including the CZXID of the leader in /brokers/topics/[topic]/
>> > > partitions/[partitionId]/state in ZK?
>> > >
>> > > 4. Since there are a few other KIPs involving message format too, it
>> > would
>> > > be useful to consider if we could combine the message format changes
>> in
>> > the
>> > > same release.
>> > >
>> > > Thanks,
>> > >
>> > > Jun
>> > >
>> > >
>> > > On Wed, Jan 4, 2017 at 9:24 AM, Ben Stopford <b...@confluent.io>
>> wrote:
>> > >
>> > > > Hi All
>> > > >
>> > > > We’re having some problems with this thread being subsumed by the
>> > > > [Discuss] thread. Hopefully this one will appear distinct. If you
>> see
>> > > more
>> > > > than one, please use this one.
>> > > >
>> > > > KIP-101 should now be ready for a vote. As a reminder the KIP
>> proposes
>> > a
>> > > > change to the replication protocol to remove the potential for
>> replicas
>> > > to
>> > > > diverge.
>> > > >
>> > > > The KIP can be found here:  https://cwiki.apache.org/confl
>> > > > uence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+
>> > > > use+Leader+Epoch+rather+than+High+Watermark+for+Truncation
>> > > >
>> > > > Please let us know your vote.
>> > > >
>> > > > B
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > >
>> >
>>
>
>


Re: [VOTE] KIP-72 - Allow putting a bound on memory consumed by Incoming requests

2017-01-03 Thread Joel Koshy
+1 on the KIP.
I'll comment more on the formal PR. Also, could you link a JIRA for this
from the KIP?

Thanks,

Joel

On Tue, Jan 3, 2017 at 11:14 AM, radai  wrote:

> I've just re-validated the functionality works - broker throttles under
> stress instead of OOMs.
>
> at this point my branch (
> https://github.com/radai-rosenblatt/kafka/tree/broker-memory
> -pool-with-muting)
> is "code complete" and somewhat tested and im waiting on the voting process
> to come to a conclusion before moving forward.
>
> On Fri, Dec 16, 2016 at 4:46 PM, radai  wrote:
>
> > I've added the 3 new metrics/sensors i've implemented to the KIP.
> >
> > at this point I would need to re-validate the functionality (which i
> > expect to do early january).
> >
> > code reviews welcome ;-)
> >
> > On Mon, Nov 28, 2016 at 10:37 AM, radai 
> > wrote:
> >
> >> will do (only added a single one so far, the rest TBD)
> >>
> >> On Mon, Nov 28, 2016 at 10:04 AM, Jun Rao  wrote:
> >>
> >>> Hi, Radai,
> >>>
> >>> Could you add a high level description of the newly added metrics to
> the
> >>> KIP wiki?
> >>>
> >>> Thanks,
> >>>
> >>> Jun
> >>>
> >>> On Wed, Nov 23, 2016 at 3:45 PM, radai 
> >>> wrote:
> >>>
> >>> > Hi Jun,
> >>> >
> >>> > I've added the sensor you requested (or at least I think I did )
> >>> >
> >>> > On Fri, Nov 18, 2016 at 12:37 PM, Jun Rao  wrote:
> >>> >
> >>> > > KafkaRequestHandlerPool
> >>> >
> >>>
> >>
> >>
> >
>


Re: [DISCUSS] KIP-106 - Change Default unclean.leader.election.enabled from True to False

2017-01-03 Thread Joel Koshy
+1

On Tue, Jan 3, 2017 at 10:54 AM, Ben Stopford  wrote:

> Hi All
>
> Please find the below KIP which proposes changing the setting
> unclean.leader.election.enabled from true to false. The motivation for
> this change is that it catches out new Kafka users who don’t realise the
> default favours availability over data loss.
>
> This would mean clusters wishing to continue with unclean leader election
> enabled would need to add the appropriate configuration on upgrade.
>
> Please let me know if you foresee any issue with this change, agree or
> don’t agree.
>
> https://cwiki.apache.org/confluence/display/KAFKA/%5BWIP%5D+
> KIP-106+-+Change+Default+unclean.leader.election.
> enabled+from+True+to+False
> 
> >
>
> Thanks
>
> B
>
> Ben Stopford
> Confluent, http://www.confluent.io 
>
>
>
>


Re: [VOTE] KIP-92 - Add per partition lag metrics to KafkaConsumer

2016-12-21 Thread Joel Koshy
+1

On Wed, Dec 21, 2016 at 10:26 AM, radai  wrote:

> +1
>
> On Wed, Dec 21, 2016 at 9:51 AM, Dong Lin  wrote:
>
> > +1 (non-binding)
> >
> > On Thu, Dec 15, 2016 at 5:32 PM, Becket Qin 
> wrote:
> >
> > > Hi,
> > >
> > > I want to start a voting thread on KIP-92 which proposes to add per
> > > partition lag metrics to KafkaConsumer. The KIP wiki page is below:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 92+-+Add+per+partition+lag+metrics+to+KafkaConsumer
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> >
>


Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2016-12-21 Thread Joel Koshy
>
>
> @Joel,
>
> I read over your wiki, and apart from the introduction of the notion of
> journal partitions --whose pros and cons are already being discussed-- you
> also introduce the notion of a 'producer group' which enables multiple
> producers to participate in a single transaction. This is completely
> opposite of the model in the KIP where a transaction is defined by a
> producer id, and hence there is a 1-1 mapping between producers and
> transactions. Further, each producer can have exactly one in-flight
> transaction at a time in the KIP.
>

Hi Apurva - yes I did notice those differences among other things :) BTW, I
haven't yet gone through the google-doc carefully but on a skim it does not
seem to contain any rejected alternatives as the wiki states.


Re: [DISCUSS] KIP-92 - Add per partition lag metrics to KafkaConsumer

2016-12-21 Thread Joel Koshy
LGTM. However, can you comment on what happens to the metric when the
consumer releases ownership of partitions after a rebalance? E.g., should
it reset to (say) -1, or be removed? This really applies to any
per-partition metrics that we intend to maintain in the consumer.
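
Becket's priority-switching use case from the discussion can be sketched in
a few lines (plain Python, illustrative names only): lag is just the log
end offset minus the consumer's position, and the application drains the
low-priority topic only once the high-priority one has caught up.

```python
def partition_lag(log_end_offset, consumer_position):
    """Per-partition lag: how far the consumer trails the log end offset."""
    return max(0, log_end_offset - consumer_position)

def pick_topic(lag_by_topic, high_priority, low_priority):
    """Consume high-priority until caught up, then drain low-priority."""
    return low_priority if lag_by_topic[high_priority] == 0 else high_priority

lags = {"alerts": partition_lag(500, 500),          # caught up
        "clickstream": partition_lag(2000, 800)}    # 1200 behind
assert lags["clickstream"] == 1200
assert pick_topic(lags, "alerts", "clickstream") == "clickstream"
```

With the proposed metric this decision needs no extra RPC, unlike deriving
the log end offset via an offsetsForTimes() call.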

On Mon, Nov 14, 2016 at 9:35 AM, Becket Qin  wrote:

> Hey Michael,
>
> Thanks for the comments. Exposing the lag on the client side may serve some
> cases a little different from monitoring. For example, one of the use case
> we have was that an application has some high priority and low priority
> topics to consume. They want to switch between consuming from high priority
> and low priority topics alternately based on the lag on the partition. i.e.
> if the consume has already caught up with the log end offset of a high
> priority topic, the application will switch to consume from the low
> priority topics. Otherwise it will continue consuming from the high
> priority topics. We have seen a few other similar use cases that require a
> programmatic access to the lag. Although people can always use
> offsetsForTimes() to get the LEO, but it is more expensive call involving
> an RPC and is a blocking call.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Mon, Nov 14, 2016 at 9:18 AM, Michael Pearce 
> wrote:
>
> > Should state I have no objections adding this client side, just more a
> > question to why we don't look and propose to add this broker side also.
> >
> > Sent using OWA for iPhone
> > 
> > From: Michael Pearce 
> > Sent: Monday, November 14, 2016 4:58:45 PM
> > To: dev@kafka.apache.org
> > Subject: Re: [DISCUSS] KIP-92 - Add per partition lag metrics to
> > KafkaConsumer
> >
> > Why do we not look to expose the lag broker side centrally?
> >
> > Eg like burrow.
> >
> > From an operations point it's a lot easier to monitor lag centrally than
> > per application. Also then you'd be able to see lag of consumers not
> alive
> > or stalled.
> >
> > The information if the consumer uses Kafka based or zookeeper offsets is
> > available to the broker.
> > 
> > From: Becket Qin 
> > Sent: Sunday, November 13, 2016 4:13:01 AM
> > To: dev@kafka.apache.org
> > Subject: [DISCUSS] KIP-92 - Add per partition lag metrics to
> KafkaConsumer
> >
> > Hi,
> >
> > We created KIP-92 to propose adding per partition lag metrics to
> > KafkaConsumer.
> >
> > The KIP wiki link is the following:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 92+-+Add+per+partition+lag+metrics+to+KafkaConsumer
> >
> > Comments are welcome.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
>


Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2016-12-20 Thread Joel Koshy
Just got some time to go through most of this thread and KIP - great to see
this materialize and discussed!!
I will add more comments in the coming days on some of the other "tracks"
in this thread; but since Radai brought up the double-journaling approach
that we had discussed I thought I would move over some content from
our internal
wiki on double-journalling

It is thin on details with a few invalid statements because I don't think
we dwelt long enough on it - it was cast aside as being too expensive from
a storage and latency perspective. As the immediately preceding emails
state, I tend to agree that those are compelling enough reasons to take a
hit in complexity/increased memory usage in the consumer. Anyway, couple of
us at LinkedIn can spend some time today brainstorming a little more on
this today.

1. on write amplification: i dont see x6 the writes, at worst i see x2 the
> writes - once to the "tx log", then read and again to the destination
> partition. if you have some != 1 replication factor than both the 1st and
> the 2nd writes get replicated, but it is still a relative factor of x2.
> what am I missing?
>

I think that's right - it would be six total copies if we are doing RF 3.


> 3. why do writes to a TX need the same guarantees as "plain" writes? in
> cases where the user can live with a TX rollback on change of
> leadership/broker crash the TX log can be unreplicated, and even live in
> the leader's memory. that would cut down on writes. this is also an
> acceptable default in SQL - if your socket connection to a DB dies mid-TX
> your TX is toast (mysql is even worse)
>

I may have misunderstood - while the above may be true for in-flight
transactions, it definitely needs the same guarantees at the point of commit,
and the straightforward way to achieve that is to rely on the same
guarantees while the transaction is in flight.

4. even if we replicate the TX log, why do we need to re-read it and
> re-write it to the underlying partition? if its already written to disk all
> I would need is to make that file the current segment of the "real"
> partition and i've avoided the double write (at the cost of complicating
> segment management). if the data is replicated fetchers could do the same.
>

I think we had considered the above as well - i.e., if you abstract the
partition's segments into segments that contain non-transactional messages
and those that contain transactional messages, then it should be possible to
jump from one to the other and back. It does add quite a bit of complexity
though, and you still need to do buffering on reads, so the upside perhaps
isn't worth the effort. I'm not convinced about that though - i.e., it may
help to spend more time thinking this one through.


> 5. on latency - youre right, what im suggesting would result in tx ordering
> of messages ,"read committed" semantics and therefore higher latency.


*"read committed"* only if you do the copy back to actual log. If you don't
do that (your point 4) then I think you still need to do buffering to
achieve read-committed semantics.
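As an illustrative sketch of the consumer-side buffering mentioned above (the types and method names below are simplified stand-ins, not Kafka's actual API): messages from open transactions are held back until a commit or abort marker for that producer arrives.

```java
import java.util.*;

// Hedged sketch of read-committed buffering on the consumer side:
// data from an open transaction is buffered per producer and only
// released once that producer's commit marker is seen; aborted data
// is dropped. Purely illustrative - not Kafka's implementation.
public class ReadCommittedBuffer {
    public record Msg(long producerId, String value,
                      boolean commitMarker, boolean abortMarker) {}

    private final Map<Long, List<Msg>> open = new HashMap<>();
    private final List<Msg> deliverable = new ArrayList<>();

    public void onMessage(Msg m) {
        if (m.commitMarker()) {
            // Transaction committed: release everything buffered for it.
            deliverable.addAll(open.getOrDefault(m.producerId(), List.of()));
            open.remove(m.producerId());
        } else if (m.abortMarker()) {
            open.remove(m.producerId());  // drop aborted data
        } else {
            open.computeIfAbsent(m.producerId(), k -> new ArrayList<>()).add(m);
        }
    }

    // Only committed messages are ever handed to the application.
    public List<Msg> poll() {
        List<Msg> out = new ArrayList<>(deliverable);
        deliverable.clear();
        return out;
    }
}
```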



> 6. the added delay (vs your read uncommitted) would be roughly the time
> span of a TX.


I think it would be significantly less given that this is local copying.



>
>
> On Mon, Dec 19, 2016 at 3:15 PM, Guozhang Wang  wrote:
>
> > One more thing about the double journal proposal: when discussing about
> > this method back at LinkedIn, another raised issue besides double writing
> > was that it will void the offset ordering and enforce people to accept
> > "transaction ordering", that is, consumer will not see messages from the
> > same partition in the order where they were produced, but only in the
> order
> > of when the corresponding transaction was committed. For some scenarios,
> we
> > believe that offset ordering would still be preferred than transaction
> > ordering and that is why in KIP-98 proposal we default to the former
> while
> > leave the door open if users want to switch to the latter case.
> >
> >
> > Guozhang
> >
> > On Mon, Dec 19, 2016 at 10:56 AM, Jay Kreps  wrote:
> >
> > > Hey Radai,
> > >
> > > I'm not sure if I fully understand what you are proposing, but I
> > > interpreted it to be similar to a proposal we worked through back at
> > > LinkedIn. The proposal was to commit to a central txlog topic, and then
> > > recopy to the destination topic upon transaction commit. The
> observation
> > on
> > > that approach at the time were the following:
> > >
> > >1. It is cleaner since the output topics have only committed data!
> > >2. You need full replication on the txlog topic to ensure atomicity.
> > We
> > >weren't able to come up with a solution where you buffer in memory
> or
> > > use
> > >renaming tricks the way you are describing. The reason is that once
> > you
> > >begin committing you must ensure 

[jira] [Resolved] (KAFKA-4250) make ProducerRecord and ConsumerRecord extensible

2016-11-30 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy resolved KAFKA-4250.
---
Resolution: Fixed

> make ProducerRecord and ConsumerRecord extensible
> -
>
> Key: KAFKA-4250
> URL: https://issues.apache.org/jira/browse/KAFKA-4250
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.10.0.1
>Reporter: radai rosenblatt
>Assignee: radai rosenblatt
> Fix For: 0.10.2.0
>
>
> KafkaProducer and KafkaConsumer implement interfaces that are designed to be 
> extensible (or at least allow it).
> ProducerRecord and ConsumerRecord, however, are final, making extending 
> producer/consumer very difficult.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Reopened] (KAFKA-4250) make ProducerRecord and ConsumerRecord extensible

2016-11-30 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy reopened KAFKA-4250:
---

cc [~becket_qin] [~nsolis] [~radai]

> make ProducerRecord and ConsumerRecord extensible
> -
>
> Key: KAFKA-4250
> URL: https://issues.apache.org/jira/browse/KAFKA-4250
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.10.0.1
>Reporter: radai rosenblatt
>Assignee: radai rosenblatt
> Fix For: 0.10.1.1, 0.10.2.0
>
>
> KafkaProducer and KafkaConsumer implement interfaces that are designed to be 
> extensible (or at least allow it).
> ProducerRecord and ConsumerRecord, however, are final, making extending 
> producer/consumer very difficult.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4250) make ProducerRecord and ConsumerRecord extensible

2016-11-30 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-4250:
--
Fix Version/s: 0.10.2.0

> make ProducerRecord and ConsumerRecord extensible
> -
>
> Key: KAFKA-4250
> URL: https://issues.apache.org/jira/browse/KAFKA-4250
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.10.0.1
>Reporter: radai rosenblatt
>Assignee: radai rosenblatt
> Fix For: 0.10.1.1, 0.10.2.0
>
>
> KafkaProducer and KafkaConsumer implement interfaces that are designed to be 
> extensible (or at least allow it).
> ProducerRecord and ConsumerRecord, however, are final, making extending 
> producer/consumer very difficult.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1911) Log deletion on stopping replicas should be async

2016-11-30 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-1911:
--
Resolution: Fixed
Status: Resolved  (was: Patch Available)

> Log deletion on stopping replicas should be async
> -
>
> Key: KAFKA-1911
> URL: https://issues.apache.org/jira/browse/KAFKA-1911
> Project: Kafka
>  Issue Type: Bug
>  Components: log, replication
>    Reporter: Joel Koshy
>Assignee: Mayuresh Gharat
>  Labels: newbie++, newbiee
>
> If a StopReplicaRequest sets delete=true then we do a file.delete on the file 
> message sets. I was under the impression that this is fast but it does not 
> seem to be the case.
> On a partition reassignment in our cluster the local time for stop replica 
> took nearly 30 seconds.
> {noformat}
> Completed request:Name: StopReplicaRequest; Version: 0; CorrelationId: 467; 
> ClientId: ;DeletePartitions: true; ControllerId: 1212; ControllerEpoch: 
> 53 from 
> client/...:45964;totalTime:29191,requestQueueTime:1,localTime:29190,remoteTime:0,responseQueueTime:0,sendTime:0
> {noformat}
> This ties up one API thread for the duration of the request.
> Specifically in our case, the queue times for other requests also went up and 
> producers to the partition that was just deleted on the old leader took a 
> while to refresh their metadata (see KAFKA-1303) and eventually ran out of 
> retries on some messages leading to data loss.
> I think the log deletion in this case should be fully asynchronous although 
> we need to handle the case when a broker may respond immediately to the 
> stop-replica-request but then go down after deleting only some of the log 
> segments.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] Deprecating the old consumers in trunk

2016-11-17 Thread Joel Koshy
Not sure it is worth doing, but a simple migration approach that avoids
*service* downtime could be as follows:

   - Add a “migration mode” to the old consumer that disables its fetchers
   and disables offset commits; i.e., the consumers rebalance and own
   partitions but do basically nothing.
   - Assuming the old consumer is already committing offsets to Kafka,
   the process would be:
     - Bounce the consumer group (still on the old consumer) with:
       - Migration mode on
       - consumer.timeout.ms = -1
     - Bounce the consumer group to switch to the new consumer
   - This effectively pauses and resumes the entire group without real
   downtime of the services.
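The first bounce could be sketched roughly as follows. Note that `migration.mode` is the hypothetical flag being proposed here, not an existing config; `consumer.timeout.ms` is a real old-consumer property, and -1 makes it block indefinitely rather than throw ConsumerTimeoutException.

```java
import java.util.Properties;

public class MigrationBounceConfig {
    // Sketch of the old (ZooKeeper) consumer config for the first bounce.
    // "migration.mode" is illustrative only - it is the flag proposed in
    // this email and does not exist in any released Kafka version.
    public static Properties oldConsumerMigrationProps(String group, String zk) {
        Properties props = new Properties();
        props.put("group.id", group);
        props.put("zookeeper.connect", zk);
        // Proposed: rebalance and own partitions, but disable fetchers
        // and offset commits so the group does basically nothing.
        props.put("migration.mode", "true");
        // Block forever instead of timing out while idle.
        props.put("consumer.timeout.ms", "-1");
        return props;
    }
}
```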



On Thu, Nov 17, 2016 at 7:30 PM, Ismael Juma  wrote:

> Thanks James. I had read your post and was planning to find it in order to
> share it here so you saved me some work. :)
>
> Ismael
>
> On Fri, Nov 18, 2016 at 3:21 AM, James Cheng  wrote:
>
> > Sorry to self-plug, but I wrote a blog post that talks about this, with
> > respect to mirrormaker. I came to the same 3 solutions that Onur
> described.
> >
> > https://logallthethings.com/2016/10/07/mirrormaker-
> > gotchas-when-moving-from-the-old-consumer-to-the-new-consumer/ <
> > https://logallthethings.com/2016/10/07/mirrormaker-
> > gotchas-when-moving-from-the-old-consumer-to-the-new-consumer/>
> >
> > -James
> >
> > > On Nov 17, 2016, at 7:37 AM, Ismael Juma  wrote:
> > >
> > > Hi Onur,
> > >
> > > It is a good point that there is currently no out of the box solution
> for
> > > migrating from the old consumer to the new consumer where neither
> > downtime
> > > or duplicate consumption are acceptable. As I understand, this is
> > important
> > > for some of the usages at LinkedIn. Do you have any plans to tackle
> this
> > > issue?
> > >
> > > Jason, any thoughts on this?
> > >
> > > Ismael
> > >
> > > On Mon, Oct 31, 2016 at 11:03 PM, Onur Karaman <
> > > okara...@linkedin.com.invalid> wrote:
> > >
> > >> Does this make sense given that we still don't have a graceful
> migration
> > >> plan from the old to new consumer?
> > >>
> > >> Different suboptimal migration plans that I can think of are:
> > >> 1. shutdown all the old consumers of a group first and start them back
> > up
> > >> with the new consumer, causing downtime.
> > >> 2. have a mix of old and new consumers in the same group, causing
> > duplicate
> > >> partition ownership and consumption as each rebalance protocol ignores
> > the
> > >> other.
> > >> 3. form a brand new group for the new consumers doing the same work as
> > the
> > >> old consumer group, still causing duplicate partition ownership and
> > >> consumption across the two groups.
> > >>
> > >> On Mon, Oct 31, 2016 at 3:42 PM, Jun Rao  wrote:
> > >>
> > >>> Starting to deprecate the old consumer in the next release seems
> like a
> > >>> good idea.
> > >>>
> > >>> Thanks,
> > >>>
> > >>> Jun
> > >>>
> > >>> On Tue, Oct 25, 2016 at 2:45 AM, Ismael Juma 
> > wrote:
> > >>>
> >  Hi all,
> > 
> >  In 0.10.1.0, we removed the beta label from the new Java consumer
> >  documentation and updated the various tools so that they can use the
> > >> new
> >  consumer without having to pass the `--new-consumer` flag (more
> >  specifically the new consumer is used if `bootstrap-server` is set).
> > >> More
> >  details of the reasoning can be found in the original discuss
> thread:
> >  http://search-hadoop.com/m/Kafka/uyzND1e4bUP1Rjq721
> > 
> >  The old consumers don't have security or `offsetsForTimestamp`
> > (KIP-79)
> >  support and the plan is to only add features to the new Java
> consumer.
> > >>> Even
> >  so, the old consumers are a significant maintenance burden as they
> >  duplicate protocol request/response classes (the SimpleConsumer
> > exposes
> >  them in the public API sadly). I experienced this first hand most
> > >>> recently
> >  while working on KIP-74.
> > 
> >  Given the above, I propose we deprecate the old consumers in trunk
> to
> > >>> nudge
> >  users in the right direction. Users will have the 0.10.1.0 cycle to
> > >> start
> >  migrating to the new Java consumer with no build warnings. Once they
> >  upgrade to the next release (i.e. 0.10.2.0), users who are still
> using
> > >>> the
> >  old consumers will get warnings at build time encouraging them to
> move
> > >> to
> >  the new consumer, but everything will still work fine.
> > 
> >  In a future major release, the old consumers (along with the old
> > >>> producers)
> >  will be removed. We will have a separate discuss/vote thread for
> that
> > >> to
> >  make sure the time is right.
> > 
> >  Thoughts?
> > 
> >  Ismael
> > 
> > >>>
> > >>
> >
> >
>


[jira] [Created] (KAFKA-4409) ZK consumer shutdown/topic event deadlock

2016-11-14 Thread Joel Koshy (JIRA)
Joel Koshy created KAFKA-4409:
-

 Summary: ZK consumer shutdown/topic event deadlock
 Key: KAFKA-4409
 URL: https://issues.apache.org/jira/browse/KAFKA-4409
 Project: Kafka
  Issue Type: Bug
Reporter: Joel Koshy


This only applies to the old zookeeper consumer. It is trivial enough to fix.

The consumer can deadlock on shutdown if a topic event fires during shutdown. 
The shutdown acquires the rebalance lock and then the topic-event-watcher lock. 
The topic event watcher acquires these in the reverse order. Shutdown should 
not need to acquire the topic-event-watcher’s lock - all it does is 
unsubscribe from topic events.
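The lock-order inversion, and the fix suggested above, can be sketched as follows (class and method names are illustrative, not the actual Kafka classes):

```java
public class LockOrderDemo {
    private final Object rebalanceLock = new Object();
    private final Object watcherLock = new Object();

    // Deadlock-prone: shutdown nests rebalanceLock -> watcherLock, while
    // the topic-event path nests watcherLock -> rebalanceLock. If both run
    // concurrently, each thread holds the lock the other one wants.
    public void shutdownUnsafe() {
        synchronized (rebalanceLock) {
            synchronized (watcherLock) {
                // unsubscribe from topic events
            }
        }
    }

    // Fix per this ticket: unsubscribing does not require the watcher
    // lock, so shutdown never holds both monitors at once and the cycle
    // in the lock graph disappears.
    public void shutdownSafe() {
        synchronized (rebalanceLock) {
            // stop fetchers, commit final offsets
        }
        // unsubscribe from topic events outside any lock
    }
}
```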

Stack trace:
{noformat}
"mirrormaker-thread-0":
at 
kafka.consumer.ZookeeperTopicEventWatcher.shutdown(ZookeeperTopicEventWatcher.scala:50)
- waiting to lock <0x00072a65d508> (a java.lang.Object)
at 
kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:216)
- locked <0x0007103c69c0> (a java.lang.Object)
at 
kafka.tools.MirrorMaker$MirrorMakerOldConsumer.cleanup(MirrorMaker.scala:519)
at 
kafka.tools.MirrorMaker$MirrorMakerThread$$anonfun$run$3.apply$mcV$sp(MirrorMaker.scala:441)
at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:76)
at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
at kafka.utils.CoreUtils$.swallowWarn(CoreUtils.scala:47)
at kafka.utils.Logging$class.swallow(Logging.scala:94)
at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:47)
at kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:441)
"ZkClient-EventThread-58-":
at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:639)
- waiting to lock <0x0007103c69c0> (a java.lang.Object)
at 
kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:982)
at 
kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.handleTopicEvent(ZookeeperConsumerConnector.scala:1048)
at 
kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.liftedTree1$1(ZookeeperTopicEventWatcher.scala:69)
at 
kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.handleChildChange(ZookeeperTopicEventWatcher.scala:65)
- locked <0x00072a65d508> (a java.lang.Object)
at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)
at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
Found one Java-level deadlock:
=
"mirrormaker-thread-0":
  waiting to lock monitor 0x7f1f38029748 (object 0x00072a65d508, a 
java.lang.Object),
  which is held by "ZkClient-EventThread-58-"
"ZkClient-EventThread-58-":
  waiting to lock monitor 0x7f1e900249a8 (object 0x0007103c69c0, a 
java.lang.Object),
  which is held by "mirrormaker-thread-0"
{noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: any plans to switch to java 8?

2016-11-10 Thread Joel Koshy
http://markmail.org/message/gnrn5ccql7a2pmc5
We can bump that up to revisit the discussion. That thread didn't have any
closure, but has a lot of background information.

On Thu, Nov 10, 2016 at 10:37 AM, Sean McCauliff 
wrote:

> Wait for JDK 9 which is supposed to be 4-5 months from now?
>
> Sean
>
> On Thu, Nov 10, 2016 at 10:23 AM, radai 
> wrote:
> > with java 7 being EOL'ed for more than a year and a half now (apr 2015,
> see
> > http://www.oracle.com/technetwork/java/eol-135779.html) i was wondering
> if
> > there's an official plan/timetable for transitioning the kafka codebase
> > over to java 8?
>


[jira] [Commented] (KAFKA-4362) Consumer can fail after reassignment of the offsets topic partition

2016-11-07 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15645583#comment-15645583
 ] 

Joel Koshy commented on KAFKA-4362:
---

In this specific issue, the coordinator is available, but has moved to another 
broker. The client isn't informed of this movement though since it gets an 
unknown error (-1). With the Java client such errors should be automatically 
handled - i.e., rediscover the coordinator.

> Consumer can fail after reassignment of the offsets topic partition
> ---
>
> Key: KAFKA-4362
> URL: https://issues.apache.org/jira/browse/KAFKA-4362
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>    Reporter: Joel Koshy
>Assignee: Mayuresh Gharat
>
> When a consumer offsets topic partition reassignment completes, an offset 
> commit shows this:
> {code}
> java.lang.IllegalArgumentException: Message format version for partition 100 
> not found
> at 
> kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633)
>  ~[kafka_2.10.jar:?]
> at 
> kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633)
>  ~[kafka_2.10.jar:?]
> at scala.Option.getOrElse(Option.scala:120) ~[scala-library-2.10.4.jar:?]
> at 
> kafka.coordinator.GroupMetadataManager.kafka$coordinator$GroupMetadataManager$$getMessageFormatVersionAndTimestamp(GroupMetadataManager.scala:632)
>  ~[kafka_2.10.jar:?]
> at 
> ...
> {code}
> The issue is that the replica has been deleted so the 
> {{GroupMetadataManager.getMessageFormatVersionAndTimestamp}} throws this 
> exception instead which propagates as an unknown error.
> Unfortunately consumers don't respond to this and will fail their offset 
> commits.
> One workaround in the above situation is to bounce the cluster - the consumer 
> will be forced to rediscover the group coordinator.
> (Incidentally, the message incorrectly prints the number of partitions 
> instead of the actual partition.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4362) Consumer can fail after reassignment of the offsets topic partition

2016-11-07 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15645427#comment-15645427
 ] 

Joel Koshy commented on KAFKA-4362:
---

Sorry - missed this comment. It does not recover because the exception is 
thrown before the point the broker determines that it is no longer the 
coordinator.

> Consumer can fail after reassignment of the offsets topic partition
> ---
>
> Key: KAFKA-4362
> URL: https://issues.apache.org/jira/browse/KAFKA-4362
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>    Reporter: Joel Koshy
>Assignee: Mayuresh Gharat
>
> When a consumer offsets topic partition reassignment completes, an offset 
> commit shows this:
> {code}
> java.lang.IllegalArgumentException: Message format version for partition 100 
> not found
> at 
> kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633)
>  ~[kafka_2.10.jar:?]
> at 
> kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633)
>  ~[kafka_2.10.jar:?]
> at scala.Option.getOrElse(Option.scala:120) ~[scala-library-2.10.4.jar:?]
> at 
> kafka.coordinator.GroupMetadataManager.kafka$coordinator$GroupMetadataManager$$getMessageFormatVersionAndTimestamp(GroupMetadataManager.scala:632)
>  ~[kafka_2.10.jar:?]
> at 
> ...
> {code}
> The issue is that the replica has been deleted so the 
> {{GroupMetadataManager.getMessageFormatVersionAndTimestamp}} throws this 
> exception instead which propagates as an unknown error.
> Unfortunately consumers don't respond to this and will fail their offset 
> commits.
> One workaround in the above situation is to bounce the cluster - the consumer 
> will be forced to rediscover the group coordinator.
> (Incidentally, the message incorrectly prints the number of partitions 
> instead of the actual partition.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4381) Add per partition lag metric to KafkaConsumer.

2016-11-07 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15645419#comment-15645419
 ] 

Joel Koshy commented on KAFKA-4381:
---

This is up to you, but I definitely agree with Jason that it's overkill to have 
KIPs for this level of improvement. Configs are similar, unless they are very 
nuanced, such as timeouts. The PR itself should be sufficient to serve as a 
forum to discuss any concerns.

Metric name changes are different, since presumably people are already 
monitoring those metrics - such changes could deserve a KIP or even just an 
email heads-up.

> Add per partition lag metric to KafkaConsumer.
> --
>
> Key: KAFKA-4381
> URL: https://issues.apache.org/jira/browse/KAFKA-4381
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Affects Versions: 0.10.1.0
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Fix For: 0.10.2.0
>
>
> Currently KafkaConsumer only has a metric of max lag across all the 
> partitions. It would be useful to know per partition lag as well.
> I remember there was a ticket created before but did not find it. So I am 
> creating this ticket.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: why cant SslTransportLayer be muted before handshake completion?

2016-11-02 Thread Joel Koshy
Sriharsha can validate this, but I think the reason is that if we allow
muting/unmuting at will (via those public APIs), it can completely mess up
the handshake itself. It should be possible to pause/resume the handshake
if that's what you're looking for, but I'm not sure it is worth it for the
purposes of KIP-72 given the small volumes of reads/writes involved in
handshaking.
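For context, muting in this sense is just clearing OP_READ from the selection key's interest set, as a small helper sketch shows (the helper itself is illustrative; only the bit manipulation mirrors what the transport layer does):

```java
import java.nio.channels.SelectionKey;

public class MuteUtil {
    // "Muting" a channel means clearing OP_READ so the selector stops
    // signalling reads for it; unmuting restores the bit. The SSL layer
    // rejects this before handshakeComplete because the handshake itself
    // drives reads and writes through the same selection key.
    public static int mutedOps(int interestOps) {
        return interestOps & ~SelectionKey.OP_READ;
    }

    public static int unmutedOps(int interestOps) {
        return interestOps | SelectionKey.OP_READ;
    }
}
```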

On Wed, Nov 2, 2016 at 4:24 PM, radai  wrote:

> Hi,
>
> as part of testing my code for KIP-72 (broker memory control), i ran into
> the following code snippet in SslTransportLayer:
>
> public void removeInterestOps(int ops) {
> if (!key.isValid())
> throw new CancelledKeyException();
> else if (!handshakeComplete)
> throw new IllegalStateException("handshake is not completed");
>
> key.interestOps(key.interestOps() & ~ops);
> }
>
> why cant an ssl socket be muted before handshake is complete?
>


[jira] [Updated] (KAFKA-4362) Consumer can fail after reassignment of the offsets topic partition

2016-11-01 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-4362:
--
Summary: Consumer can fail after reassignment of the offsets topic 
partition  (was: Offset commits fail after a partition reassignment)

Yes, definitely an issue, so I'm updating the title. The reassignment of the 
offsets topic will perpetually cause offset commits to fail. A new consumer 
joining the group will talk to the new coordinator and incorrectly form an 
isolated group. Any rebalance of the remaining instances of the actual group 
(which is still talking to the old coordinator) can hit this error and die:

{code}
[2016-11-01 15:37:56,120] WARN Auto offset commit failed for group testgroup: 
Unexpected error in commit: The server experienced an unexpected error when 
processing the request 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
...


...

[2016-11-01 15:37:56,120] INFO Revoking previously assigned partitions 
[testtopic-0, testtopic-1] for group testgroup 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2016-11-01 15:37:56,120] INFO (Re-)joining group testgroup 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2016-11-01 15:37:56,124] ERROR Error processing message, terminating consumer 
process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.KafkaException: Unexpected error from SyncGroup: The 
server experienced an unexpected error when processing the request
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:518)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:485)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:256)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:308)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
at kafka.consumer.NewShinyConsumer.receive(BaseConsumer.scala:100)
at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:120)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:75)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:50)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
{code}

> Consumer can fail after reassignment of the offsets topic partition
> ---
>
> Key: KAFKA-4362
> URL: https://issues.apache.org/jira/browse/KAFKA-4362
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>    Reporter: Joel Koshy
>Assignee: Jiangjie Qin
>
> When a consumer offsets topic partition reassignment completes, an offset 
> commit shows this:
> {code}
> java.lang.IllegalArgumentException: Message format version for partition 100 
> not found
> at 
> kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633)
>  ~[kafka_2.10.jar:?]
> at 
> kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633)
>  ~[kafka_2.10.jar:?]
> at scala.Option.getOrElse(Option.scala:120) ~[scala-library-2.10.4.jar:?]
> at 
> kafka.coordinator.GroupMetadataManager.kafka$coordinator$GroupMetadataManager$$getMessageFo

[jira] [Commented] (KAFKA-4362) Offset commits fail after a partition reassignment

2016-11-01 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15624520#comment-15624520
 ] 

Joel Koshy commented on KAFKA-4362:
---

Btw, the summary doesn't make it clear that this also affects operations such 
as sync-group/join-group in the new consumer.
I glanced through the new consumer code's handling of unknown errors. 
Specifically, we will need to rediscover the coordinator to recover from this. 
It does not appear to do this, but I will double-check tomorrow.

> Offset commits fail after a partition reassignment
> --
>
> Key: KAFKA-4362
> URL: https://issues.apache.org/jira/browse/KAFKA-4362
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>    Reporter: Joel Koshy
>Assignee: Jiangjie Qin
>
> When a consumer offsets topic partition reassignment completes, an offset 
> commit shows this:
> {code}
> java.lang.IllegalArgumentException: Message format version for partition 100 
> not found
> at 
> kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633)
>  ~[kafka_2.10.jar:?]
> at 
> kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633)
>  ~[kafka_2.10.jar:?]
> at scala.Option.getOrElse(Option.scala:120) ~[scala-library-2.10.4.jar:?]
> at 
> kafka.coordinator.GroupMetadataManager.kafka$coordinator$GroupMetadataManager$$getMessageFormatVersionAndTimestamp(GroupMetadataManager.scala:632)
>  ~[kafka_2.10.jar:?]
> at 
> ...
> {code}
> The issue is that the replica has been deleted so the 
> {{GroupMetadataManager.getMessageFormatVersionAndTimestamp}} throws this 
> exception instead which propagates as an unknown error.
> Unfortunately consumers don't respond to this and will fail their offset 
> commits.
> One workaround in the above situation is to bounce the cluster - the consumer 
> will be forced to rediscover the group coordinator.
> (Incidentally, the message incorrectly prints the number of partitions 
> instead of the actual partition.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4362) Offset commits fail after a partition reassignment

2016-10-31 Thread Joel Koshy (JIRA)
Joel Koshy created KAFKA-4362:
-

 Summary: Offset commits fail after a partition reassignment
 Key: KAFKA-4362
 URL: https://issues.apache.org/jira/browse/KAFKA-4362
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.1.0
Reporter: Joel Koshy
Assignee: Jiangjie Qin


When a consumer offsets topic partition reassignment completes, an offset 
commit shows this:

{code}
java.lang.IllegalArgumentException: Message format version for partition 100 
not found
at 
kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633)
 ~[kafka_2.10.jar:?]
at 
kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633)
 ~[kafka_2.10.jar:?]
at scala.Option.getOrElse(Option.scala:120) ~[scala-library-2.10.4.jar:?]
at 
kafka.coordinator.GroupMetadataManager.kafka$coordinator$GroupMetadataManager$$getMessageFormatVersionAndTimestamp(GroupMetadataManager.scala:632)
 ~[kafka_2.10.jar:?]
at 
...
{code}

The issue is that the replica has been deleted so the 
{{GroupMetadataManager.getMessageFormatVersionAndTimestamp}} throws this 
exception instead which propagates as an unknown error.

Unfortunately consumers don't respond to this and will fail their offset 
commits.

One workaround in the above situation is to bounce the cluster - the consumer 
will be forced to rediscover the group coordinator.

(Incidentally, the message incorrectly prints the number of partitions instead 
of the actual partition.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[ANNOUNCE] New committer: Jiangjie (Becket) Qin

2016-10-31 Thread Joel Koshy
The PMC for Apache Kafka has invited Jiangjie (Becket) Qin to join as a
committer and we are pleased to announce that he has accepted!

Becket has made significant contributions to Kafka over the last two years.
He has been deeply involved in a broad range of KIP discussions and has
contributed several major features to the project. He recently completed
the implementation of a series of improvements (KIP-31, KIP-32, KIP-33) to
Kafka’s message format that address a number of long-standing issues such
as avoiding server-side re-compression, better accuracy for time-based log
retention, log roll and time-based indexing of messages.

Congratulations Becket! Thank you for your many contributions. We are
excited to have you on board as a committer and look forward to your
continued participation!

Joel


Re: [DISCUSS] KIP-81: Max in-flight fetches

2016-10-31 Thread Joel Koshy
Agreed with this approach.
One detail to be wary of is that since we multiplex various other requests
(e.g., heartbeats, offset commits, metadata, etc.) over the client that
connects to the coordinator this could delay some of these critical
requests. Realistically I don't think it will be an issue except in extreme
scenarios where someone sets the memory limit to be unreasonably low.

Thanks,

Joel

On Sun, Oct 30, 2016 at 12:32 PM, Jun Rao  wrote:

> Hi, Mickael,
>
> I agree with others that it's better to be able to control the bytes the
> consumer can read from sockets, instead of limiting the fetch requests.
> KIP-72 has a proposal to bound the memory size at the socket selector
> level. Perhaps that can be leveraged in this KIP too.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests
>
> Thanks,
>
> Jun
>
> On Thu, Oct 27, 2016 at 3:23 PM, Jay Kreps  wrote:
>
> > This is a good observation on limiting total memory usage. If I
> understand
> > the proposal I think it is that the consumer client would stop sending
> > fetch requests once a certain number of in-flight fetch requests is met.
> I
> > think a better approach would be to always issue one fetch request to
> each
> > broker immediately, allow the server to process that request, and send
> data
> > back to the local machine where it would be stored in the socket buffer
> (up
> > to that buffer size). Instead of throttling the requests sent, the
> consumer
> > should ideally throttle the responses read from the socket buffer at any
> > given time. That is, in a single poll call, rather than reading from
> every
> > single socket it should just read until it has a given amount of memory
> > used then bail out early. It can come back and read more from the other
> > sockets after those messages are processed.
> >
> > The advantage of this approach is that you don't incur the additional
> > latency.
> >
> > -Jay
> >
> > On Mon, Oct 10, 2016 at 6:41 AM, Mickael Maison <
> mickael.mai...@gmail.com>
> > wrote:
> >
> > > Hi all,
> > >
> > > I would like to discuss the following KIP proposal:
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 81%3A+Max+in-flight+fetches
> > >
> > >
> > > Feedback and comments are welcome.
> > > Thanks !
> > >
> > > Mickael
> > >
> >
>
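Jay's suggestion above (issue fetches to every broker eagerly, but bound how much is read out of the socket buffers in a single poll) can be sketched roughly as follows. This is an illustrative sketch only; the queue of buffered responses and the budget parameter are hypothetical names, not actual consumer internals:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustrative sketch: drain buffered fetch responses only until a per-poll
// memory budget is reached, leaving the rest buffered for the next poll.
public class BoundedPoll {
    static List<byte[]> pollBounded(Deque<byte[]> bufferedResponses, long memoryBudgetBytes) {
        List<byte[]> drained = new ArrayList<>();
        long used = 0;
        while (!bufferedResponses.isEmpty()) {
            byte[] next = bufferedResponses.peekFirst();
            // Always drain at least one response (used > 0 guard) so an
            // oversized response cannot starve the consumer forever.
            if (used + next.length > memoryBudgetBytes && used > 0) {
                break; // bail out early; remaining data stays in the buffers
            }
            used += next.length;
            drained.add(bufferedResponses.pollFirst());
        }
        return drained;
    }

    public static void main(String[] args) {
        Deque<byte[]> buffered = new ArrayDeque<>();
        buffered.add(new byte[600]);
        buffered.add(new byte[600]);
        buffered.add(new byte[600]);
        List<byte[]> got = pollBounded(buffered, 1000);
        System.out.println(got.size() + " drained, " + buffered.size() + " remaining");
        // prints "1 drained, 2 remaining"
    }
}
```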


[VOTE] KIP-47 - Add timestamp-based log deletion policy

2016-10-28 Thread Joel Koshy
>
> - It seems that the consumer will need to write log.retention.min.timestamp
> periodically to zookeeper as dynamic configuration of the topic, so that
> broker can pick up log.retention.min.timestamp. However, this introduces
> dependency of consumer on zookeeper which is undesirable. Note that we have
>

We will be eliminating the need for manipulating topic configs directly in
ZK with the admin APIs in KIP-4.



> log.retention.min.timestamp. However, it is not clear how client
> application can set the log.retention.min.timestamp to address the
> use-case. For example, if there are more than one consumer in the consumer
> group, which consumer(s) write log.retention.min.timestamp to zookeeper?

How does consumer determine the value of log.retention.min.timestamp?


I don't quite see the issue here: this is really up to the application to
handle and applies to the trim() approach as well.

>
> this KIP. And a malicious or misconfigured client can easily delete all
> messages of any topic. How do we address this problem so that operator
> won't have to worry about this?
>

The admin APIs are Kafka RPCs that can all be authorized.

BTW, I like Jun's solution of using offsets and IMO it works. Jun's
> solution would also address some problems above. Some ideas discussed in
> the thread of KIP-68 may help address some problems above.
>

I agree - the trim() approach evades the timestamp issue by dealing with
offsets directly (unless the user explicitly opts to look up offsets by
timestamp). Once we are convinced that this simpler approach can satisfy
the motivation for both this KIP as well as KIP-68 we should probably just
consolidate these as use-cases of a new KIP for the trim() API.


> On Mon, Oct 24, 2016 at 5:29 PM, Bill Warshaw wrote:
>
> > Hi Jun,
> >
> > Those are valid concerns.  For our particular use case, application
> events
> > triggering the timestamp update will never occur more than once an hour,
> > and we maintain a sliding window so that we don't delete messages too
> close
> > to what our consumers may be reading.
> > For more general use cases, developers will need to be aware of these
> > issues, and would need to write their application code with that in mind.
> >
> >
> > To your second point: I initially wanted to just have a trim() admin api.
> > I started implementing it, but ran into difficulties with synchronously
> > acknowledging to the calling code that all brokers had truncated the
> given
> > partitions.  It seemed like we would have to do something similar to how
> > topic deletion is implemented, where the initial broker uses Zookeeper to
> > coordinate the deletion on the other brokers.  If you have a simpler idea
> > in mind, I'd be happy to update this KIP to provide a trim() api instead.
> >
> > > On Mon, Oct 24, 2016 at 8:15 PM Jun Rao wrote:
> >
> > > Hi, Bill,
> > >
> > > Thanks for the proposal. Sorry for the late reply.
> > >
> > > The motivation of the proposal makes sense: don't delete the messages
> > until
> > > the application tells you so.
> > >
> > > I am wondering if the current proposal is the best way to address the
> > need
> > > though. There are a couple of issues that I saw with the proposal. (1)
> > > Messages in the log may not always be stored in increasing timestamp
> > order.
> > > Suppose that the application sets log.retention.min.timestamp to T and
> > > after that messages with timestamp older than T (either due to delay
> or
> > > reprocessing) are published to that topic. Those newly published
> messages
> > > are likely going to be deleted immediately before the application gets
> a
> > > chance to read them, which is probably not what the application wants.
> > (2)
> > > The configuration for the topic has to be changed continuously to
> > implement
> > > the use case. Intuitively, one probably shouldn't be changing a
> > > configuration all the time.
> > >
> > > Another way to achieve the goal is what Jay mentioned earlier. We could
> > add
> > > a trim() api like the following that will trim the log up to the
> > specified
> > > offsets. This addresses both of the above issues that I mentioned. Will
> > > that work for you?
> > >
> > > void trim(Map<TopicPartition, Long> offsetsToTruncate)
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Wed, Oct 5, 2016 at 1:55 PM, Bill Warshaw wrote:
> > >
> > > > Bumping for visibility.  KIP is here:
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > 47+-+Add+timestamp-based+log+deletion+policy
> > > >
> > > > On Wed, Aug 24, 2016 at 2:32 PM Bill Warshaw 
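For concreteness, the trim() API that Jun sketched above might be driven from application code along these lines. TopicPartition and Admin below are simplified stand-ins for illustration, not the real Kafka classes, and the offset value is made up:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of driving the proposed trim() admin API: the application decides
// when data is safe to discard (e.g. after a durable checkpoint) and asks
// the cluster to truncate each partition up to that offset.
public class TrimExample {
    static final class TopicPartition {
        final String topic; final int partition;
        TopicPartition(String topic, int partition) { this.topic = topic; this.partition = partition; }
        @Override public String toString() { return topic + "-" + partition; }
        @Override public boolean equals(Object o) {
            if (!(o instanceof TopicPartition)) return false;
            TopicPartition t = (TopicPartition) o;
            return t.topic.equals(topic) && t.partition == partition;
        }
        @Override public int hashCode() { return topic.hashCode() * 31 + partition; }
    }

    interface Admin {
        // The proposed signature: trim each partition's log up to the given offset.
        void trim(Map<TopicPartition, Long> offsetsToTruncate);
    }

    // Render a trim request as text (stands in for the actual RPC).
    static String describeTrim(Map<TopicPartition, Long> offsets) {
        StringBuilder sb = new StringBuilder();
        offsets.forEach((tp, off) -> sb.append("trim ").append(tp).append(" up to ").append(off));
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<TopicPartition, Long> toTruncate = new HashMap<>();
        toTruncate.put(new TopicPartition("app-events", 0), 12345L);
        Admin admin = offsets -> System.out.println(describeTrim(offsets));
        admin.trim(toTruncate); // prints: trim app-events-0 up to 12345
    }
}
```

Because trim() is a regular admin RPC, it can be authorized like any other request, which addresses the earlier concern about misconfigured clients deleting data.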

Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag

2016-10-26 Thread Joel Koshy
I'm not sure why it would be useful, but it should be theoretically
possible if the attribute bit alone is enough to mark a tombstone. OTOH, we
could consider that as invalid if we wish. These are relevant details that
I think should be added to the KIP.

Also, in the few odd scenarios that I mentioned we should also consider
that fetches could be coming from other yet-to-be-upgraded brokers in a
cluster that is being upgraded. So we would probably want to continue to
support nulls as tombstones or down-convert in a way that we are sure works
with least surprise to fetchers.

There is a slightly vague statement under "Compatibility, Deprecation, and
Migration Plan" that could benefit from more detail: *Logic would base on
current behavior of null value or if tombstone flag set to true, as such
wouldn't impact any existing flows simply allow new producers to make use
of the feature*. It is unclear to me based on that whether you would
interpret null as a tombstone if the tombstone attribute bit is off.

On Wed, Oct 26, 2016 at 3:10 PM, Xavier Léauté  wrote:

> Does this mean that starting with V4 requests we would allow storing null
> messages in compacted topics? The KIP should probably clarify the behavior
> for null messages where the tombstone flag is not set.
>
> On Wed, Oct 26, 2016 at 1:32 AM Magnus Edenhill 
> wrote:
>
> > 2016-10-25 21:36 GMT+02:00 Nacho Solis :
> >
> > > I think you probably require a MagicByte bump if you expect correct
> > > behavior of the system as a whole.
> > >
> > > From a client perspective you want to make sure that when you deliver a
> > > message that the broker supports the feature you're expecting
> > > (compaction).  So, depending on the behavior of the broker on
> > encountering
> > > a previously undefined bit flag I would suggest making some change to
> > make
> > > certain that flag-based compaction is supported.  I'm going to guess
> that
> > > the MagicByte would do this.
> > >
> >
> > I dont believe this is needed since it is already attributed through the
> > request's API version.
> >
> > Producer:
> >  * if a client sends ProduceRequest V4 then attributes.bit5 indicates a
> > tombstone
> >  * if a client sends ProduceRequest <V4 and value==null, it indicates a tombstone
> >  * in both cases the on-disk messages are stored with attributes.bit5 (I
> > assume?)
> >
> > Consumer:
> >  * if a client sends FetchRequest V4, messages are sendfile():ed directly
> > from disk (with attributes.bit5)
> >  * if a client sends FetchRequest <V4, messages are translated from
> > attributes.bit5 to value=null as required.
> >
> >
> > That's my understanding anyway, please correct me if I'm wrong.
> >
> > /Magnus
> >
> >
> >
> > > On Tue, Oct 25, 2016 at 10:17 AM, Magnus Edenhill 
> > > wrote:
> > >
> > > > It is safe to assume that a previously undefined attributes bit will
> be
> > > > unset in protocol requests from existing clients, if not, such a
> client
> > > is
> > > > already violating the protocol and needs to be fixed.
> > > >
> > > > So I dont see a need for a MagicByte bump, both broker and client has
> > the
> > > > information it needs to construct or parse the message according to
> > > request
> > > > version.
> > > >
> > > >
> > > > 2016-10-25 18:48 GMT+02:00 Michael Pearce :
> > > >
> > > > > Hi Magnus,
> > > > >
> > > > > I was wondering if I even needed to change those also, as
> technically
> > > > > we’re just making use of a non used attribute bit, but im not 100%
> > that
> > > > it
> > > > > be always false currently.
> > > > >
> > > > > If someone can say 100% it will already be set false with current
> and
> > > > > historic bit wise masking techniques used over the time, we could
> do
> > > away
> > > > > with both, and simply just start to use it. Unfortunately I don’t
> > have
> > > > that
> > > > > historic knowledge so was hoping it would be flagged up in this
> > > > discussion
> > > > > thread ☺
> > > > >
> > > > > Cheers
> > > > > Mike
> > > > >
> > > > > On 10/25/16, 5:36 PM, "Magnus Edenhill" 
> wrote:
> > > > >
> > > > > Hi Michael,
> > > > >
> > > > > With the version bumps for Produce and Fetch requests, do you
> > > really
> > > > > need
> > > > > to bump MagicByte too?
> > > > >
> > > > > Regards,
> > > > > Magnus
> > > > >
> > > > >
> > > > > 2016-10-25 18:09 GMT+02:00 Michael Pearce <
> michael.pea...@ig.com
> > >:
> > > > >
> > > > > > Hi All,
> > > > > >
> > > > > > I would like to discuss the following KIP proposal:
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > > 87+-+Add+Compaction+Tombstone+Flag
> > > > > >
> > > > > > This is off the back of the discussion on KIP-82  / KIP
> meeting
> > > > > where it
> > > > > > was agreed to separate this issue and feature. See:
> > > > > > 

Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag

2016-10-26 Thread Joel Koshy
A magic byte bump would be required for the addition of new fields or the
removal of existing fields. Changing the interpretation of an existing
field (e.g., switching from absolute to relative offsets) almost always
needs a magic byte bump as well.

One concern Nacho alluded to (I think) is if a newer client sends a
tombstone with the tombstone attribute set (and non-null value field) to an
older broker. While it is true that a higher magic byte would cause the
(older) broker to reject the message, the fact that it is sending a newer
version of the produce request which the broker does not accept will also
cause it to be rejected. So that safeguard already exists for the broker to
indicate that it does not support flag-based compaction.

So strictly speaking we could do without a magic byte bump. However in this
case, we are also changing the interpretation of the value field. Up until
now, a null entry in the value field of the message would cause it to be
interpreted as a tombstone. With this proposal, that is no longer the case.
In fact, if an older client issues a fetch request for a tombstone on a
newer broker, we should decide whether to down-convert it to the
older format and replace the value field (if non-null) with
null. Another weird scenario is if there is a non-tombstone message with a
null value. An older client that fetches this message would be unable to
tell whether it is a tombstone or not.

Also, under "rejected alternatives" - I'm not a huge fan of using headers
(should they materialize) to mark tombstones. I strongly prefer the
attribute bit or even the existing mechanism of nulls in payloads over
custom headers.

Thanks,

Joel
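To make the compatibility concern above concrete, here is a rough sketch of the two tombstone encodings and the down-conversion step for old fetchers. The bit-5 mask value is an assumption for illustration; the KIP defines the actual attribute layout:

```java
// Sketch of the attribute-bit handling under discussion: in the proposed V4
// format, attribute bit 5 marks a tombstone; in older formats a null value
// is the tombstone marker.
public class TombstoneBit {
    static final byte TOMBSTONE_MASK = 0x20; // bit 5 (assumed position)

    static boolean isTombstone(byte attributes, byte[] value, boolean v4Format) {
        if (v4Format) return (attributes & TOMBSTONE_MASK) != 0;
        return value == null; // pre-V4: null value is the tombstone marker
    }

    // Down-conversion for old fetchers: a flagged tombstone must become a
    // null value, otherwise an old client would not recognize the delete
    // marker. (A non-tombstone message with a null value remains ambiguous
    // to old clients, which is the "weird scenario" noted above.)
    static byte[] downConvertValue(byte attributes, byte[] value) {
        return (attributes & TOMBSTONE_MASK) != 0 ? null : value;
    }

    public static void main(String[] args) {
        byte attrs = TOMBSTONE_MASK;
        byte[] payload = {1, 2, 3};
        System.out.println(isTombstone(attrs, payload, true));        // true
        System.out.println(downConvertValue(attrs, payload) == null); // true
    }
}
```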

On Wed, Oct 26, 2016 at 11:23 AM, Magnus Edenhill 
wrote:

> Hi Renu,
>
> that is not completely true, the LZ4 compression codec was added without a
> MagicByte bump.
> (LZ4 might be a bad example though since this feature was added without
> updating the protocol docs..)
>
> Unless the broker needs the MagicByte internally (for translating old logs
> on disk or whatever)
> I dont see a need for bumping the MagicByte when just adding a bit
> *definition* to an existing field.
>
> With just one MagicByte bump so far it is hard to say what is right or
> wrong, but this might be
> a good time for us to decide on some rules.
>
> I dont have a strong opinion, either way is fine with me.
>
> Regards,
> Magnus
>
>
> 2016-10-26 20:01 GMT+02:00 Renu Tewari :
>
> > +1 @Magnus.
> > It is also in line with traditional use of the magic field to indicate a
> > change in the format of the message. Thus a change in the magic field
> > indicates a different "schema" which in this case would reflect adding a
> > new field, removing a field or changing the type of fields etc.
> >
> > The version number bump does not change the message format but just the
> > interpretation of the values.
> >
> > Going with what @Michael suggested on keeping it simple we don't need to
> a
> > add a new field to indicate a tombstone and therefore require a change in
> > the magic byte field. The attribute bit is sufficient for indicating the
> > new interpretation of attribute.bit5 to indicate a tombstone.
> >
> > Bumping the version number and changing the attribute bit keeps it
> simple.
> >
> >
> > Thanks
> > Renu
> >
> >
> >
> > On Wed, Oct 26, 2016 at 10:09 AM, Mayuresh Gharat <
> > gharatmayures...@gmail.com> wrote:
> >
> > > I see the reasoning Magnus described.
> > > If you check the docs https://kafka.apache.org/
> > documentation#messageformat
> > > ,
> > > it says :
> > >
> > > 1 byte "magic" identifier to allow format changes, value is 0 or 1
> > >
> > > Moreover as per comments in the code :
> > > /**
> > >* The "magic" value
> > >* When magic value is 0, the message uses absolute offset and does
> not
> > > have a timestamp field.
> > >* When magic value is 1, the message uses relative offset and has a
> > > timestamp field.
> > >*/
> > >
> > > Since timeStamp was added as an actual field we bumped the the magic
> byte
> > > to 1 for this change.
> > > But since we are not adding an actual field, we can do away with
> bumping
> > up
> > > the magic byte.
> > >
> > > If we really want to go the standard route of bumping up the magic byte
> > for
> > > any change to message format we should actually add a new field for
> > > handling log compaction instead of just using the attribute field,
> which
> > > might sound like an overkill.
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Wed, Oct 26, 2016 at 1:32 AM, Magnus Edenhill 
> > > wrote:
> > >
> > > > 2016-10-25 21:36 GMT+02:00 Nacho Solis:
> > > >
> > > > > I think you probably require a MagicByte bump if you expect correct
> > > > > behavior of the system as a whole.
> > > > >
> > > > > From a client perspective you want to make sure that when you
> > deliver a
> > > > > 

Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-10-25 Thread Joel Koshy
+1 - I was thinking the exact same thing.

On Tue, Oct 25, 2016 at 2:52 PM, Jun Rao  wrote:

> One of the main reasons for retaining messages on the broker after
> consumption is to support replay. A common reason for replay is to fix an
> application error. So, it seems that it's a bit hard to delete log segments
> just based on the committed offsets that the broker knows. An alternative
> approach is to support an api that can trim the log up to a specified
> offset (similar to what's being discussed in KIP-47). This way, an
> application can control when and how much to trim the log.
>
> Thanks,
>
> Jun
>
> On Mon, Oct 24, 2016 at 11:11 AM, Guozhang Wang 
> wrote:
>
> > Overall I think the motivation is common and of interests to lots of
> users.
> > Would like to throw my two cents on this discussion:
> >
> > 1. Kafka topics can be used in different ways. For some categories of
> > topics (think: "pageView" event topics), it is a shared topic among
> > different teams / apps within the organization and lots of temporary
> > consumers (for debugging, trouble shooting, prototype development, etc)
> can
> > come and go dynamically, in which case it is hard to track all of such
> > consumer and maintain the minimum committed offsets; on the other hand,
> > there is another category of topics (think: stream-app owned
> intermediate
> > topics like "pricing-enriched-bid-activity", as Becket mentioned above)
> > which are owned by only one or a few apps, and hence the
> > consumer groups for those topics are pre-defined and roughly static. In
> > this case I think it makes sense to allow such consumer-driven log
> retention
> > features.
> >
> > 2. In this case, my question is then whether this bookkeeping of
> > min-committed-offsets should be done on the broker side or it should be
> on
> > the app side. My gut feeling is that it could be better bookkept on the
> app
> > (i.e. client) side which has the full information of the "registered
> > consumer groups" for certain topics, and then knows the
> > min-committed-offsets. And a slightly-modified KIP-47 mentioned by Dong
> > could be a better fit, where a) the app side bookkeeps the consumer-driven min
> > offset based on their committed offsets, by either talking to the
> consumer
> > clients directly or query broker for the committed offsets of those
> > registered consumer groups, and then b) write
> > *log.retention.min.offset* periodically
> > to broker to let it delete old segments before that offset (NOTE that the
> > semantics is exactly the same as in KIP-47, while the only difference is
> > that we use an offset instead of a timestamp, which can be honored by
> > the same implementation of KIP-47 on broker side).
> >
> > My arguments for letting the app side to bookkeep such min-offsets and
> only
> > let brokers to take requests to delete segments accordingly are 1)
> keeping
> > the broker simple, without brokers querying each other about such offsets
> > and doing the min() calculation, rather only keeping / deleting messages per
> > client admin requests, and 2) allowing more generalized client-driven log
> > retention policies with KIP-47 (i.e. the broker is brainless and only takes
> > requests, while the client app can apply any customized logic to determine the
> > config values of *log.retention.min.offset* or *log.retention.min.timestamp*
> > that they send to the brokers).
> >
> >
> >
> > Guozhang
> >
> >
> > On Sat, Oct 22, 2016 at 5:46 PM, Becket Qin 
> wrote:
> >
> > > Hi David,
> > >
> > > > 1. What scenario is used to this configuration?
> > >
> > > One scenario is a stream processing pipeline. In a stream processing DAG,
> > > there will be a bunch of intermediate results; we only care about the
> > > consumer group that is in the downstream of the DAG, but not other
> > groups.
> > > Ideally we want to delete the log of the intermediate topics right
> after
> > > all the downstream processing jobs have successfully processed the
> > messages.
> > > In that case, we only care about the downstream processing jobs, but
> not
> > > other groups. That means if a downstream job did not commit offsets for
> > > some reason, we want to wait for that job. Without the predefined
> > > interested group, it is hard to achieve this.
> > >
> > >
> > > 2. Yes, the configuration should be at topic level and set dynamically.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Fri, Oct 21, 2016 at 7:40 AM, 东方甲乙 <254479...@qq.com> wrote:
> > >
> > > > Hi Mayuresh,
> > > > Thanks for the reply:
> > > > 1. In the log retention check schedule, the broker first finds all the
> > > > consumer groups consuming this topic and queries each group's committed
> > > > offset for the topic using the OffsetFetch API. The min commit offset is
> > > > the minimum across these committed offsets.
> > > >
> > > >
> > > 
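The retention check described in this thread (delete only segments that every interested consumer group has fully consumed) could be sketched as follows. The segment representation is simplified for illustration:

```java
import java.util.Collection;
import java.util.List;

// Sketch of consumed-log retention: given the committed offsets of the
// interested consumer groups, only log segments that end at or below the
// minimum committed offset are eligible for deletion.
public class ConsumedRetention {
    static long minCommittedOffset(Collection<Long> groupCommits) {
        return groupCommits.stream().mapToLong(Long::longValue).min().orElse(0L);
    }

    // segmentEndOffsets: next offset after each segment's last message, in order.
    static int deletableSegments(long[] segmentEndOffsets, long minCommitted) {
        int deletable = 0;
        for (long end : segmentEndOffsets) {
            if (end <= minCommitted) deletable++; // fully consumed by every group
            else break;                           // segments are ordered; stop here
        }
        return deletable;
    }

    public static void main(String[] args) {
        // A lagging group pins retention: min(150, 90, 200) == 90.
        long min = minCommittedOffset(List.of(150L, 90L, 200L));
        System.out.println(min);                                          // 90
        System.out.println(deletableSegments(new long[]{50, 100, 150}, min)); // 1
    }
}
```

Note how a single lagging group (committed at 90) keeps all but the first segment around, which is exactly the behavior Becket describes for downstream jobs that have not yet committed.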

Re: [VOTE] KIP-73 - Replication Quotas

2016-10-21 Thread Joel Koshy
Thanks for catching that and the fix as well. Makes sense to me.

We should consider adding an "amendments" section to KIPs - perhaps just a
link to KAFKA-4313 would suffice in this case.

Thanks,

Joel

On Wed, Oct 19, 2016 at 7:12 PM, Jun Rao <j...@confluent.io> wrote:

> Hi,
>
> While testing KIP-73, we found an issue described in
> https://issues.apache.org/jira/browse/KAFKA-4313. Basically, when there
> are
> mixed high-volume and low-volume partitions, when replication throttling is
> specified, ISRs for those low volume partitions could thrash. KAFKA-4313
> fixes this issue by avoiding throttling those replicas in the throttled
> replica list that are already in sync. Those in-sync replicas' traffic will
> still be counted toward the throttled traffic though. Just want to bring
> this up since it slightly changes the behavior described in the KIP. If
> anyone has concerns on this, please comment on the jira.
>
> Thanks,
>
> Jun
>
> On Tue, Aug 23, 2016 at 3:25 PM, Ismael Juma <ism...@juma.me.uk> wrote:
>
> > For the record, there were 4 binding +1s.
> >
> > Ismael
> >
> > On Tue, Aug 23, 2016 at 11:16 PM, Ben Stopford <b...@confluent.io> wrote:
> >
> > > Thanks everyone. It looks like this KIP has now been accepted.
> > >
> > > There is a corresponding PR <https://github.com/apache/kafka/pull/1776
> >
> > > for the implementation also.
> > >
> > > All the best
> > >
> > > B
> > >
> > >
> > > > On 23 Aug 2016, at 22:39, Joel Koshy <jjkosh...@gmail.com> wrote:
> > > >
> > > > +1
> > > > (sent some very minor edits to you off-thread)
> > > >
> > > > On Fri, Aug 19, 2016 at 1:21 AM, Ben Stopford <b...@confluent.io>
> > wrote:
> > > >
> > > >> I’d like to initiate the voting process for KIP-73:
> > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > >> 73+Replication+Quotas <https://cwiki.apache.org/
> > > >> confluence/display/KAFKA/KIP-73+Replication+Quotas>
> > > >>
> > > >> Ben
> > >
> > >
> >
>


[jira] [Resolved] (KAFKA-4250) make ProducerRecord and ConsumerRecord extensible

2016-10-19 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy resolved KAFKA-4250.
---
   Resolution: Fixed
 Assignee: radai rosenblatt
 Reviewer: Joel Koshy
Fix Version/s: 0.10.2.0

> make ProducerRecord and ConsumerRecord extensible
> -
>
> Key: KAFKA-4250
> URL: https://issues.apache.org/jira/browse/KAFKA-4250
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.10.0.1
>Reporter: radai rosenblatt
>Assignee: radai rosenblatt
> Fix For: 0.10.2.0
>
>
> KafkaProducer and KafkaConsumer implement interfaces that are designed to be 
> extensible (or at least allow it).
> ProducerRecord and ConsumerRecord, however, are final, making extending 
> producer/consumer very difficult.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1351) String.format is very expensive in Scala

2016-10-17 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15583851#comment-15583851
 ] 

Joel Koshy commented on KAFKA-1351:
---

Yes - this is still an issue. cc [~nsolis]

> String.format is very expensive in Scala
> 
>
> Key: KAFKA-1351
> URL: https://issues.apache.org/jira/browse/KAFKA-1351
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.7.2, 0.8.0, 0.8.1
>Reporter: Neha Narkhede
>  Labels: newbie
> Attachments: KAFKA-1351.patch, KAFKA-1351_2014-04-07_18:02:18.patch, 
> KAFKA-1351_2014-04-09_15:40:11.patch
>
>
> As found in KAFKA-1350, logging is causing significant overhead in the 
> performance of a Kafka server. There are several info statements that use 
> String.format which is particularly expensive. We should investigate adding 
> our own version of String.format that merely uses string concatenation under 
> the covers.
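A minimal illustration of the cost difference: String.format re-parses its pattern and boxes arguments on every call, whereas the compiler lowers plain concatenation to StringBuilder appends. Timings vary by machine, so the sketch below only demonstrates functional equivalence plus a rough comparison; the log line itself is made up:

```java
// Illustrative comparison of String.format vs plain concatenation.
public class FormatCost {
    // Plain concatenation: the compiler lowers this to StringBuilder appends.
    static String logLine(String topic, int partition) {
        return "Rolling log for " + topic + "-" + partition;
    }

    public static void main(String[] args) {
        // Functionally equivalent to the String.format version.
        System.out.println(logLine("test", 7).equals(
                String.format("Rolling log for %s-%d", "test", 7))); // true

        // Rough timing comparison; expect format to be noticeably slower.
        long t0 = System.nanoTime();
        for (int i = 0; i < 100_000; i++) String.format("%s-%d", "topic", i);
        long formatNanos = System.nanoTime() - t0;

        t0 = System.nanoTime();
        for (int i = 0; i < 100_000; i++) logLine("topic", i);
        long concatNanos = System.nanoTime() - t0;

        System.out.println("format/concat time ratio: "
                + (double) formatNanos / Math.max(1, concatNanos));
    }
}
```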



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Stream processing meetup at LinkedIn (Sunnyvale) on Wednesday, November 2 at 6pm

2016-10-17 Thread Joel Koshy
Hi everyone,

We would like to invite you to a Stream Processing Meetup at LinkedIn’s
Sunnyvale campus on Wednesday, November 2 at 6pm.

Please RSVP here (if you intend to attend in person):
http://www.meetup.com/Stream-Processing-Meetup-LinkedIn/events/234454163

We have the following three talks scheduled:

   - *Stream Processing using Apache Samza at LinkedIn: Past, Present, and
   Future*
  - *by Kartik Paramasivam* (LinkedIn)
   - *Kafka Cruise Control: Auto Management of the Kafka Clusters*
  - *by Becket (Jiangjie) Qin* (LinkedIn)
   - *Plumber - Intuit's Samza Ecosystem*
  - *by Shekar Tippur* (Intuit)

Hope to see you there!

Joel


[jira] [Resolved] (KAFKA-4025) build fails on windows due to rat target output encoding

2016-10-12 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy resolved KAFKA-4025.
---
   Resolution: Fixed
 Assignee: radai rosenblatt
 Reviewer: Joel Koshy
Fix Version/s: 0.10.1.1

> build fails on windows due to rat target output encoding
> 
>
> Key: KAFKA-4025
> URL: https://issues.apache.org/jira/browse/KAFKA-4025
> Project: Kafka
>  Issue Type: Bug
> Environment: windows 7, either regular command prompt or git bash
>Reporter: radai rosenblatt
>Assignee: radai rosenblatt
>Priority: Minor
> Fix For: 0.10.1.1
>
> Attachments: windows build debug output.txt
>
>
> kafka runs a rat report during the build, using [the rat ant report 
> task|http://creadur.apache.org/rat/apache-rat-tasks/report.html], which has 
> no output encoding parameter.
> this means that the resulting xml report is produced using the system-default 
> encoding, which is OS-dependent:
> the rat ant task code instantiates the output writer like so 
> ([org.apache.rat.anttasks.Report.java|http://svn.apache.org/repos/asf/creadur/rat/tags/apache-rat-project-0.11/apache-rat-tasks/src/main/java/org/apache/rat/anttasks/Report.java]
>  line 196):
> {noformat}
> out = new PrintWriter(new FileWriter(reportFile));{noformat}
> which eventually leads to {{Charset.defaultCharset()}} that relies on the 
> file.encoding system property. this causes an issue if the default encoding 
> isnt UTF-8 (which it isnt on windows) as the code called by 
> printUnknownFiles() in rat.gradle defaults to UTF-8 when reading the report 
> xml, causing the build to fail with:
> {noformat}
> com.sun.org.apache.xerces.internal.impl.io.MalformedByteSequenceException: 
> Invalid byte 1 of 1-byte UTF-8 sequence.{noformat}
> (see complete output of {{gradlew --debug --stacktrace rat}} in attached file)
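The general fix for this class of bug is to specify the charset explicitly instead of relying on the platform default that `new PrintWriter(new FileWriter(reportFile))` picks up. A sketch (not the actual rat or gradle code):

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Write with an explicit UTF-8 writer so the later UTF-8 read succeeds
// regardless of the OS-dependent file.encoding default.
public class Utf8Report {
    static String roundTrip(Path reportFile, String content) throws IOException {
        try (PrintWriter out = new PrintWriter(new OutputStreamWriter(
                new FileOutputStream(reportFile.toFile()), StandardCharsets.UTF_8))) {
            out.print(content);
        }
        // Reading back as UTF-8 now works on Windows as well as Linux.
        return new String(Files.readAllBytes(reportFile), StandardCharsets.UTF_8);
    }

    static String demo() throws IOException {
        Path f = Files.createTempFile("rat-report", ".xml");
        try {
            return roundTrip(f, "<report>résumé</report>"); // non-ASCII survives
        } finally {
            Files.delete(f);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(demo());
    }
}
```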



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4293) ByteBufferMessageSet.deepIterator burns CPU catching EOFExceptions

2016-10-12 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-4293:
--
Assignee: radai rosenblatt

It turns out we should be able to handle all of our current codecs by 
re-implementing the {{available()}} method correctly. We would still want to 
continue to catch EOF as a safety net for any future codecs we may add.

> ByteBufferMessageSet.deepIterator burns CPU catching EOFExceptions
> --
>
> Key: KAFKA-4293
> URL: https://issues.apache.org/jira/browse/KAFKA-4293
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.1
>Reporter: radai rosenblatt
>Assignee: radai rosenblatt
>
> around line 110:
> {noformat}
> try {
> while (true)
> innerMessageAndOffsets.add(readMessageFromStream(compressed))
> } catch {
> case eofe: EOFException =>
> // we don't do anything at all here, because the finally
> // will close the compressed input stream, and we simply
> // want to return the innerMessageAndOffsets
> {noformat}
> the only indication the code has that the end of the iteration was reached is 
> by catching EOFException (which will be thrown inside 
> readMessageFromStream()).
> profiling runs performed at LinkedIn show 10% of the total broker CPU time 
> taken up by Throwable.fillInStackTrace() because of this behaviour.
> unfortunately InputStream.available() cannot be relied upon (concrete example 
> - GZIPInputStream will not correctly return 0) so the fix would probably be a 
> wire format change to also encode the number of messages.
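One exception-free alternative (a sketch, not the actual fix applied in Kafka) is to probe for end-of-stream with a one-byte read through a PushbackInputStream, instead of letting each read throw EOFException and pay for Throwable.fillInStackTrace():

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;

// Detect end-of-stream by peeking one byte and pushing it back, so the
// read loop terminates on read() == -1 rather than on a thrown EOFException.
public class PeekEof {
    static int countChunks(InputStream raw, int chunkSize) throws IOException {
        PushbackInputStream in = new PushbackInputStream(raw);
        int chunks = 0;
        int b;
        while ((b = in.read()) != -1) {  // cheap EOF probe, no exception thrown
            in.unread(b);                // push the byte back, then read the chunk
            byte[] chunk = new byte[chunkSize];
            int n = in.read(chunk);      // stands in for readMessageFromStream()
            if (n < chunkSize) break;    // truncated trailing chunk
            chunks++;
        }
        return chunks;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = new byte[40]; // four 10-byte "messages"
        System.out.println(countChunks(new ByteArrayInputStream(data), 10)); // 4
    }
}
```

This sidesteps the unreliable available() (which a GZIP stream may report incorrectly) while still avoiding per-message exceptions.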



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3175) topic not accessible after deletion even when delete.topic.enable is disabled

2016-10-10 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-3175:
--
Resolution: Fixed
  Reviewer: Joel Koshy
Status: Resolved  (was: Patch Available)

> topic not accessible after deletion even when delete.topic.enable is disabled
> -
>
> Key: KAFKA-3175
> URL: https://issues.apache.org/jira/browse/KAFKA-3175
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Jun Rao
>Assignee: Mayuresh Gharat
> Fix For: 0.10.1.1
>
>
> This can be reproduced with the following steps.
> 1. start ZK and 1 broker (with default delete.topic.enable=false)
> 2. create a topic test
> bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test 
> --partition 1 --replication-factor 1
> 3. delete topic test
> bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
> 4. restart the broker
> Now topic test still shows up during topic description.
> bin/kafka-topics.sh --zookeeper localhost:2181 --describe
> Topic:test	PartitionCount:1	ReplicationFactor:1	Configs:
> 	Topic: test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
> However, one can't produce to this topic any more.
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> [2016-01-29 17:55:24,527] WARN Error while fetching metadata with correlation 
> id 0 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
> [2016-01-29 17:55:24,725] WARN Error while fetching metadata with correlation 
> id 1 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
> [2016-01-29 17:55:24,828] WARN Error while fetching metadata with correlation 
> id 2 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-82 - Add Record Headers

2016-10-07 Thread Joel Koshy
Hi Jay,

Couple of comments inline:

One of things that has helped keep Kafka simple is not adding in new
> abstractions and concepts except when the proposal is really elegant and
> makes things simpler.
>

I don't quite see how this impacts simplicity because (per your taxonomy)
the scope is "company" and "world". So the decision to use it or not is
really up to companies/individuals.


> Consider three use cases for headers:
>
>1. Kafka-scope: We want to add a feature to Kafka that needs a
>particular field.
>2. Company-scope: You want to add a header to be shared by everyone in
>your company.
>3. World-wide scope: You are building a third party tool and want to add
>some kind of header.
>
> For the case of (1) you should not use headers, you should just add a field
> to the record format.


Agreed - which is what we have been doing so far.


> sense. Occasionally people have complained that adding to the record format
> is hard and it would be nice to just shove lots of things in quickly. I
> think a better solution would be to make it easy to add to the record
> format, and I think we've made progress on that. I also think we should be

The intent is not to shove things in quickly but to make it possible to
augment the streaming system with infrastructure features such as
audit/call tracing and more without invading the record format for all
users who may not need some of those features.


> earlier proposals. These things end up being long term commitments so it's
> really worth being thoughtful.
>

Which is another reason I think it is helpful to have such headers. If a
certain header type is clearly useful for the vast majority of users then
there is a case for it being integrated directly into the record format.
However, there could always be a large segment of users for whom certain
headers are irrelevant and they should have the ability to opt out of it.

For case (2) just use the body of the message. You don't need a globally
> agreed on definition of headers, just standardize on a header you want to
> include in the value in your company.


This works - but as I described in an earlier email in this thread has
drawbacks.

>    1. A global registry of numeric keys is super super ugly. This seems
>    silly compared to the Avro (or whatever) header solution which gives more
>    compact encoding, rich types, etc.
>

Agreed - I would really like us to avoid the burden of being a registrar.

>    2. Using byte arrays for header values means they aren't really
>    interoperable for case (3). E.g. I can't make a UI that displays headers,
>    or allow you to set them in config. To work with third party headers, the
>    only case I think this really helps, you need the union of all
>    serialization schemes people have used for any tool.
>

I don't quite see why - the user would need to have the suitable
interceptors in their classpath. Headers that it does not understand are
simply ignored.

>    3. For case (2) and (3) your key numbers are going to collide like
>    crazy. I don't think a global registry of magic numbers maintained either
>    by word of mouth or checking in changes to kafka source is the right thing
>    to do.


Agreed (~ point 1 above)


>    4. We are introducing a new serialization primitive which makes fields
>    disappear conditional on the contents of other fields. This breaks the
>    whole serialization/schema system we have today.
>

I don't quite see why this is so.


>    6. This proposes making the ProducerRecord and ConsumerRecord mutable
>    and adding setters and getters (which we try to avoid).
>

This is another part of the proposal I don't really like although there are
use cases where it helps.


> For context on LinkedIn: I set up the system there, but it may have changed
> since i left. The header is maintained with the record schemas in the avro
> schema registry and is required for all records. Essentially all messages
>


> Not allowing teams to chose a data format other than avro was considered a
> feature, not a bug, since the whole point was to be able to share data,
> which doesn't work if every team chooses their own format.
>

It is pretty much the same - not much has changed since you left, but see
my earlier comments on this: http://markmail.org/message/3ln5mruxqfhbewgz
The proposal does not mean it empowers applications to use non-Avro for the
data plane. The feature supports a much clearer separation of the user's
data from typically infra-related headers (which could also be Avro-based).

At this point I think we should focus the discussion not on the specifics
(implementation) of the proposal but on the motivation. Email is fine but
it may be better to discuss in a hangout and circle back to the thread.

Thanks,

Joel


> On Thu, Sep 22, 2016 at 12:31 PM, Michael Pearce 
> wrote:
>
> > Hi All,
> >
> >
> > I would like to discuss the following KIP proposal:
> >
> > 

[jira] [Commented] (KAFKA-4254) Questionable handling of unknown partitions in KafkaProducer

2016-10-06 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15553228#comment-15553228
 ] 

Joel Koshy commented on KAFKA-4254:
---

It seems (2) would still help as there are use-cases which set {{max.block.ms}} 
to zero. So we can refresh metadata but also return a more specific exception.

> Questionable handling of unknown partitions in KafkaProducer
> 
>
> Key: KAFKA-4254
> URL: https://issues.apache.org/jira/browse/KAFKA-4254
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Jason Gustafson
>Assignee: Konstantine Karantasis
> Fix For: 0.10.1.1
>
>
> Currently the producer will raise an {{IllegalArgumentException}} if the user 
> attempts to write to a partition which has just been created. This is caused 
> by the fact that the producer does not attempt to refetch topic metadata in 
> this case, which means that its check for partition validity is based on 
> stale metadata.
> If the topic for the partition did not already exist, it works fine because 
> the producer will block until it has metadata for the topic, so this case is 
> primarily hit when the number of partitions is dynamically increased. 
> A couple options to fix this that come to mind:
> 1. We could treat unknown partitions just as we do unknown topics. If the 
> partition doesn't exist, we refetch metadata and try again (timing out when 
> max.block.ms is reached).
> 2. We can at least throw a more specific exception so that users can handle 
> the error. Raising {{IllegalArgumentException}} is not helpful in practice 
> because it can also be caused by other errors.
> My inclination is to do the first one since the producer seems incorrect to 
> tell the user that the partition is invalid.
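For illustration, a hedged sketch of what option 1 could look like in spirit — block until metadata shows the partition, timing out at max.block.ms. Everything here is hypothetical (the method name, and a plain map standing in for a real metadata fetch); it is not the producer's actual internals:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;

public class PartitionAwait {
    // Hypothetical sketch of option 1: treat an unknown partition like an
    // unknown topic — refresh metadata and retry until max.block.ms elapses.
    static int awaitPartitionCount(Map<String, Integer> metadata, String topic,
                                   int wantedPartition, long maxBlockMs)
            throws TimeoutException, InterruptedException {
        long deadline = System.currentTimeMillis() + maxBlockMs;
        while (true) {
            Integer count = metadata.get(topic);   // stand-in for a metadata fetch
            if (count != null && wantedPartition < count) return count;
            if (System.currentTimeMillis() >= deadline)
                throw new TimeoutException("partition " + wantedPartition
                        + " of " + topic + " not present in metadata");
            Thread.sleep(10);                      // back off before refreshing
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, Integer> metadata = new ConcurrentHashMap<>();
        metadata.put("test", 1);
        // Simulate a partition-count increase landing while the producer waits.
        new Thread(() -> {
            try { Thread.sleep(50); } catch (InterruptedException ignored) {}
            metadata.put("test", 3);               // partitions increased to 3
        }).start();
        System.out.println(awaitPartitionCount(metadata, "test", 2, 1000));
    }
}
```

With max.block.ms set to zero, the same loop degenerates to a single metadata check followed by the more specific exception — which is why option 2 still helps alongside option 1.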



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-82 - Add Record Headers

2016-10-05 Thread Joel Koshy
@Nacho

> - Brokers can't see the headers (part of the "V" black box)
>
> (Also, it would be nice if we had a way to access the headers from the
> brokers, something that is not trivial at this time with the current
> broker architecture).

I think this can be addressed with broker interceptors, which we touched on
in KIP-42.

@Gwen

You are right that the wrapper thingy “works”, but there are some drawbacks
that Nacho and Radai have covered in detail that I can add a few more
comments to.

At LinkedIn, we *get by* without the proposed Kafka record headers by
dumping such metadata in one or two places:

   - Most of our applications use Avro, so for the most part we can use an
   explicit header field in the Avro schema. Topic owners are supposed to
   include this header in their schemas.
   - A prefix to the payload that primarily contains the schema’s ID so we
   can deserialize the Avro. (We could use this for other use-cases as well -
   i.e., move some of the above into this prefix blob.)

Dumping headers in the Avro schema pollutes the application’s data model
with data/service-infra-related fields that are unrelated to the underlying
topic; and forces the application to deserialize the entire blob whether or
not the headers are actually used. Conversely from an infrastructure
perspective, we would really like to not touch any application data. Our
infiltration of the application’s schema is a major reason why many at
LinkedIn sometimes assume that we (Kafka folks) are the shepherds for all
things Avro :)

Another drawback is that all this only works if everyone in the
organization is a good citizen and includes the header; and uses our
wrapper libraries - which is a good practice IMO - but may not always be
easy for open source projects that wish to directly use the Apache
producer/client. If instead we allow these headers to be inserted via
suitable interceptors outside the application payloads it would remove such
issues of separation in the data model and choice of clients.
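For concreteness, a minimal sketch of the payload-prefix ("wrapper") approach described above. The wire layout here is entirely hypothetical — a version byte, a header count, then length-prefixed key/value pairs ahead of the application payload — but it shows the structural drawback: every consumer must understand the envelope before it can reach the application data at all.

```java
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class HeaderEnvelope {
    // Hypothetical layout: [version][header count][len-prefixed k/v pairs][payload]
    static byte[] wrap(Map<String, byte[]> headers, byte[] payload) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeByte(0x01);                       // envelope version
        out.writeShort(headers.size());
        for (Map.Entry<String, byte[]> h : headers.entrySet()) {
            byte[] k = h.getKey().getBytes(StandardCharsets.UTF_8);
            out.writeShort(k.length);
            out.write(k);
            out.writeInt(h.getValue().length);
            out.write(h.getValue());
        }
        out.write(payload);                        // application data comes last
        return buf.toByteArray();
    }

    static Map<String, byte[]> readHeaders(byte[] envelope) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(envelope));
        if (in.readByte() != 0x01) throw new IOException("unknown envelope version");
        int n = in.readUnsignedShort();
        Map<String, byte[]> headers = new LinkedHashMap<>();
        for (int i = 0; i < n; i++) {
            byte[] k = new byte[in.readUnsignedShort()];
            in.readFully(k);
            byte[] v = new byte[in.readInt()];
            in.readFully(v);
            headers.put(new String(k, StandardCharsets.UTF_8), v);
        }
        return headers;  // the payload is whatever bytes remain in the stream
    }

    public static void main(String[] args) throws IOException {
        Map<String, byte[]> h = new LinkedHashMap<>();
        h.put("trace-id", "abc123".getBytes(StandardCharsets.UTF_8));
        byte[] env = wrap(h, "user-data".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(readHeaders(env).get("trace-id"),
                StandardCharsets.UTF_8));
    }
}
```

First-class record headers would let infrastructure read or add the "trace-id" above via interceptors without the application (or any third-party client) having to know this envelope exists.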

Radai has enumerated a number of use-cases, and
I’m sure the broader community will have a lot more to add. The feature as
such would enable an ecosystem of plugins from different vendors that users
can mix and match in their data pipelines without requiring any specific
payload formats or client libraries.

Thanks,

Joel



> >
> >
> > On Wed, Oct 5, 2016 at 2:20 PM, Gwen Shapira  wrote:
> >
> > > Since LinkedIn has some kind of wrapper thingy that adds the headers,
> > > where they could have added them to Apache Kafka - I'm very curious to
> > > hear what drove that decision and the pros/cons of managing the
> > > headers outside Kafka itself.
> > >
>


Re: Snazzy new look to our website

2016-10-04 Thread Joel Koshy
Looks great!

On Tue, Oct 4, 2016 at 4:38 PM, Guozhang Wang  wrote:

> The new look is great, thanks Derrick and Gwen!
>
> I'm wondering if we should still consider breaking "document.html" into
> multiple pages and indexed as sub-topics on the left bar?
>
>
> Guozhang
>
>
> On Tue, Oct 4, 2016 at 4:13 PM, Gwen Shapira  wrote:
>
> > Hi Team Kafka,
> >
> > I just merged PR 20 to our website - which gives it a new (and IMO
> > pretty snazzy) look and feel. Thanks to Derrick Or for contributing
> > the update.
> >
> > I had to do a hard-refresh (shift-f5 on my mac) to get the new look to
> > load properly - so if stuff looks off, try it.
> >
> > Comments and contributions to the site are welcome.
> >
> > Gwen
> >
>
>
>
> --
> -- Guozhang
>


Re: [DISCUSS] Fault injection tests for Kafka

2016-10-04 Thread Joel Koshy
Hi Gwen,

I've also seen suggestions of using Jepsen for fault injection, but
> I'm not familiar with this framework.
>
> What do you guys think? Write our own failure injection? or write
> Kafka tests in Jepsen?
>

This would definitely add a lot of value and save a lot on release
validation overheads. I have heard of Jepsen (via the blog), but haven't
used it. At LinkedIn, a couple of infra teams have been using Simoorg, which,
being Python-based, would perhaps be easier for system test writers to use
than Clojure (under Jepsen). The Ambry project at LinkedIn
uses it extensively (and I think has added several more failure scenarios
which don't seem to be reflected in the github repo). Anyway, I think we
should at least enumerate what we want to test and evaluate the
alternatives before reinventing.

Thanks,

Joel


[jira] [Commented] (KAFKA-4207) Partitions stopped after a rapid restart of a broker

2016-09-26 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524443#comment-15524443
 ] 

Joel Koshy commented on KAFKA-4207:
---

I have a KIP draft that has been sitting around for a while. I should be able 
to clean that up and send it out within the next week or so.

> Partitions stopped after a rapid restart of a broker
> 
>
> Key: KAFKA-4207
> URL: https://issues.apache.org/jira/browse/KAFKA-4207
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.9.0.1, 0.10.0.1
>Reporter: Dustin Cote
>
> Environment:
> 4 Kafka brokers
> 10,000 topics with one partition each, replication factor 3
> Partitions with 4KB data each
> No data being produced or consumed
> Scenario:
> Initiate controlled shutdown on one broker
> Interrupt controlled shutdown prior completion with a SIGKILL
> Start a new broker with the same broker ID as broker that was just killed 
> immediately
> Symptoms:
> After starting the new broker, the other three brokers in the cluster will 
> see under replicated partitions forever for some partitions that are hosted 
> on the broker that was killed and restarted
> Cause:
> Today, the controller sends a StopReplica command for each replica hosted on 
> a broker that has initiated a controlled shutdown.  For a large number of 
> replicas this can take a while.  When the broker that is doing the controlled 
> shutdown is killed, the StopReplica commands are queued up even though the 
> request queue to the broker is cleared.  When the broker comes back online, 
> the StopReplica commands that were queued get sent to the broker that just 
> started up.
> CC: [~junrao] since he's familiar with the scenario seen here



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4178) Replication Throttling: Consolidate Rate Classes

2016-09-21 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15511604#comment-15511604
 ] 

Joel Koshy commented on KAFKA-4178:
---

[~junrao] sorry I don't remember, but you might :) It appears the change was a 
follow-up to one of your [review comments on 
KAFKA-2084|https://reviews.apache.org/r/33049/#comment150934]. Let me know if 
you need help with revisiting that discussion - it has been a while since I 
have looked at that code.

> Replication Throttling: Consolidate Rate Classes
> 
>
> Key: KAFKA-4178
> URL: https://issues.apache.org/jira/browse/KAFKA-4178
> Project: Kafka
>  Issue Type: Improvement
>  Components: replication
>Affects Versions: 0.10.1.0
>Reporter: Ben Stopford
>
> Replication throttling is using a different implementation of Rate to client 
> throttling (Rate & SimpleRate). These should be consolidated so both use the 
> same approach. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-4158) Reset quota to default value if quota override is deleted for a given clientId

2016-09-13 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4158?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy resolved KAFKA-4158.
---
Resolution: Fixed

Pushed to trunk.
Not sure if we need this in 0.9 since there is an easy work-around for it 
(i.e., override to the default and then delete) and we are going to release 
0.10.1 soon

> Reset quota to default value if quota override is deleted for a given clientId
> --
>
> Key: KAFKA-4158
> URL: https://issues.apache.org/jira/browse/KAFKA-4158
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Critical
> Fix For: 0.10.1.0
>
>
> If a user sets a quota override and then deletes the override, Kafka will 
> still use the quota override. But this is wrong: Kafka should use the default 
> quota if the override is deleted.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4158) Reset quota to default value if quota override is deleted for a given clientId

2016-09-13 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4158?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-4158:
--
Fix Version/s: 0.10.1.0

> Reset quota to default value if quota override is deleted for a given clientId
> --
>
> Key: KAFKA-4158
> URL: https://issues.apache.org/jira/browse/KAFKA-4158
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Critical
> Fix For: 0.10.1.0
>
>
> If a user sets a quota override and then deletes the override, Kafka will 
> still use the quota override. But this is wrong: Kafka should use the default 
> quota if the override is deleted.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] 0.10.1 Release Plan

2016-09-13 Thread Joel Koshy
+1

On Tue, Sep 13, 2016 at 2:40 PM, Jason Gustafson  wrote:

> Hi All,
>
> I'd like to start a vote on the release plan documented here:
> https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.10.1. I
> went ahead and included KIP-55 since Jun said it may have a chance, but
> note that in-progress KIPs will only be included if they are ready by the
> feature freeze date (Sep. 17). We'll evaluate KIP-79 once we see Becket's
> patch.
>
> Thanks,
> Jason
>


[jira] [Commented] (KAFKA-4074) Deleting a topic can make it unavailable even if delete.topic.enable is false

2016-09-07 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15471686#comment-15471686
 ] 

Joel Koshy commented on KAFKA-4074:
---

Yes - totally missed KAFKA-3175

> Deleting a topic can make it unavailable even if delete.topic.enable is false
> -
>
> Key: KAFKA-4074
> URL: https://issues.apache.org/jira/browse/KAFKA-4074
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>    Reporter: Joel Koshy
>Assignee: Manikumar Reddy
> Fix For: 0.10.1.0
>
>
> The {{delete.topic.enable}} configuration does not completely block the 
> effects of delete topic since the controller may (indirectly) query the list 
> of topics under the delete-topic znode.
> To reproduce:
> * Delete topic X
> * Force a controller move (either by bouncing or removing the controller 
> znode)
> * The new controller will send out UpdateMetadataRequests with leader=-2 for 
> the partitions of X
> * Producers eventually stop producing to that topic
> The reason for this is that when ControllerChannelManager adds 
> UpdateMetadataRequests for brokers, we directly use the partitionsToBeDeleted 
> field of the DeleteTopicManager (which is set to the partitions of the topics 
> under the delete-topic znode on controller startup).
> In order to get out of the situation you have to remove X from the znode and 
> then force another controller move.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-73 - Replication Quotas

2016-08-23 Thread Joel Koshy
+1
(sent some very minor edits to you off-thread)

On Fri, Aug 19, 2016 at 1:21 AM, Ben Stopford  wrote:

> I’d like to initiate the voting process for KIP-73:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-73+Replication+Quotas
>
> Ben


[jira] [Created] (KAFKA-4074) Deleting a topic can make it unavailable even if delete.topic.enable is false

2016-08-22 Thread Joel Koshy (JIRA)
Joel Koshy created KAFKA-4074:
-

 Summary: Deleting a topic can make it unavailable even if 
delete.topic.enable is false
 Key: KAFKA-4074
 URL: https://issues.apache.org/jira/browse/KAFKA-4074
 Project: Kafka
  Issue Type: Bug
  Components: controller
Reporter: Joel Koshy
 Fix For: 0.10.1.0


The {{delete.topic.enable}} configuration does not completely block the effects 
of delete topic since the controller may (indirectly) query the list of topics 
under the delete-topic znode.

To reproduce:
* Delete topic X
* Force a controller move (either by bouncing or removing the controller znode)
* The new controller will send out UpdateMetadataRequests with leader=-2 for 
the partitions of X
* Producers eventually stop producing to that topic

The reason for this is that when ControllerChannelManager adds 
UpdateMetadataRequests for brokers, we directly use the partitionsToBeDeleted 
field of the DeleteTopicManager (which is set to the partitions of the topics 
under the delete-topic znode on controller startup).

In order to get out of the situation you have to remove X from the znode and 
then force another controller move.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-4050) Allow configuration of the PRNG used for SSL

2016-08-19 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy resolved KAFKA-4050.
---
   Resolution: Fixed
Fix Version/s: 0.10.0.2
   0.10.1.0

Pushed to trunk and 0.10.0

> Allow configuration of the PRNG used for SSL
> 
>
> Key: KAFKA-4050
> URL: https://issues.apache.org/jira/browse/KAFKA-4050
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Affects Versions: 0.10.0.1
>Reporter: Todd Palino
>Assignee: Todd Palino
>  Labels: security, ssl
> Fix For: 0.10.1.0, 0.10.0.2
>
>
> This change will make the pseudo-random number generator (PRNG) 
> implementation used by the SSLContext configurable. The configuration is not 
> required, and the default is to use whatever the default PRNG for the JDK/JRE 
> is. Providing a string, such as "SHA1PRNG", will cause that specific 
> SecureRandom implementation to get passed to the SSLContext.
> When enabling inter-broker SSL in our certification cluster, we observed 
> severe performance issues. For reference, this cluster can take up to 600 
> MB/sec of inbound produce traffic over SSL, with RF=2, before it gets close 
> to saturation, and the mirror maker normally produces about 400 MB/sec 
> (unless it is lagging). When we enabled inter-broker SSL, we saw persistent 
> replication problems in the cluster at any inbound rate of more than about 6 
> or 7 MB/sec per-broker. This was narrowed down to all the network threads 
> blocking on a single lock in the SecureRandom code.
> It turns out that the default PRNG implementation on Linux is NativePRNG. 
> This uses randomness from /dev/urandom (which, by itself, is a non-blocking 
> read) and mixes it with randomness from SHA1. The problem is that the entire 
> application shares a single SecureRandom instance, and NativePRNG has a 
> global lock within the implNextBytes method. Switching to another 
> implementation (SHA1PRNG, which has better performance characteristics and is 
> still considered secure) completely eliminated the bottleneck and allowed the 
> cluster to work properly at saturation.
> The SSLContext initialization has an optional argument to provide a 
> SecureRandom instance, which the code currently sets to null. This change 
> creates a new config to specify an implementation, and instantiates that and 
> passes it to SSLContext if provided. This will also let someone select a 
> stronger source of randomness (obviously at a performance cost) if desired.
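The mechanism described above uses standard JDK APIs: SecureRandom.getInstance and the optional SecureRandom argument to SSLContext.init. A minimal sketch follows; the null-means-platform-default convention matches the description above, but the exact config plumbing in the Kafka patch is not reproduced here:

```java
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import javax.net.ssl.SSLContext;

public class ConfigurablePrng {
    // Build an SSLContext with an explicitly chosen PRNG implementation.
    // A null name means "use the JDK's default PRNG" (the prior behavior).
    static SSLContext sslContextWith(String prngName)
            throws NoSuchAlgorithmException, KeyManagementException {
        SecureRandom random = null;               // null => JDK default PRNG
        if (prngName != null) {
            random = SecureRandom.getInstance(prngName);
        }
        SSLContext ctx = SSLContext.getInstance("TLS");
        ctx.init(null, null, random);             // default key/trust managers
        return ctx;
    }

    public static void main(String[] args) throws Exception {
        // SHA1PRNG avoids NativePRNG's global lock in implNextBytes.
        SecureRandom r = SecureRandom.getInstance("SHA1PRNG");
        System.out.println(r.getAlgorithm());
        sslContextWith("SHA1PRNG");               // initializes without error
        System.out.println("ok");
    }
}
```

Because SHA1PRNG instances don't contend on the shared NativePRNG lock, the network threads stop serializing on SecureRandom under high SSL throughput.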



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-73: Replication Quotas

2016-08-18 Thread Joel Koshy
> For your first comment. We thought about determining "effect" replicas
> automatically as well. First, there are some tricky stuff that one has to
>

Auto-detection of effect traffic: I'm fairly certain it's doable but
definitely tricky. I'm also not sure it is something worth tackling at the
outset. If we want to spend more time thinking over it even if it's just an
academic exercise I would be happy to brainstorm offline.


> For your second comment, we discussed that in the client quotas design. A
> downside of that for client quotas is that a client may be surprised that
> its traffic is not throttled at one time, but throttled at another with the
> same quota (basically, less predictability). You can imagine setting a quota
> for all replication traffic and only slowing down the "effect" replicas if
> needed. The thought is more or less the same as the above. It requires more
>

For clients, this is true. I think this is much less of an issue for
server-side replication since the "users" here are the Kafka SREs who
generally know these internal details.

I think it would be valuable to get some feedback from SREs on the proposal
before proceeding to a vote. (ping Todd)

Joel


>
> On Thu, Aug 18, 2016 at 9:37 AM, Ben Stopford <b...@confluent.io> wrote:
>
> > Hi Joel
> >
> > Ha! yes we had some similar thoughts, on both counts. Both are actually
> > good approaches, but come with some extra complexity.
> >
> > Segregating the replication type is tempting as it creates a more general
> > solution. One issue is you need to draw a line between lagging and not
> > lagging. The ISR ‘limit' is a tempting divider, but has the side effect
> > that, once you drop out you get immediately throttled. Adding a
> > configurable divider is another option, but difficult for admins to set,
> > and always a little arbitrary. A better idea is to prioritise, in reverse
> > order to lag. But that also comes with additional complexity of its own.
> >
> > Under throttling is also a tempting addition. That’s to say, if there’s
> > idle bandwidth lying around, not being used, why not use it to let
> lagging
> > brokers catch up. This involves some comparison to the maximum bandwidth,
> > which could be configurable, or could be derived, with pros and cons for
> > each.
> >
> > But the more general problem is actually quite hard to reason about, so
> > after some discussion we decided to settle on something simple, that we
> > felt we could get working, and extend to add these additional features as
> > subsequent KIPs.
> >
> > I hope that seems reasonable. Jun may wish to add to this.
> >
> > B
> >
> >
> > > On 18 Aug 2016, at 06:56, Joel Koshy <jjkosh...@gmail.com> wrote:
> > >
> > > On Wed, Aug 17, 2016 at 9:13 PM, Ben Stopford <b...@confluent.io>
> wrote:
> > >
> > >>
> > >> Let us know if you have any further thoughts on KIP-73, else we'll
> > >> kick off a vote.
> > >>
> > >
> > > I think the mechanism for throttling replicas looks good. Just had a
> few
> > > more thoughts on the configuration section. What you have looks
> > reasonable,
> > > but I was wondering if it could be made simpler. You probably thought
> > > through these, so I'm curious to know your take.
> > >
> > > My guess is that most of the time, users would want to throttle all
> > effect
> > > replication - due to partition reassignments, adding brokers or a
> broker
> > > coming back online after an extended period of time. In all these
> > scenarios
> > > it may be possible to distinguish bootstrap (effect) vs normal
> > replication
> > > - based on how far the replica has to catch up. I'm wondering if it is
> > > enough to just set an umbrella "effect" replication quota with perhaps
> > > per-topic overrides (say if some topics are more important than others)
> > as
> > > opposed to designating throttled replicas.
> > >
> > > Also, IIRC during client-side quota discussions we had considered the
> > > possibility of allowing clients to go above their quotas when resources
> > are
> > > available. We ended up not doing that, but for replication throttling
> it
> > > may make sense - i.e., to treat the quota as a soft limit. Another way
> to
> > > look at it is instead of ensuring "effect replication traffic does not
> > flow
> > > faster than X bytes/sec" it may be useful to instead ensure that
> "effect
> > > replicatio

Re: [DISCUSS] KIP-73: Replication Quotas

2016-08-17 Thread Joel Koshy
On Wed, Aug 17, 2016 at 9:13 PM, Ben Stopford <b...@confluent.io> wrote:

>
> Let us know if you have any further thoughts on KIP-73, else we'll kick
> off a vote.
>

I think the mechanism for throttling replicas looks good. Just had a few
more thoughts on the configuration section. What you have looks reasonable,
but I was wondering if it could be made simpler. You probably thought
through these, so I'm curious to know your take.

My guess is that most of the time, users would want to throttle all effect
replication - due to partition reassignments, adding brokers or a broker
coming back online after an extended period of time. In all these scenarios
it may be possible to distinguish bootstrap (effect) vs normal replication
- based on how far the replica has to catch up. I'm wondering if it is
enough to just set an umbrella "effect" replication quota with perhaps
per-topic overrides (say if some topics are more important than others) as
opposed to designating throttled replicas.

Also, IIRC during client-side quota discussions we had considered the
possibility of allowing clients to go above their quotas when resources are
available. We ended up not doing that, but for replication throttling it
may make sense - i.e., to treat the quota as a soft limit. Another way to
look at it is instead of ensuring "effect replication traffic does not flow
faster than X bytes/sec" it may be useful to instead ensure that "effect
replication traffic only flows as slowly as necessary (so as not to
adversely affect normal replication traffic)."
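As a back-of-the-envelope illustration of that soft-limit idea (purely hypothetical numbers and policy, not Kafka code): effect replication gets whatever headroom normal replication leaves on the link, down to a configurable floor so catch-up is never starved entirely.

```java
public class SoftReplicationThrottle {
    // Hypothetical soft limit: "effect" (catch-up) replication may use the
    // headroom that normal replication leaves, instead of a fixed hard cap.
    static long allowedEffectBytes(long linkCapacity, long normalTraffic, long floor) {
        long headroom = linkCapacity - normalTraffic;
        return Math.max(headroom, floor);   // never starve catch-up entirely
    }

    public static void main(String[] args) {
        long capacity = 100_000_000L;       // assume a 100 MB/s link
        long floor = 1_000_000L;            // 1 MB/s guaranteed to catch-up
        // Quiet period: effect replication may use nearly the whole link.
        System.out.println(allowedEffectBytes(capacity, 10_000_000L, floor));
        // Busy period: effect replication is squeezed down to its floor.
        System.out.println(allowedEffectBytes(capacity, 99_500_000L, floor));
    }
}
```

Contrast this with the hard quota in the KIP, where the first case would still be capped at the configured rate even with 90 MB/s of idle bandwidth available.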

Thanks,

Joel

> >
> > > On Thu, Aug 11, 2016 at 2:43 PM, Jun Rao <j...@confluent.io
> > <javascript:;>> wrote:
> > >
> > > > Hi, Joel,
> > > >
> > > > Yes, the response size includes both throttled and unthrottled
> > replicas.
> > > > However, the response is only delayed up to max.wait if the response
> > size
> > > > is less than min.bytes, which matches the current behavior. So, there
> > is
> > > no
> > > > extra delay to due throttling, right? For replica fetchers, the
> default
> > > > min.byte is 1. So, the response is only delayed if there is no byte
> in
> > > the
> > > > response, which is what we want.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Thu, Aug 11, 2016 at 11:53 AM, Joel Koshy <jjkosh...@gmail.com
> > <javascript:;>>
> > > wrote:
> > > >
> > > > > Hi Jun,
> > > > >
> > > > > I'm not sure that would work unless we have separate replica
> > fetchers,
> > > > > since this would cause all replicas (including ones that are not
> > > > throttled)
> > > > > to get delayed. Instead, we could just have the leader populate the
> > > > > throttle-time field of the response as a hint to the follower as to
> > how
> > > > > long it should wait before it adds those replicas back to its
> > > subsequent
> > > > > replica fetch requests.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Joel
> > > > >
> > > > > On Thu, Aug 11, 2016 at 9:50 AM, Jun Rao <j...@confluent.io
> > <javascript:;>> wrote:
> > > > >
> > > > > > Mayuresh,
> > > > > >
> > > > > > That's a good question. I think if the response size (after
> > > > > > leader throttling) is smaller than min.bytes, we will just delay
> > > > > > the sending of the response up to max.wait as we do now. This
> > > > > > should prevent frequent empty responses to the follower.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Wed, Aug 10, 2016 at 9:17 PM, Mayuresh Gharat <
> > > > > > gharatmayures...@gmail.com <javascript:;>
> > > > > > > wrote:
> > > > > >
> > > > > > > This might have been answered before.
> > > > > > > I was wondering when the leader quota is reached and it sends
> > > > > > > empty response ( If the inclusion of a partition, listed in the
> > > > > > > leader's throttled-replicas list, causes the LeaderQuotaRate to
> > > > > > > be exceeded, that partition is o
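
The min.bytes/max.wait behavior discussed in the quoted thread reduces to a simple predicate. The sketch below is illustrative Java, not Kafka's actual delayed-fetch (purgatory) code; the class and method names are hypothetical.

```java
// Illustrative sketch of the delayed-fetch rule: a response is held back
// only while it is smaller than min.bytes, and never longer than max.wait.
// Throttling shrinks the response size but adds no delay beyond this rule.
public class DelayedFetchSketch {
    /**
     * @param responseBytes bytes accumulated for the response so far
     *                      (throttled partitions contribute nothing)
     * @param minBytes      fetch request's min.bytes (1 for replica fetchers)
     * @param waitedMs      how long the request has been parked
     * @param maxWaitMs     fetch request's max.wait
     */
    public static boolean shouldRespondNow(long responseBytes, int minBytes,
                                           long waitedMs, long maxWaitMs) {
        return responseBytes >= minBytes || waitedMs >= maxWaitMs;
    }
}
```

With the replica fetcher's default of min.bytes=1, an empty (fully throttled) response is the only case that waits the full max.wait, which is the behavior Jun describes above.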

Re: [ANNOUNCE] New Kafka PMC member Gwen Shapira

2016-08-17 Thread Joel Koshy
Congrats!

On Wed, Aug 17, 2016 at 9:28 PM, Sriram Subramanian 
wrote:

> Congratulations Gwen!
>
> On Wed, Aug 17, 2016 at 9:24 PM, Neha Narkhede  wrote:
>
> > Congratulations and welcome, Gwen!
> > On Wed, Aug 17, 2016 at 6:44 PM Jun Rao  wrote:
> >
> > > Hi, Everyone,
> > >
> > > Gwen Shapira has been active in the Kafka community since she became a
> > > Kafka committer
> > > about a year ago. I am glad to announce that Gwen is now a member of
> > Kafka
> > > PMC.
> > >
> > > Congratulations, Gwen!
> > >
> > > Jun
> > >
> > --
> > Thanks,
> > Neha
> >
>


[jira] [Commented] (KAFKA-4050) Allow configuration of the PRNG used for SSL

2016-08-17 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15425906#comment-15425906
 ] 

Joel Koshy commented on KAFKA-4050:
---

[~toddpalino] I had left a comment about this on the PR - one option is to
default to SHA1PRNG and fall back to null on NoSuchAlgorithmException.
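
As a rough sketch of that suggestion (illustrative Java, not the actual Kafka patch; the class and method names are hypothetical):

```java
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import javax.net.ssl.SSLContext;

public class SslPrngConfig {
    /**
     * Resolve the SecureRandom implementation to hand to SSLContext.init().
     * Falls back to null (i.e., the JDK default) if the named algorithm is
     * unavailable, rather than failing broker startup.
     */
    public static SecureRandom resolvePrng(String algorithm) {
        if (algorithm == null)
            return null;  // null means "use the JDK default PRNG"
        try {
            return SecureRandom.getInstance(algorithm);
        } catch (NoSuchAlgorithmException e) {
            return null;  // fall back to the default
        }
    }

    public static SSLContext buildContext(String prngAlgorithm) throws Exception {
        SSLContext ctx = SSLContext.getInstance("TLS");
        // Key/trust managers omitted for brevity; null selects JDK defaults.
        ctx.init(null, null, resolvePrng(prngAlgorithm));
        return ctx;
    }
}
```

Passing `"SHA1PRNG"` here avoids the shared NativePRNG lock described in the issue, while a null result preserves today's behavior.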

> Allow configuration of the PRNG used for SSL
> 
>
> Key: KAFKA-4050
> URL: https://issues.apache.org/jira/browse/KAFKA-4050
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Affects Versions: 0.10.0.1
>Reporter: Todd Palino
>Assignee: Todd Palino
>  Labels: security, ssl
>
> This change will make the pseudo-random number generator (PRNG) 
> implementation used by the SSLContext configurable. The configuration is not 
> required, and the default is to use whatever the default PRNG for the JDK/JRE 
> is. Providing a string, such as "SHA1PRNG", will cause that specific 
> SecureRandom implementation to get passed to the SSLContext.
> When enabling inter-broker SSL in our certification cluster, we observed 
> severe performance issues. For reference, this cluster can take up to 600 
> MB/sec of inbound produce traffic over SSL, with RF=2, before it gets close 
> to saturation, and the mirror maker normally produces about 400 MB/sec 
> (unless it is lagging). When we enabled inter-broker SSL, we saw persistent 
> replication problems in the cluster at any inbound rate of more than about 6 
> or 7 MB/sec per-broker. This was narrowed down to all the network threads 
> blocking on a single lock in the SecureRandom code.
> It turns out that the default PRNG implementation on Linux is NativePRNG. 
> This uses randomness from /dev/urandom (which, by itself, is a non-blocking 
> read) and mixes it with randomness from SHA1. The problem is that the entire 
> application shares a single SecureRandom instance, and NativePRNG has a 
> global lock within the implNextBytes method. Switching to another 
> implementation (SHA1PRNG, which has better performance characteristics and is 
> still considered secure) completely eliminated the bottleneck and allowed the 
> cluster to work properly at saturation.
> The SSLContext initialization has an optional argument to provide a 
> SecureRandom instance, which the code currently sets to null. This change 
> creates a new config to specify an implementation, and instantiates that and 
> passes it to SSLContext if provided. This will also let someone select a 
> stronger source of randomness (obviously at a performance cost) if desired.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4050) Allow configuration of the PRNG used for SSL

2016-08-16 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423839#comment-15423839
 ] 

Joel Koshy commented on KAFKA-4050:
---

A stack trace should help further clarify. (This is from a thread dump that 
Todd shared with us offline). Thanks [~toddpalino] and [~mgharat] for finding 
this.

{noformat}
"kafka-network-thread-1393-SSL-30" #114 prio=5 os_prio=0 tid=0x7f2ec8c30800 
nid=0x5c1e waiting for monitor entry [0x7f213b8f9000]
   java.lang.Thread.State: BLOCKED (on object monitor)
at 
sun.security.provider.NativePRNG$RandomIO.implNextBytes(NativePRNG.java:481)
- waiting to lock <0x000641508bf8> (a java.lang.Object)
at 
sun.security.provider.NativePRNG$RandomIO.access$400(NativePRNG.java:329)
at sun.security.provider.NativePRNG.engineNextBytes(NativePRNG.java:218)
at java.security.SecureRandom.nextBytes(SecureRandom.java:468)
- locked <0x00066aad9880> (a java.security.SecureRandom)
at sun.security.ssl.CipherBox.createExplicitNonce(CipherBox.java:1015)
at 
sun.security.ssl.EngineOutputRecord.write(EngineOutputRecord.java:287)
at 
sun.security.ssl.EngineOutputRecord.write(EngineOutputRecord.java:225)
at sun.security.ssl.EngineWriter.writeRecord(EngineWriter.java:186)
- locked <0x000671c5c978> (a sun.security.ssl.EngineWriter)
at sun.security.ssl.SSLEngineImpl.writeRecord(SSLEngineImpl.java:1300)
at 
sun.security.ssl.SSLEngineImpl.writeAppRecord(SSLEngineImpl.java:1271)
- locked <0x000671ce7170> (a java.lang.Object)
at sun.security.ssl.SSLEngineImpl.wrap(SSLEngineImpl.java:1186)
- locked <0x000671ce7150> (a java.lang.Object)
at javax.net.ssl.SSLEngine.wrap(SSLEngine.java:469)
at org.apache.kafka.common.network.SslTransportLayer.write(SslTransportLayer.java:557)
at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:146)
at org.apache.kafka.common.network.MultiSend.writeTo(MultiSend.java:81)
at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:292)
at 
org.apache.kafka.common.network.KafkaChannel.send(KafkaChannel.java:158)
at 
org.apache.kafka.common.network.KafkaChannel.write(KafkaChannel.java:146)
at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:329)
at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
at kafka.network.Processor.poll(SocketServer.scala:472)
at kafka.network.Processor.run(SocketServer.scala:412)
at java.lang.Thread.run(Thread.java:745)
{noformat}

Of note: all of the network threads are waiting on the same NativePRNG lock
(0x000641508bf8).


Re: [VOTE] KIP:71 Enable log compaction and deletion to co-exist

2016-08-15 Thread Joel Koshy
+1

On Mon, Aug 15, 2016 at 4:58 PM, Ewen Cheslack-Postava 
wrote:

> +1 (binding)
>
> Thanks,
> -Ewen
>
> On Mon, Aug 15, 2016 at 4:26 PM, Jun Rao  wrote:
>
> > Thanks for the proposal. +1
> >
> > Jun
> >
> > On Mon, Aug 15, 2016 at 6:20 AM, Damian Guy 
> wrote:
> >
> > > I would like to initiate the voting process for KIP-71 (
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 71%3A+Enable+log+compaction+and+deletion+to+co-exist
> > > ).
> > >
> > > This change will add a new cleanup.policy, compact_and_delete, that
> > > when enabled will run both compaction and deletion.
> > >
> > > Thanks,
> > > Damian
> > >
> >
>
>
>
> --
> Thanks,
> Ewen
>


Re: [DISCUSS] KIP-71 Enable log compaction and deletion to co-exist

2016-08-15 Thread Joel Koshy
Thanks for the proposal. I'm +1 overall with a thought somewhat related to
Jun's comments.

While there may not yet be a sensible use case for it, it should be (in
theory) legal to have compact_and_delete with size based retention as well.
I'm just wondering if it makes sense to allow specifying multiple
comma-separated policies "compact,delete" as opposed to
"compact_and_delete" or "x_and_y_and_z" or "y_and_z" if we ever come up
with more policies. The order could potentially indicate precedence.
Anyway, it is just a thought - it may end up being very confusing for users.
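
For concreteness, the topic-level overrides being discussed might look like the following. This is an illustrative sketch: the helper name is hypothetical, and the policy string ("compact_and_delete" vs. a comma-separated "compact,delete") was still under discussion in this thread.

```java
import java.util.Properties;

public class WindowedTopicConfig {
    /**
     * Topic-level overrides for a changelog topic backing a windowed store,
     * as they might look under the KIP-71 proposal.
     */
    public static Properties forWindowedStore(long windowRetentionMs) {
        Properties config = new Properties();
        // Run both compaction and time-based deletion on this topic.
        config.setProperty("cleanup.policy", "compact_and_delete");
        // retention.ms must exceed the stream's window retention so segments
        // are deleted only after the windows they hold stop being useful.
        config.setProperty("retention.ms", String.valueOf(2 * windowRetentionMs));
        return config;
    }
}
```

The factor of two is arbitrary headroom; the only requirement Damian states below is that the broker's retention be greater than the window retention time.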

@Jason - I agree this could be used to handle offset expiration as well. We
can discuss that separately though; and if we do that we would want to also
deprecate the retention field in the commit requests.

Joel

On Mon, Aug 15, 2016 at 2:07 AM, Damian Guy  wrote:

> Thanks Jason.
> The log retention.ms will be set to a value that is greater than the window
> retention time. So as windows expire, they eventually get cleaned up by the
> broker. It doesn't matter if old windows are around for some time beyond
> their usefulness, more that they do eventually get removed and the log
> doesn't grow indefinitely (as it does now).
>
> Damian
>
> On Fri, 12 Aug 2016 at 20:25 Jason Gustafson  wrote:
>
> > Hey Damian,
> >
> > That's true, but it would avoid the need to write tombstones for the
> > expired offsets I guess. I'm actually not sure it's a great idea anyway.
> > One thing we've talked about is not expiring any offsets as long as a
> group
> > is alive, which would require some custom expiration logic. There's also
> > the fact that we'd risk expiring group metadata which is stored in the
> same
> > log. Having a builtin expiration mechanism may be more useful for the
> > compacted topics we maintain in Connect, but I think there too we might
> > need some custom logic. For example, expiring connector configs purely
> > based on time doesn't seem like what we'd want.
> >
> > By the way, I wonder if you could describe the expected usage in a little
> > more detail in the KIP for those of us who are not as familiar with Kafka
> > Streams. Is the intention to have the log retain only the most recent
> > window? In that case, would you always set the log retention time to the
> > window length? And I suppose a consumer would do a seek to the start of
> the
> > window (as soon as KIP-33 is available) and consume from there in order
> to
> > read the current state?
> >
> > Thanks,
> > Jason
> >
> > On Fri, Aug 12, 2016 at 8:48 AM, Damian Guy 
> wrote:
> >
> > > Thanks Jun
> > >
> > > On Fri, 12 Aug 2016 at 16:41 Jun Rao  wrote:
> > >
> > > > Hi, Damian,
> > > >
> > > > I was just wondering if we should disable size-based retention in
> > > > the compact_and_delete mode. So, it sounds like there could be a use
> > > > case for that. Since by default, the size-based retention is
> > > > infinite, I think we can just leave the KIP as it is.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Fri, Aug 12, 2016 at 12:10 AM, Damian Guy 
> > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > The only concrete example I can think of is a case for limiting
> > > > > disk usage. Say, I had something like Connect running that was
> > > > > tracking changes in a database. Downstream I don't really care
> > > > > about every change, I just want the latest values, so compaction
> > > > > could be enabled. However, the Kafka cluster has limited disk space
> > > > > so we need to limit the size of each partition. In a previous life
> > > > > I have done the same, just without compaction turned on.
> > > > >
> > > > > Besides, I don't think it costs us anything in terms of added
> > > > > complexity to enable it for time & size based retention - the code
> > > > > already does this for us.
> > > > >
> > > > > Thanks,
> > > > > Damian
> > > > >
> > > > > On Fri, 12 Aug 2016 at 05:30 Neha Narkhede 
> > wrote:
> > > > >
> > > > > > Jun,
> > > > > >
> > > > > > The motivation for this KIP is to handle joins and windows in
> Kafka
> > > > > > streams better and since Streams supports time-based windows, the
> > KIP
> > > > > > suggests combining time-based deletion and compaction.
> > > > > >
> > > > > > It might make sense to do the same for size-based windows, but
> can
> > > you
> > > > > > think of a concrete use case? If not, perhaps we can come back to
> > it.
> > > > > > On Thu, Aug 11, 2016 at 3:08 PM Jun Rao 
> wrote:
> > > > > >
> > > > > >> Hi, Damian,
> > > > > >>
> > > > > >> Thanks for the proposal. It makes sense to use time-based
> deletion
> > > > > >> retention and compaction together, as you mentioned in the
> > KStream.
> > > > > >>
> > > > > >> Is there a use case where we want to combine size-based deletion
> > > > 
