Re: Confluent Schema Registry Compatibility config

2021-12-16 Thread Mayuresh Gharat
Hi Folks,

I was reading docs on Confluent Schema Registry about Compatibility :
https://docs.confluent.io/platform/current/schema-registry/avro.html#compatibility-types

I was confused with "BACKWARDS" vs "BACKWARDS_TRANSITIVE".

If we have 3 schemas X, X-1, X-2 and configure a schema registry with
compatibility = "BACKWARDS". When we registered the X-1 schema it must have
been compared against the X-2 schema. When we register Xth schema it must
have been compared against X-1 schema. So by transitivity Xth Schema would
also be compatible with X-2.

So I am wondering what is the difference between "BACKWARDS" vs
"BACKWARDS_TRANSITIVE"? Any example would be really helpful.

--
-Regards,
Mayuresh R. Gharat
(862) 250-7125


-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Confluent Schema Registry Compatibility config

2021-12-16 Thread Mayuresh Gharat
Hi Folks,

I was reading docs on Confluent Schema Registry about Compatibility :
https://docs.confluent.io/platform/current/schema-registry/avro.html#compatibility-types

I was confused with "BACKWARDS" vs "BACKWARDS_TRANSITIVE".

If we have 3 schemas X, X-1, X-2 and configure a schema registry with
compatibility = "BACKWARDS". When we registered the X-1 schema it must have
been compared against the X-2 schema. When we register Xth schema it must
have been compared against X-1 schema. So by transitivity Xth Schema would
also be compatible with X-2.

So I am wondering what is the difference between "BACKWARDS" vs
"BACKWARDS_TRANSITIVE"? Any example would be really helpful.

-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Kafka-consumer-groups.sh says "no active members" but the CURRENT_OFFSET is moving

2021-08-17 Thread Mayuresh Gharat
Hi Folks,

We recently came across a weird scenario where we had a consumer group
consuming from multiple topics. When we ran the "Kafka-consumer-group"
command multiple times, we saw that the CURRENT-OFFSET is advancing;
however , we also saw a line printed:
*"Consumer group 'GROUP_ABC' has no active members."*

The consumer lag graph shows no data from this group.


Here is the output from the Kafka-consumer-groups.sh:

./bin/kafka-consumer-groups.sh --bootstrap-server  --group
GROUP_ABC --describe
*Consumer group 'GROUP_ABC' has no active members.*

GROUP TOPIC
PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG CONSUMER-ID
HOSTCLIENT-ID*GROUP_ABC*
TOPIC_A 0  2280611 3861697
1581086 -   -   -
*GROUP_ABC
*TOPIC_A 1  3845015 3845015
 0   -   -   -
*GROUP_ABC*  Topic_B 2
 530 530 0   -
  -   -*GROUP_ABC
  *TOPIC_A 2  2387180
   3736450 1349270 -   -   -
*GROUP_ABC*  TOPIC_B 0
 655 655 0   -
  -   -*GROUP_ABC*  TOPIC_B
 1  563 563 0
 -   -   -


Any idea why this might be happening?


Thanks and regards,

Mayuresh


Re: [ANNOUNCE] New committer: John Roesler

2019-11-12 Thread Mayuresh Gharat
Congratulations John!

Thanks,

Mayuresh

On Tue, Nov 12, 2019 at 4:54 PM Vahid Hashemian 
wrote:

> Congratulations John!
>
> --Vahid
>
> On Tue, Nov 12, 2019 at 4:38 PM Adam Bellemare 
> wrote:
>
> > Congratulations John, and thanks for all your help on KIP-213!
> >
> > > On Nov 12, 2019, at 6:24 PM, Bill Bejeck  wrote:
> > >
> > > Congratulations John!
> > >
> > > On Tue, Nov 12, 2019 at 6:20 PM Matthias J. Sax  >
> > > wrote:
> > >
> > >> Congrats John!
> > >>
> > >>
> > >>> On 11/12/19 2:52 PM, Boyang Chen wrote:
> > >>> Great work John! Well deserved
> > >>>
> > >>> On Tue, Nov 12, 2019 at 1:56 PM Guozhang Wang 
> > >> wrote:
> > >>>
> >  Hi Everyone,
> > 
> >  The PMC of Apache Kafka is pleased to announce a new Kafka
> committer,
> > >> John
> >  Roesler.
> > 
> >  John has been contributing to Apache Kafka since early 2018. His
> main
> >  contributions are primarily around Kafka Streams, but have also
> > included
> >  improving our test coverage beyond Streams as well. Besides his own
> > code
> >  contributions, John has also actively participated on community
> > >> discussions
> >  and reviews including several other contributors' big proposals like
> >  foreign-key join in Streams (KIP-213). He has also been writing,
> > >> presenting
> >  and evangelizing Apache Kafka in many venues.
> > 
> >  Congratulations, John! And look forward to more collaborations with
> > you
> > >> on
> >  Apache Kafka.
> > 
> > 
> >  Guozhang, on behalf of the Apache Kafka PMC
> > 
> > >>>
> > >>
> > >>
> >
>
>
> --
>
> Thanks!
> --Vahid
>


-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Optimising Time For Leader Election

2018-12-10 Thread Mayuresh Gharat
Hi Mark,

Reducing the zookeeper session timeouts would enable the broker change
zookeeper listener on the controller to fire earlier than later. This will
enable the controller to detect that the broker is down earlier.

Increasing the network threads (processor threads) would not help here,
unless you are seeing that the brokers are not able to pickup new
connections fast enough.
Increasing the api handler threads (KafkaRequestHandler) threads might
help, if you are seeing that you request queue is filling up faster than
the requests that can be processed. There is a metric called
RequestHandlerAvgIdlePercent that you can use to tell if the Request
Handlers are always busy and cannot keep up.

Thanks,

Mayuresh

On Mon, Dec 10, 2018 at 7:38 AM Mark Anderson  wrote:

> Mayuresh,
>
> Thanks for the details. I'll need to do some more tests to get back with
> specific numbers re delay and check for timeouts.
>
> For now (pre KIP-291 being implemented), the only parameters that will tune
> leader election will be the zookeeper timeout and increasing the number of
> network threads (To try and work through the queued requests faster)?
>
> Thanks,
> Mark
>
> On Thu, 6 Dec 2018 at 23:43 Mayuresh Gharat 
> wrote:
>
> > Hi Mark,
> >
> > The leader election of a new topic partition happens once the controller
> > detects that the Leader has crashed.
> > This happens asynchronously via a zookeeper listener. Once a zookeeper
> > listener is fired, the corresponding object indicating the event happened
> > is put in to a controller queue.
> > The controller has a single thread that pulls data out of this queue and
> > handles each event one after another.
> > I can't remember of a config to tune this, on top of my head.
> > How much delay are you seeing in leadership change? Are there any
> > controller socket timeouts in the log?
> > Also might want to take a look at KIP-291 (KAFKA-4453), which is meant
> for
> > shortening this time period for handling controller events.
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Thu, Dec 6, 2018 at 9:50 AM Harper Henn 
> wrote:
> >
> > > Hi Mark,
> > >
> > > If a broker fails and you want to elect a new leader as quickly as
> > > possible, you could tweak zookeeper.session.timeout.ms in the kafka
> > broker
> > > configuration. According to the documentation: "If the consumer fails
> to
> > > heartbeat to ZooKeeper for this period of time it is considered dead
> and
> > a
> > > rebalance will occur."
> > >
> > > https://kafka.apache.org/0101/documentation.html
> > >
> > > I think making zookeeper.session.timeout.ms smaller will result in
> > faster
> > > detection of a dead node, but the downside is that a leader election
> > might
> > > get triggered by network blips or other cases where your broker is not
> > > actually dead.
> > >
> > > Harper
> > >
> > > On Thu, Dec 6, 2018 at 9:11 AM Mark Anderson 
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I'm currently testing how Kafka reacts in cases of broker failure due
> > to
> > > > process failure or network timeout.
> > > >
> > > > I'd like to have the election of a new leader for a topic partition
> > > happen
> > > > as quickly as possible but it is unclear from the documentation or
> > broker
> > > > configuration what the key parameters are to tune to make this
> > possible.
> > > >
> > > > Does anyone have any pointers? Or are there any guides online?
> > > >
> > > > Thanks,
> > > > Mark
> > > >
> > >
> >
> >
> > --
> > -Regards,
> > Mayuresh R. Gharat
> > (862) 250-7125
> >
>


-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Optimising Time For Leader Election

2018-12-06 Thread Mayuresh Gharat
Hi Mark,

The leader election of a new topic partition happens once the controller
detects that the Leader has crashed.
This happens asynchronously via a zookeeper listener. Once a zookeeper
listener is fired, the corresponding object indicating the event happened
is put in to a controller queue.
The controller has a single thread that pulls data out of this queue and
handles each event one after another.
I can't remember of a config to tune this, on top of my head.
How much delay are you seeing in leadership change? Are there any
controller socket timeouts in the log?
Also might want to take a look at KIP-291 (KAFKA-4453), which is meant for
shortening this time period for handling controller events.

Thanks,

Mayuresh

On Thu, Dec 6, 2018 at 9:50 AM Harper Henn  wrote:

> Hi Mark,
>
> If a broker fails and you want to elect a new leader as quickly as
> possible, you could tweak zookeeper.session.timeout.ms in the kafka broker
> configuration. According to the documentation: "If the consumer fails to
> heartbeat to ZooKeeper for this period of time it is considered dead and a
> rebalance will occur."
>
> https://kafka.apache.org/0101/documentation.html
>
> I think making zookeeper.session.timeout.ms smaller will result in faster
> detection of a dead node, but the downside is that a leader election might
> get triggered by network blips or other cases where your broker is not
> actually dead.
>
> Harper
>
> On Thu, Dec 6, 2018 at 9:11 AM Mark Anderson 
> wrote:
>
> > Hi,
> >
> > I'm currently testing how Kafka reacts in cases of broker failure due to
> > process failure or network timeout.
> >
> > I'd like to have the election of a new leader for a topic partition
> happen
> > as quickly as possible but it is unclear from the documentation or broker
> > configuration what the key parameters are to tune to make this possible.
> >
> > Does anyone have any pointers? Or are there any guides online?
> >
> > Thanks,
> > Mark
> >
>


-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Kafka Producer Buffer and Broker Connection Failure

2018-11-23 Thread Mayuresh Gharat
Hi Mark,

The initial understanding is correct.
To understand the timeout scenarios, you might want to take a look at
KIP-91 :
https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer

If the producer sends a request to the old leader of a topic partition, it
will get a NotLeaderForPartitionException due to which it will update its
metadata and then if you have retries configured will try to send the data
to the new leader.

Thanks,

Mayuresh

On Fri, Nov 23, 2018 at 8:19 AM Mark Anderson  wrote:

> Hi,
>
> I'm currently testing Kafka Producers in cases of broker connection failure
> due to the broker process dieing or network connection timeout. I'd like to
> make sure that I understand how the Producer buffer functions in this case.
> Note that I have retries set to 0.
>
> From what I can see when send is called on the Producer the Node to which
> the record will be sent is the Node which is the leader for that topic
> partition at the time send was called.
>
> Therefore, if the leader for a topic partition changes while the record is
> still within the Producer buffer (waiting to be sent) the record would
> still be sent to the original leader from record creation time. And in this
> case the request.timeout.ms would apply to the send of that record
> failing.
>
> Only records created when send is called on the Producer after broker
> connection failure and re-fetch of the metadata will be sent to the new
> leader.
>
> Could you please confirm my understanding is correct?
>
> Thanks,
> Mark
>


-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: [ANNOUNCE] New Kafka PMC member: Dong Lin

2018-08-20 Thread Mayuresh Gharat
Congrats Dong !!!

Thanks,

Mayuresh

On Mon, Aug 20, 2018 at 1:36 PM Gwen Shapira  wrote:

> Congrats Dong Lin! Well deserved!
>
> On Mon, Aug 20, 2018, 3:55 AM Ismael Juma  wrote:
>
> > Hi everyone,
> >
> > Dong Lin became a committer in March 2018. Since then, he has remained
> > active in the community and contributed a number of patches, reviewed
> > several pull requests and participated in numerous KIP discussions. I am
> > happy to announce that Dong is now a member of the
> > Apache Kafka PMC.
> >
> > Congratulation Dong! Looking forward to your future contributions.
> >
> > Ismael, on behalf of the Apache Kafka PMC
> >
>


-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: more than 1 active controler

2017-04-21 Thread Mayuresh Gharat
Hi Wei,

Do you mean to say...you had 2 controllers in your kafka cluster for a few
days and everything was fine?
Did you investigate how the cluster got in to that state?

Thanks,

Mayuresh

On Fri, Apr 21, 2017 at 2:04 PM, Jeff Widman  wrote:

> Remove the /controller znode in zookeeper and it will force kafka to
> trigger a new controller re-election.
>
> On Fri, Apr 21, 2017 at 1:58 PM, wei  wrote:
>
> > We noticed we have more than 2 active controllers. How can we fix the
> > issue? it has been for a few days.
> >
> > Thanks,
> > Wei
> >
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Topic deletion

2017-04-07 Thread Mayuresh Gharat
Hi Adrian,

When you delete a topic, it is marked a under /admin/delete_topics path in
zookeeper. Once that is done, controller gets a notification that a topic
is marked for delete. The controller then notifies the replcias of the
topic-partitions to delete the data for the topic and the topic itself.
After KAFKA-1911, delete topic is made asynchronous and faster. The
replicas on receiving the notification from controller will rename the
topic-partition directory to something like topic-partition.delete and
return to controller saying delete topic is done. The controller when it
receives the ack from all the replicas that topic has been deleted, it will
delete the znode under /admin/delete_topics path. Your delete topic is
successful at this stage. The renamed directory gets removed asynchronously
on those replicas brokers at some later point in time.
Also remember, if you topic-partitions are been reassigned, delete topic
will not be triggered till the reassignment is finished.

We at Linkedin, have been suing Delete topic for a while now and it has
been working fine after KAFKA-1911.

Hope this helps.

Thanks,

Mayuresh

On Fri, Apr 7, 2017 at 2:15 AM, Adrian McCague 
wrote:

> Indeed, I wish it was explained somewhere what marked for deletion does,
> and what process handles it. I will have to go and investigate the source.
>
> I can confirm the zkCli did the trick, thanks for the hint!
>
> Adrian
>
> -Original Message-
> From: Akhilesh Pathodia [mailto:pathodia.akhil...@gmail.com]
> Sent: 07 April 2017 09:57
> To: users@kafka.apache.org
> Subject: Re: Topic deletion
>
> I am not sure but kafka delete command does not delete the topic actually,
> it only marks it for deletion. Probably it is fixed in later version of
> kafka.
>
> On Fri, Apr 7, 2017 at 2:14 PM, Adrian McCague 
> wrote:
>
> > Hi Akhilesh,
> >
> > Why would this approach need to be taken over the kafka-topics tool,
> > out of interest?
> >
> > Thanks
> > Adrian
> >
> > -Original Message-
> > From: Akhilesh Pathodia [mailto:pathodia.akhil...@gmail.com]
> > Sent: 07 April 2017 09:37
> > To: users@kafka.apache.org
> > Subject: Re: Topic deletion
> >
> > Hi Adrian,
> >
> > You will have to delete the broker directory from zookeeper. This can
> > be done  from zookeeper cli. Connect to zookeeper cli using below
> command:
> >
> > zookeeper-client -server 
> >
> > Then run below command :
> >
> > rmr /brokers/topics/
> >
> > Thanks,
> > AKhilesh
> >
> > On Thu, Apr 6, 2017 at 11:03 PM, Adrian McCague
> > 
> > wrote:
> >
> > > Hi all,
> > >
> > > I am trying to understand topic deletion in kafka, there appears to
> > > be very little documentation or answers on how this works. Typically
> > > they just say to turn on the feature on the broker (in my case it is).
> > >
> > > I executed:
> > > Kafka-topics.bat -delete -zookeeper keeperhere -topic mytopic
> > >
> > > Running this again yields:
> > > Topic mytopic is already marked for deletion.
> > >
> > > --describe yields:
> > > Topic: mytopic  PartitionCount:6ReplicationFactor:3
>  Configs:
> > > retention.ms=0
> > > Topic: mytopic  Partition: 0Leader: -1  Replicas:
> > > 1006,1001,1005Isr:
> > > Topic  mytopic  Partition: 1Leader: -1  Replicas:
> > > 1001,1005,1003Isr:
> > >Topic: mytopic  Partition: 2Leader: -1  Replicas:
> > > 1005,1003,1004Isr:
> > > Topic: mytopic  Partition: 3Leader: -1  Replicas:
> > > 1003,1004,1007Isr:
> > > Topic: mytopic  Partition: 4Leader: -1  Replicas:
> > > 1004,1007,1006Isr:
> > > Topic: mytopic  Partition: 5Leader: -1  Replicas:
> > > 1007,1006,1001Isr:
> > >
> > > You can see that the deletion mark has meant that the Leader is -1.
> > > Also I read somewhere that retention needs to be set to something
> > > low to trigger the deletion, hence the config of retention.ms=0
> > >
> > > Consumers (or streams in my case) no longer see the topic:
> > > org.apache.kafka.streams.errors.TopologyBuilderException: Invalid
> > > topology building: stream-thread [StreamThread-1] Topic not found:
> > > mytopic
> > >
> > > And I can't create a new topic in its place:
> > > [2017-04-06 18:26:00,702] ERROR org.apache.kafka.common.
> > errors.TopicExistsException:
> > > Topic 'mytopic' already exists. (kafka.admin.TopicCommand$)
> > >
> > > I am a little lost as to where to go next, could someone explain how
> > > topic deletion is actually applied when a topic is 'marked' for
> > > deletion as that may help trigger it.
> > >
> > > Thanks!
> > > Adrian
> > >
> > >
> >
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: [ANNOUNCE] New committer: Grant Henke

2017-01-11 Thread Mayuresh Gharat
Congrats Grant !

Thanks,

Mayuresh

On Thu, Jan 12, 2017 at 3:47 AM, Kaufman Ng  wrote:

> Congrats Grant!
>
> On Wed, Jan 11, 2017 at 4:28 PM, Jay Kreps  wrote:
>
> > Congrats Grant!
> >
> > -Jay
> >
> > On Wed, Jan 11, 2017 at 11:51 AM, Gwen Shapira 
> wrote:
> >
> > > The PMC for Apache Kafka has invited Grant Henke to join as a
> > > committer and we are pleased to announce that he has accepted!
> > >
> > > Grant contributed 88 patches, 90 code reviews, countless great
> > > comments on discussions, a much-needed cleanup to our protocol and the
> > > on-going and critical work on the Admin protocol. Throughout this, he
> > > displayed great technical judgment, high-quality work and willingness
> > > to contribute where needed to make Apache Kafka awesome.
> > >
> > > Thank you for your contributions, Grant :)
> > >
> > > --
> > > Gwen Shapira
> > > Product Manager | Confluent
> > > 650.450.2760 | @gwenshap
> > > Follow us: Twitter | blog
> > >
> >
>
>
>
> --
> Kaufman Ng
> +1 646 961 8063
> Solutions Architect | Confluent | www.confluent.io
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Writing a customized principal builder for authorization

2016-11-30 Thread Mayuresh Gharat
"principal.builder.class" is the name of the property.

Thanks,

Mayuresh

On Wed, Nov 30, 2016 at 9:30 AM,  wrote:

> Hi Kriti,
>
> You will have to implement the Principal Builder interface and provide the
> full class path in broker config. I don't remember the exact config name
> right now, but you can search for some config by name
> "principalbuilder.class" in the broker configs.
>
> Once you do this, Kafka will automatically use your custom
> PrincipalBuilder class for generating the principal.
>
> The buildPrincipal() function in the PrincipalBuilder is where you will
> have to create the your custom Principal class object ( This custom
> principal class should implement Java principal interface) and this custom
> principal.getname() can return whatever name you want.
>
> Let me know if this helps.
>
> Thanks,
>
> Mayuresh
>
>
>
> Sent from my iPhone
>
> > On Nov 29, 2016, at 11:40 PM, Kiriti Sai 
> wrote:
> >
> > Hi,
> > Can anyone help me or point me to any resources that can be of help for
> > writing a customized principal builder to use in Authorization using
> ACLs?
> > I've enabled SSL authentication scheme for both clients and brokers but I
> > would like to change the principal name to just the original name and
> > Organizational unit instead of the complete defiant principal name for
> SSL.
> >
> > Thanka in advance for the help.
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Batch Expired

2016-08-29 Thread Mayuresh Gharat
Hi,

RequestTimeout is used for 2 cases :
1) Timing out the batches sitting in the accumulator.
2) Requests that are already sent over the wire and you have not yet heard
from the server.

In a case where there is a network partition, the client might not detect
it, till the actual TCP timeout that is I think around 30 minutes or more.
The requestTimeout setting kicks in before the TCP timeout and then the
request is re tried with fresh metadata or errored out depending upon the
retry setting.


Thanks,

Mayuresh



On Mon, Aug 29, 2016 at 9:32 AM, Ghosh, Achintya (Contractor) <
achintya_gh...@comcast.com> wrote:

> Hi Krishna,
> Thank you for your response.
>
> Connections already made but if we increase the request timeout 5 times
> let's say  request.timeout.ms= 5*6 , then the number of 'Batch
> Expired ' exception is less, so what is the recommended value for '
> request.timeout.ms '.
> If we increase more, is there any impact?
>
> Thanks
> Achintya
>
> -Original Message-
> From: R Krishna [mailto:krishna...@gmail.com]
> Sent: Friday, August 26, 2016 6:17 PM
> To: users@kafka.apache.org
> Cc: d...@kafka.apache.org
> Subject: Re: Batch Expired
>
> Are any requests at all making it? That is a pretty big timeout.
>
> However, I noticed if there is no connections made to broker, you can
> still get batch expiry.
>
>
> On Fri, Aug 26, 2016 at 6:32 AM, Ghosh, Achintya (Contractor) <
> achintya_gh...@comcast.com> wrote:
>
> > Hi there,
> >
> > What is the recommended Producer setting for Producer as I see a lot
> > of Batch Expired exception even though I put request.timeout=6.
> >
> > Producer settings:
> > acks=1
> > retries=3
> > batch.size=16384
> > linger.ms=5
> > buffer.memory=33554432
> > request.timeout.ms=6
> > timeout.ms=6
> >
> > Thanks
> > Achintya
> >
>
>
>
> --
> Radha Krishna, Proddaturi
> 253-234-5657
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Reporting security issues

2016-05-25 Thread Mayuresh Gharat
Excellent :)

Thanks,

Mayuresh

On Tue, May 24, 2016 at 2:55 AM, Ismael Juma  wrote:

> Hi all,
>
> Since Kafka implements a number of security features, we need a procedure
> for reporting potential security vulnerabilities privately (as per
> http://www.apache.org/security/). We have added a simple page to the
> website that describes the procedure (thanks Flavio):
>
> http://kafka.apache.org/project-security.html
>
> See https://issues.apache.org/jira/browse/KAFKA-3709 for more background.
>
> If you have suggestions on how the page could be improved, pull requests
> are welcome. :)
>
> Ismael
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: [ANNOUCE] Apache Kafka 0.10.0.0 Released

2016-05-24 Thread Mayuresh Gharat
Great!!! cheers :)

Thanks,

Mayuresh

On Tue, May 24, 2016 at 10:07 AM, Ismael Juma  wrote:

> Awesome, thanks for running the release Gwen. :)
>
> Ismael
>
> On Tue, May 24, 2016 at 5:24 PM, Gwen Shapira  wrote:
>
> > The Apache Kafka community is pleased to announce the release for Apache
> > Kafka 0.10.0.0.
> > This is a major release with exciting new features, including first
> > release of KafkaStreams and many other improvements.
> >
> > All of the changes in this release can be found:
> >
> >
> https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.0.0/RELEASE_NOTES.html
> >
> > Apache Kafka is high-throughput, publish-subscribe messaging system
> > rethought of as a distributed commit log.
> >
> > ** Fast => A single Kafka broker can handle hundreds of megabytes of
> reads
> > and
> > writes per second from thousands of clients.
> >
> > ** Scalable => Kafka is designed to allow a single cluster to serve as
> the
> > central data backbone
> > for a large organization. It can be elastically and transparently
> expanded
> > without downtime.
> > Data streams are partitioned and spread over a cluster of machines to
> allow
> > data streams
> > larger than the capability of any single machine and to allow clusters of
> > co-ordinated consumers.
> >
> > ** Durable => Messages are persisted on disk and replicated within the
> > cluster to prevent
> > data loss. Each broker can handle terabytes of messages without
> performance
> > impact.
> >
> > ** Distributed by Design => Kafka has a modern cluster-centric design
> that
> > offers
> > strong durability and fault-tolerance guarantees.
> >
> > You can download the source release from
> >
> >
> https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.0.0/kafka-0.10.0.0-src.tgz
> >
> > and binary releases from
> >
> >
> https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.0.0/kafka_2.10-0.10.0.0.tgz
> >
> >
> https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz
> >
> > A big thank you for the following people who have contributed to the
> > 0.10.0.0 release.
> >
> > Adam Kunicki, Aditya Auradkar, Alex Loddengaard, Alex Sherwin, Allen
> > Wang, Andrea Cosentino, Anna Povzner, Ashish Singh, Atul Soman, Ben
> > Stopford, Bill Bejeck, BINLEI XUE, Chen Shangan, Chen Zhu, Christian
> > Posta, Cory Kolbeck, Damian Guy, dan norwood, Dana Powers, David
> > Jacot, Denise Fernandez, Dionysis Grigoropoulos, Dmitry Stratiychuk,
> > Dong Lin, Dongjoon Hyun, Drausin Wulsin, Duncan Sands, Dustin Cote,
> > Eamon Zhang, edoardo, Edward Ribeiro, Eno Thereska, Ewen
> > Cheslack-Postava, Flavio Junqueira, Francois Visconte, Frank Scholten,
> > Gabriel Zhang, gaob13, Geoff Anderson, glikson, Grant Henke, Greg
> > Fodor, Guozhang Wang, Gwen Shapira, Igor Stepanov, Ishita Mandhan,
> > Ismael Juma, Jaikiran Pai, Jakub Nowak, James Cheng, Jason Gustafson,
> > Jay Kreps, Jeff Klukas, Jeremy Custenborder, Jesse Anderson, jholoman,
> > Jiangjie Qin, Jin Xing, jinxing, Jonathan Bond, Jun Rao, Ján Koščo,
> > Kaufman Ng, kenji yoshida, Kim Christensen, Kishore Senji, Konrad,
> > Liquan Pei, Luciano Afranllie, Magnus Edenhill, Maksim Logvinenko,
> > manasvigupta, Manikumar reddy O, Mark Grover, Matt Fluet, Matt
> > McClure, Matthias J. Sax, Mayuresh Gharat, Micah Zoltu, Michael Blume,
> > Michael G. Noll, Mickael Maison, Onur Karaman, ouyangliduo, Parth
> > Brahmbhatt, Paul Cavallaro, Pierre-Yves Ritschard, Piotr Szwed,
> > Praveen Devarao, Rafael Winterhalter, Rajini Sivaram, Randall Hauch,
> > Richard Whaling, Ryan P, Samuel Julius Hecht, Sasaki Toru, Som Sahu,
> > Sriharsha Chintalapani, Stig Rohde Døssing, Tao Xiao, Tom Crayford,
> > Tom Dearman, Tom Graves, Tom Lee, Tomasz Nurkiewicz, Vahid Hashemian,
> > William Thurston, Xin Wang, Yasuhiro Matsuda, Yifan Ying, Yuto
> > Kawamura, zhuchen1018
> >
> > We welcome your help and feedback. For more information on how to
> > report problems, and to get involved, visit the project website at
> > http://kafka.apache.org/
> >
> > Thanks,
> >
> > Gwen
> >
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Peer not authenticated when trying to send message to kafka over SSL with Authentication

2016-05-12 Thread Mayuresh Gharat
HI I am trying to establish an SSL connection from kafkaProducer and send
certificate to the Kafka Broker.


I deploy my kafka broker locally running 2 ports :
*listeners = PLAINTEXT://:9092,SSL://:16637 *

*My KafkaBroker SSL configs look like this :*

ssl.protocol = TLS
ssl.trustmanager.algorithm = SunX509
ssl.keymanager.algorithm = SunX509
ssl.keystore.type = VALUE1
ssl.keystore.location = /a/b/c
ssl.keystore.password = xyz
ssl.key.password = xyz
ssl.truststore.type = JKS
ssl.truststore.location = /u/v/w
ssl.truststore.password = 123

I run my producer locally on the same linux box as my KafkaBroker.
My produce command looks like this :

*bin/kafka-producer-perf-test.sh  --num-records 10 --topic testToic_1
--record-size 10 --throughput 1 --producer-props *
bootstrap.servers = localhost://:16637
security.protocol = SSL
ssl.protocol = TLS
ssl.trustmanager.algorithm = SunX509
ssl.keymanager.algorithm = SunX509
ssl.keystore.type = VALUE1
ssl.keystore.location = /a/b/c
ssl.keystore.password = xyz
ssl.key.password = xyz
ssl.truststore.type = JKS
ssl.truststore.location = /u/v/w
ssl.truststore.password = 123


On kafka broker, when I do inside buildPrincipal() api of PricipalBuilder

SSLSession session = ((SslTransportLayer)transportLayer).sslSession();
session.getPeerCertificates()

I get:
*org.apache.kafka.common.KafkaException:
javax.net.ssl.SSLPeerUnverifiedException: peer not authenticated*


I ran this command as listed here
http://kafka.apache.org/documentation.html#security_ssl :

*openssl s_client -debug -connect localhost:16637 -tls1*

and was able to see the certificate.

I am not able to understand the peer not authenticated exception here.
Am I missing any SSL config on producer request?



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: KafkaProducer block on send

2016-05-04 Thread Mayuresh Gharat
I am not sure why max.block.ms does not suffice here?
Also the waitOnMetadata will block only for the first time, later on it
will have the metadata. I am not abler to understand the motivation here.
Can you explain with an example?

Thanks,

Mayuresh

On Wed, May 4, 2016 at 9:55 AM, Dana Powers  wrote:

> I think changes of this sort (design changes as opposed to bugs) typically
> go through a KIP process before work is assigned. You might consider
> starting a KIP discussion and see if there is interest in pursuing your
> proposed changes.
>
> -Dana
> On May 4, 2016 7:58 AM, "Oleg Zhurakousky" 
> wrote:
>
> > Indeed it is.
> >
> > Oleg
> > > On May 4, 2016, at 10:54 AM, Paolo Patierno 
> wrote:
> > >
> > > It's sad that after almost one month it's still "unassigned" :-(
> > >
> > > Paolo PatiernoSenior Software Engineer (IoT) @ Red Hat
> > > Microsoft MVP on Windows Embedded & IoTMicrosoft Azure Advisor
> > > Twitter : @ppatierno
> > > Linkedin : paolopatierno
> > > Blog : DevExperience
> > >
> > >> Subject: Re: KafkaProducer block on send
> > >> From: ozhurakou...@hortonworks.com
> > >> To: users@kafka.apache.org
> > >> Date: Wed, 4 May 2016 14:47:25 +
> > >>
> > >> Sure
> > >>
> > >> Here are both:
> > >> https://issues.apache.org/jira/browse/KAFKA-3539
> > >> https://issues.apache.org/jira/browse/KAFKA-3540
> > >>
> > >> On May 4, 2016, at 3:24 AM, Paolo Patierno   > ppatie...@live.com>> wrote:
> > >>
> > >> Hi Oleg,
> > >>
> > >> can you share the JIRA link here because I totally agree with you.
> > >> For me the send() should be totally asynchronous and not blocking for
> > the max.block.ms timeout.
> > >>
> > >> Currently I'm using the overload with callback that, of course, isn't
> > called if the send() fails due to timeout.
> > >> In order to catch this scenario I need to do the following :
> > >>
> > >> Future future = this.producer.send();
> > >>
> > >> if (future.isDone()) {
> > >>   try {
> > >>   future.get();
> > >>   } catch (InterruptedException e) {
> > >>   // TODO Auto-generated catch block
> > >>   e.printStackTrace();
> > >>   } catch (ExecutionException e) {
> > >>   // TODO Auto-generated catch block
> > >>   e.printStackTrace();
> > >>   }
> > >>   }
> > >>
> > >> I don't like it so much ...
> > >>
> > >> Thanks,
> > >> Paolo.
> > >>
> > >> Paolo PatiernoSenior Software Engineer (IoT) @ Red Hat
> > >> Microsoft MVP on Windows Embedded & IoTMicrosoft Azure Advisor
> > >> Twitter : @ppatierno
> > >> Linkedin : paolopatierno
> > >> Blog : DevExperience
> > >>
> > >> Subject: Re: KafkaProducer block on send
> > >> From: ozhurakou...@hortonworks.com ozhurakou...@hortonworks.com>
> > >> To: users@kafka.apache.org
> > >> Date: Mon, 11 Apr 2016 19:42:17 +
> > >>
> > >> Dana
> > >>
> > >> Thanks for the explanation, but it sounds more like a workaround since
> > everything you describe could be encapsulated within the Future itself.
> > After all it "represents the result of an asynchronous computation"
> > >>
> > >> executor.submit(new Callable() {
> > >>@Override
> > >>public RecordMetadata call() throws Exception {
> > >>// first make sure the metadata for the topic is available
> > >>long waitedOnMetadataMs = waitOnMetadata(record.topic(),
> > this.maxBlockTimeMs);
> > >>. . .
> > >>  }
> > >> });
> > >>
> > >>
> > >> The above would eliminate the confusion and keep user in control where
> > even a legitimate blockage could be interrupted/canceled etc., based on
> > various business/infrastructure requirements.
> > >> Anyway, I’ll raise the issue in JIRA and reference this thread
> > >>
> > >> Cheers
> > >> Oleg
> > >>
> > >> On Apr 8, 2016, at 10:31 AM, Dana Powers   > dana.pow...@gmail.com>> wrote:
> > >>
> > >> The prior discussion explained:
> > >>
> > >> (1) The code you point to blocks for a maximum of max.block.ms, which
> > is
> > >> user configurable. It does not block indefinitely with no user control
> > as
> > >> you suggest. You are free to configure this to 0 if you like at it
> will
> > not
> > >> block at all. Have you tried this like I suggested before?
> > >>
> > >> (2) Even if you convinced people to remove waitOnMetadata, the send
> > method
> > >> *still* blocks on memory back pressure (also configured by
> max.block.ms
> > ).
> > >> This is for good reason:
> > >>
> > >> while True:
> > >> producer.send(msg)
> > >>
> > >> Can quickly devour all of you local memory and crash your process if
> the
> > >> outflow rate decreases, say if brokers go down or network partition
> > occurs.
> > >>
> > >> -Dana
> > >> I totally agree with Oleg.
> > >>
> > >> As documentation says the producers send data in an asynchronous way
> > and it
> > >> is enforced by the send method signature with a Future returned.
> > >> It can't block indefinitely without

Re: batching related issue in 0.9.0 producer

2016-05-03 Thread Mayuresh Gharat
created a jira : https://issues.apache.org/jira/browse/KAFKA-3651

Thanks,

Mayuresh

On Tue, May 3, 2016 at 2:03 PM, Mayuresh Gharat 
wrote:

> Nice catch. Do you have a jira for this?
> I can submit a patch right away. This should be a small patch.
>
> Thanks,
>
> Mayuresh
>
> On Tue, May 3, 2016 at 1:56 PM, Prabhu V  wrote:
>
>> Whenever the BufferPool throws a "Failed to allocate memory within the
>> configured max blocking time" excepion, it should also remove the
>> condition
>> object from the waiters deque. Otherwise the condition object is stays
>> forever in the deque.
>>
>> (i.e) "this.waiters.remove(moreMemory);" should happen before the
>> exception
>> is thrown.
>>
>> .Otherwise the waiting thread count will never get to 0 after the
>> exception
>> and batching will not occur. This is because in the
>> RecordAccumulator.ready
>> method the exhausted flat is set as
>>
>> boolean exhausted = this.free.queued() > 0 where free.queued() returns the
>> waiters.size().
>>
>> I reported a issue with the producer on this thread
>> http://mail-archives.apache.org/mod_mbox/kafka-users/201605.mbox/browser
>>
>> and this was because of above issue.
>>
>>
>> Thanks
>>
>
>
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: batching related issue in 0.9.0 producer

2016-05-03 Thread Mayuresh Gharat
Nice catch. Do you have a jira for this?
I can submit a patch right away. This should be a small patch.

Thanks,

Mayuresh

On Tue, May 3, 2016 at 1:56 PM, Prabhu V  wrote:

> Whenever the BufferPool throws a "Failed to allocate memory within the
> configured max blocking time" excepion, it should also remove the condition
> object from the waiters deque. Otherwise the condition object is stays
> forever in the deque.
>
> (i.e) "this.waiters.remove(moreMemory);" should happen before the exception
> is thrown.
>
> .Otherwise the waiting thread count will never get to 0 after the exception
> and batching will not occur. This is because in the RecordAccumulator.ready
> method the exhausted flat is set as
>
> boolean exhausted = this.free.queued() > 0 where free.queued() returns the
> waiters.size().
>
> I reported a issue with the producer on this thread
> http://mail-archives.apache.org/mod_mbox/kafka-users/201605.mbox/browser
>
> and this was because of above issue.
>
>
> Thanks
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Kafka topic deletion still not working well

2016-03-11 Thread Mayuresh Gharat
Hi Stevo,

Let me know if we want to open Kafka-2937 again. I can include the above
finding in to the patch or you want to create a separate JIra for this.

Thanks,

Mayuresh

On Fri, Mar 11, 2016 at 7:53 AM, Mayuresh Gharat  wrote:

> kafka-2937 is different from this I think. Kafka-2937 deals with the
> delete topic getting stuck because the LeaderAndISR in ZK was updated by a
> controller and then the controller dies and the new controller gets in to
> the exception and never completes deleting the topic. The topic existed in
> the cluster and was also marked for delete.
> The case reported here is that the topic does not exist in cluster but is
> marked for delete.
> Am I right in understanding?
>
> Thanks,
>
> Mayuresh
>
> On Fri, Mar 11, 2016 at 5:30 AM, Stevo Slavić  wrote:
>
>> Topic it seems would get deleted but request in ZK to delete topic would
>> not get cleared even after restarting Kafka cluster.
>>
>> I'm still investigating why deletion did not complete in the first place
>> without restarting any nodes. It seems something smelly happens when there
>> is request to delete more than one topic.
>>
>> Anyway, I think I found one potential bug in
>> ReplicaStateMachine.areAllReplicasForTopicDeleted check which could be
>> cause for not clearing deletion request from ZK even after restart of
>> whole
>> cluster. Line ReplicaStateMachine.scala#L285
>> <
>> https://github.com/sslavic/kafka/blob/trunk/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala#L285
>> >
>>
>> replicaStatesForTopic.forall(_._2 == ReplicaDeletionSuccessful)
>>
>> which is return value of that function/check, probably should better be
>> checking for
>>
>> replicaStatesForTopic.isEmpty || replicaStatesForTopic.forall(_._2 ==
>> ReplicaDeletionSuccessful)
>>
>> I noticed it because in controller logs I found entries like:
>>
>> [2016-03-04 13:27:29,115] DEBUG [Replica state machine on controller 1]:
>> Are all replicas for topic foo deleted Map()
>> (kafka.controller.ReplicaStateMachine)
>>
>> even though normally they look like:
>>
>> [2016-03-04 09:33:41,036] DEBUG [Replica state machine on controller 1]:
>> Are all replicas for topic foo deleted
>> Map([Topic=foo,Partition=0,Replica=0] -> ReplicaDeletionStarted,
>> [Topic=foo,Partition=0,Replica=3] -> ReplicaDeletionStarted,
>> [Topic=foo,Partition=0,Replica=1] -> ReplicaDeletionSuccessful)
>> (kafka.controller.ReplicaStateMachine)
>>
>> Kind regards,
>> Stevo Slavic.
>>
>> On Sun, Mar 6, 2016 at 12:31 AM, Guozhang Wang 
>> wrote:
>>
>> > Thanks Stevo,
>> >
>> > Feel free to paste your findings in KAFKA-2937, we can re-open that
>> ticket
>> > if necessary.
>> >
>> > Guozhang
>> >
>> > On Fri, Mar 4, 2016 at 4:38 AM, Stevo Slavić  wrote:
>> >
>> > > Hell Apache Kafka community,
>> > >
>> > > I'm still investigating an incident; from initial findings topic
>> deletion
>> > > doesn't seem to work well still with Kafka 0.9.0.1, likely some edge
>> case
>> > > not covered.
>> > >
>> > > Before with 0.8.2.x it used to happen that non-lead replica would be
>> > stuck
>> > > in topic deletion process, and workaround was just to restart that
>> node.
>> > >
>> > > If I'm not mistaken, that edge case got (or at least is expected to
>> be)
>> > > fixed in 0.9.0.1 via KAFKA-2937
>> > > <https://issues.apache.org/jira/browse/KAFKA-2937>
>> > >
>> > > Request to delete topic continued to be there in ZK even after whole
>> > > cluster restart - topic seemed not to exist, seemed to actually be
>> > deleted,
>> > > but request to delete topic would remain. Had to manually delete
>> request
>> > > node in ZK.
>> > >
>> > > When I have more details, and reproducible use case, will report back.
>> > >
>> > > Kind regards,
>> > > Stevo Slavic.
>> > >
>> >
>> >
>> >
>> > --
>> > -- Guozhang
>> >
>>
>
>
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Kafka topic deletion still not working well

2016-03-11 Thread Mayuresh Gharat
kafka-2937 is different from this I think. Kafka-2937 deals with the delete
topic getting stuck because the LeaderAndISR in ZK was updated by a
controller and then the controller dies and the new controller gets in to
the exception and never completes deleting the topic. The topic existed in
the cluster and was also marked for delete.
The case reported here is that the topic does not exist in cluster but is
marked for delete.
Am I right in understanding?

Thanks,

Mayuresh

On Fri, Mar 11, 2016 at 5:30 AM, Stevo Slavić  wrote:

> Topic it seems would get deleted but request in ZK to delete topic would
> not get cleared even after restarting Kafka cluster.
>
> I'm still investigating why deletion did not complete in the first place
> without restarting any nodes. It seems something smelly happens when there
> is request to delete more than one topic.
>
> Anyway, I think I found one potential bug in
> ReplicaStateMachine.areAllReplicasForTopicDeleted check which could be
> cause for not clearing deletion request from ZK even after restart of whole
> cluster. Line ReplicaStateMachine.scala#L285
> <
> https://github.com/sslavic/kafka/blob/trunk/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala#L285
> >
>
> replicaStatesForTopic.forall(_._2 == ReplicaDeletionSuccessful)
>
> which is return value of that function/check, probably should better be
> checking for
>
> replicaStatesForTopic.isEmpty || replicaStatesForTopic.forall(_._2 ==
> ReplicaDeletionSuccessful)
>
> I noticed it because in controller logs I found entries like:
>
> [2016-03-04 13:27:29,115] DEBUG [Replica state machine on controller 1]:
> Are all replicas for topic foo deleted Map()
> (kafka.controller.ReplicaStateMachine)
>
> even though normally they look like:
>
> [2016-03-04 09:33:41,036] DEBUG [Replica state machine on controller 1]:
> Are all replicas for topic foo deleted
> Map([Topic=foo,Partition=0,Replica=0] -> ReplicaDeletionStarted,
> [Topic=foo,Partition=0,Replica=3] -> ReplicaDeletionStarted,
> [Topic=foo,Partition=0,Replica=1] -> ReplicaDeletionSuccessful)
> (kafka.controller.ReplicaStateMachine)
>
> Kind regards,
> Stevo Slavic.
>
> On Sun, Mar 6, 2016 at 12:31 AM, Guozhang Wang  wrote:
>
> > Thanks Stevo,
> >
> > Feel free to paste your findings in KAFKA-2937, we can re-open that
> ticket
> > if necessary.
> >
> > Guozhang
> >
> > On Fri, Mar 4, 2016 at 4:38 AM, Stevo Slavić  wrote:
> >
> > > Hell Apache Kafka community,
> > >
> > > I'm still investigating an incident; from initial findings topic
> deletion
> > > doesn't seem to work well still with Kafka 0.9.0.1, likely some edge
> case
> > > not covered.
> > >
> > > Before with 0.8.2.x it used to happen that non-lead replica would be
> > stuck
> > > in topic deletion process, and workaround was just to restart that
> node.
> > >
> > > If I'm not mistaken, that edge case got (or at least is expected to be)
> > > fixed in 0.9.0.1 via KAFKA-2937
> > > 
> > >
> > > Request to delete topic continued to be there in ZK even after whole
> > > cluster restart - topic seemed not to exist, seemed to actually be
> > deleted,
> > > but request to delete topic would remain. Had to manually delete
> request
> > > node in ZK.
> > >
> > > When I have more details, and reproducible use case, will report back.
> > >
> > > Kind regards,
> > > Stevo Slavic.
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: TimeoutException: failed to update metadata after

2016-02-02 Thread Mayuresh Gharat
This timeout is  used while fetching metadata and for blocking when there
is not enough space in the producers memory to store the batches that are
waiting to be sent to kafka brokers.

If you increase your producers memory, reduce your linger time (and also
batch size if required) you will have enough memory and also the batches
will get drained faster.

In this case you will only block MAX 300 ms for the first metadata request.
Subsequent requests should be fast.

Thanks,

Mayuresh



On Tue, Feb 2, 2016 at 5:01 AM, vahid karami  wrote:

> Hi
> I'm using kafka 0.9 in server and clients.
>
> I want to KafkaProducer send data fast and does not wait long, so I reduced
> max.block.ms to 100 ms. 100 ms is my ideal. And producer send data in less
> than 100 ms.
> But there is problem: document says first time that produce send data it
> fetch topic's metadata and in a busy system it makes I got following error.
> And producer even does not retry sending and nothing send to server.
> TimeoutException failed to update metadata after 100 ms.
>
> So to fix this problem I finally increased the value of to 300 ms, but it
> causes slowing down my system.
>
> My question is how to change configs to fix this problem properly???
>
> thanks in advance
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Frequent ZK session timeouts

2016-01-12 Thread Mayuresh Gharat
Can you paste the logs?

Thanks,

Mayuresh

On Tue, Jan 12, 2016 at 4:58 PM, Dillian Murphey 
wrote:

> Possibly running more stable with 1.7 JVM.
>
> Can someone explain the Zookeeper session?  SHould it never expire, unless
> the broker becomes unresponsive?  I set a massive timeout value in the
> broker config far beyond the amount of time I see the zk expiration. Is
> this entirely on the kafka side, or could zookeeper be doing something?
> From my zk logs I didn't see anything unusual, just exceptions as a result
> of the zk session expiring (my guess).
>
> tnx
>
> On Tue, Jan 12, 2016 at 3:05 PM, Dillian Murphey 
> wrote:
>
> > Our 2 node kafka cluster has become unhealthy.  We're running zookeeper
> as
> > a 3 node system, which very light load.
> >
> > What seems to be happening is in the controller log we get a ZK session
> > expire message, and in the process of re-assigning the leader for the
> > partitions (if I'm understanding this right, please correct me), the
> broker
> > goes offline and it interrupts our applications that are publishing
> > messages.
> >
> > We don't see this in production, and kafka has been stable for months,
> > since september.
> >
> > I've searched a lot and found some similiar complaints but no real
> > solutions.
> >
> > I'm running 0.8.2 and JVM 1.6.X on ubuntu.
> >
> > Thanks for any ideas at all.
> >
> >
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: TMR should nopt create topic if not existing.

2016-01-07 Thread Mayuresh Gharat
+ dev

Hi

There has been discussion on the ticket :
https://issues.apache.org/jira/browse/KAFKA-2887, that we are going to
deprecate auto-creation of topic or at least turn it off by default once we
have the CreateTopics API. It also says the patch is available for this.

The only other ticket that I came across on the ticket is
https://issues.apache.org/jira/browse/KAFKA-1507.

I wanted to confirm that https://issues.apache.org/jira/browse/KAFKA-1507
is the ticket that has the CreateTopics API patch.
<%28862%29%20250-7125>


-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


TMR should nopt create topic if not existing.

2016-01-07 Thread Mayuresh Gharat
Hi,

There has been discussion on the ticket :
https://issues.apache.org/jira/browse/KAFKA-2887, that we are going to
deprecate auto-creation of topic or at least turn it off by default once we
have the CreateTopics API. It also says the patch is available for this.

The only other ticket that I came across on the ticket is
https://issues.apache.org/jira/browse/KAFKA-1507.

I wanted to confirm that https://issues.apache.org/jira/browse/KAFKA-1507
is the ticket that has the CreateTopics API patch.

-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Does MirrorMaker delete topics?

2016-01-05 Thread Mayuresh Gharat
Not sure, if this should be supported.

For example. if we have 2 kafka clusters C1 and C2 mirroring data to
C3-Aggregate using mirror-maker M1 and M2 respectively and if the topic
exist in both C1 and C2 and if we delete topic from C1, M1 should not
delete the topic from C3-Aggregate.

Thanks,

Mayuresh

On Tue, Jan 5, 2016 at 6:28 AM, Stevo Slavić  wrote:

> Yes, there are quite a few related tickets in JIRA (see here
> <
> https://issues.apache.org/jira/browse/KAFKA-658?jql=project%20%3D%20KAFKA%20AND%20status%20in%20%28Open%2C%20%22In%20Progress%22%2C%20Reopened%2C%20%22Patch%20Available%22%29%20AND%20text%20~%20%22mirror%22
> >
> )
>
> On Tue, Jan 5, 2016 at 3:22 PM, Simon Cooper <
> simon.coo...@featurespace.co.uk> wrote:
>
> > Hmm, could this feature be added in the future?
> >
> > Although a better solution might be to turn mirrormaker into something
> > like 'hidden replicas', where a certain set of brokers in a cross-site
> > super-cluster replicate topics onto the secondary site but don't take
> part
> > in topic leadership - is this on the cards at all?
> >
> > Thanks,
> > SimonC
> >
> > -Original Message-
> > From: Stevo Slavić [mailto:ssla...@gmail.com]
> > Sent: 05 January 2016 14:04
> > To: users@kafka.apache.org
> > Subject: Re: Does MirrorMaker delete topics?
> >
> > AFAIK "if the right options are set" actually means "if
> > auto.create.topics.enable is left to default, set to true". As on any
> Kafka
> > cluster with this configuration option, this will allow implicit topic
> > creation, e.g. on first message being published to the topic, if topic
> does
> > not exist it will get created. There are no other guarantees when it
> comes
> > to syncing topic metadata - topic metadata can be completely different
> e.g.
> > different number of partitions, different replica assignment including
> > preferred lead, and also topic can get deleted on source and will not be
> > deleted on mirror cluster. MirrorMaker only syncs data, not metadata.
> >
> > Kind regards,
> > Stevo Slavic.
> >
> > On Tue, Jan 5, 2016 at 2:53 PM, Simon Cooper <
> > simon.coo...@featurespace.co.uk> wrote:
> >
> > > Hi all,
> > >
> > > In the kafka docs, it mentions that MirrorMaker will automatically
> > > create topics on the mirror cluster if the right options are set. Does
> > > it automatically delete topics on the mirror that are deleted on the
> > > main cluster as well?
> > >
> > > Thanks,
> > > SimonC
> > >
> >
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Failed attempt to delete topic

2015-12-03 Thread Mayuresh Gharat
you can use the zookeeper shell inside the bin directory for that.

Thanks,

Mayuresh

On Thu, Dec 3, 2015 at 4:04 PM, Rakesh Vidyadharan <
rvidyadha...@gracenote.com> wrote:

> Thanks Stevo.  I did see some messages related to /admin/delete_topics.
> Will do some research on how I can clean up zookeeper.
>
> Thanks
> Rakesh
>
>
>
>
> On 03/12/2015 17:55, "Stevo Slavić"  wrote:
>
> >Delete was actually considered to be working since Kafka 0.8.2 (although
> >there are still not easily reproducible edge cases when it doesn't work
> >well even in in 0.8.2 or newer).
> >In 0.8.1 one could request topic to be deleted (request gets stored as
> >entry in ZooKeeper), because of presence of the request for topic to be
> >deleted topic would become unusable (cannot publish or read), but broker
> >would actually never (work on the request to) delete topic.
> >
> >Maybe it will be enough to delete from ZooKeeper entry for the topic
> >deletion request under /admin/delete_topics to have topic usable again.
> >
> >Otherwise, just upgrade broker side to 0.8.2.x or latest 0.9.0.0 - new
> >broker should work with old clients so maybe you don't have to upgrade
> >client side immediately.
> >
> >Kind regards,
> >Stevo Slavic.
> >
> >
> >On Fri, Dec 4, 2015 at 12:33 AM, Mayuresh Gharat <
> gharatmayures...@gmail.com
> >> wrote:
> >
> >> Can you paste some logs from the controller, when you deleted the topic?
> >>
> >> Thanks,
> >>
> >> Mayuresh
> >>
> >> On Thu, Dec 3, 2015 at 2:30 PM, Rakesh Vidyadharan <
> >> rvidyadha...@gracenote.com> wrote:
> >>
> >> > Hello,
> >> >
> >> > We are on an older kafka (0.8.1) version.  While a number of consumers
> >> > were running, we attempted to delete a few topics using the
> >> kafka-topics.sh
> >> > file (basically want to remove all messages in that topic and restart,
> >> > since our entities went through some incompatible changes).  We
> noticed
> >> > logs saying the topic has been queued for deletion.  After stopping
> all
> >> > processes accessing kafka, we restarted kafka and then our processes.
> >> The
> >> > old topics do not seem to have been deleted (I can still see the log
> >> > directories corresponding to the topics), and none of the clients are
> >> able
> >> > to either publish or read to the topics that we attempted to delete.
> >> > Attempting to read gives us the following type of error:
> >> >
> >> > Attempting to access an invalid KafkaTopic ( are you operating on a
> >> closed
> >> > KafkaTopic ?)
> >> >
> >> > Attempting to publish gives us a more general type of error:
> >> >
> >> > kafka.common.FailedToSendMessageException: Failed to send messages
> after
> >> 3
> >> > tries.
> >> > at
> >> >
> >>
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> >> > at kafka.producer.Producer.send(Producer.scala:76)
> >> >
> >> > How can be get around this issue and start using the topics that we
> tried
> >> > to clean up?  There may have been better ways to achieve what we
> wanted,
> >> if
> >> > so please suggest recommendations as well.
> >> >
> >> > Thanks
> >> > Rakesh
> >> >
> >> >
> >>
> >>
> >> --
> >> -Regards,
> >> Mayuresh R. Gharat
> >> (862) 250-7125
> >>
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Failed attempt to delete topic

2015-12-03 Thread Mayuresh Gharat
Can you paste some logs from the controller, when you deleted the topic?

Thanks,

Mayuresh

On Thu, Dec 3, 2015 at 2:30 PM, Rakesh Vidyadharan <
rvidyadha...@gracenote.com> wrote:

> Hello,
>
> We are on an older kafka (0.8.1) version.  While a number of consumers
> were running, we attempted to delete a few topics using the kafka-topics.sh
> file (basically want to remove all messages in that topic and restart,
> since our entities went through some incompatible changes).  We noticed
> logs saying the topic has been queued for deletion.  After stopping all
> processes accessing kafka, we restarted kafka and then our processes.  The
> old topics do not seem to have been deleted (I can still see the log
> directories corresponding to the topics), and none of the clients are able
> to either publish or read to the topics that we attempted to delete.
> Attempting to read gives us the following type of error:
>
> Attempting to access an invalid KafkaTopic ( are you operating on a closed
> KafkaTopic ?)
>
> Attempting to publish gives us a more general type of error:
>
> kafka.common.FailedToSendMessageException: Failed to send messages after 3
> tries.
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> at kafka.producer.Producer.send(Producer.scala:76)
>
> How can be get around this issue and start using the topics that we tried
> to clean up?  There may have been better ways to achieve what we wanted, if
> so please suggest recommendations as well.
>
> Thanks
> Rakesh
>
>


-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Log Compaction v. Retention

2015-11-13 Thread Mayuresh Gharat
I think you can mark a tombstone (null value) for the keys explicitly, that
will eventually delete those messages from the log.

Thanks,

Mayuresh

On Wed, Nov 11, 2015 at 6:18 AM, Morgan Kenyon  wrote:

> I came across an interesting question on StackOverflow dealing with the
> difference between compaction and retention. To sum up the question and my
> incorrect answer, I was unaware that retention and compaction were mutually
> exclusive (based on grokbase link below)
>
> Is it true that when setting a log to be compacted there is no way to
> delete old messages if they're not deleted through compaction? I imagine a
> use case where the user has a limited hard drive capacity and would like to
> compact a log up till a certain size, then delete old messages. While using
> compaction are unique kryed messages guaranteed to remain in log
> indefinitely? Or is there any other way to delete them?
>
>
> http://stackoverflow.com/questions/33632362/how-clean-old-segments-from-compacted-log-in-kafka-0-8-2/
> http://grokbase.com/t/kafka/users/14bv6gaz0t/kafka-0-8-2-log-cleaner
>
> --
>
> *Morgan Kenyon*
> Software Engineer
> Lymba Corporation
> Phone: 972-680-0800
> Email: mor...@lymba.com
>
> [image: Logo]
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: request.timeout.ms not working as expected

2015-11-10 Thread Mayuresh Gharat
Are you seeing errors for  metadata update?

If yes, I think I know why this might be happening.

Thanks,

Mayuresh

On Tue, Nov 10, 2015 at 6:35 PM, Mayuresh Gharat  wrote:

> How many brokers are there in your test cluster?
>
>
> Thanks,
>
> Mayuresh
>
> On Tue, Nov 10, 2015 at 5:53 PM, Jason Gustafson 
> wrote:
>
>> Hey Luke,
>>
>> I agree the null check seems questionable. I went ahead and created
>> https://issues.apache.org/jira/browse/KAFKA-2805. At least we should
>> have a
>> comment clarifying why the check is correct.
>>
>> -Jason
>>
>> On Tue, Nov 10, 2015 at 2:15 PM, Luke Steensen <
>> luke.steen...@braintreepayments.com> wrote:
>>
>> > After some more investigation, I've been able to get the expected
>> behavior
>> > by removing the null check here:
>> >
>> >
>> https://github.com/apache/kafka/blob/ae5a5d7c08bb634576a414f6f2864c5b8a7e58a3/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L220
>> >
>> > Hopefully someone more familiar with the code can comment, but that
>> > statement does appear to be preventing the correct behavior.
>> >
>> > Thanks,
>> > Luke
>> >
>> >
>> > On Tue, Nov 10, 2015 at 2:15 PM, Luke Steensen <
>> > luke.steen...@braintreepayments.com> wrote:
>> >
>> > > Hello,
>> > >
>> > > We've been testing recent versions of trunk and are seeing surprising
>> > > behavior when trying to use the new request timeout functionality. For
>> > > example, at revision ae5a5d7:
>> > >
>> > > # in separate terminals
>> > > $ ./bin/zookeeper-server-start.sh config/zookeeper.properties
>> > > $ ./bin/kafka-server-start.sh config/server.properties
>> > >
>> > > # set request timeout
>> > > $ cat producer.properties
>> > > request.timeout.ms=1000
>> > >
>> > > # run the verifiable producer, for example
>> > > $ ./bin/kafka-verifiable-producer.sh --broker-list localhost:9092
>> --topic
>> > > testing --throughput 5 --producer.config producer.properties
>> > >
>> > > If you then kill the kafka server process, you will see the producer
>> hang
>> > > indefinitely. This is a very simple case, but the behavior is
>> surprising.
>> > > We have also found it easy to reproduce this behavior in more
>> realistic
>> > > environments with multiple brokers, custom producers, etc. The end
>> result
>> > > is that we're not sure how to safely decommission a broker without
>> > > potentially leaving a producer with a permanently stuck request.
>> > >
>> > > Thanks,
>> > > Luke Steensen
>> > >
>> > >
>> >
>>
>
>
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: request.timeout.ms not working as expected

2015-11-10 Thread Mayuresh Gharat
How many brokers are there in your test cluster?


Thanks,

Mayuresh

On Tue, Nov 10, 2015 at 5:53 PM, Jason Gustafson  wrote:

> Hey Luke,
>
> I agree the null check seems questionable. I went ahead and created
> https://issues.apache.org/jira/browse/KAFKA-2805. At least we should have
> a
> comment clarifying why the check is correct.
>
> -Jason
>
> On Tue, Nov 10, 2015 at 2:15 PM, Luke Steensen <
> luke.steen...@braintreepayments.com> wrote:
>
> > After some more investigation, I've been able to get the expected
> behavior
> > by removing the null check here:
> >
> >
> https://github.com/apache/kafka/blob/ae5a5d7c08bb634576a414f6f2864c5b8a7e58a3/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L220
> >
> > Hopefully someone more familiar with the code can comment, but that
> > statement does appear to be preventing the correct behavior.
> >
> > Thanks,
> > Luke
> >
> >
> > On Tue, Nov 10, 2015 at 2:15 PM, Luke Steensen <
> > luke.steen...@braintreepayments.com> wrote:
> >
> > > Hello,
> > >
> > > We've been testing recent versions of trunk and are seeing surprising
> > > behavior when trying to use the new request timeout functionality. For
> > > example, at revision ae5a5d7:
> > >
> > > # in separate terminals
> > > $ ./bin/zookeeper-server-start.sh config/zookeeper.properties
> > > $ ./bin/kafka-server-start.sh config/server.properties
> > >
> > > # set request timeout
> > > $ cat producer.properties
> > > request.timeout.ms=1000
> > >
> > > # run the verifiable producer, for example
> > > $ ./bin/kafka-verifiable-producer.sh --broker-list localhost:9092
> --topic
> > > testing --throughput 5 --producer.config producer.properties
> > >
> > > If you then kill the kafka server process, you will see the producer
> hang
> > > indefinitely. This is a very simple case, but the behavior is
> surprising.
> > > We have also found it easy to reproduce this behavior in more realistic
> > > environments with multiple brokers, custom producers, etc. The end
> result
> > > is that we're not sure how to safely decommission a broker without
> > > potentially leaving a producer with a permanently stuck request.
> > >
> > > Thanks,
> > > Luke Steensen
> > >
> > >
> >
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Controller sometimes lose ISR

2015-11-05 Thread Mayuresh Gharat
I am not sure about this. It might be related to your GC settings. But I am
not sure why it only occurs on Friday night.

Thanks,

Mayuresh

On Tue, Nov 3, 2015 at 3:01 AM, Gleb Zhukov  wrote:

> Hi, Mayuresh. No, this log before restart 61.
> But I found some interesting logs about ZK on problem broker:
>
> root@kafka3d:~# zgrep 'zookeeper state changed (Expired)'
> /var/log/kafka/*/*
> /var/log/kafka/2015-10-30/kafka-2015-10-30.log.gz:[2015-10-30
> 23:02:31,001] 284371992 [main-EventThread] INFO
>  org.I0Itec.zkclient.ZkClient  - zookeeper state changed (Expired)
>
> root@kafka3d:~# zgrep -i shut
> /var/log/kafka/2015-10-30/kafka-2015-10-30.log.gz
> [2015-10-30 23:02:31,004] 284371995 [main-EventThread] INFO
>  org.apache.zookeeper.ClientCnxn  - EventThread shut down
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> [2015-10-30 23:10:22,206] 284843197 [kafka-request-handler-2] INFO
>  kafka.server.ReplicaFetcherThread  - [ReplicaFetcherThread-0-77], Shutting
> down
> [2015-10-30 23:10:22,213] 284843204 [kafka-request-handler-2] INFO
>  kafka.server.ReplicaFetcherThread  - [ReplicaFetcherThread-0-77], Shutdown
> completed
> [2015-10-30 23:10:22,213] 284843204 [kafka-request-handler-2] INFO
>  kafka.server.ReplicaFetcherThread  - [ReplicaFetcherThread-1-77], Shutting
> down
> [2015-10-30 23:10:22,215] 284843206 [kafka-request-handler-2] INFO
>  kafka.server.ReplicaFetcherThread  - [ReplicaFetcherThread-1-77], Shutdown
> completed
> [2015-10-30 23:10:22,215] 284843206 [kafka-request-handler-2] INFO
>  kafka.server.ReplicaFetcherThread  - [ReplicaFetcherThread-2-77], Shutting
> down
> [2015-10-30 23:10:22,396] 284843387 [kafka-request-handler-2] INFO
>  kafka.server.ReplicaFetcherThread  - [ReplicaFetcherThread-2-77], Shutdown
> completed
> [2015-10-30 23:10:22,396] 284843387 [kafka-request-handler-2] INFO
>  kafka.server.ReplicaFetcherThread  - [ReplicaFetcherThread-3-77], Shutting
> down
> [2015-10-30 23:10:22,439] 284843430 [kafka-request-handler-2] INFO
>  kafka.server.ReplicaFetcherThread  - [ReplicaFetcherThread-3-77], Shutdown
> completed
>
> Is it related to my GC settings?
>
> KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20
> -XX:InitiatingHeapOccupancyPercent=35"
> KAFKA_HEAP_OPTS="-Xmx8G"
>
> Also I attached some GC graphs from JMX
>
>
>
> On Tue, Nov 3, 2015 at 1:21 AM, Mayuresh Gharat <
> gharatmayures...@gmail.com> wrote:
>
>> The broker 61 some how falls behind in fetching from the leader brokers
>> and
>> hence falls out of the ISR.
>>
>> [2015-10-30 23:02:34,012] ERROR Controller 61 epoch 2233 initiated state
>> change of replica 61 for partition [test-res-met.server_logs.conv,18] from
>> OnlineReplica to OfflineReplica...
>> means that the current controller underwent a failure and came back up,
>> but
>> some other controller was elected in meant time. The old controller will
>> eventually resign.
>> Is this log after you rebounce 61?
>>
>>
>> Thanks,
>>
>> Mayuresh
>>
>> On Sat, Oct 31, 2015 at 5:09 AM, Gleb Zhukov  wrote:
>>
>> > Hi, Everybody!
>> >
>> > Every week on Friday's night I lose ISR for some partitions in my kafka
>> > cluster:
>> >
>> > Topic: test-res-met.server_logs.conv  Partition: 18Leader: 45
>> > Replicas: 45,61Isr: 45
>> > Current controller: 45
>> > Partitions with leader #61 are available, I lose broker #61 only as ISR
>> for
>> > partitions with another leader.
>> >
>> > State logs on broker 61:
>> >
>> > [2015-10-30 23:02:34,012] ERROR Controller 61 epoch 2233 initiated state
>> > change of replica 61 for partition [test-res-met.server_logs.conv,18]
>> from
>> > OnlineReplica to OfflineReplic
>> > a failed (state.change.logger)
>> > kafka.common.StateChangeFailedException: Leader and isr path written by
>> > another controller. This probablymeans the current controller with epoch
>> > 2233 went through a soft failure
>> > and another controller was elected with epoch 2234. Aborting state
>> change
>> > by this controller
>> > at
>> >
>> >
>> kafka.controller.KafkaController.removeReplicaFromIsr(KafkaController.scala:1002)
>> > at
>> >
>> >
>> kafka.controller.ReplicaStateMachine.handleStateChange(ReplicaStateMachine.scala:250)
>> > at
>> >
>> >
>> kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:114)
>> > 

Re: Problems getting offsets

2015-11-02 Thread Mayuresh Gharat
plication factor has to be <= than number of
> live brokers when topic is created. Consumer offsets topic creation does
> not fail (although it takes time to finish), if it is done after some other
> topic creation has been requested, because that other topic creation
> request makes broker aware (updates its cache) that it is a sole live
> broker in the cluster, and then consumer offsets topic creation will ignore
> requested/configured replication factor of 3 and will (silently) fallback
> and use replication factor of 1 (= number of live brokers in cluster).
>
> Maybe things would be cleaner if topic creation allowed non-live brokers to
> be used in replica assignment. Then not only would (consumer offsets) topic
> creation not fail if there are not enough of live brokers to meet
> configured replication factor, but even better Kafka cluster controller
> broker could do this initialization of consumer offsets topic if it doesn't
> already exist.
>
> Btw, looking up topic metadata in zookeeper and checking that all of its
> partitions have a leader assigned is not enough for topic partitions to be
> writable/readable/online - if broker is still not aware that it is the
> leader for a given partition (e.g. broker metadata cache is outdated), that
> partition would still not be writable/readable. One could lookup topic
> metadata from Kafka brokers, and more specifically from lead brokers of
> partitions to check if they are ware that they are lead brokers for that
> partition. Not even that is guarantee that publishing will be successful,
> since leadership can change at any moment, and other things can fail (e.g.
> unreliable network), so one has to handle errors. But, as you see problem
> is not with your topic, but with internal consumer offsets topic.
>
>
> On Mon, Nov 2, 2015 at 1:56 AM, Stevo Slavić  wrote:
>
> > Hello David,
> >
> > In short, problem is not with your topic, it is with consumer offsets
> > topic initialization.
> >
> > You could modify your code to just retry fetching offsets (until
> > successful where successful is also return of -1 offset, or timeout), or
> > alternatively you could trigger consumer offsets topic init (by fetching
> > consumer offsets topic metadata from broker) before issuing offset fetch
> > request.
> >
> > For the init option you have (at least) two alternatives: do it after or
> > before request to create your topic.
> >
> > If you chose to do consumer offsets topic init before request to create
> > your (first) topic, make sure to configure broker with replication factor
> > of 1 for consumer offsets topic (offsets.topic.replication.factor config
> > property) otherwise consumer offsets topic init will fail. Do this config
> > change only for integration test with single broker, but not for
> production
> > cluster which should have 3 or more brokers anyway.
> >
> > Kind regards,
> > Stevo Slavic.
> >
> >
> >
> > On Sun, Nov 1, 2015 at 9:38 PM, David Corbin 
> wrote:
> >
> >> Yes.  I know all of that.  I guess my original message was not clear.
> >>
> >> The topic metadata indicates that there is a leader.
> >>
> >> Please see my comments interspersed below.
> >> Thanks
> >> David Corbin
> >>
> >> On 10/29/15, 17:53, "Mayuresh Gharat" 
> wrote:
> >>
> >> > NotCoordinatorForConsumerException is for consumer side.
> >>
> >> I am attempting to consume messages, so this means I¹m getting an
> >> exception that might be reasonable for what I¹m doing.
> >>
> >> >I think if you
> >> >are using simpleConsumer, then you have to do your own offset
> management.
> >>
> >> I am attempting to manage my offsets myself; that¹s why I called
> >> SimpleConsumer#fetchOffsets() which is throwing the exception.  I assume
> >> you are not suggesting that my offset management should be done entirely
> >> ³outside the scope² of Kafka.  If that is true, why do 4 of 3 methods on
> >> SimpleConsumer refer to offsets?
> >>
> >>
> >> >
> >> >TopicMetadata tells you whether there is a leader for the topic
> >> >partitions,
> >> >replicas, ISR.
> >>
> >>
> >> Yes.  After I create the topic, I ³poll² the metadata until the one
> >> topic/partition has a leader, before moving on.  If the metadata says
> >> there is a leader, I don¹t understand whyI would get
> >> NotCoordinatorForConsumerException
> >>
> >> >
> >> >Thanks,
> >> >
>

Re: Controller sometimes lose ISR

2015-11-02 Thread Mayuresh Gharat
The broker 61 some how falls behind in fetching from the leader brokers and
hence falls out of the ISR.

[2015-10-30 23:02:34,012] ERROR Controller 61 epoch 2233 initiated state
change of replica 61 for partition [test-res-met.server_logs.conv,18] from
OnlineReplica to OfflineReplica...
means that the current controller underwent a failure and came back up, but
some other controller was elected in meant time. The old controller will
eventually resign.
Is this log after you rebounce 61?


Thanks,

Mayuresh

On Sat, Oct 31, 2015 at 5:09 AM, Gleb Zhukov  wrote:

> Hi, Everybody!
>
> Every week on Friday's night I lose ISR for some partitions in my kafka
> cluster:
>
> Topic: test-res-met.server_logs.conv  Partition: 18Leader: 45
> Replicas: 45,61Isr: 45
> Current controller: 45
> Partitions with leader #61 are available, I lose broker #61 only as ISR for
> partitions with another leader.
>
> State logs on broker 61:
>
> [2015-10-30 23:02:34,012] ERROR Controller 61 epoch 2233 initiated state
> change of replica 61 for partition [test-res-met.server_logs.conv,18] from
> OnlineReplica to OfflineReplic
> a failed (state.change.logger)
> kafka.common.StateChangeFailedException: Leader and isr path written by
> another controller. This probablymeans the current controller with epoch
> 2233 went through a soft failure
> and another controller was elected with epoch 2234. Aborting state change
> by this controller
> at
>
> kafka.controller.KafkaController.removeReplicaFromIsr(KafkaController.scala:1002)
> at
>
> kafka.controller.ReplicaStateMachine.handleStateChange(ReplicaStateMachine.scala:250)
> at
>
> kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:114)
> at
>
> kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:114)
> at
> scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
> at
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
> at
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
> at
> scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
> at
>
> kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:114)
> at
> kafka.controller.KafkaController.onBrokerFailure(KafkaController.scala:451)
> at
>
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:373)
> at
>
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
> at
>
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at
>
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358)
> at
>
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
> at
>
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
> at kafka.utils.Utils$.inLock(Utils.scala:535)
> at
>
> kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356)
> at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
>
> Restart of bad broker (#61) helps.
> We have 7day retention for our logs (log.retention.hours=168). Also I
> checked ZK and cron. Could anyone explain such issue? Kafka 0.8.2.1.
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Offset storage

2015-10-29 Thread Mayuresh Gharat
You can use either of them.
The new kafka consumer (still under development) does not store offsets in
zookeeper. It stores in kafka.

Thanks,

Mayuresh

On Wed, Oct 28, 2015 at 7:26 AM, Burtsev, Kirill <
kirill.burt...@cmegroup.com> wrote:

> Which one is considered preferred offset storage: zookeeper or kafka? I
> see that default for 0.8.2.2 high level consumer is zookeeper, but I saw a
> few references about migrating offset storage to kafka.
>
> Thanks
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Problems getting offsets

2015-10-29 Thread Mayuresh Gharat
 NotCoordinatorForConsumerException is for consumer side. I think if you
are using simpleConsumer, then you have to do your own offset management.

TopicMetadata tells you whether there is a leader for the topic partitions,
replicas, ISR.

Thanks,

Mayuresh

On Wed, Oct 28, 2015 at 8:23 AM, David Corbin  wrote:

> I'm working on a project that I hope to use the SimpleConsumer on.  I'm
> trying to write some test code around our code that is wrapping the
> SimpleConsumer.
>
> The environment is 1 kafka broker, and 1 zookeeper
> The test goes like this:
>
> Create a topic ("foo", 1 partition, 1 replica)
> Create a Producer, and send a message
> Create a SimpleConsumer, and try to read the offset
>
> Failure with: NotCoordinatorForConsumerException
>
> If I continue to require for an extended period, I continue to get that
> exception.
>
> As near as I can tell, the topic metadata says there is a leader, but the
> the broker thinks it's being rebalanced.
> I've done the above test immediately after stop, clean out old data, and
> restart both zookeeper and kafka.
>
> Suggestions welcome.
>
>


-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Drop Topic from Consumer

2015-10-28 Thread Mayuresh Gharat
Np. :)

Thanks,

Mayuresh

On Wed, Oct 28, 2015 at 2:27 PM, Casey Daniell 
wrote:

> Thanks Mayuresh.
> I was able to use Zookeeper's zkCli.sh and find the offsets / owners for
> the consumer that's no longer using a topicand delete them!
> Issue solved. thanks again.> Date: Tue, 27 Oct 2015 10:17:12 -0700
> > Subject: Re: Drop Topic from Consumer
> > From: gharatmayures...@gmail.com
> > To: users@kafka.apache.org
> >
> > Is it a wildcard consumer or you specify the topics manually?
> > You can do this by modifying the znode in zookeeper for that consumer
> group.
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Tue, Oct 27, 2015 at 8:09 AM, Casey Daniell <
> casey_dani...@hotmail.com>
> > wrote:
> >
> > > We are using Kafka 0.8.1 and have a Consumer that used to pull multiple
> > > topics. One of these topics is no longer needed by this
> consumer/group, so
> > > when I look at
> > > /usr/local/kafka/bin/kafka-run-class.sh
> kafka.tools.ConsumerOffsetChecker
> > > --zkconnect localhost --group 
> > > We see the LAG for this measurement is high, but we know why. Is there
> > > anyway to let ZK know that this consumer group will never catch up for
> this
> > > topic, since it's not consumed any longer?
> > >
> >
> >
> >
> >
> > --
> > -Regards,
> > Mayuresh R. Gharat
> > (862) 250-7125
>
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Drop Topic from Consumer

2015-10-27 Thread Mayuresh Gharat
Is it a wildcard consumer or you specify the topics manually?
You can do this by modifying the znode in zookeeper for that consumer group.

Thanks,

Mayuresh

On Tue, Oct 27, 2015 at 8:09 AM, Casey Daniell 
wrote:

> We are using Kafka 0.8.1 and have a Consumer that used to pull multiple
> topics. One of these topics is no longer needed by this consumer/group, so
> when I look at
> /usr/local/kafka/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker
> --zkconnect localhost --group 
> We see the LAG for this measurement is high, but we know why. Is there
> anyway to let ZK know that this consumer group will never catch up for this
> topic, since it's not consumed any longer?
>




-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Offset Management in Kafka

2015-10-15 Thread Mayuresh Gharat
This might help :
https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka

http://www.slideshare.net/jjkoshy/offset-management-in-kafka


Thanks,

Mayuresh

On Thu, Oct 15, 2015 at 5:39 AM, Kiran Singh  wrote:

> Hi
>
> I am looking for offset management in Kafka.
> Is there any link which can explain how offset are updated in Kafka once a
> consumer read a message.
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Getting error code 15 while connect to the offset manager.

2015-10-15 Thread Mayuresh Gharat
This means that there is no coordinator available for the consumer group.
Can you send the log. Also which consumer are you using. Is there a wiki
you are referring to, to implement the commit and fetch offset API?

Thanks,

Mayuresh

On Thu, Oct 15, 2015 at 7:17 AM, Kiran Singh  wrote:

> Hi
>
> I am trying to implement fetch and commit offset API. But i am getting
> error code 15 "ConsumerCoordinatorNotAvailableCode" while connecting to
> kafka.
>
> Can any one explain this error?
>
> Thanks and Regards
> Kiran Singh
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: How Kafka work to send message to consumer?

2015-10-14 Thread Mayuresh Gharat
Kafka is a pull mechanism.

Thanks,

Mayuresh

On Wed, Oct 14, 2015 at 9:47 PM, Kiran Singh  wrote:

> Hello
>
> I have one query related to Kafka data flow towards consumer.
>
> Means whether kafka used push or pull technic to send data to a consumer
> using high level API?
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: some producers stuck when one broker is bad

2015-09-11 Thread Mayuresh Gharat
So how did you detect that the broker is bad? If bouncing brokers solved
the problem and you did not find any unusual things in the logs on brokers
, it is likely that the process was up but was isolated from producer
request and since the producer did not have timeout the producer buffer
filled up.

Thanks,

Mayuresh


On Thu, Sep 10, 2015 at 11:20 PM, Steven Wu  wrote:

> frankly I don't know exactly what went BAD for that broker. process is
> still UP.
>
> On Wed, Sep 9, 2015 at 10:10 AM, Mayuresh Gharat <
> gharatmayures...@gmail.com
> > wrote:
>
> > 1) any suggestion on how to identify the bad broker(s)?
> > ---> At Linkedin we have alerts that are setup using our internal scripts
> > for detecting if a broker has gone bad. We also check the under
> replicated
> > partitions and that can tell us which broker has gone bad. By broker
> going
> > bad, it can mean different things. Like the broker is alive but not
> > responding and is completely isolated or the broker has gone down, etc.
> > Can you tell us what you meant by your BROKER went BAD?
> >
> > 2) why bouncing of the bad broker got the producers recovered
> automatically
> > > This is because as you bounced, the leaders for other partitions
> > changed and producer sent out a TopicMetadataRequest which tells the
> > producer who are the new leaders for the partitions and the producer
> > started sending messages to those brokers.
> >
> > KAFKA-2120 will handle all of this for you automatically.
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Tue, Sep 8, 2015 at 8:26 PM, Steven Wu  wrote:
> >
> > > We have observed that some producer instances stopped sending traffic
> to
> > > brokers, because the memory buffer is full. those producers got stuck
> in
> > > this state permanently. Because we couldn't find out which broker is
> bad
> > > here. So I did a rolling restart the all brokers. after the bad broker
> > got
> > > bounce, those stuck producers out of the woods automatically.
> > >
> > > I don't know the exact problem with that bad broker. it seems to me
> that
> > > some ZK states are inconsistent.
> > >
> > > I know timeout fix from KAFKA-2120 can probably avoid the permanent
> > stuck.
> > > Here are some additional questions.
> > > 1) any suggestion on how to identify the bad broker(s)?
> > > 2) why bouncing of the bad broker got the producers recovered
> > automatically
> > > (without restarting producers)
> > >
> > > producer: 0.8.2.1
> > > broker: 0.8.2.1
> > >
> > > Thanks,
> > > Steven
> > >
> >
> >
> >
> > --
> > -Regards,
> > Mayuresh R. Gharat
> > (862) 250-7125
> >
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: some producers stuck when one broker is bad

2015-09-09 Thread Mayuresh Gharat
1) any suggestion on how to identify the bad broker(s)?
---> At Linkedin we have alerts that are setup using our internal scripts
for detecting if a broker has gone bad. We also check the under replicated
partitions and that can tell us which broker has gone bad. By broker going
bad, it can mean different things. Like the broker is alive but not
responding and is completely isolated or the broker has gone down, etc.
Can you tell us what you meant by your BROKER went BAD?

2) why bouncing of the bad broker got the producers recovered automatically
> This is because as you bounced, the leaders for other partitions
changed and producer sent out a TopicMetadataRequest which tells the
producer who are the new leaders for the partitions and the producer
started sending messages to those brokers.

KAFKA-2120 will handle all of this for you automatically.

Thanks,

Mayuresh

On Tue, Sep 8, 2015 at 8:26 PM, Steven Wu  wrote:

> We have observed that some producer instances stopped sending traffic to
> brokers, because the memory buffer is full. those producers got stuck in
> this state permanently. Because we couldn't find out which broker is bad
> here. So I did a rolling restart the all brokers. after the bad broker got
> bounce, those stuck producers out of the woods automatically.
>
> I don't know the exact problem with that bad broker. it seems to me that
> some ZK states are inconsistent.
>
> I know timeout fix from KAFKA-2120 can probably avoid the permanent stuck.
> Here are some additional questions.
> 1) any suggestion on how to identify the bad broker(s)?
> 2) why bouncing of the bad broker got the producers recovered automatically
> (without restarting producers)
>
> producer: 0.8.2.1
> broker: 0.8.2.1
>
> Thanks,
> Steven
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: async producer callback not reliable

2015-09-09 Thread Mayuresh Gharat
Make sure you have inflight requests set to 1 if you want ordered messages.

Thanks,

Mayuresh

On Tue, Sep 8, 2015 at 5:55 AM, Damian Guy  wrote:

> Can you do:
> producer.send(...)
> ...
> producer.send(...)
> producer.flush()
>
> By the time the flush returns all of your messages should have been sent
>
> On 8 September 2015 at 11:50, jinxing  wrote:
>
> > if i wanna send the message syncronously i can do as below:
> > future=producer.send(producerRecord, callback);
> > future.get();
> >
> >
> > but the throughput decrease dramatically;
> >
> >
> > is there a method to send the messages by batch but synchronously ?
> >
> >
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Question regarding Kafka EventThread

2015-08-05 Thread Mayuresh Gharat
Your producer might get stuck if the kafka Broker becomes unreachable since
there is no socketTimeout on new producer. We are adding that in KAFKA-2120.

Thanks,

Mayuresh

On Wed, Aug 5, 2015 at 3:47 PM, Chinmay Soman 
wrote:

> After digging in the 0.8.2 code, it seems like the callback is not getting
> invoked since 'handleDisconnections' is not adding a disconnected
> ClientResponse to the list of responses. I do see a 'Node 0 disconnected'
> message. However, I don't see a 'Cancelled request due to node being
> disconnected' message.
>
> Is this expected ? Is any of this because I'm only using 1 Kafka broker
> node ? Will the producer get stuck if the Kafka cluster is not reachable
> for whatever reason ?
>
> On Wed, Aug 5, 2015 at 12:12 PM, Chinmay Soman 
> wrote:
>
> > Hey guys,
> >
> > We're using Kafka version 0.8.2.0 and using the Java producer
> > (KafkaProducer) to write to a single Kafka broker with acks=1. There's a
> > callback defined on every produce request which takes corrective action
> if
> > the write fails.
> >
> > What we see is that, if the broker is down for an extended period of
> time,
> > any write to the producer gets stuck, with this message:
> >
> > [2015-08-05 12:02:04,743] WARN Error in I/O with /
> > (org.apache.kafka.common.network.Selector:276)
> > java.net.ConnectException: Connection refused
> > at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> > at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
> > at org.apache.kafka.common.network.Selector.poll(Selector.java:238)
> > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
> > at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
> > at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
> > at java.lang.Thread.run(Thread.java:745)
> >
> > This goes on repeatedly and the callback is never invoked. Is there a way
> > to get the Kafka EventThread to give up after a while ?
> >
> >
> > --
> > Thanks and regards
> >
> > Chinmay Soman
> >
>
>
>
> --
> Thanks and regards
>
> Chinmay Soman
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Question regarding Kafka EventThread

2015-08-05 Thread Mayuresh Gharat
What do you mean by Broker is down? Has the process shutdown and exited or
is the broker not reachable?

Thanks,

Mayuresh

On Wed, Aug 5, 2015 at 12:12 PM, Chinmay Soman 
wrote:

> Hey guys,
>
> We're using Kafka version 0.8.2.0 and using the Java producer
> (KafkaProducer) to write to a single Kafka broker with acks=1. There's a
> callback defined on every produce request which takes corrective action if
> the write fails.
>
> What we see is that, if the broker is down for an extended period of time,
> any write to the producer gets stuck, with this message:
>
> [2015-08-05 12:02:04,743] WARN Error in I/O with /
> (org.apache.kafka.common.network.Selector:276)
> java.net.ConnectException: Connection refused
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:238)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
> at java.lang.Thread.run(Thread.java:745)
>
> This goes on repeatedly and the callback is never invoked. Is there a way
> to get the Kafka EventThread to give up after a while ?
>
>
> --
> Thanks and regards
>
> Chinmay Soman
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Log Deletion Behavior

2015-07-27 Thread Mayuresh Gharat
Hi Jiefu,

The topic will stay forever. You can do delete topic operation to get rid
of the topic.

Thanks,

Mayuresh

On Mon, Jul 27, 2015 at 11:19 AM, JIEFU GONG  wrote:

> Mayuresh,
>
> Yes, it seems like I misunderstood the behavior of log deletion but indeed
> my log segments were deleted after a specified amount of time. I have a
> small follow-up question, it seems that when the logs are deleted the topic
> persists and can be republished too -- is there a configuration for how
> long a topic persists or does it stay forever until it is manually marked
> for deletion?
>
> Also @Grant, thank you very much for your help as well. I ended up using
> the ms update configuration and understand the broker configs better.
> Thanks!
>
> On Mon, Jul 27, 2015 at 9:27 AM, Mayuresh Gharat <
> gharatmayures...@gmail.com
> > wrote:
>
> > Hi Jiefu,
> >
> > Any update on this? Were you able to delete those log segments?
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Fri, Jul 24, 2015 at 7:14 PM, Mayuresh Gharat <
> > gharatmayures...@gmail.com
> > > wrote:
> >
> > > To add on, the main thing here is you should be using only one of these
> > > properties.
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > > On Fri, Jul 24, 2015 at 6:47 PM, Mayuresh Gharat <
> > > gharatmayures...@gmail.com> wrote:
> > >
> > >> Yes. It should. Do not set other retention settings. Just use the
> > "hours"
> > >> settings.
> > >> Let me know about this :)
> > >>
> > >> Thanks,
> > >>
> > >> Mayuresh
> > >>
> > >> On Fri, Jul 24, 2015 at 6:43 PM, JIEFU GONG 
> wrote:
> > >>
> > >>> Mayuresh, thanks for your comment. I won't be able to change these
> > >>> settings
> > >>> until next Monday, but just so confirm you are saying that if I
> restart
> > >>> the
> > >>> brokers my logs should delete themselves with respect to the newest
> > >>> settings, correct?
> > >>> ᐧ
> > >>>
> > >>> On Fri, Jul 24, 2015 at 6:29 PM, Mayuresh Gharat <
> > >>> gharatmayures...@gmail.com
> > >>> > wrote:
> > >>>
> > >>> > No. This should not happen. At Linkedin we just use the log
> retention
> > >>> > hours. Try using that. Chang e it and bounce the broker. It should
> > >>> work.
> > >>> > Also looking back at the config's I am not sure why we had 3
> > different
> > >>> > configs for the same property :
> > >>> >
> > >>> > "log.retention.ms"
> > >>> > "log.retention.minutes"
> > >>> > "log.retention.hours"
> > >>> >
> > >>> > We should probably be having just the milliseconds.
> > >>> >
> > >>> > Thanks,
> > >>> >
> > >>> > Mayuresh
> > >>> >
> > >>> > On Fri, Jul 24, 2015 at 4:12 PM, JIEFU GONG 
> > >>> wrote:
> > >>> >
> > >>> > > Hi all,
> > >>> > >
> > >>> > > I have a few broad questions on how log deletion works,
> > specifically
> > >>> in
> > >>> > > conjunction with the log.retention.time setting. Say I published
> > some
> > >>> > > messages to some topics when the configuration was originally set
> > to
> > >>> > > something like log.retention.hours=168 (default). If I publish
> > these
> > >>> > > messages successfully, then later set the configuration to
> > something
> > >>> like
> > >>> > > log.retention.minutes=1, are those logs supposed to persist for
> the
> > >>> > newest
> > >>> > > settings or the old settings? Right now my logs are refusing to
> > >>> delete
> > >>> > > themselves unless I specifically mark them for deletion -- is
> this
> > >>> the
> > >>> > > correct/anticipated/wanted behavior?
> > >>> > >
> > >>> > > Thanks for the help!
> > >>> > >
> > >>> > > --
> > >>> > >
> > >>> > > Jiefu Gong
> > >>> > > University of California, Berkeley | Class of 2017
> > >>> > > B.A Computer Science | College of Letters and Sciences
> > >>> > >
> > >>> > > jg...@berkeley.edu  | (925) 400-3427
> > >>> > >
> > >>> >
> > >>> >
> > >>> >
> > >>> > --
> > >>> > -Regards,
> > >>> > Mayuresh R. Gharat
> > >>> > (862) 250-7125
> > >>> >
> > >>>
> > >>>
> > >>>
> > >>> --
> > >>>
> > >>> Jiefu Gong
> > >>> University of California, Berkeley | Class of 2017
> > >>> B.A Computer Science | College of Letters and Sciences
> > >>>
> > >>> jg...@berkeley.edu  | (925) 400-3427
> > >>>
> > >>
> > >>
> > >>
> > >> --
> > >> -Regards,
> > >> Mayuresh R. Gharat
> > >> (862) 250-7125
> > >>
> > >
> > >
> > >
> > > --
> > > -Regards,
> > > Mayuresh R. Gharat
> > > (862) 250-7125
> > >
> >
> >
> >
> > --
> > -Regards,
> > Mayuresh R. Gharat
> > (862) 250-7125
> >
>
>
>
> --
>
> Jiefu Gong
> University of California, Berkeley | Class of 2017
> B.A Computer Science | College of Letters and Sciences
>
> jg...@berkeley.edu  | (925) 400-3427
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Log Deletion Behavior

2015-07-27 Thread Mayuresh Gharat
Hi Jiefu,

Any update on this? Were you able to delete those log segments?

Thanks,

Mayuresh

On Fri, Jul 24, 2015 at 7:14 PM, Mayuresh Gharat  wrote:

> To add on, the main thing here is you should be using only one of these
> properties.
>
> Thanks,
>
> Mayuresh
>
> On Fri, Jul 24, 2015 at 6:47 PM, Mayuresh Gharat <
> gharatmayures...@gmail.com> wrote:
>
>> Yes. It should. Do not set other retention settings. Just use the "hours"
>> settings.
>> Let me know about this :)
>>
>> Thanks,
>>
>> Mayuresh
>>
>> On Fri, Jul 24, 2015 at 6:43 PM, JIEFU GONG  wrote:
>>
>>> Mayuresh, thanks for your comment. I won't be able to change these
>>> settings
>>> until next Monday, but just so confirm you are saying that if I restart
>>> the
>>> brokers my logs should delete themselves with respect to the newest
>>> settings, correct?
>>> ᐧ
>>>
>>> On Fri, Jul 24, 2015 at 6:29 PM, Mayuresh Gharat <
>>> gharatmayures...@gmail.com
>>> > wrote:
>>>
>>> > No. This should not happen. At Linkedin we just use the log retention
>>> > hours. Try using that. Chang e it and bounce the broker. It should
>>> work.
>>> > Also looking back at the config's I am not sure why we had 3 different
>>> > configs for the same property :
>>> >
>>> > "log.retention.ms"
>>> > "log.retention.minutes"
>>> > "log.retention.hours"
>>> >
>>> > We should probably be having just the milliseconds.
>>> >
>>> > Thanks,
>>> >
>>> > Mayuresh
>>> >
>>> > On Fri, Jul 24, 2015 at 4:12 PM, JIEFU GONG 
>>> wrote:
>>> >
>>> > > Hi all,
>>> > >
>>> > > I have a few broad questions on how log deletion works, specifically
>>> in
>>> > > conjunction with the log.retention.time setting. Say I published some
>>> > > messages to some topics when the configuration was originally set to
>>> > > something like log.retention.hours=168 (default). If I publish these
>>> > > messages successfully, then later set the configuration to something
>>> like
>>> > > log.retention.minutes=1, are those logs supposed to persist for the
>>> > newest
>>> > > settings or the old settings? Right now my logs are refusing to
>>> delete
>>> > > themselves unless I specifically mark them for deletion -- is this
>>> the
>>> > > correct/anticipated/wanted behavior?
>>> > >
>>> > > Thanks for the help!
>>> > >
>>> > > --
>>> > >
>>> > > Jiefu Gong
>>> > > University of California, Berkeley | Class of 2017
>>> > > B.A Computer Science | College of Letters and Sciences
>>> > >
>>> > > jg...@berkeley.edu  | (925) 400-3427
>>> > >
>>> >
>>> >
>>> >
>>> > --
>>> > -Regards,
>>> > Mayuresh R. Gharat
>>> > (862) 250-7125
>>> >
>>>
>>>
>>>
>>> --
>>>
>>> Jiefu Gong
>>> University of California, Berkeley | Class of 2017
>>> B.A Computer Science | College of Letters and Sciences
>>>
>>> jg...@berkeley.edu  | (925) 400-3427
>>>
>>
>>
>>
>> --
>> -Regards,
>> Mayuresh R. Gharat
>> (862) 250-7125
>>
>
>
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Log Deletion Behavior

2015-07-24 Thread Mayuresh Gharat
To add on, the main thing here is you should be using only one of these
properties.

Thanks,

Mayuresh

On Fri, Jul 24, 2015 at 6:47 PM, Mayuresh Gharat  wrote:

> Yes. It should. Do not set other retention settings. Just use the "hours"
> settings.
> Let me know about this :)
>
> Thanks,
>
> Mayuresh
>
> On Fri, Jul 24, 2015 at 6:43 PM, JIEFU GONG  wrote:
>
>> Mayuresh, thanks for your comment. I won't be able to change these
>> settings
>> until next Monday, but just so confirm you are saying that if I restart
>> the
>> brokers my logs should delete themselves with respect to the newest
>> settings, correct?
>> ᐧ
>>
>> On Fri, Jul 24, 2015 at 6:29 PM, Mayuresh Gharat <
>> gharatmayures...@gmail.com
>> > wrote:
>>
>> > No. This should not happen. At Linkedin we just use the log retention
>> > hours. Try using that. Chang e it and bounce the broker. It should work.
>> > Also looking back at the config's I am not sure why we had 3 different
>> > configs for the same property :
>> >
>> > "log.retention.ms"
>> > "log.retention.minutes"
>> > "log.retention.hours"
>> >
>> > We should probably be having just the milliseconds.
>> >
>> > Thanks,
>> >
>> > Mayuresh
>> >
>> > On Fri, Jul 24, 2015 at 4:12 PM, JIEFU GONG  wrote:
>> >
>> > > Hi all,
>> > >
>> > > I have a few broad questions on how log deletion works, specifically
>> in
>> > > conjunction with the log.retention.time setting. Say I published some
>> > > messages to some topics when the configuration was originally set to
>> > > something like log.retention.hours=168 (default). If I publish these
>> > > messages successfully, then later set the configuration to something
>> like
>> > > log.retention.minutes=1, are those logs supposed to persist for the
>> > newest
>> > > settings or the old settings? Right now my logs are refusing to delete
>> > > themselves unless I specifically mark them for deletion -- is this the
>> > > correct/anticipated/wanted behavior?
>> > >
>> > > Thanks for the help!
>> > >
>> > > --
>> > >
>> > > Jiefu Gong
>> > > University of California, Berkeley | Class of 2017
>> > > B.A Computer Science | College of Letters and Sciences
>> > >
>> > > jg...@berkeley.edu  | (925) 400-3427
>> > >
>> >
>> >
>> >
>> > --
>> > -Regards,
>> > Mayuresh R. Gharat
>> > (862) 250-7125
>> >
>>
>>
>>
>> --
>>
>> Jiefu Gong
>> University of California, Berkeley | Class of 2017
>> B.A Computer Science | College of Letters and Sciences
>>
>> jg...@berkeley.edu  | (925) 400-3427
>>
>
>
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Log Deletion Behavior

2015-07-24 Thread Mayuresh Gharat
Yes. It should. Do not set other retention settings. Just use the "hours"
settings.
Let me know about this :)

Thanks,

Mayuresh

On Fri, Jul 24, 2015 at 6:43 PM, JIEFU GONG  wrote:

> Mayuresh, thanks for your comment. I won't be able to change these settings
> until next Monday, but just so confirm you are saying that if I restart the
> brokers my logs should delete themselves with respect to the newest
> settings, correct?
> ᐧ
>
> On Fri, Jul 24, 2015 at 6:29 PM, Mayuresh Gharat <
> gharatmayures...@gmail.com
> > wrote:
>
> > No. This should not happen. At Linkedin we just use the log retention
> > hours. Try using that. Chang e it and bounce the broker. It should work.
> > Also looking back at the config's I am not sure why we had 3 different
> > configs for the same property :
> >
> > "log.retention.ms"
> > "log.retention.minutes"
> > "log.retention.hours"
> >
> > We should probably be having just the milliseconds.
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Fri, Jul 24, 2015 at 4:12 PM, JIEFU GONG  wrote:
> >
> > > Hi all,
> > >
> > > I have a few broad questions on how log deletion works, specifically in
> > > conjunction with the log.retention.time setting. Say I published some
> > > messages to some topics when the configuration was originally set to
> > > something like log.retention.hours=168 (default). If I publish these
> > > messages successfully, then later set the configuration to something
> like
> > > log.retention.minutes=1, are those logs supposed to persist for the
> > newest
> > > settings or the old settings? Right now my logs are refusing to delete
> > > themselves unless I specifically mark them for deletion -- is this the
> > > correct/anticipated/wanted behavior?
> > >
> > > Thanks for the help!
> > >
> > > --
> > >
> > > Jiefu Gong
> > > University of California, Berkeley | Class of 2017
> > > B.A Computer Science | College of Letters and Sciences
> > >
> > > jg...@berkeley.edu  | (925) 400-3427
> > >
> >
> >
> >
> > --
> > -Regards,
> > Mayuresh R. Gharat
> > (862) 250-7125
> >
>
>
>
> --
>
> Jiefu Gong
> University of California, Berkeley | Class of 2017
> B.A Computer Science | College of Letters and Sciences
>
> jg...@berkeley.edu  | (925) 400-3427
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Log Deletion Behavior

2015-07-24 Thread Mayuresh Gharat
No. This should not happen. At Linkedin we just use the log retention
hours. Try using that. Chang e it and bounce the broker. It should work.
Also looking back at the config's I am not sure why we had 3 different
configs for the same property :

"log.retention.ms"
"log.retention.minutes"
"log.retention.hours"

We should probably be having just the milliseconds.

Thanks,

Mayuresh

On Fri, Jul 24, 2015 at 4:12 PM, JIEFU GONG  wrote:

> Hi all,
>
> I have a few broad questions on how log deletion works, specifically in
> conjunction with the log.retention.time setting. Say I published some
> messages to some topics when the configuration was originally set to
> something like log.retention.hours=168 (default). If I publish these
> messages successfully, then later set the configuration to something like
> log.retention.minutes=1, are those logs supposed to persist for the newest
> settings or the old settings? Right now my logs are refusing to delete
> themselves unless I specifically mark them for deletion -- is this the
> correct/anticipated/wanted behavior?
>
> Thanks for the help!
>
> --
>
> Jiefu Gong
> University of California, Berkeley | Class of 2017
> B.A Computer Science | College of Letters and Sciences
>
> jg...@berkeley.edu  | (925) 400-3427
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Got conflicted ephemeral node exception for several hours

2015-07-12 Thread Mayuresh Gharat
That would solve this. But it looks like a work around. We need to check
why this happens exactly and get to the root cause. What do you think?
Getting to the root cause of this might be really useful.

Thanks,

Mayuresh

On Sun, Jul 12, 2015 at 8:45 PM, tao xiao  wrote:

> Restart the consumers does fix the issue. But since the zk retry is wrapped
> in an infinite loop it doesn't give a chance to consumer to respond it
> until some one notices and restarts. Why I suggest to have a maximum retry
> policy is if max retry is reached it can invoke a customer handler which I
> can then inject a restart call so that it can remedy itself without
> people's attention.
>
> On Mon, 13 Jul 2015 at 11:36 Jiangjie Qin 
> wrote:
>
> > Hi Tao,
> >
> > We see this error from time to time but did not think of this as a big
> > issue. Any reason it bothers you much?
> > I¹m not sure if throwing exception to user on this exception is a good
> > handling or not. What are user supposed to do in that case other than
> > retry?
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On 7/12/15, 7:16 PM, "tao xiao"  wrote:
> >
> > >We saw the error again in our cluster.  Anyone has the same issue
> before?
> > >
> > >On Fri, 10 Jul 2015 at 13:26 tao xiao  wrote:
> > >
> > >> Bump the thread. Any help would be appreciated.
> > >>
> > >> On Wed, 8 Jul 2015 at 20:09 tao xiao  wrote:
> > >>
> > >>> Additional info
> > >>> Kafka version: 0.8.2.1
> > >>> zookeeper: 3.4.6
> > >>>
> > >>> On Wed, 8 Jul 2015 at 20:07 tao xiao  wrote:
> > >>>
> >  Hi team,
> > 
> >  I have 10 high level consumers connecting to Kafka and one of them
> > kept
> >  complaining "conflicted ephemeral node" for about 8 hours. The log
> was
> >  filled with below exception
> > 
> >  [2015-07-07 14:03:51,615] INFO conflict in
> >  /consumers/group/ids/test-1435856975563-9a9fdc6c data:
> > 
> >
> {"version":1,"subscription":{"test.*":1},"pattern":"white_list","timest
> > amp":"1436275631510"}
> >  stored data:
> > 
> >
> {"version":1,"subscription":{"test.*":1},"pattern":"white_list","timest
> > amp":"1436275558570"}
> >  (kafka.utils.ZkUtils$)
> >  [2015-07-07 14:03:51,616] INFO I wrote this conflicted ephemeral
> node
> > 
> >
> [{"version":1,"subscription":{"test.*":1},"pattern":"white_list","times
> > tamp":"1436275631510"}]
> >  at /consumers/group/ids/test-1435856975563-9a9fdc6c a while back in
> a
> >  different session, hence I will backoff for this node to be deleted
> by
> >  Zookeeper and retry (kafka.utils.ZkUtils$)
> > 
> >  In the meantime zookeeper reported below exception for the same time
> > span
> > 
> >  2015-07-07 22:45:09,687 [myid:3] - INFO  [ProcessThread(sid:3
> >  cport:-1)::PrepRequestProcessor@645] - Got user-level
> KeeperException
> >  when processing sessionid:0x44e657ff19c0019 type:create cxid:0x7a26
> >  zxid:0x3015f6e77 txntype:-1 reqpath:n/a Error
> >  Path:/consumers/group/ids/test-1435856975563-9a9fdc6c
> > Error:KeeperErrorCode
> >  = NodeExists for /consumers/group/ids/test-1435856975563-9a9fdc6c
> > 
> >  At the end zookeeper timed out the session and consumers triggered
> >  rebalance.
> > 
> >  I know that conflicted ephemeral node warning is to handle a
> zookeeper
> >  bug that session expiration and ephemeral node deletion are not done
> >  atomically but as indicated from zookeeper log the zookeeper never
> > got a
> >  chance to delete the ephemeral node which made me think that the
> > session
> >  was not expired at that time. And for some reason zookeeper fired
> > session
> >  expire event which subsequently invoked ZKSessionExpireListener.  I
> > was
> >  just wondering if anyone have ever encountered similar issue before
> > and
> >  what I can do at zookeeper side to prevent this?
> > 
> >  Another problem is that createEphemeralPathExpectConflictHandleZKBug
> >  call is wrapped in a while(true) loop which runs forever until the
> >  ephemeral node is created. Would it be better that we can employ an
> >  exponential retry policy with a max number of retries so that it
> has a
> >  chance to re-throw the exception back to caller and let caller
> handle
> > it in
> >  situation like above?
> > 
> > 
> >
> >
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Kafka High Level Consumer Message Loss?

2015-07-12 Thread Mayuresh Gharat
Can you confirm that you are not actually seeing the messages on the
lagging broker ?
Because if the Max Lag is 0 it should mean that consumer has read the
offsets till log end offset of the broker.

Thanks,

Mayuresh

On Fri, Jul 10, 2015 at 8:29 PM, Allen Wang 
wrote:

> We have two applications that consume all messages from one Kafka cluster.
> We found that the MessagesPerSec metric started to diverge after some time.
> One of them matches the MessagesInPerSec metric from the Kafka broker,
> while the other is lower than the broker metric and appears to have some
> message loss.
>
> Both of them have the same OwnedPartitionsCount.
>
> Both of them have 0 MaxLag.
>
> How is that possible? Anything we should look at? Is the MaxLag metric not
> telling the truth?
>
> Thanks,
> Allen
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Custom topic metadata

2015-07-12 Thread Mayuresh Gharat
Storing topic metadata in kafka brokers can be done in the same way as the
offsets topic in kafka I  think. It would be a good point for discussion.

Thanks,

Mayuresh

On Sun, Jul 12, 2015 at 8:25 PM, Jiangjie Qin 
wrote:

> Hi Stevo,
>
> Kafka does not support customized topic metadata. What we are doing now is
> having topic metadata store in a separate zookeeper structure. But I agree
> this is probably a good candidate feature to consider.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On 7/12/15, 4:16 PM, "Stevo Slavić"  wrote:
>
> >Hello Apache Kafka Community,
> >
> >Is it possible to store and retrieve additional custom topic metadata
> >along
> >with existing Kafka managed ones, using some Kafka API? If not would it be
> >a problem (e.g. for Kafka broker or some client APIs) if I was to
> >store/retrieve additional custom topic metadata using ZooKeeper API?
> >
> >Kind regards,
> >Stevo Slavic.
>
>


-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Got conflicted ephemeral node exception for several hours

2015-07-12 Thread Mayuresh Gharat
Bouncing the consumers should solve this issue in most cases.

Thanks,

Mayuresh

On Sun, Jul 12, 2015 at 8:21 PM, Jiangjie Qin 
wrote:

> Hi Tao,
>
> We see this error from time to time but did not think of this as a big
> issue. Any reason it bothers you much?
> I¹m not sure if throwing exception to user on this exception is a good
> handling or not. What are user supposed to do in that case other than
> retry?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On 7/12/15, 7:16 PM, "tao xiao"  wrote:
>
> >We saw the error again in our cluster.  Anyone has the same issue before?
> >
> >On Fri, 10 Jul 2015 at 13:26 tao xiao  wrote:
> >
> >> Bump the thread. Any help would be appreciated.
> >>
> >> On Wed, 8 Jul 2015 at 20:09 tao xiao  wrote:
> >>
> >>> Additional info
> >>> Kafka version: 0.8.2.1
> >>> zookeeper: 3.4.6
> >>>
> >>> On Wed, 8 Jul 2015 at 20:07 tao xiao  wrote:
> >>>
>  Hi team,
> 
>  I have 10 high level consumers connecting to Kafka and one of them
> kept
>  complaining "conflicted ephemeral node" for about 8 hours. The log was
>  filled with below exception
> 
>  [2015-07-07 14:03:51,615] INFO conflict in
>  /consumers/group/ids/test-1435856975563-9a9fdc6c data:
> 
> {"version":1,"subscription":{"test.*":1},"pattern":"white_list","timest
> amp":"1436275631510"}
>  stored data:
> 
> {"version":1,"subscription":{"test.*":1},"pattern":"white_list","timest
> amp":"1436275558570"}
>  (kafka.utils.ZkUtils$)
>  [2015-07-07 14:03:51,616] INFO I wrote this conflicted ephemeral node
> 
> [{"version":1,"subscription":{"test.*":1},"pattern":"white_list","times
> tamp":"1436275631510"}]
>  at /consumers/group/ids/test-1435856975563-9a9fdc6c a while back in a
>  different session, hence I will backoff for this node to be deleted by
>  Zookeeper and retry (kafka.utils.ZkUtils$)
> 
>  In the meantime zookeeper reported below exception for the same time
> span
> 
>  2015-07-07 22:45:09,687 [myid:3] - INFO  [ProcessThread(sid:3
>  cport:-1)::PrepRequestProcessor@645] - Got user-level KeeperException
>  when processing sessionid:0x44e657ff19c0019 type:create cxid:0x7a26
>  zxid:0x3015f6e77 txntype:-1 reqpath:n/a Error
>  Path:/consumers/group/ids/test-1435856975563-9a9fdc6c
> Error:KeeperErrorCode
>  = NodeExists for /consumers/group/ids/test-1435856975563-9a9fdc6c
> 
>  At the end zookeeper timed out the session and consumers triggered
>  rebalance.
> 
>  I know that conflicted ephemeral node warning is to handle a zookeeper
>  bug that session expiration and ephemeral node deletion are not done
>  atomically but as indicated from zookeeper log the zookeeper never
> got a
>  chance to delete the ephemeral node which made me think that the
> session
>  was not expired at that time. And for some reason zookeeper fired
> session
>  expire event which subsequently invoked ZKSessionExpireListener.  I
> was
>  just wondering if anyone have ever encountered similar issue before
> and
>  what I can do at zookeeper side to prevent this?
> 
>  Another problem is that createEphemeralPathExpectConflictHandleZKBug
>  call is wrapped in a while(true) loop which runs forever until the
>  ephemeral node is created. Would it be better that we can employ an
>  exponential retry policy with a max number of retries so that it has a
>  chance to re-throw the exception back to caller and let caller handle
> it in
>  situation like above?
> 
> 
>
>


-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Launch Kafka/Zookeeper servers without using command line

2015-07-10 Thread Mayuresh Gharat
Hi Jeff,

I haven't tried this out, but I am planning to. Just a quick question : We
have TestHarness in Kafka that brings up Kafka and Zookeeper and also tears
them down. Have you tried using it?

Thanks,

Mayuresh

On Fri, Jul 10, 2015 at 10:09 AM, Jeff Gong  wrote:

> To follow up and provide a little more context on my second bullet point,
> when I run any command for the first time on command line that requires
> connecting to this code instantiated ZK server I get the specific error:
>
> > bin/kafka-topics.sh --list --zookeeper localhost:2181
>
> java.net.ConnectException: Connection refused
>
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>
> at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
>
> at
>
> org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
>
> at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
>
>
> But after that it looks like a reconnect is attempted, which succeeds and
> I'll get a list of my topics or my console consumer will succeed, etc. If I
> were to run any of those commands a second time, this error then
> disappears. Anyone have any ideas?
>
> On Thu, Jul 9, 2015 at 4:20 PM, Jeff Gong  wrote:
>
> > Hey Gwen,
> >
> > Thanks for your response. The example was very helpful and exactly what I
> > was looking for! I went ahead and used that as an example and while most
> > things are working as I wanted, there are a few issues I'd like to follow
> > up on:
> >
> > 1. If I launch a Zookeeper/Kafka server from code instead of command
> line,
> > most of the helpful debugging/status messages will not be displayed -- is
> > there any way I can retain this functionality?
> >
> > 2. I created a scenario where I successfully launched a Zookeeper
> instance
> > from code, then connected to that Zookeeper server using a Kafka Server
> via
> > command line. I then instantiated both a Kafka-Console-Consumer and
> > Kafka-Console-Producer with the topic 'test'. I was able to send messages
> > to the consumer as expected, but I noticed that when I try to connect the
> > console consumer to the server I always get a 'Connection Refused' error
> to
> > my Kafka Broker. Any ideas why this might be occuring?
> >
> > Best,
> > Jeff
> >
> >
> >
> > On Tue, Jul 7, 2015 at 10:43 AM, Jeff Gong  wrote:
> >
> >> Hi all,
> >>
> >> Is it possible to launch Kafka/Zookeeper servers via some part of the
> >> API? I noticed in this:
> >>
> http://stackoverflow.com/questions/16946778/how-can-we-create-a-topic-in-kafka-from-the-ide-using-api/
> >> that it seems to be possible to create a ZK instance and add a topic to
> it,
> >> but is this the same as launching the ZK server as we would in the
> command
> >> line?
> >>
> >> Similarly, would it be possible to do this for a Kafka server and have
> it
> >> connect to an existing ZK connection?
> >>
> >> Thanks!
> >>
> >
> >
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Which Kafka Consumer should I use?

2015-05-18 Thread Mayuresh Gharat
I suppose you can either use the High Level Consumer or the simple consumer
as per your use case. Simple Consumer does not do offset management and you
will have to do it yourself but provides more control. High level consumer
on the other hand does the offset management for you.
The new Kafka Consumer is not yet 100 % ready I suppose. I would suggest
using the High Level Cosnumer.

Thanks,

Mayuresh

On Mon, May 18, 2015 at 9:10 PM, luo.fucong  wrote:

> I want to rebuild my message queue from based on Redis to Kafka (because
> Redis persistence is terrible).
>
> Now I am using the new KafkaProducer, and I am wondering which consumer
> should I use.
>
> Currently there are 3 Kafka Consumers (until 0.8.2.1):
>
> 1. High Level Consumer
> 2. Simple Consumer
> 3. KafkaConsumer (new introduced since 0.8)
>
> The High Level Consumer API looks pretty simple. It does a lot of complex
> work like failure handling and partitions rebalancing underneath user’s
> codes. This is great and I suppose it’s robust enough because I have
> already saw some projects using it. However, I can do little besides poll
> messages and commit offsets, so sometimes I feel that my hands are tied.
>
> The Simple Consumer, on the other side, offers too much options over
> consumers control, which make it hard to write consumer codes.
>
> There’s a new KafkaConsumer in the new Kafka clients maven release:
> http://kafka.apache.org/083/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
> <
> http://kafka.apache.org/083/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
> >
> The new KafkaConsumer seems very moderate, it offers more control options
> over High Level Consumer, and hides some details than the Simple Consumer.
> So I want to know is this KafkaConsumer ready for production use? Or will
> it change it’s API in the future? And why is there no documents on this new
> consumer in the Kafka’s main document page? Is it not robust enough?
>
> Thanks in advance!
>




-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Getting NotLeaderForPartitionException in kafka broker

2015-05-14 Thread Mayuresh Gharat
Can you try bouncing that broker?

Thanks,

Mayuresh

On Thu, May 14, 2015 at 3:43 AM, tao xiao  wrote:

> Yes, it does exist in ZK and the node that had the
> NotLeaderForPartitionException
> is the leader of the topic
>
> On Thu, May 14, 2015 at 6:12 AM, Jiangjie Qin 
> wrote:
>
> > Does this topic exist in Zookeeper?
> >
> > On 5/12/15, 11:35 PM, "tao xiao"  wrote:
> >
> > >Hi,
> > >
> > >Any updates on this issue? I keep seeing this issue happening over and
> > >over
> > >again
> > >
> > >On Thu, May 7, 2015 at 7:28 PM, tao xiao  wrote:
> > >
> > >> Hi team,
> > >>
> > >> I have a 12 nodes cluster that has 800 topics and each of which has
> > >>only 1
> > >> partition. I observed that one of the node keeps generating
> > >> NotLeaderForPartitionException that causes the node to be unresponsive
> > >>to
> > >> all requests. Below is the exception
> > >>
> > >> [2015-05-07 04:16:01,014] ERROR [ReplicaFetcherThread-1-12], Error for
> > >> partition [topic1,0] to broker 12:class
> > >> kafka.common.NotLeaderForPartitionException
> > >> (kafka.server.ReplicaFetcherThread)
> > >>
> > >> All other nodes in the cluster generate lots of replication error too
> as
> > >> shown below due to unresponsiveness of above node.
> > >>
> > >> [2015-05-07 04:17:34,917] WARN [Replica Manager on Broker 1]: Fetch
> > >> request with correlation id 3630911 from client
> > >>ReplicaFetcherThread-0-1 on
> > >> partition [topic1,0] failed due to Leader not local for partition
> > >> [cg22_user.item_attr_info.lcr,0] on broker 1
> > >>(kafka.server.ReplicaManager)
> > >>
> > >> Any suggestion why the node runs into the unstable stage and any
> > >> configuration I can set to prevent this?
> > >>
> > >> I use kafka 0.8.2.1
> > >>
> > >> And here is the server.properties
> > >>
> > >>
> > >> broker.id=5
> > >> port=9092
> > >> num.network.threads=3
> > >> num.io.threads=8
> > >> socket.send.buffer.bytes=1048576
> > >> socket.receive.buffer.bytes=1048576
> > >> socket.request.max.bytes=104857600
> > >> log.dirs=/mnt/kafka
> > >> num.partitions=1
> > >> num.recovery.threads.per.data.dir=1
> > >> log.retention.hours=1
> > >> log.segment.bytes=1073741824
> > >> log.retention.check.interval.ms=30
> > >> log.cleaner.enable=false
> > >> zookeeper.connect=ip:2181
> > >> zookeeper.connection.timeout.ms=6000
> > >> unclean.leader.election.enable=false
> > >> delete.topic.enable=true
> > >> default.replication.factor=3
> > >> num.replica.fetchers=3
> > >> delete.topic.enable=true
> > >> kafka.metrics.reporters=report.KafkaMetricsCollector
> > >> straas.hubble.conf.file=/etc/kafka/report.conf
> > >>
> > >>
> > >>
> > >>
> > >> --
> > >> Regards,
> > >> Tao
> > >>
> > >
> > >
> > >
> > >--
> > >Regards,
> > >Tao
> >
> >
>
>
> --
> Regards,
> Tao
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Hitting integer limit when setting log segment.bytes

2015-05-13 Thread Mayuresh Gharat
I suppose it is way log management works in kafka.
I am not sure the exact reason for this. Also the index files that are
constructed have a mapping of relative offset to the base offset of log
file to the real offset. The key value in index file is of the form
.


Thanks,

Mayuresh


On Wed, May 13, 2015 at 5:57 PM, Lance Laursen 
wrote:

> Hey folks,
>
> Any update on this?
>
> On Thu, Apr 30, 2015 at 5:34 PM, Lance Laursen <
> llaur...@rubiconproject.com>
> wrote:
>
> > Hey all,
> >
> > I am attempting to create a topic which uses 8GB log segment sizes, like
> > so:
> > ./kafka-topics.sh --zookeeper localhost:2181 --create --topic
> perftest6p2r
> > --partitions 6 --replication-factor 2 --config max.message.bytes=655360
> > --config segment.bytes=8589934592
> >
> > And am getting the following error:
> > Error while executing topic command For input string: "8589934592"
> > java.lang.NumberFormatException: For input string: "8589934592"
> > at
> >
> java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
> > at java.lang.Integer.parseInt(Integer.java:583)
> > ...
> > ...
> >
> > Upon further testing with --alter topic, it would appear that
> > segment.bytes will not accept a value higher than 2,147,483,647, which is
> > the upper limit for a signed 32bit int. This then restricts log segment
> > size to an upper limit of ~2GB.
> >
> > We run Kafka on hard drive dense machines, each with 10gbit uplinks. We
> > can set ulimits higher in order to deal with all the open file handles
> > (since Kafka keeps all log segment file handles open), but it would be
> > preferable to minimize this number, as well as minimize the amount of log
> > segment rollover experienced at high traffic (ie: a rollover every 1-2
> > seconds or so when saturating 10gbe).
> >
> > Is there a reason (performance or otherwise) that a 32 bit integer is
> used
> > rather than something larger?
> >
> > Thanks,
> > -Lance
> >
> >
>
>
> --
>
> [image: elogo.png]
>
> Leading the Automation of Advertising
>
> LANCE LAURSEN | Systems Architect
>
> ••• (M) 310.903.0546
>
> 12181 BLUFF CREEK DRIVE, 4TH FLOOR, PLAYA VISTA, CA 90094
>
> RUBICONPROJECT.COM  | @RUBICONPROJECT
> 
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: New Producer Async - Metadata Fetch Timeout

2015-05-13 Thread Mayuresh Gharat
By application rebooting, do you mean you bounce the brokers?

Thanks,

Mayuresh

On Wed, May 13, 2015 at 4:06 AM, Mohit Gupta 
wrote:

> Thanks Jiangjie. This is helpful.
>
> Adding to what you have mentioned, I can think of one more scenario which
> may not be very rare.
> Say, the application is rebooted and the Kafka brokers registered in the
> producer are not reachable ( could be due to network issues or those
> brokers are actually down ).  Since, no metadata is available the send will
> block. Right?
>
> On Wed, May 13, 2015 at 10:51 AM, Jiangjie Qin 
> wrote:
>
> >
> > Application will not block on each metadata refresh or metadata is
> > expired.
> > Application will only be blocked when
> > 1. It sends the first message to a topic (only for that single message),
> or
> > 2. The topic has been deleted from broker thus refreshed metadata loses
> > the topic info (which is pretty rare).
> >
> > So I think the async here might mean a little bit different. It means
> when
> > you send first message to a topic, you wait till you know the topic
> exist,
> > after that point it is async.
> > It is very low chance that your application will block on send. If it is
> > then something probably really went wrong and needs immediate attention.
> >
> > Thanks.
> >
> > Jiangjie (Becket) Qin
> >
> > On 5/12/15, 5:08 PM, "Rendy Bambang Junior" 
> > wrote:
> >
> > >Thank you for the clarification.
> > >
> > >I think I agree with Mohit. Sometime blocking on logging is not
> acceptable
> > >by nature of application who uses kafka.
> > >
> > >Yes it is not blocking when metadata is still available. But application
> > >will be blocked once metada is expired.
> > >
> > >It might be handled by application, by implementing async call when do
> > >send() and manage buffer and async timeout internally, but it makes
> async
> > >feature in kafka producer has less meaning.
> > >
> > >Sorry if my understanding is incorrect.
> > >
> > >Rendy
> > >On May 13, 2015 6:59 AM, "Jiangjie Qin" 
> > wrote:
> > >
> > >> Send() will only block if the metadata is *not available* for the
> topic.
> > >> It won't block if metadata there is stale. The metadata refresh is
> async
> > >> to send(). However, if you send the message to a topic for the first
> > >>time,
> > >> send() will trigger a metadata refresh and block until it has metadata
> > >>for
> > >> that topic.
> > >>
> > >> Jiangjie (Becket) Qin
> > >>
> > >> On 5/12/15, 12:58 PM, "Magnus Edenhill"  wrote:
> > >>
> > >> >I completely agree with Mohit, an application should not have to know
> > >>or
> > >> >care about
> > >> >producer implementation internals.
> > >> >Given a message and its delivery constraints (produce retry count and
> > >> >timeout) the producer
> > >> >should hide any temporal failures until the message is succesfully
> > >> >delivered, a permanent
> > >> >error is encountered or the constraints are hit.
> > >> >This should also include internal start up sequencing, such as
> metadata
> > >> >retrieval.
> > >> >
> > >> >
> > >> >
> > >> >2015-05-12 21:22 GMT+02:00 Mohit Gupta <
> success.mohit.gu...@gmail.com
> > >:
> > >> >
> > >> >> I could not follow the reasoning behind blocking the send method if
> > >>the
> > >> >> metadata is not up-to-date. Though, I see that it as per design, it
> > >> >> requires the metadata to batch the message into appropriate
> > >> >>topicPartition
> > >> >> queue. Also, if the metadata could not be updated in the specified
> > >> >> interval, it throws an exception and the message is not queued to
> be
> > >> >> retried once the brokers are up.
> > >> >>
> > >> >> Should it not be that messages are buffered in another queue (
> up-to
> > >>a
> > >> >> limit ) if the brokers are down and retried later?
> > >> >> Is it not a general use case to require producer to be asynchronous
> > >>in
> > >> >>all
> > >> >> the scenarios?
> > >> >>
> > >> &g

Re: Compression and batching

2015-05-12 Thread Mayuresh Gharat
Well, the batch size is decided by the value set for the property :

 "batch.size";
 "The producer will attempt to batch records together into fewer requests
whenever multiple records are being sent to the same partition. This helps
performance on both the client and the server. This configuration controls
the  default batch size in bytes. No attempt will be made to batch records
larger than this size. Requests sent to brokers will contain multiple
batches, one for each partition with data available to be sent. A small
batch size will make batching less common and may reduce throughput (a
batch size of zero will disable batching entirely). A very large batch size
may use memory a bit more wastefully as we will always allocate a buffer of
the specified batch size in anticipation of additional records."

Also it may happen that message size may increase due to compression which
is kind of weird.

Thanks,

Mayuresh

On Tue, May 12, 2015 at 4:40 PM, Jamie X  wrote:

> Hi,
>
> I'm wondering when you call kafka.javaapi.Producer.send() with a list of
> messages, and also have compression on (snappy in this case), how does it
> decide how many messages to put together as one?
>
> The reason I'm asking is that even though my messages are only 70kb
> uncompressed, the broker complains that I'm hitting the 1mb message limit
> such as:
>
>
> kafka.common.MessageSizeTooLargeException: Message size is 1035608 bytes
> which exceeds the maximum configured message size of 112.
> at
> kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala:378)
> at
> kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala:361)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
> at kafka.log.Log.analyzeAndValidateMessageSet(Log.scala:361)
> at kafka.log.Log.append(Log.scala:257)
> at
>
> kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:379)
> at
>
> kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:365)
> at kafka.utils.Utils$.inLock(Utils.scala:535)
> at kafka.utils.Utils$.inReadLock(Utils.scala:541)
> at
> kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:365)
> at
>
> kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:291)
> at
>
> kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:282)
> at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>
> Thanks,
> Jamie
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Kafka 0.8.2.1 - Listing partitions owned by consumers

2015-05-12 Thread Mayuresh Gharat
Well, there is no separate tool available for importing and exporting
offsets from kafka, which will also provide this functionality. We are
working on it.

You can try the consumerOffsetChecker as Aditya mentioned.

Thanks,

Mayuresh

On Tue, May 12, 2015 at 8:11 PM, Aditya Auradkar <
aaurad...@linkedin.com.invalid> wrote:

> Perhaps you could try the ConsumerOffsetChecker. The "Owner" field might
> be what you want..
>
> Aditya
>
> 
> From: Bharath Srinivasan [bharath...@gmail.com]
> Sent: Tuesday, May 12, 2015 7:29 PM
> To: users@kafka.apache.org
> Subject: Kafka 0.8.2.1 - Listing partitions owned by consumers
>
> Hi,
>
> For monitorting purposes, is there a way to find the partitions for a topic
> that are assigned to consumers in a group? We are using high level consumer
> and the offsets are stored in kafka.
>
> Tried searching for methods in ZKUtils, but could not find anything that
> gives this information. Any pointers is appreciated.
>
> Thanks.
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: New Producer Async - Metadata Fetch Timeout

2015-05-12 Thread Mayuresh Gharat
The way it works I suppose is that, the producer will do fetchMetadata, if
the last fetched metadata is stale (the refresh interval has expired) or if
it is not able to send data to a particular broker in its current metadata
(This might happen in some cases like if the leader moves).

It cannot produce without having the right metadata.

Thanks,

Mayuresh

On Tue, May 12, 2015 at 10:09 AM, Jiangjie Qin 
wrote:

> That¹s right. Send() will first try to get metadata of a topic, that is a
> blocking operation.
>
> On 5/12/15, 2:48 AM, "Rendy Bambang Junior" 
> wrote:
>
> >Hi, sorry if my understanding is incorrect.
> >
> >I am integrating kafka producer with application, when i try to shutdown
> >all kafka broker (preparing for prod env) I notice that 'send' method is
> >blocking.
> >
> >Is new producer fetch metadata not async?
> >
> >Rendy
>
>


-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Kafka consumer offset checker hangs indefinitely

2015-05-11 Thread Mayuresh Gharat
Hi Meghana,

Let me try this out on my cluster that has latest trunk deployed.

Thanks,

Mayuresh

On Mon, May 11, 2015 at 1:53 PM, Meghana Narasimhan <
mnarasim...@bandwidth.com> wrote:

> Hi Mayuresh,
> A small update. The Kafka version I'm currently using is  2.10-0.8.2.1 (not
> 2.11 as previously mentioned). The cluster looks fine. Not sure why the
> consumer offset checker does not return a valid output and gets stuck.
>
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
> Topic:test   PartitionCount:3ReplicationFactor:3
> Configs:min.insync.replicas=2
> Topic: test  Partition: 0Leader: 1   Replicas: 1,2,0 Isr:
> 1,2,0
> Topic: test  Partition: 1Leader: 2   Replicas: 2,0,1 Isr:
> 1,2,0
> Topic: test  Partition: 2Leader: 0   Replicas: 0,1,2 Isr:
> 1,2,0
>
>
>
>
> On Fri, May 8, 2015 at 12:52 PM, Meghana Narasimhan <
> mnarasim...@bandwidth.com> wrote:
>
> > Hi Mayuresh,
> >
> > Yes, the broker is up and accepting connections. Multiple consumers are
> > consuming off topics on the broker.
> > Also I am seeing the issue only with this particular version (
> > 2.11-0.8.2.1). It worked fine with the beta that I was using earlier.
> >
> >
> > On Fri, May 8, 2015 at 12:45 PM, Mayuresh Gharat <
> > gharatmayures...@gmail.com> wrote:
> >
> >> Is X.X.X.X:9092 up and accepting connections?
> >> I am confused aas in why is it not connecting some other broker if
> >> connection to this broker fails. Can you check if the broker is up?
> >>
> >> The way it works is the consumer will send a ConsumerMetadataRequest to
> >> one
> >> of the brokers and get the offsetmanager for its group and then perform
> >> the
> >> offset management.
> >>
> >> Thanks,
> >>
> >> Mayuresh
> >>
> >> On Fri, May 8, 2015 at 9:22 AM, Meghana Narasimhan <
> >> mnarasim...@bandwidth.com> wrote:
> >>
> >> > Hi,
> >> > I'm using the Kafka 8.2.1 version(kafka_2.11-0.8.2.1) and the consumer
> >> > offset checker hangs indefinitely and does not return any results. I
> >> > enabled the debug for tools and below is the debug statements as seen
> on
> >> > the stdout. Any thoughts or inputs on this will be much appreciated.
> >> >
> >> > command used :
> >> > bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181
> --group
> >> > test-consumer-group
> >> >  or
> >> > ./kafka-consumer-offset-checker.sh --zookeeper
> >> > broker1:2181,broker2:2181,broker3:2181 --group test-consumer-group
> >> >
> >> >  DEBUG Querying X.X.X.X:9092 to locate offset manager for
> >> > test-consumer-group. (kafka.client.ClientUtils$)
> >> > [2015-05-08 10:23:55,090] DEBUG Consumer metadata response:
> >> > ConsumerMetadataResponse(None,15,0) (kafka.client.ClientUtils$)
> >> > [2015-05-08 10:23:55,091] DEBUG Query to X.X.X.X:9092 to locate offset
> >> > manager for test-consumer-group failed - will retry in 3000
> >> milliseconds.
> >> > (kafka.client.ClientUtils$)
> >> > [2015-05-08 10:23:58,093] DEBUG Querying X.X.X.X:9092 to locate offset
> >> > manager for test-consumer-group. (kafka.client.ClientUtils$)
> >> > [2015-05-08 10:23:58,102] DEBUG Consumer metadata response:
> >> > ConsumerMetadataResponse(None,15,0) (kafka.client.ClientUtils$)
> >> > [2015-05-08 10:23:58,103] DEBUG Query to X.X.X.X:9092 to locate offset
> >> > manager for test-consumer-group failed - will retry in 3000
> >> milliseconds.
> >> > (kafka.client.ClientUtils$)
> >> > [2015-05-08 10:24:01,107] DEBUG Querying X.X.X.X:9092 to locate offset
> >> > manager for test-consumer-group. (kafka.client.ClientUtils$)
> >> > [2015-05-08 10:24:01,115] DEBUG Consumer metadata response:
> >> > ConsumerMetadataResponse(None,15,0) (kafka.client.ClientUtils$)
> >> > [2015-05-08 10:24:01,116] DEBUG Query to X.X.X.X:9092 to locate offset
> >> > manager for test-consumer-group failed - will retry in 3000
> >> milliseconds.
> >> > (kafka.client.ClientUtils$)
> >> > [2015-05-08 10:24:04,119] DEBUG Querying X.X.X.X:9092 to locate offset
> >> > manager for test-consumer-group. (kafka.client.ClientUtils$)
> >> > [2015-05-08 10:24:04,124] DEBUG Consumer metadata response:
> >> > ConsumerMetadataResponse(None,15,0) (kafka.cl

Re: New Java producer broker metadata update stuck

2015-05-08 Thread Mayuresh Gharat
Also it would be great to know if you see the same issue when you don't
have different ip -> broker id mapping. Also it would be great if you can
explain " different ip -> broker id mapping" mean as Becket said.

Thanks,

Mayuresh

On Fri, May 8, 2015 at 9:48 AM, Mayuresh Gharat 
wrote:

> It should do a updateMetadataRequest in case it gets NOT_LEADER_FOR
> PARTITION. This looks like a bug.
>
> Thanks,
>
> Mayuresh
>
> On Fri, May 8, 2015 at 8:53 AM, Dan  wrote:
>
>> Hi,
>>
>> We've noticed an issue on our staging environment where all 3 of our Kafka
>> hosts shutdown and came back with a different ip -> broker id mapping. I
>> know this is not good and we're fixing that separately. But what we
>> noticed
>> is all the consumers recovered but the producers got stuck with the
>> following exceptions:
>>
>> WARN  2015-05-08 09:19:56,347
>> org.apache.kafka.clients.producer.internals.Sender: Got error produce
>> response with correlation id 3544968 on topic-partition
>> samza-metrics-0, retrying (2145750068 attempts left). Error:
>> NOT_LEADER_FOR_PARTITION
>> WARN  2015-05-08 09:19:56,448
>> org.apache.kafka.clients.producer.internals.Sender: Got error produce
>> response with correlation id 3544970 on topic-partition
>> samza-metrics-0, retrying (2145750067 attempts left). Error:
>> NOT_LEADER_FOR_PARTITION
>> WARN  2015-05-08 09:19:56,549
>> org.apache.kafka.clients.producer.internals.Sender: Got error produce
>> response with correlation id 3544972 on topic-partition
>> samza-metrics-0, retrying (2145750066 attempts left). Error:
>> NOT_LEADER_FOR_PARTITION
>> WARN  2015-05-08 09:19:56,649
>> org.apache.kafka.clients.producer.internals.Sender: Got error produce
>> response with correlation id 3544974 on topic-partition
>> samza-metrics-0, retrying (2145750065 attempts left). Error:
>> NOT_LEADER_FOR_PARTITION
>> WARN  2015-05-08 09:19:56,749
>> org.apache.kafka.clients.producer.internals.Sender: Got error produce
>> response with correlation id 3544976 on topic-partition
>> samza-metrics-0, retrying (2145750064 attempts left). Error:
>> NOT_LEADER_FOR_PARTITION
>> WARN  2015-05-08 09:19:56,850
>> org.apache.kafka.clients.producer.internals.Sender: Got error produce
>> response with correlation id 3544978 on topic-partition
>> samza-metrics-0, retrying (2145750063 attempts left). Error:
>> NOT_LEADER_FOR_PARTITION
>> WARN  2015-05-08 09:19:56,949
>> org.apache.kafka.clients.producer.internals.Sender: Got error produce
>> response with correlation id 3544980 on topic-partition
>> samza-metrics-0, retrying (2145750062 attempts left). Error:
>> NOT_LEADER_FOR_PARTITION
>> WARN  2015-05-08 09:19:57,049
>> org.apache.kafka.clients.producer.internals.Sender: Got error produce
>> response with correlation id 3544982 on topic-partition
>> samza-metrics-0, retrying (2145750061 attempts left). Error:
>> NOT_LEADER_FOR_PARTITION
>> WARN  2015-05-08 09:19:57,150
>> org.apache.kafka.clients.producer.internals.Sender: Got error produce
>> response with correlation id 3544984 on topic-partition
>> samza-metrics-0, retrying (2145750060 attempts left). Error:
>> NOT_LEADER_FOR_PARTITION
>> WARN  2015-05-08 09:19:57,254
>> org.apache.kafka.clients.producer.internals.Sender: Got error produce
>> response with correlation id 3544986 on topic-partition
>> samza-metrics-0, retrying (2145750059 attempts left). Error:
>> NOT_LEADER_FOR_PARTITION
>> WARN  2015-05-08 09:19:57,351
>> org.apache.kafka.clients.producer.internals.Sender: Got error produce
>> response with correlation id 3544988 on topic-partition
>> samza-metrics-0, retrying (2145750058 attempts left). Error:
>> NOT_LEADER_FOR_PARTITION
>> WARN  2015-05-08 09:19:57,454
>> org.apache.kafka.clients.producer.internals.Sender: Got error produce
>> response with correlation id 3544990 on topic-partition
>> samza-metrics-0, retrying (2145750057 attempts left). Error:
>> NOT_LEADER_FOR_PARTITION
>>
>>
>> So it appears as if the producer did not refresh the metadata once the
>> brokers had come back up. The exceptions carried on for a few hours until
>> we restarted them.
>>
>> We noticed this in both 0.8.2.1 Java clients and via, Kakfa-rest
>> https://github.com/confluentinc/kafka-rest which is using 0.8.2.0-cp.
>>
>> Is this a known issue when all brokers go away, or is it a subtle bug
>> we've
>> hit?
>>
>> Thanks,
>> Dan
>>
>
>
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: New Java producer broker metadata update stuck

2015-05-08 Thread Mayuresh Gharat
It should do a updateMetadataRequest in case it gets NOT_LEADER_FOR
PARTITION. This looks like a bug.

Thanks,

Mayuresh

On Fri, May 8, 2015 at 8:53 AM, Dan  wrote:

> Hi,
>
> We've noticed an issue on our staging environment where all 3 of our Kafka
> hosts shutdown and came back with a different ip -> broker id mapping. I
> know this is not good and we're fixing that separately. But what we noticed
> is all the consumers recovered but the producers got stuck with the
> following exceptions:
>
> WARN  2015-05-08 09:19:56,347
> org.apache.kafka.clients.producer.internals.Sender: Got error produce
> response with correlation id 3544968 on topic-partition
> samza-metrics-0, retrying (2145750068 attempts left). Error:
> NOT_LEADER_FOR_PARTITION
> WARN  2015-05-08 09:19:56,448
> org.apache.kafka.clients.producer.internals.Sender: Got error produce
> response with correlation id 3544970 on topic-partition
> samza-metrics-0, retrying (2145750067 attempts left). Error:
> NOT_LEADER_FOR_PARTITION
> WARN  2015-05-08 09:19:56,549
> org.apache.kafka.clients.producer.internals.Sender: Got error produce
> response with correlation id 3544972 on topic-partition
> samza-metrics-0, retrying (2145750066 attempts left). Error:
> NOT_LEADER_FOR_PARTITION
> WARN  2015-05-08 09:19:56,649
> org.apache.kafka.clients.producer.internals.Sender: Got error produce
> response with correlation id 3544974 on topic-partition
> samza-metrics-0, retrying (2145750065 attempts left). Error:
> NOT_LEADER_FOR_PARTITION
> WARN  2015-05-08 09:19:56,749
> org.apache.kafka.clients.producer.internals.Sender: Got error produce
> response with correlation id 3544976 on topic-partition
> samza-metrics-0, retrying (2145750064 attempts left). Error:
> NOT_LEADER_FOR_PARTITION
> WARN  2015-05-08 09:19:56,850
> org.apache.kafka.clients.producer.internals.Sender: Got error produce
> response with correlation id 3544978 on topic-partition
> samza-metrics-0, retrying (2145750063 attempts left). Error:
> NOT_LEADER_FOR_PARTITION
> WARN  2015-05-08 09:19:56,949
> org.apache.kafka.clients.producer.internals.Sender: Got error produce
> response with correlation id 3544980 on topic-partition
> samza-metrics-0, retrying (2145750062 attempts left). Error:
> NOT_LEADER_FOR_PARTITION
> WARN  2015-05-08 09:19:57,049
> org.apache.kafka.clients.producer.internals.Sender: Got error produce
> response with correlation id 3544982 on topic-partition
> samza-metrics-0, retrying (2145750061 attempts left). Error:
> NOT_LEADER_FOR_PARTITION
> WARN  2015-05-08 09:19:57,150
> org.apache.kafka.clients.producer.internals.Sender: Got error produce
> response with correlation id 3544984 on topic-partition
> samza-metrics-0, retrying (2145750060 attempts left). Error:
> NOT_LEADER_FOR_PARTITION
> WARN  2015-05-08 09:19:57,254
> org.apache.kafka.clients.producer.internals.Sender: Got error produce
> response with correlation id 3544986 on topic-partition
> samza-metrics-0, retrying (2145750059 attempts left). Error:
> NOT_LEADER_FOR_PARTITION
> WARN  2015-05-08 09:19:57,351
> org.apache.kafka.clients.producer.internals.Sender: Got error produce
> response with correlation id 3544988 on topic-partition
> samza-metrics-0, retrying (2145750058 attempts left). Error:
> NOT_LEADER_FOR_PARTITION
> WARN  2015-05-08 09:19:57,454
> org.apache.kafka.clients.producer.internals.Sender: Got error produce
> response with correlation id 3544990 on topic-partition
> samza-metrics-0, retrying (2145750057 attempts left). Error:
> NOT_LEADER_FOR_PARTITION
>
>
> So it appears as if the producer did not refresh the metadata once the
> brokers had come back up. The exceptions carried on for a few hours until
> we restarted them.
>
> We noticed this in both 0.8.2.1 Java clients and via, Kakfa-rest
> https://github.com/confluentinc/kafka-rest which is using 0.8.2.0-cp.
>
> Is this a known issue when all brokers go away, or is it a subtle bug we've
> hit?
>
> Thanks,
> Dan
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Kafka consumer offset checker hangs indefinitely

2015-05-08 Thread Mayuresh Gharat
Is X.X.X.X:9092 up and accepting connections?
I am confused aas in why is it not connecting some other broker if
connection to this broker fails. Can you check if the broker is up?

The way it works is the consumer will send a ConsumerMetadataRequest to one
of the brokers and get the offsetmanager for its group and then perform the
offset management.

Thanks,

Mayuresh

On Fri, May 8, 2015 at 9:22 AM, Meghana Narasimhan <
mnarasim...@bandwidth.com> wrote:

> Hi,
> I'm using the Kafka 8.2.1 version(kafka_2.11-0.8.2.1) and the consumer
> offset checker hangs indefinitely and does not return any results. I
> enabled the debug for tools and below is the debug statements as seen on
> the stdout. Any thoughts or inputs on this will be much appreciated.
>
> command used :
> bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --group
> test-consumer-group
>  or
> ./kafka-consumer-offset-checker.sh --zookeeper
> broker1:2181,broker2:2181,broker3:2181 --group test-consumer-group
>
>  DEBUG Querying X.X.X.X:9092 to locate offset manager for
> test-consumer-group. (kafka.client.ClientUtils$)
> [2015-05-08 10:23:55,090] DEBUG Consumer metadata response:
> ConsumerMetadataResponse(None,15,0) (kafka.client.ClientUtils$)
> [2015-05-08 10:23:55,091] DEBUG Query to X.X.X.X:9092 to locate offset
> manager for test-consumer-group failed - will retry in 3000 milliseconds.
> (kafka.client.ClientUtils$)
> [2015-05-08 10:23:58,093] DEBUG Querying X.X.X.X:9092 to locate offset
> manager for test-consumer-group. (kafka.client.ClientUtils$)
> [2015-05-08 10:23:58,102] DEBUG Consumer metadata response:
> ConsumerMetadataResponse(None,15,0) (kafka.client.ClientUtils$)
> [2015-05-08 10:23:58,103] DEBUG Query to X.X.X.X:9092 to locate offset
> manager for test-consumer-group failed - will retry in 3000 milliseconds.
> (kafka.client.ClientUtils$)
> [2015-05-08 10:24:01,107] DEBUG Querying X.X.X.X:9092 to locate offset
> manager for test-consumer-group. (kafka.client.ClientUtils$)
> [2015-05-08 10:24:01,115] DEBUG Consumer metadata response:
> ConsumerMetadataResponse(None,15,0) (kafka.client.ClientUtils$)
> [2015-05-08 10:24:01,116] DEBUG Query to X.X.X.X:9092 to locate offset
> manager for test-consumer-group failed - will retry in 3000 milliseconds.
> (kafka.client.ClientUtils$)
> [2015-05-08 10:24:04,119] DEBUG Querying X.X.X.X:9092 to locate offset
> manager for test-consumer-group. (kafka.client.ClientUtils$)
> [2015-05-08 10:24:04,124] DEBUG Consumer metadata response:
> ConsumerMetadataResponse(None,15,0) (kafka.client.ClientUtils$)
> [2015-05-08 10:24:04,126] DEBUG Query to X.X.X.X:9092 to locate offset
> manager for test-consumer-group failed - will retry in 3000 milliseconds.
> (kafka.client.ClientUtils$)
> [2015-05-08 10:24:04,993] DEBUG Got ping response for sessionid:
> 0x14d33e7fbc80002 after 3ms (org.apache.zookeeper.ClientCnxn)
> [2015-05-08 10:24:07,127] DEBUG Querying X.X.X.X:9092 to locate offset
> manager for test-consumer-group. (kafka.client.ClientUtils$)
> [2015-05-08 10:24:07,131] DEBUG Consumer metadata response:
> ConsumerMetadataResponse(None,15,0) (kafka.client.ClientUtils$)
> [2015-05-08 10:24:07,132] DEBUG Query to X.X.X.X:9092 to locate offset
> manager for test-consumer-group failed - will retry in 3000 milliseconds.
> (kafka.client.ClientUtils$)
> [2015-05-08 10:24:10,132] DEBUG Querying X.X.X.X:9092 to locate offset
> manager for test-consumer-group. (kafka.client.ClientUtils$)
> [2015-05-08 10:24:10,138] DEBUG Consumer metadata response:
> ConsumerMetadataResponse(None,15,0) (kafka.client.ClientUtils$)
> [2015-05-08 10:24:10,139] DEBUG Query to X.X.X.X:9092 to locate offset
> manager for test-consumer-group failed - will retry in 3000 milliseconds.
> (kafka.client.ClientUtils$)
> [2015-05-08 10:24:13,143] DEBUG Querying X.X.X.X:9092 to locate offset
> manager for test-consumer-group. (kafka.client.ClientUtils$)
> [2015-05-08 10:24:13,151] DEBUG Consumer metadata response:
> ConsumerMetadataResponse(None,15,0) (kafka.client.ClientUtils$)
> [2015-05-08 10:24:13,152] DEBUG Query to X.X.X.X:9092 to locate offset
> manager for test-consumer-group failed - will retry in 3000 milliseconds.
> (kafka.client.ClientUtils$)
>
> Thanks,
> Meghana
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: New producer: metadata update problem on 2 Node cluster.

2015-05-05 Thread Mayuresh Gharat
I agree that to find the least Loaded node the producer should fall back to
the bootstrap nodes if its not able to connect to any nodes in the current
metadata. That should resolve this.

Rahul, I suppose the problem went off because the dead node in your case
might have came back up and allowed for a metadata update. Can you confirm
this?

Thanks,

Mayuresh

On Tue, May 5, 2015 at 5:10 AM, Rahul Jain  wrote:

> We observed the exact same error. Not very clear about the root cause
> although it appears to be related to leastLoadedNode implementation.
> Interestingly, the problem went away by increasing the value of
> reconnect.backoff.ms to 1000ms.
> On 29 Apr 2015 00:32, "Ewen Cheslack-Postava"  wrote:
>
> > Ok, all of that makes sense. The only way to possibly recover from that
> > state is either for K2 to come back up allowing the metadata refresh to
> > eventually succeed or to eventually try some other node in the cluster.
> > Reusing the bootstrap nodes is one possibility. Another would be for the
> > client to get more metadata than is required for the topics it needs in
> > order to ensure it has more nodes to use as options when looking for a
> node
> > to fetch metadata from. I added your description to KAFKA-1843, although
> it
> > might also make sense as a separate bug since fixing it could be
> considered
> > incremental progress towards resolving 1843.
> >
> > On Tue, Apr 28, 2015 at 9:18 AM, Manikumar Reddy 
> > wrote:
> >
> > > Hi Ewen,
> > >
> > >  Thanks for the response.  I agree with you, In some case we should use
> > > bootstrap servers.
> > >
> > >
> > > >
> > > > If you have logs at debug level, are you seeing this message in
> between
> > > the
> > > > connection attempts:
> > > >
> > > > Give up sending metadata request since no node is available
> > > >
> > >
> > >  Yes, this log came for couple of times.
> > >
> > >
> > > >
> > > > Also, if you let it continue running, does it recover after the
> > > > metadata.max.age.ms timeout?
> > > >
> > >
> > >  It does not reconnect.  It is continuously trying to connect with dead
> > > node.
> > >
> > >
> > > -Manikumar
> > >
> >
> >
> >
> > --
> > Thanks,
> > Ewen
> >
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Kafka behind AWS ELB

2015-05-04 Thread Mayuresh Gharat
Ok. You can deploy kafka in AWS. You can have brokers on AWS servers.
Kafka is not a push system. So you will need someone writing to kafka and
consuming from kafka. It will work. My suggestion will be to try it out on
a smaller instance in AWS and see the effects.

As I do not know the actual use case about why you want to use kafka for, I
cannot comment on whether it will work for you personalized use case.

Thanks,

Mayuresh

On Mon, May 4, 2015 at 8:55 AM, Chandrashekhar Kotekar <
shekhar.kote...@gmail.com> wrote:

> I am sorry but I cannot reveal those details due to confidentiality issues.
> I hope you understand.
>
>
> Regards,
> Chandrash3khar Kotekar
> Mobile - +91 8600011455
>
> On Mon, May 4, 2015 at 9:18 PM, Mayuresh Gharat <
> gharatmayures...@gmail.com>
> wrote:
>
> > Hi Chandrashekar,
> >
> > Can you please elaborate the use case for Kafka here, like how you are
> > planning to use it.
> >
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Sat, May 2, 2015 at 9:08 PM, Chandrashekhar Kotekar <
> > shekhar.kote...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > I am new to Apache Kafka. I have played with it on my laptop.
> > >
> > > I want to use Kafka in AWS. Currently we have tomcat web servers based
> > REST
> > > API. We want to replace REST API with Apache Kafka, web servers are
> > behind
> > > ELB.
> > >
> > > I would like to know if we can keep Kafka brokers behind ELB? Will it
> > work?
> > >
> > > Regards,
> > > Chandrash3khar Kotekar
> > > Mobile - +91 8600011455
> > >
> >
> >
> >
> > --
> > -Regards,
> > Mayuresh R. Gharat
> > (862) 250-7125
> >
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: When in-sync isn't in sync?

2015-05-04 Thread Mayuresh Gharat
FYI,

Basically, the ISR returned by topic metadata response is unreliable.

This is discussed in KAFKA-1367

Thanks,


Mayuresh

On Mon, May 4, 2015 at 8:09 AM, Steve Miller 
wrote:

> [ BTW, after some more research, I think what might be happening here is
> that we had some de-facto network partitioning happen as a side-effect of
> us renaming some network interfaces, though if that's the case, I'd like to
> know how to get everything back into sync. ]
>
>Hi.  I'm seeing something weird, where if I do a MetadataRequest, what
> I get back says I have out-of-sync replicas... but if I use kafka-topic.sh,
> it says I don't.  I'm running Kafka 0.8.1.1, still, for the moment, on Java
> 1.7.0_55.
>
>The code I have to do this uses kafka-python:
>
> ==
> #!/usr/bin/python
>
> import logging
> import signal
> import sys
>
> # Should use argparse, but we shouldn't use python 2.6, either...
> from optparse import OptionParser
>
> import simplejson as json
>
> from kafka.client import KafkaClient
> from kafka.protocol import KafkaProtocol
>
> #logging.basicConfig(level=logging.DEBUG)
>
> def main():
> parser = OptionParser()
> parser.add_option('-t', '--topic', dest='topic',
>   help='topic to which we should subscribe', default='mytopic')
> parser.add_option('-b', '--broker', dest='kafkaHost',
>   help='Kafka broker to which we should connect',
>   default='host309'-ilg1.rtc.vrsn.com')
>
> (options, args) = parser.parse_args()
>
> kafka = KafkaClient('%s:9092' % options.kafkaHost)
>
> # WARNING: terrible abuse of private methods follows.
>
> id = kafka._next_id()
>
> request = KafkaProtocol.encode_metadata_request(kafka.client_id, id)
> response = kafka._send_broker_unaware_request(id, request)
>
> (brokers, topics) = KafkaProtocol.decode_metadata_response(response)
>
> if options.topic != '*':
> topics_we_want = [options.topic]
> else:
> topics_we_want = sorted(topics.keys())
>
> for topic in topics_we_want:
> for partition in sorted(topics[topic].keys()):
> meta = topics[topic][partition]
> delta = set(meta.replicas) - set(meta.isr)
> if len(delta) == 0:
> print 'topic', topic, 'partition', partition, 'leader',
> meta.leader, 'replicas', meta.replicas, 'isr', meta.isr
> else:
> print 'topic', topic, 'partition', partition, 'leader',
> meta.leader, 'replicas', meta.replicas, 'isr', meta.isr, 'OUT-OF-SYNC',
> delta
>
> sys.exit(0)
>
> if __name__ == "__main__":
> #logging.basicConfig(level=logging.DEBUG)
> main()
> ==
>
> And if I run that against "mytopic", I get:
>
> topic mytopic partition 0 leader 311 replicas (311, 323) isr (311, 323)
> topic mytopic partition 1 leader 323 replicas (323, 312) isr (312, 323)
> topic mytopic partition 2 leader 324 replicas (324, 313) isr (324, 313)
> topic mytopic partition 3 leader 309 replicas (309, 314) isr (314, 309)
> topic mytopic partition 4 leader 315 replicas (310, 315) isr (315,)
> OUT-OF-SYNC set([310])
> topic mytopic partition 5 leader 311 replicas (311, 316) isr (311, 316)
> topic mytopic partition 6 leader 312 replicas (312, 317) isr (317, 312)
> topic mytopic partition 7 leader 318 replicas (313, 318) isr (318, 313)
> topic mytopic partition 8 leader 314 replicas (314, 319) isr (314, 319)
> topic mytopic partition 9 leader 315 replicas (315, 320) isr (320, 315)
> topic mytopic partition 10 leader 316 replicas (316, 321) isr (316, 321)
> topic mytopic partition 11 leader 317 replicas (317, 322) isr (317, 322)
> topic mytopic partition 12 leader 318 replicas (318, 323) isr (318, 323)
> topic mytopic partition 13 leader 324 replicas (319, 324) isr (324,)
> OUT-OF-SYNC set([319])
> topic mytopic partition 14 leader 320 replicas (320, 309) isr (320, 309)
> topic mytopic partition 15 leader 321 replicas (321, 310) isr (321,)
> OUT-OF-SYNC set([310])
> topic mytopic partition 16 leader 312 replicas (312, 320) isr (312, 320)
> topic mytopic partition 17 leader 323 replicas (323, 313) isr (323, 313)
> topic mytopic partition 18 leader 324 replicas (324, 314) isr (314, 324)
> topic mytopic partition 19 leader 309 replicas (309, 315) isr (309, 315)
>
> but if I do:
>
> /opt/kafka/bin/kafka-topics.sh --describe --zookeeper host301:2181 --topic
> mytopic
>
> I get:
>
> Topic:mytopic   PartitionCount:20   ReplicationFactor:2
>  Configs:retention.bytes=1000
> Topic: mytopic  Partition: 0Leader: 311 Replicas: 311,323
>  Isr: 311,323
> Topic: mytopic  Partition: 1Leader: 323 Replicas: 323,312
>  Isr: 312,323
> Topic: mytopic  Partition: 2Leader: 324 Replicas: 324,313
>  Isr: 324,313
> Topic: mytopic  Partition: 3Leader: 309 Replicas: 309,314
>  Isr: 314,309
> Topic: mytopic  Partition: 4Leader: 315 Replicas: 310,315
>  Isr: 315,310
> Topic: mytopic  Partition: 5Lea

Re: Kafka behind AWS ELB

2015-05-04 Thread Mayuresh Gharat
Hi Chandrashekar,

Can you please elaborate the use case for Kafka here, like how you are
planning to use it.


Thanks,

Mayuresh

On Sat, May 2, 2015 at 9:08 PM, Chandrashekhar Kotekar <
shekhar.kote...@gmail.com> wrote:

> Hi,
>
> I am new to Apache Kafka. I have played with it on my laptop.
>
> I want to use Kafka in AWS. Currently we have tomcat web servers based REST
> API. We want to replace REST API with Apache Kafka, web servers are behind
> ELB.
>
> I would like to know if we can keep Kafka brokers behind ELB? Will it work?
>
> Regards,
> Chandrash3khar Kotekar
> Mobile - +91 8600011455
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Unclaimed partitions

2015-05-04 Thread Mayuresh Gharat
Hi Dave,

For simpleConsumer can you try using :

/**
   *  Fetch a set of messages from a topic.
   *
   *  @param request specifies the topic name, topic partition, starting
byte offset, maximum bytes to be fetched.
   *  @return a set of fetched messages
   */
  def fetch(request: kafka.javaapi.FetchRequest): FetchResponse = {
fetch(request.underlying)
  }

This takes a TopicPartition.

Thanks,

Mayuresh

On Mon, May 4, 2015 at 7:34 AM, Dave Hamilton 
wrote:

> Hi, this happens most times I restart the consumer group, but not every
> time. There are no log errors and nothing seems to be indicating that a
> rebalance is occurring. Here are the ZK logs I see on one of the processes
> that isn’t receiving partitions.
>
> 2015-05-04 13:55:32,365 [main] INFO  org.apache.zookeeper.ZooKeeper:438 -
> Initiating client connection, connectString=lxpkfkdal01.nanigans.com
> sessionTime
> out=400 watcher=org.I0Itec.zkclient.ZkClient@6971e8ba
> 2015-05-04 13:55:32,366 [main-SendThread(10.8.44.121:2181)] INFO
> org.apache.zookeeper.ClientCnxn:966 - Opening socket connection to server
> 10.8.44.121/10
> .8.44.121:2181. Will not attempt to authenticate using SASL (unknown error)
> 2015-05-04 13:55:32,367 [main-SendThread(10.8.44.121:2181)] INFO
> org.apache.zookeeper.ClientCnxn:849 - Socket connection established to
> 10.8.44.121/10.8.
> 44.121:2181, initiating session
> 2015-05-04 13:55:32,371 [main-SendThread(10.8.44.121:2181)] INFO
> org.apache.zookeeper.ClientCnxn:1207 - Session establishment complete on
> server 10.8.44.
> 121/10.8.44.121:2181, sessionid = 0x14691649cf75e2c, negotiated timeout =
> 4000
>
>
>
> Here is the output of the ConsumerOffsetChecker, note that 6 of the
> partitions are unclaimed:
>
>
> Group   Topic  Pid Offset
> logSize Lag Owner
> rtb_targeting_server compile_request0   328831805
>  328832108   303
>  rtb_targeting_server_lxptedal01.nanigans.com-1430747732348-fd8b839e-0
> rtb_targeting_server compile_request1   328680629
>  328680761   132
>  rtb_targeting_server_lxptedal01.nanigans.com-1430747732348-fd8b839e-1
> rtb_targeting_server compile_request2   328322706
>  328626882   304176  none
> rtb_targeting_server compile_request3   328397868
>  328703662   305794  none
> rtb_targeting_server compile_request4   328393846
>  328393923   77
> rtb_targeting_server_lxptedal02.nanigans.com-1430747790699-36d3501a-0
> rtb_targeting_server compile_request5   329085299
>  329085385   86
> rtb_targeting_server_lxptedal02.nanigans.com-1430747790699-36d3501a-1
> rtb_targeting_server compile_request6   328667153
>  328667153   0
>  rtb_targeting_server_lxptedal02.nanigans.com-1430747831428-55fd145a-0
> rtb_targeting_server compile_request7   328537143
>  328537272   129
>  rtb_targeting_server_lxptedal02.nanigans.com-1430747831428-55fd145a-1
> rtb_targeting_server compile_request8   328613787
>  328913671   299884  none
> rtb_targeting_server compile_request9   328212202
>  328516662   304460  none
> rtb_targeting_server compile_request10  329370706
>  329370951   245
>  rtb_targeting_server_lxptedal03.nanigans.com-1430747931179-ea46a266-0
> rtb_targeting_server compile_request11  328207478
>  328207705   227
>  rtb_targeting_server_lxptedal03.nanigans.com-1430747931179-ea46a266-1
> rtb_targeting_server compile_request12  328564790
>  328564790   0
>  rtb_targeting_server_lxptedal04.nanigans.com-1430747991705-492127bc-0
> rtb_targeting_server compile_request13  328473600
>  328473672   72
> rtb_targeting_server_lxptedal04.nanigans.com-1430747991705-492127bc-1
> rtb_targeting_server compile_request14  329088239
>  329088315   76
> rtb_targeting_server_lxptedal04.nanigans.com-1430748032481-7b5b56d7-0
> rtb_targeting_server compile_request15  328311986
>  328311986   0
>  rtb_targeting_server_lxptedal04.nanigans.com-1430748032481-7b5b56d7-1
> rtb_targeting_server compile_request16  328615462
>  328615497   35
> rtb_targeting_server_lxptedal05.nanigans.com-1430748084888-c523a089-0
> rtb_targeting_server compile_request17  327853920
>  327853949   29
> rtb_targeting_server_lxptedal05.nanigans.com-1430748084888-c523a089-1
> rtb_targeting_server compile_request18  328196285
>  328497010   300725  none
> rtb_targeting_server compile_request19  330429455
>  330733318   303863  none
> rtb_targeting_server compile_request20  328678091
>  328678137   46
> rtb_targeting_server_lxptedal06.nanigans.com-1430748183878-b5f84424-0
> rtb_targeting_server compile_request21  328089585
>  3280895

Re: Issue with high level consumer in 8.1.1 after restart

2015-04-22 Thread Mayuresh Gharat
Can you resend the logs?
Also I did not get what you meant by :

"Sometimes after restart consumers picks the messages sometimes it starts
but not receives any messages. "

Thanks,

Mayuresh

On Wed, Apr 22, 2015 at 5:20 AM, Madhukar Bharti 
wrote:

> Hi All,
>
> Any update on this? We are facing same issue all the time when ever we
> re-start consumers.
>
> On Mon, Apr 6, 2015 at 11:19 PM, Madhukar Bharti  >
> wrote:
>
> > Hi Mayuresh,
> >
> > We are having only one consumer in the group and only one partition for
> > that topic.
> > We have set auto.commit.enable false, zookeeper.session.timeout.ms
> =6,
> > rebalance.backoff.ms to 2000 and rebalance.max.retries to 20.
> >
> > Thanks!
> >
> > On Mon, Apr 6, 2015 at 9:59 PM, Mayuresh Gharat <
> > gharatmayures...@gmail.com> wrote:
> >
> >> It actually depends on how many consumers you have in the same group and
> >> how many partitions the particular topic has.
> >>
> >> Can you elaborate on your configuration?
> >>
> >> Thanks,
> >>
> >> Mayuresh
> >>
> >> On Mon, Apr 6, 2015 at 3:35 AM, Madhukar Bharti <
> bhartimadhu...@gmail.com
> >> >
> >> wrote:
> >>
> >> > Hi All,
> >> >
> >> > We are facing issue with Kafka high Level consumers. We are using
> >> 0.8.1.1
> >> > version. Sometimes after restart consumers picks the messages
> sometimes
> >> it
> >> > starts but not receives any messages. Is high level consumer is not
> >> > reliable?
> >> >
> >> > I have checked with the log. Even if re-balance succeed, consumer
> didn't
> >> > receives any messages. Is there any way to overcome this.
> >> >
> >> > I am attaching log also when the consumer restarted. Please find the
> >> > attachment and let us know what can be possible reason. We have only
> one
> >> > partition for that topic.
> >> >
> >> > Thanks in Advance!
> >> >
> >>
> >>
> >>
> >> --
> >> -Regards,
> >> Mayuresh R. Gharat
> >> (862) 250-7125
> >
> >
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Kafka server - conflicted ephemeral node

2015-04-21 Thread Mayuresh Gharat
This happens due to a bug in zookeeper, sometimes the znode does not get
deleted automatically.We have seen it many times at Linkedin and are trying
to investigate further.

Thanks,

Mayuresh

On Mon, Apr 20, 2015 at 8:52 PM, 小宇  wrote:

> Thanks for your response gharatmayuresh1, but I don't know what you mean
> exactly. I have restart my server and I want to find out the cause in case
> it happen again.
>
> 2015-04-21 11:36 GMT+08:00 :
>
> > Try bouncing
> > 10.144.38.185
> >
> > This should resolve the issue.
> >
> > Thanks,
> >
> > Mayuresh
> > Sent from my iPhone
> >
> > > On Apr 20, 2015, at 8:22 PM, 小宇  wrote:
> > >
> > > 10.144.38.185
> >
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Issue with high level consumer in 8.1.1 after restart

2015-04-06 Thread Mayuresh Gharat
It actually depends on how many consumers you have in the same group and
how many partitions the particular topic has.

Can you elaborate on your configuration?

Thanks,

Mayuresh

On Mon, Apr 6, 2015 at 3:35 AM, Madhukar Bharti 
wrote:

> Hi All,
>
> We are facing issue with Kafka high Level consumers. We are using 0.8.1.1
> version. Sometimes after restart consumers picks the messages sometimes it
> starts but not receives any messages. Is high level consumer is not
> reliable?
>
> I have checked with the log. Even if re-balance succeed, consumer didn't
> receives any messages. Is there any way to overcome this.
>
> I am attaching log also when the consumer restarted. Please find the
> attachment and let us know what can be possible reason. We have only one
> partition for that topic.
>
> Thanks in Advance!
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Kafka Consumer Offset

2015-04-02 Thread Mayuresh Gharat
You can use it. Are you facing any problems using it?

On Thu, Apr 2, 2015 at 6:53 AM, Amreen Khan  wrote:

> Is KafkaConsumerOffsetChecker still in development for 0.8.2?
>
> Amreen Khan




-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: blocking KafkaProducer call

2015-04-01 Thread Mayuresh Gharat
Great !!. The sync mode and those properties are orthogonal.

Thanks,

Mayuresh

On Wed, Apr 1, 2015 at 10:46 AM, sunil kalva  wrote:

> thanks grant
> by changing linger.ms=0,batch.size=1, problem solved
>
>
> On Wed, Apr 1, 2015 at 11:11 PM, Grant Henke  wrote:
>
> > They can. You can read more about configuring the new java producer here:
> > http://kafka.apache.org/documentation.html#newproducerconfigs
> >
> > Thanks,
> > Grant
> >
> > On Wed, Apr 1, 2015 at 12:34 PM, sunil kalva 
> wrote:
> >
> >> Does these config params has effect when i try to simulate "sync" mode
> by
> >> not passing callback ?
> >>
> >> On Wed, Apr 1, 2015 at 10:32 PM, Mayuresh Gharat <
> >> gharatmayures...@gmail.com> wrote:
> >>
> >>> Whats your "linger.ms" and "batch.size" ?
> >>>
> >>> Thanks,
> >>>
> >>> Mayuresh
> >>>
> >>> On Wed, Apr 1, 2015 at 5:51 AM, sunil kalva 
> >>> wrote:
> >>>
> >>> > I am trying to simulate "sync" call using following code,
> >>> >
> >>> > try {
> >>> >
> >>> > Future send = producer.send(new
> >>> > ProducerRecord("the-topic", "key".getBytes(),
> >>> > "value".getBytes())).get();
> >>> >
> >>> >  send.get();
> >>> >
> >>> > System.out.println("Time = " + (System.currentTimeMillis() - b));
> >>> > } catch (Exception e) {
> >>> >
> >>> > }
> >>> >
> >>> > And i am using new "org.apache.kafka.clients.producer.KafkaProducer"
> >>> > class for sending messages, each  message  is taking more than 100ms,
> >>> > am i missing something. If i use old "kafka.javaapi.producer.Produce"
> >>> > it is giving the desired throughput.
> >>> >
> >>> > Please advice me hot to fix this.
> >>> >
> >>> >
> >>> > On Tue, Mar 31, 2015 at 11:21 PM, sunil kalva 
> >>> > wrote:
> >>> >
> >>> > > thanks ghenke, that was a quick response. I will test and will let
> >>> you
> >>> > > know if i have some questions.
> >>> > >
> >>> > > On Tue, Mar 31, 2015 at 11:17 PM, Grant Henke  >
> >>> > wrote:
> >>> > >
> >>> > >> I think you are looking at is this section:
> >>> > >>
> >>> > >> > If you want to simulate a simple blocking call you can do the
> >>> > following:
> >>> > >> >
> >>> > >> > producer.send(new ProducerRecord("the-topic",
> >>> > >> "key".getBytes(), "value".getBytes())).get();
> >>> > >> >
> >>> > >> > What that is doing is calling .get() on the Future returned by
> the
> >>> > send
> >>> > >> method. This will block until the message is sent or an exception
> is
> >>> > >> thrown.
> >>> > >>
> >>> > >> The documentation for Future is here:
> >>> > >>
> >>> > >>
> >>> >
> >>>
> http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Future.html#get()
> >>> > >>
> >>> > >> On Tue, Mar 31, 2015 at 12:30 PM, sunil kalva <
> sambarc...@gmail.com
> >>> >
> >>> > >> wrote:
> >>> > >>
> >>> > >> > Hi
> >>> > >> > According to this
> >>> > >> >
> >>> > >> >
> >>> > >>
> >>> >
> >>>
> http://kafka.apache.org/082/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
> >>> > >> > documentation, if i don't pass callback it will work as blocking
> >>> call,
> >>> > >> Does
> >>> > >> > it mean that message will be immediately sent to kafka cluster
> >>> and all
> >>> > >> > possible exceptions will be throws immediately if not able to
> >>> send ?
> >>> > >> >
> >>> > >> > --
> >>> > >> > SunilKalva
> >>> > >> >
> >>> > >>
> >>> > >>
> >>> > >>
> >>> > >> --
> >>> > >> Grant Henke
> >>> > >> Solutions Consultant | Cloudera
> >>> > >> ghe...@cloudera.com | 920-980-8979
> >>> > >> twitter.com/ghenke <http://twitter.com/gchenke> |
> >>> > >> linkedin.com/in/granthenke
> >>> > >>
> >>> > >
> >>> > >
> >>> > >
> >>> > > --
> >>> > > SunilKalva
> >>> > >
> >>> >
> >>> >
> >>> >
> >>> > --
> >>> > SunilKalva
> >>> >
> >>>
> >>>
> >>>
> >>> --
> >>> -Regards,
> >>> Mayuresh R. Gharat
> >>> (862) 250-7125
> >>>
> >>
> >>
> >>
> >> --
> >> SunilKalva
> >>
> >
> >
> >
> > --
> > Grant Henke
> > Solutions Consultant | Cloudera
> > ghe...@cloudera.com | twitter.com/ghenke <http://twitter.com/gchenke> |
> > linkedin.com/in/granthenke
> >
>
>
>
> --
> SunilKalva
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Consumer in Java client

2015-04-01 Thread Mayuresh Gharat
It will have an API for committing offsets. We are planning to provide
functionality with separate client for fetching offsets and Consumer may or
may not use that client.

Thanks,

Mayuresh

On Tue, Mar 31, 2015 at 3:16 PM, Hema Bhatia  wrote:

> 2 more questions to add to it:
>
> - Any estimation for when new consumer client will be ready for high level
> consumer?
> - Also, will it come with offset management functionality for high level
> consumer too?
>
>
> -Original Message-
> From: Jiangjie Qin [mailto:j...@linkedin.com.INVALID]
> Sent: Monday, March 30, 2015 11:06 PM
> To: users@kafka.apache.org
> Subject: Re: Consumer in Java client
>
> Hi,
>
> KafkaConsumer is still under development and not ready for wide use case.
> Currently, it can be used to replace SimpleConsumer (low level consumer),
> but can not replace ZookeeperConsumerConnector(high level consumer). So if
> you need to use simple consumer, I would suggest using KafkaConsumer
> instead, otherwise, you probably still want to use
> ZookeeperConsumerConnector.
>
> - Jiangjie (Becket) Qin
>
> On 3/30/15, 7:32 PM, "LongkerDandy"  wrote:
>
> >Hi
> >
> >I'm new to kafka and using 0.8.2.1 kafka-clients and kafka_2.10 packages.
> >The document says: ³We are in the process of rewritting the JVM clients
> >for Kafka. As of 0.8.2 Kafka includes a newly rewritten Java producer.
> >The next release will include an equivalent Java consumer.² It seems
> >the kafka-clients package already ships with a KafkaConsumer.
> >So should I use the KafkaConsumer from kafka-clients?
> >Or I should stick with kafka.javaapi.consumer.ConsumerConnector from
> >kafka_2.10?
> >
> >Regards
> >LongkerDandy
>
>
> This message is private and confidential. If you have received it in
> error, please notify the sender and remove it from your system.
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: blocking KafkaProducer call

2015-04-01 Thread Mayuresh Gharat
Whats your "linger.ms" and "batch.size" ?

Thanks,

Mayuresh

On Wed, Apr 1, 2015 at 5:51 AM, sunil kalva  wrote:

> I am trying to simulate "sync" call using following code,
>
> try {
>
> Future send = producer.send(new
> ProducerRecord("the-topic", "key".getBytes(),
> "value".getBytes())).get();
>
>  send.get();
>
> System.out.println("Time = " + (System.currentTimeMillis() - b));
> } catch (Exception e) {
>
> }
>
> And i am using new "org.apache.kafka.clients.producer.KafkaProducer"
> class for sending messages, each  message  is taking more than 100ms,
> am i missing something. If i use old "kafka.javaapi.producer.Produce"
> it is giving the desired throughput.
>
> Please advice me hot to fix this.
>
>
> On Tue, Mar 31, 2015 at 11:21 PM, sunil kalva 
> wrote:
>
> > thanks ghenke, that was a quick response. I will test and will let you
> > know if i have some questions.
> >
> > On Tue, Mar 31, 2015 at 11:17 PM, Grant Henke 
> wrote:
> >
> >> I think you are looking at is this section:
> >>
> >> > If you want to simulate a simple blocking call you can do the
> following:
> >> >
> >> > producer.send(new ProducerRecord("the-topic",
> >> "key".getBytes(), "value".getBytes())).get();
> >> >
> >> > What that is doing is calling .get() on the Future returned by the
> send
> >> method. This will block until the message is sent or an exception is
> >> thrown.
> >>
> >> The documentation for Future is here:
> >>
> >>
> http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Future.html#get()
> >>
> >> On Tue, Mar 31, 2015 at 12:30 PM, sunil kalva 
> >> wrote:
> >>
> >> > Hi
> >> > According to this
> >> >
> >> >
> >>
> http://kafka.apache.org/082/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
> >> > documentation, if i don't pass callback it will work as blocking call,
> >> Does
> >> > it mean that message will be immediately sent to kafka cluster and all
> >> > possible exceptions will be throws immediately if not able to send ?
> >> >
> >> > --
> >> > SunilKalva
> >> >
> >>
> >>
> >>
> >> --
> >> Grant Henke
> >> Solutions Consultant | Cloudera
> >> ghe...@cloudera.com | 920-980-8979
> >> twitter.com/ghenke  |
> >> linkedin.com/in/granthenke
> >>
> >
> >
> >
> > --
> > SunilKalva
> >
>
>
>
> --
> SunilKalva
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Which version works for kafka 0.8.2 as consumer?

2015-04-01 Thread Mayuresh Gharat
What do you mean by offset syncing?

Thanks,

Mayuresh

On Wed, Apr 1, 2015 at 9:59 AM, pushkar priyadarshi <
priyadarshi.push...@gmail.com> wrote:

> So in 0.8.2.0/0.8.2.1 high level consumer can not make use of offset
> syncing in kafka?
>
> On Wed, Apr 1, 2015 at 12:51 PM, Jiangjie Qin 
> wrote:
>
> > Yes, KafkaConsumer in 0.8.2 is still in development. You probably still
> > want to use ZookeeperConsumerConnector for now.
> >
> > On 4/1/15, 9:28 AM, "Mark Zang"  wrote:
> >
> > >I found the 0.8.2.0 and 0.8.2.1 has a KafkaConsumer. But this class
> seems
> > >not completed and not functional. Lots of method returns null or throws
> > >NSM. Which version of consumer for kafka 0.8.2 broker?
> > >
> > >Thanks!
> > >
> > >--
> > >Best regards!
> > >Mike Zang
> >
> >
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Kafka 8.2.1 Offset fetch Request

2015-03-31 Thread Mayuresh Gharat
On a first pass, the code has a small bug in getConsumedOffset().

You need to close the channel after getting the coordinator   : // broker =
cmr.coordinator();
 and then reconnect to coordinator as follows :

  broker = cmr.coordinator();
// if the coordinator is different, from the above channel's host then
reconnect
*channel.disconnect();*
*channel = new BlockingChannel(broker.host(), broker.port(),*
*
BlockingChannel.UseDefaultBufferSize(),*
*
BlockingChannel.UseDefaultBufferSize(),*
*  5000 /* read timeout in millis
*/);*
*channel.connect();*
..


Same goes for commitOffsets().



Thanks,

Mayuresh


On Tue, Mar 31, 2015 at 11:52 AM, Madhukar Bharti 
wrote:

> Hi Mayuresh,
>
> Have you gone through that code? Please help me in that. If it will work
> for high level consumer then will plan accordingly to store offset of
> low-level consumer too in this offset topic. Will that work if I will give
> some group name for low-level consumers also but process it partition wise
> as Simple consumers are there.
>
> Thanks and awaiting for your response!
>
> On Mon, Mar 30, 2015 at 9:45 PM, Mayuresh Gharat <
> gharatmayures...@gmail.com> wrote:
>
>> Cool. Will try reviewing it today and get back :)
>>
>> Thanks,
>>
>> Mayuresh
>>
>> On Mon, Mar 30, 2015 at 2:53 AM, Madhukar Bharti <
>> bhartimadhu...@gmail.com> wrote:
>>
>>> Hi Mayuresh,
>>>
>>> Thanks for your quick response!
>>>
>>> I have tried to use offset manager for fetch/commit request. I have set
>>> "dua.commit.enable" to false and using offset.storage=kafka.
>>>
>>> But I am not able to commit the offset. My code is here
>>> <https://github.com/madhukarbharti/kafka-8.2.1-test/tree/master/src/com/bharti/kafka/offset>
>>> .
>>>
>>> Kindly check this. Am I missing anything. I am running with single
>>> broker.
>>>
>>> Thanks!
>>>
>>> On Fri, Mar 27, 2015 at 10:06 PM, Mayuresh Gharat <
>>> gharatmayures...@gmail.com> wrote:
>>>
>>>> In your case you are trying to issue an offsetRequest and not a
>>>> fetchOffsetRequest. I know this is little confusing.
>>>>
>>>> Let me point you to a scala patch which has a client for doing fetch
>>>> offset and commit offset.
>>>>
>>>> I am going to rewrite that in java. Here is the Kafka ticket :
>>>>
>>>> https://issues.apache.org/jira/browse/KAFKA-1013
>>>>
>>>> You can look at the RB and see how it is done. If you have any further
>>>> questions I will be happy to answer them.
>>>>
>>>> Thanks,
>>>>
>>>> Mayuresh
>>>>
>>>>
>>>> On Fri, Mar 27, 2015 at 9:30 AM, Mayuresh Gharat <
>>>> gharatmayures...@gmail.com> wrote:
>>>>
>>>>> Other thing is if you are using SimpleConsumer, it is up to your app
>>>>> to do the offsetManagement. The ZK based offsets or Kafka based offsets
>>>>> will work if you are using the HighLevel Consumer.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Mayuresh
>>>>>
>>>>> On Fri, Mar 27, 2015 at 9:17 AM, Mayuresh Gharat <
>>>>> gharatmayures...@gmail.com> wrote:
>>>>>
>>>>>> Hi Madhukar,
>>>>>>
>>>>>> I am going through your code now. Let me see what I can find.
>>>>>>
>>>>>> Where were you storing your offsets before?
>>>>>> Was it always Zookeeper or was it Kafka?
>>>>>> If it was Zookeeper, the correct way to migrate from zookeeper to
>>>>>> kafka based offsets is this :
>>>>>>
>>>>>> 1) Config Change :
>>>>>>  - offsets.storage = kafka
>>>>>>  - dual.commit.enabled = true
>>>>>> 2) Rolling Bounce
>>>>>> 3) Config Change :
>>>>>>  - dual.commit.enabled=false
>>>>>> 4) Rolling Bounce.
>>>>>>
>>>>>> For more info on Offset Management, you can also refer these slides
>>>>>> from Kafka Meetup:
>>>>>> http://www.slideshare.net/jjkoshy/offset-management-in-kafka
>>>>>>
>>>>>>
>>>>>> Apart from that for using Kafka based offsets, to do a
>>>>>> fetchOffsetRequest or commit offset request you don&#x

Re: Kafka 8.2.1 Offset fetch Request

2015-03-27 Thread Mayuresh Gharat
In your case you are trying to issue an offsetRequest and not a
fetchOffsetRequest. I know this is little confusing.

Let me point you to a scala patch which has a client for doing fetch offset
and commit offset.

I am going to rewrite that in java. Here is the Kafka ticket :

https://issues.apache.org/jira/browse/KAFKA-1013

You can look at the RB and see how it is done. If you have any further
questions I will be happy to answer them.

Thanks,

Mayuresh


On Fri, Mar 27, 2015 at 9:30 AM, Mayuresh Gharat  wrote:

> Other thing is if you are using SimpleConsumer, it is up to your app to do
> the offsetManagement. The ZK based offsets or Kafka based offsets will work
> if you are using the HighLevel Consumer.
>
> Thanks,
>
> Mayuresh
>
> On Fri, Mar 27, 2015 at 9:17 AM, Mayuresh Gharat <
> gharatmayures...@gmail.com> wrote:
>
>> Hi Madhukar,
>>
>> I am going through your code now. Let me see what I can find.
>>
>> Where were you storing your offsets before?
>> Was it always Zookeeper or was it Kafka?
>> If it was Zookeeper, the correct way to migrate from zookeeper to kafka
>> based offsets is this :
>>
>> 1) Config Change :
>>  - offsets.storage = kafka
>>  - dual.commit.enabled = true
>> 2) Rolling Bounce
>> 3) Config Change :
>>  - dual.commit.enabled=false
>> 4) Rolling Bounce.
>>
>> For more info on Offset Management, you can also refer these slides from
>> Kafka Meetup:
>> http://www.slideshare.net/jjkoshy/offset-management-in-kafka
>>
>>
>> Apart from that for using Kafka based offsets, to do a fetchOffsetRequest
>> or commit offset request you don't need a consumer. You need to know the
>> groupId. You need to connect to kafka, issue a consumerMetaData Request.
>> This will fetch you the OffsetManager for that groupId. You can then issue
>> the fetch or commit request to that OffsetManager.
>>
>> BTW, we are coming up with an offsetClient soon.
>>
>> Thanks,
>>
>> Mayuresh
>>
>> On Fri, Mar 27, 2015 at 1:53 AM, Madhukar Bharti <
>> bhartimadhu...@gmail.com> wrote:
>>
>>> Hi Mayuresh,
>>>
>>> Please check this
>>> <https://github.com/madhukarbharti/kafka-8.2.1-test/blob/master/src/com/bharti/kafka/offset/OffsetRequester.java>
>>>  program.
>>> Am I doing any mistake?
>>>
>>> Thanks
>>>
>>>
>>> On Thu, Mar 26, 2015 at 6:27 PM, Madhukar Bharti <
>>> bhartimadhu...@gmail.com> wrote:
>>>
>>>> Hi Mayuresh,
>>>>
>>>> I have tried to fetch the offset using OffsetFetchRequest as given in
>>>> this wiki
>>>>
>>>>
>>>> https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
>>>>
>>>> But It only works if we set "dual.commit.enabled" to "true" and
>>>> "offsets.storage" to "kafka". Otherwise it returns -1.
>>>>
>>>> Do I need to change anything?
>>>>
>>>>
>>>> Thanks in advance!
>>>>
>>>
>>>
>>
>>
>> --
>> -Regards,
>> Mayuresh R. Gharat
>> (862) 250-7125
>>
>
>
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Kafka 8.2.1 Offset fetch Request

2015-03-27 Thread Mayuresh Gharat
Other thing is if you are using SimpleConsumer, it is up to your app to do
the offsetManagement. The ZK based offsets or Kafka based offsets will work
if you are using the HighLevel Consumer.

Thanks,

Mayuresh

On Fri, Mar 27, 2015 at 9:17 AM, Mayuresh Gharat  wrote:

> Hi Madhukar,
>
> I am going through your code now. Let me see what I can find.
>
> Where were you storing your offsets before?
> Was it always Zookeeper or was it Kafka?
> If it was Zookeeper, the correct way to migrate from zookeeper to kafka
> based offsets is this :
>
> 1) Config Change :
>  - offsets.storage = kafka
>  - dual.commit.enabled = true
> 2) Rolling Bounce
> 3) Config Change :
>  - dual.commit.enabled=false
> 4) Rolling Bounce.
>
> For more info on Offset Management, you can also refer these slides from
> Kafka Meetup:
> http://www.slideshare.net/jjkoshy/offset-management-in-kafka
>
>
> Apart from that for using Kafka based offsets, to do a fetchOffsetRequest
> or commit offset request you don't need a consumer. You need to know the
> groupId. You need to connect to kafka, issue a consumerMetaData Request.
> This will fetch you the OffsetManager for that groupId. You can then issue
> the fetch or commit request to that OffsetManager.
>
> BTW, we are coming up with an offsetClient soon.
>
> Thanks,
>
> Mayuresh
>
> On Fri, Mar 27, 2015 at 1:53 AM, Madhukar Bharti  > wrote:
>
>> Hi Mayuresh,
>>
>> Please check this
>> <https://github.com/madhukarbharti/kafka-8.2.1-test/blob/master/src/com/bharti/kafka/offset/OffsetRequester.java>
>>  program.
>> Am I doing any mistake?
>>
>> Thanks
>>
>>
>> On Thu, Mar 26, 2015 at 6:27 PM, Madhukar Bharti <
>> bhartimadhu...@gmail.com> wrote:
>>
>>> Hi Mayuresh,
>>>
>>> I have tried to fetch the offset using OffsetFetchRequest as given in
>>> this wiki
>>>
>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
>>>
>>> But It only works if we set "dual.commit.enabled" to "true" and
>>> "offsets.storage" to "kafka". Otherwise it returns -1.
>>>
>>> Do I need to change anything?
>>>
>>>
>>> Thanks in advance!
>>>
>>
>>
>
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Kafka 8.2.1 Offset fetch Request

2015-03-27 Thread Mayuresh Gharat
Hi Madhukar,

I am going through your code now. Let me see what I can find.

Where were you storing your offsets before?
Was it always Zookeeper or was it Kafka?
If it was Zookeeper, the correct way to migrate from zookeeper to kafka
based offsets is this :

1) Config Change :
 - offsets.storage = kafka
 - dual.commit.enabled = true
2) Rolling Bounce
3) Config Change :
 - dual.commit.enabled=false
4) Rolling Bounce.

For more info on Offset Management, you can also refer these slides from
Kafka Meetup:
http://www.slideshare.net/jjkoshy/offset-management-in-kafka


Apart from that for using Kafka based offsets, to do a fetchOffsetRequest
or commit offset request you don't need a consumer. You need to know the
groupId. You need to connect to kafka, issue a consumerMetaData Request.
This will fetch you the OffsetManager for that groupId. You can then issue
the fetch or commit request to that OffsetManager.

BTW, we are coming up with an offsetClient soon.

Thanks,

Mayuresh

On Fri, Mar 27, 2015 at 1:53 AM, Madhukar Bharti 
wrote:

> Hi Mayuresh,
>
> Please check this
> 
>  program.
> Am I doing any mistake?
>
> Thanks
>
>
> On Thu, Mar 26, 2015 at 6:27 PM, Madhukar Bharti  > wrote:
>
>> Hi Mayuresh,
>>
>> I have tried to fetch the offset using OffsetFetchRequest as given in
>> this wiki
>>
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
>>
>> But It only works if we set "dual.commit.enabled" to "true" and
>> "offsets.storage" to "kafka". Otherwise it returns -1.
>>
>> Do I need to change anything?
>>
>>
>> Thanks in advance!
>>
>
>


-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Rebalance not happening even after increasing max retries causing conflict in ZK

2015-03-26 Thread Mayuresh Gharat
Did you see something like this in any of the consumer logs :

"Conflict in ….. data : ……. stored data :……”  ?

Thanks,

Mayuresh

On Thu, Mar 26, 2015 at 1:50 PM, Mike Axiak  wrote:

> Hi guys,
>
> At HubSpot we think the issue is related to slow consumers. During a
> rebalance, one of the first things the consumer does is signal a shutdown
> to the fetcher [1] [2], in order to relinquish ownership of the partitions.
>
> This waits for the shutdown of all shutdown fetcher threads, which can only
> happen until the thread's "enqueue current chunk" command finishes. If you
> have a slow consumer or large chunk sizes, this could take a while which
> would make it difficult for the rebalance to actually occur successfully.
>
> We're testing out different solutions now. Currently under review, we're
> thinking about making the enqueue into the blocking queue timeout so we can
> check to see if we're running, to end the process of the current chunk
> early.
>
> Has anyone else noticed this? If so, are there any patches people have
> written. Once we have a clearer picture of solutions we'll send a few
> patches in JIRAs.
>
> Best,
> Mike
>
> 1:
>
> https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala#L655
> 2:
>
> https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala#L712
>
>
> On Mon, Dec 22, 2014 at 6:51 PM, Neha Narkhede  wrote:
>
> > Can you share a reproducible test case?
> >
> > On Tue, Dec 9, 2014 at 7:11 AM, Mohit Kathuria 
> > wrote:
> >
> > > Neha,
> > >
> > > The same issue reoccured with just 2 consumer processes. The exception
> > was
> > > related to conflict in writing the ephemeral node. Below was the
> > exception.
> > > Topic name is
> > >
> >
> "lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin"
> > > with 30 partitions. The 2 processes were running on 2 servers with ips
> > > 10.0.8.222 and 10.0.8.225.
> > >
> > > *2014-12-09 13:22:11 k.u.ZkUtils$ [INFO] I wrote this conflicted
> > ephemeral
> > > node
> > >
> >
> [{"version":1,"subscription":{"lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin":5},"pattern":"static","timestamp":"1417964160024"}]
> > > at
> > >
> >
> /consumers/lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin/ids/lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin_ip-10-0-8-222-1417963753598-b19de58d
> > > a while back in a different session, hence I will backoff for this node
> > to
> > > be deleted by Zookeeper and retry*
> > > Attached the complete error logs. The exception occured after the
> > > rebalance failed even after 40 retries. Rebalance failed as the process
> > > already owning some of the partitions did not give us ownership due to
> > > conflicting ephemeral nodes. As you suggested, we ran the wchp command
> > on
> > > the 3 zookeeper nodes at this time and figured out that the watcher was
> > > registered for only one of the process. I am copying the kafka consumer
> > > watcher registered on one of the zookeeper servers. (Attached are the
> > wchp
> > > outputs of all 3 zk servers)
> > >
> > > *$echo wchp | nc localhost 2181 *
> > >
> > >
> > >
> >
> */kafka/consumers/lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin/ids*
> > >
> > > * 0x34a175e1d5d0130*
> > >
> > >
> > > "0x34a175e1d5d0130" was the ephemeral node session Id. I went back to
> the
> > > zookeeper shell and checked the consumers registered for this topic and
> > > consumer group(same as topic name). Attaching the output in
> > zkCommands.txt.
> > > This clearly shows that
> > >
> > > 10.0.8.222 has ephemeralOwner = 0x34a175e1d5d0130
> > >
> > > 10.0.8.225 has ephemeralOwner = 0x34a175e1d5d0127
> > >
> > >
> > > I think we have the issue here that both consumers have written to
> > > different ephemeral nodes. Watchers are registered for the one of the 2
> > > ephemeral node. The root cause seems to be the inconsistent state while
> > > writing the ephemeral nodes in ZK.
> > >
> > > Let me know if you need more details.
> > >
> > > -Thanks,
> > >
> > > Mohit
> > >
> > >
> > >
> > >
> > > On Mon, Nov 10, 2014 at 8:46 AM, Neha Narkhede <
> neha.narkh...@gmail.com>
> > > wrote:
> > >
> > >> A rebalance should trigger on all consumers when you add a new
> consumer
> > to
> > >> the group. If you don't see the zookeeper watch fire, the consumer may
> > >> have
> > >> somehow lost the watch. We have seen this behavior on older zk
> > versions, I
> > >> wonder it that bug got reintroduced. To verify if this is the case,
> you
> > >> can
> > >> run the wchp zookeeper command on the zk leader and check if each
> > consumer
> > >> has a watch registered.
> > >>
> > >> Do you have a way to try this on zk 3.3.4? I would recommend you try
> the
> > >> wchp suggestion as well.
> > >>
> > >> On Fri, Nov 7, 2014 at 6:07 AM, Mohit Kathuria <
> mkathu...@sprinklr.com>
> > >> wr

Re: Kafka server relocation

2015-03-25 Thread Mayuresh Gharat
You can use the Mirror maker to move data from one data center to other and
once all the data has been moved you can shut down the source data center
by doing a controlled shutdown.

Thanks,

Mayuresh

On Wed, Mar 25, 2015 at 2:35 PM, Jiangjie Qin 
wrote:

> If you want to do a seamless migration. I think a better way is to build a
> cross datacenter Kafka cluster temporarily. So the process is:
> 1. Add several new Kafka brokers in your new datacenter and add them to
> the old cluster.
> 2. Use replica assignment tool to reassign all the partitions to brokers
> in new datacenter.
> 3. Perform controlled shutdown on the brokers in old datacenter.
>
> Jiangjie (Becket) Qin
>
> On 3/25/15, 2:01 PM, "nitin sharma"  wrote:
>
> >Hi Team,
> >
> >in my project, we have built a new datacenter for Kafka brokers and wants
> >to migrate from current datacenter to new one.
> >
> >Switching producers and consumers wont be a problem provided New
> >Datacenter
> >has all the messages of existing Datacenter.
> >
> >
> >i have only 1 topic with 2 partition that need to be migrated... that too
> >it is only 1 time activity.
> >
> >Kindly suggest the best way to deal with this situation.
> >
> >
> >Regards,
> >Nitin Kumar Sharma.
>
>


-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: lost messages -?

2015-03-25 Thread Mayuresh Gharat
DumpLogSegments will give you output something like this :

offset: 780613873770 isvalid: true payloadsize: 8055 magic: 1 compresscodec:
GZIPCompressionCodec

If this is what you want you can use the tool, to detect if the messages
are getting to your brokers.
Console-Consumer will output the messages for you.

Thanks,

Mayuresh


On Wed, Mar 25, 2015 at 9:33 AM, tao xiao  wrote:

> You can use kafka-console-consumer consuming the topic from the beginning
>
> *kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
> --from-beginning*
>
>
> On Thu, Mar 26, 2015 at 12:17 AM, Victor L  wrote:
>
> > Can someone let me know how to dump contents of topics?
> > I have producers sending messages to 3 brokers but about half of them
> don't
> > seem to be consumed. I suppose they are getting stuck in queues but how
> can
> > i figure out where?
> > Thks,
> >
>
>
>
> --
> Regards,
> Tao
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: kafka.admin as separate module

2015-03-25 Thread Mayuresh Gharat
Yeah, that would be great having a separate admin package. +1.

Thanks,

Mayuresh

On Wed, Mar 25, 2015 at 8:26 AM, Stevo Slavić  wrote:

> Hello Apache Kafka community,
>
> I like that kafka-clients is now separate module, and has no scala
> dependency even. I'd like to propose that kafka.admin package gets
> published as separate module too.
>
> I'm writing some tests, and to be able to use kafka.admin tools/utils in
> them I have to bring in too large kafka module, with server stuff and all
> dependencies, like netty. Test framework happens to use netty too but
> different version - classpath hell.
>
> Any thoughts? Is proposal sound enough for a JIRA ticket?
>
> Kind regards,
> Stevo Slavic.
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: lost messages -?

2015-03-25 Thread Mayuresh Gharat
You can use the DumpLogSegment tool.

Thanks,

Mayuresh

On Wed, Mar 25, 2015 at 9:17 AM, Victor L  wrote:

> Can someone let me know how to dump contents of topics?
> I have producers sending messages to 3 brokers but about half of them don't
> seem to be consumed. I suppose they are getting stuck in queues but how can
> i figure out where?
> Thks,
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: New Java Producer Client handling case where Kafka is unreachable

2015-03-20 Thread Mayuresh Gharat
I think if the leader is down, it a leader is down, the producer would
issue a metadata request and get the new leader and start producing to it.
But if the entire kafka cluster is down, it would try for some configured
number of retries and would return back an error in future. This is my
understanding. Please correct me if I am wrong.

Thanks,

Mayuresh

On Fri, Mar 20, 2015 at 9:34 AM, Ewen Cheslack-Postava 
wrote:

> Even if you have metadata cached, if the broker isn't available then
> messages can get stuck in the producer indefinitely. Currently the new
> producer doesn't have any client-side timeouts, which is a bug. See
> https://issues.apache.org/jira/browse/KAFKA-1788 for more details.
>
>
> On Fri, Mar 20, 2015 at 8:09 AM, Jiangjie Qin 
> wrote:
>
> > This is correct when you send to a topic for the first time. After that
> > the metadata will be cached, the metadata cache has an age and after it
> > expires, metadata will be refreshed.
> > So the time a producer found a broker is not reachable is the minimum
> > value of the following times:
> > 1. Linger.ms + retries * retry.backoff.ms
> > 2. Metadata.max.age.ms
> > 3. Metadata.fetch.timeout.ms (only when sending to a topic for the first
> > time)
> >
> > Typically you will hit the first one. The default value is linger.ms=0,
> > retries=0. But you need to send records with callback to detect the
> > failure.
> >
> > Jiangjie (Becket) Qin
> >
> > On 3/20/15, 3:46 AM, "Samuel Chase"  wrote:
> >
> > >@Sunil
> > >
> > >The else branch will be executed if
> > >`metadata.fetch().partitionsForTopic(topic)` returns non NULL value. I
> > >assume that when Kafka is unreachable, it will return NULL.
> > >`waitOnMetadata()` then returns; we never enter the else branch when
> > >Kafka is unreachable.
> > >
> > >@Everyone: Is this explanation correct?
> > >
> > >On Fri, Mar 20, 2015 at 3:56 PM, sunil kalva 
> > wrote:
> > >> @Samuel
> > >> My point was
> > >> The else branch of the code will be executed when metadata is not
> > >> available, and metadata is not available when kafka cluster is not
> > >>rachable.
> > >>
> > >> please correct me if i am wrong..
> > >>
> > >> On Fri, Mar 20, 2015 at 3:43 PM, Samuel Chase 
> > >>wrote:
> > >>
> > >>> @Sunil
> > >>>
> > >>> On Fri, Mar 20, 2015 at 3:36 PM, sunil kalva 
> > >>>wrote:
> > >>> > I think KafkaProducer.send method blocks until it fetches partition
> > >>> > metadata for configured time using "metadata.fetch.timeout.ms",
> once
> > >>> time
> > >>> > out it throws TimeoutException. You might be experiencing
> > >>> TimeoutException ?
> > >>>
> > >>> My co-worker pointed out that over here:
> > >>>
> > >>>
> > >>>
> > https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apa
> > >>>che/kafka/clients/producer/KafkaProducer.java#L368
> > >>>
> > >>> waitOnMetadata just returns. The else branch of the code is not
> > >>> executed when Kafka is unreachable.
> > >>>
> > >>> Trying to investigate what else must be causing the wait.
> > >>>
> > >>
> > >>
> > >>
> > >> --
> > >> SunilKalva
> >
> >
>
>
> --
> Thanks,
> Ewen
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: 'roundrobin' partition assignment strategy restrictions

2015-03-19 Thread Mayuresh Gharat
Hi Becket,

Can you list down an example for this. It would be easier to understand :)

Thanks,

Mayuresh

On Thu, Mar 19, 2015 at 4:46 PM, Jiangjie Qin 
wrote:

> Hi Jason,
>
> The round-robin strategy first takes the partitions of all the topics a
> consumer is consuming from, then distributed them across all the consumers.
> If different consumers are consuming from different topics, the assigning
> algorithm will generate different answers on different consumers.
> It is OK for consumers to have different thread count, but the consumers
> have to consume from the same set of topics.
>
>
> For range strategy, the balance is for each individual topic instead of
> cross topics. So the balance is only done for the consumers consuming from
> the same topic.
>
> Thanks.
>
> Jiangjie (Becket) Qin
>
> On 3/19/15, 4:14 PM, "Jason Rosenberg"  wrote:
>
> >So,
> >
> >I've run into an issue migrating a consumer to use the new 'roundrobin'
> >partition.assignment.strategy.  It turns out that several of our consumers
> >use the same group id, but instantiate several different consumer
> >instances
> >(with different topic selectors and thread counts).  Often, this is done
> >in
> >a single shared process.  It turns out this arrangement is not allowed
> >when
> >using the 'roundrobin' assignment strategy.
> >
> >I'm curious as to the reason for this restriction?  Why is it not also a
> >restriction for the 'range' strategy (which we've been happily using for
> >some time now)?
> >
> >It would seem that as long as you always assign a partition to a consumer
> >instance that is actually selecting it, you should still be able to
> >proceed
> >with the round-robin algorithm (potentially skipping consumers if they
> >can't select the next partition in the list, etc.).
> >
> >Jason
>
>


-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Async Producer Callback

2015-03-19 Thread Mayuresh Gharat
Yes. Thats right. I misunderstood, my bad.

Thanks,

Mayuresh

On Thu, Mar 19, 2015 at 11:05 AM, sunil kalva  wrote:

> future returns RecordMetadata class which contains only metadata not the
> actual message.
> But i think *steven* had a point like saving the reference in impl class
> and retry if there is an exception in callback method.
>
> On Thu, Mar 19, 2015 at 10:27 PM, Mayuresh Gharat <
> gharatmayures...@gmail.com> wrote:
>
> > Also you can use the other API that returns a Future and save those
> futures
> > into a list and do get() on them to check which message has been sent and
> > which returned an error so that they can be retried.
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Thu, Mar 19, 2015 at 9:19 AM, Steven Wu  wrote:
> >
> > > in your callback impl object, you can save a reference to the actual
> > > message.
> > >
> > > On Wed, Mar 18, 2015 at 10:45 PM, sunil kalva 
> > > wrote:
> > >
> > > > Hi
> > > > How do i access the actual message which is failed to send to cluster
> > > using
> > > > Callback interface and onCompletion method.
> > > >
> > > > Basically if the sender is failed to send, i want to add it to a temp
> > > queue
> > > > and retry them later.
> > > >
> > > > --Sunil
> > > >
> > >
> >
> >
> >
> > --
> > -Regards,
> > Mayuresh R. Gharat
> > (862) 250-7125
> >
>
>
>
> --
> SunilKalva
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: producer queue size

2015-03-19 Thread Mayuresh Gharat
I suppose sending is controlled by the linger time and max batch size of
the queue. The messages are sent to kafka when either of these meet.

The new kafkaProducer returns a Future. So its the responsibility of the
application to do a .get() on it to see the success or failure.

Thanks,

Mayuresh

On Wed, Mar 18, 2015 at 2:32 PM, Ewen Cheslack-Postava 
wrote:

> The setting you want is buffer.memory, but I don't think there's a way to
> get the amount of remaining space.
>
> The setting block.on.buffer.full controls the behavior when you run out of
> space. Neither setting silently drops messages. It will either block until
> there is space to add the message or throw an exception, which your
> application can catch and handle however it wants.
>
> -Ewen
>
> On Wed, Mar 18, 2015 at 9:24 AM, sunil kalva  wrote:
>
> > essentially i want to use this property "queue.buffering.max.messages"
> with
> > new KafkaProducer class, and also want to access the current value of the
> > queue
> >
> > SunilKalva
> >
> > On Wed, Mar 18, 2015 at 9:51 PM, sunil kalva 
> wrote:
> >
> > >
> > > Hi
> > > How do i get the size of the inmemory queue which are holding messages
> > and
> > > ready to send in async producer, i am using new KafkaProducer class in
> > > 0.8.2.
> > >
> > > Basically instead of dropping the messages silently, i want to avoid
> > > sending messages if the queue is already full. I am using async
> > > KafkaProdcuer class.
> > >
> > > Or is there anyother better way to handle this, since i am using async
> > > client i can not catch the exception i think.
> > >
> > > --
> > > SunilKalva
> > >
> >
> >
> >
> > --
> > SunilKalva
> >
>
>
>
> --
> Thanks,
> Ewen
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Async Producer Callback

2015-03-19 Thread Mayuresh Gharat
Also you can use the other API that returns a Future and save those futures
into a list and do get() on them to check which message has been sent and
which returned an error so that they can be retried.

Thanks,

Mayuresh

On Thu, Mar 19, 2015 at 9:19 AM, Steven Wu  wrote:

> in your callback impl object, you can save a reference to the actual
> message.
>
> On Wed, Mar 18, 2015 at 10:45 PM, sunil kalva 
> wrote:
>
> > Hi
> > How do i access the actual message which is failed to send to cluster
> using
> > Callback interface and onCompletion method.
> >
> > Basically if the sender is failed to send, i want to add it to a temp
> queue
> > and retry them later.
> >
> > --Sunil
> >
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: No topic owner when using different assignment strategies

2015-03-17 Thread Mayuresh Gharat
Probably we should return an error response if you already have a partition
assignment strategy inplace for a group and you try to use other strategy.

Thanks,

Mayuresh

On Tue, Mar 17, 2015 at 2:10 PM, Jiangjie Qin 
wrote:

> Yeah, using different partition assignment algorithms in the same consumer
> group won¹t work. Is there a particular reason you want to do this?
>
> On 3/17/15, 8:32 AM, "tao xiao"  wrote:
>
> >This is the corrected zk result
> >
> >Here is the result from zk
> >[zk: localhost:2181(CONNECTED) 0] get
> >/consumers/test/owners/mm-benchmark-test/0
> >
> >Node does not exist: /consumers/test/owners/mm-benchmark-test/0
> >
> >[zk: localhost:2181(CONNECTED) 1] get
> >/consumers/test/owners/mm-benchmark-test1/0
> >
> >test-localhost-1426605370072-904d6fba-0
> >
> >On Tue, Mar 17, 2015 at 11:30 PM, tao xiao  wrote:
> >
> >> Hi team,
> >>
> >> I have two consumer instances with the same group id connecting to two
> >> different topics with 1 partition created for each. One consumer uses
> >> partition.assignment.strategy=roundrobin and the other one uses default
> >> assignment strategy. Both consumers have 1 thread spawned internally and
> >> connect kafka using createMessageStreamsByFilter.
> >> The consumer with roundrobin assignment strategy connected kafka first
> >>and
> >> had 2 topics assigned to itself and then I brought up another consumer
> >>that
> >> has default assignment strategy configured. I saw rebalancing happened
> >>in
> >> both consumers but at the end only one of the topics was assigned to the
> >> consumer that is configured roundrobin assignment strategy and no topics
> >> were assigned to the other consumer. This led to one topic missing its
> >> owner.
> >>
> >> Here is the result from zk
> >> [zk: localhost:2181(CONNECTED) 0] get
> >> /consumers/test/owners/mm-benchmark-test/0
> >>
> >> Node does not exist:
> >> /consumers/test12345667f/owners/mm-benchmark-test/0
> >>
> >> [zk: localhost:2181(CONNECTED) 1] get
> >> /consumers/test/owners/mm-benchmark-test1/0
> >>
> >> test-localhost-1426605370072-904d6fba-0
> >>
> >> The kafka version I use is 0.8.2.1
> >>
> >> --
> >> Regards,
> >> Tao
> >>
> >
> >
> >
> >--
> >Regards,
> >Tao
>
>


-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Broker Exceptions

2015-03-17 Thread Mayuresh Gharat
We are trying to see what might have caused it.

We had some questions :
1) Is this reproducible? That way we can dig deep.


This looks interesting problem to solve and you might have caught a bug,
but we need to verify the root cause before filing a ticket.

Thanks,

Mayuresh

On Tue, Mar 17, 2015 at 2:10 PM, Zakee  wrote:

> > What version are you running ?
>
> Version 0.8.2.0
>
> > Your case is 2). But the only thing weird is your replica (broker 3) is
> > requesting for offset which is greater than the leaders log end offset.
>
>
> So what could be the cause?
>
> Thanks
> Zakee
>
>
>
> > On Mar 17, 2015, at 11:45 AM, Mayuresh Gharat <
> gharatmayures...@gmail.com> wrote:
> >
> > What version are you running ?
> >
> > The code for latest version says that :
> >
> > 1) if the log end offset of the replica is greater than the leaders log
> end
> > offset, the replicas offset will be reset to logEndOffset of the leader.
> >
> > 2) Else if the log end offset of the replica is smaller than the leaders
> > log end offset and its out of range, the replicas offset will be reset to
> > logStartOffset of the leader.
> >
> > Your case is 2). But the only thing weird is your replica (broker 3) is
> > requesting for offset which is greater than the leaders log end offset.
> >
> > Thanks,
> >
> > Mayuresh
> >
> >
> > On Tue, Mar 17, 2015 at 10:26 AM, Mayuresh Gharat <
> > gharatmayures...@gmail.com <mailto:gharatmayures...@gmail.com>> wrote:
> >
> >> cool.
> >>
> >> On Tue, Mar 17, 2015 at 10:15 AM, Zakee  wrote:
> >>
> >>> Hi Mayuresh,
> >>>
> >>> The logs are already attached and are in reverse order starting
> backwards
> >>> from [2015-03-14 07:46:52,517] to the time when brokers were started.
> >>>
> >>> Thanks
> >>> Zakee
> >>>
> >>>
> >>>
> >>>> On Mar 17, 2015, at 12:07 AM, Mayuresh Gharat <
> >>> gharatmayures...@gmail.com> wrote:
> >>>>
> >>>> Hi Zakee,
> >>>>
> >>>> Thanks for the logs. Can you paste earlier logs from broker-3 up to :
> >>>>
> >>>> [2015-03-14 07:46:52,517] ERROR [ReplicaFetcherThread-2-4], Current
> >>>> offset 1754769769 for partition [Topic22kv,5] out of range; reset
> >>>> offset to 1400864851 (kafka.server.ReplicaFetcherThread)
> >>>>
> >>>> That would help us figure out what was happening on this broker before
> >>> it
> >>>> issued a replicaFetch request to broker-4.
> >>>>
> >>>> Thanks,
> >>>>
> >>>> Mayuresh
> >>>>
> >>>> On Mon, Mar 16, 2015 at 11:32 PM, Zakee  wrote:
> >>>>
> >>>>> Hi Mayuresh,
> >>>>>
> >>>>> Here are the logs.
> >>>>>
> >>>>> 
> >>>>> Old School Yearbook Pics
> >>>>> View Class Yearbooks Online Free. Search by School & Year. Look Now!
> >>>>>
> >>>
> http://thirdpartyoffers.netzero.net/TGL3231/5507ca8137dc94a805e6bst01vuc
> >>>>>
> >>>>>
> >>>>> Thanks,
> >>>>> Kazim Zakee
> >>>>>
> >>>>>
> >>>>>
> >>>>>> On Mar 16, 2015, at 10:48 AM, Mayuresh Gharat <
> >>>>> gharatmayures...@gmail.com> wrote:
> >>>>>>
> >>>>>> Can you provide more logs (complete) on Broker 3 till time :
> >>>>>>
> >>>>>> *[2015-03-14 07:46:52,517*] WARN [ReplicaFetcherThread-2-4],
> Replica 3
> >>>>> for
> >>>>>> partition [Topic22kv,5] reset its fetch offset from 1400864851 to
> >>> current
> >>>>>> leader 4's start offset 1400864851
> (kafka.server.ReplicaFetcherThread)
> >>>>>>
> >>>>>> I would like to see logs from time much before it sent the fetch
> >>> request
> >>>>> to
> >>>>>> Broker 4 to the time above. I want to check if in any case Broker 3
> >>> was a
> >>>>>> leader before broker 4 took over.
> >>>>>>
> >>>>>> Additional logs will help.
> >>>>>>
> >&

Re: Broker Exceptions

2015-03-17 Thread Mayuresh Gharat
What version are you running ?

The code for latest version says that :

1) if the log end offset of the replica is greater than the leaders log end
offset, the replicas offset will be reset to logEndOffset of the leader.

2) Else if the log end offset of the replica is smaller than the leaders
log end offset and its out of range, the replicas offset will be reset to
logStartOffset of the leader.

Your case is 2). But the only thing weird is your replica (broker 3) is
requesting for offset which is greater than the leaders log end offset.

Thanks,

Mayuresh


On Tue, Mar 17, 2015 at 10:26 AM, Mayuresh Gharat <
gharatmayures...@gmail.com> wrote:

> cool.
>
> On Tue, Mar 17, 2015 at 10:15 AM, Zakee  wrote:
>
>> Hi Mayuresh,
>>
>> The logs are already attached and are in reverse order starting backwards
>> from [2015-03-14 07:46:52,517] to the time when brokers were started.
>>
>> Thanks
>> Zakee
>>
>>
>>
>> > On Mar 17, 2015, at 12:07 AM, Mayuresh Gharat <
>> gharatmayures...@gmail.com> wrote:
>> >
>> > Hi Zakee,
>> >
>> > Thanks for the logs. Can you paste earlier logs from broker-3 up to :
>> >
>> > [2015-03-14 07:46:52,517] ERROR [ReplicaFetcherThread-2-4], Current
>> > offset 1754769769 for partition [Topic22kv,5] out of range; reset
>> > offset to 1400864851 (kafka.server.ReplicaFetcherThread)
>> >
>> > That would help us figure out what was happening on this broker before
>> it
>> > issued a replicaFetch request to broker-4.
>> >
>> > Thanks,
>> >
>> > Mayuresh
>> >
>> > On Mon, Mar 16, 2015 at 11:32 PM, Zakee  wrote:
>> >
>> >> Hi Mayuresh,
>> >>
>> >> Here are the logs.
>> >>
>> >> ____
>> >> Old School Yearbook Pics
>> >> View Class Yearbooks Online Free. Search by School & Year. Look Now!
>> >>
>> http://thirdpartyoffers.netzero.net/TGL3231/5507ca8137dc94a805e6bst01vuc
>> >>
>> >>
>> >> Thanks,
>> >> Kazim Zakee
>> >>
>> >>
>> >>
>> >>> On Mar 16, 2015, at 10:48 AM, Mayuresh Gharat <
>> >> gharatmayures...@gmail.com> wrote:
>> >>>
>> >>> Can you provide more logs (complete) on Broker 3 till time :
>> >>>
>> >>> *[2015-03-14 07:46:52,517*] WARN [ReplicaFetcherThread-2-4], Replica 3
>> >> for
>> >>> partition [Topic22kv,5] reset its fetch offset from 1400864851 to
>> current
>> >>> leader 4's start offset 1400864851 (kafka.server.ReplicaFetcherThread)
>> >>>
>> >>> I would like to see logs from time much before it sent the fetch
>> request
>> >> to
>> >>> Broker 4 to the time above. I want to check if in any case Broker 3
>> was a
>> >>> leader before broker 4 took over.
>> >>>
>> >>> Additional logs will help.
>> >>>
>> >>>
>> >>> Thanks,
>> >>>
>> >>> Mayuresh
>> >>>
>> >>>
>> >>>
>> >>> On Sat, Mar 14, 2015 at 8:35 PM, Zakee  wrote:
>> >>>
>> >>>> log.cleanup.policy is delete not compact.
>> >>>> log.cleaner.enable=true
>> >>>> log.cleaner.threads=5
>> >>>> log.cleanup.policy=delete
>> >>>> log.flush.scheduler.interval.ms=3000
>> >>>> log.retention.minutes=1440
>> >>>> log.segment.bytes=1073741824  (1gb)
>> >>>>
>> >>>> Messages are keyed but not compressed, producer async and uses kafka
>> >>>> default partitioner.
>> >>>> String message = msg.getString();
>> >>>> String uniqKey = ""+rnd.nextInt();// random key
>> >>>> String partKey = getPartitionKey();// partition key
>> >>>> KeyedMessage data = new KeyedMessage> >>>> String>(this.topicName, uniqKey, partKey, message);
>> >>>> producer.send(data);
>> >>>>
>> >>>> Thanks
>> >>>> Zakee
>> >>>>
>> >>>>
>> >>>>
>> >>>>> On Mar 14, 2015, at 4:23 PM, gharatmayures...@gmail.com wrote:
>> >>>>>
>> >>>>> Is your topic log compacted? Also if it is are the messages keyed?
>> Or
>

Re: Broker Exceptions

2015-03-17 Thread Mayuresh Gharat
cool.

On Tue, Mar 17, 2015 at 10:15 AM, Zakee  wrote:

> Hi Mayuresh,
>
> The logs are already attached and are in reverse order starting backwards
> from [2015-03-14 07:46:52,517] to the time when brokers were started.
>
> Thanks
> Zakee
>
>
>
> > On Mar 17, 2015, at 12:07 AM, Mayuresh Gharat <
> gharatmayures...@gmail.com> wrote:
> >
> > Hi Zakee,
> >
> > Thanks for the logs. Can you paste earlier logs from broker-3 up to :
> >
> > [2015-03-14 07:46:52,517] ERROR [ReplicaFetcherThread-2-4], Current
> > offset 1754769769 for partition [Topic22kv,5] out of range; reset
> > offset to 1400864851 (kafka.server.ReplicaFetcherThread)
> >
> > That would help us figure out what was happening on this broker before it
> > issued a replicaFetch request to broker-4.
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Mon, Mar 16, 2015 at 11:32 PM, Zakee  wrote:
> >
> >> Hi Mayuresh,
> >>
> >> Here are the logs.
> >>
> >> 
> >> Old School Yearbook Pics
> >> View Class Yearbooks Online Free. Search by School & Year. Look Now!
> >>
> http://thirdpartyoffers.netzero.net/TGL3231/5507ca8137dc94a805e6bst01vuc
> >>
> >>
> >> Thanks,
> >> Kazim Zakee
> >>
> >>
> >>
> >>> On Mar 16, 2015, at 10:48 AM, Mayuresh Gharat <
> >> gharatmayures...@gmail.com> wrote:
> >>>
> >>> Can you provide more logs (complete) on Broker 3 till time :
> >>>
> >>> *[2015-03-14 07:46:52,517*] WARN [ReplicaFetcherThread-2-4], Replica 3
> >> for
> >>> partition [Topic22kv,5] reset its fetch offset from 1400864851 to
> current
> >>> leader 4's start offset 1400864851 (kafka.server.ReplicaFetcherThread)
> >>>
> >>> I would like to see logs from time much before it sent the fetch
> request
> >> to
> >>> Broker 4 to the time above. I want to check if in any case Broker 3
> was a
> >>> leader before broker 4 took over.
> >>>
> >>> Additional logs will help.
> >>>
> >>>
> >>> Thanks,
> >>>
> >>> Mayuresh
> >>>
> >>>
> >>>
> >>> On Sat, Mar 14, 2015 at 8:35 PM, Zakee  wrote:
> >>>
> >>>> log.cleanup.policy is delete not compact.
> >>>> log.cleaner.enable=true
> >>>> log.cleaner.threads=5
> >>>> log.cleanup.policy=delete
> >>>> log.flush.scheduler.interval.ms=3000
> >>>> log.retention.minutes=1440
> >>>> log.segment.bytes=1073741824  (1gb)
> >>>>
> >>>> Messages are keyed but not compressed, producer async and uses kafka
> >>>> default partitioner.
> >>>> String message = msg.getString();
> >>>> String uniqKey = ""+rnd.nextInt();// random key
> >>>> String partKey = getPartitionKey();// partition key
> >>>> KeyedMessage data = new KeyedMessage >>>> String>(this.topicName, uniqKey, partKey, message);
> >>>> producer.send(data);
> >>>>
> >>>> Thanks
> >>>> Zakee
> >>>>
> >>>>
> >>>>
> >>>>> On Mar 14, 2015, at 4:23 PM, gharatmayures...@gmail.com wrote:
> >>>>>
> >>>>> Is your topic log compacted? Also if it is are the messages keyed? Or
> >>>> are the messages compressed?
> >>>>>
> >>>>> Thanks,
> >>>>>
> >>>>> Mayuresh
> >>>>>
> >>>>> Sent from my iPhone
> >>>>>
> >>>>>> On Mar 14, 2015, at 2:02 PM, Zakee  >>>> kzak...@netzero.net>> wrote:
> >>>>>>
> >>>>>> Thanks, Jiangjie for helping resolve the kafka controller migration
> >>>> driven partition leader rebalance issue. The logs are much cleaner
> now.
> >>>>>>
> >>>>>> There are a few incidences of Out of range offset even though  there
> >> is
> >>>> no consumers running, only producers and replica fetchers. I was
> trying
> >> to
> >>>> relate to a cause, looks like compaction (log segment deletion)
> causing
> >>>> this. Not sure whether this is expected behavior.
> >>>>>>
> >>>>

Re: Broker Exceptions

2015-03-17 Thread Mayuresh Gharat
Hi Zakee,

Thanks for the logs. Can you paste earlier logs from broker-3 up to :

[2015-03-14 07:46:52,517] ERROR [ReplicaFetcherThread-2-4], Current
offset 1754769769 for partition [Topic22kv,5] out of range; reset
offset to 1400864851 (kafka.server.ReplicaFetcherThread)

That would help us figure out what was happening on this broker before it
issued a replicaFetch request to broker-4.

Thanks,

Mayuresh

On Mon, Mar 16, 2015 at 11:32 PM, Zakee  wrote:

> Hi Mayuresh,
>
> Here are the logs.
>
> 
> Old School Yearbook Pics
> View Class Yearbooks Online Free. Search by School & Year. Look Now!
> http://thirdpartyoffers.netzero.net/TGL3231/5507ca8137dc94a805e6bst01vuc
>
>
> Thanks,
> Kazim Zakee
>
>
>
> > On Mar 16, 2015, at 10:48 AM, Mayuresh Gharat <
> gharatmayures...@gmail.com> wrote:
> >
> > Can you provide more logs (complete) on Broker 3 till time :
> >
> > *[2015-03-14 07:46:52,517*] WARN [ReplicaFetcherThread-2-4], Replica 3
> for
> > partition [Topic22kv,5] reset its fetch offset from 1400864851 to current
> > leader 4's start offset 1400864851 (kafka.server.ReplicaFetcherThread)
> >
> > I would like to see logs from time much before it sent the fetch request
> to
> > Broker 4 to the time above. I want to check if in any case Broker 3 was a
> > leader before broker 4 took over.
> >
> > Additional logs will help.
> >
> >
> > Thanks,
> >
> > Mayuresh
> >
> >
> >
> > On Sat, Mar 14, 2015 at 8:35 PM, Zakee  wrote:
> >
> >> log.cleanup.policy is delete not compact.
> >> log.cleaner.enable=true
> >> log.cleaner.threads=5
> >> log.cleanup.policy=delete
> >> log.flush.scheduler.interval.ms=3000
> >> log.retention.minutes=1440
> >> log.segment.bytes=1073741824  (1gb)
> >>
> >> Messages are keyed but not compressed, producer async and uses kafka
> >> default partitioner.
> >> String message = msg.getString();
> >> String uniqKey = ""+rnd.nextInt();// random key
> >> String partKey = getPartitionKey();// partition key
> >> KeyedMessage data = new KeyedMessage >> String>(this.topicName, uniqKey, partKey, message);
> >> producer.send(data);
> >>
> >> Thanks
> >> Zakee
> >>
> >>
> >>
> >>> On Mar 14, 2015, at 4:23 PM, gharatmayures...@gmail.com wrote:
> >>>
> >>> Is your topic log compacted? Also if it is are the messages keyed? Or
> >> are the messages compressed?
> >>>
> >>> Thanks,
> >>>
> >>> Mayuresh
> >>>
> >>> Sent from my iPhone
> >>>
> >>>> On Mar 14, 2015, at 2:02 PM, Zakee  >> kzak...@netzero.net>> wrote:
> >>>>
> >>>> Thanks, Jiangjie for helping resolve the kafka controller migration
> >> driven partition leader rebalance issue. The logs are much cleaner now.
> >>>>
> >>>> There are a few incidences of Out of range offset even though  there
> is
> >> no consumers running, only producers and replica fetchers. I was trying
> to
> >> relate to a cause, looks like compaction (log segment deletion) causing
> >> this. Not sure whether this is expected behavior.
> >>>>
> >>>> Broker-4:
> >>>> [2015-03-14 07:46:52,338] ERROR [Replica Manager on Broker 4]: Error
> >> when processing fetch request for partition [Topic22kv,5] offset
> 1754769769
> >> from follower with correlation id 1645671. Possible cause: Request for
> >> offset 1754769769 but we only have log segments in the range 1400864851
> to
> >> 1754769732. (kafka.server.ReplicaManager)
> >>>>
> >>>> Broker-3:
> >>>> [2015-03-14 07:46:52,356] INFO The cleaning for partition
> [Topic22kv,5]
> >> is aborted and paused (kafka.log.LogCleaner)
> >>>> [2015-03-14 07:46:52,408] INFO Scheduling log segment 1400864851 for
> >> log Topic22kv-5 for deletion. (kafka.log.Log)
> >>>> …
> >>>> [2015-03-14 07:46:52,421] INFO Compaction for partition [Topic22kv,5]
> >> is resumed (kafka.log.LogCleaner)
> >>>> [2015-03-14 07:46:52,517] ERROR [ReplicaFetcherThread-2-4], Current
> >> offset 1754769769 for partition [Topic22kv,5] out of range; reset
> offset to
> >> 1400864851 (kafka.server.ReplicaFetcherThread)
> >>>> [2015-03-14 07:46:52,517] WARN [ReplicaFetcherThread-2-4], Replica 3
> 

Re: Broker Exceptions

2015-03-16 Thread Mayuresh Gharat
Can you provide more logs (complete) on Broker 3 till time :

*[2015-03-14 07:46:52,517*] WARN [ReplicaFetcherThread-2-4], Replica 3 for
partition [Topic22kv,5] reset its fetch offset from 1400864851 to current
leader 4's start offset 1400864851 (kafka.server.ReplicaFetcherThread)

I would like to see logs from time much before it sent the fetch request to
Broker 4 to the time above. I want to check if in any case Broker 3 was a
leader before broker 4 took over.

Additional logs will help.


Thanks,

Mayuresh



On Sat, Mar 14, 2015 at 8:35 PM, Zakee  wrote:

> log.cleanup.policy is delete not compact.
> log.cleaner.enable=true
> log.cleaner.threads=5
> log.cleanup.policy=delete
> log.flush.scheduler.interval.ms=3000
> log.retention.minutes=1440
> log.segment.bytes=1073741824  (1gb)
>
> Messages are keyed but not compressed, producer async and uses kafka
> default partitioner.
> String message = msg.getString();
> String uniqKey = ""+rnd.nextInt();// random key
> String partKey = getPartitionKey();// partition key
> KeyedMessage data = new KeyedMessage String>(this.topicName, uniqKey, partKey, message);
> producer.send(data);
>
> Thanks
> Zakee
>
>
>
> > On Mar 14, 2015, at 4:23 PM, gharatmayures...@gmail.com wrote:
> >
> > Is your topic log compacted? Also if it is are the messages keyed? Or
> are the messages compressed?
> >
> > Thanks,
> >
> > Mayuresh
> >
> > Sent from my iPhone
> >
> >> On Mar 14, 2015, at 2:02 PM, Zakee  kzak...@netzero.net>> wrote:
> >>
> >> Thanks, Jiangjie for helping resolve the kafka controller migration
> driven partition leader rebalance issue. The logs are much cleaner now.
> >>
> >> There are a few incidences of Out of range offset even though  there is
> no consumers running, only producers and replica fetchers. I was trying to
> relate to a cause, looks like compaction (log segment deletion) causing
> this. Not sure whether this is expected behavior.
> >>
> >> Broker-4:
> >> [2015-03-14 07:46:52,338] ERROR [Replica Manager on Broker 4]: Error
> when processing fetch request for partition [Topic22kv,5] offset 1754769769
> from follower with correlation id 1645671. Possible cause: Request for
> offset 1754769769 but we only have log segments in the range 1400864851 to
> 1754769732. (kafka.server.ReplicaManager)
> >>
> >> Broker-3:
> >> [2015-03-14 07:46:52,356] INFO The cleaning for partition [Topic22kv,5]
> is aborted and paused (kafka.log.LogCleaner)
> >> [2015-03-14 07:46:52,408] INFO Scheduling log segment 1400864851 for
> log Topic22kv-5 for deletion. (kafka.log.Log)
> >> …
> >> [2015-03-14 07:46:52,421] INFO Compaction for partition [Topic22kv,5]
> is resumed (kafka.log.LogCleaner)
> >> [2015-03-14 07:46:52,517] ERROR [ReplicaFetcherThread-2-4], Current
> offset 1754769769 for partition [Topic22kv,5] out of range; reset offset to
> 1400864851 (kafka.server.ReplicaFetcherThread)
> >> [2015-03-14 07:46:52,517] WARN [ReplicaFetcherThread-2-4], Replica 3
> for partition [Topic22kv,5] reset its fetch offset from 1400864851 to
> current leader 4's start offset 1400864851
> (kafka.server.ReplicaFetcherThread)
> >>
> >> 
> >> Old School Yearbook Pics
> >> View Class Yearbooks Online Free. Search by School & Year. Look Now!
> >>
> http://thirdpartyoffers.netzero.net/TGL3231/5504a2032e49422021991st02vuc <
> http://thirdpartyoffers.netzero.net/TGL3231/5504a2032e49422021991st02vuc>
> >> 
> >>
> >>
> >> Thanks
> >> Zakee
> >>
> >>> On Mar 9, 2015, at 12:18 PM, Zakee  wrote:
> >>>
> >>> No broker restarts.
> >>>
> >>> Created a kafka issue:
> https://issues.apache.org/jira/browse/KAFKA-2011 <
> https://issues.apache.org/jira/browse/KAFKA-2011>
> >>>
> > Logs for rebalance:
> > [2015-03-07 16:52:48,969] INFO [Controller 2]: Resuming preferred
> replica election for partitions: (kafka.controller.KafkaController)
> > [2015-03-07 16:52:48,969] INFO [Controller 2]: Partitions that
> completed preferred replica election: (kafka.controller.KafkaController)
> > …
> > [2015-03-07 12:07:06,783] INFO [Controller 4]: Resuming preferred
> replica election for partitions: (kafka.controller.KafkaController)
> > ...
> > [2015-03-07 09:10:41,850] INFO [Controller 3]: Resuming preferred
> replica election for partitions: (kafka.controller.KafkaController)
> > ...
> > [2015-03-07 08:26:56,396] INFO [Controller 1]: Starting preferred
> replica leader election for partitions (kafka.controller.KafkaController)
> > ...
> > [2015-03-06 16:52:59,506] INFO [Controller 2]: Partitions undergoing
> preferred replica election:  (kafka.controller.KafkaController)
> >
> > Also, I still see lots of below errors (~69k) going on in the logs
> since the restart. Is there any other reason than rebalance for these
> errors?
> >
> > [2015-03-07 14:23:28,963] ERROR [ReplicaFetcherThread-2-5], Error
> for partition [Topic-11,7] to broker 5:class
> kafka.common.NotLeaderForPartitionException
> (kafka.serve

  1   2   >