Re: Timestamps unique?

2022-01-13 Thread Svante Karlsson
No guarantee,

/svante
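
For the archive, a hedged illustration (plain Java consumer; broker, group and
topic names are placeholders): two records in the same partition may carry the
same timestamp, so use (partition, offset) when you need a unique handle.

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TimestampDemo {
    public static void main(String[] args) {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");   // placeholder
        p.put("group.id", "timestamp-demo");
        p.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        p.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> c = new KafkaConsumer<>(p)) {
            c.subscribe(List.of("some-topic"));          // placeholder topic
            for (ConsumerRecord<String, String> r : c.poll(Duration.ofSeconds(1))) {
                // offset is unique within a partition; timestamp() may repeat
                System.out.printf("p=%d offset=%d ts=%d%n", r.partition(), r.offset(), r.timestamp());
            }
        }
    }
}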

Den tors 13 jan. 2022 kl 20:21 skrev Chad Preisler :
>
> Hello,
>
> For ConsumerRecord.timestamp() is the timestamp guaranteed to be
> unique within the topic's partition, or can there be records inside the
> topic's partition that have the same timestamp?
>
> Thanks.
> Chad


Re: guidelines for replacing a lost Kafka Broker

2019-09-13 Thread Svante Karlsson
Just bring a new broker up and give it the id of the lost one. It will sync
itself

/svante
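
For the archive, a hedged sketch of what that looks like (id, paths and hosts
are examples only): give the replacement machine the dead broker's id and an
empty log dir, start it, and it re-replicates the partitions it should own.

# server.properties on the replacement machine
broker.id=3                       # same id as the broker that was lost
log.dirs=/var/lib/kafka/data      # empty directory on the new machine
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181

# start it and watch the under-replicated partition count fall back to zero
bin/kafka-server-start.sh config/server.properties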

Den fre 13 sep. 2019 kl 13:51 skrev saurav suman :

> Hi,
>
> When the old data is lost and another broker is added to the cluster, then
> it is a fresh broker with no data. You can reassign the topic partitions
> using the kafka-reassign-partitions.sh script.
>
> Please check the below links for more details.
> https://blog.imaginea.com/how-to-rebalance-topics-in-kafka-cluster/
>
> https://kafka.apache.org/documentation/
>
> Regards,
> Saurav Suman
>
> On Fri, Sep 13, 2019 at 4:21 PM Alok Dwivedi 
> wrote:
>
> > Hi All
> > Can someone please advise what is the recommended practice for replacing a
> > lost broker in a Kafka cluster? Let's consider this sequence of events:
> >
> >
> >   1.  A 3-node cluster, say with 1 topic of 3 partitions and RF of 2.
> >   2.  That gives 2 partitions on each broker (B1, B2, B3).
> >   3.  Let's say we lost B3. We still have B1 and B2 and 4 partitions.
> > Assuming each broker had only 1 leader, by losing B3 we lost 1 leader and 1
> > replica. Out of the remaining replicas another leader will be selected for
> > any lost leaders, so we carry on with no downtime.
> >   4.  I want to know what we should do to go back to a 3-broker cluster.
> > Are there standard guidelines around any procedure to be done after
> > restarting the lost broker, mainly when the lost broker has been replaced
> > with another instance which no longer has the old log.dirs, i.e. data
> > existing on the lost broker is gone. Is there a way to rebuild missing
> > replica partitions now from existing partitions, or what is the recommended
> > practice in that scenario? Will we need some kind of partition re-assignment
> > here, and will it ensure that we go back from 4 partitions to 6 partitions?
> >
> >
> > Can someone please advise what happens automatically in this case and what
> > (if anything) should be done as a standard practice to get things back to
> > normalcy.
> >
> > Thanks
> > Alok Dwivedi
> > Data Architect
> > Maersk Always On Platform
> >
> >
> >
> >
> >
>
>
> --
>
> Thanks & Regards,
>
> Saurav Suman
>


Re: Same key found on 2 different partitions of compacted topic (kafka-streams)

2019-05-17 Thread Svante Karlsson
Yes, that sounds likely: if you changed the number of partitions then the
hashing of the keys will map them to different partitions. You need to either
clear the data (i.e. change retention to very small and roll the logs) or
recreate the topic.

/svante
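
For reference, a hedged sketch of why the destination moves: keyed records are
placed with a murmur2 hash of the key modulo the partition count (this mirrors
the default partitioner), so changing the count reshuffles the keys. The
partition counts below are only illustrative.

import org.apache.kafka.common.utils.Utils;

public class WherePartitioned {
    // same arithmetic the default partitioner uses for records with a key
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] key = "unique_key_123".getBytes();
        System.out.println(partitionFor(key, 16));  // old partition count
        System.out.println(partitionFor(key, 32));  // new count -> likely a different partition
    }
}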

Den fre 17 maj 2019 kl 12:32 skrev Nitay Kufert :

> I would like to add that during this time I played around with my
> kafka-streams application, including several resets of the application.
> I think maybe I caused it by not deleting all the messages from the input
> topic of the streams application before increasing the number of partitions
> and resetting.
> If this sounds like a possible explanation (it does to me) - then I guess
> this whole thread is redundant hehe :)
>
> On Fri, May 17, 2019 at 1:01 PM Nitay Kufert  wrote:
>
> > Hey all,
> >
> > I am trying to understand a situation I came across and can't find an
> > explanation...
> >
> > In my compacted topic I have 32 partitions, and when searching the files
> I
> > have found that a message is found on 2 different partitions (4 and 16).
> > It seems that the "right" partition is 16 because it keeps updating the
> > value, while the message on partition 4 is never updated and contains an
> > out-of-date value.
> > This topic is an output of my kafka-stream application.
> >
> > The problem I get is when I consume the whole topic, I get the "bad"
> > message last and this will cause the value I am seeing to be wrong
> > (out-of-date).
> > I can solve it programmatically but I am not sure this situation should
> > even happen (maybe I am wrong making this assumption?)
> >
> > Here is a snippet of the values I get when consuming and filtering for
> the
> > desired message (in *bold* you can see the "bad" message):
> >
> > CreateTime:1557847043142 unique_key_123 14351.7399916489
> > CreateTime:1557848585761 unique_key_123 14352.6399916489
> > CreateTime:1557849467135 unique_key_123 14353.0899916489
> > CreateTime:1557849565972 unique_key_123 14356.8399916489
> > CreateTime:1557850075281 unique_key_123 14357.5199916489
> > CreateTime:1557850272513 unique_key_123 14358.116489
> > CreateTime:1557850323990 unique_key_123 14358.6499916489
> > CreateTime:1557850919067 unique_key_123 14360.816489
> > CreateTime:1557851600162 unique_key_123 14361.5799916489
> > CreateTime:1557852735188 unique_key_123 14367.4299916489
> > CreateTime:1557852870033 unique_key_123 14369.6799916489
> > CreateTime:1557852913943 unique_key_123 14372.8299916489
> > CreateTime:1557853111931 unique_key_123 14375.9799916489
> > CreateTime:1557853257402 unique_key_123 14376.4299916489
> > CreateTime:1557853654326 unique_key_123 14378.6799916489
> > CreateTime:1557854126273 unique_key_123 14382.2799916489
> > CreateTime:1557854882530 unique_key_123 14387.6799916489
> > CreateTime:1557855665133 unique_key_123 14392.8599916489
> > CreateTime:1557856458505 unique_key_123 14394.6099916489
> > CreateTime:1557856814818 unique_key_123 14398.2099916489
> > CreateTime:1557857560956 unique_key_123 14400.4599916489
> > CreateTime:1557858721649 unique_key_123 14404.0599916489
> > CreateTime:1557859251577 unique_key_123 14405.8099916489
> > CreateTime:1557859736903 unique_key_123 14409.4099916489
> > *CreateTime:1556637853098 unique_key_123 9954.93*
> > CreateTime:1557860847333 unique_key_123 14411.4099916489
> > CreateTime:1557861171424 unique_key_123 14413.2099916489
> > CreateTime:1557861411332 unique_key_123 14413.8899916489
> > CreateTime:1557861459234 unique_key_123 14414.5699916489
> > CreateTime:1557861550401 unique_key_123 14422.6699916489
> > CreateTime:1557861709925 unique_key_123 14425.3699916489
> > CreateTime:1557862790701 unique_key_123 14427.6199916489
> > CreateTime:1557863450236 unique_key_123 14428.0699916489
> > CreateTime:1557865167738 unique_key_123 14433.0699916489
> > CreateTime:1557865378520 unique_key_123 14435.3199916489
> > CreateTime:1557865441654 unique_key_123 14437.0699916489
> > CreateTime:1557865503081 unique_key_123 14437.5199916489
> > CreateTime:1557865907130 unique_key_123 14438.116489
> > CreateTime:1557867190954 unique_key_123 14440.6799916489
> > CreateTime:1557867416951 unique_key_123 14441.1299916489
> > CreateTime:1557869616387 unique_key_123 14443.6099916489
> > CreateTime:1557870569098 unique_key_123 14445.3599916489
> > CreateTime:1557871319283 unique_key_123 14450.3599916489
> > CreateTime:1557872731646 unique_key_123 14451.2599916489
> >
> >
> > I would really appreciate an explanation or reassurance that this is not
> > expected behavior.
> >
> > Let me know if I can supply more information
> >
> > Thanks!
> > --
> >
> > Nitay Kufert
> > Backend Developer

Re: Streaming Data

2019-04-09 Thread Svante Karlsson
I would stream to influxdb and visualize with grafana. Works great for
dashboards. But I would rethink your line format. It's very convenient to
have tags (or labels) that are key/value pairs on each metric if you ever
want to aggregate over a group of similar metrics.

Svante
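
To make the tag suggestion concrete, a hedged example in InfluxDB line protocol
(measurement, then key/value tags, then fields, then a timestamp; all names are
made up):

cpu_load,host=web01,region=eu-west value=0.64 1465839830100400200
cpu_load,host=web02,region=eu-west value=0.71 1465839830100400200

Aggregating over all hosts in a region is then just a group-by on the tag.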


Re: Kafka Deplopyment Using Kubernetes (on Cloud) - settings for log.dirs

2018-10-22 Thread Svante Karlsson
Different directories; they cannot share a path. A broker will delete
everything under its log directory that it does not know about.
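
A hedged sketch of what that means in practice (paths are examples): each
broker pod mounts its own volume and points log.dirs at it; no two brokers may
share the same directory or files.

# broker-0/server.properties
broker.id=0
log.dirs=/var/lib/kafka/data    # backed by broker-0's own PersistentVolume

# broker-1/server.properties
broker.id=1
log.dirs=/var/lib/kafka/data    # same path inside the pod, but a different volume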

Den mån 22 okt. 2018 kl 17:47 skrev M. Manna :

> Hello,
>
> We are thinking of rolling out Kafka on Kubernetes deployed on public cloud
> (AWS or GCP, or other). We were hoping if someone could provide some
> suggestion or insight here.
>
> What we are trying to understand is how the log.dirs property is affected when
> we run Pods on a specific worker node. If we are running 3 broker Pods with
> 1 ZK Pod, how are we treating the log.dirs property for the brokers (i.e. their
> location for offsets/data etc.)? Are the log.dirs mounted to different
> locations? Or are they sharing the same path, i.e. the same set of files?
>
> We understand that people are quite busy with the ongoing release. So any
> insight you can provide will be highly appreciated.
>
> Thanks,
>


Re: Have connector be paused from start

2018-09-28 Thread Svante Karlsson
Sounds like a workflow/pipeline thing in Jenkins (or equivalent) to me.



Den ons 26 sep. 2018 kl 17:27 skrev Rickard Cardell
:

> Hi
> Is there a way to have a Kafka Connect connector begin in state 'PAUSED'?
> I.e. I would like to have the connector set to paused before it can process
> any data from Kafka.
>
> Some background:
>
> I have a use case where we will push data from Kafka into S3 using Kafka
> Connect. It also involves a one-time backfill of data from Hadoop to get
> all the historic data into S3 as well, into the same dataset.
>
> To avoid too many duplicates we want the Kafka Connect pipeline and the
> HDFS-to-S3 pipeline to overlap just a few hours, i.e:
>
> 1. start kafka-connect kafka-to-s3 pipeline
> 2. wait a few hours
> 3. start pushing data from Hadoop to S3
>
> However, I have one process that deploys Kafka Connect connectors and
> another one that will handle this backfilling process, so one way of
> solving this would be if a connector could start in paused state and be
> resumed by the backfilling process.
>
> One less pretty solution to make the connector be paused before it can
> consume any data is by deploying it with faulty Kafka settings, set it to
> paused and then correct the settings, but I hope there are better solutions
> than that
>
> regards
> Rickard
>
> --
>
> *Rickard Cardell*
> Software developer
> Data Infrastructure
>
>


Re: Low level kafka consumer API to KafkaStreams App.

2018-09-13 Thread Svante Karlsson
You are doing something wrong if you need 10k threads to produce 800k
messages per second. It feels like you are a factor of 1000 off. What size are
your messages?

On Thu, Sep 13, 2018, 21:04 Praveen  wrote:

> Hi there,
>
> I have a kafka application that uses kafka consumer low-level api to help
> us process data from a single partition concurrently. Our use case is to
> send out 800k messages per sec. We are able to do that with 4 boxes using
> 10k threads and each request taking 50ms in a thread. (1000/50*1*4)
>
> I understand that kafka in general uses partitions as its parallelism
> model. It is my understanding that if I want the exact same behavior with
> kafka streams, I'd need to create 40k partitions for this topic. Is that
> right?
>
> What is the overhead on creating thousands of partitions? If we end up
> wanting to send out millions of messages per second, is increasing the
> partitions the only way?
>
> Best,
> Praveen
>


Re: Reliability against rack failure

2018-08-05 Thread Svante Karlsson
You need 3 racks for your zookeepers anyway: the ensemble needs 2 out of 3 to
form a quorum. How have you solved that?

Den sön 5 aug. 2018 20:31 Sanjay Awatramani 
skrev:

> Thanks for the quick response Svante.
> I forgot to mention that the deployment I am looking at has 2 racks. We
> came up with this solution, but for this specific deployment adding a rack
> is out of question.
> Is there a way to resolve this with 2 racks ?
>
> Regards,
> Sanjay
>
> On 05/08/18, 11:57 PM, "Svante Karlsson"  wrote:
>
> >3 racks,  Replication Factor = 3, min.insync.replicas=2, ack=all
> >
> >2018-08-05 20:21 GMT+02:00 Sanjay Awatramani
> >:
> >
> >> Hi,
> >>
> >> I have done some experiments and gone through kafka documentation, which
> >> makes me conclude that there is a small chance of data loss or
> >>availability
> >> in a rack scenario. Can someone please validate my understanding ?
> >>
> >> The minimum configuration for a single rack system against single
> >>machine
> >> failure is Replication Factor = 3, min.insync.replicas=2, ack=all. This
> >> will ensure that leader + at least one replica receives the data
> >>written by
> >> a producer and there will be no data loss as well as the system
> >>continues
> >> to be available for further writes by the producer when a broker goes
> >>down.
> >>
> >> With rack awareness enabled, Kafka will distribute replicas of a
> >>partition
> >> across racks, giving reliability in case of rack failure. However rack
> >> awareness is only concerned with distribution of replicas, not
> >>prioritising
> >> the order of replication when followers catch up with the leader.
> >>
> >> Moving to a rack aware setup which has 2 racks, the above configuration
> >> would create a problem because one of the racks might get 2 replicas
> >>and if
> >> that rack goes down, data will be lost.
> >>
> >> Extending the minimum configuration for a 2 rack setup, Replication
> >>Factor
> >> = 4, min.insync.replicas=2, ack=all. This will ensure that when a rack
> >>goes
> >> down, one of the replicas will be available as it would be on a
> >>different
> >> rack than the leader. This was my understanding and I cannot find any
> >> documentation to back this. I studied the mechanism by which producer
> >> writes to leader - all IN SYNC REPLICAS (ISR) pull the newest data, and
> >>if
> >> the leader confirms that at least min.insync.replicas have got the
> >>newest
> >> data, it sends an ack back to the producer. In a rack aware system, I
> >>think
> >> Kafka will send an ack even if the 2 replicas which are in sync are on
> >>the
> >> same rack. And at this instant if that rack goes down, data is lost.
> >>
> >> If we make min.insync.replicas=3, we can guarantee that one of the
> >> replicas will be on a different rack and data will not be lost. However
> >>if
> >> any rack goes down, producer's writes will start failing as it won't
> >>have
> >> the requisite replicas available.
> >>
> >> Is my understanding correct ? Is there a way to configure Kafka in a
> >>rack
> >> scenario to make it tolerant to data loss as well as make it available
> >>for
> >> further writes even when a single node or an entire rack goes down ?
> >>
> >> Regards,
> >> Sanjay
> >>
> >>
>
>


Re: Reliability against rack failure

2018-08-05 Thread Svante Karlsson
3 racks, Replication Factor = 3, min.insync.replicas=2, ack=all
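
Spelled out as configuration, a hedged sketch (rack names, topic name and
partition count are examples; older releases use --zookeeper instead of
--bootstrap-server):

# one rack per broker group
broker.rack=rack1                 # rack2 / rack3 on the brokers in the other racks

# topic with RF=3 and a durable write path
bin/kafka-topics.sh --create --topic payments --partitions 12 \
  --replication-factor 3 --config min.insync.replicas=2 \
  --bootstrap-server broker1:9092

# producer side
acks=all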

2018-08-05 20:21 GMT+02:00 Sanjay Awatramani :

> Hi,
>
> I have done some experiments and gone through kafka documentation, which
> makes me conclude that there is a small chance of data loss or availability
> in a rack scenario. Can someone please validate my understanding ?
>
> The minimum configuration for a single rack system against single machine
> failure is Replication Factor = 3, min.insync.replicas=2, ack=all. This
> will ensure that leader + at least one replica receives the data written by
> a producer and there will be no data loss as well as the system continues
> to be available for further writes by the producer when a broker goes down.
>
> With rack awareness enabled, Kafka will distribute replicas of a partition
> across racks, giving reliability in case of rack failure. However rack
> awareness is only concerned with distribution of replicas, not prioritising
> the order of replication when followers catch up with the leader.
>
> Moving to a rack aware setup which has 2 racks, the above configuration
> would create a problem because one of the racks might get 2 replicas and if
> that rack goes down, data will be lost.
>
> Extending the minimum configuration for a 2 rack setup, Replication Factor
> = 4, min.insync.replicas=2, ack=all. This will ensure that when a rack goes
> down, one of the replicas will be available as it would be on a different
> rack than the leader. This was my understanding and I cannot find any
> documentation to back this. I studied the mechanism by which producer
> writes to leader - all IN SYNC REPLICAS (ISR) pull the newest data, and if
> the leader confirms that at least min.insync.replicas have got the newest
> data, it sends an ack back to the producer. In a rack aware system, I think
> Kafka will send an ack even if the 2 replicas which are in sync are on the
> same rack. And at this instant if that rack goes down, data is lost.
>
> If we make min.insync.replicas=3, we can guarantee that one of the
> replicas will be on a different rack and data will not be lost. However if
> any rack goes down, producer’s writes will start failing as it won’t have
> the requisite replicas available.
>
> Is my understanding correct ? Is there a way to configure Kafka in a rack
> scenario to make it tolerant to data loss as well as make it available for
> further writes even when a single node or an entire rack goes down ?
>
> Regards,
> Sanjay
>
>


Re: log compaction v log rotation - best of the two worlds

2018-03-21 Thread Svante Karlsson
alt1)
if you can store a generation counter in the value of the "latest value"
topic you could do as follows

topic latest_value key [id]

topic full_history key[id, generation]

on delete get the latest_value.generation_counter and issue deletes on
full_history
key[id, 0..generation_counter]

alt2)
if you cannot store a generation_counter in "latest_value" store a
timestamp or uuid to make each key unique

topic latest_value key [id]

topic full_history key[id, timestamp/uuid]
on delete of "id" scan full_history topic from beginning and issue deletes
on full_history key[id, timestamp]

you could optimize this by having another topic that contains the "to be
purged" ids

/svante
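
A hedged sketch of alt2 (topic names, types and the key encoding are only
illustrative): the history topic is keyed on id plus a uniquifier, and a purge
walks the history keys seen for that id and emits tombstones (null values).

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class HistoryWriter {
    private final KafkaProducer<String, byte[]> producer;

    public HistoryWriter(KafkaProducer<String, byte[]> producer) {
        this.producer = producer;
    }

    // every update gets a unique history key, so compaction keeps the full history
    public void append(String id, byte[] value) {
        String historyKey = id + "#" + java.util.UUID.randomUUID();
        producer.send(new ProducerRecord<>("full_history", historyKey, value));
        producer.send(new ProducerRecord<>("latest_value", id, value));
    }

    // purging an id: a tombstone (null value) per history key found for that id
    public void purge(String id, Iterable<String> historyKeysForId) {
        for (String key : historyKeysForId) {
            producer.send(new ProducerRecord<>("full_history", key, null));
        }
        producer.send(new ProducerRecord<>("latest_value", id, null));
    }
}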



2018-03-21 11:16 GMT+01:00 Manikumar :

> Sorry, I was wrong. For the history topic, we can use a regular topic with a
> sufficient retention period.
> maybe others can give more ideas.
>
> On Wed, Mar 21, 2018 at 3:34 PM, Kopacki, Tomasz (Nokia - PL/Wroclaw) <
> tomasz.kopa...@nokia.com> wrote:
>
> > Do you mean I can use tombstones if my cleanup policy is 'delete' and it
> > will still work?
> >
> > Sincerely,
> > Tomasz Kopacki
> > DevOps Engineer @ Nokia
> >
> > -Original Message-
> > From: Manikumar [mailto:manikumar.re...@gmail.com]
> > Sent: Wednesday, March 21, 2018 11:03 AM
> > To: users@kafka.apache.org
> > Subject: Re: log compaction v log rotation - best of the two worlds
> >
> > Not sure if I understood the requirement correctly. One option is to use
> > two compacted topics: one for the current state of the resource and one
> > for history, and use tombstones whenever you want to clear them.
> >
> > On Wed, Mar 21, 2018 at 2:53 PM, Kopacki, Tomasz (Nokia - PL/Wroclaw) <
> > tomasz.kopa...@nokia.com> wrote:
> >
> > > Almost,
> > > Consider this example:
> > >
> > > |R1|R2|R1|R3|R2|R4|R4|R1|R2| <- this is an example of a stream where RX
> > > represents updates of a particular resource. I need to keep the
> > > history of changes forever for all the resources, but only while a
> > > resource is alive. If a resource expires/dies I'd like to remove it
> > > completely. In this example, consider that resource R2 dies but the others
> > > are still alive. In such a case I'd like to be able to transform this into:
> > > |R1|R1|R3|R4|R4|R1| <- so, it's not compaction because I need the history
> > > of changes, but neither can I simply remove 'old' messages, because I
> > > need to do this based on the lifecycle of the resource, not just their
> > > age.
> > >
> > >
> > >
> > > Sincerely,
> > > Tomasz Kopacki
> > > DevOps Engineer @ Nokia
> > >
> > > -Original Message-
> > > From: Manikumar [mailto:manikumar.re...@gmail.com]
> > > Sent: Wednesday, March 21, 2018 10:17 AM
> > > To: users@kafka.apache.org
> > > Subject: Re: log compaction v log rotation - best of the two worlds
> > >
> > > We can enable both compaction and retention for a topic by setting
> > > cleanup.policy="delete,compact"
> > > http://kafka.apache.org/documentation/#topicconfigs
> > >
> > > Does this handle your requirement?
> > >
> > > On Wed, Mar 21, 2018 at 2:36 PM, Kopacki, Tomasz (Nokia - PL/Wroclaw)
> > > < tomasz.kopa...@nokia.com> wrote:
> > >
> > > > Hi,
> > > > I've been recently exploring log handling in Kafka and I wonder
> > > > if/how I can mix log compaction with log rotation.
> > > > A little background first:
> > > > I have an application that uses kafka topics as a backend for event
> > > > sourcing. Messages represents change of state of my 'resources'.
> > > > Each resource has UID that is also used as a key for the messages.
> > > > My resources have a lifecycle and when their life ends I don't need
> > > > them anymore and there is no point in keeping their history. Having
> > > > said that I thought that best choice for me will be log compaction
> > > > with tombstone feature but I also would like to have a possibility
> > > > to keep history of changes for the resources(only until they die).
> > > > With those requirements I'd love to have a possibility to use
> > > > tombstone feature for log rotation but I guess it ain't working like
> > > that.
> > > >
> > > > Has anyone here had similar requirements and solved that somehow?
> > > >
> > > >
> > > > Sincerely,
> > > > Tomasz Kopacki
> > > > DevOps Engineer @ Nokia
> > > >
> > > >
> > >
> >
>


Re: Suggestion over architecture

2018-03-10 Thread Svante Karlsson
Yes, but I misread his reply and thought that he meant the "kafka rest
proxy". But now I see that we say the same thing - sorry for the confusion.

The normal way to do the authentication and authorization would be in the
rest/grpc endpoint before sending it to kafka.
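
A minimal, hedged sketch of that shape (class name, topic and the authorization
check are made-up placeholders): the endpoint decides whether the caller is
allowed, and only then produces to Kafka.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MailingGateway {
    private final KafkaProducer<String, byte[]> producer;

    public MailingGateway(KafkaProducer<String, byte[]> producer) {
        this.producer = producer;
    }

    // called by the REST/gRPC layer after it has parsed and authenticated the request
    public void submit(String clientId, boolean clientIsAuthorized, byte[] emailMessage) {
        if (!clientIsAuthorized) {          // authn/authz decided here, not by Kafka
            throw new SecurityException("client " + clientId + " may not send mailings");
        }
        producer.send(new ProducerRecord<>("email-requests", clientId, emailMessage));
    }
}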

2018-03-10 19:39 GMT+01:00 adrien ruffie :

> Thanks Nick, thanks Svante,
>
>
> Svante, you mean the same as Nick, right? Send a client message type which
> encapsulates the emailing to a REST endpoint in our infrastructure, and the
> endpoint
>
> pushes it into a Kafka topic?
>
> And if we need to ensure that a client sending any emailing is allowed,
> where would you check that authorization? After message reception on
> the REST endpoint? Directly by the sender in the on-premise webapp? Just
> before pushing to the topic? I think it's really better to check that before
> sending the message to our infrastructure side, but the webapp doesn't know
> whether it is allowed or not ...
>
>
>
> thank for your reply 😊
>
> Adrien
>
> 
> From: Svante Karlsson 
> Sent: Saturday, 10 March 2018 19:13:04
> To: users@kafka.apache.org
> Subject: Re: Suggestion over architecture
>
> You do not want to expose the Kafka instance to your different clients. Put
> some API endpoint in between: REST, gRPC or whatever.
>
> 2018-03-10 19:01 GMT+01:00 Nick Vasilyev :
>
> > Hard to say without more info, but why not just deploy something like a
> > REST api and expose it to your clients, they will send the data to the
> api
> > and it will in turn feed the Kafka  topic.
> >
> > You will minimize coupling and be able to scale / upgrade easier.
> >
> > On Mar 10, 2018 2:47 AM, "adrien ruffie" 
> > wrote:
> >
> > > Hello all,
> > >
> > >
> > > in my company we plan to set up the following architecture for our
> > client:
> > >
> > >
> > > An internal kafka cluster in our company, and deploy a webapp (our
> > > software solution) on premise for our clients.
> > >
> > >
> > > We think to create one producer by "webapp" client in order to push in
> a
> > > global topic (in our kafka) an message which represent an email.
> > >
> > >
> > > The idea behind this, is to unload the client webapp to process several
> > > mass mailing operation groups, and treat them ourselves with
> > >
> > > dedicateds servers into our infrastructure. And each dedicated server
> > will
> > > be a topic's consumer where the message(email) will be streamed.
> > >
> > >
> > > My main question is, do you think, that each client can be a producer ?
> > > (if we have for example 200/300 clients ?)
> > >
> > > Second question, each client should be a producer ? 😊
> > >
> > > Do you have another idea for this subject ?
> > >
> > >
> > > Thank you & best regards.
> > >
> > >
> > > Adrien
> > >
> >
>


Re: Suggestion over architecture

2018-03-10 Thread Svante Karlsson
You do not want to expose the Kafka instance to your different clients. Put
some API endpoint in between: REST, gRPC or whatever.

2018-03-10 19:01 GMT+01:00 Nick Vasilyev :

> Hard to say without more info, but why not just deploy something like a
> REST api and expose it to your clients, they will send the data to the api
> and it will in turn feed the Kafka  topic.
>
> You will minimize coupling and be able to scale / upgrade easier.
>
> On Mar 10, 2018 2:47 AM, "adrien ruffie" 
> wrote:
>
> > Hello all,
> >
> >
> > in my company we plan to set up the following architecture for our
> client:
> >
> >
> > An internal kafka cluster in our company, and deploy a webapp (our
> > software solution) on premise for our clients.
> >
> >
> > We are thinking of creating one producer per "webapp" client in order to
> > push into a global topic (in our Kafka) a message which represents an email.
> >
> >
> > The idea behind this is to offload from the client webapp the processing of
> > several mass-mailing operation groups, and treat them ourselves with
> >
> > dedicated servers in our infrastructure. And each dedicated server will
> > be a topic consumer where the message (email) will be streamed.
> >
> >
> > My main question is: do you think that each client can be a producer?
> > (if we have for example 200/300 clients?)
> >
> > Second question: should each client be a producer? 😊
> >
> > Do you have another idea for this subject ?
> >
> >
> > Thank you & best regards.
> >
> >
> > Adrien
> >
>


Re: Consultant Help

2018-03-02 Thread Svante Karlsson
try https://www.confluent.io/ - that's what they do

/svante

2018-03-02 21:21 GMT+01:00 Matt Stone :

> We are looking for a consultant or contractor that can come onsite to our
> Ogden, Utah location in the US, to help with a Kafka set up and maintenance
> project.  What we need is someone with the knowledge and experience to
> build out the Kafka environment from scratch.
>
> We are thinking they would need to be onsite for 6-12 months  to set it
> up, and mentor some of our team so they can get up to speed to do the
> maintenance once the contractor is gone.  If anyone has the experience
> setting up Kafka from scratch in a Linux environment, maintain node
> clusters, and help train others on the team how to do it, and you are
> interested in a long term project working at the client site, I would love
> to start up  a discussion, to see if we could use you for the role.
>
> I would also be interested in hearing about any consulting firms that
> might have resources that could help with this role.
>
> Matt Stone
>
>
> -Original Message-
> From: Matt Daum [mailto:m...@setfive.com]
> Sent: Friday, March 2, 2018 1:11 PM
> To: users@kafka.apache.org
> Subject: Re: Kafka Setup for Daily counts on wide array of keys
>
> Actually it looks like the better way would be to output the counts to a
> new topic then ingest that topic into the DB itself.  Is that the correct
> way?
>
> On Fri, Mar 2, 2018 at 9:24 AM, Matt Daum  wrote:
>
> > I am new to Kafka but I think I have a good use case for it.  I am
> > trying to build daily counts of requests based on a number of
> > different attributes in a high throughput system (~1 million
> > requests/sec. across all  8 servers).  The different attributes are
> > unbounded in terms of values, and some will spread across 100's of
> > millions values.  This is my current through process, let me know
> > where I could be more efficient or if there is a better way to do it.
> >
> > I'll create an AVRO object "Impression" which has all the attributes
> > of the inbound request.  My application servers then will on each
> > request create and send this to a single kafka topic.
> >
> > I'll then have a consumer which creates a stream from the topic.  From
> > there I'll use the windowed timeframes and groupBy to group by the
> > attributes on each given day.  At the end of the day I'd need to read
> > out the data store to an external system for storage.  Since I won't
> > know all the values I'd need something similar to the KVStore.all()
> > but for WindowedKV Stores.  This appears that it'd be possible in 1.1
> > with this
> > commit: https://github.com/apache/kafka/commit/
> > 1d1c8575961bf6bce7decb049be7f10ca76bd0c5 .
> >
> > Is this the best approach to doing this?  Or would I be better using
> > the stream to listen and then an external DB like Aerospike to store
> > the counts and read out of it directly end of day.
> >
> > Thanks for the help!
> > Daum
> >
>


Re: Hardware Guidance

2018-03-01 Thread Svante Karlsson
It's per broker. Usually you run with 4-6GB of java heap. The rest is used
as disk cache and it's more that 64GB seems like a sweet spot between
memory cost and performance.

/svante
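
For the archive, a hedged example of the usual split on a 64GB broker (the
numbers are the rule of thumb above, not a measured recommendation):

# picked up by kafka-server-start.sh; keep the JVM heap small and
# leave the remaining ~58GB to the OS page cache
export KAFKA_HEAP_OPTS="-Xms6g -Xmx6g"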

2018-03-01 18:30 GMT+01:00 Michal Michalski :

> I'm quite sure it's per broker (it's a standard way to provide
> recommendation on node sizes in systems like Kafka), but you should
> definitely read it in the context of the data size and traffic the cluster
> has to handle. I didn't read the presentation, so not sure if it contains
> such information (if it doesn't, maybe the video does?), but this context
> is necessary to size Kafka properly (that includes cost efficiency). To
> put that in context: I've been running small Kafka cluster on AWS'
> m4.xlarge instances in the past with no issues (low number of terabytes
> stored in total, low single-digit thousands of messages produced per second
> in peak) - I actually think it was oversized for that use case.
>
> On 1 March 2018 at 17:09, adrien ruffie  wrote:
>
> > Hi all,
> >
> >
> > on the slide 5 in the following link:
> >
> > https://fr.slideshare.net/HadoopSummit/apache-kafka-best-practices/1
> >
> >
> >
> > The "Memory" mentions that "24GB+ (for small) and 64GB+ (for large)"
> Kafka
> > Brokers
> >
> > but is it 24 or 64 GB spread over all brokers ? Or 24 GB for example for
> > each broker ?
> >
> >
> > Thank you very much,
> >
> >
> > and best regards,
> >
> >
> > Adrien
> >
>


Re: Regarding : Store stream for infinite time

2018-01-23 Thread Svante Karlsson
Yes, it will store the last value for each key
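
A hedged example of such a topic (name, counts and the connection flag are
placeholders; older releases use --zookeeper): compaction keeps at least the
latest value per key, so with unique keys nothing is ever compacted away.

bin/kafka-topics.sh --create --topic events-forever --partitions 12 \
  --replication-factor 3 \
  --config cleanup.policy=compact \
  --bootstrap-server broker1:9092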

2018-01-23 18:30 GMT+01:00 Aman Rastogi :

> Hi All,
>
> We have a use case to store stream for infinite time (given we have enough
> storage).
>
> We are planning to solve this by Log Compaction. If each message key is
> unique and Log compaction is enabled, it will store whole stream for
> infinite time. Just wanted to check if my assumption is correct and this is
> an appropriate way to solve this.
>
> Thanks in advance.
>
> Regards,
> Aman
>


Re: Kafka Replication Factor

2018-01-17 Thread Svante Karlsson
whats your config for min.insync.replicas?

2018-01-17 13:37 GMT+01:00 Sameer Kumar :

> Hi,
>
> I have a cluster of 3 Kafka brokers, and replication factor is 2. This
> means I can tolerate failure of 1 node without data loss.
>
> Recently, one of my nodes crashed and some of my partitions went offline.
> I am not sure if this should be the case. Am I missing something.
>
> -Sameer.
>


Re: one machine that have four network.....

2018-01-16 Thread Svante Karlsson
Even if you bind your socket to the IP of a specific card, when the packet
is about to leave your host it hits the routing table and gets routed
through the interface with the least cost (arbitrary but static, since all
interfaces have the same cost, being on the same subnet). Thus you will
not reach a transfer rate above a single card. To bypass this you either have
to adjust your host's routing settings or just assign each NIC
an IP in a different subnet (and make sure they can reach each other...)

that said, I agree with Jakob - just run the services on different ports.

regards

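A hedged sketch of the different-ports layout on one host (ids, ports and paths
are examples); no extra NICs or routing tricks are needed:

# server-0.properties
broker.id=0
listeners=PLAINTEXT://:9092
log.dirs=/tmp/kafka-logs-0

# server-1.properties
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs-1

# server-2.properties
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs-2

# and three zookeepers the same way: clientPort=2181/2182/2183, each with its own dataDir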

2018-01-16 9:56 GMT+01:00 Jakub Scholz :

> Maybe a stupid question ... but if you just want to create a setup with 3
> zookeepers and 3 brokers on a single machine you just need to use different
> port numbers. You do not need separate network interfaces. What are you
> trying to achieve with the different network interfaces?
>
> Regards
> Jakub
>
> On Tue, Jan 16, 2018 at 3:14 AM, 猪少爷  wrote:
>
> > hi guys,
> > I have a Linux (CentOS 7) machine that has four network interfaces, and
> > I'm trying to build a pseudo-cluster on this machine.
> > Four cards correspond to four ip(101, 104,105,106),
> > and three brokers config :
> > listeners=xxx.xxx.xxx.104:9090.
> > listeners=xxx.xxx.xxx.105:9091.
> > listeners=xxx.xxx.xxx.106:9092.
> > three zookeepers: zk1---xxx.104:2181, zk2---xxx:105:2182,
> > zk3---xxx.106:2183.
> >
> >
> > I run the ZKs first, and they run all right.
> > I run the Kafka brokers, and they run all right.
> >
> >
> > produce data to this to this pseudo-cluster...Trouble is coming:
> >
> >
> > sar -n DEV 1:
> > network-101-- IO(0-1000Mbps)
> > network-104-- IO(0-10Kbps)
> > network-105-- IO(0-10kbps)
> > network-106-- IO(0-10Kbps)
> > lo---IO(0-1000Mbps)
> >
> >
> > When production data throughput reaches 1000 Mbps, producing fails. Then I
> > unplug the network cable of 101 and continue:
> > unplug the network cable of 101. continue:
> >
> >
> > sar -n DEV 1:
> > network-101-- IO(0bps)
> > network-104-- IO(0-10Kbps)
> > network-105-- IO(0-10kbps)
> > network-106-- IO(0-1000Mbps)
> > lo---IO(0-1000Mbps)
> >
> >
> >
> > what happens, and why??
> > Would you like to give me some advice, guys?
> >
> >
> > Urgent, online and so on
>


Re: Broker won't exit...

2018-01-10 Thread Svante Karlsson
if you really want all the brokers to die, try

change server.properties

controlled.shutdown.enable=false

I had a similar problem on a dev laptop with a single broker. It refused to
die on system shutdowns (or took a very long time).

2018-01-10 12:57 GMT+01:00 Ted Yu :

> Skip: Can you pastebin the stack trace of the stuck broker?
> Thanks
>
> -------- Original message --------
> From: Skip Montanaro 
> Date: 1/10/18 3:52 AM (GMT-08:00)
> To: users@kafka.apache.org
> Subject: Re: Broker won't exit...
> Did you stop the broker before stopping zookeeper?
>
>
> Yes. My stop script executes the server stop scripts in reverse order from
> my start script. Should I have stuck in a couple second sleep between
> stopping the brokers and stopping zookeeper?
>
> I was actually running two brokers. The one my stop script stopped first
> exited properly.
>
> Skip
>


Re: Multiple brokers - do they share the load?

2017-11-28 Thread Svante Karlsson
You are connecting to a single seed node - your kafka library will then
under the hood connect to the partition leaders for each partition you
subscribe or post to.

The load is no different compared to if you gave all nodes as the connect
parameter. However, if your seed node crashes then your client cannot
connect to the cluster.
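
A hedged example (hosts are placeholders): list more than one broker in the
client config so that losing the first one does not stop you from
bootstrapping; after bootstrap the client talks to the partition leaders
directly either way.

bootstrap.servers=server1:9092,server2:9092,server3:9092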

2017-11-28 15:06 GMT+01:00 Skip Montanaro :

> Apologies for the rather long set-up...
>
> I've been using Kafka as a client for a few months now. The setup I've
> been using has three brokers on separate servers, all listening to
> port 9092. My consumers always connected to server1:9092. I've ignored
> server2 and server3.
>
> Now I'm starting to mess around a bit with setting up my own
> itsy-bitsy cluster. Step one is a single instance at host1:9092. Next
> step in the instructions (I'm following the recipe laid out in the
> documentation) will be to add two more brokers at host1:9093 and
> host1:9094.
>
> My question: If every consumer connects to host1:9092 will the brokers
> listening to the other ports starve for attention, or does the
> connection process somehow redirect clients to the other brokers so
> the three (or more) brokers get fairly equitable loads?
>
> Thanks,
>
> Skip Montanaro
>


Re: Building news feed of social app using kafka

2017-11-01 Thread Svante Karlsson
Nope, that's the wrong design. It does not scale. You would end up with a
wide and shallow thing: too few messages per partition to make sense. You
want many thousands per partition per second to amortize the consumer to
broker round-trip.


On Nov 1, 2017 21:12, "Anshuman Ghosh" 
wrote:

> Hello!
>
> I am currently designing a social app (with the whole gamut of users
> following each other and personal feeds - consisting of posts by those you
> follow). To implement this "news feed" for each user, I was considering
> having a Kafka stream/topic per user.
>
> Given that our intention is to get 1M+ users on the app, is this a good
> idea to have 1 topic per user, thus ending up with a million topics?
>
> Thanks and regards,
> Anshuman
>


Re: Kafka Streams Avro SerDe version/id caching

2017-10-03 Thread Svante Karlsson
I've implemented the same logic for a C++ client - caching is the only way
to go, since the performance impact of not doing it would be too big. So bet
on caching on all clients.

2017-10-03 18:12 GMT+02:00 Damian Guy :

> If you are using the Confluent schema registry then they will be cached by
> the SchemaRegistryClient.
>
> Thanks,
> Damian
>
> On Tue, 3 Oct 2017 at 09:00 Ted Yu  wrote:
>
> > I did a quick search in the code base - there doesn't seem to be caching
> as
> > you described.
> >
> > On Tue, Oct 3, 2017 at 6:36 AM, Kristopher Kane 
> > wrote:
> >
> > > If using a Byte SerDe and schema registry in the consumer configs of a
> > > Kafka streams application, does it cache the Avro schemas by ID and
> > version
> > > after fetching from the registry once?
> > >
> > > Thanks,
> > >
> > > Kris
> > >
> >
>


Re: Is there a way in increase number of partitions

2017-08-21 Thread Svante Karlsson
Short answer - you cannot. The existing data is not reprocessed, since Kafka
itself has no knowledge of how you did your partitioning.

The normal workaround is that you stop producers and consumers. Create a
new topic with the desired number of partitions. Consume the old topic from
the beginning and write all data to the new topic. Restart producers and consumers
from your new topic. You will most likely mess up your consumer offsets.
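
A hedged outline of that workaround in commands (topic names and counts are
placeholders; the copy step is whatever consume-and-reproduce tool you prefer,
as long as it preserves the keys; older releases use --zookeeper):

# 1. stop producers and consumers

# 2. create the new topic with the target partition count
bin/kafka-topics.sh --create --topic mytopic-v2 --partitions 6 \
  --replication-factor 3 --bootstrap-server broker1:9092

# 3. copy mytopic -> mytopic-v2 from the beginning, keeping the keys,
#    so the default partitioner lays them out over all 6 partitions

# 4. point producers and consumers at mytopic-v2 (expect to reset consumer offsets)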





2017-08-21 8:32 GMT+02:00 Sachin Mittal :

> Hi,
> I have a topic which has four partitions and data is distributed among
> those based on a specified key.
>
> If I want to increase the number of partitions to six how can I do the same
> and also making sure that messages for a given key always go to one
> (specific) partition only.
>
> Will the existing messages redistribute themselves among the new partitions?
>
> Also, say an earlier message with key A went to partition 1: going forward, will
> any new message go to the same partition where earlier messages for that key are?
>
> And by increasing partitions some keys may use a different partition now,
> so how do I ensure that all messages for a given key belong to a single
> partition?
>
> Thanks
> Sachin
>


Re: Kafka rack-id and min in-sync replicas

2017-08-20 Thread Svante Karlsson
I think you are right. The rack awareness is used to spread the partitions
on creation, assignment, etc., so get as many racks as your replication
count.

/svante

2017-08-20 13:33 GMT+02:00 Carl Samuelson :

> Hi
>
> I asked this question on SO here:
> https://stackoverflow.com/questions/45778455/kafka-rack-id-and-min-in-sync-replicas.
> Basically, I am trying to understand how rack-id helps in DR situations.
>
> Kafka has introduced rack-id to provide redundancy capabilities if a whole
> rack fails. There is a min in-sync replica setting to specify the minimum
> number of replicas that need to be in-sync before a producer receives an
> ack (-1 / all config). There is an unclean leader election setting to
> specify whether a leader can be elected when it is not in-sync.
>
> So, given the following scenario:
>
>- Two racks. Rack 1, 2.
>- Replication count is 4.
>- Min in-sync replicas = 2
>- Producer ack=-1 (all).
>- Unclean leader election = false
>
> Is it possible that there is a moment where all 4 replicas are available,
> but the two in-sync replicas both come from rack 1, so the producer
> receives an ack and at that point rack 1 crashes (before any replicas from
> rack 2 are in-sync)? This means that rack 2 will only contain unclean
> replicas and no producers would be able to add messages to the partition
> essentially grinding to a halt. The replicas would be unclean in any case,
> so no new leader could be elected in any case.
>
> Is my analysis correct, or is there something under the hood to ensure that
> the replicas forming min in-sync replicas have to be from different racks?
> Since replicas on the same rack would have lower latency it seems that the
> above scenario is reasonably likely.
>
> 
>
> Thanks,
>
> Carl
>


Re: Different Schemas on same Kafka Topic

2017-08-17 Thread Svante Karlsson
Well, the purpose of the schema registry is to map a 32-bit id to an Avro
schema, with or without rules on how you may update a schema with a given
name. To decode Avro you need a schema. Either you "know" what's in a given
topic and then you can hardcode it, or you prepend it with something, i.e.
the 32-bit id. But you could hardcode your possible schemas in the
consumers and prepend it with something else (fingerprint, uuid ...).

That said, a schema registry is by far the easiest way forward. You really
should stick to either text encoding or Avro using the schema registry. I've
been down the road with lots of binary schemas and it works fine for a
while. When your schema changes is when you will feel the pain.

Even if using NONE, you still get benefits from the schema registry.

regards
svante
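
For reference, a hedged sketch of what "prepend it with something" looks like
in the Confluent wire format (one magic byte, then the registry id as a 4-byte
big-endian int, then the Avro payload); the class name is made up.

import java.nio.ByteBuffer;

public class WireFormatPeek {
    // returns the registry id the serializer embedded in the message value
    static int schemaIdOf(byte[] confluentEncodedValue) {
        ByteBuffer buf = ByteBuffer.wrap(confluentEncodedValue);
        byte magic = buf.get();              // expected to be 0
        if (magic != 0) throw new IllegalArgumentException("not Confluent-framed Avro");
        return buf.getInt();                 // 4-byte schema id; Avro bytes follow
    }
}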

2017-08-17 20:10 GMT+02:00 Sreejith S :

> Hi Stephen,
>
> Thank you very much.
>
> Please give clarity on the statement.
>
> "each unique avro schema has a unique id associated with it. That id
> can be used across multiple different topics. The enforcement of which
> schemas are allowed in a particular topic comes down to the combination of
> the subject (usually topic-name-key/value) and version (the version itself
> starts at 1 inside the subject, and itself has an id that ties to the
> globally unique schema id). ".
>
> How? You are always registering a schema against a topic using the
> topic name, and the schema registry is assigning a unique id across the
> registry cluster. Where is the globally unique schema id here?
>
> I think in Producer Consumer API you will have more freedom to pass a
> schema id of ur choice and ask avro serialize/deserialize. But in connect
> framework all these things are abstracted.
>
> It's a good pointer on using the NONE compatibility type, so that even if the
> schema registry holds the same id for a topic, each schema version under it is
> an entirely different schema. Is my understanding correct?
>
> But, when one defines NONE, the purpose of the schema registry itself is
> lost. Right?
>
> Regards
> Sreejith
>
> On 17-Aug-2017 11:03 pm, "Stephen Durfey"  wrote:
>
> > There is a little nuance to this topic (hehe). When it comes down to it,
> > yes, each unique avro schema has a unique id associated with it. That id
> > can be used across multiple different topics. The enforcement of which
> > schemas are allowed in a particular topic comes down to the combination
> of
> > the subject (usually topic-name-key/value) and version (the version
> itself
> > starts at 1 inside the subject, and itself has an id that ties to the
> > globally unique schema id). . So, yes, you can have multiple schemas
> > within
> > the same topic, and thats perfectly fine, so long as you're correctly
> > configuring the schema registry.
> >
> > Whether or not a schema is allowed to be registered for a particular
> > subject is dependent upon the type of Avro compatibility enforced. There
> are
> > 4 types: BACKWARD, FORWARD, FULL (combines forward and backward), and
> NONE.
> > The schema registry is going to evaluate the schema being published to
> the
> > history of schemas it knows about in the past for that subject + version
> > combination. If the schema is evolved correctly according to the
> particular
> > type configured in the schema registry, it will be allowed.
> >
> > So, if you select NONE as the compatibility type the schema registry will
> > allow any schema to be registered, even if they are not compatible
> because
> > you've informed the registry not to care. So, you should really choose
> > amongst backward, forward, and full. I use FULL in production because the
> > data being written is long lived, and will have multiple readers and
> > writers of the data, and the data needs to be passively evolved. Backward
> > and forward can be fine too, just depending upon the needs of the data
> > being produced and consumed.
> >
> > On Thu, Aug 17, 2017 at 12:22 PM, Tauzell, Dave <
> > dave.tauz...@surescripts.com> wrote:
> >
> > > Hmm, I think you are right that you cannot have multiple schemas on the
> > > same topic.
> > >
> > > -Dave
> > >
> > >
> > > -Original Message-
> > > From: Sreejith S [mailto:srssreej...@gmail.com]
> > > Sent: Thursday, August 17, 2017 11:42 AM
> > > To: users@kafka.apache.org
> > > Subject: RE: Different Schemas on same Kafka Topic
> > >
> > > Hi Dave,
> > >
> > > Would like to get clarity on one thing. If I register more than one
> > > schema for a topic, i am providing topic-key, topic-value to the schema
> > > registry.
> > >
> > > The id is created by the schema registry and it will create different
> > > versions of different schemas. Still, all schemas have the same id. Am I
> > > right?
> > >
> > > If so, all Avro messages hold the same id. Then how are multiple schemas
> > > on the same topic possible?
> > >
> > > Please clarify
> > >
> > > Thanks,
> > > Sreejith
> > >
> > > On 17-Aug-2017 9:49 pm, "Tauzell, Dave" 
> > > wrote:
> > >
> > > > > How does consumer  know A is the avro class when there 

Re: Limit of simultaneous consumers/clients?

2017-07-31 Thread Svante Karlsson
It feels like the wrong use case for Kafka. It's not meant as something you
connect your end users to. Maybe MQTT would be a better fit as the serving
layer to end users, or just poll as you said.

2017-07-31 17:10 GMT+02:00 Thakrar, Jayesh :

> You may want to look at the Kafka REST API instead of having so many
> direct client connections.
>
> https://github.com/confluentinc/kafka-rest
>
>
>
> On 7/31/17, 1:29 AM, "Dr. Sven Abels"  wrote:
>
> Hi guys,
>
> does anyone have an idea about the possible limits of concurrent users?
>
> -Original Message-
> From: Dr. Sven Abels [mailto:ab...@ascora.de]
> Sent: Friday, 28 July 2017 12:11
> To: users@kafka.apache.org
> Subject: Limit of simultaneous consumers/clients?
>
> Hello,
>
>
>
> we would like to use Kafka as a way to inform users about events of
> certain
> topics. For this purpose, we want to develop Windows and Mac clients
> which
> users would install on their desktop PCs.
>
>
>
> We have a large number of users, so it's likely that there will be
> >10.000
> clients running in parallel.
>
>
>
> If I understand it correctly, then Kafka uses Sockets and the user
> clients
> would maintain an active connection to Kafka. If this is correct, I
> wondered:
>
>
>
> -What is the limit of clients that may run in parallel? Do 10.000
> clients
> mean 10.000 server connections? Would that be a problem for a typical
> server?
>
>
>
> -Can we solve this problem by simply running kafka on several servers
> and
> using something like a round-robin for the DNS so that the clients
> connect
> to different servers?
>
>
>
> -We expect to only send a few messages each day. Messages should arrive
> quickly (<30 seconds delay) but we don't need realtime. Considering
> this: Is
> kafka still a good solution or should we better switch to e.g. polling
> of
> clients to the server without Kafka?
>
>
>
>
>
>
>
> Best regards,
>
>
>
> Sven
>
>
>
>
>
>


Re: Using JMXMP to access Kafka metrics

2017-07-19 Thread Svante Karlsson
I've used jolokia which gets JMX metrics without RMI (actually json over
http)
https://jolokia.org/

Integrates nicely with telegraf (and influxdb)

2017-07-19 20:47 GMT+02:00 Vijay Prakash <
vijay.prak...@microsoft.com.invalid>:

> Hey,
>
> Is there a way to use JMXMP instead of RMI to access Kafka metrics through
> JMX? I tried creating a JMXMP JMXConnector but the connect attempt just
> hangs forever.
>
> Thanks,
> Vijay
>


Re: Issue in Kafka running for few days

2017-04-30 Thread Svante Karlsson
@michal

My interpretation is that he's running 2 instances of zookeeper - not 6. (1
on the "4 broker machine" and one on the other)

I'm not sure where that leaves you in zookeeper land - ie if you happen to
have a timeout between the two zookeepers will you be out of service or
will you have a split brain problem? None of the alternatives are good.
That said - it should be visible in the logs.

Anyway two zk is not a good config - stick to one or go to three.





2017-04-30 15:41 GMT+02:00 Michal Borowiecki 
:

> Hi Jan,
>
> Correct. As I said before it's not common or recommended practice to run
> an even number, and I wouldn't recommend it myself. I hope it didn't sound
> as if I did.
>
> However, I don't see how this would cause the issue at hand unless at
> least 3 out of the 6 zookeepers died, but that could also have happened in
> a 5 node setup.
>
> In either case, changing the number of zookeepers is not a prerequisite to
> progress debugging this issue further.
>
> Cheers,
>
> Michal
>
> On 30/04/17 13:35, jan wrote:
>
> I looked this up yesterday  when I read the grandparent, as my old
> company ran two and I needed to know.
> Your link is a bit ambiguous but it has a link to the zookeeper
> Getting Started guide which says this:
>
> "
> For replicated mode, a minimum of three servers are required, and it
> is strongly recommended that you have an odd number of servers. If you
> only have two servers, then you are in a situation where if one of
> them fails, there are not enough machines to form a majority quorum.
> Two servers is inherently less stable than a single server, because
> there are two single points of failure.
> "
> <https://zookeeper.apache.org/doc/r3.4.10/zookeeperStarted.html>
>
> cheers
>
> jan
>
>
> On 30/04/2017, Michal Borowiecki  
>  wrote:
>
> Svante, I don't share your opinion.
> Having an even number of zookeepers is not a problem in itself, it
> simply means you don't get any better resilience than if you had one
> fewer instance.
> Yes, it's not common or recommended practice, but you are allowed to
> have an even number of zookeepers and it's most likely not related to
> the problem at hand and does NOT need to be addressed 
> first.
> https://zookeeper.apache.org/doc/r3.4.10/zookeeperAdmin.html#sc_zkMulitServerSetup
>
> Abhit, I'm afraid the log snippet is not enough for me to help.
> Maybe someone else in the community with more experience can recognize
> the symptoms but in the meantime, if you haven't already done so, you
> may want to search for similar issues:
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20text%20~%20%22ZK%20expired%3B%20shut%20down%20all%20controller%22
>
> searching for text like "ZK expired; shut down all controller" or "No
> broker in ISR is alive for" or other interesting events form the log.
>
> Hope that helps,
> Michal
>
>
> On 26/04/17 21:40, Svante Karlsson wrote:
>
> You are not supposed to run an even number of zookeepers. Fix that first
>
> On Apr 26, 2017 20:59, "Abhit Kalsotra"  
>  wrote:
>
>
> Any pointers please
>
>
> Abhi
>
> On Wed, Apr 26, 2017 at 11:03 PM, Abhit Kalsotra  
> 
> wrote:
>
>
> Hi *
>
> My kafka setup
>
>
> * OS: Windows machine
> * 6 broker nodes, 4 on one machine and 2 on the other machine
> * ZK instance on the 4-broker-node machine and another ZK on the 2-broker-node machine
> * 2 topics with partition size = 50 and replication factor = 3
>
> I am producing on an average of around 500 messages / sec with each
> message size close to 98 bytes...
>
> More or less the message rate stays constant throughout, but after
>
> running
>
> the setup for close to 2 weeks , my Kafka cluster broke and this
> happened
> twice in a month.  Not able to understand what's the issue, Kafka gurus
> please do share your inputs...
>
> the controller.log file at the time Kafka broke looks like
>
>
>
>
> *[2017-04-26 12:03:34,998] INFO [Controller 0]: Broker failure callback
> for 0,1,3,5,6 (kafka.controller.KafkaController)[2017-04-26
>
> 12:03:34,998]
>
> INFO [Controller 0]: Removed ArrayBuffer() from list of shutting down
> brokers. (kafka.controller.KafkaController)[2017-04-26 12:03:34,998]
>
> INFO
>
> [Partition state machine on Controller 0]: Invoking state change to
> OfflinePartition for partitions
> [__consumer_offsets,19],[mytopic,11],[__consumer_
>
> offsets,30],[mytopicOLD,18],[mytopic,13],[__consumer_
> of

Re: Issue in Kafka running for few days

2017-04-26 Thread Svante Karlsson
You are not supposed to run an even number of zookeepers. Fix that first

On Apr 26, 2017 20:59, "Abhit Kalsotra"  wrote:

> Any pointers please
>
>
> Abhi
>
> On Wed, Apr 26, 2017 at 11:03 PM, Abhit Kalsotra 
> wrote:
>
> > Hi *
> >
> > My kafka setup
> >
> >
> > * OS: Windows machine
> > * 6 broker nodes, 4 on one machine and 2 on the other machine
> > * ZK instance on the 4-broker-node machine and another ZK on the 2-broker-node machine
> > * 2 topics with partition size = 50 and replication factor = 3
> >
> > I am producing on an average of around 500 messages / sec with each
> > message size close to 98 bytes...
> >
> > More or less the message rate stays constant throughout, but after
> running
> > the setup for close to 2 weeks , my Kafka cluster broke and this happened
> > twice in a month.  Not able to understand what's the issue, Kafka gurus
> > please do share your inputs...
> >
> > the controller.log file at the time Kafka broke looks like
> >
> >
> >
> >
> >
> > *[2017-04-26 12:03:34,998] INFO [Controller 0]: Broker failure callback
> > for 0,1,3,5,6 (kafka.controller.KafkaController)[2017-04-26
> 12:03:34,998]
> > INFO [Controller 0]: Removed ArrayBuffer() from list of shutting down
> > brokers. (kafka.controller.KafkaController)[2017-04-26 12:03:34,998]
> INFO
> > [Partition state machine on Controller 0]: Invoking state change to
> > OfflinePartition for partitions
> > [__consumer_offsets,19],[mytopic,11],[__consumer_
> offsets,30],[mytopicOLD,18],[mytopic,13],[__consumer_
> offsets,47],[mytopicOLD,26],[__consumer_offsets,29],[
> mytopicOLD,0],[__consumer_offsets,41],[mytopic,44],[
> mytopicOLD,38],[mytopicOLD,2],[__consumer_offsets,17],[__
> consumer_offsets,10],[mytopic,20],[mytopic,23],[mytopic,30],
> [__consumer_offsets,14],[__consumer_offsets,40],[mytopic,
> 31],[mytopicOLD,43],[mytopicOLD,19],[mytopicOLD,35]
> ,[__consumer_offsets,18],[mytopic,43],[__consumer_offsets,26],[__consumer_
> offsets,0],[mytopic,32],[__consumer_offsets,24],[
> mytopicOLD,3],[mytopic,2],[mytopic,3],[mytopicOLD,45],[
> mytopic,35],[__consumer_offsets,20],[mytopic,1],[
> mytopicOLD,33],[__consumer_offsets,5],[mytopicOLD,47],[__
> consumer_offsets,22],[mytopicOLD,8],[mytopic,33],[
> mytopic,36],[mytopicOLD,11],[mytopic,47],[mytopicOLD,20],[
> mytopic,48],[__consumer_offsets,12],[mytopicOLD,32],[_
> _consumer_offsets,8],[mytopicOLD,39],[mytopicOLD,27]
> ,[mytopicOLD,49],[mytopicOLD,42],[mytopic,21],[mytopicOLD,
> 31],[mytopic,29],[__consumer_offsets,23],[mytopicOLD,21],[_
> _consumer_offsets,48],[__consumer_offsets,11],[mytopic,
> 18],[__consumer_offsets,13],[mytopic,45],[mytopic,5],[
> mytopicOLD,25],[mytopic,6],[mytopicOLD,23],[mytopicOLD,37]
> ,[__consumer_offsets,6],[__consumer_offsets,49],[
> mytopicOLD,13],[__consumer_offsets,28],[__consumer_offsets,4],[__consumer_
> offsets,37],[mytopic,12],[mytopicOLD,30],[__consumer_
> offsets,31],[__consumer_offsets,44],[mytopicOLD,15],[
> mytopicOLD,29],[mytopic,37],[mytopic,38],[__consumer_
> offsets,42],[mytopic,27],[mytopic,26],[mytopic,15],[__
> consumer_offsets,34],[mytopic,42],[__consumer_offsets,46],[
> mytopic,14],[mytopicOLD,12],[mytopicOLD,1],[mytopic,7],[__
> consumer_offsets,25],[mytopicOLD,24],[mytopicOLD,44]
> ,[mytopicOLD,14],[__consumer_offsets,32],[mytopic,0],[__
> consumer_offsets,43],[mytopic,39],[mytopicOLD,5],[mytopic,9]
> ,[mytopic,24],[__consumer_offsets,36],[mytopic,25],[
> mytopicOLD,36],[mytopic,19],[__consumer_offsets,35],[__
> consumer_offsets,7],[mytopic,8],[__consumer_offsets,38],[
> mytopicOLD,48],[mytopicOLD,9],[__consumer_offsets,1],[
> mytopicOLD,6],[mytopic,41],[mytopicOLD,41],[mytopicOLD,7],
> [mytopic,17],[mytopicOLD,17],[mytopic,49],[__consumer_
> offsets,16],[__consumer_offsets,2]
> > (kafka.controller.PartitionStateMachine)[2017-04-26 12:03:35,045] INFO
> > [SessionExpirationListener on 1], ZK expired; shut down all controller
> > components and try to re-elect
> > (kafka.controller.KafkaController$SessionExpirationListener)[2017-04-26
> > 12:03:35,045] DEBUG [Controller 1]: Controller resigning, broker id 1
> > (kafka.controller.KafkaController)[2017-04-26 12:03:35,045] DEBUG
> > [Controller 1]: De-registering IsrChangeNotificationListener
> > (kafka.controller.KafkaController)[2017-04-26 12:03:35,060] INFO
> [Partition
> > state machine on Controller 1]: Stopped partition state machine
> > (kafka.controller.PartitionStateMachine)[2017-04-26 12:03:35,060] INFO
> > [Replica state machine on controller 1]: Stopped replica state machine
> > (kafka.controller.ReplicaStateMachine)[2017-04-26 12:03:35,060] INFO
> > [Controller 1]: Broker 1 resigned as the controller
> > (kafka.controller.KafkaController)[2017-04-26 12:03:36,013] DEBUG
> > [OfflinePartitionLeaderSelector]: No broker in ISR is alive for
> > [__consumer_offsets,19]. Pick the leader from the alive assigned
> replicas:
> > (kafka.controller.OfflinePartitionLeaderSelector)[2017-04-26
> 12:03:36,029]
> > DEBUG [OfflineParti

Re: Initializing StateStores takes *really* long for large datasets

2016-11-25 Thread Svante Karlsson
What kind of disk are you using for the RocksDB store, i.e. spinning or SSD?

2016-11-25 12:51 GMT+01:00 Damian Guy :

> Hi Frank,
>
> Is this on a restart of the application?
>
> Thanks,
> Damian
>
> On Fri, 25 Nov 2016 at 11:09 Frank Lyaruu  wrote:
>
> > Hi y'all,
> >
> > I have a reasonably simple KafkaStream application, which merges about 20
> > topics a few times.
> > The thing is, some of those topic datasets are pretty big, about 10M
> > messages. In total I've got
> > about 200 GB worth of state in RocksDB; the largest topic is 38 GB.
> >
> > I had set the MAX_POLL_INTERVAL_MS_CONFIG to one hour to cover the
> > initialization time,
> > but that does not seem nearly enough, I'm looking at more than two hour
> > startup times, and
> > that starts to be a bit ridiculous.
> >
> > Any tips / experiences on how to deal with this case? Move away from
> Rocks
> > and use an external
> > data store? Any tuning tips on how to tune Rocks to be a bit more useful
> > here?
> >
> > regards, Frank
> >
>


Re: kafka connect(copycat) question

2015-12-03 Thread Svante Karlsson
Hi, I tried building this today and the problem seems to remain.

/svante



[INFO] Building kafka-connect-hdfs 2.0.0-SNAPSHOT
[INFO]

Downloading:
http://packages.confluent.io/maven/io/confluent/kafka-connect-avro-converter/2.0.0-SNAPSHOT/maven-metadata.xml
Downloading:
http://packages.confluent.io/maven/io/confluent/kafka-connect-avro-converter/2.0.0-SNAPSHOT/kafka-connect-avro-converter-2.0.0-SNAPSHOT.pom
[WARNING] The POM for
io.confluent:kafka-connect-avro-converter:jar:2.0.0-SNAPSHOT is missing, no
dependency information available
Downloading:
http://packages.confluent.io/maven/io/confluent/common-config/2.0.0-SNAPSHOT/maven-metadata.xml
Downloading:
http://packages.confluent.io/maven/io/confluent/common-config/2.0.0-SNAPSHOT/common-config-2.0.0-SNAPSHOT.pom
[WARNING] The POM for io.confluent:common-config:jar:2.0.0-SNAPSHOT is
missing, no dependency information available
Downloading:
http://packages.confluent.io/maven/io/confluent/kafka-connect-avro-converter/2.0.0-SNAPSHOT/kafka-connect-avro-converter-2.0.0-SNAPSHOT.jar
Downloading:
http://packages.confluent.io/maven/io/confluent/common-config/2.0.0-SNAPSHOT/common-config-2.0.0-SNAPSHOT.jar
[INFO]

[INFO] BUILD FAILURE


> --
> Thanks,
> Ewen
>


Re: Locality question

2015-11-12 Thread Svante Karlsson
If you have a Kafka partition that is replicated to 3 nodes, the partition
leader varies over time, which makes the co-location pointless. You can only
produce to and consume from the leader.
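
For what it's worth, the current leader of each partition can be looked up at
runtime with a metadata request - that is what any co-location scheme would
have to track. A minimal sketch against the 0.8 Java SimpleConsumer API;
broker host, port and topic name are placeholders:

import java.util.Collections;

import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class LeaderLookup {
    public static void main(String[] args) {
        // Any live broker can answer a metadata request; "broker1" and "mytopic" are placeholders.
        SimpleConsumer consumer = new SimpleConsumer("broker1", 9092, 100000, 64 * 1024, "leaderLookup");
        try {
            TopicMetadataResponse resp =
                    consumer.send(new TopicMetadataRequest(Collections.singletonList("mytopic")));
            for (TopicMetadata tm : resp.topicsMetadata()) {
                for (PartitionMetadata pm : tm.partitionsMetadata()) {
                    // The leader can move at any time (broker restart, reassignment, ...),
                    // so anything co-located with "the partition" has to follow this value.
                    System.out.println(tm.topic() + "-" + pm.partitionId() + " leader: "
                            + (pm.leader() == null ? "none" : pm.leader().host()));
                }
            }
        } finally {
            consumer.close();
        }
    }
}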

/svante



2015-11-12 9:00 GMT+01:00 Young, Ben :

> Hi,
>
> Any thoughts on this? Perhaps Kafka is not the best way to go for this,
> but the docs do mention transaction/replication logs as a use case, and I'd
> have thought locality would have been important for that?
>
> Thanks,
> Ben
>
> -Original Message-
> From: Young, Ben [mailto:ben.yo...@sungard.com]
> Sent: 06 November 2015 08:20
> To: users@kafka.apache.org
> Subject: Locality question
>
> Hi,
>
> I've had a look over the website and searched the archives, but I can't
> find any obvious answers to this, so apologies if it's been asked before.
>
> I'm investigating potentially using Kafka for the transaction log for our
> in-memory database technology. The idea is the Kafka partitioning and
> replication will "automatically" give us sharding and hot-standby
> capabilities in the db (obviously with a fair amount of work).
>
> The database can ingest hundreds of gigabytes of data extremely quickly,
> easily enough to saturate any reasonable network connection, so I've
> thought about co-locating the db on the same nodes of the kafka cluster
> that actually store the data, to cut out the network entirely from the
> loading process. We'd also probably want the db topology to be defined
> first, and the kafka partitioning to follow. I can see how to use the
> partitioner class to assign a specific partition to a key, but I can't
> currently see how to assign partitions to known machines upfront. Is this
> possible?
>
> Does the plan sound reasonable in general? I've also considered a log
> shipping approach like Flume, but Kafka seems simplest all round, and a
> really like the idea of just being able to set the log offset to zero to
> reload on startup.
>
> Thanks,
> Ben Young
>
>
> Ben Young . Principal Software Engineer . Adaptiv .
>
>
>


Re: How to correctly handle offsets?

2015-06-01 Thread svante karlsson
1) correlationId is just a number that you get back in your reply. You
can safely set it to anything. If you have some kind of call identification
in your system that you want to trace through logs, this is what you would
use.

2) You can safely use any external offset management you like. Just store
the latest offset you got back from Kafka. You should handle the error case
where your stored last offset is no longer in Kafka's log, i.e. decide
whether you should restart from the oldest or the latest offset.
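
As an illustration of 2), here is a minimal sketch of a fetch loop with
externally stored offsets, using the 0.8 SimpleConsumer API. The OffsetStore
interface, client id and fetch size are placeholders - the store could be as
simple as a Redis GET/SET of one long per topic/partition:

import java.nio.ByteBuffer;

import kafka.api.FetchRequestBuilder;
import kafka.common.ErrorMapping;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class ExternalOffsetConsumer {

    /** Placeholder for whatever holds the offset externally (Redis, a DB, a file, ...). */
    interface OffsetStore {
        long load(String topic, int partition);               // last committed offset, -1 if none
        void save(String topic, int partition, long offset);  // e.g. a Redis SET of one long
    }

    static void consume(SimpleConsumer consumer, String topic, int partition, OffsetStore store) {
        long offset = store.load(topic, partition);
        while (true) {
            FetchResponse resp = consumer.fetch(new FetchRequestBuilder()
                    .clientId("offset-demo")
                    .addFetch(topic, partition, offset, 100000)
                    .build());
            if (resp.hasError()) {
                short code = resp.errorCode(topic, partition);
                // The stored offset is no longer in the log (retention kicked in):
                // decide whether to restart from the oldest or the latest offset
                // (getOffsetsBefore) and persist that decision before retrying.
                if (code == ErrorMapping.OffsetOutOfRangeCode()) { /* reset offset here */ }
                break;
            }
            for (MessageAndOffset mo : resp.messageSet(topic, partition)) {
                ByteBuffer payload = mo.message().payload();   // hand this to the application
                offset = mo.nextOffset();                      // the offset to fetch next
            }
            store.save(topic, partition, offset);              // commit externally
        }
    }
}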

/svante

2015-06-01 10:08 GMT+02:00 luo.fucong :

> Hi all:
>
> I am using Kafka 0.8.2 and SimpleConsumer in maven:
>
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka_2.11</artifactId>
>   <version>0.8.2.1</version>
> </dependency>
>
> I follow the SimpleConsumer example in the wiki, and there are some
> questions:
>
> 1. There seems to be a lack of documentation on how to commit the offset to Kafka. As far as I
> google it, it seems that I should use the OffsetCommitRequest. However,
> like a thread that posted in this email group several months ago, I am also
> very confused with the parameter “correlationId” in the constructor of
> OffsetCommitRequest.
>
> 2. Maybe there are another way of doing the offset management. I am
> thinking store the offset to Redis. If the offset is just a long number
> that indicating the “offset” in the partition, and has nothing else tricky
> (like truncated at sometime or someplace), I suppose it’s doable, is it?
>
> 3. I really want to know what’s the formally and officially recommended
> way to handle the offset. Without using the High Level Consumers.
>
> Thank you very much!


Re: Kafka still aware of old zookeeper nodes

2015-04-30 Thread svante karlsson
Have you changed

zookeeper.connect=

in server.properties?

A better procedure for replacing zookeeper nodes would be to shut down one
and install the new one with the same IP. This can easily be done on a
running cluster.

/svante

2015-04-30 20:08 GMT+02:00 Dillian Murphey :

> I had 3 zookeeper nodes. I added 3 new ones and shut down the old 3.
>
> The server.log shows Closing socket connection error to the old IPs. I
> rebooted the kafka server entirely but it still somehow seems aware of
> these servers.
>
> Any ideas what's up?
>


hive output to kafka

2015-04-28 Thread Svante Karlsson
What's the best way of exporting contents (avro encoded) from hive queries
to kafka?

Kind of like Camus, but the other way around.

best regards
svante


Re: Kafka Consumer

2015-03-31 Thread svante karlsson
Your consumer "might" belong to a consumer group. Just commit offsets to
that consumer groups/topic/partition and it will work.

That said - if you want to figure out the consumers groups that exists you
have to look in zookeeper. There is no kafka API to get or create them. In
the java client it is done internally by a zookeeper connector

the following paths seems to exists for all consumer groups
/consumers/"consumer_group"/offsets
/consumers/"consumer_group"/owners
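
For example, the existing groups can be listed with the same ZkClient library
that the 0.8 high-level consumer uses internally. A minimal sketch; the
connection string and timeouts are placeholders:

import java.util.List;

import org.I0Itec.zkclient.ZkClient;

public class ListConsumerGroups {
    public static void main(String[] args) {
        // Point this at your ZooKeeper ensemble; the timeouts are arbitrary.
        ZkClient zk = new ZkClient("localhost:2181", 10000, 10000);
        try {
            List<String> groups = zk.getChildren("/consumers");
            for (String group : groups) {
                System.out.println(group + "  (offsets under /consumers/" + group + "/offsets)");
            }
        } finally {
            zk.close();
        }
    }
}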

/svante




2015-03-31 15:10 GMT+02:00 James King :

> I created a topic using:
>
> bin/kafka-topics.sh --create --zookeeper localhost:2181
> --replication-factor 1 --partitions 1 --topic test
>
> How do I find out what group it belongs to?
>
> Thank you.
>


Re: Producer Behavior When one or more Brokers' Disk is Full.

2015-03-26 Thread svante karlsson
>4. As for recovering broker from disk full, if replication is enabled one
>can just bring it down (the leader of the partition will then migrate to
>other brokers), clear the disk space, and bring it up again; if replication
>is not enabled then you can first move the partitions away from this broker
>using the partition-reassignment tool and then do the same.


I believe that this is handled in a rather abrupt way at the server. It
will crash, and if you have replication the partition leader will move.

However, you must manually solve the disk space issue before restarting the
failed broker since replication will immediately crash it again.

(The same thing also applies to a broken disk)

I think that partition-reassignment requires a healthy broker but I might
be wrong on this.

/svante


Re: Broker shutdown, Can't restart

2015-03-21 Thread svante karlsson
>Is there a specific reason for the collocation of all partitions of a
topic?

Not all partitions - each partition of a topic is kept in a separate dir
(hopefully not all on the same server).

>This means,  the capacity of required volume is to be determined by the
retention size of the topic with largest data load.

Yes, but see above - it is determined by the number of partitions and the
retention size of the topic with the largest data load (so you can always
use more partitions to spread the load).

>I will try moving the data from full to less used volumes, though it seems
an unclean workaround, not suitable in Prod.

Agreed - but you should really monitor the disks for space - you should
never run out of space like this in production.

best
svante



2015-03-21 20:58 GMT+01:00 Zakee :

> > The shutdown is expected. All data in a partition is kept in a single
> directory (=> single disk)
>
> Is there a specific reason for the collocation of all partitions of a
> topic?  This means,  the capacity of required volume is to be determined by
> the retention size of the topic with largest data load.
> So it is not as distributed across folders as expected, i.e.,  across all
> directories by partition, not by topic.
>
> >  You could get a bigger disk and copy the data to this one. As a last
> resort you can manually delete logs from the full partition (start with the
> oldest)
>
>
> I will try moving the data from full to less used volumes, though it seems
> an unclean workaround, not suitable in Prod.
>
> Thanks,
> Zakee
>
>
>


Re: Broker shutdown, Can't restart

2015-03-21 Thread svante karlsson
The shutdown is expected. All data in a partition is kept in a single
directory (=> a single disk).

I would move some topics/partitions from a full disk to a disk (on the same
broker) with more space.

If you have very unbalanced topics this might be hard.

You could get a bigger disk and copy the data to this one.

As a last resort you can manually delete logs from the full partition
(start with the oldest).

/svante

2015-03-21 19:34 GMT+01:00 Zakee :

> What happens if one of the log directories configured with broker is 100%
> full. Is it expected that brokers will shutdown themselves?
>
> We ran into the full disk space on one of the volumes (out of 8) on each
> of 5 brokers, and brokers shutdown themselves. We still have about 60% of
> total disk space provided by 8 volumes/directories. Should n’t the brokers
> continue to function as long as they have space left on the last log
> directory.
>
> In this case, how do I fix and restart the broker. Trying to restart also
> failed with fatal error.
>
> Opened a kafka core issue:
> https://issues.apache.org/jira/browse/KAFKA-2038
>
> Thanks
> Zakee
>
>
>


Re: Kafka 0.8.2 log cleaner

2015-03-02 Thread svante karlsson
Wouldn't it be rather simple to add a retention time on "deleted" items,
i.e. keys with a null value, for topics that are compacted?

The retention time would then be set to some "large" value to allow all
consumers to learn that a previous k/v is being deleted.



2015-03-02 17:30 GMT+01:00 Ivan Balashov :

> Guozhang,
>
> I agree, but upon restart the application still needs to init
> KV-storage. And even though values are empty, keys will generate
> traffic (delaying app startup time).
> Besides, the idea of keeping needless data in kafka forever, even keys
> only, sounds rather unsettling.
>
> I guess we could try to reduce the key update count, and adjust
> retention of KV topic.
>
> Thanks,
>
> 2015-03-02 19:14 GMT+03:00 Guozhang Wang :
> > Currently Kafka log compaction does not support removing keys, but as
> long
> > as you also have log cleaning done at the app level #.keys will not
> > increase indefinitely.
>


Re: Consuming a snapshot from log compacted topic

2015-02-18 Thread svante karlsson
Do you have to separate the snapshot from the "normal" update flow?

I've used a compacted Kafka topic as the source of truth for a Solr
database and fed the topic both with real-time updates and "snapshots" from
a Hive job. This worked very well. The nice point is that there is a
seamless transition between loading an initial state and then continuing
with incremental updates. Thus there is no need to actually tell whether
you are at the end of the stream or not (just the usual problem of
monitoring the "lag"...).
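
To monitor that "lag", the log-end offset of each partition can be fetched up
front and compared with the offset the consumer has reached. A minimal sketch
against the 0.8 SimpleConsumer API; the client id is a placeholder:

import java.util.HashMap;
import java.util.Map;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class LagCheck {
    /** Log-end offset of one partition at the time of the call. */
    static long latestOffset(SimpleConsumer consumer, String topic, int partition) {
        Map<TopicAndPartition, PartitionOffsetRequestInfo> request =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        request.put(new TopicAndPartition(topic, partition),
                new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
        OffsetResponse resp = consumer.getOffsetsBefore(new kafka.javaapi.OffsetRequest(
                request, kafka.api.OffsetRequest.CurrentVersion(), "lag-check"));
        if (resp.hasError()) {
            throw new RuntimeException("offset lookup failed: " + resp.errorCode(topic, partition));
        }
        return resp.offsets(topic, partition)[0];
    }

    /** How far behind the consumer is; the snapshot is caught up when this stays near zero. */
    static long lag(SimpleConsumer consumer, String topic, int partition, long consumedOffset) {
        return latestOffset(consumer, topic, partition) - consumedOffset;
    }
}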



2015-02-18 19:18 GMT+01:00 Will Funnell :

> We are currently using Kafka 0.8.1.1 with log compaction in order to
> provide streams of messages to our clients.
>
> As well as constantly consuming the stream, one of our use cases is to
> provide a snapshot, meaning the user will receive a copy of every message
> at least once.
>
> Each one of these messages represents an item of content in our system.
>
>
> The problem comes when determining if the client has actually reached the
> end of the topic.
>
> The standard Kafka way of dealing with this seems to be by using a
> ConsumerTimeoutException, but we are frequently getting this error when the
> end of the topic has not been reached or even it may take a long time
> before a timeout naturally occurs.
>
>
> On first glance it would seem possible to do a lookup for the max offset
> for each partition when you begin consuming, stopping when this position it
> reached.
>
> But log compaction means that if an update to a piece of content arrives
> with the same message key, then this will be written to the end so the
> snapshot will be incomplete.
>
>
> Another thought is to make use of the cleaner point. Currently Kafka writes
> out to a "cleaner-offset-checkpoint" file in each data directory which is
> written to after log compaction completes.
>
> If the consumer was able to access the cleaner-offset-checkpoint you would
> be able to consume up to this point, check the point was still the same,
> and compaction had not yet occurred, and therefore determine you had
> receive everything at least once. (Assuming there was no race condition
> between compaction and writing to the file)
>
>
> Has anybody got any thoughts?
>
> Will
>


Re: kafka.server.ReplicaManager error

2015-02-05 Thread svante karlsson
In our case unclean leader election was enabled.

As the cluster should have been empty I can't really say that we did not
lose any data, but as I wrote earlier, I could not get the log messages to
stop until I took down all brokers at the same time.








2015-02-05 22:16 GMT+01:00 Kyle Banker :

> Thanks for sharing, svante. We're also running 0.8.2.
>
> Our cluster appears to be completely unusable at this point. We tried
> restarting the "down" broker with a clean log directory, and it's doing
> nothing. It doesn't seem to be able to get topic data, which this Zookeeper
> message appears to confirm:
>
> [ProcessThread(sid:5 cport:-1)::PrepRequestProcessor@645] - Got user-level
> KeeperException when processing sessionid:0x54b0e251a5cd0ec type:setData
> cxid:0x2b7ab zxid:0x100b9ad88 txntype:-1 reqpath:n/a Error
> Path:/brokers/topics/mytopic/partitions/143/state Error:KeeperErrorCode =
> BadVersion for /brokers/topics/mytopic/partitions/143/state
>
> It's probably worthwhile to note that we've disabled unclean leader
> election.
>
>
>
> On Thu, Feb 5, 2015 at 2:01 PM, svante karlsson  wrote:
>
> > I believe I've had the same problem on the 0.8.2 rc2. We had a idle test
> > cluster with unknown health status and I applied rc3 without checking if
> > everything was ok before. Since that cluster had been doing nothing for a
> > couple of days and the retention time was 48 hours it's reasonable to
> > assume that no actual data was left on the cluster. The same type of logs
> > was emitted in big amounts and never stopped. I then rebooted each
> > zookeeper in series. No change, Then bumped each broker - no change,
> > Finally I took down all brokers at the same time.
> >
> > The logging stopped but then one broker did not have any partitions in
> > sync, including the the internal consumer offset topic that was living
> > (with replicas=1) on that broker. I then bumped this broker once more and
> > then my whole cluster became in sync.
> >
> > I suspect that something related to 0 size topics caused this since the
> the
> > cluster worked fine the week before during testing and also after during
> > more testing with rc3.
> >
> >
> >
> >
> >
> >
> >
> > 2015-02-05 19:22 GMT+01:00 Kyle Banker :
> >
> > > Digging in a bit more, it appears that the "down" broker had likely
> > > partially failed. Thus, it was still attempting to fetch offsets that
> no
> > > longer exists. Does this make sense as an explanation of the
> > > above-mentioned behavior?
> > >
> > > On Thu, Feb 5, 2015 at 10:58 AM, Kyle Banker 
> > wrote:
> > >
> > > > Dug into this a bit more, and it turns out that we lost one of our 9
> > > > brokers at the exact moment when this started happening. At the time
> > that
> > > > we lost the broker, we had no under-replicated partitions. Since the
> > > broker
> > > > disappeared, we've had a fairly constant number of under replicated
> > > > partitions. This makes some sense, of course.
> > > >
> > > > Still, the log message doesn't.
> > > >
> > > > On Thu, Feb 5, 2015 at 10:39 AM, Kyle Banker 
> > > wrote:
> > > >
> > > >> I have a 9-node Kafka cluster, and all of the brokers just started
> > > >> spouting the following error:
> > > >>
> > > >> ERROR [Replica Manager on Broker 1]: Error when processing fetch
> > request
> > > >> for partition [mytopic,57] offset 0 from follower with correlation
> id
> > > >> 58166. Possible cause: Request for offset 0 but we only have log
> > > segments
> > > >> in the range 39 to 39. (kafka.server.ReplicaManager)
> > > >>
> > > >> The "mytopic" topic has a replication factor of 3, and metrics are
> > > >> showing a large number of under replicated partitions.
> > > >>
> > > >> My assumption is that a log aged out but that the replicas weren't
> > aware
> > > >> of it.
> > > >>
> > > >> In any case, this problem isn't fixing itself, and the volume of log
> > > >> messages of this type is enormous.
> > > >>
> > > >> What might have caused this? How does one resolve it?
> > > >>
> > > >
> > > >
> > >
> >
>


Re: kafka.server.ReplicaManager error

2015-02-05 Thread svante karlsson
I believe I've had the same problem on the 0.8.2 rc2. We had an idle test
cluster with unknown health status and I applied rc3 without checking if
everything was ok before. Since that cluster had been doing nothing for a
couple of days and the retention time was 48 hours, it's reasonable to
assume that no actual data was left on the cluster. The same type of logs
was emitted in big amounts and never stopped. I then rebooted each
zookeeper in series. No change. Then I bumped each broker - no change.
Finally I took down all brokers at the same time.

The logging stopped, but then one broker did not have any partitions in
sync, including the internal consumer offset topic that was living
(with replicas=1) on that broker. I then bumped this broker once more and
then my whole cluster came back in sync.

I suspect that something related to 0-size topics caused this, since the
cluster worked fine the week before during testing and also afterwards
during more testing with rc3.







2015-02-05 19:22 GMT+01:00 Kyle Banker :

> Digging in a bit more, it appears that the "down" broker had likely
> partially failed. Thus, it was still attempting to fetch offsets that no
> longer exists. Does this make sense as an explanation of the
> above-mentioned behavior?
>
> On Thu, Feb 5, 2015 at 10:58 AM, Kyle Banker  wrote:
>
> > Dug into this a bit more, and it turns out that we lost one of our 9
> > brokers at the exact moment when this started happening. At the time that
> > we lost the broker, we had no under-replicated partitions. Since the
> broker
> > disappeared, we've had a fairly constant number of under replicated
> > partitions. This makes some sense, of course.
> >
> > Still, the log message doesn't.
> >
> > On Thu, Feb 5, 2015 at 10:39 AM, Kyle Banker 
> wrote:
> >
> >> I have a 9-node Kafka cluster, and all of the brokers just started
> >> spouting the following error:
> >>
> >> ERROR [Replica Manager on Broker 1]: Error when processing fetch request
> >> for partition [mytopic,57] offset 0 from follower with correlation id
> >> 58166. Possible cause: Request for offset 0 but we only have log
> segments
> >> in the range 39 to 39. (kafka.server.ReplicaManager)
> >>
> >> The "mytopic" topic has a replication factor of 3, and metrics are
> >> showing a large number of under replicated partitions.
> >>
> >> My assumption is that a log aged out but that the replicas weren't aware
> >> of it.
> >>
> >> In any case, this problem isn't fixing itself, and the volume of log
> >> messages of this type is enormous.
> >>
> >> What might have caused this? How does one resolve it?
> >>
> >
> >
>


Re: kafka sending duplicate content to consumer

2015-01-23 Thread svante karlsson
A Kafka broker never pushes data to a consumer. It's the consumer that does
a long fetch, and it provides the offset to read from.

The problem lies in how your consumer handles the, for example, 1000 messages
that it just got. If you handle 500 of them and crash without committing
the offsets somewhere (either to Kafka or in some other system), then when you
restart you will start your fetch again from the last committed offset.
Kafka has no notion of an already consumed message.
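
With the high-level consumer, at-least-once handling looks roughly like the
sketch below: disable auto-commit and commit only after a message has been
handled. Duplicates then show up exactly when the consumer dies between
handling and committing. The ZooKeeper address, group id and topic are
placeholders:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class AtLeastOnceConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");   // placeholder
        props.put("group.id", "demo-group");                // placeholder
        props.put("auto.commit.enable", "false");           // commit only after processing
        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(Collections.singletonMap("mytopic", 1));
        ConsumerIterator<byte[], byte[]> it = streams.get("mytopic").get(0).iterator();
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> record = it.next();
            // handle record.message() here; if the process dies after this line but
            // before the commit below, the same message is delivered again on restart
            connector.commitOffsets();    // committing per message is slow; batch in practice
        }
    }
}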



2015-01-23 7:54 GMT+01:00 Tousif :

> Hi,
>
> I want to know in which situations kafka sends the same event multiple times
> to a consumer. Is there a consumer-side configuration to tell kafka to send
> only once and stop retries?
>
> --
>
>
> Regards
> Tousif Khazi
>


Re: Isr difference between Metadata Response vs /kafka-topics.sh --describe

2015-01-21 Thread svante karlsson
thanks,

svante

2015-01-21 16:30 GMT+01:00 Joe Stein :

> Sounds like you are bumping into this
> https://issues.apache.org/jira/browse/KAFKA-1367
>
>


Isr difference between Metadata Response vs /kafka-topics.sh --describe

2015-01-21 Thread svante karlsson
We are running an external (as in unsupported) C++ client library
against 0.8.2-rc2 and see differences in the Isr vector in the Metadata Response
compared to what ./kafka-topics.sh --describe returns.

We have a triple replicated topic that is not updated during the test.

kafka-topics.sh
returns

Topic: saka.test.int_datastream  Partition: 0  Leader: 3  Replicas: 3,1,2  Isr: 2,1,3
Topic: saka.test.int_datastream  Partition: 1  Leader: 1  Replicas: 1,2,3  Isr: 2,1,3


After some debugging of the received packet it seems the data is actually
missing from the server.

After a sequential restart of each broker, everything was back to normal.

Two pairs of log lines every 10 s:

initial state:

saka.test.int_datastream Partition: 1 Leader: 1 Replicas: 1, 2, 3, Isr: 2,
1,
saka.test.int_datastream Partition: 0 Leader: 3 Replicas: 3, 1, 2, Isr: 2,
1, 3,
saka.test.int_datastream Partition: 1 Leader: 1 Replicas: 1, 2, 3, Isr: 2,
1,
saka.test.int_datastream Partition: 0 Leader: 3 Replicas: 3, 1, 2, Isr: 2,
1, 3,
saka.test.int_datastream Partition: 1 Leader: 1 Replicas: 1, 2, 3, Isr: 2,
1,
saka.test.int_datastream Partition: 0 Leader: 3 Replicas: 3, 1, 2, Isr: 2,
1, 3,
saka.test.int_datastream Partition: 1 Leader: 1 Replicas: 1, 2, 3, Isr: 2,
1,
saka.test.int_datastream Partition: 0 Leader: 3 Replicas: 3, 1, 2, Isr: 2,
1, 3,

restart broker 1

handle_connect_retry_timer
_connect_async_next z8r102-mc12-4-4.sth-tc2.videoplaza.net:9092

saka.test.int_datastream Partition: 1 Leader: 2 Replicas: 1, 2, 3, Isr: 2,
3,
saka.test.int_datastream Partition: 0 Leader: 3 Replicas: 3, 1, 2, Isr: 2,
3,
saka.test.int_datastream Partition: 1 Leader: 2 Replicas: 1, 2, 3, Isr: 2,
3,
saka.test.int_datastream Partition: 0 Leader: 3 Replicas: 3, 1, 2, Isr: 2,
3,
...
saka.test.int_datastream Partition: 1 Leader: 2 Replicas: 1, 2, 3, Isr: 2,
3,
saka.test.int_datastream Partition: 0 Leader: 3 Replicas: 3, 1, 2, Isr: 2,
3,
saka.test.int_datastream Partition: 1 Leader: 2 Replicas: 1, 2, 3, Isr: 2,
3,
saka.test.int_datastream Partition: 0 Leader: 3 Replicas: 3, 1, 2, Isr: 2,
3,
saka.test.int_datastream Partition: 1 Leader: 1 Replicas: 1, 2, 3, Isr: 2,
3, 1,
saka.test.int_datastream Partition: 0 Leader: 3 Replicas: 3, 1, 2, Isr: 2,
3,
saka.test.int_datastream Partition: 1 Leader: 1 Replicas: 1, 2, 3, Isr: 2,
3, 1,
saka.test.int_datastream Partition: 0 Leader: 3 Replicas: 3, 1, 2, Isr: 2,
3,
saka.test.int_datastream Partition: 1 Leader: 1 Replicas: 1, 2, 3, Isr: 2,
3, 1,
saka.test.int_datastream Partition: 0 Leader: 3 Replicas: 3, 1, 2, Isr: 2,
3,
saka.test.int_datastream Partition: 1 Leader: 1 Replicas: 1, 2, 3, Isr: 2,
3, 1,
saka.test.int_datastream Partition: 0 Leader: 3 Replicas: 3, 1, 2, Isr: 2,
3,
saka.test.int_datastream Partition: 1 Leader: 1 Replicas: 1, 2, 3, Isr: 2,
3, 1,
saka.test.int_datastream Partition: 0 Leader: 3 Replicas: 3, 1, 2, Isr: 2,
3,
saka.test.int_datastream Partition: 1 Leader: 1 Replicas: 1, 2, 3, Isr: 2,
3, 1,
saka.test.int_datastream Partition: 0 Leader: 3 Replicas: 3, 1, 2, Isr: 2,
3,
saka.test.int_datastream Partition: 1 Leader: 1 Replicas: 1, 2, 3, Isr: 2,
3, 1,
saka.test.int_datastream Partition: 0 Leader: 3 Replicas: 3, 1, 2, Isr: 2,
3,
saka.test.int_datastream Partition: 1 Leader: 1 Replicas: 1, 2, 3, Isr: 2,
3, 1,
saka.test.int_datastream Partition: 0 Leader: 3 Replicas: 3, 1, 2, Isr: 2,
3,
saka.test.int_datastream Partition: 1 Leader: 1 Replicas: 1, 2, 3, Isr: 2,
3, 1,
saka.test.int_datastream Partition: 0 Leader: 3 Replicas: 3, 1, 2, Isr: 2,
3,
saka.test.int_datastream Partition: 1 Leader: 1 Replicas: 1, 2, 3, Isr: 2,
3, 1,
saka.test.int_datastream Partition: 0 Leader: 3 Replicas: 3, 1, 2, Isr: 2,
3,
saka.test.int_datastream Partition: 1 Leader: 1 Replicas: 1, 2, 3, Isr: 2,
3, 1,
saka.test.int_datastream Partition: 0 Leader: 3 Replicas: 3, 1, 2, Isr: 2,
3,
saka.test.int_datastream Partition: 1 Leader: 1 Replicas: 1, 2, 3, Isr: 2,
3, 1,
saka.test.int_datastream Partition: 0 Leader: 3 Replicas: 3, 1, 2, Isr: 2,
3,
saka.test.int_datastream Partition: 1 Leader: 1 Replicas: 1, 2, 3, Isr: 2,
3, 1,

saka.test.int_datastream Partition: 0 Leader: 3 Replicas: 3, 1, 2, Isr: 2,
3,

restart broker 3

known brokers changed {  }
saka.test.int_datastream Partition: 1 Leader: 1 Replicas: 1, 2, Isr: 2, 1,
saka.test.int_datastream Partition: 0 Leader: 2 Replicas: 1, 2, Isr: 2, 1,
known brokers changed {  }
saka.test.int_datastream Partition: 1 Leader: 1 Replicas: 1, 2, 3, Isr: 2,
1,
saka.test.int_datastream Partition: 0 Leader: 2 Replicas: 3, 1, 2, Isr: 2,
1,
saka.test.int_datastream Partition: 1 Leader: 1 Replicas: 1, 2, 3, Isr: 2,
1,
saka.test.int_datastream Partition: 0 Leader: 2 Replicas: 3, 1, 2, Isr: 2,
1,
saka.test.int_datastream Partition: 1 Leader: 1 Replicas: 1, 2, 3, Isr: 2,
1,
saka.test.int_datastream Partition: 0 Leader: 2 Replicas: 3, 1, 2, Isr: 2,
1,
saka.test.int_datastream Partition: 1 Leader: 1 Replicas: 1, 2, 3, Isr: 2,
1,
saka.test.int_datastream Partition: 0 Leader: 3 Replicas: 3, 1, 2, I

Re: How to handle broker disk failure

2015-01-21 Thread svante karlsson
Is it possible to continue to serve topics from the remaining disks while
waiting for a replacement disk, or will the broker exit/stop working? (We
would like to be able to replace disks in a relaxed manner, since our
datacenter is colocated and we don't have permanent staff there - there is
simply not enough work to motivate 24h staffing.)

If we trigger a rebalance during the downtime, will the under-replicated
topics/partitions hopefully be moved somewhere else? What happens then
when we add the broker again - now with a new, empty disk? Will all
over-replicated partitions be removed from the reinserted broker, and
finally should/must we trigger a rebalance?

/svante

2015-01-21 2:56 GMT+01:00 Jun Rao :

> Actually, you don't need to reassign partitions in this case. You just need
> to replace the bad disk and restart the broker. It will copy the missing
> data over automatically.
>
> Thanks,
>
> Jun
>
> On Tue, Jan 20, 2015 at 1:02 AM, svante karlsson  wrote:
>
> > I'm trying to figure out the best way to handle a disk failure in a live
> > environment.
> >
> > The obvious (and naive) solution is to decommission the broker and let
> > other brokers taker over and create new followers. Then replace the disk
> > and clean the remaining log directories and add the broker again.
> >
> > The disadvantage with this approach is of course the network overhead and
> > the time it takes to reassign partitions.
> >
> > Is there a better way?
> >
> > As a sub question, is it possible to continue running a broker with a
> > failed drive and still serve the remaining partitions?
> >
> > thanks,
> > svante
> >
>


typo in wiki

2015-01-20 Thread svante karlsson
In the wiki there is a statement that a partition must fit on a single
machine. While technically true, isn't it rather that a partition must fit
on a single disk on that machine?


https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowmanytopicscanIhave
?

> A partition is basically a directory of log files.
> Each partition must fit entirely on one machine. So if you have only one
> partition in your topic you cannot scale your write rate or retention
> beyond the capability of a single machine. If you have 1000 partitions you
> could potentially use 1000 machines.

thanks,
svante


How to handle broker disk failure

2015-01-20 Thread svante karlsson
I'm trying to figure out the best way to handle a disk failure in a live
environment.

The obvious (and naive) solution is to decommission the broker and let
other brokers take over and create new followers. Then replace the disk
and clean the remaining log directories and add the broker again.

The disadvantage with this approach is of course the network overhead and
the time it takes to reassign partitions.

Is there a better way?

As a sub question, is it possible to continue running a broker with a
failed drive and still serve the remaining partitions?

thanks,
svante


Re: [VOTE] 0.8.2.0 Candidate 1

2015-01-16 Thread svante karlsson
Hmm, produce "msg/sec in rate" seems to be per broker and "produce msg/per
sec" should also be per broker and thus be related. The problem is that for
a time period the graphs indicated that 1) messages where only produced to
one broker 2) messages where produces to two brokers.

when I restarted brokers everything looked normal again. I made no changes
to the parts that were collecting the metrics during this time. This is of
course hearsay since I can't repeat it but at least the graphs supports the
view that something is strange.

I agree that the value looks ok for most ("all") of the time but I suspect
that there might be issues here.

/svante

2015-01-17 0:19 GMT+01:00 Jun Rao :

> I did some quick tests and the mbean values look reasonable. On the
> producer side, produce msg/sec is actually for all brokers.
>
> Thanks,
>
> Jun
>
> On Fri, Jan 16, 2015 at 12:09 PM, svante karlsson  wrote:
>
> > Disregard the previous message, it was send accidently..
> >
> > Jun,
> >
> > I don't know if it was an issue with graphite or the mbean and have not
> > seen it since - and we have tried of several cases of failover.
> >
> > That said, I have the feeling that it was a kafka issue and I'm a bit
> > suspicious about the new mbeans.
> >
> > I attach a screenshot from the grafana dashboard and if you look at the
> > first graph (top left) at ~10% it shows the startup after upgrade.
> >
> > This is a 3 node cluster with a topic of 2 partitions. When we start up a
> > single producer it produces messages to both partitions without message
> > loss. I know that all messages are acked.
> >
> >  If you look at the "produce msg/sec" graph it seems to hit 2 servers
> (it's
> > per broker) but "messages in rate" & "byte in rate",& "byte out rate"
> (all
> > from the new mbeans) look as if the data only hits one broker. (those are
> > also per broker)
> >
> > At 70% I restarted two brokers after each other. After that point all
> three
> > graphs looks fine.
> >
> > I'm not at work now and can't dig into the graphite data but I now see
> that
> > the "fetch follower" also looks strange
> >
> > I can't file it as a bug report as I can't reproduce it but I have a
> > distinct feeling that I can't trust the new mbeans or have to find
> another
> > explanation.
> >
> > regard it as an observation if someone else reports issues.
> >
> >
> > thanks,
> >
> > svante
> >
> > 2015-01-16 20:56 GMT+01:00 svante karlsson :
> >
> > > Jun,
> > >
> > > I don't know if it was an issue with graphite or the mbean but I have
> not
> > > seen it since - and we have tried of several cases of failover and this
> > > problem has only been seen once.
> > >
> > > That said, I have the feeling that it was a kafka issue and I'm a bit
> > > suspicious about the new mbeans.
> > >
> > > I attach a screenshot from the grafana dashboard and if you look at the
> > > first graph (top left) at ~10% it shows the startup after upgrade.
> > >
> > > This is a 3 node cluster with a topic of 2 partitions. When we start
> up a
> > > single producer it produces messages to both partitions without message
> > > loss. I know that all messages are acked.
> > >
> > >  If you look at the "produce message msg/sec" graph it seems to hit 2
> > > servers (it's per broker)
> > >
> > >
> > > Bad picture but
> > >
> > > 2015-01-16 18:05 GMT+01:00 Jun Rao :
> > >
> > >> Svante,
> > >>
> > >> I tested this out locally and the mbeans for those metrics do show up
> on
> > >> startup. Can you reproduce the issue reliably? Also, is what you saw
> an
> > >> issue with the mbean itself or graphite?
> > >>
> > >> Thanks,
> > >>
> > >> Jun
> > >>
> > >> On Fri, Jan 16, 2015 at 4:38 AM, svante karlsson  wrote:
> > >>
> > >> > I upgrade two small test cluster and I had two small issues but I'm,
> > not
> > >> > clear yet as to if those were an issue due to us using ansible to
> > >> configure
> > >> > and deploy the cluster.
> > >> >
> > >> > The first issue could be us doing something bad when distributing
> the
> > >> > update (I updated, not reinstalled) but it sh

Re: [VOTE] 0.8.2.0 Candidate 1

2015-01-16 Thread svante karlsson
Disregard the previous message, it was sent accidentally.

Jun,

I don't know if it was an issue with graphite or the mbean, and I have not
seen it since - and we have tried several failover cases.

That said, I have the feeling that it was a kafka issue and I'm a bit
suspicious about the new mbeans.

I attach a screenshot from the grafana dashboard; if you look at the
first graph (top left), at ~10% it shows the startup after the upgrade.

This is a 3 node cluster with a topic of 2 partitions. When we start up a
single producer it produces messages to both partitions without message
loss. I know that all messages are acked.

If you look at the "produce msg/sec" graph it seems to hit 2 servers (it's
per broker), but "messages in rate", "byte in rate" & "byte out rate" (all
from the new mbeans) look as if the data only hits one broker (those are
also per broker).

At 70% I restarted two brokers, one after the other. After that point all
three graphs look fine.

I'm not at work now and can't dig into the graphite data, but I now see that
the "fetch follower" metric also looks strange.

I can't file it as a bug report as I can't reproduce it, but I have a
distinct feeling that I can't trust the new mbeans, or I have to find
another explanation.

Regard it as an observation, in case someone else reports issues.


thanks,

svante

2015-01-16 20:56 GMT+01:00 svante karlsson :

> Jun,
>
> I don't know if it was an issue with graphite or the mbean but I have not
> seen it since - and we have tried of several cases of failover and this
> problem has only been seen once.
>
> That said, I have the feeling that it was a kafka issue and I'm a bit
> suspicious about the new mbeans.
>
> I attach a screenshot from the grafana dashboard and if you look at the
> first graph (top left) at ~10% it shows the startup after upgrade.
>
> This is a 3 node cluster with a topic of 2 partitions. When we start up a
> single producer it produces messages to both partitions without message
> loss. I know that all messages are acked.
>
>  If you look at the "produce message msg/sec" graph it seems to hit 2
> servers (it's per broker)
>
>
> Bad picture but
>
> 2015-01-16 18:05 GMT+01:00 Jun Rao :
>
>> Svante,
>>
>> I tested this out locally and the mbeans for those metrics do show up on
>> startup. Can you reproduce the issue reliably? Also, is what you saw an
>> issue with the mbean itself or graphite?
>>
>> Thanks,
>>
>> Jun
>>
>> On Fri, Jan 16, 2015 at 4:38 AM, svante karlsson  wrote:
>>
>> > I upgrade two small test cluster and I had two small issues but I'm, not
>> > clear yet as to if those were an issue due to us using ansible to
>> configure
>> > and deploy the cluster.
>> >
>> > The first issue could be us doing something bad when distributing the
>> > update (I updated, not reinstalled) but it should be easy for you to
>> > disregard since it seems so trivial.
>> >
>> > We replace the kafka-server-start.sh with something else but we had the
>> > line
>> >
>> > EXTRA_ARGS="-name kafkaServer -loggc"
>> >
>> > then kafka-run-class.sh exits without starting the VM and complains on
>> > unknown options. ( both -name and -loggc ) - once we removed the
>> EXTRA_ARGS
>> > everything starts.
>> >
>> > as I said - everyone should have this issue if it was a problem...
>> >
>> >
>> > The second thing is regarding the jmx beans. I reconfigured our graphite
>> > monitoring and noticed that the following metrics stopped working on one
>> > broker
>> > - server.BrokerTopicMetrics.MessagesInPerSec.OneMinuteRate,
>> > - server.BrokerTopicMetrics.ByteInPerSec.OneMinuteRate,
>> > - server.BrokerTopicMetrics.ByteOutPerSec.OneMinuteRate
>> >
>> > I had graphs running and the it looked like the traffic was dropping on
>> > those metrics but our producers was working without problems and the
>> > metrics
>> > network.RequestMetrics.Produce.RequestsPerSec.OneMinuteRate confirmed
>> that
>> > on all brokers.
>> >
>> > A restart of the offending broker brought the metrics online again.
>> >
>> > /svante
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> > 2015-01-16 3:42 GMT+01:00 Gwen Shapi

Re: [VOTE] 0.8.2.0 Candidate 1

2015-01-16 Thread svante karlsson
Jun,

I don't know if it was an issue with graphite or the mbean but I have not
seen it since - and we have tried several failover cases, and this
problem has only been seen once.

That said, I have the feeling that it was a kafka issue and I'm a bit
suspicious about the new mbeans.

I attach a screenshot from the grafana dashboard and if you look at the
first graph (top left) at ~10% it shows the startup after upgrade.

This is a 3 node cluster with a topic of 2 partitions. When we start up a
single producer it produces messages to both partitions without message
loss. I know that all messages are acked.

 If you look at the "produce message msg/sec" graph it seems to hit 2
servers (it's per broker)


Bad picture but

2015-01-16 18:05 GMT+01:00 Jun Rao :

> Svante,
>
> I tested this out locally and the mbeans for those metrics do show up on
> startup. Can you reproduce the issue reliably? Also, is what you saw an
> issue with the mbean itself or graphite?
>
> Thanks,
>
> Jun
>
> On Fri, Jan 16, 2015 at 4:38 AM, svante karlsson  wrote:
>
> > I upgrade two small test cluster and I had two small issues but I'm, not
> > clear yet as to if those were an issue due to us using ansible to
> configure
> > and deploy the cluster.
> >
> > The first issue could be us doing something bad when distributing the
> > update (I updated, not reinstalled) but it should be easy for you to
> > disregard since it seems so trivial.
> >
> > We replace the kafka-server-start.sh with something else but we had the
> > line
> >
> > EXTRA_ARGS="-name kafkaServer -loggc"
> >
> > then kafka-run-class.sh exits without starting the VM and complains on
> > unknown options. ( both -name and -loggc ) - once we removed the
> EXTRA_ARGS
> > everything starts.
> >
> > as I said - everyone should have this issue if it was a problem...
> >
> >
> > The second thing is regarding the jmx beans. I reconfigured our graphite
> > monitoring and noticed that the following metrics stopped working on one
> > broker
> > - server.BrokerTopicMetrics.MessagesInPerSec.OneMinuteRate,
> > - server.BrokerTopicMetrics.ByteInPerSec.OneMinuteRate,
> > - server.BrokerTopicMetrics.ByteOutPerSec.OneMinuteRate
> >
> > I had graphs running and the it looked like the traffic was dropping on
> > those metrics but our producers was working without problems and the
> > metrics
> > network.RequestMetrics.Produce.RequestsPerSec.OneMinuteRate confirmed
> that
> > on all brokers.
> >
> > A restart of the offending broker brought the metrics online again.
> >
> > /svante
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > 2015-01-16 3:42 GMT+01:00 Gwen Shapira :
> >
> > > Would make sense to enable it after we have authorization feature and
> > > admins can control who can delete what.
> > >
> > > On Thu, Jan 15, 2015 at 6:32 PM, Jun Rao  wrote:
> > > > Yes, I agree it's probably better not to enable "delete.topic.enable"
> > by
> > > > default.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Thu, Jan 15, 2015 at 6:29 PM, Joe Stein 
> > wrote:
> > > >
> > > >> I think that is a change of behavior that organizations may get
> burned
> > > on.
> > > >> Right now there is no delete data feature. If an operations teams
> > > upgrades
> > > >> to 0.8.2 and someone decides to delete a topic then there will be
> data
> > > >> loss. The organization may not have wanted that to happen. I would
> > > argue to
> > > >> not have a way to "by default" delete data. There is something
> > > actionable
> > > >> about consciously turning on a feature that allows anyone with
> access
> > to
> > > >> kafka-topics (or zookeeper for that matter) to delete Kafka data. If
> > > folks
> > > >> want that feature then flip the switch prior to upgrade or after and
> > > >> rolling restart and have at it. By not setting it as default they
> will
> > > know
> > > >> they have to turn it on and figure out what they need to-do from a
> > > security
> > > >> perspective (until Kafka gives them that) to protect their data
> > (through
> > > >> network or other type of m

Re: [VOTE] 0.8.2.0 Candidate 1

2015-01-16 Thread svante karlsson
I upgraded two small test clusters and I had two small issues, but I'm not
clear yet whether those were caused by us using ansible to configure
and deploy the cluster.

The first issue could be us doing something bad when distributing the
update (I updated, not reinstalled), but it should be easy for you to
disregard since it seems so trivial.

We replace kafka-server-start.sh with something else, but we had the line

EXTRA_ARGS="-name kafkaServer -loggc"

and then kafka-run-class.sh exits without starting the VM and complains about
unknown options (both -name and -loggc) - once we removed the EXTRA_ARGS
everything starts.

As I said - everyone would have this issue if it were a real problem...


The second thing is regarding the jmx beans. I reconfigured our graphite
monitoring and noticed that the following metrics stopped working on one
broker:
- server.BrokerTopicMetrics.MessagesInPerSec.OneMinuteRate,
- server.BrokerTopicMetrics.ByteInPerSec.OneMinuteRate,
- server.BrokerTopicMetrics.ByteOutPerSec.OneMinuteRate

I had graphs running and it looked like the traffic was dropping on
those metrics, but our producers were working without problems and the metric
network.RequestMetrics.Produce.RequestsPerSec.OneMinuteRate confirmed that
on all brokers.

A restart of the offending broker brought the metrics online again.
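
One way to rule graphite out next time is to read the mbean directly over JMX.
A minimal sketch; the host, the JMX port and the exact 0.8.2 object name are
assumptions on my part:

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class MbeanCheck {
    public static void main(String[] args) throws Exception {
        // Assumes the broker exposes remote JMX on port 9999; host and port are placeholders.
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://broker1:9999/jmxrmi");
        JMXConnector jmxc = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection mbs = jmxc.getMBeanServerConnection();
            // Assumed 0.8.2 naming of the metric discussed above.
            ObjectName name = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec");
            System.out.println("OneMinuteRate = " + mbs.getAttribute(name, "OneMinuteRate"));
        } finally {
            jmxc.close();
        }
    }
}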

/svante

2015-01-16 3:42 GMT+01:00 Gwen Shapira :

> Would make sense to enable it after we have authorization feature and
> admins can control who can delete what.
>
> On Thu, Jan 15, 2015 at 6:32 PM, Jun Rao  wrote:
> > Yes, I agree it's probably better not to enable "delete.topic.enable" by
> > default.
> >
> > Thanks,
> >
> > Jun
> >
> > On Thu, Jan 15, 2015 at 6:29 PM, Joe Stein  wrote:
> >
> >> I think that is a change of behavior that organizations may get burned
> on.
> >> Right now there is no delete data feature. If an operations teams
> upgrades
> >> to 0.8.2 and someone decides to delete a topic then there will be data
> >> loss. The organization may not have wanted that to happen. I would
> argue to
> >> not have a way to "by default" delete data. There is something
> actionable
> >> about consciously turning on a feature that allows anyone with access to
> >> kafka-topics (or zookeeper for that matter) to delete Kafka data. If
> folks
> >> want that feature then flip the switch prior to upgrade or after and
> >> rolling restart and have at it. By not setting it as default they will
> know
> >> they have to turn it on and figure out what they need to-do from a
> security
> >> perspective (until Kafka gives them that) to protect their data (through
> >> network or other type of measures).
> >>
> >> On Thu, Jan 15, 2015 at 8:24 PM, Manikumar Reddy 
> >> wrote:
> >>
> >> > Also can we remove "delete.topic.enable" config property and enable
> topic
> >> > deletion by default?
> >> > On Jan 15, 2015 10:07 PM, "Jun Rao"  wrote:
> >> >
> >> > > Thanks for reporting this. I will remove that option in RC2.
> >> > >
> >> > > Jun
> >> > >
> >> > > On Thu, Jan 15, 2015 at 5:21 AM, Jaikiran Pai <
> >> jai.forums2...@gmail.com>
> >> > > wrote:
> >> > >
> >> > > > I just downloaded the Kafka binary and am trying this on my 32 bit
> >> JVM
> >> > > > (Java 7)? Trying to start Zookeeper or Kafka server keeps failing
> >> with
> >> > > > "Unrecognized VM option 'UseCompressedOops'":
> >> > > >
> >> > > > ./zookeeper-server-start.sh ../config/zookeeper.properties
> >> > > > Unrecognized VM option 'UseCompressedOops'
> >> > > > Error: Could not create the Java Virtual Machine.
> >> > > > Error: A fatal exception has occurred. Program will exit.
> >> > > >
> >> > > > Same with the Kafka server startup scripts. My Java version is:
> >> > > >
> >> > > > java version "1.7.0_71"
> >> > > > Java(TM) SE Runtime Environment (build 1.7.0_71-b14)
> >> > > > Java HotSpot(TM) Server VM (build 24.71-b01, mixed mode)
> >> > > >
> >> > > > Should there be a check in the script, before adding this option?
> >> > > >
> >> > > > -Jaikiran
> >> > > >
> >> > > > On Wednesday 14 January 2015 10:08 PM, Jun Rao wrote:
> >> > > >
> >> > > >> + users mailing list. It would be great if people can test this
> out
> >> > and
> >> > > >> report any blocker issues.
> >> > > >>
> >> > > >> Thanks,
> >> > > >>
> >> > > >> Jun
> >> > > >>
> >> > > >> On Tue, Jan 13, 2015 at 7:16 PM, Jun Rao 
> wrote:
> >> > > >>
> >> > > >>  This is the first candidate for release of Apache Kafka 0.8.2.0.
> >> > There
> >> > > >>> has been some changes since the 0.8.2 beta release, especially
> in
> >> the
> >> > > new
> >> > > >>> java producer api and jmx mbean names. It would be great if
> people
> >> > can
> >> > > >>> test
> >> > > >>> this out thoroughly. We are giving people 10 days for testing
> and
> >> > > voting.
> >> > > >>>
> >> > > >>> Release Notes for the 0.8.2.0 release
> >> > > >>> *https://people.apache.org/~junrao/kafka-0.8.2.0-
> >> > > >>> candidate1/RELEASE_NOTES.html
> >> > > >>> 

Re: how to order message between different partition

2015-01-08 Thread svante karlsson
The messages are ordered per partition. There is no ordering between partitions.

If you really need total ordering, use a single partition.
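
A minimal sketch with the 0.8.2 Java producer: records that share a key all
hash to the same partition, which is effectively the single-partition setup
above. The broker list, topic and key are placeholders:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class OrderedSend {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        try {
            // Same key => same partition (as long as the partition count does not change),
            // so these records keep their order for any consumer of that partition.
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<String, String>("mytopic", "same-key", "event-" + i));
            }
        } finally {
            producer.close();
        }
    }
}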



2015-01-08 9:44 GMT+01:00 YuanJia Li :

> Hi all,
> I have a topic with 3 partitions, and each partition has its own sequence in
> kafka.
> How do I order messages between different partitions? Does anyone have
> experience with that? Thanks.
>
>
> Thanks & Regards
>
> YuanJia Li


Re: mirrormaker tool in 0.82beta

2015-01-07 Thread svante karlsson
No, I missed that.



thanks,
svante








2015-01-07 6:44 GMT+01:00 Jun Rao :

> Did you set offsets.storage to kafka in the consumer of mirror maker?
>
> Thanks,
>
> Jun
>
> On Mon, Jan 5, 2015 at 3:49 PM, svante karlsson  wrote:
>
> > I'm using 0.82beta and I'm trying to push data with the mirrormaker tool
> > from several remote sites to two datacenters. I'm testing this from a
> node
> > containing zk, broker and mirrormaker and the data is pushed to a
> "normal"
> > cluster. 3 zk and 4 brokers with replication.
> >
> > While the configuration seems straight forward things are a bit shaky.
> >
> > First, if something goes wrong on either consumer or producer side it
> stops
> > transmitting data without dying which makes it harder to run under a
> > supervisor (upstart in my case)
> >
> > I start pushing data to site 02 and mirrormaker pushes it to my
> datacenter
> > - so far all good...
> >
> > But, when I run. (on my remote site)
> > kafka-consumer-offset-checker.sh
> > --group mirrormaker.from.site_02.to.site_ktv --zookeeper 192.168.0.106
> >
> > Could not fetch offset for [saka.test.ext_datastream,7] due to
> > kafka.common.NotCoordinatorForConsumerException.
> > Could not fetch offset for [saka.test.ext_datastream,4] due to
> > kafka.common.NotCoordinatorForConsumerException.
> > Could not fetch offset for [saka.test.ext_datastream,2] due to
> > kafka.common.NotCoordinatorForConsumerException.
> > Could not fetch offset for [saka.test.ext_datastream,5] due to
> > kafka.common.NotCoordinatorForConsumerException.
> > Could not fetch offset for [saka.test.ext_datastream,6] due to
> > kafka.common.NotCoordinatorForConsumerException.
> > Could not fetch offset for [saka.test.ext_datastream,3] due to
> > kafka.common.NotCoordinatorForConsumerException.
> > Could not fetch offset for [saka.test.ext_datastream,1] due to
> > kafka.common.NotCoordinatorForConsumerException.
> > Could not fetch offset for [saka.test.ext_datastream,0] due to
> > kafka.common.NotCoordinatorForConsumerException.
> >
> > Group   Topic  Pid Offset
> logSize
> > Lag Owner
> > mirrormaker.from.site_02.to.site_ktv saka.test.ext_datastream   0
> > unknown 9310560 unknown
> > mirrormaker.from.site_02.to.site_ktv_kafka-4-1420497015616-725cd1b5-0
> > mirrormaker.from.site_02.to.site_ktv saka.test.ext_datastream   1
> > unknown 9313497 unknown
> > mirrormaker.from.site_02.to.site_ktv_kafka-4-1420497015616-725cd1b5-0
> > mirrormaker.from.site_02.to.site_ktv saka.test.ext_datastream   2
> > unknown 9323623 unknown
> > mirrormaker.from.site_02.to.site_ktv_kafka-4-1420497015616-725cd1b5-0
> > mirrormaker.from.site_02.to.site_ktv saka.test.ext_datastream   3
> > unknown 9334005 unknown
> > mirrormaker.from.site_02.to.site_ktv_kafka-4-1420497015616-725cd1b5-0
> > mirrormaker.from.site_02.to.site_ktv saka.test.ext_datastream   4
> > unknown 9324176 unknown
> > mirrormaker.from.site_02.to.site_ktv_kafka-4-1420497015616-725cd1b5-0
> > mirrormaker.from.site_02.to.site_ktv saka.test.ext_datastream   5
> > unknown 9336504 unknown
> > mirrormaker.from.site_02.to.site_ktv_kafka-4-1420497015616-725cd1b5-0
> > mirrormaker.from.site_02.to.site_ktv saka.test.ext_datastream   6
> > unknown 9316139 unknown
> > mirrormaker.from.site_02.to.site_ktv_kafka-4-1420497015616-725cd1b5-0
> > mirrormaker.from.site_02.to.site_ktv saka.test.ext_datastream   7
> > unknown 9318770 unknown
> > mirrormaker.from.site_02.to.site_ktv_kafka-4-1420497015616-725cd1b5-0
> >
> >
> > Since I can push data between my sites something is working but what can
> be
> > the case for NotCoordinatorForConsumerException on a single node cluster?
> >
> >
> > Finally the __consumer_offsets topic should probably be masked out from
> > mirroring without using whitelists or blacklists
> >
> >
> > thanks
> > /svante
> >
>


mirrormaker tool in 0.82beta

2015-01-05 Thread svante karlsson
I'm using 0.82beta and I'm trying to push data with the mirrormaker tool
from several remote sites to two datacenters. I'm testing this from a node
containing zk, broker and mirrormaker and the data is pushed to a "normal"
cluster. 3 zk and 4 brokers with replication.

While the configuration seems straightforward, things are a bit shaky.

First, if something goes wrong on either the consumer or the producer side it
stops transmitting data without dying, which makes it harder to run under a
supervisor (upstart in my case).

I start pushing data to site 02 and mirrormaker pushes it to my datacenter
- so far all good...

But when I run the following (on my remote site):
kafka-consumer-offset-checker.sh
--group mirrormaker.from.site_02.to.site_ktv --zookeeper 192.168.0.106

Could not fetch offset for [saka.test.ext_datastream,7] due to
kafka.common.NotCoordinatorForConsumerException.
Could not fetch offset for [saka.test.ext_datastream,4] due to
kafka.common.NotCoordinatorForConsumerException.
Could not fetch offset for [saka.test.ext_datastream,2] due to
kafka.common.NotCoordinatorForConsumerException.
Could not fetch offset for [saka.test.ext_datastream,5] due to
kafka.common.NotCoordinatorForConsumerException.
Could not fetch offset for [saka.test.ext_datastream,6] due to
kafka.common.NotCoordinatorForConsumerException.
Could not fetch offset for [saka.test.ext_datastream,3] due to
kafka.common.NotCoordinatorForConsumerException.
Could not fetch offset for [saka.test.ext_datastream,1] due to
kafka.common.NotCoordinatorForConsumerException.
Could not fetch offset for [saka.test.ext_datastream,0] due to
kafka.common.NotCoordinatorForConsumerException.

Group                                 Topic                     Pid  Offset   logSize  Lag      Owner
mirrormaker.from.site_02.to.site_ktv  saka.test.ext_datastream  0    unknown  9310560  unknown  mirrormaker.from.site_02.to.site_ktv_kafka-4-1420497015616-725cd1b5-0
mirrormaker.from.site_02.to.site_ktv  saka.test.ext_datastream  1    unknown  9313497  unknown  mirrormaker.from.site_02.to.site_ktv_kafka-4-1420497015616-725cd1b5-0
mirrormaker.from.site_02.to.site_ktv  saka.test.ext_datastream  2    unknown  9323623  unknown  mirrormaker.from.site_02.to.site_ktv_kafka-4-1420497015616-725cd1b5-0
mirrormaker.from.site_02.to.site_ktv  saka.test.ext_datastream  3    unknown  9334005  unknown  mirrormaker.from.site_02.to.site_ktv_kafka-4-1420497015616-725cd1b5-0
mirrormaker.from.site_02.to.site_ktv  saka.test.ext_datastream  4    unknown  9324176  unknown  mirrormaker.from.site_02.to.site_ktv_kafka-4-1420497015616-725cd1b5-0
mirrormaker.from.site_02.to.site_ktv  saka.test.ext_datastream  5    unknown  9336504  unknown  mirrormaker.from.site_02.to.site_ktv_kafka-4-1420497015616-725cd1b5-0
mirrormaker.from.site_02.to.site_ktv  saka.test.ext_datastream  6    unknown  9316139  unknown  mirrormaker.from.site_02.to.site_ktv_kafka-4-1420497015616-725cd1b5-0
mirrormaker.from.site_02.to.site_ktv  saka.test.ext_datastream  7    unknown  9318770  unknown  mirrormaker.from.site_02.to.site_ktv_kafka-4-1420497015616-725cd1b5-0


Since I can push data between my sites something is working, but what can be
the cause of a NotCoordinatorForConsumerException on a single-node cluster?


Finally, the __consumer_offsets topic should probably be excluded from
mirroring by default, without having to resort to whitelists or blacklists.


thanks
/svante


Re: Increase in Kafka replication fetcher thread not reducing log replication

2014-12-22 Thread svante karlsson
What kind of network do you have? Gigabit? If so, ~90 MB/s would make sense:
1 Gbit/s is roughly 125 MB/s of raw bandwidth, so ~90 MB/s of payload is close
to wire speed. Also, since you have one partition, what is your raw transfer
speed to the disk? 90 MB/s makes sense there as well...

If I were looking for rapid replica catch-up I'd have at least 2x Gbit and
partitioned topics spread out over several physical nodes. That would make it
possible to catch up at ~200 MB/s without totally blocking normal operation
of the cluster. For a single partition you will see a lower catch-up rate,
since all the data is coming from one physical disk.

/svante








2014-12-22 17:26 GMT+01:00 Jun Rao :

> Increasing replica.fetch.max.bytes will help, but will only get diminishing
> returns at some point.
>
> Increasing num.replica.fetchers will also help. You need to make sure that
> the leaders are balanced in the cluster. See
> http://kafka.apache.org/documentation.html#basic_ops_leader_balancing
>
> Thanks,
>
> Jun
>
> On Wed, Dec 17, 2014 at 8:08 AM, nitin sharma  >
> wrote:
>
> > Hi All,
> >
> > I am trying to figure out best configuration for my Kafka brokers so that
> > in case of restarted, the new node catch up with Leader at quick pace.
> >
> > My test environment has 2 kafka brokers and 1 Topic with one Partition.
> >
> > I first ran the test (Test#1) with default setting, i.e.
> > num.replica.fetchers =1 and replica.fetch.max.bytes = 1048576 Bytes (1
> MB).
> > it took 11min and 40 sec to copy the 37.9 GB @ the rate 55.5MB/sec (
> > 37.9*1024/700)
> >
> > Later I increased the num.replica.fetchers = 5 and
> > replica.fetch.max.bytes=1MB and ran another test (Test #2). I got the
> > replica @ 89 MB/sec. Which is good but i was expecting 4*55 =  221
> MB/sec.
> >
> > i ran two more test and results got much worse:
> > Test#3 : replica thread = 5 and replica.fetch.max.bytes = 5MB.
> >   replication rate = 92.7 MB/sec
> >
> > Test #4: replication thread = 20 and replica.fetch.max.bytes = 5 MB.
> >   replication rate = 93.54 MB/sec
> >
> > Any reason why increasing the replica fetcher thread or increase in fetch
> > max bytes not increasing my replication rate linearly.
> >
> > note: in all the test CPU utilization was not more than 45%
> >
> >
> > Regards,
> > Nitin Kumar Sharma.
> >
>


Re: How do I create a consumer group

2014-12-16 Thread svante karlsson
>Yes - see the offsets.topic.num.partitions and
>offsets.topic.replication.factor broker configs.

Joel, that's exactly what I was looking for. I'll look into that and the
source for OffsetsMessageFormatter later today!


thanks

svante



>


Re: How do I create a consumer group

2014-12-15 Thread svante karlsson
>What's your use case for knowing all consumer groups? For admin?

Yes, I need to monitor producer and consumer lags from several
interconnected kafka sites.

/svante

2014-12-16 0:34 GMT+01:00 Jun Rao :
>
> What's your use case for knowing all consumer groups? For admin?
>
> Thanks,
>
> Jun
>
> On Fri, Dec 12, 2014 at 2:45 AM, svante karlsson  wrote:
>
> > Disregard the creation question - we must have done something wrong
> because
> > now our code is working without obvious changes (on another set of
> > brokers).
> >
> > However it turns out to be difficult to know the existing consumer group
> > strings. Is the message format in __consumer_offsets "public"/stable in
> any
> > way or is there a better way of listing the existing group names?
> >
> > svante
> >
> >
> > 2014-12-11 20:59 GMT+01:00 svante karlsson :
> > >
> > > We're using 0.82 beta and  a homegrown c++ async library based on boost
> > > asio that has support for the offset api.
> > > (apikeys  OffsetCommitRequest = 8, OffsetFetchRequest = 9,
> > > ConsumerMetadataRequest = 10)
> > >
> > > If we use a java client and commit an offset then the consumer group
> > shows
> > > up in the response from ConsumerMetadataRequest. However I cant figure
> > out
> > > how to create a new one using the Kafka API.
> > >
> > > Also, my __consumer_offsets topic shows up with a replication factor of
> > 1.
> > > Is that changeable?
> > >
> > > thanks,
> > > svante
> > >
> >
>


Re: How do I create a consumer group

2014-12-12 Thread svante karlsson
If I understand KAFKA-1476 correctly, it is only a command-line tool that gets
its access by using ZKUtils, not an API in Kafka itself. We're looking for a
Kafka API, so I guess that this functionality is missing.

thanks for the pointer

Svante Karlsson




2014-12-12 19:03 GMT+01:00 Jiangjie Qin :
>
> KAFKA-1476 is addressing this need, but it's not checked in yet. Currently
> maybe you can use zookeeper client to check the zookeeper path.
>
> -Jiangjie (Becket) Qin
>
> On 12/12/14, 2:45 AM, "svante karlsson"  wrote:
>
> >Disregard the creation question - we must have done something wrong
> >because
> >now our code is working without obvious changes (on another set of
> >brokers).
> >
> >However it turns out to be difficult to know the existing consumer group
> >strings. Is the message format in __consumer_offsets "public"/stable in
> >any
> >way or is there a better way of listing the existing group names?
> >
> >svante
> >
> >
> >2014-12-11 20:59 GMT+01:00 svante karlsson :
> >>
> >> We're using 0.82 beta and  a homegrown c++ async library based on boost
> >> asio that has support for the offset api.
> >> (apikeys  OffsetCommitRequest = 8, OffsetFetchRequest = 9,
> >> ConsumerMetadataRequest = 10)
> >>
> >> If we use a java client and commit an offset then the consumer group
> >>shows
> >> up in the response from ConsumerMetadataRequest. However I cant figure
> >>out
> >> how to create a new one using the Kafka API.
> >>
> >> Also, my __consumer_offsets topic shows up with a replication factor of
> >>1.
> >> Is that changeable?
> >>
> >> thanks,
> >> svante
> >>
>
>


Re: How do I create a consumer group

2014-12-12 Thread svante karlsson
Disregard the creation question - we must have done something wrong because
now our code is working without obvious changes (on another set of
brokers).

However it turns out to be difficult to know the existing consumer group
strings. Is the message format in __consumer_offsets "public"/stable in any
way or is there a better way of listing the existing group names?

svante


2014-12-11 20:59 GMT+01:00 svante karlsson :
>
> We're using 0.82 beta and  a homegrown c++ async library based on boost
> asio that has support for the offset api.
> (apikeys  OffsetCommitRequest = 8, OffsetFetchRequest = 9,
> ConsumerMetadataRequest = 10)
>
> If we use a java client and commit an offset then the consumer group shows
> up in the response from ConsumerMetadataRequest. However I cant figure out
> how to create a new one using the Kafka API.
>
> Also, my __consumer_offsets topic shows up with a replication factor of 1.
> Is that changeable?
>
> thanks,
> svante
>


How do I create a consumer group

2014-12-11 Thread svante karlsson
We're using 0.8.2 beta and a homegrown C++ async library based on Boost
Asio that has support for the offset API
(API keys: OffsetCommitRequest = 8, OffsetFetchRequest = 9,
ConsumerMetadataRequest = 10).

If we use a Java client and commit an offset then the consumer group shows
up in the response from ConsumerMetadataRequest. However, I can't figure out
how to create a new one using the Kafka API.

Also, my __consumer_offsets topic shows up with a replication factor of 1.
Is that changeable?

thanks,
svante
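
For reference, the Java path that makes the group show up is just a commit from
the high-level consumer. A minimal sketch, assuming 0.8.2 and Kafka-based offset
storage (whether a bare commit, without consuming anything first, is enough to
make the group appear is an assumption on my part):

import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

public class CreateGroupSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");  // example address
        props.put("group.id", "my-new-group");             // the group we want to "create"
        props.put("offsets.storage", "kafka");             // commit to __consumer_offsets, not Zookeeper
        props.put("auto.commit.enable", "false");
        ConsumerConnector consumer =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        // There is no separate "create group" call; committing an offset is what
        // makes the group visible via ConsumerMetadataRequest/OffsetFetchRequest.
        consumer.commitOffsets();
        consumer.shutdown();
    }
}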


Re: Producer connection unsuccessful

2014-12-05 Thread svante karlsson
I haven't run the sandbox but check if the kafka server is started at all.

ps -ef | grep kafka



2014-12-05 14:34 GMT+01:00 Marco :

> Hi,
>
> I've installed the Hortonworks Sandbox and try to get into Kafka.
>
> Unfortunately, even the simple tutorial does not work :(
> http://kafka.apache.org/documentation.html#introduction
>
> If I try to send a message via
>
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>
> I get
>
>  ERROR Producer connection to localhost:9092 unsuccessful
> (kafka.producer.SyncProducer)
>
> java.net.ConnectException: Connection refused
> at sun.nio.ch.Net.connect0(Native Method)
> at sun.nio.ch.Net.connect(Net.java:465)
> at sun.nio.ch.Net.connect(Net.java:457)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
> at
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
> at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> at
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> at
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
> at kafka.utils.Utils$.swallow(Utils.scala:167)
> at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> at kafka.utils.Utils$.swallowError(Utils.scala:46)
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
> at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>
> In my server.properties the
> host is set to host.name=sandbox.hortonworks.com, which is correct.
>
> Thanks for any help,
> Marco
>
>
>
> --
> Viele Grüße,
> Marco
>


Re: KafkaException: Should not set log end offset on partition

2014-12-04 Thread svante karlsson
Thanks, kind of obvious afterwards :-). I used IP addresses in my Ansible
configuration, so everything else worked, but all the brokers had registered
themselves in Zookeeper with the same host name, "ubuntu":

2 ->
{"jmx_port":13005,"timestamp":"1417702658359","host":"ubuntu","version":1,"port":9092}...
4 ->
{"jmx_port":13005,"timestamp":"1417698861869","host":"ubuntu","version":1,"port":9092}
etc...

best regards

svante










2014-12-04 1:37 GMT+01:00 Jun Rao :

> Could you look at the broker registration in Zookeeper (
>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper
> )
> and make sure the 3 registered hosts are unique?
>
> Thanks,
>
> Jun
>
> On Wed, Dec 3, 2014 at 5:54 AM, svante karlsson  wrote:
>
> > I've installed (for ansible scripting testing purposes) 3 VM's each
> > containing kafka & zookeeer clustered together
> >
> > Ubuntu 14.04
> > Zookeepers are 3.4.6 and kafka 2.11-0.8.2-beta
> > The kafka servers have broker id's 2, 4, 6
> >
> > The zookeepers seems happy.
> > The kafka servers start up and seems happy.
> >
> > I can created two test topics as in the getting started guide
> > bin/kafka-topics.sh --create --zookeeper localhost:2181
> > --replication-factor 1 --partitions 1 --topic test
> > bin/kafka-topics.sh --create --zookeeper localhost:2181
> > --replication-factor 3 --partitions 32 --topic test3
> >
> > I published some messages on each topic and after a while I noticed that
> I
> > ran out of disk space and there are millions of logs in syslog similar to
> > the one below.
> >
> > Any hints on what can cause this problem?
> >
> >
> > /svante
> >
> >
> > Dec  2 07:40:17 ubuntu supervisord: kafka-broker [2014-12-02
> 07:40:17,120]
> > WARN [Replica Manager on Broker 4]: Fetch request with  correlation id 21
> > from client ReplicaFetcherThread-0-6 on partition [test3,3] failed due to
> > Leader not local for partition [test3, 3] on broker 4
> > (kafka.server.ReplicaManager)
> > Dec  2 07:40:17 ubuntu supervisord: kafka-broker [2014-12-02
> 07:40:17,120]
> > WARN [Replica Manager on Broker 4]: Fetch request with correlation id 21
> > from client ReplicaFetcherThread-0-6 on partition [test3,4] failed due to
> > Leader not local for partition [test3, 4] on broker 4
> > (kafka.server.ReplicaManager)
> > Dec  2 07:40:17 ubuntu supervisord: kafka-broker [2014-12-02
> 07:40:17,120]
> > WARN [Replica Manager on Broker 4]: Fetch request with  correlation id 21
> > from client ReplicaFetcherThread-0-6 on partition [test3,26] failed due
> to
> > Leader not local for partition [test3,26] on broker 4
> > (kafka.server.ReplicaManager)
> > Dec  2 07:40:17 ubuntu supervisord: kafka-broker [2014-12-02
> 07:40:17,121]
> > ERROR [KafkaApi-4] error when handling request Name: FetchRequest;
> Version:
> > 0; CorrelationId: 21; ClientId: ReplicaFetcherThread-0-6; ReplicaId: 4;
> > MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [test3,22] ->
> > PartitionFetchInfo(0,1048576),[test3,31] ->
> > PartitionFetchInfo(0,1048576),[test3,21]
> > ->PartitionFetchInfo(0,1048576),[test3,13] ->
> > PartitionFetchInfo(0,1048576),[test3,9] ->
> > PartitionFetchInfo(0,1048576),[test3,28] ->
> > PartitionFetchInfo(0,1048576),[test3,27] ->
> > PartitionFetchInfo(0,1048576),[test3,15] ->
> > PartitionFetchInfo(0,1048576),[test3,7] ->
> > PartitionFetchInfo(0,1048576),[test3,8] ->
> > PartitionFetchInfo(0,1048576),[test3,2] ->
> > PartitionFetchInfo(0,1048576),[test3,19] ->
> > PartitionFetchInfo(0,1048576),[test3,25] ->
> > PartitionFetchInfo(0,1048576),[test3,20] ->
> > PartitionFetchInfo(0,1048576),[test3,14] ->
> > PartitionFetchInfo(0,1048576),[test2,0] ->
> > PartitionFetchInfo(0,1048576),[test3,16] ->
> > PartitionFetchInfo(0,1048576),[test3,1] ->
> > PartitionFetchInfo(0,1048576),[test3,10] ->
> > PartitionFetchInfo(0,1048576),[test3,3] ->
> > PartitionFetchInfo(0,1048576),[test3,4] ->
> > PartitionFetchInfo(0,1048576),[test3,26] -> PartitionFetchInfo(0,1048576)
> > (kafka.server.KafkaApis)
> > Dec  2 07:40:17 ubuntu supervisord: kafka-broker
> > kafka.common.KafkaException: Should not set log end offset on partition
> > [test3,22]'s local replica 4
> > Dec  2 07:40:17 ubuntu supervisord: kafka-broker #011at
> > kafka.cluster.Replica.logEndOffset_$eq(Replica.scala:52)
> > Dec  2 07:40:17 ubuntu supervisord: kafka-broker #011at
> >

Re: KafkaException: Should not set log end offset on partition

2014-12-03 Thread svante karlsson
I found some logs like this before everything started to go wrong:

...
[2014-12-02 07:08:11,722] WARN Partition [test3,13] on broker 2: No
checkpointed highwatermark is found for partition [test3,7]
(kafka.cluster.Partition)
[2014-12-02 07:08:11,722] WARN Partition [test3,7] on broker 2: No
checkpointed highwatermark is found for partition [test3,7]
(kafka.cluster.Partition)


and further down

[2014-12-02 07:08:11,740] INFO Truncating log test3-13 to offset 0.
(kafka.log.Log)
[2014-12-02 07:08:11,740] INFO Truncating log test3-9 to offset 0.
(kafka.log.Log)
[2014-12-02 07:08:11,740] INFO Truncating log test3-6 to offset 0.
(kafka.log.Log)
[2014-12-02 07:08:11,740] INFO Truncating log test3-28 to offset 0.
(kafka.log.Log)
[2014-12-02 07:08:11,741] INFO Truncating log test3-27 to offset 0.
(kafka.log.Log)
[2014-12-02 07:08:11,741] INFO Truncating log test3-15 to offset 0.
(kafka.log.Log)
[2014-12-02 07:08:11,741] INFO Truncating log test3-7 to offset 0.
(kafka.log.Log)


After that, the logs from the first post start. I don't know if that helps
or explains anything.

/svante


KafkaException: Should not set log end offset on partition

2014-12-03 Thread svante karlsson
I've installed (for Ansible scripting testing purposes) 3 VMs, each
containing Kafka & Zookeeper, clustered together.

Ubuntu 14.04
The Zookeepers are 3.4.6 and Kafka is 2.11-0.8.2-beta.
The Kafka servers have broker ids 2, 4, 6.

The Zookeepers seem happy.
The Kafka servers start up and seem happy.

I created two test topics as in the getting started guide:
bin/kafka-topics.sh --create --zookeeper localhost:2181
--replication-factor 1 --partitions 1 --topic test
bin/kafka-topics.sh --create --zookeeper localhost:2181
--replication-factor 3 --partitions 32 --topic test3

I published some messages on each topic, and after a while I noticed that I
had run out of disk space and there were millions of log lines in syslog
similar to the one below.

Any hints on what can cause this problem?


/svante


Dec  2 07:40:17 ubuntu supervisord: kafka-broker [2014-12-02 07:40:17,120]
WARN [Replica Manager on Broker 4]: Fetch request with  correlation id 21
from client ReplicaFetcherThread-0-6 on partition [test3,3] failed due to
Leader not local for partition [test3, 3] on broker 4
(kafka.server.ReplicaManager)
Dec  2 07:40:17 ubuntu supervisord: kafka-broker [2014-12-02 07:40:17,120]
WARN [Replica Manager on Broker 4]: Fetch request with correlation id 21
from client ReplicaFetcherThread-0-6 on partition [test3,4] failed due to
Leader not local for partition [test3, 4] on broker 4
(kafka.server.ReplicaManager)
Dec  2 07:40:17 ubuntu supervisord: kafka-broker [2014-12-02 07:40:17,120]
WARN [Replica Manager on Broker 4]: Fetch request with  correlation id 21
from client ReplicaFetcherThread-0-6 on partition [test3,26] failed due to
Leader not local for partition [test3,26] on broker 4
(kafka.server.ReplicaManager)
Dec  2 07:40:17 ubuntu supervisord: kafka-broker [2014-12-02 07:40:17,121]
ERROR [KafkaApi-4] error when handling request Name: FetchRequest; Version:
0; CorrelationId: 21; ClientId: ReplicaFetcherThread-0-6; ReplicaId: 4;
MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [test3,22] ->
PartitionFetchInfo(0,1048576),[test3,31] ->
PartitionFetchInfo(0,1048576),[test3,21]
->PartitionFetchInfo(0,1048576),[test3,13] ->
PartitionFetchInfo(0,1048576),[test3,9] ->
PartitionFetchInfo(0,1048576),[test3,28] ->
PartitionFetchInfo(0,1048576),[test3,27] ->
PartitionFetchInfo(0,1048576),[test3,15] ->
PartitionFetchInfo(0,1048576),[test3,7] ->
PartitionFetchInfo(0,1048576),[test3,8] ->
PartitionFetchInfo(0,1048576),[test3,2] ->
PartitionFetchInfo(0,1048576),[test3,19] ->
PartitionFetchInfo(0,1048576),[test3,25] ->
PartitionFetchInfo(0,1048576),[test3,20] ->
PartitionFetchInfo(0,1048576),[test3,14] ->
PartitionFetchInfo(0,1048576),[test2,0] ->
PartitionFetchInfo(0,1048576),[test3,16] ->
PartitionFetchInfo(0,1048576),[test3,1] ->
PartitionFetchInfo(0,1048576),[test3,10] ->
PartitionFetchInfo(0,1048576),[test3,3] ->
PartitionFetchInfo(0,1048576),[test3,4] ->
PartitionFetchInfo(0,1048576),[test3,26] -> PartitionFetchInfo(0,1048576)
(kafka.server.KafkaApis)
Dec  2 07:40:17 ubuntu supervisord: kafka-broker
kafka.common.KafkaException: Should not set log end offset on partition
[test3,22]'s local replica 4
Dec  2 07:40:17 ubuntu supervisord: kafka-broker #011at
kafka.cluster.Replica.logEndOffset_$eq(Replica.scala:52)
Dec  2 07:40:17 ubuntu supervisord: kafka-broker #011at
kafka.server.ReplicaManager.updateReplicaLEOAndPartitionHW(ReplicaManager.scala:565)
Dec  2 07:40:17 ubuntu supervisord: kafka-broker #011at
kafka.server.KafkaApis$$anonfun$recordFollowerLogEndOffsets$2.apply(KafkaApis.scala:348)
Dec  2 07:40:17 ubuntu supervisord: kafka-broker #011at
kafka.server.KafkaApis$$anonfun$recordFollowerLogEndOffsets$2.apply(KafkaApis.scala:346)
Dec  2 07:40:17 ubuntu supervisord: kafka-broker #011at
scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
Dec  2 07:40:17 ubuntu supervisord: kafka-broker #011at
scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
Dec  2 07:40:17 ubuntu supervisord: kafka-broker #011at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
Dec  2 07:40:17 ubuntu supervisord: kafka-broker #011at
scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
Dec  2 07:40:17 ubuntu supervisord: kafka-broker #011at
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
Dec  2 07:40:17 ubuntu supervisord: kafka-broker #011at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
Dec  2 07:40:17 ubuntu supervisord: kafka-broker #011at
scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
Dec  2 07:40:17 ubuntu supervisord: kafka-broker #011at
kafka.server.KafkaApis.recordFollowerLogEndOffsets(KafkaApis.scala:346)
Dec  2 07:40:17 ubuntu supervisord: kafka-broker #011at
kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:311)
Dec  2 07:40:17 ubuntu supervisord: kafka-broker #011at
kafka.server.KafkaApis.handle(KafkaApis.scala:60)
Dec  2 07:40:17 ubuntu supervisord: kafka-b

Re: Partition key not working properly

2014-11-25 Thread svante karlsson
By default, the partition key is hashed and the message is then placed in the
partition that owns the corresponding part of the hashed key space.

If you have three physical partitions and give the partition key "5", it has
nothing to do with physical partition 5 (which does not exist); roughly:
partition = hash("5") mod 3


The only guarantee is that all messages with the same key go to the same
partition. This is useful to make sure that, for example, all logs from the
same IP go to the same partition, which means that they can be read by the
same consumer.

/svante
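
For illustration, a minimal sketch of the idea in plain Java (the exact hash
used by the producer's default partitioner may differ, so treat this as an
approximation of the behaviour, not the actual Kafka code):

import java.util.Arrays;

public class PartitionSketch {
    // Roughly what a hash partitioner does: partition = abs(hash(key)) mod numPartitions
    static int partitionFor(String key, int numPartitions) {
        return Math.abs(key.hashCode()) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        for (String key : Arrays.asList("0", "1", "2", "5")) {
            System.out.println("key " + key + " -> partition " + partitionFor(key, numPartitions));
        }
        // Key "5" lands in partition hash("5") mod 3, not in a "partition 5".
        // The only guarantee is that the mapping is consistent per key.
    }
}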



2014-11-26 2:42 GMT+01:00 Haoming Zhang :

>
>
>
> Hi all,
>
> I'm struggling with how to use the partition key mechanism properly. My
> logic is set the partition number as 3, then  create three partition keys
> as "0", "1", "2", then use the partition keys to create three KeyedMessage
> such as
> KeyedMessage(topic, "0", message),
> KeyedMessage(topic, "1", message),
> KeyedMessage(topic, "2", message)
>
> After this, creating a producer instance to send out all the KeyedMessage.
>
> I expecting each KeyedMessage should enter to different partitions
> according to the different partition keys, which means
> KeyedMessage(topic, "0", message) go to Partition 0,
> KeyedMessage(topic, "1", message) go to Partition 1,
> KeyedMessage(topic, "2", message) go to Partition 2
>
> I'm using Kafka-web-console to watch the topic status, but the result is
> not like what I'm expecting. KeyedMessage still go to partitions randomly,
> some times two KeyedMessage will enter the same partition even they have
> different partition keys, .
>
> Not sure whether my logic is incorrect or I didn't understand the
> partition key mechanism correctly. Anyone could provides some sample code
> or explanation would be great!
>
> Thanks,
> Haoming
>
>


Re: Using Kafka for ETL from DW to Hadoop

2014-10-23 Thread svante karlsson
Both variants will work well (if your Kafka cluster can handle the full
volume of the transmitted data for the duration of the TTL on each topic).

I would run the whole thing through Kafka, since you will be "stress-testing"
your production flow. Consider: if you at some later time lost your
destination tables, how would you then repopulate them? It would be nice
to know that your normal flow handles this situation.





2014-10-23 12:08 GMT+02:00 Po Cheung :

> Hello,
>
> We are planning to set up a data pipeline and send periodic, incremental
> updates from Teradata to Hadoop via Kafka.  For a large DW table with
> hundreds of GB of data, is it okay (in terms of performance) to use Kafka
> for the initial bulk data load?  Or will Sqoop with Teradata connector be
> more appropriate?
>
>
> Thanks,
> Po


Re: C/C++ kafka client API's

2014-10-14 Thread svante karlsson
Magnus,

Do you have any plans to update the protocol support to 0.9? I built a Boost
Asio based version half a year ago, but it only implements v0.8 and I have
not found time to upgrade it. It is quite a big job to get something equal
to the Java high- and low-level APIs.

/svante


>
>


Re: How to use RPC mechanism in Kafka?

2014-09-22 Thread svante karlsson
1)  HA proxy -> node.js (REST API). Use Aerospike as a session store if you
need one.
2)  Difficult, since the prioritization must be passed down to lower levels and
that's usually "hard". (Get rid of this constraint and go for an SLA - like
99.9% within 50 ms or something like that.)
2b) Group your customers on different servers with different SLAs.
3)  Log your requests in Kafka from node.js (but do not expect replies on
this path).

/svante



2014-09-22 13:32 GMT+02:00 lavish goel :

> Thank you Svante for response.
>
> *Below are some requirements that makes us to consider Message Broker.*
> 1. We have to handle 30,000 TPS.
> 2. We need to prioritize the requests.
> 3. Request Data should not be lost.
>
>
> Thanks
>
> Regards
> Lavish Goel
>
>
>
> On Mon, Sep 22, 2014 at 4:20 PM, svante karlsson  wrote:
>
> > Why do you want a message broker for RPC?
> > What is "large amounts of requests"?
> >
> > /svante
> >
> >
> >
> >
> >
> > 2014-09-22 12:38 GMT+02:00 lavish goel :
> >
> > > Thank you so much Svante for your response.
> > >
> > > The application which we are designing depends lot upon
> request/response
> > > mechanism. In that case should we move to some other message broker? If
> > > yes, Can you please tell me the name which is best for this use case
> and
> > > can handle large amount of requests?
> > > Is there any workaround in Kafka? If Yes, Please tell me.
> > >
> > > Thanks
> > >
> > > Warm Regards
> > > Lavish Goel
> > >
> > >
> > > On Mon, Sep 22, 2014 at 3:41 PM, svante karlsson  wrote:
> > >
> > > > Wrong use-case. Kafka is a queue (in normal case a TTL (time to live)
> > on
> > > > messages). There is no correlation between producers and consumers.
> > There
> > > > is no concept of a consumed message. There is no "request" and no
> > > > "response".
> > > >
> > > > You can produce messages (in another topic) as result of your
> > processing
> > > > but you cant respond to a producer.
> > > >
> > > > /svante
> > > >
> > > >
> > > > 2014-09-22 11:42 GMT+02:00 lavish goel :
> > > >
> > > > > Thank you for your response. I have gone through the protocol wiki.
> > > Now I
> > > > > have some understanding of it.
> > > > > Sorry for again asking the question.
> > > > >
> > > > > I want to know, Is it possible:
> > > > >
> > > > > Let say, I have producer PA,PB,PC. They send request messages A,B,C
> > > > > respectively. Now these messages goes to topic. There are few
> > consumers
> > > > > that took message from topic and do some computation on message.
> > > > > Let say after computation request A,B,C becomes AA,BB,CC
> > respectively.
> > > > Now
> > > > > I want to send this response to Producer A,B,C respectively.
> > > > >
> > > > > Is it possible? if yes. Can You please explain in bit detail.
> > > > >
> > > > > Thank you so much
> > > > >
> > > > >
> > > > > Regards
> > > > > Lavish Goel
> > > > >
> > > > >
> > > > > On Sat, Sep 20, 2014 at 10:18 PM, Guozhang Wang <
> wangg...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Also you can take a look at the SimpleConsumer and SyncProducer
> > > > > > implementation.
> > > > > >
> > > > > > On Sat, Sep 20, 2014 at 9:47 AM, Guozhang Wang <
> wangg...@gmail.com
> > >
> > > > > wrote:
> > > > > >
> > > > > >> You can take a look at the protocol wiki to understand the
> > request /
> > > > > >> response data types. Kafka server accepts socket connection for
> > > > > requests.
> > > > > >>
> > > > > >> Guozhang
> > > > > >>
> > > > > >> On Fri, Sep 19, 2014 at 10:42 PM, lavish goel <
> lavis...@gmail.com
> > >
> > > > > wrote:
> > > > > >>
> > > > > >>> Thank you for your response. I want to implement
> request/response
> > > > type
> > > 

Re: How to use RPC mechanism in Kafka?

2014-09-22 Thread svante karlsson
Why do you want a message broker for RPC?
What is "large amounts of requests"?

/svante





2014-09-22 12:38 GMT+02:00 lavish goel :

> Thank you so much Svante for your response.
>
> The application which we are designing depends lot upon request/response
> mechanism. In that case should we move to some other message broker? If
> yes, Can you please tell me the name which is best for this use case and
> can handle large amount of requests?
> Is there any workaround in Kafka? If Yes, Please tell me.
>
> Thanks
>
> Warm Regards
> Lavish Goel
>
>
> On Mon, Sep 22, 2014 at 3:41 PM, svante karlsson  wrote:
>
> > Wrong use-case. Kafka is a queue (in normal case a TTL (time to live) on
> > messages). There is no correlation between producers and consumers. There
> > is no concept of a consumed message. There is no "request" and no
> > "response".
> >
> > You can produce messages (in another topic) as result of your processing
> > but you cant respond to a producer.
> >
> > /svante
> >
> >
> > 2014-09-22 11:42 GMT+02:00 lavish goel :
> >
> > > Thank you for your response. I have gone through the protocol wiki.
> Now I
> > > have some understanding of it.
> > > Sorry for again asking the question.
> > >
> > > I want to know, Is it possible:
> > >
> > > Let say, I have producer PA,PB,PC. They send request messages A,B,C
> > > respectively. Now these messages goes to topic. There are few consumers
> > > that took message from topic and do some computation on message.
> > > Let say after computation request A,B,C becomes AA,BB,CC respectively.
> > Now
> > > I want to send this response to Producer A,B,C respectively.
> > >
> > > Is it possible? if yes. Can You please explain in bit detail.
> > >
> > > Thank you so much
> > >
> > >
> > > Regards
> > > Lavish Goel
> > >
> > >
> > > On Sat, Sep 20, 2014 at 10:18 PM, Guozhang Wang 
> > > wrote:
> > >
> > > > Also you can take a look at the SimpleConsumer and SyncProducer
> > > > implementation.
> > > >
> > > > On Sat, Sep 20, 2014 at 9:47 AM, Guozhang Wang 
> > > wrote:
> > > >
> > > >> You can take a look at the protocol wiki to understand the request /
> > > >> response data types. Kafka server accepts socket connection for
> > > requests.
> > > >>
> > > >> Guozhang
> > > >>
> > > >> On Fri, Sep 19, 2014 at 10:42 PM, lavish goel 
> > > wrote:
> > > >>
> > > >>> Thank you for your response. I want to implement request/response
> > type
> > > >>> model.
> > > >>> For eg. I have a producer that publish a message(He wants to know
> > some
> > > >>> status) to topic. A consumer pulls the message and processes the
> > > request.
> > > >>> Now I want that, the response of this request should go that
> > producer.
> > > Can
> > > >>> you please tell me how can I implement this.
> > > >>>
> > > >>> Thanks
> > > >>> Lavish
> > > >>>
> > > >>> On Sat, Sep 20, 2014 at 1:26 AM, Guozhang Wang  >
> > > >>> wrote:
> > > >>>
> > > >>>> Do you mean that you want to know the protocol?
> > > >>>>
> > > >>>>
> > > >>>>
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> > > >>>>
> > > >>>> Guozhang
> > > >>>>
> > > >>>> On Fri, Sep 19, 2014 at 12:18 PM, lavish goel  >
> > > >>>> wrote:
> > > >>>>
> > > >>>> > Hi,
> > > >>>> >
> > > >>>> > Please tell me how to use request/response mechanism in kafka?
> > > >>>> > Thanks
> > > >>>> > Lavish Goel
> > > >>>> >
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>> --
> > > >>>> -- Guozhang
> > > >>>>
> > > >>>
> > > >>>
> > > >>
> > > >>
> > > >> --
> > > >> -- Guozhang
> > > >>
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
>


Re: How to use RPC mechanism in Kafka?

2014-09-22 Thread svante karlsson
Wrong use case. Kafka is a queue (in the normal case with a TTL, time to live,
on messages). There is no correlation between producers and consumers. There
is no concept of a consumed message. There is no "request" and no
"response".

You can produce messages (in another topic) as a result of your processing,
but you can't respond to a producer.

/svante
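
For what it's worth, a minimal sketch of the usual workaround - a request topic
plus a response topic correlated by a key. The topic names and the 0.8 producer
usage here are illustrative assumptions, not an official pattern:

import java.util.Properties;
import java.util.UUID;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class RequestReplySketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");  // example broker
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));

        // The caller publishes a request keyed by a correlation id...
        String correlationId = UUID.randomUUID().toString();
        producer.send(new KeyedMessage<String, String>("requests", correlationId, "what is the status?"));
        producer.close();

        // ...a worker consumes "requests", does its processing, and produces the
        // result to a "responses" topic keyed by the same correlation id. The caller
        // consumes "responses" and matches on the key. Kafka itself never routes
        // anything back to the original producer.
    }
}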


2014-09-22 11:42 GMT+02:00 lavish goel :

> Thank you for your response. I have gone through the protocol wiki. Now I
> have some understanding of it.
> Sorry for again asking the question.
>
> I want to know, Is it possible:
>
> Let say, I have producer PA,PB,PC. They send request messages A,B,C
> respectively. Now these messages goes to topic. There are few consumers
> that took message from topic and do some computation on message.
> Let say after computation request A,B,C becomes AA,BB,CC respectively. Now
> I want to send this response to Producer A,B,C respectively.
>
> Is it possible? if yes. Can You please explain in bit detail.
>
> Thank you so much
>
>
> Regards
> Lavish Goel
>
>
> On Sat, Sep 20, 2014 at 10:18 PM, Guozhang Wang 
> wrote:
>
> > Also you can take a look at the SimpleConsumer and SyncProducer
> > implementation.
> >
> > On Sat, Sep 20, 2014 at 9:47 AM, Guozhang Wang 
> wrote:
> >
> >> You can take a look at the protocol wiki to understand the request /
> >> response data types. Kafka server accepts socket connection for
> requests.
> >>
> >> Guozhang
> >>
> >> On Fri, Sep 19, 2014 at 10:42 PM, lavish goel 
> wrote:
> >>
> >>> Thank you for your response. I want to implement request/response type
> >>> model.
> >>> For eg. I have a producer that publish a message(He wants to know some
> >>> status) to topic. A consumer pulls the message and processes the
> request.
> >>> Now I want that, the response of this request should go that producer.
> Can
> >>> you please tell me how can I implement this.
> >>>
> >>> Thanks
> >>> Lavish
> >>>
> >>> On Sat, Sep 20, 2014 at 1:26 AM, Guozhang Wang 
> >>> wrote:
> >>>
>  Do you mean that you want to know the protocol?
> 
> 
> 
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> 
>  Guozhang
> 
>  On Fri, Sep 19, 2014 at 12:18 PM, lavish goel 
>  wrote:
> 
>  > Hi,
>  >
>  > Please tell me how to use request/response mechanism in kafka?
>  > Thanks
>  > Lavish Goel
>  >
> 
> 
> 
>  --
>  -- Guozhang
> 
> >>>
> >>>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
> >
> > --
> > -- Guozhang
> >
>


Re: 答复: kafka performance question

2014-05-26 Thread svante karlsson
Do you read from the file in the callback from Kafka? I just implemented
C++ bindings, and in one of the tests I did I got the following results:

1000 messages per batch (fairly small messages, ~150 bytes), then waiting
for the network layer to ack the sends (not a server ack) before putting
another message on the TCP socket. This seems to give me an average latency
of 17 ms and a throughput of about 10 MB/s.

If you are serializing your requests and reading data from disk between
calls to Kafka, then that would easily explain some added milliseconds in
each call and thus a reduced throughput. Partitioning will not reduce
latency.
/svante
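
As a rough illustration, a minimal sketch of an async producer with the old 0.8
API (property names taken from the configuration quoted below; the values are
just examples to experiment with):

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class AsyncSendSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");  // example broker
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("producer.type", "async");         // batch in the background
        props.put("batch.num.messages", "1000");     // example batch size
        props.put("queue.buffering.max.ms", "100");  // example linger time
        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));

        // Hand each block to the producer without waiting for an ack in between;
        // the async layer does the batching and pipelining for you.
        for (int i = 0; i < 10000; i++) {
            producer.send(new KeyedMessage<String, String>("test", "block-" + i));
        }
        producer.close();
    }
}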






2014-05-26 6:18 GMT+02:00 Zhujie (zhujie, Smartcare) <
first.zhu...@huawei.com>:

> Only one broker,and eight partitions, async mode.
>
> Increase the number of batch.num.messages is useless.
>
> We split the whole file into 1K per block.
>
>
> -Original Message-
> From: robairrob...@gmail.com [mailto:robairrob...@gmail.com] On Behalf Of Robert
> Turner
> Sent: 16 May 2014 13:45
> To: users@kafka.apache.org
> Subject: Re: kafka performance question
>
> A couple of thoughts spring to mind, are you sending the whole file as 1
> message? and is your producer code using sync or async mode?
>
> Cheers
>Rob.
>
>
> On 14 May 2014 15:49, Jun Rao  wrote:
>
> > How many brokers and partitions do you have? You may try increasing
> > batch.num.messages.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Tue, May 13, 2014 at 5:56 PM, Zhujie (zhujie, Smartcare) <
> > first.zhu...@huawei.com> wrote:
> >
> > > Dear all,
> > >
> > > We want to use kafka to collect and dispatch data file, but the
> > > performance is maybe lower than we want.
> > >
> > > In our cluster,there is a provider and a broker. We use a one thread
> > > read file from local disk of provider and send it to broker. The
> > > average throughput is only 3 MB/S~4MB/S.
> > > But if we just use java NIO API to send file ,the throughput can
> > > exceed 200MB/S.
> > > Why the kafka performance is so bad in our test, are we missing
> > something??
> > >
> > >
> > >
> > > Our server:
> > > Cpu: Intel(R) Xeon(R) CPU E5-4650 0 @ 2.70GHz*4 Mem:300G Disk:600G
> > > 15K RPM SAS*8
> > >
> > > Configuration of provider:
> > > props.put("serializer.class", "kafka.serializer.NullEncoder");
> > > props.put("metadata.broker.list", "169.10.35.57:9092");
> > > props.put("request.required.acks", "0"); props.put("producer.type",
> > > "async");//异步
> > > props.put("queue.buffering.max.ms","500");
> > > props.put("queue.buffering.max.messages","10");
> > > props.put("batch.num.messages", "1200");
> > > props.put("queue.enqueue.timeout.ms", "-1");
> > > props.put("send.buffer.bytes", "10240");
> > >
> > > Configuration of broker:
> > >
> > > # Licensed to the Apache Software Foundation (ASF) under one or more
> > > # contributor license agreements.  See the NOTICE file distributed
> > > with # this work for additional information regarding copyright
> ownership.
> > > # The ASF licenses this file to You under the Apache License,
> > > Version 2.0 # (the "License"); you may not use this file except in
> > > compliance with # the License.  You may obtain a copy of the License
> > > at #
> > > #http://www.apache.org/licenses/LICENSE-2.0
> > > #
> > > # Unless required by applicable law or agreed to in writing,
> > > software # distributed under the License is distributed on an "AS
> > > IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
> > > express or
> > implied.
> > > # See the License for the specific language governing permissions
> > > and # limitations under the License.
> > > # see kafka.server.KafkaConfig for additional details and defaults
> > >
> > > # Server Basics
> > > #
> > >
> > > # The id of the broker. This must be set to a unique integer for
> > > each broker.
> > > broker.id=0
> > >
> > > # Socket Server Settings
> > > #
> > >
> > > # The port the socket server listens on
> > > port=9092
> > >
> > > # Hostname the broker will bind to. If not set, the server will bind
> > > to all interfaces #host.name=localhost
> > >
> > > # Hostname the broker will advertise to producers and consumers. If
> > > not set, it uses the # value for "host.name" if configured.
> > > Otherwise, it will use the value returned from #
> > > java.net.InetAddress.getCanonicalHostName().
> > > #advertised.host.name=
> > >
> > > # The port to publish to ZooKeeper for clients to use. If this is
> > > not
> > set,
> > > # it will publish the same port that the broker binds to.
> > > #advertised.port=
> > >
> > > # The number of threads handling network requests
> > > #num.network.threads=2
> > > # The number of threads doing disk I/O
> > > #num.io.threads=8
> > >
> > > # The send buffer (SO_SNDBUF) used by the socket server
> > > #socket.send.buffer.bytes=1048576
> > >
> > > # The receive buffer (SO_RCVBUF) used by the socket server
> > > #so

Re: Adding partitions...

2014-05-23 Thread svante karlsson
No reshuffling will take place. And reading messages and putting them back in
again will not remove the messages from their "old" partition, so the same
message will then exist in more than one partition - eventually to be aged
out of the oldest partition.

If you use partitioning to distribute the load then it's possible to just
leave the old data in the existing partitions and let newer data fill all
the partitions.

If the partitioning means something to your applications then you can
always create a new topic and populate that from the old topic. This will
double your data for some time before you can age out the old topic.
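
A minimal sketch of that copy step with the 0.8 high-level consumer and old
producer (topic names, connection strings and the timeout are made-up examples;
error handling and multiple streams are left out):

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.message.MessageAndMetadata;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class RepartitionCopySketch {
    public static void main(String[] args) {
        Properties cprops = new Properties();
        cprops.put("zookeeper.connect", "localhost:2181");
        cprops.put("group.id", "repartition-copy");
        cprops.put("auto.offset.reset", "smallest");   // start from the beginning of the old topic
        cprops.put("consumer.timeout.ms", "10000");    // stop once the old topic has been drained
        ConsumerConnector consumer =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(cprops));

        Properties pprops = new Properties();
        pprops.put("metadata.broker.list", "localhost:9092");
        pprops.put("serializer.class", "kafka.serializer.DefaultEncoder");  // pass bytes through untouched
        Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(new ProducerConfig(pprops));

        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                consumer.createMessageStreams(Collections.singletonMap("old-topic", 1));
        ConsumerIterator<byte[], byte[]> it = streams.get("old-topic").get(0).iterator();
        try {
            while (it.hasNext()) {
                MessageAndMetadata<byte[], byte[]> msg = it.next();
                // Keys (if any) get hashed over the new topic's full partition count;
                // null keys are spread over partitions by the producer.
                producer.send(new KeyedMessage<byte[], byte[]>("new-topic", msg.key(), msg.message()));
            }
        } catch (ConsumerTimeoutException e) {
            // no more messages within the timeout - assume we have caught up
        }
        producer.close();
        consumer.shutdown();
    }
}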


2014-05-23 12:44 GMT+02:00 Niclas Hedhman :

> Hi,
>
> we are trying to figure out how to layout our topics and partitions. And
> one thing I can't find in the documentation is;
>
> What happens to data that is sitting in the 'old partitions' when I add a
> new one?
>
> My gut feeling says that everything remains in the partitions as they are,
> and if we need to have partitions associated with keys, we have to
> implement this re-shuffling ourselves, i.e. read all the messages out and
> put them back in.
>
> Is it so? And is there any hooks to assist in this?
>
>
> Cheers
> --
> Niclas Hedhman, Software Developer
> 河南南路555弄15号1901室。
> http://www.qi4j.org - New Energy for Java
>
> I live here; http://tinyurl.com/3xugrbk
> I work here; http://tinyurl.com/6a2pl4j
> I relax here; http://tinyurl.com/2cgsug
>