Streams Processing meetup on Wednesday, February 5, 2020 at LinkedIn, Sunnyvale

2020-01-27 Thread Joel Koshy
*[bcc: (users,dev)@kafka.apache.org ]*

Hello,

The Streams Infra team invites you to attend the Streams Processing meetup
to be held on Wednesday, February 5, 2020. This meetup will focus on Apache
Kafka, Apache Samza and related streaming technologies.

*Where*: Unify Conference room, 950 W Maude Ave, Sunnyvale

*When*: 5:00 - 8:00 PM

*RSVP*: Please RSVP here (only if attending in person)
https://www.meetup.com/Stream-Processing-Meetup-LinkedIn/events/267283444/
A streaming link will be posted approximately 30 minutes prior to the event.

*Agenda:*

5:00 PM: Doors open and catered food available

5:00 - 6:00 PM: Networking

6:00 - 6:30 PM: *High-performance data replication at Salesforce with Mirus,
Paul Davidson, Salesforce*

At Salesforce we manage high-volume Apache Kafka clusters in a growing
number of data centers around the globe. In the past we relied on Kafka's
Mirror Maker tool for cross-data center replication but, as the volume and
variety of data increased, we needed a new solution to maintain a high
standard of service reliability. In this talk, we will describe Mirus, our
open-source data replication tool based on Kafka Connect. Mirus was
designed for reliable, high-performance data replication at scale. It
successfully replaced MirrorMaker at Salesforce and has now been running
reliably in production for more than a year. We will give an overview of
the Mirus design and discuss the lessons we learned deploying, tuning, and
operating Mirus in a high-volume production environment.

6:30 - 7:00 PM: *Defending users from Abuse using Stream Processing at
LinkedIn, Bhargav Golla, LinkedIn*

When there are more than half a billion users, how can one effectively,
reliably and scalably classify them as good and bad users? This talk will
highlight how the Anti-Abuse team at LinkedIn leverages Streams Processing
technologies like Samza and Brooklin to keep the good users in a trusted
environment devoid of bad actors.

7:00 - 7:30 PM: *Enabling Mission-critical Stateful Stream Processing with
Samza, Ray Manpreet Singh Matharu, LinkedIn*

Samza powers a variety of large-scale business-critical stateful stream
processing applications at LinkedIn. Their scale necessitates using
persistent and replicated local state. Unfortunately, hard failures can
cause a loss of this local state, and re-caching it can incur downtime
ranging from a few minutes to hours! In this talk, we describe the systems
and protocols that we've devised that bound the downtime to a few seconds.
We detail the tradeoffs our approach brings and how we tackle them in
production at LinkedIn.

7:30 - 8:00 PM: Additional networking and Q&A

Hope to see you there!

*[Streams Infra team @ LinkedIn]*


Streams meetup at LinkedIn Sunnyvale, 6pm, Thursday, October 3, 2019

2019-09-26 Thread Joel Koshy
*[bcc: (users,dev)@kafka.apache.org ]*

Hi everyone,

The Streams Infra team invites you to attend a Streams Processing meetup on
Thursday, October 3, 2019 at LinkedIn's Sunnyvale campus. (This meetup focuses
on Apache Kafka, Apache Samza, and related streaming technologies.)

As always, there will be food, drinks, and time for socializing before and
after the talks.

We would be happy to have you join us, but it will be live streamed and
recorded for those who are unable to attend.

*RSVP:*
Please RSVP *only* if you plan to attend in person.
A streaming link will be posted approximately one hour prior to the event.
https://www.meetup.com/Stream-Processing-Meetup-LinkedIn/events/264589317/

*When:*
6:00 - 9:00 PM, Thursday, October 3, 2019

*Where:*
Unify Conference room, LinkedIn Sunnyvale campus
950 W Maude Ave, Sunnyvale, CA 94085

*Summary:*
We have three talks lined up. Our first speaker is from the Azure Streaming
team. The talk will cover Azure Stream Analytics, its architecture,
capabilities and use cases.  The second talk is about Samza’s recently
built capability to do Stream processing using Python. Samza-Python opens
up multiple possibilities including the ability to use Python libraries for
ML applications. Finally, the Kafka team will talk about the development
workflow that allows them to handle LinkedIn’s scale by building a LinkedIn
version of Kafka while also contributing back to the open source community.
Additional details are below.

*Agenda:*
5:30 PM - Doors open

5:30 - 6:00 PM - Networking

6:00 - 6:30 PM - *Azure Stream Analytics; Sasha Alperovich & Sid Ramadoss,
Microsoft*

Azure Stream Analytics (ASA) is a fully managed near real-time data
processing service on Azure. In this talk we will highlight the unique
value propositions that ASA brings to the table, and show a demo of how
Azure customers can utilize the power of ASA to gain insights in near
real-time with the NYC taxi scenario. We will then dive deeper into how the
service is built, covering resiliency, dataflow and other technical aspects
of the ASA runtime. We will also discuss how ASA’s unique design choices
compare and contrast with other streaming technologies, namely Spark
Structured Streaming and Flink.

6:30 - 7:00 PM - *Stream Processing in Python using Apache Samza; Hai Lu,
LinkedIn*

Apache Samza is the streaming engine being used at LinkedIn that processes
~ 2 trillion messages daily. A while back we announced Samza's integration
with Apache Beam, a great success which leads to our Samza Beam API. Now an
UPGRADE of our APIs - we're now supporting Stream Processing in Python!
This work has made stream processing more accessible and enabled many
interesting use cases, particularly in the area of machine learning. The
Python API is based on our work of Samza runner for Apache Beam. In this
talk, we will quickly review our work on Samza runner, and then how we
extended it to support portability in Beam (Python specifically). In
addition to technical and architectural details, we will also talk about
how we bridged Python and Java ecosystems at LinkedIn with the Python API,
together with different use cases.

7:00 - 7:30 PM - *Apache Kafka and LinkedIn: How LinkedIn customizes Kafka
to work at the trillion scale; Jon Lee & Wesley Wu, LinkedIn*

At LinkedIn, we operate thousands of brokers to handle trillions of
messages per day. Running at such a large scale constantly raises various
scalability and operability challenges for the Kafka ecosystem. While we
try to maintain our internal releases as close as possible to upstream, we
maintain a version of Kafka which includes patches for addressing our
production and feature requirements. In this presentation we will share the
Kafka release that LinkedIn runs in production, the workflow process we
follow to develop new patches, the way we upstream the changes we make,
some of the patches we maintain in our branch and how we generate releases.

7:30 - 8:30 PM - Additional networking and Q&A

Hope to see you there!


Streams Meetup at LinkedIn Sunnyvale, 6pm, Wednesday, March 20, 2019

2019-03-07 Thread Joel Koshy
*[bcc: (users,dev)@kafka.apache.org ]*

Hi everyone,

The Streams Infrastructure team at LinkedIn invites you to attend a Streams
Processing meetup on Wednesday, March 20 at LinkedIn’s Sunnyvale campus.
(This meetup focuses on Apache Kafka, Apache Samza, and related streaming
technologies.)

As always, there will be food, drinks, and time for socializing before and
after the talks.

We would be happy to have you join us, but it will be live streamed and
recorded for those who are unable to attend.

*RSVP*
https://www.meetup.com/Stream-Processing-Meetup-LinkedIn/events/259437388/

*Date*
6pm, Wednesday, March 20, 2019

*Location*
LinkedIn Sunnyvale campus - 950 W Maude Ave, Sunnyvale, CA 94085

*Agenda*
6:00-6:35 PM - Networking
6:35-7:10 PM - Apache Samza 1.0: Recent Advances and our plans for future
in Stream Processing (Prateek Maheshwari, LinkedIn)
7:15-7:50 PM - How and why we moved away from Kafka Mirror Maker to
Brooklin - LinkedIn's story (Shun-ping Chiu, LinkedIn)
7:55-8:30 PM - Puma - Stream Processing in Facebook (Rithin Shetty,
Facebook)
8:30-9:00 PM - Additional networking and Q&A

See you there!


Stream processing meetup at LinkedIn (Sunnyvale) on Thursday, February 16 at 6pm

2017-01-24 Thread Joel Koshy
Hi everyone,

We would like to invite you to a Stream Processing Meetup at LinkedIn’s
Sunnyvale campus on Thursday, February 16 at 6pm.

Please RSVP here (*only if you intend to attend in person*):
https://www.meetup.com/Stream-Processing-Meetup-LinkedIn/events/237171557/


We have the following agenda scheduled:

   - 6PM: Doors open
   - 6-6:35PM: Networking & Welcome
   - 6:35-7:10PM: Asynchronous Processing and Multithreading in Apache
     Samza (Xinyu Liu, LinkedIn)
   - 7:15-7:50PM: SSD Benchmarks for Apache Kafka (Mingmin Chen, Uber)
   - 7:55-8:30PM: Batching to Streaming Analytics at Optimizely (Vignesh
     Sukumar, Optimizely)

Hope to see you there!

Joel


Re: [ANNOUNCE] New committer: Grant Henke

2017-01-12 Thread Joel Koshy
Hey Grant - congrats!

On Thu, Jan 12, 2017 at 10:00 AM, Neha Narkhede  wrote:

> Congratulations, Grant. Well deserved!
>
> On Thu, Jan 12, 2017 at 7:51 AM Grant Henke  wrote:
>
> > Thanks everyone!
> >
> > On Thu, Jan 12, 2017 at 2:58 AM, Damian Guy 
> wrote:
> >
> > > Congratulations!
> > >
> > > On Thu, 12 Jan 2017 at 03:35 Jun Rao  wrote:
> > >
> > > > Grant,
> > > >
> > > > Thanks for all your contribution! Congratulations!
> > > >
> > > > Jun
> > > >
> > > > On Wed, Jan 11, 2017 at 2:51 PM, Gwen Shapira 
> > wrote:
> > > >
> > > > > The PMC for Apache Kafka has invited Grant Henke to join as a
> > > > > committer and we are pleased to announce that he has accepted!
> > > > >
> > > > > Grant contributed 88 patches, 90 code reviews, countless great
> > > > > comments on discussions, a much-needed cleanup to our protocol and
> > the
> > > > > on-going and critical work on the Admin protocol. Throughout this,
> he
> > > > > displayed great technical judgment, high-quality work and
> willingness
> > > > > to contribute where needed to make Apache Kafka awesome.
> > > > >
> > > > > Thank you for your contributions, Grant :)
> > > > >
> > > > > --
> > > > > Gwen Shapira
> > > > > Product Manager | Confluent
> > > > > 650.450.2760 <(650)%20450-2760> <(650)%20450-2760> | @gwenshap
> > > > > Follow us: Twitter | blog
> > > > >
> > > >
> > >
> >
> >
> >
> > --
> > Grant Henke
> > Software Engineer | Cloudera
> > gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
> >
> --
> Thanks,
> Neha
>


Re: [VOTE] Vote for KIP-101 - Leader Epochs

2017-01-05 Thread Joel Koshy
(adding the dev list back - as it seems to have gotten dropped earlier in
this thread)

On Thu, Jan 5, 2017 at 6:36 PM, Joel Koshy  wrote:

> +1
>
> This is a very well-written KIP!
> Minor: there is still a mix of terms in the doc that references the
> earlier LeaderGenerationRequest (which is what I'm assuming it was
> called in previous versions of the wiki). Same for the diagrams which I'm
> guessing are a little harder to make consistent with the text.
>
>
>
> On Thu, Jan 5, 2017 at 5:54 PM, Jun Rao  wrote:
>
>> Hi, Ben,
>>
>> Thanks for the updated KIP. +1
>>
>> 1) In OffsetForLeaderEpochResponse, start_offset probably should be
>> end_offset since it's the end offset of that epoch.
>> 3) That's fine. We can fix KAFKA-1120 separately.
>>
>> Jun
>>
>>
>> On Thu, Jan 5, 2017 at 11:11 AM, Ben Stopford  wrote:
>>
>> > Hi Jun
>> >
>> > Thanks for raising these points. Thorough as ever!
>> >
>> > 1) Changes made as requested.
>> > 2) Done.
>> > 3) My plan for handling returning leaders is simply to force the
>> Leader
>> > Epoch to increment if a leader returns. I don't plan to fix KAFKA-1120
>> as
>> > part of this KIP. It is really a separate issue with wider implications.
>> > I'd be happy to add KAFKA-1120 into the release though if we have time.
>> > 4) Agreed. Not sure exactly how that's going to play out, but I think
>> we're
>> > on the same page.
>> >
>> > Please could
>> >
>> > Cheers
>> > B
>> >
>> > On Thu, Jan 5, 2017 at 12:50 AM Jun Rao  wrote:
>> >
>> > > Hi, Ben,
>> > >
>> > > Thanks for the proposal. Looks good overall. A few comments below.
>> > >
>> > > 1. For LeaderEpochRequest, we need to include topic right? We probably
>> > want
>> > > to follow other requests by nesting partition inside topic? For
>> > > LeaderEpochResponse,
>> > > do we need to return leader_epoch? I was thinking that we could just
>> > return
>> > > an end_offset, which is the next offset of the last message in the
>> > > requested leader generation. Finally, would
>> OffsetForLeaderEpochRequest
>> > be
>> > > a better name?
>> > >
>> > > 2. We should bump up both the produce request and the fetch request
>> > > protocol version since both include the message set.
>> > >
>> > > 3. Extending LeaderEpoch to include Returning Leaders: To support
>> this,
>> > do
>> > > you plan to use the approach that stores  CZXID in the broker
>> > registration
>> > > and including the CZXID of the leader in /brokers/topics/[topic]/
>> > > partitions/[partitionId]/state in ZK?
>> > >
>> > > 4. Since there are a few other KIPs involving message format too, it
>> > would
>> > > be useful to consider if we could combine the message format changes
>> in
>> > the
>> > > same release.
>> > >
>> > > Thanks,
>> > >
>> > > Jun
>> > >
>> > >
>> > > On Wed, Jan 4, 2017 at 9:24 AM, Ben Stopford 
>> wrote:
>> > >
>> > > > Hi All
>> > > >
>> > > > We’re having some problems with this thread being subsumed by the
>> > > > [Discuss] thread. Hopefully this one will appear distinct. If you
>> see
>> > > more
>> > > > than one, please use this one.
>> > > >
>> > > > KIP-101 should now be ready for a vote. As a reminder the KIP
>> proposes
>> > a
>> > > > change to the replication protocol to remove the potential for
>> replicas
>> > > to
>> > > > diverge.
>> > > >
>> > > > The KIP can be found here:  https://cwiki.apache.org/confl
>> > > > uence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+
>> > > > use+Leader+Epoch+rather+than+High+Watermark+for+Truncation <
>> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-
>> > > > +Alter+Replication+Protocol+to+use+Leader+Epoch+rather+
>> > > > than+High+Watermark+for+Truncation>
>> > > >
>> > > > Please let us know your vote.
>> > > >
>> > > > B
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > >
>> >
>>
>
>


Re: [VOTE] Vote for KIP-101 - Leader Epochs

2017-01-05 Thread Joel Koshy
+1

This is a very well-written KIP!
Minor: there is still a mix of terms in the doc that references the earlier
LeaderGenerationRequest (which is what I'm assuming it was called in
previous versions of the wiki). Same for the diagrams which I'm guessing
are a little harder to make consistent with the text.



On Thu, Jan 5, 2017 at 5:54 PM, Jun Rao  wrote:

> Hi, Ben,
>
> Thanks for the updated KIP. +1
>
> 1) In OffsetForLeaderEpochResponse, start_offset probably should be
> end_offset since it's the end offset of that epoch.
> 3) That's fine. We can fix KAFKA-1120 separately.
>
> Jun
>
>
> On Thu, Jan 5, 2017 at 11:11 AM, Ben Stopford  wrote:
>
> > Hi Jun
> >
> > Thanks for raising these points. Thorough as ever!
> >
> > 1) Changes made as requested.
> > 2) Done.
> > 3) My plan for handling returning leaders is simply to force the Leader
> > Epoch to increment if a leader returns. I don't plan to fix KAFKA-1120 as
> > part of this KIP. It is really a separate issue with wider implications.
> > I'd be happy to add KAFKA-1120 into the release though if we have time.
> > 4) Agreed. Not sure exactly how that's going to play out, but I think
> we're
> > on the same page.
> >
> > Please could
> >
> > Cheers
> > B
> >
> > On Thu, Jan 5, 2017 at 12:50 AM Jun Rao  wrote:
> >
> > > Hi, Ben,
> > >
> > > Thanks for the proposal. Looks good overall. A few comments below.
> > >
> > > 1. For LeaderEpochRequest, we need to include topic right? We probably
> > want
> > > to follow other requests by nesting partition inside topic? For
> > > LeaderEpochResponse,
> > > do we need to return leader_epoch? I was thinking that we could just
> > return
> > > an end_offset, which is the next offset of the last message in the
> > > requested leader generation. Finally, would OffsetForLeaderEpochRequest
> > be
> > > a better name?
> > >
> > > 2. We should bump up both the produce request and the fetch request
> > > protocol version since both include the message set.
> > >
> > > 3. Extending LeaderEpoch to include Returning Leaders: To support this,
> > do
> > > you plan to use the approach that stores  CZXID in the broker
> > registration
> > > and including the CZXID of the leader in /brokers/topics/[topic]/
> > > partitions/[partitionId]/state in ZK?
> > >
> > > 4. Since there are a few other KIPs involving message format too, it
> > would
> > > be useful to consider if we could combine the message format changes in
> > the
> > > same release.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Wed, Jan 4, 2017 at 9:24 AM, Ben Stopford  wrote:
> > >
> > > > Hi All
> > > >
> > > > We’re having some problems with this thread being subsumed by the
> > > > [Discuss] thread. Hopefully this one will appear distinct. If you see
> > > more
> > > > than one, please use this one.
> > > >
> > > > KIP-101 should now be ready for a vote. As a reminder the KIP
> proposes
> > a
> > > > change to the replication protocol to remove the potential for
> replicas
> > > to
> > > > diverge.
> > > >
> > > > The KIP can be found here:  https://cwiki.apache.org/confl
> > > > uence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+
> > > > use+Leader+Epoch+rather+than+High+Watermark+for+Truncation <
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-
> > > > +Alter+Replication+Protocol+to+use+Leader+Epoch+rather+
> > > > than+High+Watermark+for+Truncation>
> > > >
> > > > Please let us know your vote.
> > > >
> > > > B
> > > >
> > > >
> > > >
> > > >
> > > >
> > >
> >
>
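The end_offset semantics discussed in this thread (the next offset after the
last message of the requested leader epoch) can be illustrated with a small
sketch. It assumes, purely for illustration, that a leader tracks where each
epoch began as a sorted list of (epoch, start_offset) pairs. The function name
and the data layout are hypothetical and are not taken from the KIP or from
Kafka's code.

```python
from bisect import bisect_right


def end_offset_for_epoch(epoch_starts, log_end_offset, requested_epoch):
    """Return the offset just past the last message of requested_epoch.

    epoch_starts is a list of (epoch, start_offset) pairs sorted by epoch.
    This is a hypothetical in-memory layout used only for this sketch.
    """
    epochs = [epoch for epoch, _ in epoch_starts]
    # Index of the first entry whose epoch is greater than the requested one.
    i = bisect_right(epochs, requested_epoch)
    if i == 0:
        # The requested epoch is older than anything recorded (or nothing is
        # recorded at all), so there is no answer to give.
        return None
    if i == len(epoch_starts):
        # Epochs at or beyond the latest recorded one end at the log end.
        return log_end_offset
    # Otherwise the epoch ends where the next recorded epoch starts.
    return epoch_starts[i][1]


if __name__ == "__main__":
    starts = [(0, 0), (1, 100), (2, 250)]
    print(end_offset_for_epoch(starts, 400, 1))
```

Under these assumptions, a follower could compare the returned value with its
own log end offset to decide how far to truncate, which is one way to read the
KIP's stated goal of preventing replicas from diverging.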


[ANNOUNCE] New committer: Jiangjie (Becket) Qin

2016-10-31 Thread Joel Koshy
The PMC for Apache Kafka has invited Jiangjie (Becket) Qin to join as a
committer and we are pleased to announce that he has accepted!

Becket has made significant contributions to Kafka over the last two years.
He has been deeply involved in a broad range of KIP discussions and has
contributed several major features to the project. He recently completed
the implementation of a series of improvements (KIP-31, KIP-32, KIP-33) to
Kafka’s message format that address a number of long-standing issues such
as avoiding server-side re-compression, better accuracy for time-based log
retention, log roll and time-based indexing of messages.

Congratulations Becket! Thank you for your many contributions. We are
excited to have you on board as a committer and look forward to your
continued participation!

Joel


Re: Stream processing meetup at LinkedIn (Sunnyvale) on Wednesday, November 2 at 6pm

2016-10-18 Thread Joel Koshy
Yes it will be streamed and archived. The streaming link and subsequent
recording will be posted in the comments on the meetup page.

Thanks,

Joel

On Tue, Oct 18, 2016 at 11:25 AM, João Reis  wrote:

> Hi Joel,
>
> Would it be possible to stream the presentations ?
>
> Cheers,
> João Reis
>
> ____
> From: Joel Koshy 
> Sent: Monday, October 17, 2016 10:25:10 PM
> Cc: eyakabo...@linkedin.com
> Subject: Stream processing meetup at LinkedIn (Sunnyvale) on Wednesday,
> November 2 at 6pm
>
> Hi everyone,
>
> We would like to invite you to a Stream Processing Meetup at LinkedIn’s
> Sunnyvale campus on Wednesday, November 2 at 6pm.
>
> Please RSVP here (if you intend to attend in person):
> http://www.meetup.com/Stream-Processing-Meetup-LinkedIn/events/234454163
>
> We have the following three talks scheduled:
>
>    - *Stream Processing using Apache Samza at LinkedIn: Past, Present, and
>      Future*
>       - *by Kartik Paramasivam* (LinkedIn)
>    - *Kafka Cruise Control: Auto Management of the Kafka Clusters*
>       - *by Becket (Jiangjie) Qin* (LinkedIn)
>    - *Plumber - Intuit's Samza Ecosystem*
>       - *by Shekar Tippur* (Intuit)
>
> Hope to see you there!
>
> Joel
>
> __
> This email has been scanned by the Symantec Email Security.cloud service.
> For more information please visit http://www.symanteccloud.com
> __


Stream processing meetup at LinkedIn (Sunnyvale) on Wednesday, November 2 at 6pm

2016-10-17 Thread Joel Koshy
Hi everyone,

We would like to invite you to a Stream Processing Meetup at LinkedIn’s
Sunnyvale campus on Wednesday, November 2 at 6pm.

Please RSVP here (if you intend to attend in person):
http://www.meetup.com/Stream-Processing-Meetup-LinkedIn/events/234454163

We have the following three talks scheduled:

   - *Stream Processing using Apache Samza at LinkedIn: Past, Present, and
     Future*
      - *by Kartik Paramasivam* (LinkedIn)
   - *Kafka Cruise Control: Auto Management of the Kafka Clusters*
      - *by Becket (Jiangjie) Qin* (LinkedIn)
   - *Plumber - Intuit's Samza Ecosystem*
      - *by Shekar Tippur* (Intuit)

Hope to see you there!

Joel


Stream processing meetup at LinkedIn (Mountain View) on Tuesday, August 23 at 6pm

2016-08-12 Thread Joel Koshy
Hi everyone,

We would like to invite you to a Stream Processing Meetup at LinkedIn’s
*Mountain View campus on Tuesday, August 23 at 6pm*. Please RSVP here (only
if you intend to attend in person):
https://www.meetup.com/Stream-Processing-Meetup-LinkedIn/events/232864129

We have three great talks lined up with speakers from Confluent, LinkedIn
and TripAdvisor.

Hope to see you there!

Joel


Re: [kafka-clients] [VOTE] 0.10.0.1 RC2

2016-08-05 Thread Joel Koshy
+1 (binding)

Thanks Ismael!

On Thu, Aug 4, 2016 at 6:54 AM, Ismael Juma  wrote:

> Hello Kafka users, developers and client-developers,
>
> This is the third candidate for the release of Apache Kafka 0.10.0.1. This
> is a bug fix release and it includes fixes and improvements from 53 JIRAs
> (including a few critical bugs). See the release notes for more details:
>
> http://home.apache.org/~ijuma/kafka-0.10.0.1-rc2/RELEASE_NOTES.html
>
> When compared to RC1, RC2 contains a fix for a regression where an older
> version of slf4j-log4j12 was also being included in the libs folder of the
> binary tarball (KAFKA-4008). Thanks to Manikumar Reddy for reporting the
> issue.
>
> *** Please download, test and vote by Monday, 8 August, 8am PT ***
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> http://kafka.apache.org/KEYS
>
> * Release artifacts to be voted upon (source and binary):
> http://home.apache.org/~ijuma/kafka-0.10.0.1-rc2/
>
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging
>
> * Javadoc:
> http://home.apache.org/~ijuma/kafka-0.10.0.1-rc2/javadoc/
>
> * Tag to be voted upon (off 0.10.0 branch) is the 0.10.0.1-rc2 tag:
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=f8f56751744ba8e55f90f5c4f3aed8c3459447b2
>
> * Documentation:
> http://kafka.apache.org/0100/documentation.html
>
> * Protocol:
> http://kafka.apache.org/0100/protocol.html
>
> * Successful Jenkins builds for the 0.10.0 branch:
> Unit/integration tests: https://builds.apache.org/job/kafka-0.10.0-jdk7/182/
> System tests: https://jenkins.confluent.io/job/system-test-kafka-0.10.0/138/
>
> Thanks,
> Ismael
>
> --
> You received this message because you are subscribed to the Google Groups
> "kafka-clients" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to kafka-clients+unsubscr...@googlegroups.com.
> To post to this group, send email to kafka-clie...@googlegroups.com.
> Visit this group at https://groups.google.com/group/kafka-clients.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/kafka-clients/CAD5tkZYMMxDEjg_2jt4x-mVZZHgJ6EC6HKSf4Hn%2Bi59DbTdVoQ%40mail.gmail.com.
> For more options, visit https://groups.google.com/d/optout.
>
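Release-candidate votes like this one ask reviewers to download and check the
artifacts, and the 0.9.0.1 vote elsewhere in this archive mentions md5, sha1
and sha2 (SHA256) checksums alongside the KEYS file. As a minimal sketch of
the checksum part only, the following compares a downloaded file against a
published SHA-256 value. The function names are hypothetical, and the
normalization step is an assumption, since the exact layout of a published
checksum file varies.

```python
import hashlib


def sha256_of(path, chunk_size=1 << 20):
    """Compute the SHA-256 hex digest of a file, reading it in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def matches_published(path, published_hex):
    """Compare a file's digest with a published hex string.

    Whitespace and letter case are ignored, on the assumption that the
    published value may be grouped or upper-cased.
    """
    normalized = "".join(published_hex.split()).lower()
    return sha256_of(path) == normalized
```

A matching checksum only shows that the download is intact. Checking the PGP
signature against the KEYS file is a separate step that this sketch does not
cover.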


Kafka/Samza meetup at LinkedIn, June 15, 6pm

2016-05-27 Thread Joel Koshy
[bcc: users@kafka.apache.org, d...@kafka.apache.org]

Hi everyone,

We would like to invite you to our first Stream Processing Meetup at
LinkedIn on June 15 at 6pm. Please RSVP here:
http://www.meetup.com/Stream-Processing-Meetup-LinkedIn/events/231454378

Going forward (at LinkedIn) we will host meetups for Kafka/Samza in this
combined format. We have three great talks lined up for our June meetup:

*Scalable Complex Event Processing on Samza @Uber*
by Shuyi Chen 
The Marketplace data team at Uber has built a scalable complex event
processing platform to solve many challenging real time data needs for
various Uber products. This platform has been in production for some time
and it has proven to be very flexible to solve many use cases.  In this
talk, we will share in detail the design and architecture of the platform,
and how we employ Samza, Kafka, and Siddhi at scale.

*Air Traffic Controller: Using Samza to Manage Communications with Members*
by Cameron Lee 
Air Traffic Controller (ATC) is a system built on top of Samza which is
responsible for managing many of the communication channels LinkedIn has
with its members. Historically, LinkedIn has been known for sending too
many emails to members which are not useful to them. The goal of ATC is to
improve on that experience by providing some common functionality that
multiple use cases can leverage, such as dynamic batching of messages,
delivery time optimization, and channel selection. This discussion will
include some of the functionality ATC provides, how Samza was leveraged to
build ATC, and some challenges that we faced while building ATC.

*Tuning Kafka for low latency guaranteed messaging*
by Jiangjie (Becket) Qin 
Kafka is well known for high throughput ingestion. However, to get the best
latency characteristics without compromising on throughput and durability,
we need to tune Kafka. We will share our experiences to achieve the optimal
combination of latency, throughput and durability for different scenarios.

Thanks,

Joel (for the Streams infrastructure team @ LinkedIn)


Re: Wiki Karma

2016-02-18 Thread Joel Koshy
You should have access now.

On Thu, Feb 18, 2016 at 12:09 PM, Christian Posta  wrote:

> Can someone add Karma to my user id for contributing to the wiki/docs?
> userid is 'ceposta'
>
> thanks!
>
> --
> *Christian Posta*
> twitter: @christianposta
> http://www.christianposta.com/blog
> http://fabric8.io
>


Re: Kafka response ordering guarantees

2016-02-18 Thread Joel Koshy
>> Does this mean that when a client is sending more than one in-flight
>> request per connection, the server does not guarantee that responses will
>> be sent in the same order as requests?
>
> No - the server does provide this guarantee - i.e., responses will always
> be sent in the same order as requests received on the same TCP connection.
> *"The server guarantees that on a single TCP connection, requests will be
> processed in the order they are sent and responses will return in that
> order as well."* The subsequent statements just describe how this is
> achieved (i.e., we process only one request at a time in the API handlers).
>

To further clarify: the above is true even if the client sends more than one
in-flight request per connection.
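
For someone writing their own client, the guarantee discussed in this thread
means a simple FIFO queue is enough to pair responses with pipelined requests
on one connection. The sketch below is a hypothetical helper with no network
code. The class and method names are made up for illustration and do not come
from any Kafka client library.

```python
from collections import deque
from itertools import count


class PipelinedConnection:
    """Tracks in-flight requests for a single connection (illustrative only)."""

    def __init__(self):
        self._next_id = count(1)    # strictly increasing correlation ids
        self._in_flight = deque()   # requests awaiting a response, oldest first

    def send(self, request, expect_response=True):
        """Assign a correlation id and remember the request if it will be answered."""
        correlation_id = next(self._next_id)
        if expect_response:
            self._in_flight.append((correlation_id, request))
        return correlation_id

    def on_response(self, correlation_id, response):
        """Match a response to the oldest in-flight request, or fail loudly."""
        if not self._in_flight:
            raise RuntimeError("response received with no request in flight")
        expected_id, request = self._in_flight[0]
        if correlation_id != expected_id:
            raise RuntimeError(
                f"out-of-order response: expected {expected_id}, got {correlation_id}"
            )
        self._in_flight.popleft()
        return request, response
```

The expect_response flag is there because the protocol guide quoted in this
thread says requests get a response "except where noted". A request that will
never be answered must not sit in the queue, or every later response would
look out of order.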


>
> On Thu, Feb 18, 2016 at 5:59 AM, Ivan Dyachkov  wrote:
>
>> Thanks Ben.
>>
>> As I mentioned, I'm developing a kafka library and not using standard
>> java producer.
>>
>> My question is really about protocol guarantees.
>>
>> /Ivan
>>
>> - Original message -
>> From: Ben Stopford 
>> To: users@kafka.apache.org
>> Subject: Re: Kafka response ordering guarantees
>> Date: Wed, 17 Feb 2016 14:48:59 -0800
>>
>> So long as you set max.in.flight.requests.per.connection = 1 Kafka should
>> provide strong ordering within a partition (so use the same key for
>> messages that should retain their order). There is a bug currently raised
>> against this feature though where there is an edge case that can cause
>> ordering issues.
>>
>> https://issues.apache.org/jira/browse/KAFKA-3197
>> > On 17 Feb 2016, at 07:17, Ivan Dyachkov  wrote:
>> >
>> > Hello all.
>> >
>> > I'm developing a kafka client and have a question about kafka server
>> guarantees.
>> >
>> > A statement from
>> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Network
>> makes me a bit confused:
>> >
>> > "The server guarantees that on a single TCP connection, requests will
>> be processed in the order they are sent and responses will return in that
>> order as well. The broker's request processing allows only a single
>> in-flight request per connection in order to guarantee this ordering. Note
>> that clients can (and ideally should) use non-blocking IO to implement
>> request pipelining and achieve higher throughput. i.e., clients can send
>> requests even while awaiting responses for preceding requests since the
>> outstanding requests will be buffered in the underlying OS socket buffer.
>> All requests are initiated by the client, and result in a corresponding
>> response message from the server except where noted."
>> >
>> > Does this mean that when a client is sending more than one in-flight
>> request per connection, the server does not guarantee that responses will
>> be sent in the same order as requests?
>> >
>> > In other words, if I have a strictly monotonically increasing integer
>> as a correlation id for all requests, can I rely on Kafka that correlation
>> id in responses will also have this property?
>> >
>> > Thanks.
>> >
>> > /Ivan
>>
>>
>


Re: Kafka response ordering guarantees

2016-02-18 Thread Joel Koshy
> Does this mean that when a client is sending more than one in-flight
> request per connection, the server does not guarantee that responses will
> be sent in the same order as requests?


No - the server does provide this guarantee - i.e., responses will always
be sent in the same order as requests received on the same TCP connection.
*"The server guarantees that on a single TCP connection, requests will be
processed in the order they are sent and responses will return in that
order as well."* The subsequent statements just describe how this is
achieved (i.e., we process only one request at a time in the API handlers).

On Thu, Feb 18, 2016 at 5:59 AM, Ivan Dyachkov  wrote:

> Thanks Ben.
>
> As I mentioned, I'm developing a kafka library and not using standard java
> producer.
>
> My question is really about protocol guarantees.
>
> /Ivan
>
> - Original message -
> From: Ben Stopford 
> To: users@kafka.apache.org
> Subject: Re: Kafka response ordering guarantees
> Date: Wed, 17 Feb 2016 14:48:59 -0800
>
> So long as you set max.in.flight.requests.per.connection = 1 Kafka should
> provide strong ordering within a partition (so use the same key for
> messages that should retain their order). There is a bug currently raised
> against this feature though where there is an edge case that can cause
> ordering issues.
>
> https://issues.apache.org/jira/browse/KAFKA-3197
> > On 17 Feb 2016, at 07:17, Ivan Dyachkov  wrote:
> >
> > Hello all.
> >
> > I'm developing a kafka client and have a question about kafka server
> guarantees.
> >
> > A statement from
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Network
> makes me a bit confused:
> >
> > "The server guarantees that on a single TCP connection, requests will be
> processed in the order they are sent and responses will return in that
> order as well. The broker's request processing allows only a single
> in-flight request per connection in order to guarantee this ordering. Note
> that clients can (and ideally should) use non-blocking IO to implement
> request pipelining and achieve higher throughput. i.e., clients can send
> requests even while awaiting responses for preceding requests since the
> outstanding requests will be buffered in the underlying OS socket buffer.
> All requests are initiated by the client, and result in a corresponding
> response message from the server except where noted."
> >
> > Does this mean that when a client is sending more than one in-flight
> request per connection, the server does not guarantee that responses will
> be sent in the same order as requests?
> >
> > In other words, if I have a strictly monotonically increasing integer as
> a correlation id for all requests, can I rely on Kafka that correlation id
> in responses will also have this property?
> >
> > Thanks.
> >
> > /Ivan
>
>


Re: [kafka-clients] 0.9.0.1 RC1

2016-02-16 Thread Joel Koshy
+1

On Thu, Feb 11, 2016 at 6:55 PM, Jun Rao  wrote:

> This is the first candidate for release of Apache Kafka 0.9.0.1. This is a
> bug fix release that fixes 70 issues.
>
> Release Notes for the 0.9.0.1 release
> https://home.apache.org/~junrao/kafka-0.9.0.1-candidate1/RELEASE_NOTES.html
>
> *** Please download, test and vote by Tuesday, Feb. 16, 7pm PT
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> http://kafka.apache.org/KEYS in addition to the md5, sha1
> and sha2 (SHA256) checksum.
>
> * Release artifacts to be voted upon (source and binary):
> https://home.apache.org/~junrao/kafka-0.9.0.1-candidate1/
>
> * Maven artifacts to be voted upon prior to release:
> https://repository.apache.org/content/groups/staging/org/apache/kafka/
>
> * scala-doc
> https://home.apache.org/~junrao/kafka-0.9.0.1-candidate1/scaladoc/
>
> * java-doc
> https://home.apache.org/~junrao/kafka-0.9.0.1-candidate1/javadoc/
>
> * The tag to be voted upon (off the 0.9.0 branch) is the 0.9.0.1 tag
>
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=2c17685a45efe665bf5f24c0296cb8f9e1157e89
>
> * Documentation
> http://kafka.apache.org/090/documentation.html
>
> Thanks,
>
> Jun
>
> --
> You received this message because you are subscribed to the Google Groups
> "kafka-clients" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to kafka-clients+unsubscr...@googlegroups.com.
> To post to this group, send email to kafka-clie...@googlegroups.com.
> Visit this group at https://groups.google.com/group/kafka-clients.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/kafka-clients/CAFc58G8VbhZ5Q0nVnUAg8qR0yEO%3DqhYrHFtLySpJo1Nha%3DoOxA%40mail.gmail.com.
> For more options, visit https://groups.google.com/d/optout.
>


Re: Question about offset expiration

2016-02-09 Thread Joel Koshy
Hi Matvey,


> I have a question about the config value offsets.retention.minutes in kafka
> 0.9.0.0.
> Is this the timeout for when offsets get compacted in the topic or actually
> deleted (as it appears from a cursory reading of
>
> https://github.com/apache/kafka/blob/0.9.0/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala#L532
> )
>

It is actually deleted at that point. i.e., it is removed from the
in-memory cache and we append a tombstone for it in the log (which means it
will get deleted whenever that segment rolls over and becomes eligible for
compaction). So a subsequent offset fetch will return no offset.
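
As a rough illustration of the expiration step described above, the following sketch models the in-memory cache as a dict and the offsets topic as a list. It is a simplified model under those assumptions, not the broker's actual code; `expire_offsets` and `fetch_offset` are hypothetical names.

```python
# One day, the default for offsets.retention.minutes mentioned in this thread.
OFFSETS_RETENTION_MS = 24 * 60 * 60 * 1000


def expire_offsets(cache, log, now_ms, retention_ms=OFFSETS_RETENTION_MS):
    """Drop stale offsets from the cache and append a tombstone for each.

    cache: dict mapping (group, topic, partition) -> (offset, commit_timestamp_ms)
    log:   list standing in for the offsets topic; (key, None) is a tombstone
    """
    stale = [key for key, (_, ts) in cache.items() if now_ms - ts > retention_ms]
    for key in stale:
        del cache[key]           # a later offset fetch finds nothing
        log.append((key, None))  # tombstone; compaction removes the key later
    return stale


def fetch_offset(cache, key):
    """Return the committed offset, or None if it has been expired."""
    entry = cache.get(key)
    return entry[0] if entry is not None else None
```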


> If it actually deletes the offsets, what happens if a topic simply doesn't
> get a new message for a day (the default value for
> offsets.retention.minutes). Are the offsets then forgotten? This seems
>

Yes


> really strange and counter-intuitive. I thought offsets were supposed to be
> saved forever (isn't that why a compacted topic is used)?
>

One reason to purge offsets older than the configured retention is to
clean-up after consumers that are no longer active (especially short-lived
consumers such as console consumers). Users can certainly opt to override
this behavior by setting it to a very high value. (Although it sounds like
you are suggesting that "forever" should be the default in the first place?)

Joel


Re: trouble upgrading from 0.8.2.1 to 0.9.0.0: invalid message

2016-01-20 Thread Joel Koshy
Hi Dave,

This change was introduced in
https://issues.apache.org/jira/browse/KAFKA-1755 for compacted topics.

>
> Interestingly, none of the messages currently going to the topic use
> message compaction (i.e. they all have empty keys), although at some time
> in the past I may have sent a few messages with keys.  Message compaction
> is being used for other topics.  So, the 0.9.0.0 version of the broker
> seems to think the topic is compacted while the 0.8.2.1 broker apparently
> doesn't think so.  Does this shed any light on things?
>
> Also I notice the error message says "Compacted topic", which suggests that
> compaction is a property of the topic, and not individual messages as
>

Yes - compaction is a topic-level property. You can use --describe to
verify that the topic is compacted or not and if you didn't intend it to be
compacted you can alter the configuration.

> I thought it was ok to send messages both with and without a key to the
> same topic, thus having compaction enabled for only a subset of the
> messages.  Is this incorrect?
>

In 0.9 you cannot send unkeyed messages to compacted topics. In 0.8.x this
would actually cause the log compaction thread to subsequently complain and
quit (and stop compacting all compacted topics). We did consider the
possibility of allowing producers to send both keyed and unkeyed but after
discussion we felt it would be better to fail fast and prevent unkeyed
messages from getting in. This was on the premise that supporting mixed
messages and only compacting a subset that have keys may not work very well
since the non-keyed messages would stick around indefinitely; however let
me know if you think differently on this and we can revisit.
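
To illustrate the concern with mixed messages, here is a small sketch of what compaction would look like if unkeyed messages were allowed: only the latest value per key survives, while every unkeyed message stays. This models the hypothetical "mixed" design that was considered and rejected, not what the 0.9 broker does (it rejects unkeyed messages up front); `compact` is an illustrative name.

```python
def compact(log):
    """Compact a log given as a list of (key, value) pairs, oldest first.

    Keyed records keep only their most recent value. Records with key None
    can never be superseded, so all of them are retained.
    """
    last_index = {}
    for i, (key, _) in enumerate(log):
        if key is not None:
            last_index[key] = i  # remember the newest position of each key
    return [
        (key, value)
        for i, (key, value) in enumerate(log)
        if key is None or last_index[key] == i
    ]
```

For example, compacting `[("a", 1), (None, "x"), ("a", 2), (None, "y")]` drops only the first `"a"` record; both unkeyed records remain no matter how often compaction runs.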

Joel


> Thanks,
> Dave
>
>
> [2016-01-20 19:21:44,923] ERROR [Replica Manager on Broker 172341926]:
> Error processing append operation on partition [shown_news_stories,7]
> (kafka.server.ReplicaManager)
> kafka.message.InvalidMessageException: Compacted topic cannot accept
> message without key.
>     at kafka.message.ByteBufferMessageSet.validateMessagesAndAssignOffsets(ByteBufferMessageSet.scala:250)
>     at kafka.log.Log.liftedTree1$1(Log.scala:327)
>     at kafka.log.Log.append(Log.scala:326)
>     at kafka.cluster.Partition$$anonfun$9.apply(Partition.scala:442)
>     at kafka.cluster.Partition$$anonfun$9.apply(Partition.scala:428)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>     at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:268)
>     at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:428)
>     at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:401)
>     at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:386)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>     at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>     at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>     at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:386)
>     at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:322)
>     at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:366)
>     at kafka.server.KafkaApis.handle(KafkaApis.scala:68)
>     at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>     at java.lang.Thread.run(Thread.java:745)
>
>
>
> On Tue, Jan 19, 2016 at 2:50 AM, Ismael Juma  wrote:
>
> > Hi Dave,
> >
> > Do you get any errors logged in the broker when you get ACK error 2
> > (InvalidMessage) while producing requests to a mixed version cluster? It
> > would be helpful to see them.
> >
> > With regards to the kafka-console-producer.sh error, did you use the
> > 0.9.0.0 console producer with a mixed version cluster (ie some brokers
> were
> > on 0.8.2.1 while others were on 0.9.0.0)? If so, it is expected that it
> > won't work correctly. All the brokers should be upgraded before the
> clients
> > are upgraded (otherwise the 0.8.2.1 broker will send a response that the
> > newer clients cannot handle).
> >
> > Ismael
> >
> > On Fri, Jan 15, 2016 at 7:52 PM, Dave Peterson 
> wrote:
> >
> > > Hi Ismael,
> > >
> > > I'm using bruce (https://github.com/ifwe/bruce) to send the produce
> > > requests, with a RequiredAcks value of 1.  Everything works fine when
> >

Re: Kafka APIs version

2016-01-20 Thread Joel Koshy
That is definitely no longer true. A number of requests are at version 1 or
higher. I will file a jira for this.

On Wed, Jan 20, 2016 at 8:30 AM, tao xiao  wrote:

> Hi team,
>
> In the Kafka protocol wiki it states that version 0 is the only supported
> version in all APIs. I want to know if this still remains true. If not,
> which APIs are now using version 1?
>


Re: Create Kafka Topic Programatically

2016-01-20 Thread Joel Koshy
https://issues.apache.org/jira/browse/KAFKA-2945 and related jiras are
probably what you are looking for. That is planned for the next release.

On Wed, Jan 20, 2016 at 8:41 AM, Tommy Becker  wrote:

> This works, but it's clumsy and has limitations. Unfortunately, I'm not
> aware of any alternatives. We do some programmatic topic creation when we
> want to create a topic with a non-default configuration. But specifying the
> configuration is an all or nothing affair. For example, there's no way to
> say create this topic using all the defaults except enable log compaction.
> Worse, I don't think there's a way to get what the default topic
> configuration is from the client. I'd like to see some improvements here.
>
> On 01/20/2016 08:24 AM, Timo Ahokas wrote:
>
> Hi Joe,
>
> We're doing a similar thing that you're looking for with some of our app
> nodes. We use the kafka.admin.AdminUtils and its
> topicExists()/createTopic() methods. Some additional code (e.g. Kafka
> context check/creation) might be needed for dev/test environments if you're
> using a specific Kafka context instead of the root.
>
> -Timo
>
> On 20 January 2016 at 23:14, Joe San wrote:
>
>
>
> I doubt that might be enough. Could you tell me if the
> auto.create.topics.enable satisfies the following requirement?
>
> 1. I want to create a topic with a specific name
> 2. If I restart the producer client, if the topic with that name already
> exists, it should do nothing and use the topic as is
> 3. Upon producer restart, if the topic already exists, and if the topic has
> unconsumed messages, it should not be deleted
>
> Thanks and Regards,
> Joe
>
> On Wed, Jan 20, 2016 at 2:11 PM, Franco Giacosa wrote:
>
>
>
> Hi Joe,
>
> There is a broker option called auto.create.topics.enable, so the
> producer can just start sending data to a topic and the topic will be
> created with the default values.
>
>
>
> 2016-01-20 13:19 GMT+01:00 Joe San:
>
>
>
> Kafka Users,
>
> How can I create a kafka topic programmatically?
>
> I would like to create the topics when I initialize my application. It
> should also be in such a way that if the topic already exists, the
> initialization code should do nothing!
>
> Thanks and Regards,
> Joe
>
>
> --
> Tommy Becker
> Senior Software Engineer
>
> Digitalsmiths
> A TiVo Company
>
> www.digitalsmiths.com
> tobec...@tivo.com
>
> 
>
> This email and any attachments may contain confidential and privileged
> material for the sole use of the intended recipient. Any review, copying,
> or distribution of this email (or any attachments) by others is prohibited.
> If you are not the intended recipient, please contact the sender
> immediately and permanently delete this email and any attachments. No
> employee or agent of TiVo Inc. is authorized to conclude any binding
> agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo
> Inc. may only be made by a signed written agreement.
>


Re: KAFKA-1499 compression.type

2016-01-15 Thread Joel Koshy
Yes you are right - thanks for pointing it out. We will get that fixed.

Joel

On Fri, Jan 15, 2016 at 10:25 AM, Elias Levy 
wrote:

> Anyone?
>
> On Thu, Jan 14, 2016 at 8:42 PM, Elias Levy 
> wrote:
>
> > The description of the compression.type config property in the
> > documentation is somewhat confusing.  It begins with "Specify the final
> > compression type for a given topic.", yet it is defined as a broker
> > configuration property and it is not listed under topic-level
> configuration
> > properties.
> >
> > Reading the discussion in KAFKA-1499 leads me to believe that the broker
> > level property is a default that can be overridden by using the same
> > property at the topic level.
> >
> > Is this correct?
> >
> > If so, it would be best to make the documentation clearer and to add the
> > property to the topic-level config properties section in addition to the
> > broker level config section.
> >
>


Re: Does quota requires 0.9.X clients?

2016-01-12 Thread Joel Koshy
I'm pretty sure it should work - you may want to give it a try locally
though. We did add a throttle-time field in the responses but that will
only be included in responses for requests from 0.9.x clients. 0.8.x
requests will just get throttled at the broker and will get an 0.8.x format
response without the throttle-time information.

On Tue, Jan 12, 2016 at 2:00 PM, Allen Wang 
wrote:

> From looking at the design document, it seems quota is implemented purely
> at server side. So it should work with 0.8.X clients. But I would like to
> get confirmation.
>
> Thanks,
> Allen
>


Re: log compaction scaling with ~100m messages

2015-10-07 Thread Joel Koshy
Using log compaction is well-suited for applications that use Kafka
directly and need to persist some state associated with its processing. So
something like offset management for consumers is a good fit. Another good
use-case is for storing schemas associated with your Kafka topics. These are
both very specific to
maintaining metadata around your stream processing. Although it can be used
for more general K-V storage it is not *always* a good fit. This is
especially true if your key-space is bound to grow significantly over time
or has a high update rate. The other aspect is the need to do some sort of
caching of your key-value pairs (since otherwise lookups would require
scanning the log). So for application-level general K-V storage, you could
certainly use Kafka as a persistence mechanism for recording recent updates
(with traditional time-based retention), but you would probably want a more
suitable K-V store separate from Kafka. I'm not sure this (i.e.,
traditional db storage) is your use case since you mention "a lot of stream
processing on these messages" - so it sounds more like repetitive
processing over the entire key space. For that it may be more reasonable.
The alternative is to use snapshots and read more recent updates from the
updates stream in Kafka. Samza folks may want to weigh in here as well.
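
The snapshot-plus-updates alternative can be sketched roughly as follows: load the last snapshot into a cache, then apply only the updates recorded after the snapshot's offset. This is a simplified model with the update stream represented as a list of tuples; `restore_state` is a hypothetical helper, not a Kafka or Samza API.

```python
def restore_state(snapshot, snapshot_offset, updates):
    """Rebuild a key-value cache from a snapshot plus newer updates.

    snapshot:        dict of key -> value as of snapshot_offset
    snapshot_offset: offset of the last update included in the snapshot
    updates:         iterable of (offset, key, value) in offset order;
                     value None is treated as a delete
    Returns (state, last_applied_offset).
    """
    state = dict(snapshot)  # do not mutate the caller's snapshot
    last_applied = snapshot_offset
    for offset, key, value in updates:
        if offset <= snapshot_offset:
            continue  # already reflected in the snapshot
        if value is None:
            state.pop(key, None)
        else:
            state[key] = value
        last_applied = offset
    return state, last_applied
```

Lookups are then served from the returned dict instead of scanning the log, which is the caching aspect mentioned above.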

That said, to answer your question: sure it is feasible to use log
compaction with 1B keys, especially if you have enough brokers, partitions,
and log cleaner threads but I'm not sure it is the best approach to take.
We did hit various issues (bugs/feature gaps) with log compaction while
using it for consumer offset management: e.g., support for compressed
messages, various other bugs, but most of these have been resolved.

Hope that helps,

Joel

On Tue, Oct 6, 2015 at 8:34 PM, Feroze Daud 
wrote:
> hi!
> We have a use case where we want to store ~100m keys in kafka. Is there
> any problem with this approach?
> I have heard from some people using kafka that kafka has a problem when
> doing log compaction with that many keys.
> Another topic might have around 10 different K/V pairs for each key in
> the primary topic. The primary topic's keyspace is approximately 100m
> keys. We would like to store this in kafka because we are doing a lot of
> stream processing on these messages, and want to avoid writing another
> process to recompute data from snapshots.
> So, in summary:
> primary topic: ~100m keys
> secondary topic: ~1B keys
> Is it feasible to use log compaction at such a scale of data?
> Thanks
> feroze.


Re: Dealing with large messages

2015-10-06 Thread Joel Koshy
The best practice I think is to just put large objects in a blob store
and have messages embed references to those blobs. Interestingly we
ended up having to implement large-message-support at LinkedIn but for
various reasons were forced to put messages inline (i.e., against the
above recommendation). So we ended up having to break up large
messages into smaller chunks. This obviously adds considerable
complexity to the consumer since the checkpointing can become pretty
complicated. There are other nuances as well - we can probably do a
short talk on this at an upcoming meetup.
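
For readers wondering what chunking involves, here is a minimal sketch of splitting a payload into segments and reassembling them on the consumer side. It only illustrates the general idea; it is not the LinkedIn implementation, and the tuple layout `(message_id, index, total, data)` and all names are assumptions. It also ignores the checkpointing problem mentioned above (the consumer must not commit past the first chunk of a message it has not finished reassembling).

```python
import uuid


def split_message(payload, chunk_size, message_id=None):
    """Split bytes into chunks of at most chunk_size bytes each."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    message_id = message_id or uuid.uuid4().hex
    total = max(1, -(-len(payload) // chunk_size))  # ceiling division
    return [
        (message_id, i, total, payload[i * chunk_size:(i + 1) * chunk_size])
        for i in range(total)
    ]


class Reassembler:
    """Buffers chunks per message id until all of them have arrived."""

    def __init__(self):
        self._partial = {}  # message_id -> {index: data}

    def add(self, chunk):
        """Return the full payload once complete, otherwise None."""
        message_id, index, total, data = chunk
        parts = self._partial.setdefault(message_id, {})
        parts[index] = data
        if len(parts) < total:
            return None
        del self._partial[message_id]
        return b"".join(parts[i] for i in range(total))
```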

Joel


On Mon, Oct 5, 2015 at 9:31 PM, Rahul Jain  wrote:
> In addition to the config changes mentioned in that post, you may also have
> to change producer config if you are using the new producer.
>
> Specifically, *max.request.size* and *request.timeout.ms* have to be
> increased to allow the producer to send large messages.
>
>
> On 6 Oct 2015 02:02, "James Cheng"  wrote:
>
>> Here’s an article that Gwen wrote earlier this year on handling large
>> messages in Kafka.
>>
>> http://ingest.tips/2015/01/21/handling-large-messages-kafka/
>>
>> -James
>>
>> > On Oct 5, 2015, at 11:20 AM, Pradeep Gollakota 
>> wrote:
>> >
>> > Fellow Kafkaers,
>> >
>> > We have a pretty heavyweight legacy event logging system for batch
>> > processing. We're now sending the events into Kafka for realtime
>> > analytics. But we have some pretty large messages (> 40 MB).
>> >
>> > I'm wondering if any of you have use cases where you have to send large
>> > messages to Kafka and how you're dealing with them.
>> >
>> > Thanks,
>> > Pradeep
>>


Re: [kafka-clients] [VOTE] 0.8.2.2 Candidate 1

2015-09-09 Thread Joel Koshy
+1 binding

On Thu, Sep 3, 2015 at 9:22 AM, Jun Rao  wrote:
> This is the first candidate for release of Apache Kafka 0.8.2.2. This only
> fixes two critical issues (KAFKA-2189 and KAFKA-2308) related to snappy in
> 0.8.2.1.
>
> Release Notes for the 0.8.2.2 release
> https://people.apache.org/~junrao/kafka-0.8.2.2-candidate1/RELEASE_NOTES.html
>
> *** Please download, test and vote by Tuesday, Sep 8, 7pm PT
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> http://kafka.apache.org/KEYS in addition to the md5, sha1
> and sha2 (SHA256) checksum.
>
> * Release artifacts to be voted upon (source and binary):
> https://people.apache.org/~junrao/kafka-0.8.2.2-candidate1/
>
> * Maven artifacts to be voted upon prior to release:
> https://repository.apache.org/content/groups/staging/
>
> * scala-doc
> https://people.apache.org/~junrao/kafka-0.8.2.2-candidate1/scaladoc/
>
> * java-doc
> https://people.apache.org/~junrao/kafka-0.8.2.2-candidate1/javadoc/
>
> * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.2 tag
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=d01226cfdcb3d9daad8465234750fa515a1e7e4a
>
> /***
>
> Thanks,
>
> Jun
>
> --
> You received this message because you are subscribed to the Google Groups
> "kafka-clients" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to kafka-clients+unsubscr...@googlegroups.com.
> To post to this group, send email to kafka-clie...@googlegroups.com.
> Visit this group at http://groups.google.com/group/kafka-clients.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/kafka-clients/CAFc58G-O56-Wb29W65fY1KFwA3Dy9Uok%3DpixdfboDD8xQhiMog%40mail.gmail.com.
> For more options, visit https://groups.google.com/d/optout.


Re: New producer in production

2015-07-17 Thread Joel Koshy
It is pretty safe :) and adopting it now will save you the trouble
of migrating later.

At LinkedIn, we are using the new producer for all the producers that
the Kafka team directly own - this means mirror-maker pipelines, REST
proxies and some auditing tools. We have not yet migrated other
producers, but will likely proceed with that after
https://issues.apache.org/jira/browse/KAFKA-2120 is done. That is
probably not an extremely critical issue though if your Kafka
deployments are smaller and do not have hardware failures as often as
ours (which is mainly because we just happen to have so many brokers
some of which are just older machines).

Thanks,

Joel

On Fri, Jul 17, 2015 at 09:35:46AM -0700, Sivananda Reddy wrote:
> Hi,
> 
> Kafka documentation says that the new producer is in Beta state, how safe is
> it to use the new producer in production? This is the first time I am
> using Kafka for my application messaging needs. Please let me know.
> 
> Thank you,
> Siva.



Re: Offset not committed

2015-07-15 Thread Joel Koshy
- You can also change the log4j level dynamically via the
  kafka.Log4jController mbean.
- You can also look at offset commit request metrics (mbeans) on the
  broker (just to check if _any_ offset commits are coming through
  during the period you see no moving offsets).
- The alternative is to just consume the offsets topic.

On Wed, Jul 15, 2015 at 05:30:17PM +, Jiangjie Qin wrote:
> I am not sure how your project was setup. But I think it depends on what
> log4j property file you specified when you started your application. Can
> you check if you have log4j appender defined and the loggers are directed
> to the correct appender?
> 
> Thanks,
> 
> Jiangjie (Becket) Qin
> 
> On 7/15/15, 8:10 AM, "Vadim Bobrov"  wrote:
> 
> >Thanks Jiangjie,
> >
> >unfortunately turning trace level on does not seem to work (any log level
> >actually) I am using log4j2 (through slf4j) and despite including log4j1
> >bridge and these lines:
> >
> >
> >
> >
> >in my conf file I could not squeeze out any logging from kafka. Logging
> >for
> >all other libs (like zookeeper e.g.) work perfectly. Am I doing something
> >wrong?
> >
> >
> >On Tue, Jul 14, 2015 at 6:55 PM, Jiangjie Qin 
> >wrote:
> >
> >> Hi Vadim,
> >>
> >> Can you turn on trace level logging on your consumer and search for
> >> "offset commit response" in the log?
> >> Also maybe take a look at the log to see if there is any exception
> >>thrown.
> >>
> >> Thanks,
> >>
> >> Jiangjie (Becket) Qin
> >>
> >> On 7/14/15, 11:06 AM, "Vadim Bobrov"  wrote:
> >>
> >> >just caught this error again. I issue commitOffsets - no error but no
> >> >committing offsets either. __consumer_offsets watching shows no new
> >> >messages
> >> >either. Then in a few minutes I issue commitOffsets again - all
> >>committed.
> >> >Unless I am doing something terribly wrong this is very unreliable
> >> >
> >> >On Tue, Jul 14, 2015 at 1:49 PM, Joel Koshy 
> >>wrote:
> >> >
> >> >> Actually, how are you committing offsets? Are you using the old
> >> >> (zookeeperconsumerconnector) or new KafkaConsumer?
> >> >>
> >> >> It is true that the current APIs don't return any result, but it
> >>would
> >> >> help to check if anything is getting into the offsets topic - unless
> >> >> you are seeing errors in the logs, the offset commit should succeed
> >> >> (if you are indeed explicitly committing offsets).
> >> >>
> >> >> Thanks,
> >> >>
> >> >> Joel
> >> >>
> >> >> On Tue, Jul 14, 2015 at 12:19:01PM -0400, Vadim Bobrov wrote:
> >> >> > Thanks, Joel, I will but regardless of my findings the basic
> >>problem
> >> >>will
> >> >> > still be there: there is no guarantee that the offsets will be
> >> >>committed
> >> >> > after commitOffsets. Because commitOffsets does not return its exit
> >> >> status,
> >> >> > nor does it block as I understand until offsets are committed. In
> >> >>other
> >> >> > words, there is no way to know that it has, in fact, committed the
> >> >>offsets
> >> >> >
> >> >> > or am I missing something? And then another question - why does it
> >> >>seem
> >> >> to
> >> >> > depend on the number of consumed messages?
> >> >> >
> >> >> > On Tue, Jul 14, 2015 at 11:36 AM, Joel Koshy 
> >> >> wrote:
> >> >> >
> >> >> > > Can you take a look at the kafka commit rate mbean on your
> >>consumer?
> >> >> > > Also, can you consume the offsets topic while you are committing
> >> >> > > offsets and see if/what offsets are getting committed?
> >> >> > > (http://www.slideshare.net/jjkoshy/offset-management-in-kafka/32)
> >> >> > >
> >> >> > > Thanks,
> >> >> > >
> >> >> > > Joel
> >> >> > >
> >> >> > > On Tue, Jul 14, 2015 at 11:12:03AM -0400, Vadim Bobrov wrote:
> >> >> > > > I am trying to replace ActiveMQ with Kafka in our environment
> >> >> however I
> >> >> > > > have encountered a strange problem that basically prevents from
> >> >>using
> >> >> > > Kafka
> >> >> > > > in production. The problem is that sometimes the offsets are
> >>not
> >> >> > > committed.
> >> >> > > >
> >> >> > > > I am using Kafka 0.8.2.1, offset storage = kafka, high level
> >> >> consumer,
> >> >> > > > auto-commit = off. Every N messages I issue commitOffsets().
> >>Now
> >> >> here is
> >> >> > > > the problem - if N is below a certain number (180 000 for me)
> >>it
> >> >> works
> >> >> > > and
> >> >> > > > the offset is moving. If N is 180 000 or more the offset is not
> >> >> updated
> >> >> > > > after commitOffsets
> >> >> > > >
> >> >> > > > I am looking at offsets using kafka-run-class.sh
> >> >> > > > kafka.tools.ConsumerOffsetChecker
> >> >> > > > Any help?
> >> >> > >
> >> >> > >
> >> >>
> >> >>
> >>
> >>
> 

-- 
Joel


Re: Fwd: Offset not committed

2015-07-14 Thread Joel Koshy
Actually, how are you committing offsets? Are you using the old
(zookeeperconsumerconnector) or new KafkaConsumer?

It is true that the current APIs don't return any result, but it would
help to check if anything is getting into the offsets topic - unless
you are seeing errors in the logs, the offset commit should succeed
(if you are indeed explicitly committing offsets).

Thanks,

Joel

On Tue, Jul 14, 2015 at 12:19:01PM -0400, Vadim Bobrov wrote:
> Thanks, Joel, I will but regardless of my findings the basic problem will
> still be there: there is no guarantee that the offsets will be committed
> after commitOffsets. Because commitOffsets does not return its exit status,
> nor does it block as I understand until offsets are committed. In other
> words, there is no way to know that it has, in fact, committed the offsets
> 
> or am I missing something? And then another question - why does it seem to
> depend on the number of consumed messages?
> 
> On Tue, Jul 14, 2015 at 11:36 AM, Joel Koshy  wrote:
> 
> > Can you take a look at the kafka commit rate mbean on your consumer?
> > Also, can you consume the offsets topic while you are committing
> > offsets and see if/what offsets are getting committed?
> > (http://www.slideshare.net/jjkoshy/offset-management-in-kafka/32)
> >
> > Thanks,
> >
> > Joel
> >
> > On Tue, Jul 14, 2015 at 11:12:03AM -0400, Vadim Bobrov wrote:
> > > I am trying to replace ActiveMQ with Kafka in our environment however I
> > > have encountered a strange problem that basically prevents from using
> > Kafka
> > > in production. The problem is that sometimes the offsets are not
> > committed.
> > >
> > > I am using Kafka 0.8.2.1, offset storage = kafka, high level consumer,
> > > auto-commit = off. Every N messages I issue commitOffsets(). Now here is
> > > the problem - if N is below a certain number (180 000 for me) it works
> > and
> > > the offset is moving. If N is 180 000 or more the offset is not updated
> > > after commitOffsets
> > >
> > > I am looking at offsets using kafka-run-class.sh
> > > kafka.tools.ConsumerOffsetChecker
> > > Any help?
> >
> >



Re: Fwd: Offset not committed

2015-07-14 Thread Joel Koshy
Can you take a look at the kafka commit rate mbean on your consumer?
Also, can you consume the offsets topic while you are committing
offsets and see if/what offsets are getting committed?
(http://www.slideshare.net/jjkoshy/offset-management-in-kafka/32)

Thanks,

Joel

On Tue, Jul 14, 2015 at 11:12:03AM -0400, Vadim Bobrov wrote:
> I am trying to replace ActiveMQ with Kafka in our environment however I
> have encountered a strange problem that basically prevents from using Kafka
> in production. The problem is that sometimes the offsets are not committed.
> 
> I am using Kafka 0.8.2.1, offset storage = kafka, high level consumer,
> auto-commit = off. Every N messages I issue commitOffsets(). Now here is
> the problem - if N is below a certain number (180 000 for me) it works and
> the offset is moving. If N is 180 000 or more the offset is not updated
> after commitOffsets
> 
> I am looking at offsets using kafka-run-class.sh
> kafka.tools.ConsumerOffsetChecker
> Any help?



Re: some problem abort isrshrinkrate

2015-07-02 Thread Joel Koshy
I'm assuming you mean you shut down the follower and the leader did
not reflect this in isr shrink-rate. I think the reason for this is
that the controller would have sent a leaderAndIsr request to the
(existing) leader with the shrunk ISR.

On the (redundant) leader transition the leader updates the ISR (i.e.,
shrinks) but does not mark the shrink-rate meter. We currently only
mark the meter if the follower is slow/stuck. If it is down, then the
controller may detect that and send the leaderAndIsr request before
the ISR shrink-thread gets a chance to detect and shrink the ISR.

We should mark the ISR shrink when receiving a leaderAndIsr request
from the controller with a shrunk ISR - I filed
https://issues.apache.org/jira/browse/KAFKA-2309 for this.

Thanks,

Joel

On Thu, Jul 02, 2015 at 12:07:46AM +0800, taosheng53 wrote:
> hello,
>when i use jmx to monitor kafka, i find the value of the isrshrinkrate 
> always 0,never change when one broker has been shutdown.(i have a cluster 
> containing three nodes),i want to know if there is some problem?



Re: Indication in FetchResponse that fetch size is too small

2015-07-02 Thread Joel Koshy
A somewhat related request came up in another thread and I think it is
reasonable to provide this. However, there are already some indicators
that you can use:
- The consumer iterator throws a MessageSizeTooLargeException if it
  cannot extract any messages out of the next chunk.
- If you are using the simple consumer, the fetch response includes
  the high watermark - so if the HW > your fetch offset but there are
  no iterable messages, then you will know that your fetch size needs
  to increase.
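
The simple-consumer check in the second bullet, combined with the soft/hard maximum idea from the original question, could look roughly like this. It is a sketch under the assumption that the caller already has the fetch offset, the high watermark from the fetch response, and the list of messages it managed to iterate; the function names and the doubling policy are illustrative, not part of the consumer API.

```python
def fetch_size_too_small(fetch_offset, high_watermark, messages):
    """True if data exists past fetch_offset but nothing could be iterated.

    messages must be a list (not a one-shot iterator).
    """
    return high_watermark > fetch_offset and not messages


def next_fetch_size(current_size, hard_max, fetch_offset, high_watermark, messages):
    """Grow the fetch size toward hard_max only when the fetch came back empty."""
    if not fetch_size_too_small(fetch_offset, high_watermark, messages):
        return current_size
    if current_size >= hard_max:
        raise ValueError(
            "message at offset %d does not fit in the hard max fetch size" % fetch_offset)
    return min(current_size * 2, hard_max)
```

Normal reads stay at the soft maximum; the size only grows for the rare oversized message, and the error makes a genuine misconfiguration visible instead of leaving the consumer blocked.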

Thanks,

Joel

On Thu, Jul 02, 2015 at 05:32:20PM +0200, Stevo Slavić wrote:
> Hello Apache Kafka community,
> 
> Couldn't broker return a special error code in FetchResponse for a given
> partition(s) where it detects that there was something to return/read from
> partition but actual FetchResponse contains no messages for that partition
> since fetch size in FetchRequest for that partition is too small?
> 
> I don't see it already in existing Kafka wire protocol docs.
> It doesn't seem to be supported, since docs for broker and consumer
> configuration mention that the maximums should be in sync, otherwise too
> large message could block consumer.
> 
> If it was available this feature would be useful for simple consumer API
> users, in the cases when range of possible message sizes on same topic is
> relatively large (e.g. most very small messages, but some small fraction of
> relatively large messages). It would help with being able to use soft and
> hard maximums, so most of the time one could read with the soft max, and
> only when this error code is set, attempt a read using the hard max limit.
> 
> Assuming this feature is not there, I guess what would need to be done at
> minimum is:
> - minimal change in protocol and docs, just one new error code
> - accompanying detection of this condition on broker side, so extend
> handling of FetchRequest
> - maybe in high level consumer make use of this error code in a special way
> (e.g. just log a warning/error that there is potential misconfiguration).
> 
> Kind regards,
> Stevo Slavic.



Re: EOL JDK 1.6 for Kafka

2015-07-01 Thread Joel Koshy
+1

On Wednesday, July 1, 2015, Harsha  wrote:

> Hi,
> During review of our SSL patch (KAFKA-1690), some of the reviewers/users
> asked for support for this config:
>
> https://docs.oracle.com/javase/8/docs/api/javax/net/ssl/SSLParameters.html#setEndpointIdentificationAlgorithm-java.lang.String-
> It allows clients to verify the server and prevent potential MITM. This
> api doesn't exist in Java 1.6.
> Are there any users who still want 1.6 support, or can we stop supporting
> 1.6 from the next release onwards?
>
> Thanks,
> Harsha
>




Re: data loss - replicas

2015-06-23 Thread Joel Koshy
It seems you might have run that on the last log segment. Can you run
it on 21764229.log on both brokers and compare? I'm
guessing there may be a message-set with a different compression codec
that may be causing this.

Thanks,

Joel

On Tue, Jun 23, 2015 at 01:06:16PM +0530, nirmal wrote:
> Hi,
> i ran DumpLogSegments.
> 
> *Broker 1*
> offset: 23077447 position: 1073722324 isvalid: true payloadsize: 431
> magic: 0 compresscodec: NoCompressionCodec crc: 895349554
> 
> *Broker 2*
> offset: 23077447 position: 1073740131 isvalid: true payloadsize: 431
> magic: 0 compresscodec: NoCompressionCodec crc: 895349554
> 
> Thanks
> 
> 
> On 06/23/2015 04:52 AM, Joel Koshy wrote:
> >The replicas do not have to decompress/recompress so I don't think
> >that would contribute to this.
> >
> >There may be some corner cases such as:
> >- Multiple unclean leadership elections in sequence
> >- Changing the compression codec for a topic on the fly - different
> >   brokers may see this config change at almost (but not exactly) the
> >   same time, but not sure if you are using that feature.
> >
> >You may want to use the DumpLogSegments tool to actually compare the
> >offsets present in both log files.
> >
> >On Mon, Jun 22, 2015 at 08:55:40AM -0700, Todd Palino wrote:
> >>I assume that you are considering the data loss to be the difference in
> >>size between the two directories? This is generally not a good guideline,
> >>as the batching and compression will be different between the two replicas.
> >>
> >>-Todd
> >>
> >>
> >>On Mon, Jun 22, 2015 at 7:26 AM, Nirmal ram 
> >>wrote:
> >>
> >>>Hi,
> >>>
> >>>I noticed a data loss while storing in kafka logs.
> >>>Generally, leader hands the request to  followers, is there a data loss in
> >>>that process?
> >>>
> >>>topic 'jun8' with 2 replicas and 8 partitions
> >>>
> >>>*Broker 1*[user@ jun8-6]$ ls -ltr
> >>>total 7337500
> >>>-rw-rw-r-- 1 user user 1073741311 Jun 22 12:45 15195331.log
> >>>-rw-rw-r-- 1 user user    1127512 Jun 22 12:45 15195331.index
> >>>-rw-rw-r-- 1 user user 1073741396 Jun 22 12:48 16509739.log
> >>>-rw-rw-r-- 1 user user    1108544 Jun 22 12:48 16509739.index
> >>>-rw-rw-r-- 1 user user 1073740645 Jun 22 12:52 17823869.log
> >>>-rw-rw-r-- 1 user user    1129064 Jun 22 12:52 17823869.index
> >>>-rw-rw-r-- 1 user user 1073741800 Jun 22 13:17 19136798.log
> >>>-rw-rw-r-- 1 user user    1161152 Jun 22 13:17 19136798.index
> >>>-rw-rw-r-- 1 user user 1073741509 Jun 22 13:21 20451309.log
> >>>-rw-rw-r-- 1 user user    1152448 Jun 22 13:21 20451309.index
> >>>*-rw-rw-r-- 1 user user 1073740588 Jun 22 13:39 21764229.log*
> >>>-rw-rw-r-- 1 user user    1241168 Jun 22 13:39 21764229.index
> >>>-rw-rw-r-- 1 user user 1062343875 Jun 22 13:42 23077448.log
> >>>-rw-rw-r-- 1 user user   10485760 Jun 22 13:42 23077448.index
> >>>[user@ jun8-6]$
> >>>
> >>>
> >>>
> >>>*Broker 2*[user@ jun8-6]$ ls -ltr
> >>>total 7340468
> >>>-rw-rw-r-- 1 user user 1073741311 Jun 22 12:45 15195331.log
> >>>-rw-rw-r-- 1 user user    1857144 Jun 22 12:45 15195331.index
> >>>-rw-rw-r-- 1 user user 1073741396 Jun 22 12:48 16509739.log
> >>>-rw-rw-r-- 1 user user    1857168 Jun 22 12:48 16509739.index
> >>>-rw-rw-r-- 1 user user 1073740645 Jun 22 12:52 17823869.log
> >>>-rw-rw-r-- 1 user user    1857752 Jun 22 12:52 17823869.index
> >>>-rw-rw-r-- 1 user user 1073741800 Jun 22 13:17 19136798.log
> >>>-rw-rw-r-- 1 user user    1857440 Jun 22 13:17 19136798.index
> >>>-rw-rw-r-- 1 user user 1073741509 Jun 22 13:21 20451309.log
> >>>-rw-rw-r-- 1 user user    1856968 Jun 22 13:21 20451309.index
> >>>*-rw-rw-r-- 1 user user 1073722781 Jun 22 13:39 21764229.log*
> >>>-rw-rw-r-- 1 user user    1762288 Jun 22 13:39 21764229.index
> >>>-rw-rw-r-- 1 user user   10485760 Jun 22 13:42 23077448.index
> >>>-rw-rw-r-- 1 user user 1062343875 Jun 22 13:42 23077448.log
> >>>[user@ jun8-6]$
> >>>
> 

-- 
Joel


Re: Is trunk safe for production?

2015-06-23 Thread Joel Koshy
Yes, new features are a big part of it, and sometimes bug
fixes/improvements. The bug fixes are mostly for issues we hit because we
run trunk, but some weren't necessarily introduced on trunk.  For example, we would like
to do a broader roll-out of the new producer, but KAFKA-2121 (adding a
request timeout to NetworkClient) actually blocks that effort. (The
reason for that being we have occasional broker hardware failures
given the size of our deployment which can actually cause producers
under certain circumstances to become wedged).

Joel

On Tue, Jun 23, 2015 at 09:54:30AM -0700, Gwen Shapira wrote:
> Out of curiosity, why do you want to run trunk?
> General fondness for cutting edge stuff? Or are there specific
> features in trunk that you need?
> 
> Gwen
> 
> On Tue, Jun 23, 2015 at 2:59 AM, Achanta Vamsi Subhash
>  wrote:
> > I am planning to use for the producer part. How stable is trunk generally?
> >
> > --
> > Regards
> > Vamsi Subhash
> >



Re: data loss - replicas

2015-06-22 Thread Joel Koshy
The replicas do not have to decompress/recompress so I don't think
that would contribute to this.

There may be some corner cases such as:
- Multiple unclean leadership elections in sequence
- Changing the compression codec for a topic on the fly - different
  brokers may see this config change at almost (but not exactly) the
  same time, but not sure if you are using that feature.

You may want to use the DumpLogSegments tool to actually compare the
offsets present in both log files.
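
For the comparison itself, something along these lines may be enough. It is
only a sketch: it assumes you have saved the DumpLogSegments output from each
broker as text, and that each entry starts with "offset: N position: M" as in
the output posted elsewhere in this thread. The helper names and the first
sample entry (offset 23077446) are made up for illustration:

```python
import re

# One dump entry looks like:
#   offset: 23077447 position: 1073722324 isvalid: true payloadsize: 431 ...
ENTRY = re.compile(r"offset: (\d+) position: (\d+)")


def offsets_in(dump_text):
    """Collect the set of message offsets listed in one dump."""
    return {int(match.group(1)) for match in ENTRY.finditer(dump_text)}


def compare(dump_a, dump_b):
    """Return (offsets only in a, offsets only in b), each sorted."""
    a, b = offsets_in(dump_a), offsets_in(dump_b)
    return sorted(a - b), sorted(b - a)


broker1 = """offset: 23077446 position: 1073721867 isvalid: true payloadsize: 431
offset: 23077447 position: 1073722324 isvalid: true payloadsize: 431"""
broker2 = "offset: 23077447 position: 1073740131 isvalid: true payloadsize: 431"

only_1, only_2 = compare(broker1, broker2)
print(only_1, only_2)
```

Positions are expected to differ between replicas, so only the offsets are
compared here.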

On Mon, Jun 22, 2015 at 08:55:40AM -0700, Todd Palino wrote:
> I assume that you are considering the data loss to be the difference in
> size between the two directories? This is generally not a good guideline,
> as the batching and compression will be different between the two replicas.
> 
> -Todd
> 
> 
> On Mon, Jun 22, 2015 at 7:26 AM, Nirmal ram 
> wrote:
> 
> > Hi,
> >
> > I noticed a data loss while storing in kafka logs.
> > Generally, leader hands the request to  followers, is there a data loss in
> > that process?
> >
> > topic 'jun8' with 2 replicas and 8 partitions
> >
> > *Broker 1*[user@ jun8-6]$ ls -ltr
> > total 7337500
> > -rw-rw-r-- 1 user user 1073741311 Jun 22 12:45 15195331.log
> > -rw-rw-r-- 1 user user    1127512 Jun 22 12:45 15195331.index
> > -rw-rw-r-- 1 user user 1073741396 Jun 22 12:48 16509739.log
> > -rw-rw-r-- 1 user user    1108544 Jun 22 12:48 16509739.index
> > -rw-rw-r-- 1 user user 1073740645 Jun 22 12:52 17823869.log
> > -rw-rw-r-- 1 user user    1129064 Jun 22 12:52 17823869.index
> > -rw-rw-r-- 1 user user 1073741800 Jun 22 13:17 19136798.log
> > -rw-rw-r-- 1 user user    1161152 Jun 22 13:17 19136798.index
> > -rw-rw-r-- 1 user user 1073741509 Jun 22 13:21 20451309.log
> > -rw-rw-r-- 1 user user    1152448 Jun 22 13:21 20451309.index
> > *-rw-rw-r-- 1 user user 1073740588 Jun 22 13:39 21764229.log*
> > -rw-rw-r-- 1 user user    1241168 Jun 22 13:39 21764229.index
> > -rw-rw-r-- 1 user user 1062343875 Jun 22 13:42 23077448.log
> > -rw-rw-r-- 1 user user   10485760 Jun 22 13:42 23077448.index
> > [user@ jun8-6]$
> >
> >
> >
> > *Broker 2*[user@ jun8-6]$ ls -ltr
> > total 7340468
> > -rw-rw-r-- 1 user user 1073741311 Jun 22 12:45 15195331.log
> > -rw-rw-r-- 1 user user    1857144 Jun 22 12:45 15195331.index
> > -rw-rw-r-- 1 user user 1073741396 Jun 22 12:48 16509739.log
> > -rw-rw-r-- 1 user user    1857168 Jun 22 12:48 16509739.index
> > -rw-rw-r-- 1 user user 1073740645 Jun 22 12:52 17823869.log
> > -rw-rw-r-- 1 user user    1857752 Jun 22 12:52 17823869.index
> > -rw-rw-r-- 1 user user 1073741800 Jun 22 13:17 19136798.log
> > -rw-rw-r-- 1 user user    1857440 Jun 22 13:17 19136798.index
> > -rw-rw-r-- 1 user user 1073741509 Jun 22 13:21 20451309.log
> > -rw-rw-r-- 1 user user    1856968 Jun 22 13:21 20451309.index
> > *-rw-rw-r-- 1 user user 1073722781 Jun 22 13:39 21764229.log*
> > -rw-rw-r-- 1 user user    1762288 Jun 22 13:39 21764229.index
> > -rw-rw-r-- 1 user user   10485760 Jun 22 13:42 23077448.index
> > -rw-rw-r-- 1 user user 1062343875 Jun 22 13:42 23077448.log
> > [user@ jun8-6]$
> >

-- 
Joel


Re: Increased replication factor. Replication didn't happen!

2015-06-11 Thread Joel Koshy
This may help:
http://kafka.apache.org/documentation.html#basic_ops_increase_replication_factor
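
The procedure on that page boils down to listing the full new replica set for
every partition in a reassignment JSON file. A small sketch that generates
such a file body is below; the topic name is taken from a later thread and the
broker ids 1 and 2 are placeholders, so substitute your own:

```python
import json


def reassignment(topic, replicas_by_partition):
    """Build the reassignment document for kafka-reassign-partitions.sh."""
    return {
        "version": 1,
        "partitions": [
            {"topic": topic, "partition": partition, "replicas": list(replicas)}
            for partition, replicas in sorted(replicas_by_partition.items())
        ],
    }


# Placeholder broker ids: each partition gains a second replica.
plan = reassignment("mytopic", {0: [1, 2], 1: [2, 1]})
print(json.dumps(plan, indent=2))
```

Save the output to a file and pass it to kafka-reassign-partitions.sh with
--reassignment-json-file and --execute; running the same command with --verify
afterwards reports whether the reassignment has completed.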

On Thu, Jun 11, 2015 at 11:20:05AM +0800, Shady Xu wrote:
> Right now, Kafka topics do not support changing replication factor or
> partition number after creation. The  kafka-reassign-partitions.sh tool can
> only reassign existent partitions.
> 
> 2015-06-11 9:31 GMT+08:00 Gwen Shapira :
> 
> > What do the logs show?
> >
> > On Wed, Jun 10, 2015 at 5:07 PM, Dillian Murphey
> >  wrote:
> > > Ran this:
> > >
> > > $KAFKA_HOME/bin/kafka-reassign-partitions.sh
> > >
> > > But Kafka did not actually do the replication. Topic description shows
> > the
> > > right numbers, but it just didn't replicate.
> > >
> > > What's wrong, and how do I trigger the replication to occur??
> > >
> > > I'm running 0.8.2.0
> > >
> > > thanks
> >



Re: offset storage as kafka with zookeeper 3.4.6

2015-06-11 Thread Joel Koshy
> Is it mandatory to use the zookeeper that comes with kafka for offset
> storage to be migrated to kafka?
If you want to "move" offsets from zookeeper to Kafka then yes you
need to have a phase where all consumers in your group set dual commit
to true. If you are starting a fresh consumer group then you can
turn off dual-commit.
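
As a rough sketch of the two phases: the phase labels "migrate" and
"kafka-only" below are made up for illustration, while offsets.storage and
dual.commit.enabled are the consumer properties already discussed in this
thread.

```python
def offset_props(phase):
    """Render consumer.properties lines for one phase of the migration.

    The phase names "migrate" and "kafka-only" are hypothetical labels.
    """
    if phase not in ("migrate", "kafka-only"):
        raise ValueError("unknown phase: " + phase)
    props = [
        ("offsets.storage", "kafka"),
        # Commit to both ZooKeeper and Kafka only while migrating.
        ("dual.commit.enabled", "true" if phase == "migrate" else "false"),
    ]
    return "\n".join(key + "=" + value for key, value in props)


print(offset_props("migrate"))
print(offset_props("kafka-only"))
```

All consumers in the group should be running with the first set of properties
before any of them moves on to the second.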

> But nothing is being written to this topic, while the consumer offsets
> continue to reside on zookeeper.

The zookeeper offsets won't be removed. However, are they changing?
How are you verifying that nothing is written to this topic? If you
are trying to consume it, then you will need to set
exclude.internal.topics=false in your consumer properties. You can
also check the consumer mbeans that give the KafkaCommitRate, or enable
trace logging in either the consumer or the broker's request logs to
check if offset commit requests are getting sent out to the cluster.

On Thu, Jun 11, 2015 at 01:03:09AM -0700, Kris K wrote:
> I am trying to migrate the offset storage to kafka (3 brokers of version
> 0.8.2.1) using the consumer property offsets.storage=kafka.  I noticed that
> a new topic, __consumer_offsets got created.
> But nothing is being written to this topic, while the consumer offsets
> continue to reside on zookeeper.
> 
> I am using a 3 node zookeeper ensemble (version 3.4.6) and not using the
> one that comes with kafka.
> 
> The current config consumer.properties now contains:
> 
> offsets.storage=kafka
> dual.commit.enabled=false
> exclude.internal.topics=false
> 
> Is it mandatory to use the zookeeper that comes with kafka for offset
> storage to be migrated to kafka?
> 
> I tried both the approaches:
> 
> 1. As listed on slide 34 of
> http://www.slideshare.net/jjkoshy/offset-management-in-kafka.
> 2. By deleting the zookeeper data directories and kafka log directories.
> 
> None of them worked.
> 
> Thanks
> Kris



Re: Consumer lag lies - orphaned offsets?

2015-06-05 Thread Joel Koshy
On Fri, Jun 05, 2015 at 12:53:00AM -0400, Otis Gospodnetić wrote:
> Hi Joel,
> 
> On Thu, Jun 4, 2015 at 8:52 PM, Joel Koshy  wrote:
> 
> > Hi Otis,
> >
> > Yes, this is a limitation in the old consumer: a number of
> > per-topic/partition mbeans remain registered even after a rebalance.
> > Those need to be de-registered. So if you stop consuming from some
> > partition after a rebalance, that partition's lag mbean currently
> > remains, which is why its value stays flat.  This is a known issue.
> >
> 
> I see.  Is / should this be considered a bug?  Something worth fixing for
> 0.8.3?

Yes I would call it a bug, but it hasn't been a high priority so far
mainly because (I think) most users monitor lag with committed
offsets. This is what we do at LinkedIn for instance as Todd mentioned
in his reply.

> 
> Also, you say this is the limitation of the old consumer.  Does that mean
> that this problem goes away completely if one uses the new consumer?

This is sort of n/a at the moment as per-partition lag has not been
added yet to the new consumer. It does have the equivalent of max-lag.
If we add per-partition lag sensors we would need to be able to remove
those sensors if applicable after a rebalance.

> 
> > On the restart, the lag goes down to zero because - well the mbeans
> > get recreated and the consumer starts fetching. If the fetch request
> > reads up to the end of the log then the mbean will report zero. Your
> > actual committed offset may be behind though which is why your true
> > lag is > 0.
> >
> > The lag mbeans are useful, but have a number of limitations - it
> > depends on active fetches in progress;
> 
> 
> What do you mean by this?

If the fetcher threads die for any reason then fetches stop and the
consumer continues to report lag off the last fetched offset and the
last reported log end offset. So it will stay flat when it should be
increasing (since the log end offset on the broker is increasing if
producers are still sending to that partition).

Also, the old consumer pre-fetches chunks and buffers these
internally.  If the chunk queue is full fetches stop; and if the
consumer is extremely slow in actually processing the messages off
each chunk then lag can stay flat (perhaps even at zero) until the
next chunk, while the consumer is iterating messages off the previous
chunk.

> 
> > it also does not exactly
> > correspond with your actual processed (and committed) offset.
> 
> Right.  Though it should be updated in near real-time, so it will
> approximately match the reality, no?

Yes - I think it is fair to say that in most cases the lag mbeans
should be accurate within a small delta of the true lag. Although we
are trying to avoid further non-critical development on the old
consumer it is convenient to have these mbeans. So I think it may be
worth fixing this issue (i.e., deregistering mbeans on a rebalance).
Can you file a jira for this?

Thanks,

Joel

> 
> Thanks,
> Otis
> --
> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> Solr & Elasticsearch Support * http://sematext.com/
> 
> 
> 
> > The most
> > reliable way to monitor application lag is to use the committed
> > offsets and the current log end offsets. Todd has been doing a lot of
> > interesting work in making lag monitoring less painful and can comment
> > more.
> >
> > Joel
> >
> > On Thu, Jun 04, 2015 at 04:55:44PM -0400, Otis Gospodnetić wrote:
> > > Hi,
> > >
> > > On Thu, Jun 4, 2015 at 4:26 PM, Scott Reynolds 
> > wrote:
> > >
> > > > I believe the JMX metrics reflect the consumer PRIOR to committing
> > offsets
> > > > to Kafka / Zookeeper. But when you query from the command line using
> > the
> > > > kafka tools, you are just getting the committed offsets.
> > > >
> > >
> > > Even if that were the case, and maybe it is, it doesn't explain why the
> > > ConsumerLag in JMX often remains *completely constant*.forever...
> > until
> > > the consumer is restarted.  You see what I mean?
> > >
> > > Otis
> > > --
> > > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> > > Solr & Elasticsearch Support * http://sematext.com/
> > >
> > >
> > >
> > > > On Thu, Jun 4, 2015 at 1:23 PM, Otis Gospodnetic <
> > > > otis.gospodne...@gmail.com
> > > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > Here's something potentially useful.
> > > > >
> > > > > 1) Before: https://apps.sematext.com/spm-rep

Re: Consumer lag lies - orphaned offsets?

2015-06-04 Thread Joel Koshy
Hi Otis,

Yes, this is a limitation in the old consumer: a number of
per-topic/partition mbeans remain registered even after a rebalance.
Those need to be de-registered. So if you stop consuming from some
partition after a rebalance, that partition's lag mbean currently
remains, which is why its value stays flat.  This is a known issue.

On the restart, the lag goes down to zero because - well the mbeans
get recreated and the consumer starts fetching. If the fetch request
reads up to the end of the log then the mbean will report zero. Your
actual committed offset may be behind though which is why your true
lag is > 0.

The lag mbeans are useful, but have a number of limitations - they
depend on active fetches in progress; they also do not exactly
correspond with your actual processed (and committed) offset. The most
reliable way to monitor application lag is to use the committed
offsets and the current log end offsets. Todd has been doing a lot of
interesting work in making lag monitoring less painful and can comment
more.
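
To make the committed-offset approach concrete, here is a minimal sketch. It
assumes you have already obtained the committed offset and the log end offset
per partition by whatever means you use (the function name is illustrative);
the numbers are the partition 18 and 19 rows from the Offset Checker output
quoted below.

```python
def committed_lag(committed, log_end):
    """Per-partition lag: log end offset minus committed offset."""
    return {
        partition: log_end[partition] - offset
        for partition, offset in committed.items()
        if partition in log_end
    }


committed = {18: 220551962, 19: 161936440}
log_end = {18: 220586078, 19: 161960377}
print(committed_lag(committed, log_end))
```

Because this only depends on what has been committed and what the broker
holds, it keeps growing if the consumer stalls, unlike the fetcher-based mbean.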

Joel

On Thu, Jun 04, 2015 at 04:55:44PM -0400, Otis Gospodnetić wrote:
> Hi,
> 
> On Thu, Jun 4, 2015 at 4:26 PM, Scott Reynolds  wrote:
> 
> > I believe the JMX metrics reflect the consumer PRIOR to committing offsets
> > to Kafka / Zookeeper. But when you query from the command line using the
> > kafka tools, you are just getting the committed offsets.
> >
> 
> Even if that were the case, and maybe it is, it doesn't explain why the
> ConsumerLag in JMX often remains *completely constant*.forever... until
> the consumer is restarted.  You see what I mean?
> 
> Otis
> --
> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> Solr & Elasticsearch Support * http://sematext.com/
> 
> 
> 
> > On Thu, Jun 4, 2015 at 1:23 PM, Otis Gospodnetic <
> > otis.gospodne...@gmail.com
> > > wrote:
> >
> > > Hi,
> > >
> > > Here's something potentially useful.
> > >
> > > 1) Before: https://apps.sematext.com/spm-reports/s/eQ9WhLegW9 - the
> > "flat
> > > Lag situation"
> > >
> > > 2) I restarted the consumer whose lag is shown in the above graph
> > >
> > > 3) After restart: https://apps.sematext.com/spm-reports/s/4YGkcUP9ms -
> > NO
> > > lag at all!?
> > >
> > > So that 81560 Lag value that was stuck in JMX is gone.  Went down to 0.
> > > Kind of makes sense - the whole consumer was restarted, consumer/java
> > > process was restarted, everything that was in JMX got reset, and if there
> > > is truly no consumer lag it makes sense that the values in JMX are 0.
> > >
> > > HOWEVER, is the Lag *really* always *exactly* 0?  No way.  Look what
> > Offset
> > > Checker shows for this one consumer:
> > >
> > > af_servers  spm_cluster_topic-new-cdh  18  220551962
> > > 220586078   34116
> > > af_servers_spm-afs-6.prod.sematext-1433447997963-c40182c8-0
> > > af_servers  spm_cluster_topic-new-cdh  19  161936440
> > > 161960377   23937
> > > af_servers_spm-afs-6.prod.sematext-1433447997963-c40182c8-0
> > > af_servers  spm_cluster_topic-new-cdh  20  248308642
> > > 248340350   31708
> > > af_servers_spm-afs-6.prod.sematext-1433447997963-c40182c8-0
> > > af_servers  spm_cluster_topic-new-cdh  21  259901355
> > > 259934911   33556
> > > af_servers_spm-afs-6.prod.sematext-1433447997963-c40182c8-0
> > > af_servers  spm_cluster_topic-new-cdh  22  205274547
> > > 205296950   22403
> > > af_servers_spm-afs-6.prod.sematext-1433447997963-c40182c8-0
> > > af_servers  spm_cluster_topic-new-cdh  23  167756654
> > > 167780028   23374
> > > af_servers_spm-afs-6.prod.sematext-1433447997963-c40182c8-0
> > > af_servers  spm_cluster_topic-new-cdh  24  357517989
> > > 357574627   56638
> > > af_servers_spm-afs-6.prod.sematext-1433447997963-c40182c8-0
> > > af_servers  spm_cluster_topic-new-cdh  25  194313232
> > > 194338154   24922
> > > af_servers_spm-afs-6.prod.sematext-1433447997963-c40182c8-0
> > >
> > > You can see Lag is NOT == 0 for any of the partitions!
> > >
> > > And yet, look what JMX is showing now -- all Lag for all partitions is
> > > supposedly 0. Always:
> > >
> > >
> > >
> > kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=af_servers,topic=spm_cluster_free_system_topic-new-cdh,partition=17
> > >   Value java.lang.Object = 0
> > >
> > >
> > kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=af_servers,topic=spm_cluster_free_system_topic-new-cdh,partition=18
> > >   Value java.lang.Object = 0
> > >
> > >
> > kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=af_servers,topic=spm_cluster_topic-2-new-cdh,partition=18
> > >   Value java.lang.Object = 0
> > >
> > >
> > kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=af_servers,topic=spm_cluster_topic-3-new-cdh,partition=18
> > >   Value java.lang.Object = 0
> > >
> > >
> > kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=af_servers,topic=spm_cluster_topic-new-cdh,partition=18
> > >   Value java.lang

Re: Kafka Not Commiting Messages

2015-05-28 Thread Joel Koshy
I think you can also set this dynamically via an mbean
(kafka.Log4jController)

On Thu, May 28, 2015 at 08:26:00PM +, Jiangjie Qin wrote:
> Actually the name should be log4j.logger.kafka.network.RequestChannel$
> It should be there in 0.8.2.1. Can you check it again?
> 
> From: Charlie Mason <charlie@gmail.com>
> Reply-To: "charlie@gmail.com" <charlie@gmail.com>
> Date: Thursday, May 28, 2015 at 1:09 PM
> To: Jiangjie Qin <j...@linkedin.com>
> Cc: "users@kafka.apache.org" <users@kafka.apache.org>
> Subject: Re: Kafka Not Commiting Messages
> 
> Hi Jiangjie,
> 
> Thanks for your message. Unfortunately that setting doesn't appear to be in
> log4j.properties in Kafka 0.8.2.1.
> 
> I tried adding that property to the file, however it doesn't seem to have
> any effect.
> 
> Is there a different logger I need to configure for 0.8.2.1?
> 
> 
> Thanks,
> 
> Charlie M
> 
> On Thu, May 28, 2015 at 1:15 AM, Jiangjie Qin <j...@linkedin.com> wrote:
> Can you turn on TRACE level logging for kafka-request.log and see if the
> broker received the producer request or not?
> You can go to KAFKA_FOLDER/config/log4j.properties and change
> log4j.logger.kafka.network.RequestChannels to TRACE.
> 
> Jiangjie (Becket) Qin
> 
> On 5/27/15, 12:12 PM, "Charlie Mason" <charlie@gmail.com> wrote:
> 
> >Hi All,
> >
> >So I have done some more tests and found something I really don't
> >understand.
> >
> >I found a simple example of the Kafka Java producer so I ran that pointing
> >at the same topic as my last test. That failed when run from my local
> >machine. I uploaded it to the VM where Kafka is installed and it worked
> >perfectly. Bear in mind this is exactly the same code configured to talk to
> >exactly the same IP address.
> >
> >The thing I don't understand is I can see a connection via Wireshark to
> >Kafka when the code is running on my machine. I can even query for the
> >topic metadata successfully from my local machine. The only thing it won't
> >do is commit a message. From what I have read the Kafka protocol is a
> >fairly straight forward single TCP connection to the server from the
> >client.
> >
> >Does anyone have any ideas what might be causing this? Obviously people
> >run in production with the producer on different nodes from the brokers,
> >so I can't see what's different here.
> >
> >Thanks,
> >
> >Charlie M
> >
> >
> >
> >
> >On Tue, May 26, 2015 at 9:09 PM, Charlie Mason <charlie@gmail.com>
> >wrote:
> >
> >> Hi All,
> >>
> >> I have been trying to get started with Kafka. I have set up an 0.8.2
> >> broker as per the quick start. With a single node broker I am able to run
> >> the scripts in the bin folder to successfully produce and consume messages.
> >>
> >> I then tried to write some Scala code to use the new 0.8 Producer API to
> >> produce messages. However nothing seems to appear on the consumer. I
> >> modified the producer code to wait for the broker's metadata response. That
> >> blocks and then times out.
> >>
> >> The only difference I can see apart from using the new Producer API is all
> >> my previous tests were run inside the VM Kafka is installed on, whereas my
> >> code is running on the host machine and connecting into Kafka. I put some
> >> code in to get the topic's metadata to confirm connectivity to Kafka. That
> >> prints the topic metadata correctly before hanging on the send(...).get().
> >> I have also checked the topic is set for a replication factor of 1. I can't
> >> see anything in the Kafka logs either. All I see on the broker is a message
> >> when the Producer times out saying the client disconnected.
> >>
> >> Anyone got any ideas what might be making Kafka fail to commit the
> >> messages?
> >>
> >> I really want to start playing with Kafka however I seem to have fallen at
> >> the first hurdle.
> >>
> >> Thanks,
> >>
> >> Charlie M
> >>
> 
> 

-- 
Joel


Re: Waiting for replication factor to take effect...?

2015-05-28 Thread Joel Koshy
There are also various mbeans you can poke to check the replica
fetcher lag but from your description it appears that the fetcher quit
for some reason (since you mentioned the topic is 'pretty much' empty)

BTW, the below is only for partition 0

> Do you have state change logs available? You should have a become
> follower state transition on 9255216 and a leader state transition at
> around the same time on 925537. The server log should also show
> 9255216 starting a replica fetcher thread to the leader.


> 
> Do you have request logging available? You can check if the follower
> is issuing fetch requests to the leader. Another option is to take a
> threaddump on the follower and see if its replica fetcher thread is
> alive or not.
> 
> Joel
> 
> On Thu, May 28, 2015 at 11:26:05AM -0700, Dillian Murphey wrote:
> > Nothing on this?  Someone must surely have a need to add brokers and
> > increase the replication factor.  I waited a day and I still do not see
> > in-sync replicas.
> > 
> > :(
> > 
> > 
> > 
> > On Wed, May 27, 2015 at 5:51 PM, Dillian Murphey 
> > wrote:
> > 
> > > Hi Ho,
> > >
> > > I'm trying to increase my replication factor from 1 to 2.
> > >
> > > I used the tool
> > >
> > > kafka-reassign-partitions.sh
> > >
> > > I see the replica factor change, but my replicas are not syncing up. There
> > > is pretty much no data in this topic. Will this replication get triggered
> > > at some point??
> > >
> > > root@b3b4b5d71b48:/opt/kafka_2.10-0.8.2.1#
> > > $KAFKA_HOME/bin/kafka-topics.sh --describe --zookeeper $ZK --topic mytopic
> > > Topic:mytopic PartitionCount:2 ReplicationFactor:2 Configs:
> > > Topic: mytopic Partition: 0 Leader: 925537 Replicas: 9255216,925537 Isr:
> > > 925537
> > > Topic: mytopic Partition: 1 Leader: 9255216 Replicas: 925537,9255216 Isr:
> > > 9255216
> > >
> > >
> > > Thanks for any comments.
> > >
> 



Re: Waiting for replication factor to take effect...?

2015-05-28 Thread Joel Koshy
Do you have state change logs available? You should have a become
follower state transition on 9255216 and a leader state transition at
around the same time on 925537. The server log should also show
9255216 starting a replica fetcher thread to the leader.

Do you have request logging available? You can check if the follower
is issuing fetch requests to the leader. Another option is to take a
threaddump on the follower and see if its replica fetcher thread is
alive or not.
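
While you dig through those logs, a quick way to see which replicas are still
missing from the ISR is to parse the kafka-topics.sh --describe output. This
is only a sketch: the regular expression assumes the "Partition: / Leader: /
Replicas: / Isr:" layout shown in your output below (with each partition on
one line), and the function name is made up:

```python
import re

LINE = re.compile(
    r"Partition: (\d+)\s+Leader: (\d+)\s+Replicas: ([\d,]+)\s+Isr: ([\d,]+)"
)


def missing_from_isr(describe_output):
    """Map partition -> sorted broker ids assigned as replicas but not in the ISR."""
    result = {}
    for match in LINE.finditer(describe_output):
        replicas = {int(b) for b in match.group(3).split(",")}
        isr = {int(b) for b in match.group(4).split(",")}
        missing = replicas - isr
        if missing:
            result[int(match.group(1))] = sorted(missing)
    return result


sample = """Topic: mytopic Partition: 0 Leader: 925537 Replicas: 9255216,925537 Isr: 925537
Topic: mytopic Partition: 1 Leader: 9255216 Replicas: 925537,9255216 Isr: 9255216"""
print(missing_from_isr(sample))
```

For the output below this reports broker 9255216 missing for partition 0 and
broker 925537 missing for partition 1, i.e. neither new follower has caught up.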

Joel

On Thu, May 28, 2015 at 11:26:05AM -0700, Dillian Murphey wrote:
> Nothing on this?  Someone must surely have a need to add brokers and
> increase the replication factor.  I waited a day and I still do not see
> in-sync replicas.
> 
> :(
> 
> 
> 
> On Wed, May 27, 2015 at 5:51 PM, Dillian Murphey 
> wrote:
> 
> > Hi Ho,
> >
> > I'm trying to increase my replication factor from 1 to 2.
> >
> > I used the tool
> >
> > kafka-reassign-partitions.sh
> >
> > I see the replica factor change, but my replicas are not syncing up. There
> > is pretty much no data in this topic. Will this replication get triggered
> > at some point??
> >
> > root@b3b4b5d71b48:/opt/kafka_2.10-0.8.2.1#
> > $KAFKA_HOME/bin/kafka-topics.sh --describe --zookeeper $ZK --topic mytopic
> > Topic:mytopic PartitionCount:2 ReplicationFactor:2 Configs:
> > Topic: mytopic Partition: 0 Leader: 925537 Replicas: 9255216,925537 Isr:
> > 925537
> > Topic: mytopic Partition: 1 Leader: 9255216 Replicas: 925537,9255216 Isr:
> > 9255216
> >
> >
> > Thanks for any comments.
> >



Re: Is fetching from in-sync replicas possible?

2015-05-27 Thread Joel Koshy
> > Out of curiosity - what's the typical latency (distribution) you see
> between zones?
> 
> Unfortunately I don't have any good numbers on that. Since we're publishing
> both in the same AZ and to other AZs the latency metrics reflect both. If I
> figure out a good way to measure this I will report back.

Thanks - I was wondering if you had done some simple ping/traceroute
tests. It's just that with replicas in different zones the end-to-end
latency from producer to consumer will be correspondingly higher.
Does your zookeeper setup span zones as well?

> 
> On Tue, May 26, 2015 at 10:44 PM, Joel Koshy  wrote:
> 
> > > Apologies if this question has been asked before. If I understand things
> > > correctly a client can only fetch from the leader of a partition, not
> > from
> > > an (in-sync) replica. I have a use case where it would be very beneficial
> > > if it were possible to fetch from a replica instead of just the leader,
> > and
> > > I wonder why it is not allowed? Are there any consistency problems with
> > > allowing it, for example? Is there any way to configure Kafka to allow
> > it?
> >
> > Yes this should be possible.  I don't think there are any consistency
> > issues (barring any bugs) since we never expose past the
> > high-watermark and the follower HW is strictly <= leader HW. Can you
> > file a jira for this?
> >
> > > The use case is a Kafka cluster running in EC2 across three availability
> > > zones.
> >
> > Out of curiosity - what's the typical latency (distribution) you see
> > between zones?
> >
> > Joel
> >



Re: Is fetching from in-sync replicas possible?

2015-05-27 Thread Joel Koshy
That's right - it should not help significantly assuming even
distribution of leaders and even distribution of partition volume
(average inbound messages/sec).

Theo's use case is a bit different though: there you want to avoid
cross-zone consumer reads, especially if you have a high fan-out in
the number of consumers.

On Wed, May 27, 2015 at 05:56:56PM +, Aditya Auradkar wrote:
> Is that necessarily the case? On a cluster hosting partitions, assuming the 
> leaders are evenly distributed, every node should receive a roughly equal 
> share of the traffic. It does help a lot when the consumer throughput of a 
> single partition exceeds the capacity of a single leader but at that point 
> the topic ideally needs more partitions.
> 
> Aditya
> 
> 
> From: James Cheng [jch...@tivo.com]
> Sent: Wednesday, May 27, 2015 10:50 AM
> To: users@kafka.apache.org
> Subject: Re: Is fetching from in-sync replicas possible?
> 
> On May 26, 2015, at 1:44 PM, Joel Koshy  wrote:
> 
> >> Apologies if this question has been asked before. If I understand things
> >> correctly a client can only fetch from the leader of a partition, not from
> >> an (in-sync) replica. I have a use case where it would be very beneficial
> >> if it were possible to fetch from a replica instead of just the leader, and
> >> I wonder why it is not allowed? Are there any consistency problems with
> >> allowing it, for example? Is there any way to configure Kafka to allow it?
> >
> > Yes this should be possible.  I don't think there are any consistency
> > issues (barring any bugs) since we never expose past the
> > high-watermark and the follower HW is strictly <= leader HW. Can you
> > file a jira for this?
> >
> 
> Wouldn't this allow Kafka to scale to handle a lot more consumer traffic? 
> Currently, consumers all have to read from the leader, which means that the 
> network/disk bandwidth of a particular leader is the bottleneck. If consumers 
> could read from in-sync replicas, then a single node no longer is the 
> bottleneck for reads. You could scale out your read capacity as far as you 
> want.
> 
> -James
> 
> 
> >> The use case is a Kafka cluster running in EC2 across three availability
> >> zones.
> >
> > Out of curiosity - what's the typical latency (distribution) you see
> > between zones?
> >
> > Joel
> 



Re: Is fetching from in-sync replicas possible?

2015-05-26 Thread Joel Koshy
> Apologies if this question has been asked before. If I understand things
> correctly a client can only fetch from the leader of a partition, not from
> an (in-sync) replica. I have a use case where it would be very beneficial
> if it were possible to fetch from a replica instead of just the leader, and
> I wonder why it is not allowed? Are there any consistency problems with
> allowing it, for example? Is there any way to configure Kafka to allow it?

Yes this should be possible.  I don't think there are any consistency
issues (barring any bugs) since we never expose past the
high-watermark and the follower HW is strictly <= leader HW. Can you
file a jira for this?
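
To make the high-watermark argument concrete, here is a minimal sketch. It is not Kafka's actual code: the class and method names are hypothetical, and it only models the rule that a replica serves records below its own high watermark (HW). As long as the follower HW is <= the leader HW, a consumer reading from the follower can only see a prefix of what the leader would expose.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of a replica log; not Kafka's real implementation. */
final class ReplicaLogSketch {
    private final List<String> records = new ArrayList<>();
    private long highWatermark = 0; // offsets below this are committed

    void append(String record) {
        records.add(record);
    }

    /** Advance the HW, but never past the local log end offset. */
    void updateHighWatermark(long hw) {
        highWatermark = Math.min(hw, records.size());
    }

    long highWatermark() {
        return highWatermark;
    }

    /** Serve a consumer fetch: only records in [offset, highWatermark). */
    List<String> fetch(long offset, int maxRecords) {
        List<String> out = new ArrayList<>();
        for (long o = Math.max(offset, 0); o < highWatermark && out.size() < maxRecords; o++) {
            out.add(records.get((int) o));
        }
        return out;
    }
}
```

In this model a follower with HW 2 returns two records while a leader with HW 3 returns three, so the follower lags but never exposes anything the leader has not committed.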

> The use case is a Kafka cluster running in EC2 across three availability
> zones.

Out of curiosity - what's the typical latency (distribution) you see
between zones?

Joel


Re: Replica manager exception in broker

2015-05-22 Thread Joel Koshy
Sorry about that - I thought this was the follower since you mentioned
"This is the follower broker of topic test1..." in your email. So this
is a different issue.

The consumer requests should go to the leader. For some reason, this
particular broker does not seem to know that - it would have otherwise
rejected the fetch request with a NotLeaderForPartition exception.
That's also probably why it has not fetched anything from the new
leader. Can you check your state change logs (on the controller) - it
should tell you what happened when this broker became a follower.
Also, you should have some error logs in the consumer - can you take a
look at those? On an offset-out-of-range error the consumer should
reset to the earliest or latest offset, depending on the config.
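
As a rough illustration of that reset behaviour, here is a minimal sketch. The helper and enum names are hypothetical and this is not the consumer's real code; it only shows the decision made when a requested offset falls outside the range the broker still has.

```java
/** Hypothetical helper modelling an earliest/latest reset policy. */
final class OffsetResetSketch {
    enum ResetPolicy { EARLIEST, LATEST }

    /**
     * Returns the offset to fetch from next. If the requested offset lies
     * outside [logStart, logEnd], fall back according to the policy.
     */
    static long resolve(long requested, long logStart, long logEnd, ResetPolicy policy) {
        if (requested >= logStart && requested <= logEnd) {
            return requested; // in range, no reset needed
        }
        return policy == ResetPolicy.EARLIEST ? logStart : logEnd;
    }
}
```

With the numbers from the error in this thread (requested 206845418, available range 207804287 to 207804287), either policy would move the consumer to 207804287.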

Thanks,

Joel

On Sat, May 23, 2015 at 01:16:45AM +0800, tao xiao wrote:
> Hi Joel,
> 
> The error offset 206845418 didn't change. The only thing that changed is
> the correlation id and it was incrementing.
> 
> The broker is the follower and I saw similar error messages for other
> topics the broker was a follower for.  As indicated by the log this is a
> request coming from a consumer not follower. One thing I don't quite
> understand is that consumer requests for the topic (test1) should go to the
> leader not follower so why there were consumer requests connecting to the
> broker? The other issue I noticed is that the replica fetcher threads on
> the follower didn't fetch any data at all from the leader; the log file
> size on the follower didn't grow for several hours.
> 
> On Sat, May 23, 2015 at 12:40 AM, Joel Koshy  wrote:
> 
> > When you say "keeps getting below exception" I'm assuming that the
> > error offset (206845418) keeps changing - right? We saw a similar
> > issue in the past and it turned out to be due to a NIC issue - i.e.,
> > it negotiated at a low speed. So the replica fetcher couldn't keep up
> > with the leader. i.e., while it caught up within the first segment the
> > leader's log would roll (i.e., the segment would get deleted) and we
> > would see the out of range error. Is this broker a follower for other
> > partitions? Do those partitions show up in these error messages?
> >
> > On Fri, May 22, 2015 at 03:11:09PM +0800, tao xiao wrote:
> > > Hi team,
> > >
> > > One of the brokers keeps getting below exception.
> > >
> > > [2015-05-21 23:56:52,687] ERROR [Replica Manager on Broker 15]: Error
> > when
> > > processing fetch request for partition [test1,0] offset 206845418 from
> > > consumer with correlation id 93748260. Possible cause: Request for offset
> > > 206845418 but we only have log segments in the range 207804287 to
> > > 207804287. (kafka.server.ReplicaManager)
> > > This is the follower broker of topic test1 and ISR of that topic has
> > only 1
> > > broker left right now. Just wanted to know what cause this issue and how
> > I
> > > can prevent it?
> > >
> > > --
> > > Regards,
> > > Tao
> >
> >
> 
> 
> -- 
> Regards,
> Tao



Re: Mirrormaker stops consuming

2015-05-22 Thread Joel Koshy
The issue is that multiple consumers feed into all the data channels.
So they will all eventually block if any data channel becomes full.
The mirror maker on trunk is significantly different so this is not an
issue on trunk.

On Fri, May 22, 2015 at 12:37:01PM -0400, Rajasekar Elango wrote:
> Thanks for pointers Joel. Will look into SSLSocketChannel. Yes this was
> working fine before upgrade.
> 
> If its just one producer thread stuck on write, it might affect only one
> consumer thread/partition. But we found consuming stopped for all
> topic/partitions. Or Is it only single  data channel shared between all
> producer and consumer threads..?
> 
> Thanks,
> Raja.
> 
> 
> On Fri, May 22, 2015 at 12:12 PM, Joel Koshy  wrote:
> 
> > The threaddump suggests that one of the producers
> > (mirrormaker-producer-6) is blocked on write for some reason. So the
> > data-channel for that producer (which sits between the consumers and
> > the producer) is full which blocks the consumers from progressing.
> >
> > This appears to be in your (custom) SSLSocketChannel code. If you take
> > consecutive threaddumps I'm guessing you would see the same trace. If
> > this is reproducible can you do that? You can also hook up jvisualvm
> > or yourkit to see which threads are active and it may well be that the
> > producer is in a tight loop in writeCompletely. Just to confirm you
> > did not see this issue before upgrading?
> >
> > Joel
> >
> > On Fri, May 22, 2015 at 11:35:19AM -0400, Rajasekar Elango wrote:
> > > We recently upgraded to kafka 0.8.2.1 and found issues with mirrormaker
> > > that randomly stops consuming. We had to restart the mirrormaker process
> > to
> > > resolve the problem. This problem has occurred several times in past two
> > > weeks.
> > >
> > > Here is what I found in analysis:
> > >
> > > When this problem happens:
> > >
> > > Mirrormaker log stopped rolling (ie nothing in logs) . Last couple of
> > > messages in mirrormaker log are ProducerSendThread producing to
> > > destination. No errors or exceptions.
> > >
> > > Mirrormaker consumer offset doesn't increase. ConsumerOffsetChecker shows
> > > mirrormaker consumer offset stops incrementing.
> > >
> > > Mirrormaker consumer MinFetch rate jmx metric drops to zero.
> > > ConsumerTopicMetric.BytesPerSec drops to zero.
> > >
> > > So its mirrormaker consumer should have stopped accepting new data.
> > >
> > > Can some one provide input on how to trouble shoot this problem further
> > and
> > > identify root cause?
> > >
> > > Got Thread dump before restarting, it looks ok to me, no blocked thread.
> > > Here is thread dump output
> > >
> > > 2015-05-21 18:59:09
> > > Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.76-b04 mixed
> > mode):
> > >
> > > "Attach Listener" daemon prio=10 tid=0x7f7248002000 nid=0x2d53
> > waiting
> > > on condition [0x]
> > >java.lang.Thread.State: RUNNABLE
> > >
> > >Locked ownable synchronizers:
> > > - None
> > >
> > >
> > "ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-tyo.ops.sfdc.net-1431458688650-fb15f395-0-2"
> > > prio=10 tid=0x7f71e407e000 nid=0x3425 waiting on condition
> > > [0x7f72833f2000]
> > >java.lang.Thread.State: WAITING (parking)
> > > at sun.misc.Unsafe.park(Native Method)
> > > - parking to wait for  <0x00042cd15cc8> (a
> > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> > > at
> > >
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> > > at
> > >
> > java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
> > > at
> > > kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
> > > at
> > >
> > kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
> > > at
> > >
> > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:129)
> > > at
> > >
> > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala

Re: Replica manager exception in broker

2015-05-22 Thread Joel Koshy
When you say "keeps getting below exception" I'm assuming that the
error offset (206845418) keeps changing - right? We saw a similar
issue in the past and it turned out to be due to a NIC issue - i.e.,
it negotiated at a low speed. So the replica fetcher couldn't keep up
with the leader. i.e., while it caught up within the first segment the
leader's log would roll (i.e., the segment would get deleted) and we
would see the out of range error. Is this broker a follower for other
partitions? Do those partitions show up in these error messages?

On Fri, May 22, 2015 at 03:11:09PM +0800, tao xiao wrote:
> Hi team,
> 
> One of the brokers keeps getting below exception.
> 
> [2015-05-21 23:56:52,687] ERROR [Replica Manager on Broker 15]: Error when
> processing fetch request for partition [test1,0] offset 206845418 from
> consumer with correlation id 93748260. Possible cause: Request for offset
> 206845418 but we only have log segments in the range 207804287 to
> 207804287. (kafka.server.ReplicaManager)
> This is the follower broker of topic test1 and ISR of that topic has only 1
> broker left right now. Just wanted to know what cause this issue and how I
> can prevent it?
> 
> -- 
> Regards,
> Tao



Re: Mirrormaker stops consuming

2015-05-22 Thread Joel Koshy
The threaddump suggests that one of the producers
(mirrormaker-producer-6) is blocked on write for some reason. So the
data-channel for that producer (which sits between the consumers and
the producer) is full which blocks the consumers from progressing.
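
The back-pressure described above can be sketched with a bounded queue. This is a simplified model, not mirror maker's actual data channel: the class name, capacity and timeout are made up for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of a bounded data channel between consumer and producer threads. */
final class DataChannelSketch {
    private final BlockingQueue<String> queue;

    DataChannelSketch(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Consumer side: returns false if the channel stayed full for the whole timeout. */
    boolean enqueue(String message, long timeoutMs) {
        try {
            return queue.offer(message, timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    /** Producer side: returns null if nothing is available. */
    String dequeue() {
        return queue.poll();
    }
}
```

If nothing calls dequeue() (the stuck producer), enqueue() starts failing as soon as the capacity is reached. A blocking put() with no timeout would instead wait indefinitely, which from the outside looks like the consumers have silently stopped.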

This appears to be in your (custom) SSLSocketChannel code. If you take
consecutive threaddumps I'm guessing you would see the same trace. If
this is reproducible can you do that? You can also hook up jvisualvm
or yourkit to see which threads are active and it may well be that the
producer is in a tight loop in writeCompletely. Just to confirm you
did not see this issue before upgrading?

Joel

On Fri, May 22, 2015 at 11:35:19AM -0400, Rajasekar Elango wrote:
> We recently upgraded to kafka 0.8.2.1 and found issues with mirrormaker
> that randomly stops consuming. We had to restart the mirrormaker process to
> resolve the problem. This problem has occurred several times in past two
> weeks.
> 
> Here is what I found in analysis:
> 
> When this problem happens:
> 
> Mirrormaker log stopped rolling (ie nothing in logs) . Last couple of
> messages in mirrormaker log are ProducerSendThread producing to
> destination. No errors or exceptions.
> 
> Mirrormaker consumer offset doesn't increase. ConsumerOffsetChecker shows
> mirrormaker consumer offset stops incrementing.
> 
> Mirrormaker consumer MinFetch rate jmx metric drops to zero.
> ConsumerTopicMetric.BytesPerSec drops to zero.
> 
> So its mirrormaker consumer should have stopped accepting new data.
> 
> Can some one provide input on how to trouble shoot this problem further and
> identify root cause?
> 
> Got Thread dump before restarting, it looks ok to me, no blocked thread.
> Here is thread dump output
> 
> 2015-05-21 18:59:09
> Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.76-b04 mixed mode):
> 
> "Attach Listener" daemon prio=10 tid=0x7f7248002000 nid=0x2d53 waiting
> on condition [0x]
>java.lang.Thread.State: RUNNABLE
> 
>Locked ownable synchronizers:
> - None
> 
> "ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-tyo.ops.sfdc.net-1431458688650-fb15f395-0-2"
> prio=10 tid=0x7f71e407e000 nid=0x3425 waiting on condition
> [0x7f72833f2000]
>java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x00042cd15cc8> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> at
> java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
> at
> kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
> at
> kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
> at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:129)
> at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:110)
> at
> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
> at
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> at
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:110)
> at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:110)
> at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:110)
> at kafka.utils.Utils$.inLock(Utils.scala:535)
> at
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:109)
> at
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:87)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> 
>Locked ownable synchronizers:
> - <0x00042ea62eb0> (a
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> 
> "ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-tyo.ops.sfdc.net-1431458688650-fb15f395-0-3"
> prio=10 tid=0x7f71e407b000 nid=0x3424 waiting on condition
> [0x7f7281f99000]
>java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x00042ccece80> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> at
> java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
> at
> kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicIn

Re: New Producer API - batched sync mode support

2015-04-27 Thread Joel Koshy

>   Fine grained tracking of status of individual events is quite painful in
> contrast to simply blocking on every batch. Old style Batched-sync mode
> has great advantages in terms of simplicity and performance.

I may be missing something, but I'm not so convinced that it is that
painful/very different from the old-style.

In the old approach, you would compose a batch (in a list of messages)
and do a synchronous send:

try {
  producer.send(recordsToSend)
}
catch (...) {
  // handle (e.g., retry sending recordsToSend)
}

In the new approach, you would do (something like) this:

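List<Future<RecordMetadata>> futureList = new ArrayList<>();
for (ProducerRecord<K, V> record : recordsToSend) {
  futureList.add(producer.send(record));
}
producer.flush();  // blocks until all sends above have completed
for (Future<RecordMetadata> result : futureList) {
  try {
    result.get();
  } catch (InterruptedException | ExecutionException e) {
    // handle (e.g., retry sending recordsToSend)
  }
}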
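
For anyone who wants to try the bookkeeping without a broker, here is a self-contained sketch of the same pattern. It uses no Kafka classes: the `send` function passed in is a stand-in for producer.send(), and the class and method names are hypothetical. The point is only that keeping the futures in send order lets you recover exactly which records failed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Function;

/** Stand-in for the producer-side bookkeeping; no Kafka classes are used. */
final class BatchedSyncSketch {
    /**
     * Sends every record, then waits on each future and returns the records
     * whose send failed, so the caller can retry just those.
     */
    static <R> List<R> sendBatch(List<R> records, Function<R, Future<?>> send) {
        List<Future<?>> futures = new ArrayList<>();
        for (R record : records) {
            futures.add(send.apply(record)); // asynchronous, returns immediately
        }
        // With the real producer, producer.flush() would go here.
        List<R> failed = new ArrayList<>();
        for (int i = 0; i < futures.size(); i++) {
            try {
                futures.get(i).get(); // blocks until this send completes
            } catch (ExecutionException e) {
                failed.add(records.get(i));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                failed.add(records.get(i));
            }
        }
        return failed;
    }
}
```

A caller could loop on sendBatch(failed, send) until the returned list is empty or a retry limit is hit, which gives roughly the batch-level success/failure signal of the old synchronous send.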




Re: New Producer API - batched sync mode support

2015-04-27 Thread Joel Koshy
As long as you retain the returned futures somewhere, you can always
iterate over the futures after the flush completes and check for
success/failure. Would that work for you?

On Mon, Apr 27, 2015 at 08:53:36PM +, Roshan Naik wrote:
> The important guarantee that is needed for a client producer thread is
> that it requires an indication of success/failure of the batch of events
> it pushed. Essentially it needs to retry producer.send() on that same
> batch in case of failure. My understanding is that flush will simply flush
> data from all threads (correct me if I am wrong).
> 
> -roshan
> 
> 
> 
> On 4/27/15 1:36 PM, "Joel Koshy"  wrote:
> 
> >This sounds like flush:
> >https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+meth
> >od+to+the+producer+API
> >
> >which was recently implemented in trunk.
> >
> >Joel
> >
> >On Mon, Apr 27, 2015 at 08:19:40PM +, Roshan Naik wrote:
> >> Been evaluating the perf of old and new Produce APIs for reliable high
> >>volume streaming data movement. I do see one area of improvement that
> >>the new API could use for synchronous clients.
> >> 
> >> AFAICT, the new API does not support batched synchronous transfers. To
> >>do synchronous send, one needs to do a future.get() after every
> >>Producer.send(). I changed the new
> >>o.a.k.clients.tools.ProducerPerformance tool to assess the perf of this
> >>mode of operation. May not be surprising that it is much slower than the
> >>async mode... hard to push it beyond 4MB/s.
> >> 
> >> The 0.8.1 Scala based producer API supported a batched sync mode via
> >>Producer.send( List ) . My measurements show that it was
> >>able to approach (and sometimes exceed) the old async speeds... 266MB/s
> >> 
> >> 
> >> Supporting this batched sync mode is very critical for streaming
> >>clients (such as flume for example) that need delivery guarantees.
> >>Although it can be done with Async mode, it requires additional book
> >>keeping as to which events are delivered and which ones are not. The
> >>programming model becomes much simpler with the batched sync mode.
> >>Client having to deal with one single future.get() helps performance
> >>greatly too as I noted.
> >> 
> >> Wanted to propose adding this as an enhancement to the new Producer API.
> >
> 



Re: New Producer API - batched sync mode support

2015-04-27 Thread Joel Koshy
This sounds like flush:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API

which was recently implemented in trunk.

Joel

On Mon, Apr 27, 2015 at 08:19:40PM +, Roshan Naik wrote:
> Been evaluating the perf of old and new Produce APIs for reliable high volume 
> streaming data movement. I do see one area of improvement that the new API 
> could use for synchronous clients.
> 
> AFAICT, the new API does not support batched synchronous transfers. To do 
> synchronous send, one needs to do a future.get() after every Producer.send(). 
> I changed the new o.a.k.clients.tools.ProducerPerformance tool to assess the 
> perf of this mode of operation. May not be surprising that it is much slower 
> than the async mode... hard to push it beyond 4MB/s.
> 
> The 0.8.1 Scala based producer API supported a batched sync mode via 
> Producer.send( List ) . My measurements show that it was able 
> to approach (and sometimes exceed) the old async speeds... 266MB/s
> 
> 
> Supporting this batched sync mode is very critical for streaming clients 
> (such as flume for example) that need delivery guarantees. Although it can be 
> done with Async mode, it requires additional book keeping as to which events 
> are delivered and which ones are not. The programming model becomes much 
> simpler with the batched sync mode. Client having to deal with one single 
> future.get() helps performance greatly too as I noted.
> 
> Wanted to propose adding this as an enhancement to the new Producer API.



Re: New Offset Management API Question

2015-03-26 Thread Joel Koshy

> 1) Does Offset Commit/Fetch API works with Simple Consumer ?

Yes - in 0.8.2.1. There is an example given at
https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka

> 2) With MM, can you selectively MM offset topic.
> 
> Use Case:
> Let's suppose you want to build Active Consumer Group in DC1 and passive
> Consumer Group (Not yet started in DC 2).  Can you MM single offset topic
> and when DC1 consumer Group goes down, DC2 (with manual or  automated
> custom logic) start same consumer group with last committed offset.  Is
> this possible ?

Not sure I follow - the passive consumer would just need to start up
with the same group right? Then it would pick up the last committed
offset in DC1 for that group.

Thanks,

Joel


Re: High Replica Max Lag

2015-03-13 Thread Joel Koshy
Can you verify that the leaders are evenly spread? and if necessary
run a preferred leader election?
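
One quick way to check the spread is to count leaders per broker. The sketch below assumes you have already collected a partition-to-leader map, for example by reading the Leader field from the kafka-topics.sh --describe output; the class name and the sample data are made up.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper for eyeballing leader distribution across brokers. */
final class LeaderSpreadSketch {
    /** Counts how many partitions each broker currently leads. */
    static Map<Integer, Integer> leadersPerBroker(Map<String, Integer> leaderByPartition) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (int leader : leaderByPartition.values()) {
            counts.merge(leader, 1, Integer::sum);
        }
        return counts;
    }
}
```

If one broker's count is far above the others, that broker is carrying a disproportionate share of produce and fetch traffic, and a preferred leader election may help even things out.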

On Fri, Mar 13, 2015 at 05:10:22PM -0700, Zakee wrote:
> I have 35 topics spread with total 398 partitions (2 of them are supposed to 
> be very high volume and so allocated 28 partitions to them, others vary 
> between 5 to 14).
> 
> Thanks
> Zakee
> 
> 
> 
> > On Mar 13, 2015, at 3:25 PM, Joel Koshy  wrote:
> > 
> > I think what people have observed in the past is that increasing
> > num-replica-fetcher-threads has diminishing returns fairly quickly.
> > You may want to instead increase the number of partitions in the topic
> > you are producing to. (How many do you have right now?)
> > 
> > On Fri, Mar 13, 2015 at 02:48:17PM -0700, Zakee wrote:
> >> Hi Mayuresh,
> >> 
> >> I have currently set this property to 4 and I see from the logs that it 
> >> starts 12 threads on each broker. I will try increasing it further.
> >> 
> >> Thanks
> >> Zakee
> >> 
> >> 
> >> 
> >>> On Mar 13, 2015, at 11:53 AM, Mayuresh Gharat 
> >>>  wrote:
> >>> 
> >>> You might want to increase the number of Replica Fetcher threads by 
> >>> setting
> >>> this property : *num.replica.fetchers*.
> >>> 
> >>> Thanks,
> >>> 
> >>> Mayuresh
> >>> 
> >>> On Thu, Mar 12, 2015 at 10:39 PM, Zakee  wrote:
> >>> 
> >>>> With the producer throughput as large as > 150MB/s to 5 brokers on a
> >>>> continuous basis, I see a consistently high value for Replica Max Lag (in
> >>>> millions). Is this normal or there is a way to tune so as to reduce 
> >>>> replica
> >>>> MaxLag?
> >>>> As per documentation, replica max lag (in messages) between follower and
> >>>> leader replicas, should be less than replica.lag.max.messages (currently
> >>>> set to 5000)
> >>>> 
> >>>> 
> >>>> Thanks
> >>>> Zakee
> >>>> 
> >>>> 
> >>>> 
> >>>> 
> >>> 
> >>> 
> >>> 
> >>> 
> >>> -- 
> >>> -Regards,
> >>> Mayuresh R. Gharat
> >>> (862) 250-7125
> >>> 
> >> 
> > 
> > 
> > 
> 



Re: High Replica Max Lag

2015-03-13 Thread Joel Koshy
I think what people have observed in the past is that increasing
num-replica-fetcher-threads has diminishing returns fairly quickly.
You may want to instead increase the number of partitions in the topic
you are producing to. (How many do you have right now?)

On Fri, Mar 13, 2015 at 02:48:17PM -0700, Zakee wrote:
> Hi Mayuresh,
> 
> I have currently set this property to 4 and I see from the logs that it 
> starts 12 threads on each broker. I will try increasing it further.
> 
> Thanks
> Zakee
> 
> 
> 
> > On Mar 13, 2015, at 11:53 AM, Mayuresh Gharat  
> > wrote:
> > 
> > You might want to increase the number of Replica Fetcher threads by setting
> > this property : *num.replica.fetchers*.
> > 
> > Thanks,
> > 
> > Mayuresh
> > 
> > On Thu, Mar 12, 2015 at 10:39 PM, Zakee  wrote:
> > 
> >> With the producer throughput as large as > 150MB/s to 5 brokers on a
> >> continuous basis, I see a consistently high value for Replica Max Lag (in
> >> millions). Is this normal or there is a way to tune so as to reduce replica
> >> MaxLag?
> >> As per documentation, replica max lag (in messages) between follower and
> >> leader replicas, should be less than replica.lag.max.messages (currently
> >> set to 5000)
> >> 
> >> 
> >> Thanks
> >> Zakee
> >> 
> >> 
> >> 
> >> 
> > 
> > 
> > 
> > 
> > -- 
> > -Regards,
> > Mayuresh R. Gharat
> > (862) 250-7125
> > 
> 



Re: Log cleaner patch (KAFKA-1641) on 0.8.2.1

2015-03-13 Thread Joel Koshy
+1 - if you have a way to reproduce that would be ideal. We don't know
the root cause of this yet. Our guess is a corner case around
shutdowns, but not sure.

On Fri, Mar 13, 2015 at 03:13:45PM -0700, Jun Rao wrote:
> Is there a way that you can reproduce this easily?
> 
> Thanks,
> 
> Jun
> 
> On Fri, Mar 13, 2015 at 8:13 AM, Marc Labbe  wrote:
> 
> > Not exactly, the topics are compacted but messages are not compressed.
> >
> > I get the exact same error though. Any other options I should consider?
> > We're on 0.8.2.0 and we also had this on 0.8.1.1 before.
> >
> > marc
> >
> > On Fri, Mar 13, 2015 at 10:47 AM, Jun Rao  wrote:
> >
> > > Did you get into that issue for the same reason as in the jira, i.e.,
> > > somehow compressed messages were sent to the compact topics?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Fri, Mar 13, 2015 at 6:45 AM, Marc Labbe  wrote:
> > >
> > > > Hello,
> > > >
> > > > we're often seeing log cleaner exceptions reported in KAFKA-1641 and
> > I'd
> > > > like to know if it's safe to apply the patch from that issue resolution
> > > to
> > > > 0.8.2.1?
> > > >
> > > > Reference: https://issues.apache.org/jira/browse/KAFKA-1641
> > > >
> > > > Also there are 2 patches in there, I suppose I should be using only the
> > > > latest of the two.
> > > >
> > > > thanks!
> > > > marc
> > > >
> > >
> >



Re: moving replications

2015-03-04 Thread Joel Koshy
I think what you may be looking for is being discussed here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-6+-+New+reassignment+partition+logic+for+rebalancing

On Wed, Mar 04, 2015 at 12:34:30PM +0530, sunil kalva wrote:
> Is there any way to automate
> On Mar 3, 2015 11:57 AM, "sunil kalva"  wrote:
> 
> > Why can't kafka automatically rebalances partitions with new broker and
> > adjust with existing brokers ?
> > Why should we run manually ?
> >
> > On Tue, Mar 3, 2015 at 6:41 AM, Gwen Shapira 
> > wrote:
> >
> >> I think the ReassignPartitionsTool does what you need, at least partially.
> >>
> >> It will move partitions of given topics to a new set of brokers - this
> >> includes replicas and leaders from what I can tell.
> >>
> >> Here's the documentation of one of the options:
> >>
> >> topics-to-move-json-file: Generate a reassignment configuration to move
> >> the partitions of the specified topics to the list of brokers specified by
> >> the --broker-list option. The format to use is...
> >>
> >> So it sounds like what you looked for?
> >>
> >> Since the tool runs in "dry run" mode, you can try and see without
> >> actually moving anything.
> >>
> >> Gwen
> >>
> >>
> >>
> >> On Mar 2, 2015 10:05 AM, "sunil kalva"  wrote:
> >>
> >>> Shapira
> >>>
> >>> tx for quick reply, but this tool elects a new leader for a given
> >>> partition from the existing replicas of that partition.
> >>> But my problem is basically move one replica completely from old broker
> >>> to new broker and eventually move leader also to new broker (with out
> >>> incrementing replica count for that partition)
> >>> Please let me know if more information required.
> >>>
> >>> t
> >>> SunilKalva
> >>>
> >>> On Mon, Mar 2, 2015 at 10:43 PM, Gwen Shapira 
> >>> wrote:
> >>>
> >>>> Take a look at the Reassign Partition Tool. It lets you specify which
> >>>> replica exists on which broker:
> >>>>
> >>>> https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-6.ReassignPartitionsTool
> >>>>
> >>>> Its a bit tricky to use, so feel free to follow up with more questions
> >>>> :)
> >>>>
> >>>> Gwen
> >>>>
> >>>> On Mon, Mar 2, 2015 at 7:59 AM, sunil kalva 
> >>>> wrote:
> >>>> > Hi
> >>>> > How to move replications from one broker to another broker ?
> >>>> >
> >>>> > --
> >>>> > SunilKalva
> >>>>
> >>>
> >>>
> >>>
> >>> --
> >>> SunilKalva
> >>>
> >>
> >
> >
> > --
> > SunilKalva
> >



Re: high level consumer rollback

2015-03-04 Thread Joel Koshy
This is not possible with the current high-level consumer without a
restart, but the new consumer (under development) does have support
for this.

On Wed, Mar 04, 2015 at 03:04:57PM -0500, Luiz Geovani Vier wrote:
> Hello,
> 
> I'm using the high level consumer with auto-commit disabled and a
> single thread per consumer, in order to consume messages in batches.
> In case of failures on the database, I'd like to stop processing,
> rollback and restart from the last committed offset.
> Is there a way to receive the messages since the last commit again,
> without reconnecting? (something like a reset on KafkaStream or
> ConsumerIterator)
> 
> Thanks,
> -Geovani



Re: Trying to get kafka data to Hadoop

2015-03-04 Thread Joel Koshy
I think the camus mailing list would be more suitable for this
question.

Thanks,

Joel

On Wed, Mar 04, 2015 at 11:00:51AM -0500, max square wrote:
> Hi all,
> 
> I have browsed through different conversations around Camus, and bring this
> as a kinda Kafka question. I know it is not the most orthodox, but if someone
> has some thoughts I'd appreciate it.
> 
> That said, I am trying to set up Camus, using a 3 node Kafka cluster
> 0.8.2.1, using a project that is trying to build Avro Schema-Repo
> . All of the Avro schemas for
> the topics are published correctly. I am building Camus and using:
> 
> hadoop jar camus-example-0.1.0-SNAPSHOT-shaded.jar com.linkedin.camus.etl.kafka.CamusJob -libjars $CAMUS_LIBJARS -D mapreduce.job.user.classpath.first=true -P config.properties
> 
> As the command to start the job, where I have set up an environment
> variable that holds all the libjars that the mvn package command generates.
> 
> I have also set the following properties to configure the job:
> camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.LatestSchemaKafkaAvroMessageDecoder
> kafka.message.coder.schema.registry.class=com.linkedin.camus.schemaregistry.AvroRestSchemaRegistry
> 
> etl.schema.registry.url=http://10.0.14.25:2876/schema-repo/
> 
> When I execute the job I get an Exception indicating the
> AvroRestSchemaRegistry class can't be found (I've double checked it's part
> of the libjars). I wanted to ask if this is the correct way to set up this
> integration, and if anyone has pointers on why the job is not finding the
> class AvroRestSchemaRegistry
> 
> Thanks in advance for the help!
> 
> Max
> 
> Follows the complete stack trace:
> 
> [CamusJob] - failed to create decoder
> 
> com.linkedin.camus.coders.MessageDecoderException: com.linkedin.camus.coders.MessageDecoderException: java.lang.ClassNotFoundException: com.linkedin.camus.schemaregistry.AvroRestSchemaRegistry
>     at com.linkedin.camus.etl.kafka.coders.MessageDecoderFactory.createMessageDecoder(MessageDecoderFactory.java:29)
>     at com.linkedin.camus.etl.kafka.mapred.EtlInputFormat.createMessageDecoder(EtlInputFormat.java:391)
>     at com.linkedin.camus.etl.kafka.mapred.EtlInputFormat.getSplits(EtlInputFormat.java:256)
>     at org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:1107)
>     at org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:1124)
>     at org.apache.hadoop.mapred.JobClient.access$600(JobClient.java:178)
>     at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:1023)
>     at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:976)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:415)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)
>     at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:976)
>     at org.apache.hadoop.mapreduce.Job.submit(Job.java:582)
>     at com.linkedin.camus.etl.kafka.CamusJob.run(CamusJob.java:335)
>     at com.linkedin.camus.etl.kafka.CamusJob.run(CamusJob.java:563)
>     at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
>     at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
>     at com.linkedin.camus.etl.kafka.CamusJob.main(CamusJob.java:518)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.



Re: Why The Division Between Scala And Java

2015-02-23 Thread Joel Koshy
We will eventually only have Java clients.

For your specific question (javaapi.SimpleConsumer and
consumer.SimpleConsumer): some arguments of the Scala consumer contain
Scala-specific constructs, e.g., Scala maps, which cannot be created in
Java. This is why we expose a javaapi variant that takes Java
collections and converts them to their Scala equivalents, which are
then passed to the underlying Scala consumer.

On Sun, Feb 22, 2015 at 04:45:23PM -0800, Stephen Boesch wrote:
> so there will be both scala and java clients?  or will scala users simply
> import the java libraries (which is after all not too bad)
> 
> 2015-02-22 16:30 GMT-08:00 Guozhang Wang :
> 
> > Alex,
> >
> > Before 0.8, Kafka was written in Scala, and in 0.8.2 we are rewriting the
> > clients in Java for better client adoption, while the server is still in
> > Scala. The plan after the Java clients also includes migrating the common
> > utils / error codes / request formats to Java, to be used by both
> > clients and servers.
> >
> > Guozhang
> >
> > On Sat, Feb 21, 2015 at 3:35 PM, Alex Melville 
> > wrote:
> >
> > > Hi All,
> > >
> > >
> > > Why does the Kafka codebase contain both Scala and Java code? There are
> > > even some cases where the same class exists in both (e.g.
> > > javaapi.SimpleConsumer and kafka.consumer.SimpleConsumer). Is it just to
> > > allow a Scala developer to write Scala and a Java developer to use Java?
> > > We are trying to use the SimpleConsumer to gather more in-depth
> > > information about the partitions on a topic, and the documentation is
> > > fragmented and consequently it's unclear whether we have to use one of
> > > either Scala or Java, or if we have complete freedom to choose either
> > > (none of us have programmed in Scala, so we're inclined to choose the
> > > Java one).
> > >
> > > Functionally are both the scala and java codebases the same?
> > >
> > > Thanks in advance,
> > >
> > > Alex Melville
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >



Re: Simple Consumer and offsets

2015-02-23 Thread Joel Koshy
> Assuming one can store the offset in an external location (redis/db
> etc), along with the rest of the state that a program requires,
> wouldn't it be possible to manage things such that, you use the High
> Level API with auto commit turned off and do your custom offset
> management followed by the kafka commit api call (probably delayed
> to give a breather to zookeeper)? 

You can certainly manage your offsets outside Kafka but you may not be
able to use the existing high level consumer (e.g., Camus does this).
The reason is that on a consumer rebalance you need to stop fetching,
commit offsets, and then fetch offsets before resuming. These barrier
points are not exposed in 0.8.2.
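
The barrier sequence described above (stop fetching, commit, fetch offsets, resume) can be sketched with a toy model. This is only an illustration under stated assumptions: `ExternalOffsetConsumer` is a hypothetical class, the external store is a plain dict standing in for redis or a database, and no real rebalance protocol is involved.

```python
class ExternalOffsetConsumer:
    """Toy consumer that keeps offsets in an external store (a dict here)."""

    def __init__(self, store):
        self.store = store      # partition -> committed offset; stands in for redis/db
        self.positions = {}     # partition -> next offset this consumer will fetch
        self.fetching = False

    def rebalance(self, new_partitions):
        # 1. Stop fetching so positions no longer move.
        self.fetching = False
        # 2. Commit offsets for the partitions owned so far.
        self.store.update(self.positions)
        # 3. Fetch committed offsets for the new assignment.
        self.positions = {p: self.store.get(p, 0) for p in new_partitions}
        # 4. Resume fetching.
        self.fetching = True

    def consume(self, partition, count):
        if not self.fetching or partition not in self.positions:
            raise RuntimeError("partition %r is not owned by this consumer" % partition)
        self.positions[partition] += count
```

The point of the sketch is the ordering: the old owner must finish step 2 before the new owner runs step 3, and the 0.8.2 high-level consumer gives application code no hook at either point.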

On Fri, Feb 20, 2015 at 07:53:05AM +, Arunkumar Srambikkal (asrambik) wrote:
> If I may use the same thread to discuss the exact same issue  
> 
> Assuming one can store the offset in an external location (redis/db etc), 
> along with the rest of the state that a program requires, wouldn't it be 
> possible to manage things such that, you use the High Level API with auto 
> commit turned off and do your custom offset management followed by the kafka 
> commit api call (probably delayed to give a breather to zookeeper)? 
> 
> That way in the failure scenario, the high level consumer offset would ALWAYS 
> be only smaller than what is actually valid and you can skip forward and 
> avoid using the simple consumer.
> 
> I assume one needs the simple consumer in the offset management use case
> only if we want to skip back to an older offset / use Kafka for storing offsets?
> 
> I was trying to handle the consumer failure scenario while avoiding the simple
> consumer and all the complexities it entails.
> 
> Does this work or is there anything wrong with this picture? 
> 
> Thanks
> Arun
> 
> On Thu, Feb 19, 2015 at 03:29:19PM +, Suren wrote:
> > We are using the High Level Consumer API to interact with Kafka for our 
> > normal use cases.
> > 
> > However, on consumer restart in the case of consumer failures, we want 
> > to be able to manually reset offsets in certain situations.
> > And ideally we'd like to use the same api in 0.8.1 and 0.8.2. :-) It 
> > looked like instantiating a SimpleConsumer just to reset offsets on restart 
> > was a viable option, while continuing to use the High Level Consumer for 
> > our normal operations. Not sure if there is a better way that is compatible 
> > across 0.8.1 and 0.8.2.
> > -Suren
> >  
> > 
> >  On Thursday, February 19, 2015 10:25 AM, Joel Koshy 
> >  wrote:
> >
> > 
> >  Not sure what you mean by using the SimpleConsumer on failure 
> > recovery. Can you elaborate on this?
> > 
> > On Thu, Feb 19, 2015 at 03:04:47PM +, Suren wrote:
> > > Haven't used either one now. Sounds like 0.8.2.1 will help.
> > > We are using the High Level Consumer generally but are thinking to use 
> > > the SimpleConsumer on failure recovery to set the offsets.
> > > Is that the recommended approach for this use case?
> > > Thanks.
> > > -Suren
> > >  
> > > 
> > >  On Thursday, February 19, 2015 9:40 AM, Joel Koshy 
> > >  wrote:
> > >
> > > 
> > > >  Are you using it from Java or Scala? i.e., are you using the
> > > > javaapi.SimpleConsumer or kafka.consumer.SimpleConsumer
> > > 
> > > In 0.8.2 javaapi we explicitly set version 0 of the 
> > > OffsetCommitRequest/OffsetFetchRequest which means it will 
> > > commit/fetch to/from ZooKeeper only. If you use the scala API you 
> > > can create an OffsetCommitRequest with version set to 1 (which will 
> > > allow you to commit to Kafka).
> > > 
> > > Since we are doing an 0.8.2.1 release we will make the above more 
> > > consistent. i.e., you can create OffsetCommitRequests with version 1 
> > > even from the javaapi. I will be updating the documentation on this 
> > > to make it clearer.
> > > 
> > > Thanks,
> > > 
> > > Joel
> > > 
> > > On Thu, Feb 19, 2015 at 02:28:32PM +, Suren wrote:
> > > > Joel,
> > > > Looking at SimpleConsumer in the 0.8.2 code, it is using 
> > > > OffsetCommitRequest and sending that over to a broker.
> > > > Is the broker storing that in ZK?
> > > > -Suren
> > > >  
> > > > 
> > > >  On Tuesday, February 17, 2015 12:22 PM, Joel Koshy 
> > > >  wrote:
> > > >
> > > > 
> > > >  Hi Chris,
> > > > 
> > &

Re: Commit offset with SimpleConsumer in 0.8.2

2015-02-23 Thread Joel Koshy
Can you add yourself as a watcher on KAFKA-1729? I will update that
when I fix the example on the wiki.

On Sun, Feb 22, 2015 at 10:16:44PM +0100, Jochen Mader wrote:
> I have a hard time figuring out how to do a commit using API 0.8.2 on JDK 8.
> 
> I tried using the examples from 0.8.1.1.
> 
> First of all: I can't use OffsetMetadataAndError inside the
> offsets-map as it was possible in 0.8.1. I can't really find a
> difference, but builds break.
> 
> I'm also unable to figure out all of the required parameters.
> 
>  and  are obvious.
> 
> I suppose the  is the client-name I picked for my consumer.
> 
> But what is the value expected for the ?
> 
> Here is my code that won't build with 0.8.2 on JDK 8:
> 
> Map<TopicAndPartition, OffsetMetadataAndError> offsets = new HashMap<>();
> 
> offsets.put(new TopicAndPartition(**, **), new
> OffsetMetadataAndError(messageAndOffset.offset(), "",
> ErrorMapping.NoError()));
> 
> OffsetCommitRequest ocRequest = new OffsetCommitRequest(**,
> offsets, **, consumer.clientId(),
> kafka.api.OffsetCommitRequest.CurrentVersion());
> OffsetCommitResponse offsetCommitResponse = consumer.commitOffsets(ocRequest);
> 
> 
> Thanks,
> Jochen
> -- 
> Jochen Mader | Lead IT Consultant
> 
> codecentric AG | Elsenheimerstr. 55a | 80687 München | Deutschland
> tel: +49 89 215486633 | fax: +49 89 215486699 | mobil: +49 152 51862390
> www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
> www.more4fi.de
> 
> Sitz der Gesellschaft: Düsseldorf | HRB 63043 | Amtsgericht Düsseldorf
> Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
> Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen Schütz



Re: ability to delete inactive queues?

2015-02-23 Thread Joel Koshy
We do support deleting topics. However, this is a client (admin) operation
that is done via ZooKeeper. It would be useful to do this
automatically on the broker-side. Can you file a jira for this?  It is
not very straightforward to implement this since you would want to
check across all partitions which would be on multiple brokers. An
alternative would be offline tools that scrape the topic-level mbeans
on the broker and proceed to delete topic if the corresponding
incoming message rate has been stuck at zero for a certain period of
time.

On Sun, Feb 22, 2015 at 11:18:23AM -0800, Kevin Burton wrote:
> I’m considering porting an app from ActiveMQ to Kafka so I’m new here.
> Apologies if this has been asked before.
> 
> Is it possible to delete inactive queues?  Specifically queues with no
> messages on them which haven’t received any new messages in say 5 minutes.
> 
> We use a lot of ephemeral queues which is why I ask.
> 
> -- 
> 
> Founder/CEO Spinn3r.com
> Location: *San Francisco, CA*
> blog: http://burtonator.wordpress.com
> … or check out my Google+ profile
> 
> 



Re: Simple Consumer and offsets

2015-02-19 Thread Joel Koshy
Yeah that is a good point - will do the update as part of the doc
changes in KAFKA-1729

On Thu, Feb 19, 2015 at 09:26:30PM -0500, Evan Huus wrote:
> On Thu, Feb 19, 2015 at 8:43 PM, Joel Koshy  wrote:
> 
> > If you are using v0 of OffsetCommit/FetchRequest then you can issue
> > that to any broker. For version > 0 you will need to issue it to the
> > coordinator. You can discover the coordinator by sending a
> > ConsumerMetadataRequest to any broker.
> >
> 
> The protocol spec [1] still says "Currently the supported version for all
> APIs is 0". Based on your message above that is no longer true, so could
> somebody familiar with the changes please update the spec appropriately?
> 
> Thanks,
> Evan
> 
> [1]
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> 
> 
> 
> > On Thu, Feb 19, 2015 at 07:55:16PM +, Suren wrote:
> > > Joel/All,
> > > The SimpleConsumer constructor requires a specific host and port.
> > >
> > > Can this be any broker?
> > > If it needs to be a specific broker, for 0.8.2, should this be the
> > offset coordinator? For 0.8.1, does it matter?
> > > -Suren
> > >
> > >
> > >  On Thursday, February 19, 2015 10:43 AM, Joel Koshy <
> > jjkosh...@gmail.com> wrote:
> > >
> > >
> > >  I see - yes, you can use the SimpleConsumer for that. However, your
> > > high-level consumers need to be shutdown while you do that (otherwise
> > > they may auto-commit while you are resetting offsets).
> > >
> > > Thanks,
> > >
> > > Joel
> > >
> > > On Thu, Feb 19, 2015 at 03:29:19PM +, Suren wrote:
> > > > We are using the High Level Consumer API to interact with Kafka for
> > our normal use cases.
> > > >
> > > > However, on consumer restart in the case of consumer failures, we want
> > to be able to manually
> > > > reset offsets in certain situations.
> > > > And ideally we'd like to use the same api in 0.8.1 and 0.8.2. :-)
> > > > It looked like instantiating a SimpleConsumer just to reset offsets on
> > restart was a viable option, while continuing to use the High Level
> > Consumer for our normal operations. Not sure if there is a better way that
> > is compatible across 0.8.1 and 0.8.2.
> > > > -Suren
> > > >
> > > >
> > > >  On Thursday, February 19, 2015 10:25 AM, Joel Koshy <
> > jjkosh...@gmail.com> wrote:
> > > >
> > > >
> > > >  Not sure what you mean by using the SimpleConsumer on failure
> > > > recovery. Can you elaborate on this?
> > > >
> > > > On Thu, Feb 19, 2015 at 03:04:47PM +, Suren wrote:
> > > > > Haven't used either one now. Sounds like 0.8.2.1 will help.
> > > > > We are using the High Level Consumer generally but are thinking to
> > use the SimpleConsumer on failure recovery to set the offsets.
> > > > > Is that the recommended approach for this use case?
> > > > > Thanks.
> > > > > -Suren
> > > > >
> > > > >
> > > > >  On Thursday, February 19, 2015 9:40 AM, Joel Koshy <
> > jjkosh...@gmail.com> wrote:
> > > > >
> > > > >
> > > > >  Are you using it from Java or Scala? i.e., are you using the
> > > > > javaapi.SimpleConsumer or kafka.consumer.SimpleConsumer
> > > > >
> > > > > In 0.8.2 javaapi we explicitly set version 0 of the
> > > > > OffsetCommitRequest/OffsetFetchRequest which means it will
> > > > > commit/fetch to/from ZooKeeper only. If you use the scala API you can
> > > > > create an OffsetCommitRequest with version set to 1 (which will allow
> > > > > you to commit to Kafka).
> > > > >
> > > > > Since we are doing an 0.8.2.1 release we will make the above more
> > > > > consistent. i.e., you can create OffsetCommitRequests with version 1
> > > > > even from the javaapi. I will be updating the documentation on this
> > to
> > > > > make it clearer.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Joel
> > > > >
> > > > > On Thu, Feb 19, 2015 at 02:28:32PM +, Suren wrote:
> > > > > > Joel,
> > > > > > Looking at SimpleConsumer in the 0.8.2 code, it is using
> > Offs

Re: Simple Consumer and offsets

2015-02-19 Thread Joel Koshy
If you are using v0 of OffsetCommit/FetchRequest then you can issue
that to any broker. For version > 0 you will need to issue it to the
coordinator. You can discover the coordinator by sending a
ConsumerMetadataRequest to any broker.
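
The routing rule can be summarised in a few lines. This is a sketch, not client code: `offset_request_target` is a hypothetical helper, and `find_coordinator` stands in for whatever performs the ConsumerMetadataRequest round trip. The storage labels follow the earlier messages in this thread (version 0 goes to ZooKeeper, version 1 to Kafka).

```python
def offset_request_target(version, any_broker, find_coordinator):
    """Pick the broker for an OffsetCommit/OffsetFetch request.

    Returns (broker, storage). `find_coordinator` is a callable that takes a
    broker to ask and returns the offset coordinator for the group; it models
    sending a ConsumerMetadataRequest.
    """
    if version < 0:
        raise ValueError("request version must be non-negative")
    if version == 0:
        # v0 can be sent to any broker; offsets live in ZooKeeper.
        return any_broker, "zookeeper"
    # Versions above 0 must go to the coordinator; offsets live in Kafka.
    return find_coordinator(any_broker), "kafka"
```

For example, with brokers named by `host:port` strings, `offset_request_target(1, "b1:9092", lambda b: "b3:9092")` returns `("b3:9092", "kafka")`.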

On Thu, Feb 19, 2015 at 07:55:16PM +, Suren wrote:
> Joel/All,
> The SimpleConsumer constructor requires a specific host and port.
> 
> Can this be any broker?
> If it needs to be a specific broker, for 0.8.2, should this be the offset 
> coordinator? For 0.8.1, does it matter?
> -Suren
>  
> 
>  On Thursday, February 19, 2015 10:43 AM, Joel Koshy 
>  wrote:
>
> 
>  I see - yes, you can use the SimpleConsumer for that. However, your
> high-level consumers need to be shutdown while you do that (otherwise
> they may auto-commit while you are resetting offsets).
> 
> Thanks,
> 
> Joel
> 
> On Thu, Feb 19, 2015 at 03:29:19PM +, Suren wrote:
> > We are using the High Level Consumer API to interact with Kafka for our 
> > normal use cases.
> > 
> > However, on consumer restart in the case of consumer failures, we want to 
> > be able to manually
> > reset offsets in certain situations.
> > And ideally we'd like to use the same api in 0.8.1 and 0.8.2. :-)
> > It looked like instantiating a SimpleConsumer just to reset offsets on 
> > restart was a viable option, while continuing to use the High Level 
> > Consumer for our normal operations. Not sure if there is a better way that 
> > is compatible across 0.8.1 and 0.8.2.
> > -Suren
> >  
> > 
> >      On Thursday, February 19, 2015 10:25 AM, Joel Koshy 
> > wrote:
> >    
> > 
> >  Not sure what you mean by using the SimpleConsumer on failure
> > recovery. Can you elaborate on this?
> > 
> > On Thu, Feb 19, 2015 at 03:04:47PM +, Suren wrote:
> > > Haven't used either one now. Sounds like 0.8.2.1 will help.
> > > We are using the High Level Consumer generally but are thinking to use 
> > > the SimpleConsumer on failure recovery to set the offsets.
> > > Is that the recommended approach for this use case?
> > > Thanks.
> > > -Suren
> > >  
> > > 
> > >      On Thursday, February 19, 2015 9:40 AM, Joel Koshy 
> > > wrote:
> > >    
> > > 
> > >  Are you using it from Java or Scala? i.e., are you using the
> > > javaapi.SimpleConsumer or kafka.consumer.SimpleConsumer
> > > 
> > > In 0.8.2 javaapi we explicitly set version 0 of the
> > > OffsetCommitRequest/OffsetFetchRequest which means it will
> > > commit/fetch to/from ZooKeeper only. If you use the scala API you can
> > > create an OffsetCommitRequest with version set to 1 (which will allow
> > > you to commit to Kafka).
> > > 
> > > Since we are doing an 0.8.2.1 release we will make the above more
> > > consistent. i.e., you can create OffsetCommitRequests with version 1
> > > even from the javaapi. I will be updating the documentation on this to
> > > make it clearer.
> > > 
> > > Thanks,
> > > 
> > > Joel
> > > 
> > > On Thu, Feb 19, 2015 at 02:28:32PM +, Suren wrote:
> > > > Joel,
> > > > Looking at SimpleConsumer in the 0.8.2 code, it is using 
> > > > OffsetCommitRequest and sending that over to a broker.
> > > > Is the broker storing that in ZK?
> > > > -Suren
> > > >  
> > > > 
> > > >      On Tuesday, February 17, 2015 12:22 PM, Joel Koshy 
> > > > wrote:
> > > >    
> > > > 
> > > >  Hi Chris,
> > > > 
> > > > In 0.8.2, the simple consumer Java API supports committing/fetching
> > > > offsets that are stored in ZooKeeper. You don't need to issue any
> > > > ConsumerMetadataRequest for this. Unfortunately, the API currently
> > > > does not support fetching offsets that are stored in Kafka.
> > > > 
> > > > Thanks,
> > > > 
> > > > Joel
> > > > 
> > > > On Mon, Feb 16, 2015 at 05:02:08PM -0500, Christopher Piggott wrote:
> > > > > Hi,
> > > > > 
> > > > > I am still using 0.8.1.1 because of the CPU use concerns.
> > > > > 
> > > > > I'm confused about why the SimpleConsumer has:
> > > > > 
> > > > > OffsetCommitResponse commitOffsets(OffsetCommitRequest request)
> > > > > 
> > > > > and
> > > > > 
> > > > > OffsetFetchResp

Re: Consuming a snapshot from log compacted topic

2015-02-19 Thread Joel Koshy
The log end offset (of a partition) changes when messages are appended
to the partition. (It is not correlated with the consumer's offset).
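
A toy model may help separate the two notions. It assumes the usual convention that the log end offset is the offset the next appended message will receive; `PartitionLog` and `Consumer` are hypothetical names, not Kafka classes.

```python
class PartitionLog:
    """In-memory stand-in for one partition."""

    def __init__(self):
        self.messages = []

    @property
    def log_end_offset(self):
        # Offset that the next appended message will be assigned.
        return len(self.messages)

    def append(self, message):
        self.messages.append(message)
        return len(self.messages) - 1   # offset of the message just written


class Consumer:
    """Tracks its own position and committed offset, independent of the log."""

    def __init__(self, log):
        self.log = log
        self.position = 0
        self.committed = 0

    def poll(self):
        if self.position >= self.log.log_end_offset:
            return None
        message = self.log.messages[self.position]
        self.position += 1
        return message

    def commit(self):
        self.committed = self.position
```

Only `append` moves `log_end_offset`; `poll` and `commit` change the consumer's own counters and nothing else.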


On Thu, Feb 19, 2015 at 08:58:10PM +, Will Funnell wrote:
> So at what point does the log end offset change? When you commit?
> 
> On 19 February 2015 at 18:47, Joel Koshy  wrote:
> 
> > > If I consumed up to the log end offset and log compaction happens in
> > > between, I would have missed some messages.
> >
> > Compaction actually only runs on the rolled over segments (not the
> > active - i.e., latest segment). The log-end-offset will be in the
> > latest segment which does not participate in compaction.
> >
> > > > The log end offset is just the end of the committed messages in the log
> > > > (the last thing the consumer has access to). It isn't the same as the
> > > > cleaner point but is always later than it so it would work just as
> > well.
> > >
> > > Isn't this just roughly the same value as using c.getOffsetsBefore()
> > with a
> > > partitionRequestTime of -1?
> > >
> > >
> > > Although it's always later than the cleaner point, surely log compaction
> > is
> > > still an issue here.
> > >
> > > If I consumed up to the log end offset and log compaction happens in
> > > between, I would have missed some messages.
> > >
> > >
> > > My thinking was that if you knew the log cleaner point, you could:
> > >
> > > Make a note of the starting offset
> > > Consume till end of log
> > > Check my starting point is ahead of current cleaner point, otherwise
> > loop.
> > >
> > >
> > > I appreciate there is a chance I misunderstood your point.
> > >
> > > On 19 February 2015 at 18:02, Jay Kreps  wrote:
> > >
> > > > The log end offset is just the end of the committed messages in the log
> > > > (the last thing the consumer has access to). It isn't the same as the
> > > > cleaner point but is always later than it so it would work just as
> > well.
> > > >
> > > > -Jay
> > > >
> > > > On Thu, Feb 19, 2015 at 8:54 AM, Will Funnell 
> > > > wrote:
> > > >
> > > > > > I'm not sure if I misunderstood Jay's suggestion, but I think it is
> > > > > > along the lines of: we expose the log-end-offset (actually the high
> > > > > > watermark) of the partition in the fetch response. However, this is
> > > > > > not exposed to the consumer (either in the new ConsumerRecord class
> > > > > > or the existing MessageAndMetadata class). If we did, then if you
> > > > > > were to consume a record you can check that it has offsets up to
> > the
> > > > > > log-end offset. If it does then you would know for sure that you
> > have
> > > > > > consumed everything for that partition
> > > > >
> > > > > To confirm then, the log-end-offset is the same as the cleaner point?
> > > > >
> > > > >
> > > > >
> > > > > On 19 February 2015 at 03:10, Jay Kreps  wrote:
> > > > >
> > > > > > Yeah I was thinking either along the lines Joel was suggesting or
> > else
> > > > > > adding a logEndOffset(TopicPartition) method or something like
> > that. As
> > > > > > Joel says the consumer actually has this information internally (we
> > > > > return
> > > > > > it with the fetch request) but doesn't expose it.
> > > > > >
> > > > > > -Jay
> > > > > >
> > > > > > On Wed, Feb 18, 2015 at 4:51 PM, Joel Koshy 
> > > > wrote:
> > > > > >
> > > > > > > > > 2. Make the log end offset available more easily in the
> > consumer.
> > > > > > > >
> > > > > > > > Was thinking something would need to be added in
> > LogCleanerManager,
> > > > > in
> > > > > > > the
> > > > > > > > updateCheckpoints function. Where would be best to publish the
> > > > > > > information
> > > > > > > > to make it more easily available, or would you just expose the
> > > > > > > > offset-cleaner-checkpoint file as it is?
> > > > > > > > Is it right you would also need to know whi

Re: New Consumer Offset management in 0.8.2

2015-02-19 Thread Joel Koshy
Yes it is supported in 0.8.2-beta. It is documented on the site - you
will need to set offsets.storage to kafka.
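
For reference, a small helper that renders the relevant consumer properties. `consumer_properties` is a hypothetical function and the group and ZooKeeper values are placeholders; `offsets.storage` comes from the message above, while `dual.commit.enabled` is included on the assumption (from the 0.8.2 consumer configuration docs as I recall them) that it controls committing to both ZooKeeper and Kafka during a migration.

```python
def consumer_properties(group_id, zookeeper_connect, migrating=False):
    """Render high-level consumer properties for Kafka-based offset storage."""
    props = {
        "group.id": group_id,
        "zookeeper.connect": zookeeper_connect,
        # Store committed offsets in Kafka instead of ZooKeeper.
        "offsets.storage": "kafka",
        # Assumed setting: also commit to ZooKeeper while migrating a group.
        "dual.commit.enabled": "true" if migrating else "false",
    }
    return "\n".join("%s=%s" % (key, value) for key, value in props.items())
```

Calling `consumer_properties("my-group", "zk1:2181")` yields text that can be saved as a properties file for the consumer.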

On Thu, Feb 19, 2015 at 03:57:31PM -0500, Matthew Butt wrote:
> I'm having a hard time figuring out if the new Kafka-based offset
> management in the high-level Scala Consumer is implemented in the current
> version of 0.8.2-beta. If I implement a high-level consumer, will it use
> the new system, or will it still be storing in zookeeper? Do I need to wait
> for the Java consumer to take advantage of it?
> 
> -- 
> - Matt



Re: Consuming a snapshot from log compacted topic

2015-02-19 Thread Joel Koshy
> If I consumed up to the log end offset and log compaction happens in
> between, I would have missed some messages.

Compaction actually only runs on the rolled over segments (not the
active - i.e., latest segment). The log-end-offset will be in the
latest segment which does not participate in compaction.
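
The following toy model illustrates the point. It is a simplification, not the broker's cleaner: `compact` is a hypothetical function, a log is a list of segments, each segment a list of `(offset, key, value)` records, and the last segment is treated as the active one.

```python
def compact(segments):
    """Compact all rolled segments, leaving the active (last) segment untouched.

    Within the rolled segments only the highest-offset record per key is
    kept. Records in the active segment are never rewritten, so the log end
    offset is unaffected.
    """
    if not segments:
        return []
    *rolled, active = segments
    latest = {}
    for segment in rolled:
        for offset, key, _value in segment:
            latest[key] = offset    # later records overwrite earlier ones
    cleaned = [
        [(o, k, v) for (o, k, v) in segment if latest[k] == o]
        for segment in rolled
    ]
    return cleaned + [active]
```

A reader that notes the log end offset first and then consumes up to it therefore still sees at least the latest value for every key, even if compaction runs in between: only superseded records in older segments disappear.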

> > The log end offset is just the end of the committed messages in the log
> > (the last thing the consumer has access to). It isn't the same as the
> > cleaner point but is always later than it so it would work just as well.
> 
> Isn't this just roughly the same value as using c.getOffsetsBefore() with a
> partitionRequestTime of -1?
> 
> 
> Although it's always later than the cleaner point, surely log compaction is
> still an issue here.
> 
> If I consumed up to the log end offset and log compaction happens in
> between, I would have missed some messages.
> 
> 
> My thinking was that if you knew the log cleaner point, you could:
> 
> Make a note of the starting offset
> Consume till end of log
> Check my starting point is ahead of current cleaner point, otherwise loop.
> 
> 
> I appreciate there is a chance I misunderstood your point.
> 
> On 19 February 2015 at 18:02, Jay Kreps  wrote:
> 
> > The log end offset is just the end of the committed messages in the log
> > (the last thing the consumer has access to). It isn't the same as the
> > cleaner point but is always later than it so it would work just as well.
> >
> > -Jay
> >
> > On Thu, Feb 19, 2015 at 8:54 AM, Will Funnell 
> > wrote:
> >
> > > > I'm not sure if I misunderstood Jay's suggestion, but I think it is
> > > > along the lines of: we expose the log-end-offset (actually the high
> > > > watermark) of the partition in the fetch response. However, this is
> > > > not exposed to the consumer (either in the new ConsumerRecord class
> > > > or the existing MessageAndMetadata class). If we did, then if you
> > > > were to consume a record you can check that it has offsets up to the
> > > > log-end offset. If it does then you would know for sure that you have
> > > > consumed everything for that partition
> > >
> > > To confirm then, the log-end-offset is the same as the cleaner point?
> > >
> > >
> > >
> > > On 19 February 2015 at 03:10, Jay Kreps  wrote:
> > >
> > > > Yeah I was thinking either along the lines Joel was suggesting or else
> > > > adding a logEndOffset(TopicPartition) method or something like that. As
> > > > Joel says the consumer actually has this information internally (we
> > > return
> > > > it with the fetch request) but doesn't expose it.
> > > >
> > > > -Jay
> > > >
> > > > On Wed, Feb 18, 2015 at 4:51 PM, Joel Koshy 
> > wrote:
> > > >
> > > > > > > 2. Make the log end offset available more easily in the consumer.
> > > > > >
> > > > > > Was thinking something would need to be added in LogCleanerManager,
> > > in
> > > > > the
> > > > > > updateCheckpoints function. Where would be best to publish the
> > > > > information
> > > > > > to make it more easily available, or would you just expose the
> > > > > > offset-cleaner-checkpoint file as it is?
> > > > > > Is it right you would also need to know which
> > > offset-cleaner-checkpoint
> > > > > > entry related to each active partition?
> > > > >
> > > > > I'm not sure if I misunderstood Jay's suggestion, but I think it is
> > > > > along the lines of: we expose the log-end-offset (actually the high
> > > > > watermark) of the partition in the fetch response. However, this is
> > > > > not exposed to the consumer (either in the new ConsumerRecord class
> > > > > or the existing MessageAndMetadata class). If we did, then if you
> > > > > were to consume a record you can check that it has offsets up to the
> > > > > log-end offset. If it does then you would know for sure that you have
> > > > > consumed everything for that partition.
> > > > >
> > > > > > Yes, was looking at this initially, but as we have 100-150 writes
> > per
> > > > > > second, it could be a while before there is a pause long enough to
> > > > check
> > > > > it
> > > > > > has caught up. Even with the consumer tim

Re: Simple Consumer and offsets

2015-02-19 Thread Joel Koshy
I see - yes, you can use the SimpleConsumer for that. However, your
high-level consumers need to be shut down while you do that (otherwise
they may auto-commit while you are resetting offsets).
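
Why the shutdown matters can be shown with a toy model of the race. `AutoCommittingConsumer` is a hypothetical class and the offset store is a plain dict; the real high-level consumer commits on a timer, which `auto_commit` stands in for.

```python
class AutoCommittingConsumer:
    """Toy high-level consumer that periodically commits its position."""

    def __init__(self, store, partition):
        self.store = store                       # partition -> committed offset
        self.partition = partition
        self.position = store.get(partition, 0)  # resume from the stored offset
        self.running = True

    def auto_commit(self):
        # Models the periodic commit; a stopped consumer never commits.
        if self.running:
            self.store[self.partition] = self.position

    def shutdown(self):
        self.auto_commit()
        self.running = False
```

If an external tool writes a new offset into the store while the consumer is still running, the next `auto_commit` overwrites it with the consumer's in-memory position. Shutting the consumer down first makes the reset stick, and the restarted consumer picks it up.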

Thanks,

Joel

On Thu, Feb 19, 2015 at 03:29:19PM +, Suren wrote:
> We are using the High Level Consumer API to interact with Kafka for our 
> normal use cases.
> 
> However, on consumer restart in the case of consumer failures, we want to be 
> able to manually
> reset offsets in certain situations.
> And ideally we'd like to use the same api in 0.8.1 and 0.8.2. :-)
> It looked like instantiating a SimpleConsumer just to reset offsets on 
> restart was a viable option, while continuing to use the High Level Consumer 
> for our normal operations. Not sure if there is a better way that is 
> compatible across 0.8.1 and 0.8.2.
> -Suren
>  
> 
>  On Thursday, February 19, 2015 10:25 AM, Joel Koshy 
>  wrote:
>
> 
>  Not sure what you mean by using the SimpleConsumer on failure
> recovery. Can you elaborate on this?
> 
> On Thu, Feb 19, 2015 at 03:04:47PM +, Suren wrote:
> > Haven't used either one now. Sounds like 0.8.2.1 will help.
> > We are using the High Level Consumer generally but are thinking to use the 
> > SimpleConsumer on failure recovery to set the offsets.
> > Is that the recommended approach for this use case?
> > Thanks.
> > -Suren
> >  
> > 
> >      On Thursday, February 19, 2015 9:40 AM, Joel Koshy 
> > wrote:
> >    
> > 
> >  Are you using it from Java or Scala? i.e., are you using the
> > javaapi.SimpleConsumer or kafka.consumer.SimpleConsumer
> > 
> > In 0.8.2 javaapi we explicitly set version 0 of the
> > OffsetCommitRequest/OffsetFetchRequest which means it will
> > commit/fetch to/from ZooKeeper only. If you use the scala API you can
> > create an OffsetCommitRequest with version set to 1 (which will allow
> > you to commit to Kafka).
> > 
> > Since we are doing an 0.8.2.1 release we will make the above more
> > consistent. i.e., you can create OffsetCommitRequests with version 1
> > even from the javaapi. I will be updating the documentation on this to
> > make it clearer.
> > 
> > Thanks,
> > 
> > Joel
> > 
> > On Thu, Feb 19, 2015 at 02:28:32PM +, Suren wrote:
> > > Joel,
> > > Looking at SimpleConsumer in the 0.8.2 code, it is using 
> > > OffsetCommitRequest and sending that over to a broker.
> > > Is the broker storing that in ZK?
> > > -Suren
> > >  
> > > 
> > >      On Tuesday, February 17, 2015 12:22 PM, Joel Koshy 
> > > wrote:
> > >    
> > > 
> > >  Hi Chris,
> > > 
> > > In 0.8.2, the simple consumer Java API supports committing/fetching
> > > offsets that are stored in ZooKeeper. You don't need to issue any
> > > ConsumerMetadataRequest for this. Unfortunately, the API currently
> > > does not support fetching offsets that are stored in Kafka.
> > > 
> > > Thanks,
> > > 
> > > Joel
> > > 
> > > On Mon, Feb 16, 2015 at 05:02:08PM -0500, Christopher Piggott wrote:
> > > > Hi,
> > > > 
> > > > I am still using 0.8.1.1 because of the CPU use concerns.
> > > > 
> > > > I'm confused about why the SimpleConsumer has:
> > > > 
> > > > OffsetCommitResponse commitOffsets(OffsetCommitRequest request)
> > > > 
> > > > and
> > > > 
> > > > OffsetFetchResponse fetchOffsets(OffsetFetchRequest request)
> > > > 
> > > > but no way that I can see to issue a ConsumerMetadataRequest, which is
> > > > what I think when restarting my consumers so that they can begin
> > > > working where they last left off (in the event that they were stopped
> > > > for a while then restarted some time later, and new messages had come
> > > > in).
> > > > 
> > > > The fetchOffsets() works on time, usually it looks like you send it
> > > > Earliest or Latest (beginning or end of what's currently in the
> > > > stream).
> > > > 
> > > > I realize the documentation says this:
> > > > 
> > > > 
> > > > > *Downsides of using SimpleConsumer*The SimpleConsumer does require a 
> > > > > significant amount of work not needed in the Consumer Groups:
> > > > >
> > > > >    1. You must keep track of the offsets in your application to know 
> > > > >where you left o

Re: Simple Consumer and offsets

2015-02-19 Thread Joel Koshy
Not sure what you mean by using the SimpleConsumer on failure
recovery. Can you elaborate on this?

On Thu, Feb 19, 2015 at 03:04:47PM +, Suren wrote:
> Haven't used either one now. Sounds like 0.8.2.1 will help.
> We are using the High Level Consumer generally but are thinking to use the 
> SimpleConsumer on failure recovery to set the offsets.
> Is that the recommended approach for this use case?
> Thanks.
> -Suren
>  
> 
>  On Thursday, February 19, 2015 9:40 AM, Joel Koshy  
> wrote:
>
> 
>  Are you using it from Java or Scala? i.e., are you using the
> javaapi.SimpleConsumer or kafka.consumer.SimpleConsumer
> 
> In 0.8.2 javaapi we explicitly set version 0 of the
> OffsetCommitRequest/OffsetFetchRequest which means it will
> commit/fetch to/from ZooKeeper only. If you use the scala API you can
> create an OffsetCommitRequest with version set to 1 (which will allow
> you to commit to Kafka).
> 
> Since we are doing an 0.8.2.1 release we will make the above more
> consistent. i.e., you can create OffsetCommitRequests with version 1
> even from the javaapi. I will be updating the documentation on this to
> make it clearer.
> 
> Thanks,
> 
> Joel
> 
> On Thu, Feb 19, 2015 at 02:28:32PM +, Suren wrote:
> > Joel,
> > Looking at SimpleConsumer in the 0.8.2 code, it is using 
> > OffsetCommitRequest and sending that over to a broker.
> > Is the broker storing that in ZK?
> > -Suren
> >  
> > 
> >      On Tuesday, February 17, 2015 12:22 PM, Joel Koshy 
> > wrote:
> >    
> > 
> >  Hi Chris,
> > 
> > In 0.8.2, the simple consumer Java API supports committing/fetching
> > offsets that are stored in ZooKeeper. You don't need to issue any
> > ConsumerMetadataRequest for this. Unfortunately, the API currently
> > does not support fetching offsets that are stored in Kafka.
> > 
> > Thanks,
> > 
> > Joel
> > 
> > On Mon, Feb 16, 2015 at 05:02:08PM -0500, Christopher Piggott wrote:
> > > Hi,
> > > 
> > > I am still using 0.8.1.1 because of the CPU use concerns.
> > > 
> > > I'm confused about why the SimpleConsumer has:
> > > 
> > > OffsetCommitResponse commitOffsets(OffsetCommitRequest request)
> > > 
> > > and
> > > 
> > > OffsetFetchResponse fetchOffsets(OffsetFetchRequest request)
> > > 
> > > but no way that I can see to issue a ConsumerMetadataRequest, which is
> > > what I think when restarting my consumers so that they can begin
> > > working where they last left off (in the event that they were stopped
> > > for a while then restarted some time later, and new messages had come
> > > in).
> > > 
> > > The fetchOffsets() works on time, usually it looks like you send it
> > > Earliest or Latest (beginning or end of what's currently in the
> > > stream).
> > > 
> > > I realize the documentation says this:
> > > 
> > > 
> > > > *Downsides of using SimpleConsumer*The SimpleConsumer does require a 
> > > > significant amount of work not needed in the Consumer Groups:
> > > >
> > > >    1. You must keep track of the offsets in your application to know 
> > > >where you left off consuming.
> > > >
> > > > But that's not really quite true ... not as long as commitOffsets() has 
> > > > been provided.  It seems the SimpleConsumer provides you with a 
> > > > solution to only one half of the problem of offset management.
> > > 
> > > Using some zookeeper python scripts I wrote I can see that the
> > > commitOffsets() is doing its job and writing to
> > > 
> > > 
> > > /consumers/myGroupId/offsets/myTopic/0
> > > 
> > > 
> > > That has this value:
> > > 
> > > ('32757408', ZnodeStat(czxid=2211679, mzxid=14779964, ctime=1423777630972,
> > > > mtime=1424122117397, version=12568262, cversion=0, aversion=0,
> > > > ephemeralOwner=0, dataLength=8, numChildren=0, pzxid=2211679))
> > > 
> > > 
> > > Now the question is just how to retrieve that - do I really have to
> > > have my client connect to ZK directly?  If that's the case, future
> > > upgrades would break (e.g. 0.8.2 having its own storage for commit
> > > watermarks).
> > > 
> > > 
> > > What was the intent here, and what's the advice on how to proceed
> > > being that 0.8.2 is in an iffy state right now?
> > > 
> > > 
> > > --Chris
> > 
> > 
> > 
> >    
> 
> 
> 
>



Re: Simple Consumer and offsets

2015-02-19 Thread Joel Koshy
Are you using it from Java or Scala? i.e., are you using the
javaapi.SimpleConsumer or kafka.consumer.SimpleConsumer

In 0.8.2 javaapi we explicitly set version 0 of the
OffsetCommitRequest/OffsetFetchRequest which means it will
commit/fetch to/from ZooKeeper only. If you use the scala API you can
create an OffsetCommitRequest with version set to 1 (which will allow
you to commit to Kafka).

Since we are doing an 0.8.2.1 release we will make the above more
consistent. i.e., you can create OffsetCommitRequests with version 1
even from the javaapi. I will be updating the documentation on this to
make it clearer.
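
As a rough illustration of the version semantics described above (this is not Kafka's own code; the class and enum names are hypothetical), the mapping from request version to offset store could be written as:

```java
// Hypothetical sketch: which store an offset commit/fetch request targets,
// based only on the version semantics described in this message.
final class OffsetRequestVersionSketch {
    enum Store { ZOOKEEPER, KAFKA }

    static Store storeFor(int version) {
        switch (version) {
            case 0: return Store.ZOOKEEPER; // version 0: commit/fetch to/from ZooKeeper only
            case 1: return Store.KAFKA;     // version 1: allows committing to Kafka
            default: throw new IllegalArgumentException("unsupported version: " + version);
        }
    }

    public static void main(String[] args) {
        System.out.println(storeFor(0));
        System.out.println(storeFor(1));
    }
}
```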

Thanks,

Joel

On Thu, Feb 19, 2015 at 02:28:32PM +, Suren wrote:
> Joel,
> Looking at SimpleConsumer in the 0.8.2 code, it is using OffsetCommitRequest 
> and sending that over to a broker.
> Is the broker storing that in ZK?
> -Suren
>  
> 
>  On Tuesday, February 17, 2015 12:22 PM, Joel Koshy  
> wrote:
>
> 
>  Hi Chris,
> 
> In 0.8.2, the simple consumer Java API supports committing/fetching
> offsets that are stored in ZooKeeper. You don't need to issue any
> ConsumerMetadataRequest for this. Unfortunately, the API currently
> does not support fetching offsets that are stored in Kafka.
> 
> Thanks,
> 
> Joel
> 
> On Mon, Feb 16, 2015 at 05:02:08PM -0500, Christopher Piggott wrote:
> > Hi,
> > 
> > I am still using 0.8.1.1 because of the CPU use concerns.
> > 
> > I'm confused about why the SimpleConsumer has:
> > 
> > OffsetCommitResponse commitOffsets(OffsetCommitRequest request)
> > 
> > and
> > 
> > OffsetFetchResponse fetchOffsets(OffsetFetchRequest request)
> > 
> > but no way that I can see to issue a ConsumerMetadataRequest, which is
> > what I think when restarting my consumers so that they can begin
> > working where they last left off (in the event that they were stopped
> > for a while then restarted some time later, and new messages had come
> > in).
> > 
> > The fetchOffsets() works on time, usually it looks like you send it
> > Earliest or Latest (beginning or end of what's currently in the
> > stream).
> > 
> > I realize the documentation says this:
> > 
> > 
> > > *Downsides of using SimpleConsumer*The SimpleConsumer does require a 
> > > significant amount of work not needed in the Consumer Groups:
> > >
> > >    1. You must keep track of the offsets in your application to know 
> > >where you left off consuming.
> > >
> > > But that's not really quite true ... not as long as commitOffsets() has 
> > > been provided.  It seems the SimpleConsumer provides you with a solution 
> > > to only one half of the problem of offset management.
> > 
> > Using some zookeeper python scripts I wrote I can see that the
> > commitOffsets() is doing its job and writing to
> > 
> > 
> > /consumers/myGroupId/offsets/myTopic/0
> > 
> > 
> > That has this value:
> > 
> > ('32757408', ZnodeStat(czxid=2211679, mzxid=14779964, ctime=1423777630972,
> > > mtime=1424122117397, version=12568262, cversion=0, aversion=0,
> > > ephemeralOwner=0, dataLength=8, numChildren=0, pzxid=2211679))
> > 
> > 
> > Now the question is just how to retrieve that - do I really have to
> > have my client connect to ZK directly?  If that's the case, future
> > upgrades would break (e.g. 0.8.2 having its own storage for commit
> > watermarks).
> > 
> > 
> > What was the intent here, and what's the advice on how to proceed
> > being that 0.8.2 is in an iffy state right now?
> > 
> > 
> > --Chris
> 
> 
> 
>



Re: Consuming a snapshot from log compacted topic

2015-02-18 Thread Joel Koshy
> > 2. Make the log end offset available more easily in the consumer.
> 
> Was thinking something would need to be added in LogCleanerManager, in the
> updateCheckpoints function. Where would be best to publish the information
> to make it more easily available, or would you just expose the
> offset-cleaner-checkpoint file as it is?
> Is it right you would also need to know which offset-cleaner-checkpoint
> entry related to each active partition?

I'm not sure if I misunderstood Jay's suggestion, but I think it is
along the lines of: we expose the log-end-offset (actually the high
watermark) of the partition in the fetch response. However, this is
not exposed to the consumer (either in the new ConsumerRecord class
or the existing MessageAndMetadata class). If we did, then on consuming
a record you could check whether its offset has reached the log-end
offset. If it has, then you would know for sure that you have
consumed everything for that partition.
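
A minimal sketch of that check, assuming the consumer could see the high watermark alongside each record (it cannot today) and assuming the high watermark is the offset one past the last readable message. The class and method names are hypothetical:

```java
// Hypothetical sketch: decide whether a partition has been fully consumed,
// given the offset of the last consumed record and the high watermark.
final class HighWatermarkCheck {
    // Assumption: highWatermark is exclusive, i.e. the last readable offset
    // is highWatermark - 1. Use -1 as lastConsumedOffset when nothing was read.
    static boolean caughtUp(long lastConsumedOffset, long highWatermark) {
        return lastConsumedOffset + 1 >= highWatermark;
    }

    public static void main(String[] args) {
        System.out.println(caughtUp(8, 10));
        System.out.println(caughtUp(9, 10));
    }
}
```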

> Yes, was looking at this initially, but as we have 100-150 writes per
> second, it could be a while before there is a pause long enough to check it
> has caught up. Even with the consumer timeout set to -1, it takes some time
> to query the max offset values, which is still long enough for more
> messages to arrive.

Got it - thanks for clarifying.

> 
> 
> 
> On 18 February 2015 at 23:16, Joel Koshy  wrote:
> 
> > > You are also correct and perceptive to notice that if you check the end of
> > > the log then begin consuming and read up to that point compaction may have
> > > already kicked in (if the reading takes a while) and hence you might have
> > > an incomplete snapshot.
> >
> > Isn't it sufficient to just repeat the check at the end after reading
> > the log and repeat until you are truly done? At least for the purposes
> > of a snapshot?
> >
> > On Wed, Feb 18, 2015 at 02:21:49PM -0800, Jay Kreps wrote:
> > > If you catch up off a compacted topic and keep consuming then you will
> > > become consistent with the log.
> > >
> > > I think what you are saying is that you want to create a snapshot from the
> > > Kafka topic but NOT do continual reads after that point. For example you
> > > might be creating a backup of the data to a file.
> > >
> > > I agree that this isn't as easy as it could be. As you say the only
> > > solution we have is that timeout which doesn't differentiate between GC
> > > stall in your process and no more messages left so you would need to tune
> > > the timeout. This is admittedly kind of a hack.
> > >
> > > You are also correct and perceptive to notice that if you check the end of
> > > the log then begin consuming and read up to that point compaction may have
> > > already kicked in (if the reading takes a while) and hence you might have
> > > an incomplete snapshot.
> > >
> > > I think there are two features we could add that would make this easier:
> > > 1. Make the cleaner point configurable on a per-topic basis. This feature
> > > would allow you to control how long the full log is retained and when
> > > compaction can kick in. This would give a configurable SLA for the reader
> > > process to catch up.
> > > 2. Make the log end offset available more easily in the consumer.
> > >
> > > -Jay
> > >
> > >
> > >
> > > On Wed, Feb 18, 2015 at 10:18 AM, Will Funnell 
> > > wrote:
> > >
> > > > We are currently using Kafka 0.8.1.1 with log compaction in order to
> > > > provide streams of messages to our clients.
> > > >
> > > > As well as constantly consuming the stream, one of our use cases is to
> > > > provide a snapshot, meaning the user will receive a copy of every message
> > > > at least once.
> > > >
> > > > Each one of these messages represents an item of content in our system.
> > > >
> > > >
> > > > The problem comes when determining if the client has actually reached the
> > > > end of the topic.
> > > >
> > > > The standard Kafka way of dealing with this seems to be by using a
> > > > ConsumerTimeoutException, but we are frequently getting this error when the
> > > > end of the topic has not been reached or even it may take a long time
> > > > before a timeout naturally occurs.
> > > >
> > > >
> > > > On first glance it would seem possible to do a lookup for the max

Re: Consuming a snapshot from log compacted topic

2015-02-18 Thread Joel Koshy
> You are also correct and perceptive to notice that if you check the end of
> the log then begin consuming and read up to that point compaction may have
> already kicked in (if the reading takes a while) and hence you might have
> an incomplete snapshot.

Isn't it sufficient to just repeat the check at the end after reading
the log and repeat until you are truly done? At least for the purposes
of a snapshot?
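
To make the suggestion concrete, here is a minimal sketch of the repeat-until-done loop. It does not use any Kafka client API: the end-offset lookup and the per-offset read are passed in as hypothetical callbacks, and offsets are treated as a simple dense range, which is an assumption made only for illustration.

```java
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

// Hypothetical sketch: read up to the current end offset, then re-check the
// end offset, and stop only when a pass finds that nothing new has arrived.
final class SnapshotLoop {
    static long readUntilStable(long start, LongSupplier endOffset, LongConsumer readOne) {
        long position = start;
        long end = endOffset.getAsLong();
        while (position < end) {
            while (position < end) {
                readOne.accept(position); // consume the record at this offset
                position++;
            }
            end = endOffset.getAsLong(); // repeat the check after reading
        }
        return position; // first offset that was not read
    }

    public static void main(String[] args) {
        long done = readUntilStable(0, () -> 3L, o -> System.out.println("read " + o));
        System.out.println("stopped at " + done);
    }
}
```

Note that the loop only terminates once a full pass completes with no new writes, so on a topic with a steady write rate it may run for a long time.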

On Wed, Feb 18, 2015 at 02:21:49PM -0800, Jay Kreps wrote:
> If you catch up off a compacted topic and keep consuming then you will
> become consistent with the log.
> 
> I think what you are saying is that you want to create a snapshot from the
> Kafka topic but NOT do continual reads after that point. For example you
> might be creating a backup of the data to a file.
> 
> I agree that this isn't as easy as it could be. As you say the only
> solution we have is that timeout which doesn't differentiate between GC
> stall in your process and no more messages left so you would need to tune
> the timeout. This is admittedly kind of a hack.
> 
> You are also correct and perceptive to notice that if you check the end of
> the log then begin consuming and read up to that point compaction may have
> already kicked in (if the reading takes a while) and hence you might have
> an incomplete snapshot.
> 
> I think there are two features we could add that would make this easier:
> 1. Make the cleaner point configurable on a per-topic basis. This feature
> would allow you to control how long the full log is retained and when
> compaction can kick in. This would give a configurable SLA for the reader
> process to catch up.
> 2. Make the log end offset available more easily in the consumer.
> 
> -Jay
> 
> 
> 
> On Wed, Feb 18, 2015 at 10:18 AM, Will Funnell 
> wrote:
> 
> > We are currently using Kafka 0.8.1.1 with log compaction in order to
> > provide streams of messages to our clients.
> >
> > As well as constantly consuming the stream, one of our use cases is to
> > provide a snapshot, meaning the user will receive a copy of every message
> > at least once.
> >
> > Each one of these messages represents an item of content in our system.
> >
> >
> > The problem comes when determining if the client has actually reached the
> > end of the topic.
> >
> > The standard Kafka way of dealing with this seems to be by using a
> > ConsumerTimeoutException, but we are frequently getting this error when the
> > end of the topic has not been reached, or it may even take a long time
> > before a timeout naturally occurs.
> >
> >
> > At first glance it would seem possible to do a lookup for the max offset
> > for each partition when you begin consuming, stopping when this position is
> > reached.
> >
> > But log compaction means that if an update to a piece of content arrives
> > with the same message key, then this will be written to the end so the
> > snapshot will be incomplete.
> >
> >
> > Another thought is to make use of the cleaner point. Currently Kafka writes
> > out to a "cleaner-offset-checkpoint" file in each data directory which is
> > written to after log compaction completes.
> >
> > If the consumer was able to access the cleaner-offset-checkpoint you would
> > be able to consume up to this point, check the point was still the same,
> > and compaction had not yet occurred, and therefore determine you had
> > received everything at least once. (Assuming there was no race condition
> > between compaction and writing to the file.)
> >
> >
> > Has anybody got any thoughts?
> >
> > Will
> >



Re: Simple Consumer and offsets

2015-02-17 Thread Joel Koshy
Hi Chris,

In 0.8.2, the simple consumer Java API supports committing/fetching
offsets that are stored in ZooKeeper. You don't need to issue any
ConsumerMetadataRequest for this. Unfortunately, the API currently
does not support fetching offsets that are stored in Kafka.
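
For orientation only: the quoted message below shows committed offsets landing under a ZooKeeper path of the form /consumers/<group>/offsets/<topic>/<partition>, with the offset stored as a decimal string. A minimal sketch of that layout, using hypothetical helper names and no Kafka or ZooKeeper client calls:

```java
// Hypothetical helpers that mirror the znode layout seen in the quoted
// message; they only build a path string and parse the stored value.
final class ZkOffsetPath {
    static String offsetPath(String group, String topic, int partition) {
        return "/consumers/" + group + "/offsets/" + topic + "/" + partition;
    }

    // The znode data in the quoted example is the offset as a decimal string.
    static long parseOffset(String znodeData) {
        return Long.parseLong(znodeData.trim());
    }

    public static void main(String[] args) {
        System.out.println(offsetPath("myGroupId", "myTopic", 0));
        System.out.println(parseOffset("32757408"));
    }
}
```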

Thanks,

Joel

On Mon, Feb 16, 2015 at 05:02:08PM -0500, Christopher Piggott wrote:
> Hi,
> 
> I am still using 0.8.1.1 because of the CPU use concerns.
> 
> I'm confused about why the SimpleConsumer has:
> 
> OffsetCommitResponse commitOffsets(OffsetCommitRequest request)
> 
> and
> 
> OffsetFetchResponse fetchOffsets(OffsetFetchRequest request)
> 
> but no way that I can see to issue a ConsumerMetadataRequest, which is
> what I think I need when restarting my consumers so that they can begin
> working where they last left off (in the event that they were stopped
> for a while then restarted some time later, and new messages had come
> in).
> 
> The fetchOffsets() works on time; usually it looks like you send it
> Earliest or Latest (beginning or end of what's currently in the
> stream).
> 
> I realize the documentation says this:
> 
> 
> > *Downsides of using SimpleConsumer*
> > The SimpleConsumer does require a significant amount of work not needed
> > in the Consumer Groups:
> >
> >    1. You must keep track of the offsets in your application to know
> >       where you left off consuming.
> >
> > But that's not really quite true ... not as long as commitOffsets() has 
> > been provided.  It seems the SimpleConsumer provides you with a solution to 
> > only one half of the problem of offset management.
> 
> Using some zookeeper python scripts I wrote I can see that the
> commitOffsets() is doing its job and writing to
> 
> 
> /consumers/myGroupId/offsets/myTopic/0
> 
> 
> That has this value:
> 
> ('32757408', ZnodeStat(czxid=2211679, mzxid=14779964, ctime=1423777630972,
> > mtime=1424122117397, version=12568262, cversion=0, aversion=0,
> > ephemeralOwner=0, dataLength=8, numChildren=0, pzxid=2211679))
> 
> 
> Now the question is just how to retrieve that - do I really have to
> have my client connect to ZK directly?  If that's the case, future
> upgrades would break (e.g. 0.8.2 having its own storage for commit
> watermarks).
> 
> 
> What was the intent here, and what's the advice on how to proceed,
> given that 0.8.2 is in an iffy state right now?
> 
> 
> --Chris



Re: consumer lag metric

2015-02-13 Thread Joel Koshy
There are FetcherLagMetrics that you can take a look at. However, it
is probably easiest to just monitor MaxLag as that reports the maximum
of all the lag metrics.
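
In other words, MaxLag is just the maximum over the individual lag values. A minimal sketch of that relationship, with hypothetical names and a plain map standing in for the per-fetcher lag metrics (this does not read any real mbeans):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: MaxLag as the maximum of the individual lag values.
final class MaxLagSketch {
    // Returns 0 when there are no lag entries (an assumption for this sketch).
    static long maxLag(Map<String, Long> lagByPartition) {
        return lagByPartition.values().stream()
                .mapToLong(Long::longValue)
                .max()
                .orElse(0L);
    }

    public static void main(String[] args) {
        Map<String, Long> lags = new HashMap<>();
        lags.put("myTopic-0", 5L);
        lags.put("myTopic-1", 42L);
        System.out.println(maxLag(lags));
    }
}
```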

On Fri, Feb 13, 2015 at 05:03:28PM +0800, tao xiao wrote:
> Hi team,
> 
> Is there a metric that shows the consumer lag of a particular consumer
> group? similar to what offset checker provides
> 
> -- 
> Regards,
> Tao



Re: offset migration from kafka to zookeeper

2015-02-13 Thread Joel Koshy
Thanks for looking into that!
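
For readers following along, the lookup order described in the quoted message below can be sketched roughly as follows. This is a simplified model with hypothetical names, not the ConsumerOffsetChecker source; in particular, returning -1 when neither store has an offset is a simplification.

```java
import java.util.Optional;

// Hypothetical model of the lookup order: prefer the offset stored in Kafka
// if one exists, and only fall back to ZooKeeper when it does not.
final class OffsetLookupSketch {
    static long resolve(Optional<Long> kafkaOffset, Optional<Long> zkOffset) {
        if (kafkaOffset.isPresent()) {
            return kafkaOffset.get(); // ZooKeeper is never consulted in this case
        }
        return zkOffset.orElse(-1L);
    }

    public static void main(String[] args) {
        // After switching commits to ZooKeeper, the old Kafka offset still wins.
        System.out.println(resolve(Optional.of(100L), Optional.of(250L)));
    }
}
```

With a stale offset of 100 left in Kafka and newer commits of 250 going to ZooKeeper, the sketch keeps returning 100, which matches the unchanged lag reported in this thread.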

On Fri, Feb 13, 2015 at 05:31:39AM +, Jiangjie Qin wrote:
> I think this is the offset checker bug.
> The offset checker will
> 1. first check if the offset exists in offset topic on broker or not.
> 2. If it is on broker then it will just return that offset.
> 3. Otherwise it goes to zookeeper.
> 
> So the problem you saw was actually following this logic.
> After dual commit, offset topic already had the offsets for this consumer
> and topic.
> Then you switched to zookeeper commit.
> Because the offset topic has the offsets already, offset checker will use
> that and skip checking zookeeper. So the offset will not change anymore
> because you are no longer committing to offset topic on broker, while
> offset checker always uses that offset.
> 
> On 2/12/15, 7:30 PM, "tao xiao"  wrote:
> 
> >I used the one shipped with 0.8.2. It is pretty straightforward to
> >reproduce the issue.
> >
> >Here are the steps to reproduce:
> >1. I have a consumer using high level consumer API with initial settings
> >offsets.storage=kafka and dual.commit.enabled=false.
> >2. After consuming messages for a while, shut down the consumer and change
> >setting dual.commit.enabled=true
> >3. Bounce the consumer and run for a while. The lag looks good
> >4. Change setting offsets.storage=zookeeper and bounce the consumer.
> >Starting from now the lag remains unchanged
> >
> >On Fri, Feb 13, 2015 at 11:01 AM, Joel Koshy  wrote:
> >
> >> That is weird. Are you by any chance running an older version of the
> >> offset checker? Is this straightforward to reproduce?
> >>
> >> On Fri, Feb 13, 2015 at 09:57:31AM +0800, tao xiao wrote:
> >> > Joel,
> >> >
> >> > No, the metric was not increasing. It was 0 all the time.
> >> >
> >> > On Fri, Feb 13, 2015 at 12:18 AM, Joel Koshy 
> >> wrote:
> >> >
> >> > > Actually I meant to say check that is not increasing.
> >> > >
> >> > > On Thu, Feb 12, 2015 at 08:15:01AM -0800, Joel Koshy wrote:
> >> > > > Possibly a bug - can you also look at the MaxLag mbean in the
> >> consumer
> >> > > > to verify that the maxlag is zero?
> >> > > >
> >> > > > On Thu, Feb 12, 2015 at 11:24:42PM +0800, tao xiao wrote:
> >> > > > > Hi Joel,
> >> > > > >
> >> > > > > When I set dual.commit.enabled=true the count value of both
> >> metrics got
> >> > > > > increased. After I set offsets.storage=zookeeper only
> >> > > ZooKeeperCommitsPerSec
> >> > > > > changed but not KafkaCommitsPerSec. I think this is expected as
> >> kafka
> >> > > > > offset storage was turned off.
> >> > > > >
> >> > > > > But when I looked up the consumer lag via
> >> > > kafka.tools.ConsumerOffsetChecker
> >> > > > > the lag still remained unchanged.
> >> > > > >
> >> > > > > I scanned through the source code of ConsumerOffsetChecker it
> >> doesn't
> >> > > > > check the offset in zk unless offsetFetchResponse returns
> >>NoOffset.
> >> > > Since
> >> > > > > the consumer used kafka as the offset storage before I don't
> >>think
> >> > > > > offsetFetchResponse would return NoOffset
> >> > > > >
> >> > > > >  offsetFetchResponse.requestInfo.foreach { case
> >>(topicAndPartition,
> >> > > > > offsetAndMetadata) =>
> >> > > > >
> >> > > > > if (offsetAndMetadata ==
> >>OffsetMetadataAndError.NoOffset) {
> >> > > > >
> >> > > > >   val topicDirs = new ZKGroupTopicDirs(group,
> >> > > topicAndPartition.
> >> > > > > topic)
> >> > > > >
> >> > > > >   // this group may not have migrated off zookeeper for
> >> offsets
> >> > > > > storage (we don't expose the dual-commit option in this tool
> >> > > > >
> >> > > > >   // (meaning the lag may be off until all the consumers
> >> in the
> >> > > > > group have the same setting for offsets storage)
> >> > > > >
> >> &

Re: offset migration from kafka to zookeeper

2015-02-12 Thread Joel Koshy
That is weird. Are you by any chance running an older version of the
offset checker? Is this straightforward to reproduce?
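
For reference, the migration sequence quoted below amounts to three successive consumer configurations. A minimal sketch using java.util.Properties: the two property keys come from the quoted message, while the class and method names are hypothetical, and dual.commit.enabled is left at true in the last phase because the quoted steps do not mention changing it.

```java
import java.util.Properties;

// Hypothetical sketch of the consumer settings at each phase of the
// kafka-to-zookeeper offset migration described in this thread.
final class OffsetMigrationConfig {
    static Properties phase(String storage, boolean dualCommit) {
        Properties p = new Properties();
        p.put("offsets.storage", storage);
        p.put("dual.commit.enabled", Boolean.toString(dualCommit));
        return p;
    }

    static Properties[] kafkaToZooKeeper() {
        return new Properties[] {
            phase("kafka", false),    // original settings
            phase("kafka", true),     // step 1: enable dual commit, then restart
            phase("zookeeper", true)  // step 3: switch storage, then restart
        };
    }

    public static void main(String[] args) {
        for (Properties p : kafkaToZooKeeper()) {
            System.out.println(p);
        }
    }
}
```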

On Fri, Feb 13, 2015 at 09:57:31AM +0800, tao xiao wrote:
> Joel,
> 
> No, the metric was not increasing. It was 0 all the time.
> 
> On Fri, Feb 13, 2015 at 12:18 AM, Joel Koshy  wrote:
> 
> > Actually I meant to say check that is not increasing.
> >
> > On Thu, Feb 12, 2015 at 08:15:01AM -0800, Joel Koshy wrote:
> > > Possibly a bug - can you also look at the MaxLag mbean in the consumer
> > > to verify that the maxlag is zero?
> > >
> > > On Thu, Feb 12, 2015 at 11:24:42PM +0800, tao xiao wrote:
> > > > Hi Joel,
> > > >
> > > > > When I set dual.commit.enabled=true the count value of both metrics got
> > > > > increased. After I set offsets.storage=zookeeper only ZooKeeperCommitsPerSec
> > > > > changed but not KafkaCommitsPerSec. I think this is expected as kafka
> > > > > offset storage was turned off.
> > > > >
> > > > > But when I looked up the consumer lag via kafka.tools.ConsumerOffsetChecker
> > > > > the lag still remained unchanged.
> > > > >
> > > > > I scanned through the source code of ConsumerOffsetChecker it doesn't
> > > > > check the offset in zk unless offsetFetchResponse returns NoOffset. Since
> > > > > the consumer used kafka as the offset storage before I don't think
> > > > > offsetFetchResponse would return NoOffset
> > > >
> > > > > offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
> > > > >   if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
> > > > >     val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
> > > > >     // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool
> > > > >     // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage)
> > > > >     try {
> > > > >       val offset = ZkUtils.readData(zkClient,
> > > > >         topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong
> > > > >       offsetMap.put(topicAndPartition, offset)
> > > > >     } catch {
> > > > >       case z: ZkNoNodeException =>
> > > > >         if (ZkUtils.pathExists(zkClient, topicDirs.consumerOffsetDir))
> > > > >           offsetMap.put(topicAndPartition, -1)
> > > > >         else
> > > > >           throw z
> > > > >     }
> > > > >   }
> > > > >   else if (offsetAndMetadata.error == ErrorMapping.NoError)
> > > > >     offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
> > > > >   else {
> > > > >     println("Could not fetch offset for %s due to %s.".format(
> > > > >       topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
> > > > >   }
> > > > > }
> > > >
> > > > On Thu, Feb 12, 2015 at 10:03 PM, Joel Koshy 
> > wrote:
> > > >
> > > > > There are mbeans named KafkaCommitsPerSec and ZooKeeperCommitsPerSec
> > -
> > > > > can you look those up and see what they report?
> > > > >
> > > > > On Thu, Feb 12, 2015 at 07:32:39PM +0800, tao xiao wrote:
> > > > > > Hi team,
> > > > > >
> > > > > > I was trying to migrate my consumer offset from kafka to zookeeper.
> > > > > >
> > > > > > Here is the original settings of my consumer
> > > > > >
> > > > > > props.put("offsets.storage", "kafka");
> > > > > >
> > > > > > props.put("dual.commit.enabled", "false");
> > > > > > Here is the steps
> > > > > >
> > > > > > 1. set dual.commit.enabled=true
> > > > > > 2. restart my consumer and monitor offset lag with
> > > > > > kafka.tools.ConsumerOffsetChecker
> > > > > > 3. set offsets.storage=zookeeper
> > > > > > 4. restart my consumer and monitor offset lag with
> > > > > > kafka.tools.ConsumerOffsetChecker
> > > > > >
> > > > > > After step 4 my consumer was able to continually consume data from
> > topic
> > > > > > but the offset lag remained unchanged. Did I do anything wrong?
> > > > > >
> > > > > > --
> > > > > > Regards,
> > > > > > Tao
> > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > Regards,
> > > > Tao
> > >
> >
> >
> 
> 
> -- 
> Regards,
> Tao



Re: offset migration from kafka to zookeeper

2015-02-12 Thread Joel Koshy
Actually I meant to say: check that it is not increasing.

On Thu, Feb 12, 2015 at 08:15:01AM -0800, Joel Koshy wrote:
> Possibly a bug - can you also look at the MaxLag mbean in the consumer
> to verify that the maxlag is zero?
> 
> On Thu, Feb 12, 2015 at 11:24:42PM +0800, tao xiao wrote:
> > Hi Joel,
> > 
> > When I set dual.commit.enabled=true the count value of both metrics got
> > increased. After I set offsets.storage=zookeeper only ZooKeeperCommitsPerSec
> > changed but not KafkaCommitsPerSec. I think this is expected as kafka
> > offset storage was turned off.
> > 
> > But when I looked up the consumer lag via kafka.tools.ConsumerOffsetChecker
> > the lag still remained unchanged.
> > 
> > I scanned through the source code of ConsumerOffsetChecker it  doesn't
> > check the offset in zk unless offsetFetchResponse returns NoOffset. Since
> > the consumer used kafka as the offset storage before I don't think
> > offsetFetchResponse would return NoOffset
> > 
> > offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
> >   if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
> >     val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
> >     // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool
> >     // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage)
> >     try {
> >       val offset = ZkUtils.readData(zkClient,
> >         topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong
> >       offsetMap.put(topicAndPartition, offset)
> >     } catch {
> >       case z: ZkNoNodeException =>
> >         if (ZkUtils.pathExists(zkClient, topicDirs.consumerOffsetDir))
> >           offsetMap.put(topicAndPartition, -1)
> >         else
> >           throw z
> >     }
> >   }
> >   else if (offsetAndMetadata.error == ErrorMapping.NoError)
> >     offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
> >   else {
> >     println("Could not fetch offset for %s due to %s.".format(
> >       topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
> >   }
> > }
> > 
> > On Thu, Feb 12, 2015 at 10:03 PM, Joel Koshy  wrote:
> > 
> > > There are mbeans named KafkaCommitsPerSec and ZooKeeperCommitsPerSec -
> > > can you look those up and see what they report?
> > >
> > > On Thu, Feb 12, 2015 at 07:32:39PM +0800, tao xiao wrote:
> > > > Hi team,
> > > >
> > > > I was trying to migrate my consumer offset from kafka to zookeeper.
> > > >
> > > > Here are the original settings of my consumer
> > > >
> > > > props.put("offsets.storage", "kafka");
> > > >
> > > > props.put("dual.commit.enabled", "false");
> > > >
> > > > Here are the steps
> > > >
> > > > 1. set dual.commit.enabled=true
> > > > 2. restart my consumer and monitor offset lag with
> > > > kafka.tools.ConsumerOffsetChecker
> > > > 3. set offsets.storage=zookeeper
> > > > 4. restart my consumer and monitor offset lag with
> > > > kafka.tools.ConsumerOffsetChecker
> > > >
> > > > After step 4 my consumer was able to continually consume data from topic
> > > > but the offset lag remained unchanged. Did I do anything wrong?
> > > >
> > > > --
> > > > Regards,
> > > > Tao
> > >
> > >
> > 
> > 
> > -- 
> > Regards,
> > Tao
> 



Re: offset migration from kafka to zookeeper

2015-02-12 Thread Joel Koshy
Possibly a bug - can you also look at the MaxLag mbean in the consumer
to verify that the maxlag is zero?

On Thu, Feb 12, 2015 at 11:24:42PM +0800, tao xiao wrote:
> Hi Joel,
> 
> When I set dual.commit.enabled=true the count value of both metrics got
> increased. After I set offsets.storage=zookeeper only ZooKeeperCommitsPerSec
> changed but not KafkaCommitsPerSec. I think this is expected as kafka
> offset storage was turned off.
> 
> But when I looked up the consumer lag via kafka.tools.ConsumerOffsetChecker
> the lag still remained unchanged.
> 
> I scanned through the source code of ConsumerOffsetChecker it  doesn't
> check the offset in zk unless offsetFetchResponse returns NoOffset. Since
> the consumer used kafka as the offset storage before I don't think
> offsetFetchResponse would return NoOffset
> 
>  offsetFetchResponse.requestInfo.foreach { case (topicAndPartition,
> offsetAndMetadata) =>
> 
> if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
> 
>   val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.
> topic)
> 
>   // this group may not have migrated off zookeeper for offsets
> storage (we don't expose the dual-commit option in this tool
> 
>   // (meaning the lag may be off until all the consumers in the
> group have the same setting for offsets storage)
> 
>   try {
> 
> val offset = ZkUtils.readData(zkClient, 
> topicDirs.consumerOffsetDir
> + "/%d".format(topicAndPartition.partition))._1.toLong
> 
> offsetMap.put(topicAndPartition, offset)
> 
>   } catch {
> 
> case z: ZkNoNodeException =>
> 
>   if(ZkUtils.pathExists(zkClient,topicDirs.consumerOffsetDir))
> 
> offsetMap.put(topicAndPartition,-1)
> 
>   else
> 
> throw z
> 
>   }
> 
> }
> 
> else if (offsetAndMetadata.error == ErrorMapping.NoError)
> 
>   offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
> 
> else {
> 
>   println("Could not fetch offset for %s due to %s.".format(
> topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
> 
> }
> 
>   }
> 
> On Thu, Feb 12, 2015 at 10:03 PM, Joel Koshy  wrote:
> 
> > There are mbeans named KafkaCommitsPerSec and ZooKeeperCommitsPerSec -
> > can you look those up and see what they report?
> >
> > On Thu, Feb 12, 2015 at 07:32:39PM +0800, tao xiao wrote:
> > > Hi team,
> > >
> > > I was trying to migrate my consumer offset from kafka to zookeeper.
> > >
> > > Here is the original settings of my consumer
> > >
> > > props.put("offsets.storage", "kafka");
> > >
> > > props.put("dual.commit.enabled", "false");
> > > Here is the steps
> > >
> > > 1. set dual.commit.enabled=true
> > > 2. restart my consumer and monitor offset lag with
> > > kafka.tools.ConsumerOffsetChecker
> > > 3. set offsets.storage=zookeeper
> > > 4. restart my consumer and monitor offset lag with
> > > kafka.tools.ConsumerOffsetChecker
> > >
> > > After step 4 my consumer was able to continually consume data from topic
> > > but the offset lag remained unchanged. Did I do anything wrong?
> > >
> > > --
> > > Regards,
> > > Tao
> >
> >
> 
> 
> -- 
> Regards,
> Tao



Re: offset migration from kafka to zookeeper

2015-02-12 Thread Joel Koshy
There are mbeans named KafkaCommitsPerSec and ZooKeeperCommitsPerSec -
can you look those up and see what they report?

On Thu, Feb 12, 2015 at 07:32:39PM +0800, tao xiao wrote:
> Hi team,
> 
> I was trying to migrate my consumer offset from kafka to zookeeper.
> 
> Here is the original settings of my consumer
> 
> props.put("offsets.storage", "kafka");
> 
> props.put("dual.commit.enabled", "false");
> Here is the steps
> 
> 1. set dual.commit.enabled=true
> 2. restart my consumer and monitor offset lag with
> kafka.tools.ConsumerOffsetChecker
> 3. set offsets.storage=zookeeper
> 4. restart my consumer and monitor offset lag with
> kafka.tools.ConsumerOffsetChecker
> 
> After step 4 my consumer was able to continually consume data from topic
> but the offset lag remained unchanged. Did I do anything wrong?
> 
> -- 
> Regards,
> Tao



Re: Java APIs - which one to use?

2015-02-11 Thread Joel Koshy
You should use the producer under org.apache.kafka.clients (o.a.k.c). The
new consumer implementation is not available in 0.8.2 (although the APIs
are there), so you would need to use the kafka.javaapi classes for the
consumer. We plan to deprecate kafka.javaapi eventually.
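
A minimal sketch of the configuration side of that advice, assuming the new producer is set up through a plain java.util.Properties object. The class name NewProducerConfigSketch and the address localhost:9092 are placeholders made up for illustration. Only the Properties object is built here, so the snippet runs without the Kafka jars; in a real application it would be handed to the producer's constructor.

```java
import java.util.Properties;

class NewProducerConfigSketch {
    // Builds settings for the producer under org.apache.kafka.clients.
    // The serializer class names assume String keys and values.
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // localhost:9092 is a placeholder broker address.
        Properties props = producerProps("localhost:9092");
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```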

Thanks,

Joel

On Wed, Feb 11, 2015 at 11:42:04AM +0100, Ole Hedegaard wrote:
> Hi,
> 
> I'm trying out Kafka for a Java-centric project, and to that end I have 
> downloaded the 0.8.2 source. I find both the kafka.javaapi.* classes (in the 
> core project) and the org.apache.kafka.clients.* classes (in the clients 
> project). I need to use the low-level API, and that seems do-able with both 
> approaches.
> 
> My question is: which is better to proceed with? My code will be around for a 
> while, so I would like there to be as little impact as possible when version 
> 0.9 hits the stores.
> 
> Regards,
> Ole Hedegaard
> 



Re: Lack of JMX LogCleaner and LogCleanerManager metrics

2015-02-10 Thread Joel Koshy
+1

On Tue, Feb 10, 2015 at 01:32:13PM -0800, Jay Kreps wrote:
> I agree that would be a better name. We could rename it if everyone likes
> Compactor better.
> 
> -Jay
> 
> On Tue, Feb 10, 2015 at 9:33 AM, Gwen Shapira  wrote:
> 
> > btw. the name LogCleaner is seriously misleading. Its more of a log
> > compacter.
> > Deleting old logs happens elsewhere from what I've seen.
> >
> > Gwen
> >
> > On Tue, Feb 10, 2015 at 8:07 AM, Jay Kreps  wrote:
> >
> > > Probably you need to enable the log cleaner for those to show up? We
> > > disable it by default and so I think those metrics never get created.
> > >
> > > -Jay
> > >
> > > On Tue, Feb 10, 2015 at 3:33 AM, o...@sematext.com <
> > st.comm.c...@gmail.com
> > > >
> > > wrote:
> > >
> > > > Hello,
> > > >
> > > > I have a problem with some JMX metrics. In Kafka source code I see
> > > > LogCleaner (has metrics: 'max-buffer-utilization-percent',
> > > > 'cleaner-recopy-percent', 'max-clean-time-secs') and LogCleanerManager
> > > (has
> > > > metric: 'max-dirty-percent')
> > > >
> > > > But I don't see this beans&metrics in jconsole when I attach to broker
> > > > process.
> > > > Any ideas why? Maybe this beans live only short time during cleanup
> > > that's
> > > > why I don't see them in jconsole?
> > > >
> > > > I use Kafka monitoring tool which show me other metrics, but not these
> > > > 'Log clean' metrics.
> > > >
> > > > Below are some SPM graphs showing the state of my system.
> > > > Here's the 'Topic Bytes/Messages' graph:
> > > >   https://apps.sematext.com/spm-reports/s/omV6bYPGAG
> > >
> >



Re: question about new consumer offset management in 0.8.2

2015-02-06 Thread Joel Koshy

On Thu, Feb 05, 2015 at 11:57:15PM -0800, Joel Koshy wrote:
> On Fri, Feb 06, 2015 at 12:43:37AM -0500, Jason Rosenberg wrote:
> > I'm not sure what you mean by 'default' behavior 'only if' offset.storage
> > is kafka.  Does that mean the 'default' behavior is 'false' if
> > offset.storage is 'zookeeper'?  Can that be clarified in the config
> > documentation section?
> > 
> > In section 5.6 where the offset managements is described, there is this:
> > "A roll-back (i.e., migrating from Kafka back to ZooKeeper) can also be
> > performed using the above steps if you set offsets.storage=zookeeper."
> > 
> > This implies that dual commit will work also if offsets.storage=zookeeper,
> > no?  Just not by default?  Perhaps there needs to be clarification there
> > (and in the config section for offsets.storage & dual.commit.enabled).
> 
> Actually I think there may be a bug here if someone needs to roll back
> from Kafka-based offsets to zookeeper. Will reply tomorrow on this.
> 
> > 

Never mind - I think we are fine here.  The scenario I was thinking
about is the following: 
- Say there are three consumer instances c0, c1, c2 consuming
  partitions pX, pY, ... that are committing offsets to Kafka, and you
  want to migrate to ZooKeeper.
- Do a rolling bounce to turn on dual-commit (and keep
  offsets.storage = kafka).
- Do another rolling bounce to set offsets.storage to zookeeper:
  - Say you bounce c0 to commit offsets to ZooKeeper and it comes
    back up and then owns pX. It begins to commit offsets for pX to
    ZooKeeper only.
  - You then bounce c1; after it goes down, say our partition
    assignment strategy assigns pX to c2 (which has not yet been
    bounced).
  - c2 uses offsets.storage=kafka, so it would fetch a potentially
    stale offset for pX, which would be an issue.
  - So we explicitly handle this case: if dual.commit is turned on
    and offsets.storage is kafka, then the broker fetches offsets from
    both Kafka and ZooKeeper and selects the maximum of the two.

Let me know if you see any holes in the above.
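
The last step of the scenario can be sketched as follows. This is only an illustration of the "select the maximum of the two" rule, not the actual Kafka code; the class name DualCommitOffsetSketch, the method resolve and the NO_OFFSET sentinel are made up for the example.

```java
class DualCommitOffsetSketch {
    // Sentinel for "no offset stored"; an assumption made for this sketch.
    static final long NO_OFFSET = -1L;

    // Picks the offset to resume from when both stores are consulted.
    static long resolve(long kafkaOffset, long zookeeperOffset) {
        return Math.max(kafkaOffset, zookeeperOffset);
    }

    public static void main(String[] args) {
        // c0 was bounced and committed pX up to 150 in ZooKeeper only,
        // so Kafka still holds the older value 100.
        System.out.println(resolve(100L, 150L));     // prints 150
        // Nothing was ever committed to Kafka for this partition.
        System.out.println(resolve(NO_OFFSET, 42L)); // prints 42
    }
}
```

With this rule c2 resumes pX from 150 rather than the stale 100, which is the case the dual-commit handling is meant to cover.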

dual.commit is confusing and would have been (slightly) less confusing if it
was called offset.migration.in.progress or something similar. Still, I think
we can document the process carefully and state clearly that it is
intended for use during migration/roll-back only.



Re: question about new consumer offset management in 0.8.2

2015-02-06 Thread Joel Koshy
On Fri, Feb 06, 2015 at 12:43:37AM -0500, Jason Rosenberg wrote:
> I'm not sure what you mean by 'default' behavior 'only if' offset.storage
> is kafka.  Does that mean the 'default' behavior is 'false' if
> offset.storage is 'zookeeper'?  Can that be clarified in the config
> documentation section?
> 
> In section 5.6 where the offset managements is described, there is this:
> "A roll-back (i.e., migrating from Kafka back to ZooKeeper) can also be
> performed using the above steps if you set offsets.storage=zookeeper."
> 
> This implies that dual commit will work also if offsets.storage=zookeeper,
> no?  Just not by default?  Perhaps there needs to be clarification there
> (and in the config section for offsets.storage & dual.commit.enabled).

Actually I think there may be a bug here if someone needs to roll back
from Kafka-based offsets to zookeeper. Will reply tomorrow on this.

> 
> The doc in section 5.6 is probably in need of editing, it looks like it in
> places assumes zookeeper offset storage, and has some repeated sentences,
> etc.

Will do.

> 
> Finally, why is section 5.6 titled "Distribution"?  Seems to be a grab-bag
> of mostly consumer related topics?

Yes, this is the prior structure, which can be improved.

> >
> > > On Thu, Feb 5, 2015 at 2:21 PM, Joel Koshy  wrote:
> > >
> > > > This is documented in the official docs:
> > > > http://kafka.apache.org/documentation.html#distributionimpl
> > > >
> > > > On Thu, Feb 05, 2015 at 01:23:01PM -0500, Jason Rosenberg wrote:
> > > > > What are the defaults for those settings (I assume it will be to
> > continue
> > > > > using only zookeeper by default)?
> > > > >
> > > > > Also, if I have a cluster of consumers sharing the same groupId, and
> > I
> > > > > update them via a rolling release, will it be a problem during the
> > > > rolling
> > > > > restart if there is inconsistency in the settings for a short time?
> > Or
> > > > is
> > > > > it required that the entire cluster be stopped, then update configs,
> > then
> > > > > restart all nodes?
> > > > >
> > > > > Jason
> > > > >
> > > > > On Thu, Feb 5, 2015 at 12:45 PM, Gwen Shapira  > >
> > > > wrote:
> > > > >
> > > > > > Thanks Jon. I updated the FAQ with your procedure:
> > > > > >
> > > > > >
> > > > > >
> > > >
> > https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdowemigratetocommittingoffsetstoKafka(ratherthanZookeeper)in0.8.2
> > > > > > ?
> > > > > >
> > > > > > On Thu, Feb 5, 2015 at 9:16 AM, Jon Bringhurst <
> > > > > > jbringhu...@linkedin.com.invalid> wrote:
> > > > > >
> > > > > > > There should probably be a wiki page started for this so we have
> > the
> > > > > > > details in one place. The same question was asked on Freenode
> > IRC a
> > > > few
> > > > > > > minutes ago. :)
> > > > > > >
> > > > > > > A summary of the migration procedure is:
> > > > > > >
> > > > > > > 1) Upgrade your brokers and set dual.commit.enabled=false and
> > > > > > > offsets.storage=zookeeper (Commit offsets to Zookeeper Only).
> > > > > > > 2) Set dual.commit.enabled=true and offsets.storage=kafka and
> > restart
> > > > > > > (Commit offsets to Zookeeper and Kafka).
> > > > > > > 3) Set dual.commit.enabled=false and offsets.storage=kafka and
> > > > restart
> > > > > > > (Commit offsets to Kafka only).
> > > > > > >
> > > > > > > -Jon
> > > > > > >
> > > > > > > On Feb 5, 2015, at 9:03 AM, Jason Rosenberg 
> > > > wrote:
> > > > > > >
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > For 0.8.2, one of the features listed is:
> > > > > > > >  - Kafka-based offset storage.
> > > > > > > >
> > > > > > > > Is there documentation on this (I've heard discussion of it of
> > > > course)?
> > > > > > > >
> > > > > > > > Also, is it something that will be used by existing consumers
> > when
> > > > they
> > > > > > > > migrate up to 0.8.2?  What is the migration process?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jason
> > > > > > >
> > > > > > >
> > > > > >
> > > >
> > > >
> >
> > --
> > Joel
> >



Re: How to fetch old messages from kafka

2015-02-05 Thread Joel Koshy
> We can reset the offset and get first 10 messages, but since we need to back 
> in reverse sequence, suppose user has consumed messages upto 100 offset , 
> currently there are only last 10 messages are visible, from 100 -90, now I 
> want to retrieve messages from 80 to 90, how can we do that?

I'm not sure I fully understand the question, but you can always
rewind to an explicit offset - i.e., if you know that you want offset
80 onwards, then set your fetch offset to 80 and start fetching from
there until you hit offset 90 (or whatever limit you need). This
assumes, though, that those messages are still present on the broker.
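
As a rough sketch of the bookkeeping this implies for paging backwards, assuming the application tracks the oldest offset it has already shown. The names PageWindowSketch and previousPage are hypothetical, and the actual fetch through the simple consumer is left out; only the offset arithmetic is shown.

```java
class PageWindowSketch {
    // Returns {start, end}: fetch offsets in [start, end) to show the page
    // that precedes the one whose oldest visible offset is oldestShown.
    // earliestAvailable is the oldest offset still present on the broker.
    static long[] previousPage(long oldestShown, int pageSize, long earliestAvailable) {
        long end = Math.max(oldestShown, earliestAvailable);
        long start = Math.max(earliestAvailable, end - pageSize);
        return new long[] {start, end};
    }

    public static void main(String[] args) {
        // Offsets 90 to 100 are on screen; ask for the page before them.
        long[] window = previousPage(90L, 10, 0L);
        System.out.println(window[0] + " to " + window[1]); // prints "80 to 90"
    }
}
```

Clamping to earliestAvailable covers the case where older messages have already been removed from the broker.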

On Wed, Feb 04, 2015 at 02:59:17PM +0530, Snehalata Nagaje wrote:
> 
> 
> Hi Mayuresh,
> 
> 
> Thanks for quick response.
> 
> We can reset the offset and get first 10 messages, but since we need to back 
> in reverse sequence, suppose user has consumed messages upto 100 offset , 
> currently there are only last 10 messages are visible, from 100 -90, now I 
> want to retrieve messages from 80 to 90, how can we do that?
> 
> Can we use getOffsetBefore() function to get valid offset before given time, 
> this will return all valid offsets. we can get all valid offsets before 
> latest time.
> 
> Then we can fetch messages from any given valid offset returned from 
> getOffsetBefore().
> 
> is this correct approach?
> 
> Thanks,
> Snehalata
> 
> - Original Message -
> From: gharatmayures...@gmail.com
> To: users@kafka.apache.org
> Sent: Wednesday, February 4, 2015 11:24:46 AM
> Subject: Re: How to fetch old messages from kafka
> 
> In that case you will have to maintain the offsets consumed and reset the 
> offsets in case you need to consume from past.
> 
> For example, suppose you have a userA for which you have a partitionA for 
> topic TopicA. Each page shown to user increments the offset by 10. You have 
> consumed till offset 100 and the user wants to go back 1 page you will have 
> to reset the offset for TopicA partitionA in the zookeeper. Since you are 
> using simple consumer the offset management has to be done by your 
> application.
> 
> Thanks,
> 
> Mayuresh
> 
> Sent from my iPhone
> 
> > On Feb 3, 2015, at 9:18 PM, Snehalata Nagaje 
> >  wrote:
> > 
> > 
> > 
> > Hi ,
> > 
> > 
> > We are using kafka for storing messages in chat application.
> > 
> > Currently we divided each topic in multiple partitions. each partition 
> > stores data for given customer who uses the application.
> > 
> > Right now on very first request, application fetches log from kafka from 
> > earliest valid offset to maxiumum 10 bytes. hence it reads all messages 
> > for given topic
> > 
> > for given partition. Now we want to apply pagination as linkedin, facebook 
> > does. Only latest 10-15 messages should be displayed. And then on scroll 
> > down
> > 
> > fetch next set of previous messages, we are using Simple consumer to fetch 
> > messages.
> > 
> > Can you please guide on this?
> > 
> > 
> > Thanks,
> > Snehalata



Re: How to delete defunct topics

2015-02-05 Thread Joel Koshy
There are mbeans
(http://kafka.apache.org/documentation.html#monitoring) that you can
poke for incoming message rate - if you look at those over a period of
time you can figure out which of those are likely to be defunct and
then delete those topics.
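
One way to sketch the "look at those over a period of time" step, assuming you have already collected a cumulative message count per topic at several points in time. The mbean polling itself is not shown; the class DefunctTopicSketch, the method defunctTopics and the sample data are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class DefunctTopicSketch {
    // samples maps a topic to its cumulative message count, one reading per
    // observation, oldest first. A topic is flagged when its last reading
    // equals its first, i.e. no messages arrived during the window.
    static List<String> defunctTopics(Map<String, long[]> samples) {
        Map<String, long[]> sorted = new TreeMap<>(samples);
        List<String> defunct = new ArrayList<>();
        for (Map.Entry<String, long[]> e : sorted.entrySet()) {
            long[] counts = e.getValue();
            if (counts.length >= 2 && counts[counts.length - 1] == counts[0]) {
                defunct.add(e.getKey());
            }
        }
        return defunct;
    }

    public static void main(String[] args) {
        Map<String, long[]> samples = new TreeMap<>();
        samples.put("orders", new long[] {100L, 180L, 260L});
        samples.put("legacy-audit", new long[] {7L, 7L, 7L});
        System.out.println(defunctTopics(samples)); // prints [legacy-audit]
    }
}
```

Topics with fewer than two readings are never flagged, so a short observation window errs on the side of keeping a topic.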

On Thu, Feb 05, 2015 at 02:38:27PM -0800, Jagbir Hooda wrote:
> First I would like to take this opportunity to thank this group for
> releasing 0.8.2.0. It's a major milestone with a rich set of features.
> Kudos to all the contributors! We are still running 0.8.1.2 and are
> planning to upgrade to 0.8.2.0. While planning this upgrade we
> discovered many topics that are no longer active and best be deleted.
> Since it would be a common task faced by many kafka adopters I thought
> I'd raise it here and seek expert advise. Basically what we desire is
> a utility/script/steps to first identify the defunct topics (let's say
> those topics that haven't seen any traffic in the past 'n' days) and
> then delete them. Will appreciate your response.
> 
> Thanks,
> Jagbir



Re: Issue with topic deletion

2015-02-05 Thread Joel Koshy
Thanks, will take a look.

On Wed, Feb 04, 2015 at 11:33:03PM -0800, Sumit Rangwala wrote:
> >
> > Any idea why you have session expirations? This is typically due to GC
> >> and/or flaky network. Regardless, we should be handling that scenario
> >> as well. However, your logs seem incomplete. Can you redo this and
> >> perhaps keep the set up running a little longer and send over those
> >> logs?
> >>
> >>
> > I am stress testing my application by doing a large number of read and
> > writes to kafka. My setup consist many docker instances (of brokers and
> > client) running (intentionally) on a single linux box. Since the machine is
> > overload, congested network and long GC are a possibility.
> >
> > I will redo the experiment and keep the kakfa brokers running. However, I
> > will move to 0.8.2 release since Jun asked me to try it for another issue
> > (topic creation). I hope that is fine.
> >
> >
> Joel, Harsha,
> 
> Here are the logs from a different run ( http://d.pr/f/1giCl/PA6GBhrU ).
> Everything is same except that I moved from 0.8.2-beta to 0.8.2.0 (for
> clients as well as brokers). My setup consist of 1 zookeeper, 3 kafka
> brokers, 5 my application nodes. I shutdown all my application nodes after
> the experiment but zookeeper and kafka brokers are still running in case we
> need more data. All the dump of log files on brokers were taken *after* I
> shut down all application nodes to make sure there were no consumer for any
> of the topics.
> 
> The current state of topics on kafka nodes where the topic with deletion
> issue is LAX1-GRIFFIN-r25-1423118905870
> 
> root@69a87c0e7c5b:/# date && $KAFKA_HOME/bin/kafka-topics.sh
> --zookeeper=$ZK_PORT_2181_TCP_ADDR:$ZK_PORT_2181_TCP_PORT --list
> 
> Thu Feb  5 07:15:28 UTC 2015
> LAX1-GRIFFIN-CONTROL
> LAX1-GRIFFIN-r1-1423119435923
> LAX1-GRIFFIN-r10-1423119436039
> LAX1-GRIFFIN-r11-1423119436675
> LAX1-GRIFFIN-r12-1423119436320
> LAX1-GRIFFIN-r13-1423119436162
> LAX1-GRIFFIN-r14-1423119436583
> LAX1-GRIFFIN-r15-1423119436446
> LAX1-GRIFFIN-r16-1423119437195
> LAX1-GRIFFIN-r17-1423119436330
> LAX1-GRIFFIN-r18-1423119436411
> LAX1-GRIFFIN-r19-1423119437150
> LAX1-GRIFFIN-r2-1423119435925
> LAX1-GRIFFIN-r20-1423119436750
> LAX1-GRIFFIN-r21-1423119437717
> LAX1-GRIFFIN-r22-1423119436820
> LAX1-GRIFFIN-r23-1423119436483
> LAX1-GRIFFIN-r24-1423119437210
> LAX1-GRIFFIN-r25-1423118905870 - marked for deletion
> LAX1-GRIFFIN-r25-1423119437277
> LAX1-GRIFFIN-r26-1423119438650
> LAX1-GRIFFIN-r27-1423119436855
> LAX1-GRIFFIN-r28-1423119436566
> LAX1-GRIFFIN-r29-1423119437955
> LAX1-GRIFFIN-r3-1423119435925
> LAX1-GRIFFIN-r30-1423119437949
> LAX1-GRIFFIN-r4-1423119435925
> LAX1-GRIFFIN-r5-1423119435923
> LAX1-GRIFFIN-r6-1423119436354
> LAX1-GRIFFIN-r7-1423119436095
> LAX1-GRIFFIN-r8-1423119436100
> LAX1-GRIFFIN-r9-1423119436377
> 
> Sumit
> 
> 
> 
> 
> >
> > Sumit
> >
> >
> >
> >> Thanks,
> >>
> >> Joel
> >>
> >> On Wed, Feb 04, 2015 at 01:00:46PM -0800, Sumit Rangwala wrote:
> >> > >
> >> > >
> >> > >> I have since stopped the container so I cannot say if
> >> > > LAX1-GRIFFIN-r45-142388317 was one of the topic in "marked for
> >> > > deletion" forever.  However, there were many topics (at least 10 of
> >> them)
> >> > > that were perennially in "marked for deletion" state.
> >> > >
> >> > >
> >> > I have the setup to recreate the issue in case the logs are not
> >> sufficient.
> >> >
> >> >
> >> > Sumit
> >> >
> >> >
> >> >
> >> > > Sumit
> >> > >
> >> > >
> >> > >
> >> > >
> >> > >
> >> > >> -Harsha
> >> > >>
> >> > >> On Tue, Feb 3, 2015, at 09:19 PM, Harsha wrote:
> >> > >> > you are probably handling it but there is a case where you call
> >> > >> > deleteTopic and kafka goes through delete topic process but your
> >> > >> > consumer is running probably made a TopicMetadataRequest for the
> >> same
> >> > >> > topic which can re-create the topic with the default
> >> num.partitions and
> >> > >> > replication.factor.  Did you try stopping the consumer first and
> >> issue
> >> > >> > the topic delete.
> >> > >> > -Harsha
>

Re: question about new consumer offset management in 0.8.2

2015-02-05 Thread Joel Koshy
> Ok, so it looks like the default settings are:
> offset.storage = zookeeper
> dual.commit.enabled = true
> The doc for 'dual.commit.enabled' seems to imply (but doesn't clearly
> state) that it will only apply if offset.storage = kafka.  Is that right?
> (I'm guessing not)

dual.commit.enabled defaults to true only if offsets.storage is kafka.
As you noted, it only applies if offsets.storage = kafka, and it is
primarily intended for migration.
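
A small sketch of that defaulting rule, assuming the settings arrive as a java.util.Properties object. This is not the actual consumer config code; the class DualCommitDefaultSketch and the method dualCommitEnabled are hypothetical names.

```java
import java.util.Properties;

class DualCommitDefaultSketch {
    // Effective dual.commit.enabled: an explicit value wins; otherwise the
    // default is true only when offsets.storage is kafka.
    static boolean dualCommitEnabled(Properties props) {
        String storage = props.getProperty("offsets.storage", "zookeeper");
        String explicit = props.getProperty("dual.commit.enabled");
        if (explicit != null) {
            return Boolean.parseBoolean(explicit);
        }
        return "kafka".equals(storage);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(dualCommitEnabled(props)); // prints false
        props.put("offsets.storage", "kafka");
        System.out.println(dualCommitEnabled(props)); // prints true
    }
}
```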

> It seems to me less than ideal to have the default behavior to have
> dual.commit.enabled = true, since this seems like a performance hit, no?

To some degree yes, but it is relatively cheap.

> I'd think you'd only want this during a planned migration.

Yes.

> 
> Also, I assume it's desirable to switch to using 'kafka' for offset
> storage, for performance reasons?  Will it better handle a larger number of
> topics?

Yes.

> Also, I assume the __consumer_offsets topic will be set to have an infinite
> retention policy internally, is that right?  So that currently committed
> offsets for a given consumer group won't be lost?

It uses the "compaction" retention policy - so the topic won't grow
unbounded. Compaction will basically dedupe on the inactive segments
of the topic - so in effect it will only maintain the last committed
offset for a given group-topic-partition 3-tuple (plus a few - since
it runs only when a certain dirtiness threshold has been met). The
compaction policy is also documented on the site.
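
To illustrate the effect (not the log cleaner's implementation), here is a sketch that replays a list of commits and keeps only the latest offset per key. The class OffsetCompactionSketch, the key format and the sample commits are assumptions made for the example.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class OffsetCompactionSketch {
    // Builds a key for the group-topic-partition 3-tuple.
    static String key(String group, String topic, int partition) {
        return group + "/" + topic + "/" + partition;
    }

    // Replays commits in log order; a later commit for the same key
    // overwrites the earlier one, so one entry per key survives.
    static Map<String, Long> compact(List<Map.Entry<String, Long>> log) {
        Map<String, Long> latest = new LinkedHashMap<>();
        for (Map.Entry<String, Long> commit : log) {
            latest.put(commit.getKey(), commit.getValue());
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> log = new ArrayList<>();
        log.add(new SimpleEntry<>(key("g1", "t", 0), 10L));
        log.add(new SimpleEntry<>(key("g1", "t", 1), 4L));
        log.add(new SimpleEntry<>(key("g1", "t", 0), 25L));
        System.out.println(compact(log)); // prints {g1/t/0=25, g1/t/1=4}
    }
}
```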

Thanks,

Joel

> On Thu, Feb 5, 2015 at 2:21 PM, Joel Koshy  wrote:
> 
> > This is documented in the official docs:
> > http://kafka.apache.org/documentation.html#distributionimpl
> >
> > On Thu, Feb 05, 2015 at 01:23:01PM -0500, Jason Rosenberg wrote:
> > > What are the defaults for those settings (I assume it will be to continue
> > > using only zookeeper by default)?
> > >
> > > Also, if I have a cluster of consumers sharing the same groupId, and I
> > > update them via a rolling release, will it be a problem during the
> > rolling
> > > restart if there is inconsistency in the settings for a short time?  Or
> > is
> > > it required that the entire cluster be stopped, then update configs, then
> > > restart all nodes?
> > >
> > > Jason
> > >
> > > On Thu, Feb 5, 2015 at 12:45 PM, Gwen Shapira 
> > wrote:
> > >
> > > > Thanks Jon. I updated the FAQ with your procedure:
> > > >
> > > >
> > > >
> > https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdowemigratetocommittingoffsetstoKafka(ratherthanZookeeper)in0.8.2
> > > > ?
> > > >
> > > > On Thu, Feb 5, 2015 at 9:16 AM, Jon Bringhurst <
> > > > jbringhu...@linkedin.com.invalid> wrote:
> > > >
> > > > > There should probably be a wiki page started for this so we have the
> > > > > details in one place. The same question was asked on Freenode IRC a
> > few
> > > > > minutes ago. :)
> > > > >
> > > > > A summary of the migration procedure is:
> > > > >
> > > > > 1) Upgrade your brokers and set dual.commit.enabled=false and
> > > > > offsets.storage=zookeeper (Commit offsets to Zookeeper Only).
> > > > > 2) Set dual.commit.enabled=true and offsets.storage=kafka and restart
> > > > > (Commit offsets to Zookeeper and Kafka).
> > > > > 3) Set dual.commit.enabled=false and offsets.storage=kafka and
> > restart
> > > > > (Commit offsets to Kafka only).
> > > > >
> > > > > -Jon
> > > > >
> > > > > On Feb 5, 2015, at 9:03 AM, Jason Rosenberg 
> > wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > For 0.8.2, one of the features listed is:
> > > > > >  - Kafka-based offset storage.
> > > > > >
> > > > > > Is there documentation on this (I've heard discussion of it of
> > course)?
> > > > > >
> > > > > > Also, is it something that will be used by existing consumers when
> > they
> > > > > > migrate up to 0.8.2?  What is the migration process?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jason
> > > > >
> > > > >
> > > >
> >
> >

-- 
Joel


Re: generics type for Producer and Consumer do not need to match?

2015-02-05 Thread Joel Koshy
There has to be an implicit contract between the producer and
consumer. The K, V pairs don't _need_ to match but generally _should_.
If the producer sends with <PK, PV>, the consumer may receive <CK, CV> as
long as it knows how to convert those raw bytes to <CK, CV>. In the
example, if CK == byte[] and CV == byte[], it is effectively a no-op
conversion.
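
A minimal sketch of such a contract, assuming both sides agree on UTF-8. No Kafka classes are involved; TypeContractSketch, serialize and deserialize are made-up names standing in for whatever encoder and decoder the two sides use.

```java
import java.nio.charset.StandardCharsets;

class TypeContractSketch {
    // Producer side: a String value is turned into bytes before it is sent.
    static byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: decoding works only because both sides agree on UTF-8.
    static String deserialize(byte[] raw) {
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] onTheWire = serialize("hello");
        System.out.println(onTheWire.length);       // prints 5
        System.out.println(deserialize(onTheWire)); // prints hello
    }
}
```

A consumer typed as byte[], byte[] simply skips the deserialize step and hands the raw bytes to the application.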

On Thu, Feb 05, 2015 at 12:11:03PM -0800, Yang wrote:
> in the example
> https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example
> 
> we use a String,String for <K,V>
> 
> in the same set of example
> https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
> 
> on the consumer side we use byte[], byte[] for K,V
> ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
> 
> 
> 
> we tested in our own code that the above producer-consumer pair does work.
> 
> 
> so the K,V types on the 2 sides do NOT need to match? if we argue that
> "everything has to come down to byte[], then maybe we don't need the
> generics types on the consumer side? in other words, how do we determine
> the K,V types on consumer side?
> 
> thanks
> Yang



Re: Get Latest Offset for Specific Topic for All Partition

2015-02-05 Thread Joel Koshy
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdoIaccuratelygetoffsetsofmessagesforacertaintimestampusingOffsetRequest?

However, you will need to issue a TopicMetadataRequest first to
discover the leaders for all the partitions and then issue the offset
request.
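
Once the leaders are known, the partitions can be grouped per leader so that each broker is asked only about the partitions it leads. The sketch below shows just that grouping step on made-up data; it does not issue a TopicMetadataRequest or an offset request, and LeaderGroupingSketch and partitionsByLeader are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class LeaderGroupingSketch {
    // leaderByPartition: partition id -> broker id of its leader, as learned
    // from topic metadata. Returns broker id -> partitions to ask it about.
    static Map<Integer, List<Integer>> partitionsByLeader(Map<Integer, Integer> leaderByPartition) {
        Map<Integer, Integer> sorted = new TreeMap<>(leaderByPartition);
        Map<Integer, List<Integer>> byLeader = new TreeMap<>();
        for (Map.Entry<Integer, Integer> e : sorted.entrySet()) {
            List<Integer> partitions = byLeader.get(e.getValue());
            if (partitions == null) {
                partitions = new ArrayList<>();
                byLeader.put(e.getValue(), partitions);
            }
            partitions.add(e.getKey());
        }
        return byLeader;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> leaders = new TreeMap<>();
        leaders.put(0, 1); // partition 0 is led by broker 1
        leaders.put(1, 2); // partition 1 is led by broker 2
        leaders.put(2, 1); // partition 2 is led by broker 1
        System.out.println(partitionsByLeader(leaders)); // prints {1=[0, 2], 2=[1]}
    }
}
```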

On Thu, Feb 05, 2015 at 11:54:00AM -0800, Bhavesh Mistry wrote:
> HI All,
> 
> I just need to get the latest offset # for topic (not for consumer group).
> Which API to get this info ?
> 
> My use case is to analyze the data injection rate to each of  partition is
> uniform or not (close). For this,  I am planing to dump the latest offset
> into graphite  for each partition and look at derivative over time.
> 
> Thanks,
> 
> Bhavesh



Re: question about new consumer offset management in 0.8.2

2015-02-05 Thread Joel Koshy
This is documented in the official docs: 
http://kafka.apache.org/documentation.html#distributionimpl

On Thu, Feb 05, 2015 at 01:23:01PM -0500, Jason Rosenberg wrote:
> What are the defaults for those settings (I assume it will be to continue
> using only zookeeper by default)?
> 
> Also, if I have a cluster of consumers sharing the same groupId, and I
> update them via a rolling release, will it be a problem during the rolling
> restart if there is inconsistency in the settings for a short time?  Or is
> it required that the entire cluster be stopped, then update configs, then
> restart all nodes?
> 
> Jason
> 
> On Thu, Feb 5, 2015 at 12:45 PM, Gwen Shapira  wrote:
> 
> > Thanks Jon. I updated the FAQ with your procedure:
> >
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdowemigratetocommittingoffsetstoKafka(ratherthanZookeeper)in0.8.2
> > ?
> >
> > On Thu, Feb 5, 2015 at 9:16 AM, Jon Bringhurst <
> > jbringhu...@linkedin.com.invalid> wrote:
> >
> > > There should probably be a wiki page started for this so we have the
> > > details in one place. The same question was asked on Freenode IRC a few
> > > minutes ago. :)
> > >
> > > A summary of the migration procedure is:
> > >
> > > 1) Upgrade your brokers and set dual.commit.enabled=false and
> > > offsets.storage=zookeeper (Commit offsets to Zookeeper Only).
> > > 2) Set dual.commit.enabled=true and offsets.storage=kafka and restart
> > > (Commit offsets to Zookeeper and Kafka).
> > > 3) Set dual.commit.enabled=false and offsets.storage=kafka and restart
> > > (Commit offsets to Kafka only).
> > >
> > > -Jon
> > >
> > > On Feb 5, 2015, at 9:03 AM, Jason Rosenberg  wrote:
> > >
> > > > Hi,
> > > >
> > > > For 0.8.2, one of the features listed is:
> > > >  - Kafka-based offset storage.
> > > >
> > > > Is there documentation on this (I've heard discussion of it of course)?
> > > >
> > > > Also, is it something that will be used by existing consumers when they
> > > > migrate up to 0.8.2?  What is the migration process?
> > > >
> > > > Thanks,
> > > >
> > > > Jason
> > >
> > >
> >



Re: Issue with topic deletion

2015-02-04 Thread Joel Koshy
I took a look at your logs. I agree with Harsha that the logs seem
truncated. The basic issue though is that you have session expirations
and controller failover. Broker 49554 was the controller and hosted
some partition(s) of LAX1-GRIFFIN-r13-1423001701601. After controller
failover the new controller marks it as ineligible for deletion since
49554 is considered down (until it re-registers in zookeeper) and is
re-elected as the leader - however, I don't see those logs.

Any idea why you have session expirations? This is typically due to GC
and/or flaky network. Regardless, we should be handling that scenario
as well. However, your logs seem incomplete. Can you redo this and
perhaps keep the set up running a little longer and send over those
logs?

Thanks,

Joel

On Wed, Feb 04, 2015 at 01:00:46PM -0800, Sumit Rangwala wrote:
> >
> >
> >> I have since stopped the container so I cannot say if
> > LAX1-GRIFFIN-r45-142388317 was one of the topic in "marked for
> > deletion" forever.  However, there were many topics (at least 10 of them)
> > that were perennially in "marked for deletion" state.
> >
> >
> I have the setup to recreate the issue in case the logs are not sufficient.
> 
> 
> Sumit
> 
> 
> 
> > Sumit
> >
> >
> >
> >
> >
> >> -Harsha
> >>
> >> On Tue, Feb 3, 2015, at 09:19 PM, Harsha wrote:
> >> > you are probably handling it but there is a case where you call
> >> > deleteTopic and kafka goes through delete topic process but your
> >> > consumer is running probably made a TopicMetadataRequest for the same
> >> > topic which can re-create the topic with the default num.partitions and
> >> > replication.factor.  Did you try stopping the consumer first and issue
> >> > the topic delete.
> >> > -Harsha
> >> >
> >> > On Tue, Feb 3, 2015, at 08:37 PM, Sumit Rangwala wrote:
> >> > > On Tue, Feb 3, 2015 at 6:48 PM, Harsha  wrote:
> >> > >
> >> > > > Sumit,
> >> > > >lets say you are deleting a older topic "test1" do you have
> >> any
> >> > > >consumers running simultaneously for the topic "test1"  while
> >> > > >deletion of topic going on.
> >> > > >
> >> > >
> >> > > Yes it is the case. However, after a small period of time (say few
> >> > > minutes)
> >> > > there won't be any consumer running for the deleted topic.
> >> > >
> >> > >
> >> > > Sumit
> >> > >
> >> > >
> >> > >
> >> > >
> >> > > > -Harsha
> >> > > >
> >> > > > On Tue, Feb 3, 2015, at 06:17 PM, Joel Koshy wrote:
> >> > > > > Thanks for the logs - will take a look tomorrow unless someone
> >> else
> >> > > > > gets a chance to get to it today.
> >> > > > >
> >> > > > > Joel
> >> > > > >
> >> > > > > On Tue, Feb 03, 2015 at 04:11:57PM -0800, Sumit Rangwala wrote:
> >> > > > > > On Tue, Feb 3, 2015 at 3:37 PM, Joel Koshy  >> >
> >> > > > wrote:
> >> > > > > >
> >> > > > > > > Hey Sumit,
> >> > > > > > >
> >> > > > > > > I thought you would be providing the actual steps to
> >> reproduce :)
> >> > > > > > >
> >> > > > > >
> >> > > > > > I want to, but some proprietary code prevents me from doing so.
> >> > > > > >
> >> > > > > >
> >> > > > > > > Nevertheless, can you get all the relevant logs: state change
> >> logs
> >> > > > and
> >> > > > > > > controller logs at the very least and if possible server logs
> >> and
> >> > > > send
> >> > > > > > > those over?
> >> > > > > > >
> >> > > > > >
> >> > > > > > Here are all the logs you requested (there are three brokers in
> >> my
> >> > > > setup
> >> > > > > > k1, k2, k3): http://d.pr/f/1kprY/2quHBRRT (Gmail has issue
> >> with the
> >> > > > file)
> >> > > > > >
> >> > > > > >
> >> >
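
The race Harsha describes above (a running consumer's metadata request re-creating a topic that is being deleted when auto topic creation is on) can be pictured with a toy in-memory model. This is only a sketch under stated assumptions: ToyBroker, metadata_request, delete_topic and topic_survives_delete are hypothetical names, not Kafka APIs, and the real controller-driven deletion is far more involved.

```python
class ToyBroker:
    """Hypothetical in-memory stand-in for a broker with auto topic creation."""

    def __init__(self, auto_create=True, default_partitions=8):
        self.topics = {}  # topic name -> partition count
        self.auto_create = auto_create
        self.default_partitions = default_partitions

    def metadata_request(self, topic):
        # With auto-creation on, asking about a missing topic creates it
        # with the default partition count.
        if topic not in self.topics and self.auto_create:
            self.topics[topic] = self.default_partitions
        return self.topics.get(topic)

    def delete_topic(self, topic):
        self.topics.pop(topic, None)


def topic_survives_delete(stop_consumer_first):
    """Return True if the topic exists again after the delete."""
    broker = ToyBroker()
    broker.metadata_request("test1")  # consumer start-up auto-creates the topic
    consumer_running = not stop_consumer_first
    broker.delete_topic("test1")
    if consumer_running:
        # A still-running consumer refreshes metadata and re-creates the topic.
        broker.metadata_request("test1")
    return "test1" in broker.topics


print(topic_survives_delete(stop_consumer_first=False))
print(topic_survives_delete(stop_consumer_first=True))
```

In this model the topic only stays deleted when the consumer is stopped before the delete is issued, which is the ordering suggested in the thread.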

Re: Issue with topic deletion

2015-02-03 Thread Joel Koshy
Thanks for the logs - will take a look tomorrow unless someone else
gets a chance to get to it today.

Joel

On Tue, Feb 03, 2015 at 04:11:57PM -0800, Sumit Rangwala wrote:
> On Tue, Feb 3, 2015 at 3:37 PM, Joel Koshy  wrote:
> 
> > Hey Sumit,
> >
> > I thought you would be providing the actual steps to reproduce :)
> >
> 
> I want to, but some proprietary code prevents me from doing so.
> 
> 
> > Nevertheless, can you get all the relevant logs: state change logs and
> > controller logs at the very least and if possible server logs and send
> > those over?
> >
> 
> Here are all the logs you requested (there are three brokers in my setup
> k1, k2, k3): http://d.pr/f/1kprY/2quHBRRT (Gmail has issue with the file)
> 
> 
> Sumit
> 
> 
> 
> 
> >
> > Joel
> >
> > On Tue, Feb 03, 2015 at 03:27:43PM -0800, Sumit Rangwala wrote:
> > > In my setup kafka brokers are set for auto topic creation. In the
> > scenario
> > > below a node informs other nodes (currently 5 in total) about  a number
> > of
> > > new (non-existent) topics, and  all the nodes almost simultaneously open
> > a
> > > consumer for each of those topics. Sometime later another node informs
> > all
> > > other nodes of a new list of topics and each node, if they find that an
> > > older topic exists in kafka, goes ahead and deletes the older topic.
> > What I
> > > have found is that many of the topics stay in the "marked for deletion"
> > > state forever.
> > >
> > >
> > > I get the list of topics using ZkUtils.getAllTopics(zkClient) and delete
> > > topics using AdminUtils.deleteTopic(zkClient, topic). Since many nodes
> > > might try to delete the same topic at the same time I do
> > > see ZkNodeExistsException while deleting the topic, which I catch and
> > > ignore. (e.g., org.apache.zookeeper.KeeperException$NodeExistsException:
> > > KeeperErrorCode = NodeExists for
> > > /admin/delete_topics/LAX1-GRIFFIN-r13-1423001701601)
> > >
> > > # State of one deleted topic on kafka brokers:
> > > Topic:LAX1-GRIFFIN-r13-1423001701601 PartitionCount:8 ReplicationFactor:1
> > > Configs:
> > > Topic: LAX1-GRIFFIN-r13-1423001701601 Partition: 0 Leader: -1 Replicas:
> > > 49558 Isr:
> > > Topic: LAX1-GRIFFIN-r13-1423001701601 Partition: 1 Leader: -1 Replicas:
> > > 49554 Isr:
> > > Topic: LAX1-GRIFFIN-r13-1423001701601 Partition: 2 Leader: -1 Replicas:
> > > 49557 Isr:
> > > Topic: LAX1-GRIFFIN-r13-1423001701601 Partition: 3 Leader: -1 Replicas:
> > > 49558 Isr:
> > > Topic: LAX1-GRIFFIN-r13-1423001701601 Partition: 4 Leader: -1 Replicas:
> > > 49554 Isr:
> > > Topic: LAX1-GRIFFIN-r13-1423001701601 Partition: 5 Leader: -1 Replicas:
> > > 49557 Isr:
> > > Topic: LAX1-GRIFFIN-r13-1423001701601 Partition: 6 Leader: -1 Replicas:
> > > 49558 Isr:
> > > Topic: LAX1-GRIFFIN-r13-1423001701601 Partition: 7 Leader: -1 Replicas:
> > > 49554 Isr:
> > >
> > >
> > > # Controller log says
> > >
> > > [2015-02-03 22:59:03,399] INFO [delete-topics-thread-49554], Deletion for
> > > replicas 49557,49554,49558 for partition
> > >
> > [LAX1-GRIFFIN-r13-1423001701601,0],[LAX1-GRIFFIN-r13-1423001701601,6],[LAX1-GRIFFIN-r13-1423001701601,5],[LAX1-GRIFFIN-r13-1423001701601,3],[LAX1-GRIFFIN-r13-1423001701601,7],[LAX1-GRIFFIN-r13-1423001701601,1],[LAX1-GRIFFIN-r13-1423001701601,4],[LAX1-GRIFFIN-r13-1423001701601,2]
> > > of topic LAX1-GRIFFIN-r13-1423001701601 in progress
> > > (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
> > >
> > > current time: Tue Feb  3 23:20:58 UTC 2015
> > >
> > >
> > > Since I don't know the delete topic algorithm, I am not sure why
> > these
> > > topics are not garbage collected. I do have the complete setup running in
> > > docker right now on my local box so please let me know if any more info
> > is
> > > required to troubleshoot this issue.
> > >
> > >
> > > Furthermore, does ZkUtils.getAllTopics(zkClient) return "marked for
> > > deletion" topics as well? If so, is there an easy way to get a list of
> > active
> > > topics (other than looking at all the topics in /admin/delete_topics/ and
> > > taking a set difference with the topics returned
> > > by ZkUtils.getAllTopics(zkClient) )
> > >
> > > Sumit
> > > (More setup info below)
> > >
> > >
> > > Setup
> > > 
> > > Zookeeper: 3.4.6
> > > Kafka broker: 0.8.2-beta
> > > Kafka clients: 0.8.2-beta
> > >
> > > # Kafka broker settings (all other settings are default 0.8.2-beta
> > settings)
> > > kafka.controlled.shutdown.enable: 'FALSE'
> > > kafka.auto.create.topics.enable: 'TRUE'
> > > kafka.num.partitions: 8
> > > kafka.default.replication.factor: 1
> > > kafka.rebalance.backoff.ms: 3000
> > > kafka.rebalance.max.retries: 10
> > > kafka.log.retention.minutes: 1200
> > > kafka.delete.topic.enable: 'TRUE'
> >
> >
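
The pattern Sumit describes in the quoted message (several nodes requesting deletion of the same topic, with the "node exists" error caught and ignored) amounts to an idempotent marker creation. A minimal sketch, assuming a toy stand-in for ZooKeeper: ToyZk, NodeExistsError and request_delete are hypothetical names used only for illustration, not the ZkUtils or AdminUtils API.

```python
class NodeExistsError(Exception):
    """Raised by the toy store when a path already exists."""


class ToyZk:
    """Hypothetical in-memory stand-in for a ZooKeeper-like node store."""

    def __init__(self):
        self.nodes = set()

    def create(self, path):
        if path in self.nodes:
            raise NodeExistsError(path)
        self.nodes.add(path)


def request_delete(zk, topic):
    """Mark a topic for deletion; tolerate another node having done so first.

    Returns True if this caller created the marker, False if it already existed.
    """
    try:
        zk.create("/admin/delete_topics/" + topic)
        return True
    except NodeExistsError:
        # Another node already requested deletion; nothing more to do.
        return False


zk = ToyZk()
results = [request_delete(zk, "LAX1-GRIFFIN-r13-1423001701601") for _ in range(5)]
print(results)
```

Only the first caller creates the marker; the other four see the existing node and treat the request as already made.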



Re: Turning on cleanup.policy=compact for a topic => not starting cleanup ?

2015-02-03 Thread Joel Koshy
Oh yes I forgot about that possibility. I think this should be fixed by 
KAFKA-1641 which will be in the next release.
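
To illustrate the failure in the stack trace quoted below: the cleaner's checkpoint says cleaning should start at offset 0, but the earliest remaining segment starts at 152 because older segments were removed under cleanup.policy=delete. The following is a toy model only, not Kafka's code; the function names are hypothetical, and the clamping variant is merely one plausible remedy, since the actual change made in KAFKA-1641 is not shown in this thread.

```python
def first_dirty_base(last_clean, segment_bases):
    """Base offset of the segment that holds last_clean, else the earliest one."""
    at_or_before = [b for b in segment_bases if b <= last_clean]
    return max(at_or_before) if at_or_before else min(segment_bases)


def strict_start(last_clean, segment_bases):
    """Model of the failing precondition: the start must be a segment base."""
    base = first_dirty_base(last_clean, segment_bases)
    if base != last_clean:
        raise ValueError(
            "Last clean offset is %d but segment base offset is %d"
            % (last_clean, base)
        )
    return last_clean


def clamped_start(last_clean, segment_bases):
    """Assumed remedy: never start before the earliest remaining segment."""
    return max(last_clean, min(segment_bases))


bases = [152, 170, 190]  # made-up bases; only 152 comes from the quoted log
try:
    strict_start(0, bases)
except ValueError as err:
    print(err)
print(clamped_start(0, bases))
```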

On Tue, Feb 03, 2015 at 11:50:54PM +, Thunder Stumpges wrote:
> AH hah! The log-cleaner.log file did NOT go into the logs directory for some 
> reason, found it in the kafka root directory. But in there I found the 
> following output:
> 
> [2015-02-03 15:43:08,128] INFO Cleaner 0: Beginning cleaning of log 
> dev_testcompact-9. (kafka.log.LogCleaner)
> [2015-02-03 15:43:08,129] INFO Cleaner 0: Building offset map for 
> dev_testcompact-9... (kafka.log.LogCleaner)
> [2015-02-03 15:43:08,222] INFO Cleaner 0: Building offset map for log 
> dev_testcompact-9 for 15 segments in offset range [0, 230). 
> (kafka.log.LogCleaner)
> [2015-02-03 15:43:08,227] ERROR [kafka-log-cleaner-thread-0], Error due to  
> (kafka.log.LogCleaner)
> java.lang.IllegalArgumentException: requirement failed: Last clean offset is 
> 0 but segment base offset is 152 for log dev_testcompact -9.
> at scala.Predef$.require(Predef.scala:214)
> at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:491)
> at kafka.log.Cleaner.clean(LogCleaner.scala:288)
> at 
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:202)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:187)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> [2015-02-03 15:43:08,232] INFO [kafka-log-cleaner-thread-0], Stopped  
> (kafka.log.LogCleaner)
> 
> 
> It appears it doesn't like the fact that some of the log segments have been 
> deleted already... ? From when I was using cleanup.policy=delete ?
> 
> "Last clean offset is 0 but segment base offset is 152 for log 
> dev_vsw.avrodto.addelivery.machinelearning.adbanditmodel-9."
> 
> Thanks,
> Thunder
> 
> -Original Message-
> From: Thunder Stumpges [mailto:tstump...@ntent.com] 
> Sent: Tuesday, February 03, 2015 3:37 PM
> To: users@kafka.apache.org
> Subject: RE: Turning on cleanup.policy=compact for a topic => not starting 
> cleanup ?
> 
> Thanks Joel for the quick turnaround!
> 
> 1st, I took a look at "max-dirty-percent" and noticed it is "100", so that's 
> a good sign that it *should* have something to clean.
> 
> 2nd, I didn't realize the cleaner messages went to a different log! However I 
> have NO log-cleaner.log file in the normal logs directory. It does seem to be 
> configured in log4j (we haven't changed any of the defaults here):
> 
> log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
> log4j.appender.cleanerAppender.DatePattern='.'-MM-dd-HH
> log4j.appender.cleanerAppender.File=log-cleaner.log
> log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
> log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
> 
> log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender 
> log4j.additivity.kafka.log.LogCleaner=false
> 
> 3rd, no messages using compaction, all messages have a key.
> 
> So, lastly the thread dump. Not sure exactly how to do that or what thread 
> I'd be looking for specifically... Found a suggestion to run 
> 
> Jstack -l  > jstack.out
> 
> So I did that, and looked for anything containing "Clean" or "clean" and no 
> matches.
> 
> I will be trying to set the log level for LogCleaner to debug to see if that 
> helps at all.
> 
> Thanks in advance,
> Thunder
> 
> 
> -Original Message-
> From: Joel Koshy [mailto:jjkosh...@gmail.com]
> Sent: Tuesday, February 03, 2015 3:07 PM
> To: users@kafka.apache.org
> Subject: Re: Turning on cleanup.policy=compact for a topic => not starting 
> cleanup ?
> 
> - Can you check the log cleaner logs?
> - Do you have any compressed messages in your log? Or messages without
>   a key?
> - By default it is in a log-cleaner.log file unless you modified that.
> - Can you take a thread-dump to see if the log cleaner is still alive?
> - Also, there is an mbean that you can poke: "max-dirty-percent" under
>   log cleaner. Can you check on its value?
> 
> Thanks,
> 
> Joel
> 
> On Tue, Feb 03, 2015 at 10:59:27PM +, Thunder Stumpges wrote:
> > Hi guys, I am having a difficult time getting the log compaction to run on 
> > a topic I created initially with cleanup.policy=delete. Here's the details:
> > 
> > 3 brokers, 0.8.1.1 all have log.cleaner.enable=true (other than this 
> > there are no non-defaults set in our server.properties)
> > 
> > Create the topic first with policy=delete and a short window. Wanted to see 
> > that the segme

Re: Turning on cleanup.policy=compact for a topic => not starting cleanup ?

2015-02-03 Thread Joel Koshy
> 
> So, lastly the thread dump. Not sure exactly how to do that or what thread 
> I'd be looking for specifically... Found a suggestion to run 
> 
> Jstack -l  > jstack.out

That should work, although you can just send SIGQUIT (kill -3) to the
PID.

> So I did that, and looked for anything containing "Clean" or "clean" and no 
> matches.

So the cleaner is not there. Usually this is due to a fatal error
while doing the cleaning round - most often due to a compressed
message leaking in or insufficient memory. There should be a
stacktrace in the broker log if this is the case.
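
Searching a thread dump for the cleaner can be scripted. A minimal sketch, assuming the dump was saved to text (for example the jstack.out mentioned above) and that cleaner threads are named with the "kafka-log-cleaner-thread" prefix seen in the log output earlier in this thread; cleaner_threads is a hypothetical helper and the sample dump lines are made up.

```python
import re


def cleaner_threads(dump_text, prefix="kafka-log-cleaner-thread"):
    """Return the names of threads in a jstack-style dump matching the prefix."""
    # In jstack output each thread entry starts with its name in double quotes.
    names = re.findall(r'^"([^"]+)"', dump_text, flags=re.MULTILINE)
    return [name for name in names if name.startswith(prefix)]


# Made-up sample input for illustration only.
sample_alive = (
    '"kafka-log-cleaner-thread-0" prio=10 tid=0x1 nid=0x2 waiting on condition\n'
    '   java.lang.Thread.State: TIMED_WAITING (sleeping)\n'
    '"kafka-scheduler-0" daemon prio=10 tid=0x3 nid=0x4 waiting on condition\n'
)
sample_dead = '"kafka-scheduler-0" daemon prio=10 tid=0x3 nid=0x4 waiting on condition\n'

print(cleaner_threads(sample_alive))
print(cleaner_threads(sample_dead))
```

An empty result corresponds to the situation described here: the cleaner thread has exited, so the broker and cleaner logs are the next place to look.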

> 
> 
> -Original Message-
> From: Joel Koshy [mailto:jjkosh...@gmail.com] 
> Sent: Tuesday, February 03, 2015 3:07 PM
> To: users@kafka.apache.org
> Subject: Re: Turning on cleanup.policy=compact for a topic => not starting 
> cleanup ?
> 
> - Can you check the log cleaner logs?
> - Do you have any compressed messages in your log? Or messages without
>   a key?
> - By default it is in a log-cleaner.log file unless you modified that.
> - Can you take a thread-dump to see if the log cleaner is still alive?
> - Also, there is an mbean that you can poke: "max-dirty-percent" under
>   log cleaner. Can you check on its value?
> 
> Thanks,
> 
> Joel
> 
> On Tue, Feb 03, 2015 at 10:59:27PM +, Thunder Stumpges wrote:
> > Hi guys, I am having a difficult time getting the log compaction to run on 
> > a topic I created initially with cleanup.policy=delete. Here's the details:
> > 
> > 3 brokers, 0.8.1.1 all have log.cleaner.enable=true (other than this there 
> > are no non-defaults set in our server.properties)
> > 
> > Create the topic first with policy=delete and a short window. Wanted to see 
> > that the segments were being created and deleted by the "normal" delete 
> > retention. This seemed to work fine. New segments created every 6 seconds 
> > while I add some test data, then the cleaner comes along and deletes log 
> > segments after 1min.
> > 
> > sudo bin/kafka-topics.sh --zookeeper zk1:2181/kafka --create --topic 
> > dev_testcompact --partitions 10 --replication-factor 3 --config 
> > cleanup.policy=delete --config segment.ms=6000 --config retention.ms=6
> > 
> > 
> > Then I tried to update the policy to do compact:
> > sudo bin/kafka-topics.sh --zookeeper zk1:2181/kafka --alter --topic 
> > dev_testcompact --config cleanup.policy=compact --config 
> > min.cleanable.dirty.ratio=0.3 -config delete.retention.ms=6
> > 
> > From this point, the deletion retention stopped cleaning up, but the 
> > compact cleaner doesn't seem to be running at all. I was expecting that 
> > after 15 seconds (default for setting log.cleaner.backoff.ms) the cleaner 
> > would come in and compact the old segments. I now have generated 400 rows 
> > across 40 keys (so there is plenty to compact). Segments are being created 
> > every 6 seconds per the config.
> > 
> > I have also tried removing the deletion related config:
> > 
> > sudo bin/kafka-topics.sh --zookeeper zk1:2181/kafka --alter --topic 
> > dev_testcompact --deleteConfig retention.ms
> > 
> > I then restarted one of the three brokers wondering if it needed to restart 
> > to pick up new configs, still no cleanup. What am I doing wrong?!
> 

-- 
Joel


Re: Issue with topic deletion

2015-02-03 Thread Joel Koshy
Hey Sumit,

I thought you would be providing the actual steps to reproduce :)
Nevertheless, can you get all the relevant logs: state change logs and
controller logs at the very least and if possible server logs and send
those over?

Joel

On Tue, Feb 03, 2015 at 03:27:43PM -0800, Sumit Rangwala wrote:
> In my setup kafka brokers are set for auto topic creation. In the scenario
> below a node informs other nodes (currently 5 in total) about  a number of
> new (non-existent) topics, and  all the nodes almost simultaneously open a
> consumer for each of those topics. Sometime later another node informs all
> other nodes of a new list of topics and each node, if they find that an
> older topic exists in kafka, goes ahead and deletes the older topic. What I
> have found is that many of the topics stay in the "marked for deletion"
> state forever.
> 
> 
> I get the list of topics using ZkUtils.getAllTopics(zkClient) and delete
> topics using AdminUtils.deleteTopic(zkClient, topic). Since many nodes
> might try to delete the same topic at the same time I do
> see ZkNodeExistsException while deleting the topic, which I catch and
> ignore. (e.g., org.apache.zookeeper.KeeperException$NodeExistsException:
> KeeperErrorCode = NodeExists for
> /admin/delete_topics/LAX1-GRIFFIN-r13-1423001701601)
> 
> # State of one deleted topic on kafka brokers:
> Topic:LAX1-GRIFFIN-r13-1423001701601 PartitionCount:8 ReplicationFactor:1
> Configs:
> Topic: LAX1-GRIFFIN-r13-1423001701601 Partition: 0 Leader: -1 Replicas:
> 49558 Isr:
> Topic: LAX1-GRIFFIN-r13-1423001701601 Partition: 1 Leader: -1 Replicas:
> 49554 Isr:
> Topic: LAX1-GRIFFIN-r13-1423001701601 Partition: 2 Leader: -1 Replicas:
> 49557 Isr:
> Topic: LAX1-GRIFFIN-r13-1423001701601 Partition: 3 Leader: -1 Replicas:
> 49558 Isr:
> Topic: LAX1-GRIFFIN-r13-1423001701601 Partition: 4 Leader: -1 Replicas:
> 49554 Isr:
> Topic: LAX1-GRIFFIN-r13-1423001701601 Partition: 5 Leader: -1 Replicas:
> 49557 Isr:
> Topic: LAX1-GRIFFIN-r13-1423001701601 Partition: 6 Leader: -1 Replicas:
> 49558 Isr:
> Topic: LAX1-GRIFFIN-r13-1423001701601 Partition: 7 Leader: -1 Replicas:
> 49554 Isr:
> 
> 
> # Controller log says
> 
> [2015-02-03 22:59:03,399] INFO [delete-topics-thread-49554], Deletion for
> replicas 49557,49554,49558 for partition
> [LAX1-GRIFFIN-r13-1423001701601,0],[LAX1-GRIFFIN-r13-1423001701601,6],[LAX1-GRIFFIN-r13-1423001701601,5],[LAX1-GRIFFIN-r13-1423001701601,3],[LAX1-GRIFFIN-r13-1423001701601,7],[LAX1-GRIFFIN-r13-1423001701601,1],[LAX1-GRIFFIN-r13-1423001701601,4],[LAX1-GRIFFIN-r13-1423001701601,2]
> of topic LAX1-GRIFFIN-r13-1423001701601 in progress
> (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
> 
> current time: Tue Feb  3 23:20:58 UTC 2015
> 
> 
> Since I don't know the delete topic algorithm, I am not sure why these
> topics are not garbage collected. I do have the complete setup running in
> docker right now on my local box so please let me know if any more info is
> required to troubleshoot this issue.
> 
> 
> Furthermore, does ZkUtils.getAllTopics(zkClient) return "marked for
> deletion" topics as well? If so, is there an easy way to get a list of active
> topics (other than looking at all the topics in /admin/delete_topics/ and
> taking a set difference with the topics returned
> by ZkUtils.getAllTopics(zkClient) )
> 
> Sumit
> (More setup info below)
> 
> 
> Setup
> 
> Zookeeper: 3.4.6
> Kafka broker: 0.8.2-beta
> Kafka clients: 0.8.2-beta
> 
> # Kafka broker settings (all other settings are default 0.8.2-beta settings)
> kafka.controlled.shutdown.enable: 'FALSE'
> kafka.auto.create.topics.enable: 'TRUE'
> kafka.num.partitions: 8
> kafka.default.replication.factor: 1
> kafka.rebalance.backoff.ms: 3000
> kafka.rebalance.max.retries: 10
> kafka.log.retention.minutes: 1200
> kafka.delete.topic.enable: 'TRUE'
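
The set difference Sumit mentions in the quoted message is short to write down. A minimal sketch, assuming both lists have already been fetched (how they are obtained is outside this example, and the thread does not confirm whether getAllTopics includes topics marked for deletion); active_topics is a hypothetical helper and the second topic name is made up.

```python
def active_topics(all_topics, marked_for_deletion):
    """Topics that exist and are not pending deletion, in sorted order."""
    return sorted(set(all_topics) - set(marked_for_deletion))


all_topics = ["LAX1-GRIFFIN-r13-1423001701601", "example-live-topic"]
pending = ["LAX1-GRIFFIN-r13-1423001701601"]
print(active_topics(all_topics, pending))
```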



Re: Turning on cleanup.policy=compact for a topic => not starting cleanup ?

2015-02-03 Thread Joel Koshy
- Can you check the log cleaner logs?
- Do you have any compressed messages in your log? Or messages without
  a key?
- By default it is in a log-cleaner.log file unless you modified that.
- Can you take a thread-dump to see if the log cleaner is still alive?
- Also, there is an mbean that you can poke: "max-dirty-percent" under
  log cleaner. Can you check on its value?

Thanks,

Joel

On Tue, Feb 03, 2015 at 10:59:27PM +, Thunder Stumpges wrote:
> Hi guys, I am having a difficult time getting the log compaction to run on a 
> topic I created initially with cleanup.policy=delete. Here's the details:
> 
> 3 brokers, 0.8.1.1 all have log.cleaner.enable=true (other than this there 
> are no non-defaults set in our server.properties)
> 
> Create the topic first with policy=delete and a short window. Wanted to see 
> that the segments were being created and deleted by the "normal" delete 
> retention. This seemed to work fine. New segments created every 6 seconds 
> while I add some test data, then the cleaner comes along and deletes log 
> segments after 1min.
> 
> sudo bin/kafka-topics.sh --zookeeper zk1:2181/kafka --create --topic 
> dev_testcompact --partitions 10 --replication-factor 3 --config 
> cleanup.policy=delete --config segment.ms=6000 --config retention.ms=6
> 
> 
> Then I tried to update the policy to do compact:
> sudo bin/kafka-topics.sh --zookeeper zk1:2181/kafka --alter --topic 
> dev_testcompact --config cleanup.policy=compact --config 
> min.cleanable.dirty.ratio=0.3 -config delete.retention.ms=6
> 
> From this point, the deletion retention stopped cleaning up, but the compact 
> cleaner doesn't seem to be running at all. I was expecting that after 15 
> seconds (default for setting log.cleaner.backoff.ms) the cleaner would come 
> in and compact the old segments. I now have generated 400 rows across 40 keys 
> (so there is plenty to compact). Segments are being created every 6 seconds 
> per the config.
> 
> I have also tried removing the deletion related config:
> 
> sudo bin/kafka-topics.sh --zookeeper zk1:2181/kafka --alter --topic 
> dev_testcompact --deleteConfig retention.ms
> 
> I then restarted one of the three brokers wondering if it needed to restart 
> to pick up new configs, still no cleanup. What am I doing wrong?!
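
For reference, what compaction is expected to leave behind in the test described above (400 rows across 40 keys) can be sketched as keeping only the latest record per key. This is a simplified model under stated assumptions: it ignores segment boundaries, the active segment, and delete markers, and compact is a hypothetical helper rather than anything in Kafka.

```python
def compact(records):
    """Keep the last (offset, key, value) seen for each key, ordered by offset."""
    latest = {}
    for offset, (key, value) in enumerate(records):
        latest[key] = (offset, value)  # later records overwrite earlier ones
    return sorted((off, key, val) for key, (off, val) in latest.items())


# 400 made-up rows spread evenly over 40 keys, as in the test described above.
records = [("key-%d" % (i % 40), "value-%d" % i) for i in range(400)]
compacted = compact(records)
print(len(records), "->", len(compacted))
```

Under this model each of the 40 keys retains only its most recent row, which is why the thread expects the cleaner to have plenty to do.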



Re: kafka-web-console goes down regularly

2015-02-03 Thread Joel Koshy
Can you contact the maintainer directly?
http://github.com/claudemamo/kafka-web-console/issues

On Tue, Feb 03, 2015 at 12:09:46PM -0800, Sa Li wrote:
> Hi, All
> 
> I am currently using kafka-web-console to monitor the kafka system. It goes
> down regularly, so I have to restart it every few hours, which is kinda
> annoying. I downloaded two versions:
> 
> https://github.com/claudemamo/kafka-web-console
> http://mungeol-heo.blogspot.ca/2014/12/kafka-web-console.html
> 
> And I started by
> 
> play start (using 9000), or play "start -Dhttp.port=8080". Either version
> works fine at the beginning but goes down after a few hours. I am
> thinking of using upstart to bring it back up automatically, but that should
> not be necessary. Any idea how to fix the problem, or did I do something wrong?
> 
> thanks
> 
> -- 
> 
> Alec Li



Re: Can't create a topic; can't delete it either

2015-02-03 Thread Joel Koshy
That would be great!

Joel

On Tue, Feb 03, 2015 at 01:39:02PM -0800, Sumit Rangwala wrote:
> Joel,
> 
> My apology for being silent on this thread. I have now been able to
> recreate the setup in a docker environment and I can now consistently
> re-create an issue with auto-topic creation. Since the exact setup is
> different, I will start another thread with the information.
> 
> Sumit
> 
> On Thu, Jan 29, 2015 at 1:29 AM, Joel Koshy  wrote:
> 
> > > If you can tell me where to find the logs I can check. I haven't
> > restarted
> > > my brokers since the issue.
> >
> > This will be specified in the log4j properties that you are using.
> >
> > On Wed, Jan 28, 2015 at 12:01:01PM -0800, Sumit Rangwala wrote:
> > > On Tue, Jan 27, 2015 at 10:54 PM, Joel Koshy 
> > wrote:
> > >
> > > > Do you still have the controller and state change logs from the time
> > > > you originally tried to delete the topic?
> > > >
> > > >
> > > If you can tell me where to find the logs I can check. I haven't
> > restarted
> > > my brokers since the issue.
> > >
> > > Sumit
> > >
> > >
> > >
> > > > On Tue, Jan 27, 2015 at 03:11:48PM -0800, Sumit Rangwala wrote:
> > > > > I am using 0.8.2-beta on brokers 0.8.1.1 for client (producer and
> > > > > consumers). delete.topic.enable=true on all brokers. replication
> > factor
> > > > is
> > > > > < number of brokers. I see this issue with just one single topic, all
> > > > other
> > > > > topics are fine (creation and deletion). Even after a day it is
> > still in
> > > > > marked for deletion stage. Let me know what other  information from
> > the
> > > > > brokers or the zookeepers can help me debug this issue.
> > > > >
> > > > > > On Tue, Jan 27, 2015 at 9:47 AM, Gwen Shapira wrote:
> > > > >
> > > > > > Also, do you have delete.topic.enable=true on all brokers?
> > > > > >
> > > > > > The automatic topic creation can fail if the default number of
> > > > > > replicas is greater than number of available brokers. Check the
> > > > > > default.replication.factor parameter.
> > > > > >
> > > > > > Gwen
> > > > > >
> > > > > > On Tue, Jan 27, 2015 at 12:29 AM, Joel Koshy 
> > > > wrote:
> > > > > > > Which version of the broker are you using?
> > > > > > >
> > > > > > > On Mon, Jan 26, 2015 at 10:27:14PM -0800, Sumit Rangwala wrote:
> > > > > > >> While running kafka in production I found an issue where a topic
> > > > wasn't
> > > > > > >> getting created even with auto topic enabled. I then went ahead
> > and
> > > > > > created
> > > > > > >> the topic manually (from the command line). I then delete the
> > topic,
> > > > > > again
> > > > > > >> manually. Now my broker won't allow me to either create *the*
> > topic
> > > > or
> > > > > > >> delete *the* topic. (other topic creation and deletion is
> > working
> > > > fine).
> > > > > > >>
> > > > > > >> The topic is in "marked for deletion" stage for more than 3
> > hours.
> > > > > > >>
> > > > > > >> $ bin/kafka-topics.sh --zookeeper
> > zookeeper1:2181/replication/kafka
> > > > > > --list
> > > > > > >> --topic GRIFFIN-TldAdFormat.csv-1422321736886
> > > > > > >> GRIFFIN-TldAdFormat.csv-1422321736886 - marked for deletion
> > > > > > >>
> > > > > > >> If this is a known issue, is there a workaround?
> > > > > > >>
> > > > > > >> Sumit
> > > > > > >
> > > > > >
> > > >
> > > >
> >
> >


