Re: compatibility: 0.8.1.1 broker, 0.8.2.2 producer

2015-12-23 Thread Andrey Yegorov
I am using the 0.8.2.2 producer with 0.8.1.1 brokers without problems.
The Scala version matters only if you are building with Scala or with other
components that use Scala.
Hope this helps.

--
Andrey Yegorov

On Wed, Dec 23, 2015 at 1:11 PM, Ewen Cheslack-Postava 
wrote:

> Shlomi,
>
> You should always upgrade brokers before clients. Newer versions of clients
> aren't guaranteed to work with older versions of brokers.
>
> For scala versions, there is no functional difference. Generally you only
> need to worry about the Scala version if you are using the old clients
> (which are in the core jar) and the rest of your app requires a specific
> Scala version.
>
> -Ewen
>
> On Wed, Dec 23, 2015 at 6:31 AM, Shlomi Hazan  wrote:
>
> > Hi All,
> >
> > Does anyone have experience with / encountered any issues using a 0.8.2.2
> > producer against a 0.8.1.1 broker (specifically kafka_2.9.2-0.8.1.1)?
> > I want to upgrade my existing producer (0.8.2-beta).
> > Also, is there a functional difference between the scala versions
> > (2.9.2,2.10,2.11)?
> >
> > Thanks,
> > Shlomi
> >
>
>
>
> --
> Thanks,
> Ewen
>


Re: KafkaException: Size of FileMessageSet has been truncated during write

2015-05-28 Thread Andrey Yegorov
Thank you!

--
Andrey Yegorov

On Wed, May 27, 2015 at 4:42 PM, Jiangjie Qin 
wrote:

> This should be just a message fetch failure. The socket was disconnected
> when the broker was writing to it. There should not be any data loss.
>
> Jiangjie (Becket) Qin
>
> On 5/27/15, 11:00 AM, "Andrey Yegorov"  wrote:
>
> >I've noticed a few exceptions in the logs like the one below. Does it
> >indicate data loss? Should I worry about this?
> >What is the possible reason for this to happen?
> >I am using kafka 0.8.1.1
> >
> >ERROR Closing socket for /xx.xxx.xxx.xxx because of error
> >(kafka.network.Processor)
> >
> >kafka.common.KafkaException: Size of FileMessageSet
> >/data/kafka/topic-name-11/14340499.log has been truncated
> >during write: old size 26935, new size 0
> >
> >at kafka.log.FileMessageSet.writeTo(FileMessageSet.scala:144)
> >
> >at kafka.api.PartitionDataSend.writeTo(FetchResponse.scala:69)
> >
> >at kafka.network.MultiSend.writeTo(Transmission.scala:102)
> >
> >at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:124)
> >
> >at kafka.network.MultiSend.writeTo(Transmission.scala:102)
> >
> >at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:219)
> >
> >at kafka.network.Processor.write(SocketServer.scala:375)
> >
> >at kafka.network.Processor.run(SocketServer.scala:247)
> >
> >at java.lang.Thread.run(Thread.java:745)
> >
> >--
> >Andrey Yegorov
>
>


KafkaException: Size of FileMessageSet has been truncated during write

2015-05-27 Thread Andrey Yegorov
I've noticed a few exceptions in the logs like the one below. Does it
indicate data loss? Should I worry about this?
What is the possible reason for this to happen?
I am using kafka 0.8.1.1

ERROR Closing socket for /xx.xxx.xxx.xxx because of error
(kafka.network.Processor)

kafka.common.KafkaException: Size of FileMessageSet
/data/kafka/topic-name-11/14340499.log has been truncated
during write: old size 26935, new size 0

at kafka.log.FileMessageSet.writeTo(FileMessageSet.scala:144)

at kafka.api.PartitionDataSend.writeTo(FetchResponse.scala:69)

at kafka.network.MultiSend.writeTo(Transmission.scala:102)

at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:124)

at kafka.network.MultiSend.writeTo(Transmission.scala:102)

at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:219)

at kafka.network.Processor.write(SocketServer.scala:375)

at kafka.network.Processor.run(SocketServer.scala:247)

at java.lang.Thread.run(Thread.java:745)

--
Andrey Yegorov


Re: Best way to replace a single broker

2015-05-14 Thread Andrey Yegorov
As I remember, you can simply stop the old broker and start the new one with
the same broker id as the old one.
It will start syncing replicas from the other brokers and will eventually
catch up on all of them.
After this is done (all replicas are in sync) you can trigger preferred
replica election if it does not happen automatically.
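Roughly, the procedure above as a command sketch (for 0.8.x; the ZooKeeper
address and broker id are placeholders, and steps must be adapted to your
deployment):

```
# 1. Stop the bad broker, then start the replacement with the SAME broker.id
#    in server.properties (e.g. broker.id=3).

# 2. Watch until the replacement is back in the ISR for all its partitions:
bin/kafka-topics.sh --zookeeper zk1:2181 --describe

# 3. If leadership does not rebalance on its own, trigger it:
bin/kafka-preferred-replica-election.sh --zookeeper zk1:2181
```

Because the replacement reuses the old broker id, no partition reassignment is
needed; only the replicas that lived on the dead broker get re-replicated.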

--
Andrey Yegorov

On Thu, May 14, 2015 at 11:12 AM, Rajiv Kurian  wrote:

> Hi all,
>
> Sometimes we need to replace a kafka broker because it turns out to be a
> bad instance. What is the best way of doing this?
>
> We have been using the kafka-reassign-partitions.sh  to migrate all topics
> to the new list of brokers which is the (old list + the new instance - the
> bad instance). Then we terminate the bad instance once we ensure that it is
> getting no traffic. But it seems like this causes an unnecessary amount of
> topic churn and is not equivalent to just moving the partitions the bad
> broker was responsible for to the new instance.
>
> Is there a better way of going about replacing a single instance (not
> adding capacity)? I'd ideally like to just be able to move the partitions
> from the old broker to the new one instead of a complete rebalance.
>
> Thanks!
>


Re: High Latency in Kafka

2015-02-10 Thread Andrey Yegorov
I am not familiar with logstash, but with a custom log-replay tool (used to
replay messages logged locally, e.g. when Kafka was unavailable, and useful in
some other scenarios) I've seen throughput reach 30,000 messages/sec with an
average message size of 4.5 kilobytes, all under regular production load on
kafka (6 brokers). At that rate, sending 30GB of logs should take about 4 min.

The tool has:
one thread to read messages and put them into a queue;
5 (configurable) threads that take messages from the queue and send them to
kafka, with one producer per thread.
I am using the new producer from kafka 0.8.2-beta and async send.
I remember having to tune some kafka producer parameters: increased buffer
sizes and a few other settings.
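A minimal sketch of that reader/sender structure (Python purely for
illustration; the queue size, thread count, and `send` hook are placeholders,
not the real tool, and `send` stands in for a per-thread Kafka producer call):

```python
import queue
import threading

def replay(messages, num_senders=5, send=print):
    """One reader thread feeds a bounded queue; several sender
    threads drain it and hand each message to `send`."""
    q = queue.Queue(maxsize=1000)
    stop = object()  # sentinel telling a sender thread to exit

    def reader():
        for m in messages:
            q.put(m)              # blocks when full: natural backpressure
        for _ in range(num_senders):
            q.put(stop)           # one stop marker per sender thread

    def sender():
        while True:
            m = q.get()
            if m is stop:
                break
            send(m)               # in the real tool: an async producer send

    threads = [threading.Thread(target=reader)]
    threads += [threading.Thread(target=sender) for _ in range(num_senders)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
```

One producer per sender thread avoids contention on a shared producer, and the
bounded queue keeps the reader from outrunning the senders.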

HTH.



--
Andrey Yegorov

On Tue, Feb 10, 2015 at 5:54 AM, Vineet Mishra 
wrote:

> Hi Gwen,
>
> Well I have gone through this link while trying to set up my Logstash Kafka
> handler,
>
> https://github.com/joekiller/logstash-kafka
>
> I could achieve what I was looking for, but the performance is badly
> affected when trying to write a big file of several GBs.
> I guess there should be some way to parallelise the existing running
> process.
>
> Thanks!
>
> On Sun, Feb 8, 2015 at 8:06 PM, Gwen Shapira 
> wrote:
>
> > I'm wondering how much of the time is spent by Logstash reading and
> > processing the log vs. time spent sending data to Kafka. Also, I'm not
> > familiar with log.stash internals, perhaps it can be tuned to send the
> data
> > to Kafka in larger batches?
> >
> > At the moment it's difficult to tell where the slowdown is. More
> information
> > about the breakdown of time will help.
> >
> > You can try Flume's SpoolingDirectory source with Kafka Channel or Sink
> and
> > see if you get improved performance out of other tools.
> >
> >
> > Gwen
> >
> > On Sun, Feb 8, 2015 at 12:06 AM, Vineet Mishra 
> > wrote:
> >
> > > Hi All,
> > >
> > > I am having some log files of around 30GB, and I am trying to
> > > event-process these logs by pushing them to Kafka. I can clearly see
> > > that the throughput achieved while publishing these events to Kafka is
> > > quite slow.
> > >
> > > So, as mentioned, for the single 30GB log file, Logstash has been
> > > continuously emitting to Kafka for more than 2 days, but it has still
> > > processed just 60% of the log data. I was looking for a way to increase
> > > the efficiency of publishing the events to kafka, as at this rate of
> > > data ingestion I don't think it will be a good option to move ahead.
> > >
> > > Looking out for performance improvements for the same.
> > >
> > > Experts advise required!
> > >
> > > Thanks!
> > >
> >
>


mirrormaker's configuration to minimize/prevent data loss

2014-06-16 Thread Andrey Yegorov
As I read it, the consumer and producer in mirrormaker are independent and use
a queue to communicate. Therefore the consumers keep consuming and committing
offsets to zk even if the producer is failing. Is this still the way it works
in 0.8.0? Any plans to change it?

Is there any way to minimize data loss in this case? I am ok with not using
async mode on the producer, but will that help? Can I configure mirrormaker to
exit immediately if the producer fails? If this should be the responsibility
of an external process, what should I monitor the log for to kill the
mirroring process in case of error?

--
Andrey Yegorov


estimating log.retention.bytes

2014-04-22 Thread Andrey Yegorov
Hi,

Please help me understand how one should estimate upper limit for
log.retention.bytes in this situation.

Let's say kafka cluster has 3 machines (broker per machine) with 15TB disk
space per machine.
Cluster will have one topic with 30 partitions and replication factor 2.

My thinking is:
with replication, I'll have 60 partition replicas spread across 3 machines,
hence 20 per machine.
The max space I can allocate per partition is 15TB/20 = 768GB.

Am I on the right track?
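That arithmetic can be sketched as follows (Python purely for illustration;
it assumes replicas are spread perfectly evenly and the entire disk is
available, with no headroom for indexes or in-progress segments):

```python
def max_retention_bytes(disk_per_broker, brokers, partitions, replication_factor):
    """Upper bound for log.retention.bytes per partition, assuming an
    even spread of replicas and the whole disk usable for log data."""
    total_replicas = partitions * replication_factor
    replicas_per_broker = total_replicas / brokers
    return disk_per_broker / replicas_per_broker

TB = 1024 ** 4
limit = max_retention_bytes(15 * TB, brokers=3, partitions=30, replication_factor=2)
# 60 replicas / 3 brokers = 20 per broker; 15 TB / 20 = 768 GiB per partition
```

In practice you would set the limit well below this bound: retention is
enforced per segment, and the broker needs space for open segments, index
files, and any temporary imbalance in replica placement.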

--
Andrey Yegorov


Re: New Producer Public API

2014-01-24 Thread Andrey Yegorov
So for each message that I need to send asynchronously I have to create a
new instance of the callback and hold on to the message?
This looks nice in theory, but at a few thousand requests/sec it could use up
too much extra memory and put too much pressure on the garbage collector,
especially if the connection breaks for a few seconds and all of this piles
up as a result of the retry logic.

I guess I could pool callbacks and do something like setMessage() on the
callback, but that looks like an attempt to work around limitations of the
API. I'd prefer to create one callback instance per app or per thread and
reuse it. Since the kafka producer already has the messages in the batch and
knows which batch failed, it could pass the message to the onError() callback.

Am I over-thinking this?
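For contrast, the closure-per-send pattern under discussion can be sketched
like this (Python purely for illustration; `producer_send` and `fake_send` are
stand-ins for the client's async send, not the real API):

```python
def send_with_replay_log(producer_send, message, replay_log):
    # One callback object per send: the closure captures `message`,
    # so on failure the original payload can be written to a replay log.
    def on_completion(error):
        if error is not None:
            replay_log.append(message)
    producer_send(message, on_completion)

# Stand-in for an async producer that fails for one message:
def fake_send(msg, callback):
    callback(RuntimeError("broker down") if msg == "m2" else None)

failed = []
for m in ["m1", "m2", "m3"]:
    send_with_replay_log(fake_send, m, failed)
# failed == ["m2"]
```

The memory concern above is exactly that each send allocates such a closure
and pins its message until completion; pooling callbacks would trade that
allocation for mutable shared state.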


--
Andrey Yegorov


On Fri, Jan 24, 2014 at 1:15 PM, Jay Kreps  wrote:

> If I understand your use case I think usage would be something like
>
>   producer.send(message, new Callback() {
> public void onCompletion(RecordSend send) {
>if(send.hasError())
>   log.write(message);
> }
>   });
>
> Reasonable?
>
> In other words you can include references to any variables you like in the
> callback. We could provide the message for you but that would require us to
> hang on to the message object for the duration of the call which has memory
> implications so I think it is better for people to only do this if they
> want to use it.
>
> -Jay
>
>
> > On Fri, Jan 24, 2014 at 1:05 PM, Andrey Yegorov wrote:
>
> > I love the callback in send() but I do not see how it helps in case of an
> > error.
> >
> > Imagine the use case: I want to write messages to a log so I can replay
> > them to kafka later in case the async send failed.
> > From a brief look at the API I see that I'll get back a RecordSend object
> > (which is a misnomer already - it was not sent in case of error). From
> > that object I can get some info about the error and offset. How do I get
> > the original message back so I can write it to the log? Can you please
> > provide an example?
> >
> >
> >
> > --
> > Andrey Yegorov
> >
> >
> > On Fri, Jan 24, 2014 at 11:54 AM, Jay Kreps  wrote:
> >
> > > As mentioned in a previous email we are working on a re-implementation
> of
> > > the producer. I would like to use this email thread to discuss the
> > details
> > > of the public API and the configuration. I would love for us to be
> > > incredibly picky about this public api now so it is as good as possible
> > and
> > > we don't need to break it in the future.
> > >
> > > The best way to get a feel for the API is actually to take a look at
> the
> > > javadoc, my hope is to get the api docs good enough so that it is
> > > self-explanatory:
> > >
> > >
> >
> http://empathybox.com/kafka-javadoc/index.html?kafka/clients/producer/KafkaProducer.html
> > >
> > > Please take a look at this API and give me any thoughts you may have!
> > >
> > > It may also be reasonable to take a look at the configs:
> > >
> > >
> >
> http://empathybox.com/kafka-javadoc/kafka/clients/producer/ProducerConfig.html
> > >
> > > The actual code is posted here:
> > > https://issues.apache.org/jira/browse/KAFKA-1227
> > >
> > > A few questions or comments to kick things off:
> > > 1. We need to make a decision on whether serialization of the user's
> key
> > > and value should be done by the user (with our api just taking byte[])
> or
> > > if we should take an object and allow the user to configure a
> Serializer
> > > class which we instantiate via reflection. We take the latter approach
> in
> > > the current producer, and I have carried this through to this
> prototype.
> > > The tradeoff I see is this: taking byte[] is actually simpler, the user
> > can
> > > directly do whatever serialization they like. The complication is
> > actually
> > > partitioning. Currently partitioning is done by a similar plug-in api
> > > (Partitioner) which the user can implement and configure to override
> how
> > > partitions are assigned. If we take byte[] as input then we have no
> > access
> > > to the original object and partitioning MUST be done on the byte[].
> This
> > is
> > > fine for hash partitioning. However for various types of semantic
> > > partitioning (range partitioning, or whatever) you would want access to
> > the
> > > original object. In the current approach a producer wh

Re: New Producer Public API

2014-01-24 Thread Andrey Yegorov
I love the callback in send() but I do not see how it helps in case of an
error.

Imagine the use case: I want to write messages to a log so I can replay
them to kafka later in case the async send failed.
From a brief look at the API I see that I'll get back a RecordSend object
(which is a misnomer already - it was not sent in case of error). From that
object I can get some info about the error and offset. How do I get the
original message back so I can write it to the log? Can you please provide
an example?



--
Andrey Yegorov


On Fri, Jan 24, 2014 at 11:54 AM, Jay Kreps  wrote:

> As mentioned in a previous email we are working on a re-implementation of
> the producer. I would like to use this email thread to discuss the details
> of the public API and the configuration. I would love for us to be
> incredibly picky about this public api now so it is as good as possible and
> we don't need to break it in the future.
>
> The best way to get a feel for the API is actually to take a look at the
> javadoc, my hope is to get the api docs good enough so that it is
> self-explanatory:
>
> http://empathybox.com/kafka-javadoc/index.html?kafka/clients/producer/KafkaProducer.html
>
> Please take a look at this API and give me any thoughts you may have!
>
> It may also be reasonable to take a look at the configs:
>
> http://empathybox.com/kafka-javadoc/kafka/clients/producer/ProducerConfig.html
>
> The actual code is posted here:
> https://issues.apache.org/jira/browse/KAFKA-1227
>
> A few questions or comments to kick things off:
> 1. We need to make a decision on whether serialization of the user's key
> and value should be done by the user (with our api just taking byte[]) or
> if we should take an object and allow the user to configure a Serializer
> class which we instantiate via reflection. We take the latter approach in
> the current producer, and I have carried this through to this prototype.
> The tradeoff I see is this: taking byte[] is actually simpler, the user can
> directly do whatever serialization they like. The complication is actually
> partitioning. Currently partitioning is done by a similar plug-in api
> (Partitioner) which the user can implement and configure to override how
> partitions are assigned. If we take byte[] as input then we have no access
> to the original object and partitioning MUST be done on the byte[]. This is
> fine for hash partitioning. However for various types of semantic
> partitioning (range partitioning, or whatever) you would want access to the
> original object. In the current approach a producer who wishes to send
> byte[] they have serialized in their own code can configure the
> BytesSerialization we supply which is just a "no op" serialization.
> 2. We should obsess over naming and make sure each of the class names are
> good.
> 3. Jun has already pointed out that we need to include the topic and
> partition in the response, which is absolutely right. I haven't done that
> yet but that definitely needs to be there.
> 4. Currently RecordSend.await will throw an exception if the request
> failed. The intention here is that producer.send(message).await() exactly
> simulates a synchronous call. Guozhang has noted that this is a little
> annoying since the user must then catch exceptions. However if we remove
> this then if the user doesn't check for errors they won't know one has
> occurred, which I predict will be a common mistake.
> 5. Perhaps there is more we could do to make the async callbacks and future
> we give back intuitive and easy to program against?
>
> Some background info on implementation:
>
> At a high level the primary difference in this producer is that it removes
> the distinction between the "sync" and "async" producer. Effectively all
> requests are sent asynchronously but always return a future response object
> that gives the offset as well as any error that may have occurred when the
> request is complete. The batching that is done in the async producer only
> today is done whenever possible now. This means that the sync producer,
> under load, can get performance as good as the async producer (preliminary
> results show the producer getting 1m messages/sec). This works similar to
> group commit in databases but with respect to the actual network
> transmission--any messages that arrive while a send is in progress are
> batched together. It is also possible to encourage batching even under low
> load to save server resources by introducing a delay on the send to allow
> more messages to accumulate; this is done using the linger.ms config (this
> is similar to Nagle's algorithm in TCP).
>
> This producer does all network communication asynchronously and in 
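The batching knobs described in the message above map to producer configs
roughly like this, as they eventually shipped in the 0.8.2 new producer
(illustrative values, not recommendations):

```properties
# New-producer settings related to batching:
batch.size=16384        # max bytes batched per partition before a send
linger.ms=5             # wait up to 5 ms for more records to accumulate
buffer.memory=33554432  # total memory available for buffering records
```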

Re: Mirroring datacenters without vpn

2014-01-16 Thread Andrey Yegorov
Thank you.

The reference to KAFKA-1092 is very useful. Unfortunately it is not part of
the 0.8 release, but I hope 0.8.1 will see the light soon enough.

--
Andrey Yegorov


On Fri, Jan 10, 2014 at 5:08 PM, Joel Koshy  wrote:

> >
> > Ops proposed to set up mirror to work over open internet channel without
> > secured vpn. Security of this particular data is not a concern and, as I
> > understood, it will give us more bandwidth (unless we buy some extra
> > hardware; lots of internal details there).
> >
> > Is this configuration possible at all? Has anyone tried or is anyone
> > using such a configuration? I'd appreciate any feedback.
> >
> > The major source of confusion is how MirrorMaker/other producers would
> > handle external names for the brokers. As I understand it, a producer
> > connects to the broker in its configuration only to bootstrap (get the
> > list of all available brokers), and after that talks to the brokers it
> > received during bootstrapping. So local clients won't work (or will route
> > to the external interface) if I configure the brokers to use external
> > names, and remote clients won't work if internal names are configured.
> > Is there some reasonable way to configure kafka to support such a
> > scenario?
>
> Would this feature help in your case:
> https://issues.apache.org/jira/browse/KAFKA-1092
> i.e., you can configure the broker to publish a separate hostname to
> zookeeper which is what the producers should use when actually sending
> data. So you would need to override the advertised.host.name and port
> properties.
>
> >
> > Also, should I run MirrorMaker in the same DC as central kafka cluster or
> > multiple MirrorMakers in remote DCs?
> >
> > Any description of how it is set up in your case is helpful. Do you use
> vpn
> > between DCs? Where do you run MirrorMaker - in central dc or in remote
> and
> > why?
>
> We generally run the mirror-maker in the target data center. i.e., we
> do a remote consume but local produce. If you have a flaky connection
> between the two clusters the consumers may hit session
> expirations and rebalance, reducing the overall throughput. You can
> also do local consumption and remote produce although we have not
> tried that. In either case you will need to set a high socket buffer
> to help amortize the high network latencies.
>
> Thanks,
>
> Joel
>
>
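The advertised-hostname override Joel mentions (KAFKA-1092) would look roughly
like this in each broker's server.properties (hostnames and port are
placeholders):

```properties
# Bind/listen on the internal interface:
host.name=broker1.internal
port=9092
# Name and port published to ZooKeeper, which remote producers
# and consumers will use to connect:
advertised.host.name=broker1.example.com
advertised.port=9092
```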


Mirroring datacenters without vpn

2014-01-10 Thread Andrey Yegorov
Hi,

I am trying to figure out the best deployment plan and configuration with ops
to ship the new version of our system that will use kafka. Multiple
geo-distributed datacenters are a given, and we are planning to build a
central DC to aggregate the data.

Ops proposed to set up the mirror to work over an open internet channel
without a secured vpn. Security of this particular data is not a concern and,
as I understood, it will give us more bandwidth (unless we buy some extra
hardware; lots of internal details there).

Is this configuration possible at all? Has anyone tried or is anyone using
such a configuration? I'd appreciate any feedback.

The major source of confusion is how MirrorMaker/other producers would handle
external names for the brokers. As I understand it, a producer connects to the
broker in its configuration only to bootstrap (get the list of all available
brokers), and after that talks to the brokers it received during
bootstrapping. So local clients won't work (or will route to the external
interface) if I configure the brokers to use external names, and remote
clients won't work if internal names are configured.
Is there some reasonable way to configure kafka to support such a scenario?
So far I have only tried opening an ssh tunnel from a devbox to a remote
machine and configuring a local producer to talk to localhost; it failed as
described above.


Also, should I run MirrorMaker in the same DC as central kafka cluster or
multiple MirrorMakers in remote DCs?

Any description of how it is set up in your case is helpful. Do you use vpn
between DCs? Where do you run MirrorMaker - in central dc or in remote and
why?

A lot of questions; thank you beforehand for your answers.

--
Andrey Yegorov