Re: Kafka Streams scaling questions

2016-03-22 Thread Kishore Senji
), giving them independent parallelism over the same data.
>
> In theory there is another knob to consider. Consumers (actually the
> leader consumer) can control which partitions they get assigned. KStreams
> already uses this feature to do things like create stand by replicas, but I
> don’t think (but I may be wrong) this helps you with your problem directly.
>
> All the best
>
> B
>
> > On 21 Mar 2016, at 03:12, Kishore Senji  wrote:
> >
> > I will scale back the question to get some replies :)
> >
> > Suppose the use-case is to build a monitoring platform -
> > For log aggregation from thousands of nodes, I believe that a Kafka topic
> > should be partitioned n-ways and the data should be sprayed in a
> > round-robin fashion to get a good even distribution of data (because we
> > don't know upfront how the data is sliced by semantically and we don't
> know
> > whether the key for semantic partitioning gives a even distribution of
> > data). Later in stream processing, the appropriate group-bys would be
> done
> > on the same source of data to support various ways of slicing.
> >
> >
> > http://kafka.apache.org/documentation.html#design_loadbalancing - "This
> can
> > be done at random, implementing a kind of random load balancing, or it
> can
> > be done by some semantic partitioning function"
> > http://kafka.apache.org/documentation.html#basic_ops_modify_topic - "Be
> > aware that one use case for partitions is to semantically partition data,
> > and adding partitions doesn't change the partitioning of existing data so
> > this may disturb consumers if they rely on that partition"
> >
> > The above docs caution the use of semantic partitioning as it can lead to
> > uneven distribution (hotspots) if the semantic key does not give even
> > distribution, plus on a flex up of partitions the data would now be in
> two
> > partitions. For these reasons, I strongly believe the data should be
> pushed
> > to Kafka in a round-robin fashion and later a Stream processing framework
> > should use the appropriate group-bys (this also gives us the flexibility
> to
> > slice in different ways as well at runtime)
> >
> > KStreams let us do stream processing on a partition of data. So to do
> > windowed aggregation, the data for the same key should be in the same
> > partition. This means to use KStreams we have to use Semantic
> partitioning,
> > which will have the above issues as shown in Kafka docs. So my question
> is -
> >
> > If we want to use KStreams how should we deal with "Load balancing" (it
> can
> > happen that the semantic partitioning can overload a single partition and
> > so Kafka partition will be overloaded as well as the KStream task)  and
> > "Flex up of partitions" (more than one partition will have data for a
> given
> > key and so the windowed aggregations result in incorrect data)?
> >
> > Thanks,
> > Kishore.
> >
> > On Thu, Mar 17, 2016 at 4:28 PM, Kishore Senji  wrote:
> >
> >> Hi All,
> >>
> >> I read through the doc on KStreams here:
> >>
> http://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple
> >>
> >> I was wondering about how an use-case that I have be solved with
> KStream?
> >>
> >> Use-case: Logs are obtained from a service pool. It contains many nodes.
> >> We want to alert if a particular consumer (identified by consumer id) is
> >> making calls to the service more than X number of times in the last 1
> min.
> >> The data is available in logs similar to access logs for example. The
> >> window is a sliding window. We have to check back 60s from the current
> >> event and see if in total (with the current event) it would exceed the
> >> threshold. Logs are pushed to Kafka using a random partitioner whose
> range
> >> is [1 to n] where n is the total number of partitions.
> >>
> >> One way of achieving this is to push data in to the first Kafka topic
> >> (using random partitioning) and then a set of KStream tasks re-shuffling
> >> the data on consumer_id in to the second topic. The next set of KStream
> >> tasks operate on the second topic (1 task/partition) and do the
> >> aggregation. If this is an acceptable solution, here are my questions on

Kafka Streams scaling questions

2016-03-19 Thread Kishore Senji
Hi All,

I read through the doc on KStreams here:
http://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple


I was wondering how a use-case that I have could be solved with KStream?

Use-case: Logs are obtained from a service pool that contains many nodes. We
want to alert if a particular consumer (identified by consumer id) is
making calls to the service more than X times in the last 1 min.
The data is available in logs similar to access logs, for example. The
window is a sliding window: we have to look back 60s from the current
event and see whether, in total (including the current event), the count
would exceed the threshold. Logs are pushed to Kafka using a random
partitioner whose range is [1 to n], where n is the total number of partitions.

One way of achieving this is to push data into the first Kafka topic
(using random partitioning) and then have a set of KStream tasks re-shuffle
the data on consumer_id into a second topic. The next set of KStream
tasks operates on the second topic (1 task/partition) and does the
aggregation. If this is an acceptable solution, here are my questions on
scaling (a rough sketch of this re-shuffle pipeline follows the list below).


   - I can see that the second topic is prone to hotspots. If we get
   billions of requests for a given consumer_id and only a few hundred for
   another consumer_id, the second Kafka topic's partitions will become
   hotspots (and the partition getting a lot of log volume can suffocate other
   partitions on the same broker). If we try to create more partitions and
   isolate the partition getting the most volume, this wastes resources.
   - The max parallelism that we can get for a KStream task is the number
   of partitions - this may work for a single stream. How would this work for
   multi-tenant stream processing where people want to write multiple stream
   jobs on the same set of data? If the parallelism is not enough, they would
   have to copy and group the data into another topic with more partitions. I
   think we need two knobs: one for scaling Kafka (number of partitions)
   and one for scaling the stream processing. It sounds like with KStream it
   is only one knob for both.
   - How would we deal with organic growth of data? Let us say the number of
   partitions we chose for the second topic (where it is grouped by
   consumer_id) is not enough to deal with organic growth in volume. If we
   increase partitions, for a given consumer some data could be in one
   partition before the flex up and data could end up in a different
   partition after the flex up. Since the KStream jobs are unique per
   partition and are stateless across them, the aggregated result would be
   incorrect, unless we have only one job to read all the data, in which case
   it will become a bottleneck.
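
To make the two-topic approach above concrete, here is a rough sketch of the
re-key plus windowed count in the Kafka Streams DSL. Note that this uses a
later version of the DSL than the tech preview we are discussing, the topic
names, threshold and log format are made up, and a tumbling 1-minute window
is shown for brevity even though the use-case really wants a sliding window:

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;

public class ConsumerRateAlert {

    static final long THRESHOLD = 1000L;   // hypothetical: X calls per minute

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // First topic: randomly partitioned access-log lines (hypothetical topic name)
        KStream<String, String> logs =
            builder.stream("access-logs", Consumed.with(Serdes.String(), Serdes.String()));

        logs.selectKey((key, line) -> consumerIdOf(line))                 // re-key by consumer_id
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))   // re-shuffled via an internal repartition topic
            .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))            // 1-minute window
            .count()
            .toStream((windowedId, count) -> windowedId.key())            // drop the window, keep consumer_id
            .filter((consumerId, count) -> count > THRESHOLD)             // alert condition
            .to("alerts", Produced.with(Serdes.String(), Serdes.Long()));

        // builder.build() plus the usual application.id/bootstrap.servers properties
        // would then be passed to new KafkaStreams(...) and started.
    }

    static String consumerIdOf(String line) {
        return line.split(" ")[0];   // hypothetical log format: consumer_id is the first field
    }
}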

In Storm (or any other streaming engine), the way to solve it would be to
have only one topic (partitioned n-ways) with data pushed in using a
random partitioner (so no hotspots and scaling issues). We would have n
spouts reading data from those partitions and we can then have m bolts
getting the data using fields grouping on consumer_id. Since all the data
for a given consumer_id ends up in the same bolt, we can do the sliding
window and the alert there.

If we solved it the Storm way in KStream, we would only have one topic
(partitioned n-ways) with data pushed in using a random partitioner (so no
hotspots and scaling issues). But we can only have one KStream task reading
all the data and doing the windowing and aggregation. This will become a
bottleneck for scaling.

So it sounds like KStreams will either have "hotspots" in the Kafka topic
(as each partition needs to have the data that the KStream task needs and
work on it independently) or scaling issues in the KStream task for
"aggregation".

How would one solve this kind of problem with KStreams?






Re: java.lang.NoClassDefFoundError: Could not initialize class com.yammer.metrics.Metrics

2015-09-02 Thread Kishore Senji
It is a NoClassDefFoundError, so the static initialization (<clinit>) of the
Metrics class is failing. Please look for surrounding stack traces; there
must be a root cause for why the class initialization failed.
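
Here is a minimal, self-contained illustration of why the root cause shows
up earlier in the logs: the first access to a class whose static initializer
throws gives an ExceptionInInitializerError carrying the real cause, and
every later access to the same class only gives "NoClassDefFoundError: Could
not initialize class ..." (the class names below are made up):

public class ClinitDemo {

    static class Broken {
        static {
            if (true) {   // lets the compiler accept an always-throwing initializer
                throw new RuntimeException("the real root cause lives here");
            }
        }
    }

    public static void main(String[] args) {
        try {
            new Broken();   // first use: ExceptionInInitializerError with the root cause attached
        } catch (Throwable t) {
            t.printStackTrace();
        }
        new Broken();       // second use: NoClassDefFoundError: Could not initialize class ClinitDemo$Broken
    }
}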

On Wed, Sep 2, 2015 at 1:05 PM, Vadim Keylis  wrote:

> I suddenly started getting this error using the code that was perfectly
> working a day before. The library containing yammer metrics exists and part
> of mvn build. Please help how to resolve the issue or disable
> yammer.metrics.
>
> Thanks in advance
>
> Exception in thread "Thread-3" java.lang.NoClassDefFoundError: Could not
> initialize class com.yammer.metrics.Metrics
> at
> kafka.metrics.KafkaMetricsGroup$class.newTimer(KafkaMetricsGroup.scala:52)
> at
>
> kafka.consumer.FetchRequestAndResponseMetrics.newTimer(FetchRequestAndResponseStats.scala:25)
> at
>
> kafka.consumer.FetchRequestAndResponseMetrics.<init>(FetchRequestAndResponseStats.scala:26)
> at
>
> kafka.consumer.FetchRequestAndResponseStats.<init>(FetchRequestAndResponseStats.scala:37)
> at
>
> kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats.scala:50)
> at
>
> kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats.scala:50)
> at kafka.utils.Pool.getAndMaybePut(Pool.scala:61)
> at
>
> kafka.consumer.FetchRequestAndResponseStatsRegistry$.getFetchRequestAndResponseStats(FetchRequestAndResponseStats.scala:54)
> at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:39)
> at kafka.javaapi.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:34)
> at
>
> com.tagged.etlhub.client.kafka.connector.driver.LeaderDiscovery.connectToLeader(LeaderDiscovery.java:90)
> at
>
> com.tagged.etlhub.client.kafka.connector.driver.LeaderDiscovery.findLeader(LeaderDiscovery.java:50)
> at
>
> com.tagged.etlhub.client.kafka.connector.driver.Connector.connect(Connector.java:208)
> at
>
> com.tagged.etlhub.client.kafka.connector.driver.Connector.createConnector(Connector.java:65)
> at
>
> com.tagged.etlhub.client.kafka.connector.driver.ConnectorFactory.getConnector(ConnectorFactory.java:11)
> at
>
> com.tagged.etlhub.client.kafka.message.TopicMessages.getConnector(TopicMessages.java:156)
> at
>
> com.tagged.etlhub.client.kafka.message.TopicMessages.connect(TopicMessages.java:45)
> at
>
> com.tagged.etlhub.client.StreamThreadPool.startStream(StreamThreadPool.java:109)
> at
>
> com.tagged.etlhub.client.StreamThreadPool.access$0(StreamThreadPool.java:93)
> at
> com.tagged.etlhub.client.StreamThreadPool$1.run(StreamThreadPool.java:73)
> at java.lang.Thread.run(Unknown Source)
>


Re: 0.8.2 producer and single message requests

2015-09-01 Thread Kishore Senji
Yes, this will be a problem if you are providing batching for your REST
service on top of Kafka and have to acknowledge to your client only when
all the callbacks for individual sends are called.

Here is one implementation I have done:
https://github.com/ksenji/KafkaBatchProducer/blob/master/src/main/java/kafka/samples/KafkaBatchProducer.java
which takes a list of messages to send, with one callback that gets called
once onCompletion() has fired for all the messages. Take a look at the
BatchCallbackHelper and also the sample Producer that uses this batch
producer.
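
The general shape of that pattern is a counting callback shared by all the
sends in the batch; here is a minimal sketch (the names are illustrative,
this is not the exact BatchCallbackHelper code from the repo above):

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

final class BatchCallback implements Callback {

    private final AtomicInteger remaining;
    private final AtomicReference<Exception> firstError = new AtomicReference<>();
    private final Consumer<Exception> whenDone;   // invoked exactly once; null argument means success

    BatchCallback(int batchSize, Consumer<Exception> whenDone) {
        this.remaining = new AtomicInteger(batchSize);
        this.whenDone = whenDone;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            firstError.compareAndSet(null, exception);   // remember the first failure
        }
        if (remaining.decrementAndGet() == 0) {          // last onCompletion() fires the batch-level callback
            whenDone.accept(firstError.get());
        }
    }
}

// usage: pass the same instance to every send in the batch
// BatchCallback cb = new BatchCallback(records.size(), err -> respondToRestClient(err));
// for (ProducerRecord<String, String> r : records) producer.send(r, cb);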



On Tue, Sep 1, 2015 at 1:42 PM Gwen Shapira  wrote:

> We've seen a lot of requests for this, and I don't think there is any
> general objection.
>
> If you want to discuss concrete API suggestions, perhaps the dev mailing
> list is the right place for the discussion.
>
> Gwen
>
> On Tue, Sep 1, 2015 at 11:25 AM, Neelesh  wrote:
>
> > Here's what I think :
> > # The new producer generates Java futures  , we all know the problems
> with
> > java futures (cannot compose, blocking, does not work well with other JVM
> > languages /libraries - RxJava/RxScala etc)
> >
> > # or we can pass in a callback - works okay when we are dealing with
> single
> > messages but with a batch of messages pushes lot of book keeping on the
> > caller. The client now will have to deal with coordinating one callback
> per
> > producer.send(), or deal with a single stateful callback and handle
> > synchronization across all the state generated by callbacks. Granted, we
> > can simplify the model because we know there is a single i/o thread that
> > runs the callbacks, but then, we are relying on an implementation detail.
> > Does not feel very clean
> >
> > Overall, when I have to send a bunch of messages synchronously, the new
> > producer does not give me a good way to model it. It feels like the new
> > producer is more prescriptive.
> >
> > Now if the producer had just one more API that took a list of messages
> and
> > handed me back a callback for that list, things would've been much
> simpler.
> >
> >
> > On Mon, Aug 17, 2015 at 10:41 PM, Kishore Senji 
> wrote:
> >
> > > If linger.ms is 0, batching does not add to the latency. It will
> > actually
> > > improve throughput without affecting latency. Enabling batching does
> not
> > > mean it will wait for the batch to be full. Whatever gets filled during
> > the
> > > previous batch send will be sent in the current batch even if it count
> is
> > > less than batch.size
> > >
> > > You do not have to work with Future. With callback you will get Async
> > model
> > > essentially (and you can make use of it if you webservice is using
> > Servlet
> > > 3.0)
> > >
> > >
> > > producer.send(record, new AsyncCallback(request, response));
> > >
> > >
> > > static final class AsyncCallback implements Callback {
> > >
> > > HttpServletRequest request;
> > > HttpServletResponse response;
> > >
> > > void onCompletion(RecordMetadata metadata, java.lang.Exception
> > exception) {
> > >
> > >   // Check exception and send appropriate response
> > >
> > > }
> > > }
> > >
> > > On Mon, Aug 17, 2015 at 10:49 AM Neelesh  wrote:
> > >
> > > > Thanks for the answers. Indeed, the callback model is the same
> > regardless
> > > > of batching. But for a synchronous web service, batching creates a
> > > latency
> > > > issue. linger.ms is by default set to zero. Also, java futures are
> > hard
> > > > to
> > > > work with compared to Scala futures.  The current API also returns
> one
> > > > future per single record send (correct me if I missed another
> variant)
> > > that
> > > > leaves the client code to deal with hundreds of futures and/or
> > callbacks.
> > > > May I'm missing something very obvious in the new API, but this model
> > and
> > > > the fact that the scala APIs are going away makes writing an
> ingestion
> > > > service in front of Kafka  more involved than the 0.8.1 API.
> > > >
> > > > On Sun, Aug 16, 2015 at 12:02 AM, Kishore Senji 
> > > wrote:
> > > >
> > > > > Adding to what Gwen already mentioned -
> > > > >
> > > > > The programming model for the Producer is send() with an optional
> > > > callback
> > > > > and we ge

Re: KafkaProducer recovery/restart if broker dies

2015-08-28 Thread Kishore Senji
I faced the exact same problem recently. The JIRA is filed here:
https://issues.apache.org/jira/browse/KAFKA-2459

Please set reconnect.backoff.ms to be greater than retry.backoff.ms (say, 1
sec more). I think the metadata expired, and when this producer instance
tries to fetch new metadata it tries to connect to the broker that is down;
the reconnect.backoff.ms blackout does not help because retry.backoff.ms is
equal to reconnect.backoff.ms, so the blackout has already elapsed, the node
is not blacklisted, and the producer loops fetching metadata from the same
broker.
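
As a sketch of that tweak (hosts and exact values are illustrative), keep
reconnect.backoff.ms comfortably larger than retry.backoff.ms so a node that
just refused a connection is still blacked out when the metadata retry fires:

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");   // hypothetical hosts
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("retry.backoff.ms", "100");
props.put("reconnect.backoff.ms", "1100");   // ~1 sec more than retry.backoff.ms, as suggested above
KafkaProducer<String, String> producer = new KafkaProducer<>(props);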

On Fri, Aug 28, 2015 at 8:39 AM, Helleren, Erik 
wrote:

> Hi Alexey,
>
> So, a couple things.  Your config seems to have some issues that would
> result in long wait times,
>
> You should try this configuration and see if you still have the issue:
>
> acks=1
> compression.type=snappy
> retries=3 #Retry a few times to make it so they don¹t get dropped when a
> broker fails, at least not right away
> batch_size= 32768
> buffer.memory=67108864
> linger.ms=1500
> metadata.fetch.timeout.ms=6 # Default to give zookeeper a lot of time
> to return the metadata
> timeout.ms= 1 #give kafka some time to respond before you consider it
> a failure.
> retry.backoff.ms=100 # Default. Keep this small so the producer fails
> quickly enough times to know a broker is down
> reconnect.backoff.ms=10 # Default. Same reason as above
>
>
>
> Hopefully the explanations of the changes make sense.  At the very least,
> I would try changing retires up to 2 first.  Also, what is your topic¹s
> configuration?
> -Erik
>
> On 8/28/15, 8:36 AM, "Alexey Sverdelov" 
> wrote:
>
> >Hi everyone,
> >
> >we run load tests against our web application (about 50K req/sec) and
> >every
> >time a kafka broker dies (also controlled shutdown), the producer tries to
> >connect with the dead broker for about 10-15 minutes. For this time the
> >application monitoring shows a constant error rate (about of 1/10 all
> >kafka
> >writes fail).
> >
> >Our spec:
> >
> >* web-app in tomcat writes to kafka
> >* 3 node kafka cluster
> >* kafka 0.8.2
> >* new producer
> >
> >The producer config:
> >
> >acks=1
> >compression.type=snappy
> >retries=0
> >batch_size=32768
> >buffer.memory=67108864
> >linger.ms=1500
> >metadata.fetch.timeout.ms=5000
> >timeout.ms= 1500
> >retry.backoff.ms=1
> >reconnect.backoff.ms=1
> >
> >I can poll our Zookeeper and check if all brokers are alive, but I think
> >KafkaProducer checks it already.
> >
> >Alexey
>
>


Re: Mirror a partition of a topic

2015-08-25 Thread Kishore Senji
Flume could be an option with an Interceptor, although the throughput could
be lower compared to MirrorMaker with compression and the shallow iterator
enabled.
On Tue, Aug 25, 2015 at 10:28 PM tao xiao  wrote:

> In the trunk code mirror maker provides the ability to filter out messages
> on demand by supplying a message handler.
>
>
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/MirrorMaker.scala#L443
>
> On Wed, 26 Aug 2015 at 11:35 Binh Nguyen Van  wrote:
>
> > Hi all,
> >
> > Is there any tool that allows to mirror a partition of a topic?
> > I looked at the mirror maker but it is used to mirror the whole topic
> > and I don't see any option that allow me to configure partitions.
> >
> > I want to mirror our live data to staging environment but my staging
> > environment
> > can't handle the whole topic so I am looking for a tool that I can use to
> > mirror specific
> > partitions of a topic only.
> >
> > Thanks
> > -Binh
> >
>


Re: bootstrap.servers for the new Producer

2015-08-22 Thread Kishore Senji
Yes, that is exactly the issue. I did not notice that close(key) is called
from the poll() method as well. I was observing this even when I ran my app
(not in debug). I noticed the connection attempt was taking about 1 sec
(measured with a conditional breakpoint), and as you mentioned the default
reconnect.backoff.ms is 10ms, so the blackout had already elapsed.

If I increase reconnect.backoff.ms to 5 sec, then it works and picks another
node.



On Sat, Aug 22, 2015 at 3:56 PM Ewen Cheslack-Postava 
wrote:

> You're just seeing that exception in the debugger, not the log, right?
>
> ConnectException is an IOException, so it should be caught by this block
>
> https://github.com/apache/kafka/blob/0.8.2.1/clients/src/main/java/org/apache/kafka/common/network/Selector.java#L271
> , logged, and then the SelectionKey should be closed. Part of the close
> process adds it to the list of disconnections if there's an associated
> transmission object (which there should be, it is set up in the connect()
> call). This list is then processed by NetworkClient in
> handleDisconnections, which is invoked after the poll() call. That, in
> turn, marks the node as disconnected via the ClusterConnectionStates
> object. So it should still be getting marked as disconnected.
>
> However, maybe the issues is in the way we handle the blackout period. The
> corresponding setting is ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG (
> reconnect.backoff.ms). The default value is 10ms. However, the way this is
> currently handled, we mark the time when we *start* the connection attempt.
> If it takes more than 10ms for the connection attempt to fail, then the
> blackout period wouldn't actually apply since the the period would have
> already elapsed. If that happened, then leastLoadedNode would indeed
> continue to select the same node repeatedly.
>
> Can you tell from the logs how long the connection attempts are taking? You
> could try increasing the backoff time, although that has broader impact
> that could be negative (e.g., if a broker is temporarily down and you
> aren't stuck in this metadata fetch state, it increases the amount of time
> before you can start producing to that broker again). However, if you can't
> verify that this is the problem from the logs, it might at least help to
> verify in a test environment.
>
> I've filed https://issues.apache.org/jira/browse/KAFKA-2459 for that
> issue.
>
> -Ewen
>
>
> On Fri, Aug 21, 2015 at 11:42 PM, Kishore Senji  wrote:
>
> > Thank you Ewen. This behavior is something that I'm observing. I see in
> the
> > logs continuous Connect failures to the dead broker.
> >
> > The important thing here is I'm starting a brand new instance of the
> > Producer after a broker is down (so no prior metadata), with that down
> > broker also as part of the bootstrap list. With the brand new instance
> all
> > requests to send are blocked until the metadata is fetched. The metadata
> > fetching is where I'm seeing the issue. Currently the code randomly
> picks a
> > node to fetch the metadata and if it happens to the down node, I see the
> > connect failure and then it tries to fetch metadata again from the same
> > node (I do not see it going to black out because the status is always
> > "CONNECTING" and other nodes are not yet connected). This goes on forever
> > until I either bring the broker up or kill & restart the Producer and
> > on-restart if it picks a different node then it works to get the
> metadata.
> > Once it gets the metadata, it is fine as like you described above, it
> > updates the Cluster nodes.
> >
> > This can be a problem because we have to give a standard set of bootstrap
> > brokers across multiple producers whose lifecycle is not in control. The
> > producers can go down and a new instance can be brought up just like the
> > brokers where we expect a broker going down (so we do more partitioning
> and
> > replications)
> >
> > I get this exception -
> >
> > java.net.ConnectException: Connection refused: no further information
> > at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> ~[na:1.7.0_67]
> > at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source)
> > ~[na:1.7.0_67]
> > at org.apache.kafka.common.network.Selector.poll(Selector.java:238)
> > ~[kafka-clients-0.8.2.1.jar:na]
> > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
> > [kafka-clients-0.8.2.1.jar:na]
> > at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
> > [kafka-clients-0.8.2.1.jar:na]
> > at
> org.apache.kafka.clients.producer.internals.Sender.run(

Re: bootstrap.servers for the new Producer

2015-08-21 Thread Kishore Senji
Thank you Ewen. This is the behavior that I'm observing: I see continuous
connect failures to the dead broker in the logs.

The important thing here is that I'm starting a brand new instance of the
Producer after a broker is down (so no prior metadata), with that down
broker also part of the bootstrap list. With the brand new instance, all
send requests are blocked until the metadata is fetched. The metadata
fetching is where I'm seeing the issue. Currently the code randomly picks a
node to fetch the metadata from, and if it happens to be the down node, I
see the connect failure and then it tries to fetch metadata again from the
same node (I do not see it going into blackout because the status is always
"CONNECTING" and other nodes are not yet connected). This goes on forever
until I either bring the broker up or kill and restart the Producer, and on
restart, if it picks a different node, it manages to get the metadata. Once
it gets the metadata, it is fine: as you described above, it updates the
Cluster nodes.

This can be a problem because we have to give a standard set of bootstrap
brokers across multiple producers whose lifecycle is not in our control. The
producers can go down and a new instance can be brought up, just like the
brokers, where we expect a broker to go down occasionally (which is why we
do partitioning and replication).

I get this exception -

java.net.ConnectException: Connection refused: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.7.0_67]
at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source) ~[na:1.7.0_67]
at org.apache.kafka.common.network.Selector.poll(Selector.java:238)
~[kafka-clients-0.8.2.1.jar:na]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
[kafka-clients-0.8.2.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
[kafka-clients-0.8.2.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
[kafka-clients-0.8.2.1.jar:na]
at java.lang.Thread.run(Unknown Source) [na:1.7.0_67]

I think the status never goes to blackout because this exception really
happens in poll() and not in the connect() method; the javadoc also mentions
that the call is only initiated in connect() and (as long as the DNS entry
resolves) the connection failure only surfaces in poll(). And in poll() the
status is not reset to DISCONNECTED, so the node is not blacked out.


On Fri, Aug 21, 2015 at 10:06 PM, Ewen Cheslack-Postava 
wrote:

> Are you seeing this in practice or is this just a concern about the way the
> code currently works? If the broker is actually down and the host is
> rejecting connections, the situation you describe shouldn't be a problem.
> It's true that the NetworkClient chooses a fixed nodeIndexOffset, but the
> expectation is that if we run one iteration of leastLoadedNode and select a
> node, we'll try to connect and any failure will be handled by putting that
> node into a blackout period during which subsequent calls to
> leastLoadedNode will give priority to other options. If your server is
> *not* explicitly rejecting connections, I think it could be possible that
> we end up hanging for a long while just waiting for that connection. If
> this is the case (e.g., if you are running on EC2 and it has this behavior
> -- I believe default firewall rules will not kill the connection), this
> would be useful to know.
>
> A couple of bugs you might want to be aware of:
>
> https://issues.apache.org/jira/browse/KAFKA-1843 is meant to generally
> address the fact that there are a lot of states that we could be in, and
> the way we handle them (especially with leastLoadedNode), may not work well
> in all cases. It's very difficult to be comprehensive here, so if there is
> a scenario that is not failing for you, the more information you can give
> about the state of the system and the producer, the better.
>
> https://issues.apache.org/jira/browse/KAFKA-1842 might also be relevant --
> right now we rely on the underlying TCP connection timeouts, but this is
> definitely not ideal. They can be quite long by default, and we might want
> to consider connections failed much sooner.
>
> I also could have sworn there was a JIRA filed about the fact that the
> bootstrap servers are never reused, but I can't find it at the moment -- in
> some cases, if you have no better option then it would be best to revert
> back to the original set of bootstrap servers for loading metadata. This
> can especially become a problem in some cases where your only producing to
> one or a small number of topics and therefore only have metadata for a
> couple of servers. If anything happens to those servers too quickly (within
> the metadata refresh period) you might potentially get stuck with only
> references to dead nodes.
>
> -

bootstrap.servers for the new Producer

2015-08-21 Thread Kishore Senji
If one of the brokers we specify in the bootstrap servers list is down,
there is a chance that the Producer (a brand new instance with no prior
metadata) will never be able to publish anything to Kafka until that broker
is up, because the logic for getting the initial metadata is based on a
random index into the set of bootstrap nodes, and if that happens to be the
down node, the Kafka producer keeps trying to get the metadata from that
node only. It never switches to another node. Without metadata, the Producer
can never send anything.

The nodeIndexOffset is chosen at the creation of the NetworkClient (and this
offset is not changed when we fail to get a new connection), so when getting
the metadata for the first time there is a possibility that we keep trying
the broker that is down.

This can be a problem if a broker goes down and a Producer is restarted or a
new instance is brought up. Is this a known issue?


Re: Possible DEAD LOCK for one day at broker controller?

2015-08-20 Thread Kishore Senji
Hi Zhao, do you see any other errors regarding the checkpoint file? Is this
reproducible for you, and if so, can you enable the debug log level to get
more info?

On Thu, Aug 20, 2015 at 7:44 AM, Zhao Weinan  wrote:

> Hi Kishore Senji,
>
> I've been busy recovering some data these two days... and found that I
> maybe hit more serious problem than I thought. I lost almost all data on
> one broker at least at some time, here is some log from server.log pasted
> below, and very like the situation described by Jason and Thunder here.
> <
> http://mail-archives.apache.org/mod_mbox/kafka-users/201504.mbox/%3CCAA%2BBczTUBqg1-tpcUjwfZgZYZyOXC-Myuhd_2EaGkeKWkrCVUQ%40mail.gmail.com%3E
> >
>
> Any idea about this? Why Kafka got segments with no-zero LEO then said No
> checkpointed highwatermark?
>
> From broker's logs/server.log:
>
> > [2015-08-16 17:14:35,204] INFO Completed load of log xx-1 with log
> end
> > offset 863967227 (kafka.log.Log)
> > [2015-08-16 17:15:29,648] WARN Partition [xx,1] on broker 1: No
> > checkpointed highwatermark is found for partition [xx,1]
> > (kafka.cluster.Partition)
> > [2015-08-16 17:15:36,887] INFO Truncating log xx-1 to offset 0.
> > (kafka.log.Log)
> > [2015-08-16 17:15:36,887] INFO Scheduling log segment 763102206 for log
> > xx-1 for deletion. (kafka.log.Log)
> > [2015-08-16 17:15:36,888] INFO Scheduling log segment 768984638 for log
> > xx-1 for deletion. (kafka.log.Log)
> > [2015-08-16 17:15:36,888] INFO Scheduling log segment 773712058 for log
> > xx-1 for deletion. (kafka.log.Log)
> > [2015-08-16 17:15:36,888] INFO Scheduling log segment 778002546 for log
> > xx-1 for deletion. (kafka.log.Log)
> > .
> > [2015-08-16 17:34:18,168] INFO Scheduling log segment 0 for log xx-1
> > for deletion. (kafka.log.Log)
> > [2015-08-16 17:36:37,811] WARN [ReplicaFetcherThread-0-0], Replica 1 for
> > partition [xx,1] reset its fetch offset from 791913697 to current
> > leader 0's start offset 791913697 (kafka.server.ReplicaFetcherThread)
> > [2015-08-16 17:36:37,811] ERROR [ReplicaFetcherThread-0-0], Current
> offset
> > 0 for partition [xx,1] out of range; reset offset to 791913697
> > (kafka.server.ReplicaFetcherThread)
> >
> > From broker's logs/controller.log:
>
> >  [2015-08-16 17:14:41,444] INFO [Controller 1]: Controller starting up
> > (kafka.controller.KafkaController)
> > [2015-08-16 17:14:41,492] INFO [Controller 1]: Controller startup
> complete
> > (kafka.controller.KafkaController)
> > [2015-08-16 17:16:24,850] INFO [SessionExpirationListener on 1], ZK
> > expired; shut down all controller components and try to re-elect
> > (kafka.controller.KafkaController$SessionExpirationListener)
> >
>
> 2015-08-18 23:43 GMT+08:00 Kishore Senji :
>
> > Yes you are right. I misread the code. So the only thing that can explain
> > the behavior you are seeing is that may be there are many segments that
> > need to be deleted all at once. Can you try may be reducing the
> > retention.ms
> > in smaller intervals - like reduce it to 9 days from 10 days and see if
> the
> > brokers are fine.
> >
> > On Tue, Aug 18, 2015 at 12:48 AM Zhao Weinan  wrote:
> >
> > > Hi Kishore Senji,
> > >
> > > Did you constantly send messages to your test topic? Or just one time
> > send?
> > > I've just did some test, the log.lastModified is updated with every
> > message
> > > received (or every flush to disk at least). So I think if your interval
> > > between two neibouring messages is never smaller than retention.ms,
> then
> > > your current segments should never be deleted.
> > >
> > > The wired thing is the offset of one partition in my data-loss-topic
> > became
> > > to be zero, while offsets in other partition are normally minimum...
> > >
> > > 2015-08-18 14:45 GMT+08:00 Kishore Senji :
> > >
> > > > It is log.deleteOldSegments(startMs - _.lastModified >
> > > > log.config.retentionMs)
> > > >
> > > > You might have missed the startMs.
> > > >
> > > > I have tested it myself. I created a test topic with retention.ms
> > equal
> > > to
> > > > 20 minutes and added some messages. Later I changed the retention.ms
> > to
> > > 2
> > > > min. I can see whenever the delete thread runs (every five min), it
> > > deletes
> > > > even the latest Segment because that Segment age is older than
> > > > retention.ms
> > >

Re: Error handling in New AsyncProducer

2015-08-18 Thread Kishore Senji
If you have enough memory for the batch size then the scenario is no
different from your original question, because block on full memory is true
by default (and if it is false you actually get a notification via an
exception). So when you said reduce the buffer memory to "minimum", I
assumed you were referring to reducing the number of messages buffered in
memory so that the potential loss of messages is smaller (in case the
producer dies completely).
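
For reference, here is a sketch of the 0.8.2-era settings being discussed
(values are illustrative): a buffer that only holds a couple of batches plus
block.on.buffer.full=true makes send() block quickly once the cluster is
unreachable, which is exactly the throughput trade-off I mentioned.

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");   // hypothetical hosts
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("batch.size", "32768");
props.put("buffer.memory", String.valueOf(2 * 32768));   // room for only ~2 batches instead of the ~32 MB default
props.put("block.on.buffer.full", "true");               // send() blocks instead of throwing when the buffer is full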

On Tue, Aug 18, 2015 at 9:07 AM sunil kalva  wrote:

> kishore
> How is going to reduce throughput if we have enough memory for batch size
> ?, could you please explain
>
> On Tue, Aug 18, 2015 at 11:47 AM, Kishore Senji  wrote:
>
> > But this will reduce the throughput in a good scenario. May be we need to
> > enhance the Callback interface appropriately.
> >
> > On Mon, Aug 17, 2015 at 7:15 PM, sunil kalva 
> > wrote:
> >
> > > tx jeff,
> > > Actually we need to set "buffer.memory" to minimum (default is ~35 MB)
> > and
> > > "block.on.buffer.full" to "true"  so that the sender will block as soon
> > as
> > > these conditions met. And then release once the cluster is healthy.
> > >
> > > --
> > > SunilKalva
> > >
> > > On Mon, Aug 17, 2015 at 11:20 PM, Jeff Holoman 
> > > wrote:
> > >
> > > > Actually this won't work. The problem is if the producer loses
> > > connectivity
> > > > to the broker, then messages will continue to queue up until
> batch.size
> > > is
> > > > exhausted. Then the send will block.  At this point, if you gain
> > > > connectivity again, then the messages will be resent.
> > > >
> > > > If all brokers die, you should get Not Enough Replicas if you have
> the
> > > > min.isr.set first. This should be an indication that you are having a
> > > > problem, before you get to the error in the actual prodcuer thread.
> > Keep
> > > in
> > > > mind that as of right now you need to manually set the min.isr's with
> > > alter
> > > > topic due to KAFKA-2114
> > > > Either way, you will eventually get an error in the producer thread
> > that
> > > > isn't passed to the callback.
> > > >
> > > > And I think that's the point.
> > > >
> > > > Thanks
> > > >
> > > > Jeff
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Mon, Aug 17, 2015 at 12:39 PM, Madhukar Bharti <
> > > > bhartimadhu...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Sunil,
> > > > >
> > > > > Producer will throw an Exception in callback if there is problem
> > while
> > > > > sending data. You can check like:
> > > > >
> > > > > public void onCompletion(RecordMetadata arg0, Exception arg1) {
> > > > > if (arg1 != null) {
> > > > > System.out.println("exception occured");
> > > > > }
> > > > > System.out.println("sent")
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Mon, Aug 17, 2015 at 9:20 PM, sunil kalva <
> kalva.ka...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi all
> > > > > > I am using new java producer in async mode, when my entire
> cluster
> > is
> > > > > down
> > > > > > i am loosing all my messages. How do we get notification when the
> > > > cluster
> > > > > > is down so that i can send messages to another cluster. The
> > callback
> > > is
> > > > > > only triggered when the cluster is reachable .
> > > > > >
> > > > > > --SK
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Thanks and Regards,
> > > > > Madhukar Bharti
> > > > > Mob: 7845755539
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Jeff Holoman
> > > > Systems Engineer
> > > >
> > >
> >
>


Re: Possible DEAD LOCK for one day at broker controller?

2015-08-18 Thread Kishore Senji
Yes, you are right. I misread the code. So the only thing that can explain
the behavior you are seeing is that maybe there are many segments that need
to be deleted all at once. Can you try reducing the retention.ms in smaller
steps - for example, reduce it from 10 days to 9 days - and see if the
brokers are fine.

On Tue, Aug 18, 2015 at 12:48 AM Zhao Weinan  wrote:

> Hi Kishore Senji,
>
> Did you constantly send messages to your test topic? Or just one time send?
> I've just did some test, the log.lastModified is updated with every message
> received (or every flush to disk at least). So I think if your interval
> between two neibouring messages is never smaller than retention.ms, then
> your current segments should never be deleted.
>
> The wired thing is the offset of one partition in my data-loss-topic became
> to be zero, while offsets in other partition are normally minimum...
>
> 2015-08-18 14:45 GMT+08:00 Kishore Senji :
>
> > It is log.deleteOldSegments(startMs - _.lastModified >
> > log.config.retentionMs)
> >
> > You might have missed the startMs.
> >
> > I have tested it myself. I created a test topic with retention.ms equal
> to
> > 20 minutes and added some messages. Later I changed the retention.ms to
> 2
> > min. I can see whenever the delete thread runs (every five min), it
> deletes
> > even the latest Segment because that Segment age is older than
> > retention.ms
> >
> >
> > On Mon, Aug 17, 2015 at 11:30 PM, Zhao Weinan 
> wrote:
> >
> > > Hi Kishore Senji,
> > >
> > > The size of segement file is default 1GB.
> > >
> > > According to the LogManager.scala#cleanupExpiredSegments, Kafka will
> only
> > > delete segments whose lastModTime is older than retention.ms, so I
> dont
> > > think this is the reason for my data loss. Actually I lost some data in
> > > topic other than the topic I reduced the retention...
> > >
> > > I dont know whether destage these several GB files will cause this kind
> > of
> > > system chattering, though we do use not very fancy hardwares.
> > >
> > > 2015-08-18 7:48 GMT+08:00 Kishore Senji :
> > >
> > > > What is the size of the segment file? You are reducing the retention
> > from
> > > > 10 days to 1 day. The moment you do this, it will delete all segments
> > > which
> > > > are older than 1 day. So for example, if your latest segment is older
> > > than
> > > > 1 day and if there are consumers which are still catching up (let us
> > say
> > > 10
> > > > min lag), Kafka will roll over and delete the older segments and
> there
> > is
> > > > potential for data loss. One pattern could be to make sure you change
> > > this
> > > > config parameter only when a new segment is created and all consumers
> > are
> > > > on the new segment and also make sure all clients will be done with
> the
> > > > segment before the file is deleted.
> > > >
> > > > My guess is that your segment file is huge and the OS may be taking a
> > > long
> > > > time to destage the file cache on to disk before letting it to be
> > > deleted.
> > > > This could be the reason for long pause which could be causing the Zk
> > > > connections to be timed out.
> > > >
> > > >
> > > >
> > > > On Mon, Aug 17, 2015 at 6:59 AM Zhao Weinan 
> > wrote:
> > > >
> > > > > Hi Kishore Senji,
> > > > >
> > > > > Thanks for the reply.
> > > > >
> > > > > Do you have some suggestions before the fix came up? Try not to
> > modify
> > > > the
> > > > > retention.ms? Or disable the auto rebalance? Cause this problem is
> > > 100%
> > > > > reproduceable in my scenario (two times got dead lock in two
> > > > retention.ms
> > > > > modification), and I even found some data loss which I'm still
> > looking
> > > > for
> > > > > the reason.
> > > > >
> > > > > Any idea on why shrinking the retention.ms causes the network
> > > unstable?
> > > > >
> > > > > And yes I use the comma for clarity :)
> > > > >
> > > > > 2015-08-17 8:59 GMT+08:00 Kishore Senji :
> > > > >
> > > > > > Interesting problem you ran in to. It seems like this broker was
> > > chosen
> > > > > as
> >

Re: Possible DEAD LOCK for one day at broker controller?

2015-08-17 Thread Kishore Senji
It is log.deleteOldSegments(startMs - _.lastModified >
log.config.retentionMs)

You might have missed the startMs.

I have tested it myself. I created a test topic with retention.ms equal to
20 minutes and added some messages. Later I changed the retention.ms to 2
min. I can see that whenever the delete thread runs (every five minutes), it
deletes even the latest segment, because that segment's age is older than
retention.ms.

On Mon, Aug 17, 2015 at 11:30 PM, Zhao Weinan  wrote:

> Hi Kishore Senji,
>
> The size of segement file is default 1GB.
>
> According to the LogManager.scala#cleanupExpiredSegments, Kafka will only
> delete segments whose lastModTime is older than retention.ms, so I dont
> think this is the reason for my data loss. Actually I lost some data in
> topic other than the topic I reduced the retention...
>
> I dont know whether destage these several GB files will cause this kind of
> system chattering, though we do use not very fancy hardwares.
>
> 2015-08-18 7:48 GMT+08:00 Kishore Senji :
>
> > What is the size of the segment file? You are reducing the retention from
> > 10 days to 1 day. The moment you do this, it will delete all segments
> which
> > are older than 1 day. So for example, if your latest segment is older
> than
> > 1 day and if there are consumers which are still catching up (let us say
> 10
> > min lag), Kafka will roll over and delete the older segments and there is
> > potential for data loss. One pattern could be to make sure you change
> this
> > config parameter only when a new segment is created and all consumers are
> > on the new segment and also make sure all clients will be done with the
> > segment before the file is deleted.
> >
> > My guess is that your segment file is huge and the OS may be taking a
> long
> > time to destage the file cache on to disk before letting it to be
> deleted.
> > This could be the reason for long pause which could be causing the Zk
> > connections to be timed out.
> >
> >
> >
> > On Mon, Aug 17, 2015 at 6:59 AM Zhao Weinan  wrote:
> >
> > > Hi Kishore Senji,
> > >
> > > Thanks for the reply.
> > >
> > > Do you have some suggestions before the fix came up? Try not to modify
> > the
> > > retention.ms? Or disable the auto rebalance? Cause this problem is
> 100%
> > > reproduceable in my scenario (two times got dead lock in two
> > retention.ms
> > > modification), and I even found some data loss which I'm still looking
> > for
> > > the reason.
> > >
> > > Any idea on why shrinking the retention.ms causes the network
> unstable?
> > >
> > > And yes I use the comma for clarity :)
> > >
> > > 2015-08-17 8:59 GMT+08:00 Kishore Senji :
> > >
> > > > Interesting problem you ran in to. It seems like this broker was
> chosen
> > > as
> > > > the Controller and onControllerFailure() method was called. This will
> > > > schedule the checkAndTriggerPartitionRebalance method to execute
> after
> > 5
> > > > seconds (when auto rebalance enabled). In the mean time this broker
> > lost
> > > > zookeeper connection and so this broker resigns from the Controller
> > > status
> > > > and so the onControllerResignation() method is called and this method
> > > will
> > > > try to shutdown the auto rebalance executor.  But it is doing it by
> > > holding
> > > > the lock and this is what caused the dead lock in your case.
> > > >
> > > > I do not think we need to hold the lock to shutdown the executor.
> This
> > > > could be the fix we might need.
> > > >
> > > > retention.ms config parameter should not have commas in the value.
> Are
> > > you
> > > > just using it here to clarify for us.
> > > >
> > > > It so happened in your
> > > > On Sun, Aug 16, 2015 at 1:52 AM Zhao Weinan 
> > wrote:
> > > >
> > > > > Hi guys,
> > > > >
> > > > > I got this problem, after changing one topic's config to
> > > > > retention.ms=86,400,000
> > > > > from 864,000,000, the brokers start to shedule and do deletions of
> > > > outdated
> > > > > index of that topic.
> > > > >
> > > > > Then for some reason some brokers' connection with zookeeper were
> > > > expired,
> > > > > suddenly lots of ERRORs showed up in logs/server.log: At controller
> > > > > broker(id=5) are:

Re: Error handling in New AsyncProducer

2015-08-17 Thread Kishore Senji
But this will reduce the throughput in the good scenario. Maybe we need to
enhance the Callback interface appropriately.

On Mon, Aug 17, 2015 at 7:15 PM, sunil kalva  wrote:

> tx jeff,
> Actually we need to set "buffer.memory" to minimum (default is ~35 MB) and
> "block.on.buffer.full" to "true"  so that the sender will block as soon as
> these conditions met. And then release once the cluster is healthy.
>
> --
> SunilKalva
>
> On Mon, Aug 17, 2015 at 11:20 PM, Jeff Holoman 
> wrote:
>
> > Actually this won't work. The problem is if the producer loses
> connectivity
> > to the broker, then messages will continue to queue up until batch.size
> is
> > exhausted. Then the send will block.  At this point, if you gain
> > connectivity again, then the messages will be resent.
> >
> > If all brokers die, you should get Not Enough Replicas if you have the
> > min.isr.set first. This should be an indication that you are having a
> > problem, before you get to the error in the actual prodcuer thread. Keep
> in
> > mind that as of right now you need to manually set the min.isr's with
> alter
> > topic due to KAFKA-2114
> > Either way, you will eventually get an error in the producer thread that
> > isn't passed to the callback.
> >
> > And I think that's the point.
> >
> > Thanks
> >
> > Jeff
> >
> >
> >
> >
> >
> >
> > On Mon, Aug 17, 2015 at 12:39 PM, Madhukar Bharti <
> > bhartimadhu...@gmail.com>
> > wrote:
> >
> > > Hi Sunil,
> > >
> > > Producer will throw an Exception in callback if there is problem while
> > > sending data. You can check like:
> > >
> > > public void onCompletion(RecordMetadata arg0, Exception arg1) {
> > > if (arg1 != null) {
> > > System.out.println("exception occured");
> > > }
> > > System.out.println("sent")
> > >
> > >
> > >
> > >
> > > On Mon, Aug 17, 2015 at 9:20 PM, sunil kalva 
> > > wrote:
> > >
> > > > Hi all
> > > > I am using new java producer in async mode, when my entire cluster is
> > > down
> > > > i am loosing all my messages. How do we get notification when the
> > cluster
> > > > is down so that i can send messages to another cluster. The callback
> is
> > > > only triggered when the cluster is reachable .
> > > >
> > > > --SK
> > > >
> > >
> > >
> > >
> > > --
> > > Thanks and Regards,
> > > Madhukar Bharti
> > > Mob: 7845755539
> > >
> >
> >
> >
> > --
> > Jeff Holoman
> > Systems Engineer
> >
>


Re: 0.8.2 producer and single message requests

2015-08-17 Thread Kishore Senji
If linger.ms is 0, batching does not add to the latency. It will actually
improve throughput without affecting latency. Enabling batching does not
mean the producer will wait for the batch to be full: whatever gets filled
during the previous batch send will be sent in the current batch, even if
its count is less than batch.size.

You do not have to work with the Future. With a callback you essentially get
the async model (and you can make use of it if your webservice is using
Servlet 3.0):


import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

producer.send(record, new AsyncCallback(request, response));


static final class AsyncCallback implements Callback {

    private final HttpServletRequest request;
    private final HttpServletResponse response;

    AsyncCallback(HttpServletRequest request, HttpServletResponse response) {
        this.request = request;
        this.response = response;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        // Check the exception and send the appropriate response to the client
    }
}
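
If your webservice uses Servlet 3.0 async processing (asyncSupported
enabled), the same callback can complete the request off the container
thread; a rough sketch, with illustrative names:

// inside the servlet's doPost()
final AsyncContext ctx = request.startAsync();
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        HttpServletResponse resp = (HttpServletResponse) ctx.getResponse();
        resp.setStatus(exception == null ? 200 : 503);   // success vs. "try again later"
        ctx.complete();                                  // finish the suspended HTTP request
    }
});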

On Mon, Aug 17, 2015 at 10:49 AM Neelesh  wrote:

> Thanks for the answers. Indeed, the callback model is the same regardless
> of batching. But for a synchronous web service, batching creates a latency
> issue. linger.ms is by default set to zero. Also, java futures are  hard
> to
> work with compared to Scala futures.  The current API also returns one
> future per single record send (correct me if I missed another variant) that
> leaves the client code to deal with hundreds of futures and/or callbacks.
> May I'm missing something very obvious in the new API, but this model and
> the fact that the scala APIs are going away makes writing an ingestion
> service in front of Kafka  more involved than the 0.8.1 API.
>
> On Sun, Aug 16, 2015 at 12:02 AM, Kishore Senji  wrote:
>
> > Adding to what Gwen already mentioned -
> >
> > The programming model for the Producer is send() with an optional
> callback
> > and we get a Future. This model does not change whether behind the scenes
> > batching is done or not. So your fault tolerance logic really should not
> > depend on whether batching is done over the wire for performance reasons.
> > So assuming that you will get better fault tolerance without batching is
> > also not accurate, as you have to check you have any exception in the
> > onCompletion()
> >
> > The webservice should have a callback registered (using which you
> > essentially get async model) for every send() and based on that it should
> > respond to its clients whether the call is successful or not. The clients
> > of your webservice should have fault tolerance built on top of your
> > response codes.
> >
> > I think batching is a good thing as you get better throughput plus if you
> > do not have linger.ms set, it does not wait until it completely reaches
> > the
> > batch.size so all the concurrent requests to your webservice will get
> > batched and sent to the broker which will increase the throughput of the
> > Producer and in turn your webservice.
> >
> > On Fri, Aug 14, 2015 at 6:10 PM Gwen Shapira  wrote:
> >
> > > Hi Neelesh :)
> > >
> > > The new producer has configuration for controlling the batch sizes.
> > > By default, it will batch as much as possible without delay (controlled
> > by
> > > linger.ms) and without using too much memory (controlled by
> batch.size).
> > >
> > > As mentioned in the docs, you can set batch.size to 0 to disable
> batching
> > > completely if you want.
> > >
> > > It is worthwhile to consider using the producer callback to avoid
> losing
> > > messages when the webservice crashes (for example have the webservice
> > only
> > > consider messages as sent if the callback is triggered for a successful
> > > send).
> > >
> > > You can read more information on batching here:
> > >
> > >
> >
> http://ingest.tips/2015/07/19/tips-for-improving-performance-of-kafka-producer/
> > >
> > > And some examples on how to produce data to Kafka with the new
> producer -
> > > both with futures and callbacks here:
> > >
> > >
> >
> https://github.com/gwenshap/kafka-examples/blob/master/SimpleCounter/src/main/java/com/shapira/examples/producer/simplecounter/DemoProducerNewJava.java
> > >
> > > Gwen
> > >
> > >
> > >
> > > On Fri, Aug 14, 2015 at 5:07 PM, Neelesh  wrote:
> > >
> > > > We are fronting all our Kafka requests with a simple web service (we
> do
> > > > some additional massaging and writing to other stores as well). The
> new
> > > > KafkaProducer in 0.8.2 seems very geared towards producer batching.
> > Most
> > > of
> > > > our payload are single messages.
> > > >
> > > > Producer batching basically sets us up for lost messages if our web
> > > service
> > > > goes down with unflushed messaged in the producer.
> > > >
> > > > Another issue is when we have a batch of records. It looks like I
> have
> > to
> > > > call producer.send for each record and deal with individual futures
> > > > returned.
> > > >
> > > > Are there any patterns for primarily single message requests, without
> > > > losing data? I understand the throughput will be low.
> > > >
> > > > Thanks!
> > > > -Neelesh
> > > >
> > >
> >
>


Re: Reduce latency

2015-08-17 Thread Kishore Senji
Just to clarify, it is 1 thread / Broker / Producer. The javadoc recommends
using one Producer across multiple threads, and batching is done behind the
scenes, so throughput should not be a problem. So if you have more brokers,
the same Producer instance will use multiple threads to send to each broker;
this way the load is also spread out on the brokers and you will get good
performance from the Producer.

But if you notice that the network thread is busy on IO all the time (in
VisualVM) and there is more bandwidth that you can leverage and the brokers
can handle more requests, then you can go for multiple instances of
Producers (maybe a pool of Producers). But this will also increase the
memory usage, as the buffer for batching is per Producer.
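
If a single producer does become the bottleneck, a pool of producers could
look roughly like the sketch below (this is not a Kafka API, just an
illustration; each instance gets its own buffer.memory and its own network
thread):

import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.KafkaProducer;

final class ProducerPool {

    private final KafkaProducer<String, String>[] producers;
    private final AtomicInteger counter = new AtomicInteger();

    @SuppressWarnings("unchecked")
    ProducerPool(int size, Properties config) {
        producers = new KafkaProducer[size];   // each instance has its own buffer and io thread
        for (int i = 0; i < size; i++) {
            producers[i] = new KafkaProducer<>(config);
        }
    }

    KafkaProducer<String, String> next() {     // simple round-robin selection
        return producers[Math.floorMod(counter.getAndIncrement(), producers.length)];
    }

    void close() {
        for (KafkaProducer<String, String> p : producers) {
            p.close();
        }
    }
}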

On Mon, Aug 17, 2015 at 9:55 PM Tao Feng  wrote:

> If you run producerPerformance test, there is only one thread per
> KafkaProducer doing the actual sending. But one network request could
> contain multiple batches from what I understand.
>
> On Mon, Aug 17, 2015 at 5:42 PM, Yuheng Du 
> wrote:
>
> > Thank you Kishore, I made the buffer twice the size of the batch size and
> > the latency has reduced significantly.
> >
> > But is there only one thread io thread sending the batches? Can I
> increase
> > the number of threads sending the batches so more than one batch could be
> > sent at the same time?
> >
> > Thanks.
> >
> >
> >
> > On Thu, Aug 13, 2015 at 5:38 PM, Kishore Senji  wrote:
> >
> > > Your batch.size is 8196 and your buffer.memory is 67108864. This means
> > > 67108864/8196
> > > ~ 8188 batches are in memory ready to the sent. There is only one
> thread
> > io
> > > thread sending them. I would guess that the io thread (
> > > kafka-producer-network-thread) would be busy. Please check it in visual
> > vm.
> > >
> > > In steady state, every batch has to wait for the previous 8187 batches
> to
> > > be done before it gets a chance to be sent out, but the latency is
> > counted
> > > from the time is added to the queue. This is the reason that you are
> > seeing
> > > very high end-to-end latency.
> > >
> > > Have the buffer.memory to be only twice that of the batch.size so that
> > > while one is in flight, you can another batch ready to go (and the
> > > KafkaProducer would block to send more when there is no memory and this
> > way
> > > the batches are not waiting in the queue unnecessarily) . Also may be
> you
> > > want to increase the batch.size further more, you will get even better
> > > throughput with more or less same latency (as there is no shortage of
> > > events in the test program).
> > >
> > > On Thu, Aug 13, 2015 at 1:13 PM Yuheng Du 
> > > wrote:
> > >
> > > > Yes there is. But if we are using ProducerPerformance test, it's
> > > configured
> > > > as giving input when running the test command. Do you write a java
> > > program
> > > > to test the latency? Thanks.
> > > >
> > > > On Thu, Aug 13, 2015 at 3:54 PM, Alvaro Gareppe 
> > > > wrote:
> > > >
> > > > > I'm using last one, but not using the ProducerPerformance, I
> created
> > my
> > > > > own. but I think there is a producer.properties file in config
> folder
> > > in
> > > > > kafka.. is that configuration not for this tester ?
> > > > >
> > > > > On Thu, Aug 13, 2015 at 4:18 PM, Yuheng Du <
> yuheng.du.h...@gmail.com
> > >
> > > > > wrote:
> > > > >
> > > > > > Thank you Alvaro,
> > > > > >
> > > > > > How to use sync producers? I am running the standard
> > > > ProducerPerformance
> > > > > > test from kafka to measure the latency of each message to send
> from
> > > > > > producer to broker only.
> > > > > > The command is like "bin/kafka-run-class.sh
> > > > > > org.apache.kafka.clients.tools.ProducerPerformance test7 5000
> > 100
> > > > -1
> > > > > > acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092
> > > > > > buffer.memory=67108864 batch.size=8196"
> > > > > >
> > > > > > For running producers, where should I put the producer.type=sync
> > > > > > configuration into? The config/server.properties? Also Does this
> > mean
> > > > we
> > > > > > are using 

Re: Possible DEAD LOCK for one day at broker controller?

2015-08-17 Thread Kishore Senji
What is the size of the segment file? You are reducing the retention from
10 days to 1 day. The moment you do this, Kafka will delete all segments
which are older than 1 day. So, for example, if your latest segment is older
than 1 day and there are consumers which are still catching up (let us say a
10 min lag), Kafka will roll over and delete the older segments, and there is
potential for data loss. One pattern could be to change this config parameter
only when a new segment has been created, all consumers are on the new
segment, and all clients will be done with the old segment before the file is
deleted.

My guess is that your segment file is huge and the OS may be taking a long
time to destage the file cache onto disk before letting the file be deleted.
This could be the reason for the long pause, which could be causing the ZK
connections to time out.



On Mon, Aug 17, 2015 at 6:59 AM Zhao Weinan  wrote:

> Hi Kishore Senji,
>
> Thanks for the reply.
>
> Do you have some suggestions until the fix comes out? Try not to modify
> retention.ms? Or disable the auto rebalance? Because this problem is 100%
> reproducible in my scenario (I hit the dead lock both times I modified
> retention.ms), and I even found some data loss whose cause I'm still
> investigating.
>
> Any idea on why shrinking the retention.ms causes the network unstable?
>
> And yes I use the comma for clarity :)
>
> 2015-08-17 8:59 GMT+08:00 Kishore Senji :
>
> > Interesting problem you ran in to. It seems like this broker was chosen
> as
> > the Controller and onControllerFailure() method was called. This will
> > schedule the checkAndTriggerPartitionRebalance method to execute after 5
> > seconds (when auto rebalance enabled). In the mean time this broker lost
> > zookeeper connection and so this broker resigns from the Controller
> status
> > and so the onControllerResignation() method is called and this method
> will
> > try to shutdown the auto rebalance executor.  But it is doing it by
> holding
> > the lock and this is what caused the dead lock in your case.
> >
> > I do not think we need to hold the lock to shutdown the executor. This
> > could be the fix we might need.
> >
> > retention.ms config parameter should not have commas in the value. Are
> you
> > just using it here to clarify for us.
> >
> > It so happened in your
> > On Sun, Aug 16, 2015 at 1:52 AM Zhao Weinan  wrote:
> >
> > > Hi guys,
> > >
> > > I got this problem, after changing one topic's config to
> > > retention.ms=86,400,000
> > > from 864,000,000, the brokers start to schedule and do deletions of
> > outdated
> > > index of that topic.
> > >
> > > Then for some reason some brokers' connection with zookeeper were
> > expired,
> > > suddenly lots of ERRORs showed up in logs/server.log: At controller
> > > broker(id=5) are:
> > >
> > > > *ERROR [ReplicaFetcherThread-2-4], Error for partition [X,4] to
> > > broker
> > > > 4:class kafka.common.NotLeaderForPartitionException
> > > > (kafka.server.ReplicaFetcherThread).*
> > > >
> > > At other broker which the controller broker try to fetch are:
> > >
> > > > *[Replica Manager on Broker 4]: Fetch request with correlation id
> > 1920630
> > > > from client ReplicaFetcherThread-2-4 on partition [X,4] failed
> due
> > to
> > > > Leader not local for partition [XXX,4] on broker 4
> > > > (kafka.server.ReplicaManager).*
> > > >
> > >
> > > In controller broker's server.log there are zk reconnections:
> > >
> > > > *INFO Client session timed out, have not heard from server in 5126ms
> > for
> > > > sessionid 0x54e0aaa9582b8e4, closing socket connection and attempting
> > > > reconnect (org.apache.zookeeper.ClientCnxn)*
> > > > *INFO zookeeper state changed (Disconnected)
> > > > (org.I0Itec.zkclient.ZkClient)*
> > > > *INFO Opening socket connection to server xxx. Will not
> attempt
> > to
> > > > authenticate using SASL (java.lang.SecurityException: Unable to
> locate
> > a
> > > > login configuration) (org.apache.zookeeper.ClientCnxn)*
> > > > *INFO Socket connection established to x, initiating
> > session
> > > > (org.apache.zookeeper.ClientCnxn)*
> > > > *INFO Session establishment complete on server xx, sessionid
> =
> > > > 0x54e0aaa9582b8e4, negotiated timeout = 6000
> > > > (org.apache.zookeeper.Cl

Re: Possible DEAD LOCK for one day at broker controller?

2015-08-16 Thread Kishore Senji
Interesting problem you ran into. It seems this broker was chosen as
the Controller, and the onControllerFailure() method was called. This will
schedule the checkAndTriggerPartitionRebalance method to execute after 5
seconds (when auto rebalance is enabled). In the meantime this broker lost
its zookeeper connection, so it resigns from the Controller status and the
onControllerResignation() method is called; this method will try to shut down
the auto rebalance executor. But it does so while holding the lock, and this
is what caused the dead lock in your case.

I do not think we need to hold the lock to shut down the executor. This
could be the fix we need.
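
For readers following along, here is a minimal, self-contained sketch of this
kind of dead lock; this is illustrative code only, not Kafka's actual
controller implementation. A thread holding a lock waits for an executor to
terminate, while the executor's scheduled task is itself waiting for that
same lock:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class ShutdownDeadlockSketch {
    private static final ReentrantLock controllerLock = new ReentrantLock();
    private static final ScheduledExecutorService rebalanceExecutor =
            Executors.newSingleThreadScheduledExecutor();

    public static void main(String[] args) throws Exception {
        // Analogous to the rebalance check: scheduled a few seconds after becoming
        // controller, and it needs the controller lock to run.
        rebalanceExecutor.schedule(new Runnable() {
            public void run() {
                controllerLock.lock(); // parks here once the resignation path holds the lock
                try {
                    System.out.println("rebalance check");
                } finally {
                    controllerLock.unlock();
                }
            }
        }, 5, TimeUnit.SECONDS);

        // Analogous to onControllerResignation() running in the ZK expiry callback:
        // it takes the lock and then waits for the executor to finish, but the
        // executor's task is waiting for this very lock, so nothing makes progress.
        controllerLock.lock();
        try {
            Thread.sleep(6000); // let the scheduled task start and park on the lock
            rebalanceExecutor.shutdown();
            rebalanceExecutor.awaitTermination(1, TimeUnit.DAYS); // the "stuck for one day" symptom
        } finally {
            controllerLock.unlock();
        }
    }
}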

The retention.ms config parameter should not have commas in the value. Are
you just using them here for clarity?

It so happened in your
On Sun, Aug 16, 2015 at 1:52 AM Zhao Weinan  wrote:

> Hi guys,
>
> I got this problem, after changing one topic's config to
> retention.ms=86,400,000
> from 864,000,000, the brokers start to schedule and do deletions of outdated
> index of that topic.
>
> Then for some reason some brokers' connection with zookeeper were expired,
> suddenly lots of ERRORs showed up in logs/server.log: At controller
> broker(id=5) are:
>
> > *ERROR [ReplicaFetcherThread-2-4], Error for partition [X,4] to
> broker
> > 4:class kafka.common.NotLeaderForPartitionException
> > (kafka.server.ReplicaFetcherThread).*
> >
> At other broker which the controller broker try to fetch are:
>
> > *[Replica Manager on Broker 4]: Fetch request with correlation id 1920630
> > from client ReplicaFetcherThread-2-4 on partition [X,4] failed due to
> > Leader not local for partition [XXX,4] on broker 4
> > (kafka.server.ReplicaManager).*
> >
>
> In controller broker's server.log there are zk reconnections:
>
> > *INFO Client session timed out, have not heard from server in 5126ms for
> > sessionid 0x54e0aaa9582b8e4, closing socket connection and attempting
> > reconnect (org.apache.zookeeper.ClientCnxn)*
> > *INFO zookeeper state changed (Disconnected)
> > (org.I0Itec.zkclient.ZkClient)*
> > *INFO Opening socket connection to server xxx. Will not attempt to
> > authenticate using SASL (java.lang.SecurityException: Unable to locate a
> > login configuration) (org.apache.zookeeper.ClientCnxn)*
> > *INFO Socket connection established to x, initiating session
> > (org.apache.zookeeper.ClientCnxn)*
> > *INFO Session establishment complete on server xx, sessionid =
> > 0x54e0aaa9582b8e4, negotiated timeout = 6000
> > (org.apache.zookeeper.ClientCnxn)*
> > *INFO zookeeper state changed (SyncConnected)
> > (org.I0Itec.zkclient.ZkClient)*
> >
> But on zookeeper /brokers/ids/ there is no controller broker's id 5.
>
> Then I tried to restart the controller broker, found the process won't
> quit.
>
> Then I jstacked it, found the broker process kind of stucked, some
> keypoints pasted as below. It seems zk-client-expired-callback aquired the
> controllerLock and wait the kafka-scheduler Executor to exit (for one day),
> but some thread in that Executor is doing Rebalance job which need to
> aquire the controllerLock, then the broker is in DEAD LOCK and will be
> totally lost from zookeeper for ONE DAY if I'm corrected? And since it's
> still hold outdated view of the cluster, it will try to try to follower up
> the Leaders which maybe not actual Leader, caused the ERRORs as above
> mentioned.
>
> I'm using 8 Kafka brokers @0.8.2.1 with 3 Zookeeper server @3.4.6, all on
> different host in same data center, the cluster load is about 200K messages
> in and 30M bytes in and 80M bytes out totally.
>
> Does some one has the same issue? Any suggestion is appreciated.
>
>
> jstack:
>
> > "kafka-scheduler-0" daemon prio=10 tid=0x57967800 nid=0x2994
> > waiting on condition [0x46dac000]
> >java.lang.Thread.State: WAITING (parking)
> > at sun.misc.Unsafe.park(Native Method)
> > - parking to wait for  <0xc2ec6418> (a
> > java.util.concurrent.locks.ReentrantLock$NonfairSync)
> > at
> > java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
> > at
> >
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
> > at
> >
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
> > at
> >
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
> > at
> >
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
> > at
> > java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
> > at kafka.utils.Utils$.inLock(Utils.scala:533)
> > at
> >
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1131)
> > at
> >
> kafka.controller.KafkaC

Re: 0.8.2 producer and single message requests

2015-08-16 Thread Kishore Senji
Adding to what Gwen already mentioned -

The programming model for the Producer is send() with an optional callback,
and we get back a Future. This model does not change whether or not batching
is done behind the scenes. So your fault-tolerance logic really should not
depend on whether batching is done over the wire for performance reasons.
Assuming that you will get better fault tolerance without batching is also
not accurate, as you still have to check whether you got an exception in
onCompletion().

The webservice should register a callback (which essentially gives you an
async model) for every send(), and based on that it should tell its clients
whether the call was successful or not. The clients of your webservice should
then build their fault tolerance on top of your response codes.
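
A rough sketch of that callback-per-send pattern is below; the topic name,
the acks setting, and the ResponseHandle interface are placeholders of mine,
not something from this thread or from Kafka itself:

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class WebServiceProducerSketch {

    // Hypothetical minimal hook for completing the HTTP request asynchronously.
    public interface ResponseHandle {
        void status(int httpStatus);
    }

    private final KafkaProducer<String, String> producer;

    public WebServiceProducerSketch(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("acks", "1"); // or "all" for stronger durability guarantees
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        this.producer = new KafkaProducer<String, String>(props);
    }

    // Called from the web layer for each incoming request.
    public void handleRequest(String payload, final ResponseHandle respond) {
        producer.send(new ProducerRecord<String, String>("events", payload), new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    respond.status(503); // not acked by the broker; client should retry
                } else {
                    respond.status(200); // acked; safe to tell the client it succeeded
                }
            }
        });
    }
}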

I think batching is a good thing, as you get better throughput. Also, if you
do not set linger.ms, the producer does not wait until a batch completely
reaches batch.size, so concurrent requests to your webservice will simply get
batched together and sent to the broker, which increases the throughput of
the Producer and in turn of your webservice.

On Fri, Aug 14, 2015 at 6:10 PM Gwen Shapira  wrote:

> Hi Neelesh :)
>
> The new producer has configuration for controlling the batch sizes.
> By default, it will batch as much as possible without delay (controlled by
> linger.ms) and without using too much memory (controlled by batch.size).
>
> As mentioned in the docs, you can set batch.size to 0 to disable batching
> completely if you want.
>
> It is worthwhile to consider using the producer callback to avoid losing
> messages when the webservice crashes (for example have the webservice only
> consider messages as sent if the callback is triggered for a successful
> send).
>
> You can read more information on batching here:
>
> http://ingest.tips/2015/07/19/tips-for-improving-performance-of-kafka-producer/
>
> And some examples on how to produce data to Kafka with the new producer -
> both with futures and callbacks here:
>
> https://github.com/gwenshap/kafka-examples/blob/master/SimpleCounter/src/main/java/com/shapira/examples/producer/simplecounter/DemoProducerNewJava.java
>
> Gwen
>
>
>
> On Fri, Aug 14, 2015 at 5:07 PM, Neelesh  wrote:
>
> > We are fronting all our Kafka requests with a simple web service (we do
> > some additional massaging and writing to other stores as well). The new
> > KafkaProducer in 0.8.2 seems very geared towards producer batching. Most
> of
> > our payload are single messages.
> >
> > Producer batching basically sets us up for lost messages if our web
> service
> > goes down with unflushed messaged in the producer.
> >
> > Another issue is when we have a batch of records. It looks like I have to
> > call producer.send for each record and deal with individual futures
> > returned.
> >
> > Are there any patterns for primarily single message requests, without
> > losing data? I understand the throughput will be low.
> >
> > Thanks!
> > -Neelesh
> >
>


Re: use page cache as much as possiblee

2015-08-15 Thread Kishore Senji
Please check this:
https://lonesysadmin.net/2013/12/22/better-linux-disk-caching-performance-vm-dirty_ratio/

It depends on your OS; please research those parameters appropriately and
see what they are set to on your current system. Based on those parameters,
the background syncing will be done by the OS. But I would recommend not
completely stopping the OS from asynchronously writing the file cache to
disk, as that would cause large pauses when it eventually has to write
because the file cache has outgrown the available RAM.

On Fri, Aug 14, 2015 at 8:00 PM, Yuheng Du  wrote:

> Thank you Kishore, I see that the end-to-end latency may not be reduced by
> resetting the flush time manually.
>
> But if the default flush.ms is Long.Max_Value, why I see the disk usage of
> the brokers constantly increasing when the producer is pushing in data?
> Should that be happen once a while? The os page cache usage should not be
> reflected when using "watch df -h" command, am I correct?
>
> Thanks.
>
> On Fri, Aug 14, 2015 at 10:12 PM, Kishore Senji  wrote:
>
> > Actually in 0.8.2, flush.ms & flush.messages are recommended to be left
> > defaults (Long.MAX_VALUE)
> > http://kafka.apache.org/documentation.html (search for flush.ms)
> >
> > The disk flush and the committed offset are two independent things. As
> long
> > as you have replication, the recommended thing is to leave the flushing
> to
> > the OS. But if you choose to flush manually the time interval at which
> you
> > flush may not influence the end-to-end latency from the Producer to
> > Consumer, however it can influence the throughput of the broker.
> >
> > On Fri, Aug 14, 2015 at 9:20 AM Yuheng Du 
> > wrote:
> >
> > > So if I understand correctly, even if I delay flushing, the consumer
> will
> > > get the messages as soon as the broker receives them and put them into
> > page
> > > cache (assuming producer doesn't wait for acks from brokers)?
> > >
> > > And will the decrease of log.flush interval help reduce latency between
> > > producer and consumer?
> > >
> > > Thanks.
> > >
> > >
> > > On Fri, Aug 14, 2015 at 11:57 AM, Kishore Senji 
> > wrote:
> > >
> > > > Thank you Gwen for correcting me. This document (
> > > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Replication)
> > in
> > > > "Writes" section also has specified the same thing as you have
> > mentioned.
> > > > One thing is not clear to me as to what happens when the Replicas add
> > the
> > > > message to memory but the leader fails before acking to the producer.
> > > Later
> > > > the leader replica is chosen to be the leader for the partition, it
> > will
> > > > advance the HW to its LEO (which has the message). The producer can
> > > resend
> > > > the same message thinking it failed and there will be a duplicate
> > > message.
> > > > Is my understanding correct here?
> > > >
> > > > On Thu, Aug 13, 2015 at 10:50 PM, Gwen Shapira 
> > > wrote:
> > > >
> > > > > On Thu, Aug 13, 2015 at 4:10 PM, Kishore Senji 
> > > wrote:
> > > > >
> > > > > > Consumers can only fetch data up to the committed offset and the
> > > reason
> > > > > is
> > > > > > reliability and durability on a broker crash (some consumers
> might
> > > get
> > > > > the
> > > > > > new data and some may not as the data is not yet committed and
> > lost).
> > > > > Data
> > > > > > will be committed when it is flushed. So if you delay the
> flushing,
> > > > > > consumers won't get those messages until that time.
> > > > > >
> > > > >
> > > > > As far as I know, this is not accurate.
> > > > >
> > > > > A message is considered committed when all ISR replicas received it
> > > (this
> > > > > much is documented). This doesn't need to include writing to disk,
> > > which
> > > > > will happen asynchronously.
> > > > >
> > > > >
> > > > > >
> > > > > > Even though you flush periodically based on
> > > log.flush.interval.messages
> > > > > and
> > > > > > log.flush.interval.ms, if the segment file is in the pagecache,
> > the
> > > > > > consumers will still benefit from that pagecache and OS wouldn'

Re: use page cache as much as possiblee

2015-08-14 Thread Kishore Senji
Actually, in 0.8.2, flush.ms & flush.messages are recommended to be left at
their defaults (Long.MAX_VALUE):
http://kafka.apache.org/documentation.html (search for flush.ms)

The disk flush and the committed offset are two independent things. As long
as you have replication, the recommended approach is to leave the flushing to
the OS. But if you choose to flush manually, the interval at which you flush
may not influence the end-to-end latency from the Producer to the Consumer;
however, it can influence the throughput of the broker.

On Fri, Aug 14, 2015 at 9:20 AM Yuheng Du  wrote:

> So if I understand correctly, even if I delay flushing, the consumer will
> get the messages as soon as the broker receives them and put them into page
> cache (assuming producer doesn't wait for acks from brokers)?
>
> And will the decrease of log.flush interval help reduce latency between
> producer and consumer?
>
> Thanks.
>
>
> On Fri, Aug 14, 2015 at 11:57 AM, Kishore Senji  wrote:
>
> > Thank you Gwen for correcting me. This document (
> > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Replication) in
> > "Writes" section also has specified the same thing as you have mentioned.
> > One thing is not clear to me as to what happens when the Replicas add the
> > message to memory but the leader fails before acking to the producer.
> Later
> > the leader replica is chosen to be the leader for the partition, it will
> > advance the HW to its LEO (which has the message). The producer can
> resend
> > the same message thinking it failed and there will be a duplicate
> message.
> > Is my understanding correct here?
> >
> > On Thu, Aug 13, 2015 at 10:50 PM, Gwen Shapira 
> wrote:
> >
> > > On Thu, Aug 13, 2015 at 4:10 PM, Kishore Senji 
> wrote:
> > >
> > > > Consumers can only fetch data up to the committed offset and the
> reason
> > > is
> > > > reliability and durability on a broker crash (some consumers might
> get
> > > the
> > > > new data and some may not as the data is not yet committed and lost).
> > > Data
> > > > will be committed when it is flushed. So if you delay the flushing,
> > > > consumers won't get those messages until that time.
> > > >
> > >
> > > As far as I know, this is not accurate.
> > >
> > > A message is considered committed when all ISR replicas received it
> (this
> > > much is documented). This doesn't need to include writing to disk,
> which
> > > will happen asynchronously.
> > >
> > >
> > > >
> > > > Even though you flush periodically based on
> log.flush.interval.messages
> > > and
> > > > log.flush.interval.ms, if the segment file is in the pagecache, the
> > > > consumers will still benefit from that pagecache and OS wouldn't read
> > it
> > > > again from disk.
> > > >
> > > > On Thu, Aug 13, 2015 at 2:54 PM Yuheng Du 
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > As I understand it, kafka brokers will store the incoming messages
> > into
> > > > > pagecache as much as possible and then flush them into disk, right?
> > > > >
> > > > > But in my experiment where 90 producers is publishing data into 6
> > > > brokers,
> > > > > I see that the log directory on disk where broker stores the data
> is
> > > > > constantly increasing (every seconds.) So why this is happening?
> Does
> > > > this
> > > > > has to do with the default "log.flush.interval" setting?
> > > > >
> > > > > I want the broker to write to disk less often when serving some
> > on-line
> > > > > consumers to reduce latency. I tested in my broker the disk write
> > speed
> > > > is
> > > > > around 110MB/s.
> > > > >
> > > > > Thanks for any replies.
> > > > >
> > > >
> > >
> >
>


Re: use page cache as much as possiblee

2015-08-14 Thread Kishore Senji
Thank you Gwen for correcting me. This document (
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Replication), in the
"Writes" section, also specifies the same thing you mentioned.
One thing is still not clear to me: what happens when the replicas add the
message to memory but the leader fails before acking to the producer? If one
of those replicas is later chosen as the leader for the partition, it will
advance the HW to its LEO (which includes the message). The producer can then
resend the same message, thinking it failed, and there will be a duplicate
message. Is my understanding correct here?

On Thu, Aug 13, 2015 at 10:50 PM, Gwen Shapira  wrote:

> On Thu, Aug 13, 2015 at 4:10 PM, Kishore Senji  wrote:
>
> > Consumers can only fetch data up to the committed offset and the reason
> is
> > reliability and durability on a broker crash (some consumers might get
> the
> > new data and some may not as the data is not yet committed and lost).
> Data
> > will be committed when it is flushed. So if you delay the flushing,
> > consumers won't get those messages until that time.
> >
>
> As far as I know, this is not accurate.
>
> A message is considered committed when all ISR replicas received it (this
> much is documented). This doesn't need to include writing to disk, which
> will happen asynchronously.
>
>
> >
> > Even though you flush periodically based on log.flush.interval.messages
> and
> > log.flush.interval.ms, if the segment file is in the pagecache, the
> > consumers will still benefit from that pagecache and OS wouldn't read it
> > again from disk.
> >
> > On Thu, Aug 13, 2015 at 2:54 PM Yuheng Du 
> > wrote:
> >
> > > Hi,
> > >
> > > As I understand it, kafka brokers will store the incoming messages into
> > > pagecache as much as possible and then flush them into disk, right?
> > >
> > > But in my experiment where 90 producers is publishing data into 6
> > brokers,
> > > I see that the log directory on disk where broker stores the data is
> > > constantly increasing (every seconds.) So why this is happening? Does
> > this
> > > has to do with the default "log.flush.interval" setting?
> > >
> > > I want the broker to write to disk less often when serving some on-line
> > > consumers to reduce latency. I tested in my broker the disk write speed
> > is
> > > around 110MB/s.
> > >
> > > Thanks for any replies.
> > >
> >
>


Re: use page cache as much as possiblee

2015-08-13 Thread Kishore Senji
Consumers can only fetch data up to the committed offset, and the reason is
reliability and durability on a broker crash (otherwise some consumers might
get the new data and some may not, as the data is not yet committed and could
be lost). Data is committed when it is flushed. So if you delay the flushing,
consumers won't get those messages until that time.

Even though you flush periodically based on log.flush.interval.messages and
log.flush.interval.ms, if the segment file is in the page cache, the
consumers will still benefit from that page cache and the OS won't read it
again from disk.

On Thu, Aug 13, 2015 at 2:54 PM Yuheng Du  wrote:

> Hi,
>
> As I understand it, kafka brokers will store the incoming messages into
> pagecache as much as possible and then flush them into disk, right?
>
> But in my experiment where 90 producers is publishing data into 6 brokers,
> I see that the log directory on disk where broker stores the data is
> constantly increasing (every seconds.) So why this is happening? Does this
> has to do with the default "log.flush.interval" setting?
>
> I want the broker to write to disk less often when serving some on-line
> consumers to reduce latency. I tested in my broker the disk write speed is
> around 110MB/s.
>
> Thanks for any replies.
>


Re: Reduce latency

2015-08-13 Thread Kishore Senji
Your batch.size is 8196 and your buffer.memory is 67108864. This means
67108864/8196 ~ 8188 batches can be in memory ready to be sent. There is only
one io thread sending them. I would guess that the io thread
(kafka-producer-network-thread) would be busy; please check it in VisualVM.

In steady state, every batch has to wait for the previous 8187 batches to be
done before it gets a chance to be sent out, but the latency is counted from
the time it is added to the queue. This is the reason you are seeing very
high end-to-end latency.

Set buffer.memory to only twice the batch.size, so that while one batch is in
flight, you can have another batch ready to go (and the KafkaProducer will
block further sends when there is no memory, so batches are not waiting in
the queue unnecessarily). Also, you may want to increase batch.size further;
you will get even better throughput with more or less the same latency (as
there is no shortage of events in the test program).
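
A minimal sketch of that sizing advice using the new producer's config keys;
the broker address is a placeholder and the exact numbers are only
illustrative (batch.size would likely be raised further, as suggested above):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;

public class LatencyTunedProducer {
    public static KafkaProducer<byte[], byte[]> create() {
        int batchSize = 8196; // bytes per batch, the value used in the test above
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder broker
        props.put("acks", "1");
        props.put("batch.size", Integer.toString(batchSize));
        // Keep only ~2 batches worth of buffer: one in flight, one being filled.
        // send() then blocks when the buffer is full instead of queueing thousands of batches.
        props.put("buffer.memory", Integer.toString(2 * batchSize));
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        return new KafkaProducer<byte[], byte[]>(props);
    }
}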

On Thu, Aug 13, 2015 at 1:13 PM Yuheng Du  wrote:

> Yes there is. But if we are using ProducerPerformance test, it's configured
> as giving input when running the test command. Do you write a java program
> to test the latency? Thanks.
>
> On Thu, Aug 13, 2015 at 3:54 PM, Alvaro Gareppe 
> wrote:
>
> > I'm using last one, but not using the ProducerPerformance, I created my
> > own. but I think there is a producer.properties file in config folder in
> > kafka.. is that configuration not for this tester ?
> >
> > On Thu, Aug 13, 2015 at 4:18 PM, Yuheng Du 
> > wrote:
> >
> > > Thank you Alvaro,
> > >
> > > How to use sync producers? I am running the standard
> ProducerPerformance
> > > test from kafka to measure the latency of each message to send from
> > > producer to broker only.
> > > The command is like "bin/kafka-run-class.sh
> > > org.apache.kafka.clients.tools.ProducerPerformance test7 5000 100
> -1
> > > acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092
> > > buffer.memory=67108864 batch.size=8196"
> > >
> > > For running producers, where should I put the producer.type=sync
> > > configuration into? The config/server.properties? Also Does this mean
> we
> > > are using batch size of 1? Which version of Kafka are you using?
> > > thanks.
> > >
> > > On Thu, Aug 13, 2015 at 3:01 PM, Alvaro Gareppe 
> > > wrote:
> > >
> > > > Are you measuring latency as time between producer and consumer ?
> > > >
> > > > In that case, the ack shouldn't affect the latency, cause even though
> > your
> > > > producer is not going to wait for the ack, the consumer will only get
> > the
> > > > message after it's committed in the server.
> > > >
> > > > About latency my best result occur with sync producers, but the
> > > throughput
> > > > is much lower in that case.
> > > >
> > > > About not flushing to disk I'm pretty sure that it's not an option in
> > > kafka
> > > > (correct me if I'm wrong)
> > > >
> > > > Regards,
> > > > Alvaro Gareppe
> > > >
> > > > On Thu, Aug 13, 2015 at 12:59 PM, Yuheng Du <
> yuheng.du.h...@gmail.com>
> > > > wrote:
> > > >
> > > > > Also, the latency results show no major difference when using ack=0
> > or
> > > > > ack=1. Why is that?
> > > > >
> > > > > On Thu, Aug 13, 2015 at 11:51 AM, Yuheng Du <
> > yuheng.du.h...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > I am running an experiment where 92 producers is publishing data
> > > into 6
> > > > > > brokers and 10 consumer are reading online data simultaneously.
> > > > > >
> > > > > > How should I do to reduce the latency? Currently when I run the
> > > > producer
> > > > > > performance test the average latency is around 10s.
> > > > > >
> > > > > > Should I disable log.flush? How to do that? Thanks.
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Ing. Alvaro Gareppe
> > > > agare...@gmail.com
> > > >
> > >
> >
> >
> >
> > --
> > Ing. Alvaro Gareppe
> > agare...@gmail.com
> >
>