Re: compatibility: 0.8.1.1 broker, 0.8.2.2 producer
I am using the 0.8.2.2 producer with 0.8.1.1 brokers without problems. The Scala version matters only if you are building with Scala or with other components that use Scala. Hope this helps.

-- Andrey Yegorov

On Wed, Dec 23, 2015 at 1:11 PM, Ewen Cheslack-Postava wrote:
> Shlomi,
>
> You should always upgrade brokers before clients. Newer versions of clients
> aren't guaranteed to work with older versions of brokers.
>
> For Scala versions, there is no functional difference. Generally you only
> need to worry about the Scala version if you are using the old clients
> (which are in the core jar) and the rest of your app requires a specific
> Scala version.
>
> -Ewen
>
> On Wed, Dec 23, 2015 at 6:31 AM, Shlomi Hazan wrote:
> > Hi All,
> >
> > Does someone have experience with / has anyone encountered any issues using
> > a 0.8.2.2 producer against a 0.8.1.1 broker (specifically kafka_2.9.2-0.8.1.1)?
> > I want to upgrade my existing producer (0.8.2-beta).
> > Also, is there a functional difference between the Scala versions
> > (2.9.2, 2.10, 2.11)?
> >
> > Thanks,
> > Shlomi
>
> --
> Thanks,
> Ewen
Re: KafkaException: Size of FileMessageSet has been truncated during write
Thank you!

-- Andrey Yegorov

On Wed, May 27, 2015 at 4:42 PM, Jiangjie Qin wrote:
> This should be just a message fetch failure. The socket was disconnected
> while the broker was writing to it. There should not be data loss.
>
> Jiangjie (Becket) Qin
>
> On 5/27/15, 11:00 AM, "Andrey Yegorov" wrote:
> > I've noticed a few exceptions in the logs like the one below; does it
> > indicate data loss? Should I worry about this?
> > What is the possible reason for this to happen?
> > I am using kafka 0.8.1.1.
> >
> > ERROR Closing socket for /xx.xxx.xxx.xxx because of error
> > (kafka.network.Processor)
> > kafka.common.KafkaException: Size of FileMessageSet
> > /data/kafka/topic-name-11/14340499.log has been truncated
> > during write: old size 26935, new size 0
> >   at kafka.log.FileMessageSet.writeTo(FileMessageSet.scala:144)
> >   at kafka.api.PartitionDataSend.writeTo(FetchResponse.scala:69)
> >   at kafka.network.MultiSend.writeTo(Transmission.scala:102)
> >   at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:124)
> >   at kafka.network.MultiSend.writeTo(Transmission.scala:102)
> >   at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:219)
> >   at kafka.network.Processor.write(SocketServer.scala:375)
> >   at kafka.network.Processor.run(SocketServer.scala:247)
> >   at java.lang.Thread.run(Thread.java:745)
> >
> > --
> > Andrey Yegorov
KafkaException: Size of FileMessageSet has been truncated during write
I've noticed a few exceptions in the logs like the one below; does it indicate data loss? Should I worry about this? What is the possible reason for this to happen?

I am using kafka 0.8.1.1.

ERROR Closing socket for /xx.xxx.xxx.xxx because of error (kafka.network.Processor)
kafka.common.KafkaException: Size of FileMessageSet /data/kafka/topic-name-11/14340499.log has been truncated during write: old size 26935, new size 0
  at kafka.log.FileMessageSet.writeTo(FileMessageSet.scala:144)
  at kafka.api.PartitionDataSend.writeTo(FetchResponse.scala:69)
  at kafka.network.MultiSend.writeTo(Transmission.scala:102)
  at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:124)
  at kafka.network.MultiSend.writeTo(Transmission.scala:102)
  at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:219)
  at kafka.network.Processor.write(SocketServer.scala:375)
  at kafka.network.Processor.run(SocketServer.scala:247)
  at java.lang.Thread.run(Thread.java:745)

-- Andrey Yegorov
Re: Best way to replace a single broker
As I remember, you can simply stop the old broker and start the new one with the same broker id as the old one. It will start syncing replicas from the other brokers and will eventually catch up on all of them. After this is done (all replicas are in sync), you can trigger preferred replica election if it does not happen automatically.

-- Andrey Yegorov

On Thu, May 14, 2015 at 11:12 AM, Rajiv Kurian wrote:
> Hi all,
>
> Sometimes we need to replace a kafka broker because it turns out to be a
> bad instance. What is the best way of doing this?
>
> We have been using kafka-reassign-partitions.sh to migrate all topics
> to the new list of brokers, which is (old list + the new instance - the
> bad instance). Then we terminate the bad instance once we ensure that it is
> getting no traffic. But it seems like this causes an unnecessary amount of
> topic churn and is not equivalent to just moving the partitions the bad
> broker was responsible for to the new instance.
>
> Is there a better way of going about replacing a single instance (not
> adding capacity)? I'd ideally like to just be able to move the partitions
> from the old broker to the new one instead of a complete rebalance.
>
> Thanks!
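The procedure above can be sketched as a few shell steps (a sketch, not a tested runbook: the service names, config path, hostnames, and ZK address are all placeholder assumptions; the election tool name is from the 0.8.x line — check it against your installation):

```shell
# 1. Stop the bad broker (assuming it runs under a service manager).
ssh old-broker 'sudo service kafka stop'

# 2. On the replacement machine, give the new broker the SAME broker.id
#    as the one it replaces, so it adopts the old broker's partition
#    assignments from ZooKeeper.
ssh new-broker 'grep broker.id /etc/kafka/server.properties'   # e.g. broker.id=3

# 3. Start the new broker; it will fetch replicas from the other brokers
#    until it rejoins the ISR for all of its partitions.
ssh new-broker 'sudo service kafka start'

# 4. Once all replicas are in sync, rebalance leadership if it does not
#    happen automatically.
bin/kafka-preferred-replica-election.sh --zookeeper zk1:2181
```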
Re: High Latency in Kafka
I am not familiar with logstash, but with a custom log replay tool (used to replay messages logged locally, e.g. in case Kafka was not available; also useful in some other scenarios) I've seen it reach 30,000 messages/sec with an average message size of 4.5 kilobytes, all under regular production load on Kafka (6 brokers). At this rate, sending 30GB of logs should take about 4 minutes.

The tool has:
one thread to read messages and put them into a queue;
5 (configurable) threads that read messages from the queue and send them to Kafka, with one producer per thread.

I am using the new producer from kafka 0.8.2-beta and async send. I remember that I had to tune some parameters for the kafka producer: increased buffer sizes and something else.

HTH.

-- Andrey Yegorov

On Tue, Feb 10, 2015 at 5:54 AM, Vineet Mishra wrote:
> Hi Gwen,
>
> Well, I have gone through this link while trying to set up my Logstash Kafka
> handler:
>
> https://github.com/joekiller/logstash-kafka
>
> I could achieve what I was looking for, but the performance is badly
> affected when trying to write a big file of GBs.
> I guess there should be some way to parallelize the existing running
> process.
>
> Thanks!
>
> On Sun, Feb 8, 2015 at 8:06 PM, Gwen Shapira wrote:
> > I'm wondering how much of the time is spent by Logstash reading and
> > processing the log vs. time spent sending data to Kafka. Also, I'm not
> > familiar with logstash internals; perhaps it can be tuned to send the data
> > to Kafka in larger batches?
> >
> > At the moment it's difficult to tell where the slowdown is. More information
> > about the breakdown of time will help.
> >
> > You can try Flume's SpoolingDirectory source with Kafka Channel or Sink and
> > see if you get improved performance out of other tools.
> >
> > Gwen
> >
> > On Sun, Feb 8, 2015 at 12:06 AM, Vineet Mishra wrote:
> > > Hi All,
> > >
> > > I am having some log files of around 30GB. I am trying to event-process
> > > these logs by pushing them to Kafka. I could clearly see that the throughput
> > > achieved while publishing these events to Kafka is quite slow.
> > >
> > > So, as mentioned, for the single log file of 30GB, Logstash is
> > > continuously emitting to Kafka and it has been running for more than 2 days,
> > > but still it has processed just 60% of the log data. I was looking for a
> > > way to increase the efficiency of publishing the events to Kafka, as with
> > > this rate of data ingestion I don't think it will be a good option to move
> > > ahead.
> > >
> > > Looking for performance improvements for the same.
> > >
> > > Expert advice required!
> > >
> > > Thanks!
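The reader/sender split in the replay tool above can be sketched with plain java.util.concurrent primitives. This is only an illustration of the threading pattern: the `sendAsync` stub stands in for an async producer send, and the queue size, thread count, and poison-pill shutdown are assumptions, not details of the actual tool:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ReplayTool {
    private static final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10_000);
    private static final int SENDER_THREADS = 5; // configurable, as in the tool

    // Stand-in (hypothetical stub) for an async KafkaProducer.send() call;
    // in the real tool each sender thread would own its own producer instance.
    static void sendAsync(String message) { /* producer.send(record, callback) */ }

    public static void main(String[] args) throws Exception {
        ExecutorService senders = Executors.newFixedThreadPool(SENDER_THREADS);
        for (int i = 0; i < SENDER_THREADS; i++) {
            senders.submit(() -> {
                try {
                    while (true) {
                        String msg = queue.take();   // block until a message is available
                        if (msg.isEmpty()) break;    // empty string used as a poison pill
                        sendAsync(msg);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // Single reader thread (here: the main thread); in the real tool
        // this reads the locally logged messages from disk.
        for (int i = 0; i < 100; i++) queue.put("message-" + i);

        // Shut down: one poison pill per sender thread.
        for (int i = 0; i < SENDER_THREADS; i++) queue.put("");
        senders.shutdown();
        senders.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("done");
    }
}
```

The single bounded queue gives natural backpressure: if the producers fall behind (e.g. Kafka is slow), the reader blocks on `put()` instead of buffering unboundedly.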
mirrormaker's configuration to minimize/prevent data loss
As I read it, the consumer and producer in MirrorMaker are independent and use a queue to communicate. Therefore the consumers keep consuming and committing offsets to ZK even if the producer is failing. Is this still the way it works in 0.8.0? Any plans to change it?

Is there any way to minimize data loss in this case? I am OK with not using async mode on the producer, but will that help? Can I configure MirrorMaker to exit immediately if the producer fails? If this should be the responsibility of an external process, what should I monitor the log for to kill the mirroring process in case of error?

-- Andrey Yegorov
estimating log.retention.bytes
Hi,

Please help me understand how one should estimate an upper limit for log.retention.bytes in this situation.

Let's say the kafka cluster has 3 machines (one broker per machine) with 15TB of disk space per machine. The cluster will have one topic with 30 partitions and replication factor 2.

My thinking is: with replication, I'll have 60 partition replicas spread across 3 machines, hence 20 per machine. The max space I can allocate per partition is 15TB/20 = 768GB.

Am I on the right track?

-- Andrey Yegorov
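The arithmetic above can be checked in a few lines (a sketch; it assumes binary units, i.e. 15TB = 15 x 1024 GB, which is how 15TB/20 comes out to 768GB):

```java
public class RetentionEstimate {
    public static void main(String[] args) {
        int partitions = 30;
        int replicationFactor = 2;
        int brokers = 3;
        long diskPerBrokerGB = 15L * 1024; // 15 TB per machine, in GB (binary units)

        // Every partition exists replicationFactor times across the cluster,
        // and replicas are assumed to be spread evenly over the brokers.
        int replicasTotal = partitions * replicationFactor;
        int replicasPerBroker = replicasTotal / brokers;
        long maxRetentionPerPartitionGB = diskPerBrokerGB / replicasPerBroker;

        System.out.println(replicasPerBroker);          // replicas per machine
        System.out.println(maxRetentionPerPartitionGB); // upper bound, GB
    }
}
```

In practice you would set log.retention.bytes somewhat below this upper bound to leave headroom for the currently open segment on each partition (retention is enforced at segment granularity) and for index files.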
Re: New Producer Public API
So for each message that I need to send asynchronously, I have to create a new instance of the callback and hold on to the message? This looks nice in theory, but at a few thousand requests/sec this could use up too much extra memory and push too much to the garbage collector, especially if the connection breaks for a few seconds and all of this piles up as a result of the retry logic.

I guess I can pool callbacks and do something like setMessage() on the callback, but this looks like an attempt to work around limitations of the API. I'd prefer to create one instance of the callback per app or per thread and reuse it. Since the kafka producer already has the messages in the batch and knows which batch failed, it could pass the message to the onError() callback.

Am I over-thinking this?

-- Andrey Yegorov

On Fri, Jan 24, 2014 at 1:15 PM, Jay Kreps wrote:
> If I understand your use case, I think usage would be something like
>
> producer.send(message, new Callback() {
>     public void onCompletion(RecordSend send) {
>         if (send.hasError())
>             log.write(message);
>     }
> });
>
> Reasonable?
>
> In other words, you can include references to any variables you like in the
> callback. We could provide the message for you, but that would require us to
> hang on to the message object for the duration of the call, which has memory
> implications, so I think it is better for people to only do this if they
> want to use it.
>
> -Jay
>
> On Fri, Jan 24, 2014 at 1:05 PM, Andrey Yegorov wrote:
> > I love the callback in send() but I do not see how it helps in case of an
> > error.
> >
> > Imagine the use case: I want to write messages to a log so I can replay
> > them to kafka later in case the async send failed.
> > From a brief look at the API I see that I'll get back a RecordSend object
> > (which is not true already - it was not sent in case of error.) From that
> > object I can get some info about the error and offset. How do I get the
> > original message back so I can write it to the log? Can you please provide
> > an example?
> >
> > --
> > Andrey Yegorov
> >
> > On Fri, Jan 24, 2014 at 11:54 AM, Jay Kreps wrote:
> > > As mentioned in a previous email, we are working on a re-implementation of
> > > the producer. I would like to use this email thread to discuss the details
> > > of the public API and the configuration. I would love for us to be
> > > incredibly picky about this public api now so it is as good as possible and
> > > we don't need to break it in the future.
> > >
> > > The best way to get a feel for the API is actually to take a look at the
> > > javadoc; my hope is to get the api docs good enough so that it is
> > > self-explanatory:
> > >
> > > http://empathybox.com/kafka-javadoc/index.html?kafka/clients/producer/KafkaProducer.html
> > >
> > > Please take a look at this API and give me any thoughts you may have!
> > >
> > > It may also be reasonable to take a look at the configs:
> > >
> > > http://empathybox.com/kafka-javadoc/kafka/clients/producer/ProducerConfig.html
> > >
> > > The actual code is posted here:
> > > https://issues.apache.org/jira/browse/KAFKA-1227
> > >
> > > A few questions or comments to kick things off:
> > > 1. We need to make a decision on whether serialization of the user's key
> > > and value should be done by the user (with our api just taking byte[]) or
> > > if we should take an object and allow the user to configure a Serializer
> > > class which we instantiate via reflection. We take the latter approach in
> > > the current producer, and I have carried this through to this prototype.
> > > The tradeoff I see is this: taking byte[] is actually simpler; the user can
> > > directly do whatever serialization they like. The complication is actually
> > > partitioning. Currently partitioning is done by a similar plug-in api
> > > (Partitioner) which the user can implement and configure to override how
> > > partitions are assigned. If we take byte[] as input then we have no access
> > > to the original object and partitioning MUST be done on the byte[]. This is
> > > fine for hash partitioning. However for various types of semantic
> > > partitioning (range partitioning, or whatever) you would want access to the
> > > original object. In the current approach a producer wh
Re: New Producer Public API
I love the callback in send() but I do not see how it helps in case of an error.

Imagine the use case: I want to write messages to a log so I can replay them to kafka later in case the async send failed. From a brief look at the API I see that I'll get back a RecordSend object (which is not true already - it was not sent in case of error.) From that object I can get some info about the error and offset. How do I get the original message back so I can write it to the log? Can you please provide an example?

-- Andrey Yegorov

On Fri, Jan 24, 2014 at 11:54 AM, Jay Kreps wrote:
> As mentioned in a previous email, we are working on a re-implementation of
> the producer. I would like to use this email thread to discuss the details
> of the public API and the configuration. I would love for us to be
> incredibly picky about this public api now so it is as good as possible and
> we don't need to break it in the future.
>
> The best way to get a feel for the API is actually to take a look at the
> javadoc; my hope is to get the api docs good enough so that it is
> self-explanatory:
>
> http://empathybox.com/kafka-javadoc/index.html?kafka/clients/producer/KafkaProducer.html
>
> Please take a look at this API and give me any thoughts you may have!
>
> It may also be reasonable to take a look at the configs:
>
> http://empathybox.com/kafka-javadoc/kafka/clients/producer/ProducerConfig.html
>
> The actual code is posted here:
> https://issues.apache.org/jira/browse/KAFKA-1227
>
> A few questions or comments to kick things off:
> 1. We need to make a decision on whether serialization of the user's key
> and value should be done by the user (with our api just taking byte[]) or
> if we should take an object and allow the user to configure a Serializer
> class which we instantiate via reflection. We take the latter approach in
> the current producer, and I have carried this through to this prototype.
> The tradeoff I see is this: taking byte[] is actually simpler; the user can
> directly do whatever serialization they like. The complication is actually
> partitioning. Currently partitioning is done by a similar plug-in api
> (Partitioner) which the user can implement and configure to override how
> partitions are assigned. If we take byte[] as input then we have no access
> to the original object and partitioning MUST be done on the byte[]. This is
> fine for hash partitioning. However for various types of semantic
> partitioning (range partitioning, or whatever) you would want access to the
> original object. In the current approach a producer who wishes to send
> byte[] they have serialized in their own code can configure the
> BytesSerialization we supply, which is just a "no op" serialization.
> 2. We should obsess over naming and make sure each of the class names is
> good.
> 3. Jun has already pointed out that we need to include the topic and
> partition in the response, which is absolutely right. I haven't done that
> yet but that definitely needs to be there.
> 4. Currently RecordSend.await will throw an exception if the request
> failed. The intention here is that producer.send(message).await() exactly
> simulates a synchronous call. Guozhang has noted that this is a little
> annoying since the user must then catch exceptions. However, if we remove
> this then if the user doesn't check for errors they won't know one has
> occurred, which I predict will be a common mistake.
> 5. Perhaps there is more we could do to make the async callbacks and futures
> we give back intuitive and easy to program against?
>
> Some background info on implementation:
>
> At a high level the primary difference in this producer is that it removes
> the distinction between the "sync" and "async" producer. Effectively all
> requests are sent asynchronously but always return a future response object
> that gives the offset as well as any error that may have occurred when the
> request is complete. The batching that is done in the async producer only
> today is done whenever possible now. This means that the sync producer,
> under load, can get performance as good as the async producer (preliminary
> results show the producer getting 1m messages/sec). This works similarly to
> group commit in databases but with respect to the actual network
> transmission--any messages that arrive while a send is in progress are
> batched together. It is also possible to encourage batching even under low
> load to save server resources by introducing a delay on the send to allow
> more messages to accumulate; this is done using the linger.ms config (this
> is similar to Nagle's algorithm in TCP).
>
> This producer does all network communication asynchronously and in
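The "synchronous call via await" idea in Jay's point 4 can be illustrated with stdlib stand-ins (a sketch: `CompletableFuture` plays the role of the prototype's RecordSend, and the `send` stub with its fixed offset is an assumption, not the real API):

```java
import java.util.concurrent.CompletableFuture;

public class SyncOverAsync {
    // Hypothetical stand-in for the producer's async send(): returns a future
    // that completes with the record's offset once the (simulated) network
    // round trip finishes.
    static CompletableFuture<Long> send(String message) {
        return CompletableFuture.supplyAsync(() -> 42L); // placeholder offset
    }

    public static void main(String[] args) throws Exception {
        // Blocking on the returned future turns the async send into a
        // synchronous call; a failed send would surface here as an exception,
        // which is exactly the behavior being debated in point 4.
        long offset = send("hello").get();
        System.out.println("offset=" + offset);
    }
}
```

The design tension in the thread is visible here: if `get()`/`await()` throws on failure, the caller must catch; if it merely returns an error code, callers who forget to check silently lose data.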
Re: Mirroring datacenters without vpn
Thank you. The reference to KAFKA-1092 is very useful. Unfortunately it is not part of the 0.8 release, but I hope 0.8.1 will see the light soon enough.

-- Andrey Yegorov

On Fri, Jan 10, 2014 at 5:08 PM, Joel Koshy wrote:
> > Ops proposed to set up the mirror to work over an open internet channel
> > without a secured vpn. Security of this particular data is not a concern
> > and, as I understood, it will give us more bandwidth (unless we buy some
> > extra hardware; lots of internal details there).
> >
> > Is this configuration possible at all? Has anyone tried or is anyone using
> > such a configuration? I'd appreciate any feedback.
> >
> > The major source of confusion is how MirrorMaker/other producers would
> > handle external names for the brokers. As I understand it, the producer
> > connects to the broker in the configuration only to bootstrap (get the
> > list of all available brokers), and after that talks to the brokers
> > received during bootstrapping. So local clients won't work (or will route
> > to the external interface) if I configure brokers to use external names.
> > Remote clients won't work if internal names are configured.
> > Is there some reasonable way to configure kafka to support such a scenario?
>
> Would this feature help in your case:
> https://issues.apache.org/jira/browse/KAFKA-1092
> i.e., you can configure the broker to publish a separate hostname to
> zookeeper, which is what the producers should use when actually sending
> data. So you would need to override the advertised.host.name and port
> properties.
>
> > Also, should I run MirrorMaker in the same DC as the central kafka
> > cluster, or multiple MirrorMakers in the remote DCs?
> >
> > Any description of how it is set up in your case is helpful. Do you use
> > vpn between DCs? Where do you run MirrorMaker - in the central dc or in
> > the remote ones, and why?
>
> We generally run the mirror-maker in the target data center, i.e., we
> do a remote consume but a local produce. If you have a flaky connection
> between the two clusters, the consumers may hit session
> expirations and rebalance, reducing the overall throughput. You can
> also do local consumption and remote produce, although we have not
> tried that. In either case you will need to set a high socket buffer
> to help amortize the high network latencies.
>
> Thanks,
>
> Joel
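The advertised.host.name override Joel mentions would look roughly like this in the broker's server.properties (a sketch under assumptions: the hostnames are placeholders, and KAFKA-1092 landed in the 0.8.1 line, so these keys may not exist on your broker version):

```properties
# server.properties (broker side)

# Interface the broker actually binds to (internal address).
host.name=broker1.internal
port=9092

# Hostname/port published to ZooKeeper; this is what remote clients
# (e.g. MirrorMaker consuming across the internet) will connect to.
advertised.host.name=broker1.example.com
advertised.port=9092
```

Note the trade-off the thread discusses: whatever names the brokers advertise must resolve and route correctly for both local and remote clients, since all clients use the advertised addresses after bootstrapping.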
Mirroring datacenters without vpn
Hi,

I am trying to figure out the best deployment plan and configuration with ops to ship a new version of our system that will use kafka. Multiple geo-distributed datacenters are a given, and we are planning to build a central DC to aggregate the data.

Ops proposed to set up the mirror to work over an open internet channel without a secured vpn. Security of this particular data is not a concern and, as I understood, it will give us more bandwidth (unless we buy some extra hardware; lots of internal details there).

Is this configuration possible at all? Has anyone tried or is anyone using such a configuration? I'd appreciate any feedback.

The major source of confusion is how MirrorMaker/other producers would handle external names for the brokers. As I understand it, the producer connects to the broker in the configuration only to bootstrap (get the list of all available brokers), and after that talks to the brokers received during bootstrapping. So local clients won't work (or will route to the external interface) if I configure brokers to use external names. Remote clients won't work if internal names are configured. Is there some reasonable way to configure kafka to support such a scenario?

So far I have only tried opening an ssh tunnel from a devbox to a remote machine and configuring a local producer to talk to localhost; it failed as described above.

Also, should I run MirrorMaker in the same DC as the central kafka cluster, or multiple MirrorMakers in the remote DCs?

Any description of how it is set up in your case is helpful. Do you use vpn between DCs? Where do you run MirrorMaker - in the central dc or in the remote ones, and why?

A lot of questions; thank you beforehand for your answers.

-- Andrey Yegorov