Re: compression-rate-avg returns 0 with compression enabled

2015-12-15 Thread Ismael Juma
Yes, please.

Ismael
On 15 Dec 2015 11:52, "tao xiao"  wrote:

> Hi team,
>
> I found that the producer metric compression-rate-avg always returns 0 even
> with compression.type set to snappy. I drilled down the code and discovered
> that the position of bytebuffer in
> org.apache.kafka.common.record.Compressor is reset to 0 by
> RecordAccumulator.drain() before calling metric updates in sender thread.
>
> Should I file a bug for this?
>


Consumer offset not reset to smallest when auto.offset.reset=smallest

2015-12-15 Thread Buntu Dev
We are running into these errors against certain partitions (4 out of 12
partitions) on the consumer, and they keep filling up the logs:

 ERROR consumer.ConsumerFetcherThread: [ConsumerFetcherThread-],
Current offset 104088851 for partition [some-topic,5] out of range;
reset offset to 104088851

If I'm reading this correctly, the offset is being reset to what the
consumer is requesting and not to the smallest offset for the
partition. We have the 'auto.offset.reset=smallest' property set and
I'm under the impression that this should reset the offset to the
smallest offset available in the topic. Please let me know what could
be causing this issue; we are using v0.8.2.
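
For reference, this is the 0.8.2 high-level (ZooKeeper-based) consumer, configured
roughly like this (a sketch only; the ZooKeeper address and group id are placeholders):

Properties props = new Properties();
props.put("zookeeper.connect", "zk1:2181");        // placeholder
props.put("group.id", "my-consumer-group");        // placeholder
props.put("auto.offset.reset", "smallest");        // expected to reset to the earliest offset
ConsumerConnector consumer =
    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));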


Thanks!


0.8.2 high level consumer with one-time-use group.id's?

2015-12-15 Thread James Cheng
When using the 0.8.2 high level consumer, what is the impact of creating many 
one-time use groupIds and checkpointing offsets using those?

I have a use case where upon every boot, I want to consume an entire topic from 
the very beginning, all partitions. We are using the high level consumer for 
convenience in handling leader discovery and rebalancing, but we do not need 
the consumer groups functionality.

We do not need checkpointing of offsets to allow continuing after a restart of 
our application, since we want to re-consume the stream upon restarts. However, 
it appears that if you do *not* checkpoint, then when there is an intermittent 
disconnect, the consumer will restart at the beginning of the topic. I haven't 
yet traced down why this happens.

We were thinking of simply creating a new consumer group id upon every reboot, 
but this seems messy, leaving around a lot of unused consumer group ids. A 
couple questions:

1) What resources does a groupId use, when it is active (a consumer using it) 
and when it is inactive (no consumers using it)?

The only resources I can identify are:
* kafka/zookeeper using it for group membership (only when the group is active)
* disk storage for most recent offset in zookeeper (only the most recent is 
stored per partition)
* disk storage for all offsets in kafka (all checkpoints stored, but there is 
log compaction)
* in-memory storage for most recent offset in kafka, for lookups.

2) Are old non-active groupId's ever deleted?
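
For context, the sort of thing we had in mind for the one-time group ids is roughly
this (a sketch of the 0.8.2 high-level consumer setup; property values are made up):

Properties props = new Properties();
props.put("zookeeper.connect", "zk1:2181");               // placeholder
props.put("group.id", "myapp-" + UUID.randomUUID());      // fresh, one-time group id per boot
props.put("auto.offset.reset", "smallest");               // always start from the beginning
props.put("auto.commit.enable", "false");                 // we don't need offsets across restarts
ConsumerConnector consumer =
    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));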

Thanks,
-James






how to programatically monitor Kafka availability

2015-12-15 Thread Hohl, Ken
We want to be able to monitor the ability to send messages to Kafka topics.  We 
want to be aware of the inability to do so before we actually attempt to send a 
message.  What we're looking for is something like a heartbeat.  The reason we 
need this is that in our deployment environment, Kafka and its clients will not 
be co-located.  As such, network issues could cause Kafka to not be available 
to its clients.
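
To make that concrete, what we have in mind is roughly the following (a sketch only;
the broker list, canary topic, and timeout are placeholders):

// Periodically send a tiny record to a dedicated canary topic and wait for the ack.
// If the ack doesn't arrive within the timeout, treat the cluster as unavailable.
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);

boolean healthy;
try {
    producer.send(new ProducerRecord<byte[], byte[]>("kafka-healthcheck", new byte[0]))
            .get(5, TimeUnit.SECONDS);
    healthy = true;
} catch (Exception e) {
    healthy = false;   // brokers unreachable, or the canary topic has no available leader
}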

We've considered using Zookeeper that's already managing the Kafka cluster but 
have not been able to determine exactly how we would use it.

We've also considered requesting a JMX MBean periodically and concluding the 
cluster is not accessible if we can't get the MBean from at least 1 broker.

What is the recommended way of accomplishing what we're trying to do?

Thanks.

Ken Hohl
Cars.com



Re: Consumer offset not reset to smallest when auto.offset.reset=smallest

2015-12-15 Thread Buntu Dev
I also notice that the errors are for the partitions that seem to be
under-replicated. We have 3 brokers, and one of the brokers never seems to be
chosen as leader for any of the 12 partitions of the topic.

On Tue, Dec 15, 2015 at 11:24 AM, Buntu Dev  wrote:

> We are running into these errors against certain partitions (4 out of 12
> partitions) on the consumer, and they keep filling up the logs:
>
>  ERROR consumer.ConsumerFetcherThread: [ConsumerFetcherThread-], Current 
> offset 104088851 for partition [some-topic,5] out of range; reset offset to 
> 104088851
>
> If I'm reading this correctly, the offset is being reset to what the consumer 
> is requesting and not to the smallest offset for the partition. We got the 
> 'auto.offset.reset=smallest' property set and I'm under the impression that 
> this should reset the offset to the smallest offset available in the topic. 
> Please let me know what could be causing this issue, we are using v0.8.2.
>
>
> Thanks!
>


Re: Low-latency, high message size variance

2015-12-15 Thread Jason Gustafson
Hey Jens,

The purpose of pause() is to stop fetches for a set of partitions. This
lets you continue calling poll() to send heartbeats. Also note that poll()
generally only blocks for rebalances. In code, something like this is what
I was thinking:

while (running) {
  ConsumerRecords records = consumer.poll(1000);
  if (queue.offer(records))
    continue;

  // Queue is full: pause fetching, but keep polling so heartbeats still go out.
  TopicPartition[] assignment = toArray(consumer.assignment());
  consumer.pause(assignment);
  while (!queue.offer(records, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
    consumer.poll(0);
  consumer.resume(assignment);
}

The tricky thing is handling rebalances since they might occur in either
call to poll(). In a rebalance, you have to 1) drain the queue, 2) commit
current offsets, and 3) maybe break from the inner poll loop. If the
processing thread is busy when the rebalance is triggered, then you may
have to discard the results when it's finished. It's also a little
difficult communicating completion to the poll loop, which is where the
offset commit needs to take place. I suppose another queue would work,
sigh.

Well, I think you can make that work, but I tend to agree that it's pretty
complicated. Perhaps instead of a queue, you should just submit the
processor to an executor service for each record set returned and await its
completion directly. For example:

while (running) {
  ConsumerRecords records = consumer.poll(1000);
  Future future = executor.submit(new Processor(records));

  TopicPartition[] assignment = toArray(consumer.assignment());
  consumer.pause(assignment);
  while (!complete(future, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
    consumer.poll(0);
  consumer.resume(assignment);
  consumer.commitSync();
}

This seems closer to the spirit of the poll loop, and it makes handling
commits a lot easier. You still have to deal with the rebalance problem,
but at least you don't have to deal with the queue. It's still a little
complex though. Maybe the consumer needs a ping() API which does the same
thing as poll() but doesn't send or return any fetches. That would simplify
things a little more:

while (running) {
  ConsumerRecords records = consumer.poll(1000);
  Future future = executor.submit(new Processor(records));
  while (!complete(future, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
    consumer.ping();
  consumer.commitSync();
}
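
For the rebalance part, one way to wire it up is a ConsumerRebalanceListener passed to
subscribe() that commits whatever has actually been processed before partitions are
revoked (sketch only; topics is your subscription list and lastProcessedOffsets() is a
made-up helper returning the offsets the processor has finished):

consumer.subscribe(topics, new ConsumerRebalanceListener() {
  @Override
  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    // wait for (or cancel) in-flight processing, then commit only what finished
    consumer.commitSync(lastProcessedOffsets());
  }

  @Override
  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    // nothing special needed for this pattern
  }
});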

Anyway, I'll think about it a little more and see if any other approaches
come to mind. I do agree that we should have a way to handle this case
without too much extra work.


-Jason


On Tue, Dec 15, 2015 at 5:09 AM, Jens Rantil  wrote:

> Hi Jason,
>
> Thanks for your response. See replies inline:
>
> On Tuesday, December 15, 2015, Jason Gustafson  wrote:
>
> > Hey Jens,
> >
> > I'm not sure I understand why increasing the session timeout is not an
> > option. Is the issue that there's too much uncertainly about processing
> > time to set an upper bound for each round of the poll loop?
> >
>
> Yes, that's the issue.
>
>
> > One general workaround would be to move the processing into another
> thread.
> > For example, you could add messages to a blocking queue and have the
> > processor poll them. We have a pause() API which can be used to prevent
> > additional fetches when the queue is full. This would allow you to
> continue
> > the poll loop instead of blocking in offer(). The tricky thing would be
> > handling rebalances since you'd need to clear the queue and get the last
> > offset from the processors. Enough people probably will end up doing
> > something like this that we should probably work through an example on
> how
> > to do it properly.
> >
>
> Hm, as far as I've understood the consumer will only send heartbeats to the
> broker when poll() is being called. If I would call pause() on a consumer
> (from a separate thread) I understand poll() will block indefinitely. Will
> the polling consumer still send heartbeats when blocked? Or would a pause
> for too long (while my records are being processed) eventually lead to
> session timeout? If the latter, that would sort of defeat the purpose since
> I am trying to avoid unnecessary rebalancing of consumers when there is
> high pressure on the consumers.
>
> Regarding handling of rebalancing for a queue solution you describe; It
> really sounds very complicated. It's probably doable, but doesn't this sort
> of defeat the purpose of the high level consumer API? I mean, it sounds
> like it should gracefully handle slow consumption of varying size. I might
> be wrong.
>
> Thanks,
> Jens
>
>
> --
> Jens Rantil
> Backend engineer
> Tink AB
>
> Email: jens.ran...@tink.se
> Phone: +46 708 84 18 32
> Web: www.tink.se
>
>


Re: Kafka Producer 0.9 performance issue with small messages

2015-12-15 Thread Gary Gershon
Dave,

This is a new app being developed to use the Bluemix Messaging Hub 0.9 beta 
service.  No migration involved from 0.8.

With batching working for us (having fixed our own coding bug), we’re quite 
pleased with the numbers we’re seeing.

Our mindset is that in using a cloud service solution, we don’t invest the time 
-- or maintain the skills — to install, operate, tune, apply fixes, or upgrade 
the server.

With batching working, we expect to have ample throughput for our project’s 
needs.  Without batching, it would have been marginal to keep up with the flow 
rate with messages between 50 and 150 bytes.
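
For reference, the batching knobs involved on the producer side are roughly these
(illustrative values, not tuned recommendations; the broker list is elided):

Properties props = new Properties();
props.put("bootstrap.servers", "<message-hub-brokers>");   // elided
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("linger.ms", "10");        // wait up to 10 ms so small records can batch together
props.put("batch.size", "16384");    // max bytes per partition batch (the default)
KafkaProducer<String, String> producer = new KafkaProducer<>(props);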

I did a single timing this morning using my laptop and then another by 
deploying the identical Docker image to Bluemix Containers.

Of course, having the client in the same cloud facility as the service was 
faster than our local WiFi configuration. :)

For 100,000 77-byte text records, with linger.ms=10:

Laptop Producer: 17,437 ms
Laptop Consumer: 3,146 ms

Bluemix Producer:   6,659 ms
Bluemix Consumer: 2,184 ms

Again, all the disclaimers.  One run each.  No tuning of client.  And Bluemix 
service is in beta, so IBM’s configuration would be considered at best to be an 
untuned dev/test environment.  Unable to assess multi-tenancy impact at this 
point.

Dave, this is about 15K msg/sec with a single producer instance in Bluemix.  
Our design needs ~1K msg/sec for message ingestion (8GB/day).  Headroom looks 
good in our case.

Next design challenge is cloud harvesting and longer-term persistence from the 
Messaging Hub service for use with Spark (per Lambda Architecture).

Gary


> On Dec 14, 2015, at 12:50 PM, Dave Ariens  wrote:
> 
> Gary,
> 
> I was asking last week on the dev list regarding performance in 0.9.x and how 
> best to achieve optimal message throughput and find your results rather 
> interesting.
> 
> Is producing 7142 msg/sec a fairly typical rate for your test environment? (I 
> realize you're just using your laptop, though.)  Are you able to share your 
> peak message rates in your target/production environment under peak load?
> 
> Also--have you noticed any increase/decrease moving from 0.8.x to 0.9.x (if 
> applicable)?
> 
> I'm attempting to compare producing and consuming rates using Kafka (0.8.x 
> and 0.9.x) and our own in-house low overhead library and would be interested 
> in learning how others are faring.
> 




compression-rate-avg returns 0 with compression enabled

2015-12-15 Thread tao xiao
Hi team,

I found that the producer metric compression-rate-avg always returns 0 even
with compression.type set to snappy. I drilled down the code and discovered
that the position of bytebuffer in
org.apache.kafka.common.record.Compressor is reset to 0 by
RecordAccumulator.drain() before calling metric updates in sender thread.

Should I file a bug for this?
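
For reference, a minimal way to observe this (a sketch; broker address and topic name
are placeholders):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("compression.type", "snappy");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

for (int i = 0; i < 100000; i++)
    producer.send(new ProducerRecord<>("test-topic", "key-" + i, "value-" + i));
producer.flush();

// compression-rate-avg comes back as 0.0 even though snappy is enabled
for (Map.Entry<MetricName, ? extends Metric> e : producer.metrics().entrySet())
    if (e.getKey().name().equals("compression-rate-avg"))
        System.out.println(e.getKey().name() + " = " + e.getValue().value());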


Re: Per Topic Metrics

2015-12-15 Thread Wollert, Fabian
I found the problem myself. Since I don't know much about JMX and MBeans, I
was querying the bean wrong. I still don't know how to query the specific
per-topic metrics in jconsole, but it works for me (with Jolokia) to query,
instead of:

kafka.server:name=BytesInPerSec,type=BrokerTopicMetrics

the following:

kafka.server:name=BytesInPerSec,type=BrokerTopicMetrics,topic=

Still, this is not what the documentation says. Maybe worth reworking?
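
For completeness, the per-topic bean can also be read over plain JMX, roughly like
this (broker host, JMX port, and topic name are placeholders):

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class TopicBytesIn {
    public static void main(String[] args) throws Exception {
        JMXServiceURL url = new JMXServiceURL(
            "service:jmx:rmi:///jndi/rmi://broker1:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            // Same name/type as the broker-wide bean, plus the topic attribute.
            ObjectName bean = new ObjectName(
                "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=my-topic");
            System.out.println("OneMinuteRate = " + mbs.getAttribute(bean, "OneMinuteRate"));
        }
    }
}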

Cheers

Fabian

2015-12-11 21:28 GMT+01:00 Otis Gospodnetić :

> Hi Fabian,
>
> Hmmm, there are certainly per-topic metrics.
> You can see it in this SPM demo for Kafka monitoring:
> * click on https://apps.sematext.com/demo to get into the demo account
> * go into any SPM app with "kafka" in the name, like
> https://apps.sematext.com/spm-reports/mainPage.do?selectedApplication=4293
> * click on "Kafka" on the left side
>
> You should see per-topic metrics there. For example here:
>
> https://apps.sematext.com/spm-reports/mainPage.do?selectedApplication=4293=kafkaBrokerTopicBytesMassagesFailedReportPage=1449865640781=false
>
> So you should be able to find all those things in JMX, too.  Note that some
> metrics are in Broker, some in Producer, and some in Consumer JMX.
>
> Otis
> --
> Monitoring - Log Management - Alerting - Anomaly Detection
> Solr & Elasticsearch Consulting Support Training - http://sematext.com/
>
>
> On Thu, Dec 10, 2015 at 10:14 AM, Wollert, Fabian <
> fabian.woll...@zalando.de
> > wrote:
>
> > Hi everyone, when I browse via jconsole through a freshly started Kafka
> > 0.8.2.1 JVM I can only find metrics for the whole broker (like
> > MessagesInPerSec in
> > kafka.server:name=MessagesInPerSec,type=BrokerTopicMetrics) but not
> > per topic. What am I missing, e.g. where can I find those metrics?
> >
> > the documentation (
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Operations#Operations-ServerStats
> > )
> > says kafka:type=kafka.BrokerAllTopicStat.[topic] but this does not exist
> > for me. I guess this is related to 0.7?
> >
> > Cheers
> >
> >
>



-- 
*Fabian Wollert*
Business Intelligence

*Zalando SE*

*POSTAL ADDRESS*
Zalando SE
11501 Berlin

*OFFICE*
Zalando SE
Mollstraße 1
10178 Berlin
Germany

Phone: +49 30 20968 1819
Fax:   +49 30 27594 693
E-Mail: fabian.woll...@zalando.de
Web: www.zalando.de
Jobs: jobs.zalando.de

Zalando SE, Tamara-Danz-Straße 1, 10243 Berlin
Company registration: Amtsgericht Charlottenburg, HRB 158855 B
Tax ID: 29/560/00596 * VAT registration number: DE 260543043
Management Board: Robert Gentz, David Schneider, Rubin Ritter
Chairperson of the Supervisory Board: Cristina Stenbeck
Registered office: Berlin


Re: Kafka 0.9 consumer API question

2015-12-15 Thread Jason Gustafson
Hey Rajiv,

I agree the Set/List inconsistency is a little unfortunate (another
annoying one is pause() which uses a vararg). I think we should probably
add the following variants:

assign(Collection<TopicPartition>)
subscribe(Collection<String>)
pause(Collection<TopicPartition>)

I can open a JIRA to fix this. As for returning the unmodifiable set, I can
see your point, but I think it's a little dangerous for user code to depend
on being able to modify a collection returned from the API. Making it
immutable reduces the coupling with user code and gives us more freedom in
the future (not that we have any intention of changing the set type, but we
could). I think the way I might try to implement your use case would be to
maintain the assignment set yourself. You can make changes to that set and
always pass it to assign(), which would avoid the need to use assignment().
Also, I probably wouldn't be overly concerned about the copying overhead
unless profiling shows that it is actually a problem. Are your partition
assignments generally very large?
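
Roughly, something like this (just a sketch; consumer is your KafkaConsumer, and when
to add or remove a partition is up to your own logic):

private final Set<TopicPartition> assigned = new HashSet<>();

void addPartition(TopicPartition tp) {
  assigned.add(tp);
  consumer.assign(new ArrayList<>(assigned));  // assign() takes a List in 0.9.0.0
}

void removePartition(TopicPartition tp) {
  assigned.remove(tp);
  consumer.assign(new ArrayList<>(assigned));
}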

-Jason


On Tue, Dec 15, 2015 at 1:32 PM, Rajiv Kurian  wrote:

> We are trying to use the Kafka 0.9 consumer API to poll specific
> partitions. We consume partitions based on our own logic instead of
> delegating that to Kafka. One of our use cases is handling a change in the
> partitions that we consume. This means that sometimes we need to consume
> additional partitions and other times we need to stop consuming (not pause
> but stop entirely) some of the partitions that we are currently polling.
>
> The semantics of the assign() call at
>
> http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> is that we need to provide the entire list of subscriptions. So when we
> want to add or remove partitions we call the assignment() method to get the
> existing set of TopicPartitions being polled, and then modify this set and
> pass it back to the assign() call. However it seems weird that the assign()
> call takes a List whereas the assignment call returns a
> Set. Further the Set returned by the method is an
> unmodifiable set which means to change this set we need to create a new
> List/Set from it and then modify the new collection. Looking at the code
> for the assignment() method further shows that a copy of the underlying set
> is made and then wrapped in an unmodifiable set. The wrapping seems
> unnecessary given that a copy is already being made. Excerpt here:
>
> public Set assignment() {
>
> acquire();
>
> try {
>
> return Collections.unmodifiableSet(new HashSet<>(this.
> subscriptions.assignedPartitions()));
>
> } finally {
>
> release();
>
> }
>
> }
>
> Ideally the API would take and return a Set instead of taking in a List and
> returning a Set. Further given that the Set returned is a copy of the
> existing assignments, wrapping it in an unmodifiable set seems overkill
> which requires the user of the API to make yet another copy just to modify
> what is already a copy.
>
> Thanks,
> Rajiv
>


Kafka 0.9 consumer API question

2015-12-15 Thread Rajiv Kurian
We are trying to use the Kafka 0.9 consumer API to poll specific
partitions. We consume partitions based on our own logic instead of
delegating that to Kafka. One of our use cases is handling a change in the
partitions that we consume. This means that sometimes we need to consume
additional partitions and other times we need to stop consuming (not pause
but stop entirely) some of the partitions that we are currently polling.

The semantics of the assign() call at
http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
is that we need to provide the entire list of subscriptions. So when we
want to add or remove partitions we call the assignment() method to get the
existing set of TopicPartitions being polled, and then modify this set and
pass it back to the assign() call. However it seems weird that the assign()
call takes a List<TopicPartition> whereas the assignment() call returns a
Set<TopicPartition>. Further, the Set returned by the method is an
unmodifiable set which means to change this set we need to create a new
List/Set from it and then modify the new collection. Looking at the code
for the assignment() method further shows that a copy of the underlying set
is made and then wrapped in an unmodifiable set. The wrapping seems
unnecessary given that a copy is already being made. Excerpt here:

public Set<TopicPartition> assignment() {
    acquire();
    try {
        return Collections.unmodifiableSet(
            new HashSet<>(this.subscriptions.assignedPartitions()));
    } finally {
        release();
    }
}

Ideally the API would take and return a Set instead of taking in a List and
returning a Set. Further given that the Set returned is a copy of the
existing assignments, wrapping it in an unmodifiable set seems overkill
which requires the user of the API to make yet another copy just to modify
what is already a copy.

Thanks,
Rajiv


Re: Fallout from upgrading to kafka 0.9 from 0.8.2.3

2015-12-15 Thread Rajiv Kurian
We had to revert to 0.8.3 because three of our topics seem to have gotten
corrupted during the upgrade. As soon as we did the upgrade producers to
the three topics I mentioned stopped being able to do writes. The clients
complained (occasionally) about leader not found exceptions. We restarted
our clients and brokers but that didn't seem to help. Actually even after
reverting to 0.8.3 these three topics were broken. To fix it we had to stop
all clients, delete the topics, create them again and then restart the
clients.

I realize this is not a lot of info. I couldn't wait to get more debug info
because the cluster was actually being used. Has anyone run into something
like this? Are there any known issues with old consumers/producers? The
topics that got busted had clients writing to them using the old Java
wrapper over the Scala producer.

Here are the steps I took to upgrade.

For each broker:

1. Stop the broker.
2. Restart with the 0.9 broker running with
inter.broker.protocol.version=0.8.2.X
3. Wait for under replicated partitions to go down to 0.
4. Go to step 1.
Once all the brokers were running the 0.9 code with
inter.broker.protocol.version=0.8.2.X we restarted them one by one with
inter.broker.protocol.version=0.9.0.0

When reverting I did the following.

For each broker.

1. Stop the broker.
2. Restart with the 0.9 broker running with
inter.broker.protocol.version=0.8.2.X
3. Wait for under replicated partitions to go down to 0.
4. Go to step 1.

Once all the brokers were running 0.9 code with
inter.broker.protocol.version=0.8.2.X  I restarted them one by one with the
0.8.2.3 broker code. This however like I mentioned did not fix the three
broken topics.


On Mon, Dec 14, 2015 at 3:13 PM, Rajiv Kurian  wrote:

> Now that it has been a bit longer, the spikes I was seeing are gone but
> the CPU and network in/out on the three brokers that were showing the
> spikes are still much higher than before the upgrade. Their CPUs have
> increased from around 1-2% to 12-20%. The network in on the same brokers
> has gone up from under 2 Mb/sec to 19-33 Mb/sec. The network out has gone
> up from under 2 Mb/sec to 29-42 Mb/sec. I don't see a corresponding
> increase in kafka messages in per second or kafka bytes in per second JMX
> metrics.
>
> Thanks,
> Rajiv
>


Re: Kafka 0.9 consumer API question

2015-12-15 Thread Jason Gustafson
Hey Rajiv,

My point was that you could maintain the assignment set yourself in a
field, which would eliminate the need to copy the set returned by
assignment(). Then it's just one copy to convert it to a list, and we can
fix this by adding the assign() variant I suggested above.

By the way, here's a link to the JIRA I created:
https://issues.apache.org/jira/browse/KAFKA-2991.

-Jason

On Tue, Dec 15, 2015 at 2:53 PM, Rajiv Kurian  wrote:

> Hi Jason,
>
> The copying is not a problem in terms of performance. It's just annoying to
> write the extra code. My point with the copy is that since the client is
> already making a copy when it returns the set to me, why would it matter if
> I modify the copy. Creating an unmodifiable set on top of a copy seems
> redundant. It would be easiest for us as users to do something like this:
>
> final Set partitions = consumer.assignment();  // This
> already returns a copy of the underlying assignment, thus ensuring that the
> internal data structures are protected.
> partitions.add(myNewTopicPartition);  // This is fine to modify since
> consumer.assignment() returns a copy.
> partitions.remove(topicPartitionToBeRemoved);
> consumer.assign(partitions);
>
> Instead we have to do something like this right now.
>
> final Set partitions = consumer.assignment();  // This
> returns a copy of the underlying assignment wrapped in an UnmodifiableSet
> which seems redundant.
> final Set yetAnotherCopy = new HashSet<>(partitions);  //
> We need this copy since consumer.assignment() is unmodifiable, even though
> it is a copy.
> yetAnotherCopy.add(myNewTopicPartition);
> yetAnotherCopy.remove(topicPartitionToBeRemoved);
> List wayTooManyCopies = new ArrayList<>(yetAnotherCopy);
> consumer.assign(wayTooManyCopies);
>
> Thanks,
> Rajiv
>
>
> On Tue, Dec 15, 2015 at 2:35 PM, Jason Gustafson 
> wrote:
>
> > Hey Rajiv,
> >
> > I agree the Set/List inconsistency is a little unfortunate (another
> > annoying one is pause() which uses a vararg). I think we should probably
> > add the following variants:
> >
> > assign(Collection)
> > subscribe(Collection)
> > pause(Collection)
> >
> > I can open a JIRA to fix this. As for returning the unmodifiable set, I
> can
> > see your point, but I think it's a little dangerous for user code to
> depend
> > on being able to modify a collection returned from the API. Making it
> > immutable reduces the coupling with user code and gives us more freedom
> in
> > the future (not that we have any intention of changing the set type, but
> we
> > could). I think the way I might try to implement your use case would be
> to
> > maintain the assignment set yourself. You can make changes to that set
> and
> > always pass it to assign(), which would avoid the need to use
> assignment().
> > Also, I probably wouldn't be overly concerned about the copying overhead
> > unless profiling shows that it is actually a problem. Are your partition
> > assignments generally very large?
> >
> > -Jason
> >
> >
> > On Tue, Dec 15, 2015 at 1:32 PM, Rajiv Kurian 
> wrote:
> >
> > > We are trying to use the Kafka 0.9 consumer API to poll specific
> > > partitions. We consume partitions based on our own logic instead of
> > > delegating that to Kafka. One of our use cases is handling a change in
> > the
> > > partitions that we consume. This means that sometimes we need to
> consume
> > > additional partitions and other times we need to stop consuming (not
> > pause
> > > but stop entirely) some of the partitions that we are currently
> polling.
> > >
> > > The semantics of the assign() call at
> > >
> > >
> >
> http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > is that we need to provide the entire list of subscriptions. So when we
> > > want to add or remove partitions we call the assignment() method to get
> > the
> > > existing set of TopicPartitions being polled, and then modify this set
> > and
> > > pass it back to the assign() call. However it seems weird that the
> > assign()
> > > call takes a List whereas the assignment call returns
> a
> > > Set. Further the Set returned by the method is an
> > > unmodifiable set which means to change this set we need to create a new
> > > List/Set from it and then modify the new collection. Looking at the
> code
> > > for the assignment() method further shows that a copy of the underlying
> > set
> > > is made and then wrapped in an unmodifiable set. The wrapping seems
> > > unnecessary given that a copy is already being made. Excerpt here:
> > >
> > > public Set assignment() {
> > >
> > > acquire();
> > >
> > > try {
> > >
> > > return Collections.unmodifiableSet(new HashSet<>(this.
> > > subscriptions.assignedPartitions()));
> > >
> > > } finally {
> > >
> > > release();
> > >
> > > }
> > >
> > > }
> > >
> > > Ideally the API would take and return a Set 

Re: Fallout from upgrading to kafka 0.9 from 0.8.2.3

2015-12-15 Thread Lance Laursen
Hey Rajiv,

Are you using snappy compression?

On Tue, Dec 15, 2015 at 12:52 PM, Rajiv Kurian  wrote:

> We had to revert to 0.8.3 because three of our topics seem to have gotten
> corrupted during the upgrade. As soon as we did the upgrade producers to
> the three topics I mentioned stopped being able to do writes. The clients
> complained (occasionally) about leader not found exceptions. We restarted
> our clients and brokers but that didn't seem to help. Actually even after
> reverting to 0.8.3 these three topics were broken. To fix it we had to stop
> all clients, delete the topics, create them again and then restart the
> clients.
>
> I realize this is not a lot of info. I couldn't wait to get more debug info
> because the cluster was actually being used. Has any one run into something
> like this? Are there any known issues with old consumers/producers. The
> topics that got busted had clients writing to them using the old Java
> wrapper over the Scala producer.
>
> Here are the steps I took to upgrade.
>
> For each broker:
>
> 1. Stop the broker.
> 2. Restart with the 0.9 broker running with
> inter.broker.protocol.version=0.8.2.X
> 3. Wait for under replicated partitions to go down to 0.
> 4. Go to step 1.
> Once all the brokers were running the 0.9 code with
> inter.broker.protocol.version=0.8.2.X we restarted them one by one with
> inter.broker.protocol.version=0.9.0.0
>
> When reverting I did the following.
>
> For each broker.
>
> 1. Stop the broker.
> 2. Restart with the 0.9 broker running with
> inter.broker.protocol.version=0.8.2.X
> 3. Wait for under replicated partitions to go down to 0.
> 4. Go to step 1.
>
> Once all the brokers were running 0.9 code with
> inter.broker.protocol.version=0.8.2.X  I restarted them one by one with the
> 0.8.2.3 broker code. This however like I mentioned did not fix the three
> broken topics.
>
>
> On Mon, Dec 14, 2015 at 3:13 PM, Rajiv Kurian  wrote:
>
> > Now that it has been a bit longer, the spikes I was seeing are gone but
> > the CPU and network in/out on the three brokers that were showing the
> > spikes are still much higher than before the upgrade. Their CPUs have
> > increased from around 1-2% to 12-20%. The network in on the same brokers
> > has gone up from under 2 Mb/sec to 19-33 Mb/sec. The network out has gone
> > up from under 2 Mb/sec to 29-42 Mb/sec. I don't see a corresponding
> > increase in kafka messages in per second or kafka bytes in per second JMX
> > metrics.
> >
> > Thanks,
> > Rajiv
> >
>


Re: Kafka 0.9 consumer API question

2015-12-15 Thread Rajiv Kurian
Hi Jason,

The copying is not a problem in terms of performance. It's just annoying to
write the extra code. My point with the copy is that since the client is
already making a copy when it returns the set to me, why would it matter if
I modify the copy? Creating an unmodifiable set on top of a copy seems
redundant. It would be easiest for us as users to do something like this:

final Set<TopicPartition> partitions = consumer.assignment();
// This already returns a copy of the underlying assignment, thus ensuring that
// the internal data structures are protected.
partitions.add(myNewTopicPartition);
// This is fine to modify since consumer.assignment() returns a copy.
partitions.remove(topicPartitionToBeRemoved);
consumer.assign(partitions);

Instead we have to do something like this right now.

final Set<TopicPartition> partitions = consumer.assignment();
// This returns a copy of the underlying assignment wrapped in an
// UnmodifiableSet, which seems redundant.
final Set<TopicPartition> yetAnotherCopy = new HashSet<>(partitions);
// We need this copy since consumer.assignment() is unmodifiable, even though
// it is a copy.
yetAnotherCopy.add(myNewTopicPartition);
yetAnotherCopy.remove(topicPartitionToBeRemoved);
List<TopicPartition> wayTooManyCopies = new ArrayList<>(yetAnotherCopy);
consumer.assign(wayTooManyCopies);

Thanks,
Rajiv


On Tue, Dec 15, 2015 at 2:35 PM, Jason Gustafson  wrote:

> Hey Rajiv,
>
> I agree the Set/List inconsistency is a little unfortunate (another
> annoying one is pause() which uses a vararg). I think we should probably
> add the following variants:
>
> assign(Collection)
> subscribe(Collection)
> pause(Collection)
>
> I can open a JIRA to fix this. As for returning the unmodifiable set, I can
> see your point, but I think it's a little dangerous for user code to depend
> on being able to modify a collection returned from the API. Making it
> immutable reduces the coupling with user code and gives us more freedom in
> the future (not that we have any intention of changing the set type, but we
> could). I think the way I might try to implement your use case would be to
> maintain the assignment set yourself. You can make changes to that set and
> always pass it to assign(), which would avoid the need to use assignment().
> Also, I probably wouldn't be overly concerned about the copying overhead
> unless profiling shows that it is actually a problem. Are your partition
> assignments generally very large?
>
> -Jason
>
>
> On Tue, Dec 15, 2015 at 1:32 PM, Rajiv Kurian  wrote:
>
> > We are trying to use the Kafka 0.9 consumer API to poll specific
> > partitions. We consume partitions based on our own logic instead of
> > delegating that to Kafka. One of our use cases is handling a change in
> the
> > partitions that we consume. This means that sometimes we need to consume
> > additional partitions and other times we need to stop consuming (not
> pause
> > but stop entirely) some of the partitions that we are currently polling.
> >
> > The semantics of the assign() call at
> >
> >
> http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > is that we need to provide the entire list of subscriptions. So when we
> > want to add or remove partitions we call the assignment() method to get
> the
> > existing set of TopicPartitions being polled, and then modify this set
> and
> > pass it back to the assign() call. However it seems weird that the
> assign()
> > call takes a List whereas the assignment call returns a
> > Set. Further the Set returned by the method is an
> > unmodifiable set which means to change this set we need to create a new
> > List/Set from it and then modify the new collection. Looking at the code
> > for the assignment() method further shows that a copy of the underlying
> set
> > is made and then wrapped in an unmodifiable set. The wrapping seems
> > unnecessary given that a copy is already being made. Excerpt here:
> >
> > public Set assignment() {
> >
> > acquire();
> >
> > try {
> >
> > return Collections.unmodifiableSet(new HashSet<>(this.
> > subscriptions.assignedPartitions()));
> >
> > } finally {
> >
> > release();
> >
> > }
> >
> > }
> >
> > Ideally the API would take and return a Set instead of taking in a List
> and
> > returning a Set. Further given that the Set returned is a copy of the
> > existing assignments, wrapping it in an unmodifiable set seems overkill
> > which requires the user of the API to make yet another copy just to
> modify
> > what is already a copy.
> >
> > Thanks,
> > Rajiv
> >
>


Re: Kafka 0.9 consumer API question

2015-12-15 Thread Rajiv Kurian
Right I could do that. Thanks for creating the JIRA!


On Tue, Dec 15, 2015 at 3:01 PM, Jason Gustafson  wrote:

> Hey Rajiv,
>
> My point was that you could maintain the assignment set yourself in a
> field, which would eliminate the need to copy the set returned by
> assignment(). Then it's just one copy to convert it to a list, and we can
> fix this by adding the assign() variant I suggested above.
>
> By the way, here's a link to the JIRA I created:
> https://issues.apache.org/jira/browse/KAFKA-2991.
>
> -Jason
>
> On Tue, Dec 15, 2015 at 2:53 PM, Rajiv Kurian  wrote:
>
> > Hi Jason,
> >
> > The copying is not a problem in terms of performance. It's just annoying
> to
> > write the extra code. My point with the copy is that since the client is
> > already making a copy when it returns the set to me, why would it matter
> if
> > I modify the copy. Creating an unmodifiable set on top of a copy seems
> > redundant. It would be easiest for us as users to do something like this:
> >
> > final Set partitions = consumer.assignment();  // This
> > already returns a copy of the underlying assignment, thus ensuring that
> the
> > internal data structures are protected.
> > partitions.add(myNewTopicPartition);  // This is fine to modify since
> > consumer.assignment() returns a copy.
> > partitions.remove(topicPartitionToBeRemoved);
> > consumer.assign(partitions);
> >
> > Instead we have to do something like this right now.
> >
> > final Set partitions = consumer.assignment();  // This
> > returns a copy of the underlying assignment wrapped in an UnmodifiableSet
> > which seems redundant.
> > final Set yetAnotherCopy = new HashSet<>(partitions);  //
> > We need this copy since consumer.assignment() is unmodifiable, even
> though
> > it is a copy.
> > yetAnotherCopy.add(myNewTopicPartition);
> > yetAnotherCopy.remove(topicPartitionToBeRemoved);
> > List wayTooManyCopies = new ArrayList<>(yetAnotherCopy);
> > consumer.assign(wayTooManyCopies);
> >
> > Thanks,
> > Rajiv
> >
> >
> > On Tue, Dec 15, 2015 at 2:35 PM, Jason Gustafson 
> > wrote:
> >
> > > Hey Rajiv,
> > >
> > > I agree the Set/List inconsistency is a little unfortunate (another
> > > annoying one is pause() which uses a vararg). I think we should
> probably
> > > add the following variants:
> > >
> > > assign(Collection)
> > > subscribe(Collection)
> > > pause(Collection)
> > >
> > > I can open a JIRA to fix this. As for returning the unmodifiable set, I
> > can
> > > see your point, but I think it's a little dangerous for user code to
> > depend
> > > on being able to modify a collection returned from the API. Making it
> > > immutable reduces the coupling with user code and gives us more freedom
> > in
> > > the future (not that we have any intention of changing the set type,
> but
> > we
> > > could). I think the way I might try to implement your use case would be
> > to
> > > maintain the assignment set yourself. You can make changes to that set
> > and
> > > always pass it to assign(), which would avoid the need to use
> > assignment().
> > > Also, I probably wouldn't be overly concerned about the copying
> overhead
> > > unless profiling shows that it is actually a problem. Are your
> partition
> > > assignments generally very large?
> > >
> > > -Jason
> > >
> > >
> > > On Tue, Dec 15, 2015 at 1:32 PM, Rajiv Kurian 
> > wrote:
> > >
> > > > We are trying to use the Kafka 0.9 consumer API to poll specific
> > > > partitions. We consume partitions based on our own logic instead of
> > > > delegating that to Kafka. One of our use cases is handling a change
> in
> > > the
> > > > partitions that we consume. This means that sometimes we need to
> > consume
> > > > additional partitions and other times we need to stop consuming (not
> > > pause
> > > > but stop entirely) some of the partitions that we are currently
> > polling.
> > > >
> > > > The semantics of the assign() call at
> > > >
> > > >
> > >
> >
> http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > > is that we need to provide the entire list of subscriptions. So when
> we
> > > > want to add or remove partitions we call the assignment() method to
> get
> > > the
> > > > existing set of TopicPartitions being polled, and then modify this
> set
> > > and
> > > > pass it back to the assign() call. However it seems weird that the
> > > assign()
> > > > call takes a List whereas the assignment call
> returns
> > a
> > > > Set. Further the Set returned by the method is an
> > > > unmodifiable set which means to change this set we need to create a
> new
> > > > List/Set from it and then modify the new collection. Looking at the
> > code
> > > > for the assignment() method further shows that a copy of the
> underlying
> > > set
> > > > is made and then wrapped in an unmodifiable set. The wrapping seems
> > > > unnecessary given that a copy is already being made. 

Re: Low-latency, high message size variance

2015-12-15 Thread Jason Gustafson
I was talking with Jay this afternoon about this use case. The tricky thing
about adding a ping() or heartbeat() API is that you have to deal with the
potential for rebalancing. This means either allowing it to block while a
rebalance completes or having it raise an exception indicating that a
rebalance is needed. In code, the latter might look like this:

while (running) {
  ConsumerRecords records = consumer.poll(1000);
  try {
for (ConsumerRecord record : records) {
  process(record);
  consumer.heartbeat();
}
  } catch (RebalanceException e){
continue;
  }
}

Unfortunately, this wouldn't work with auto-commit since it would tend to
break message processing early which would let the committed position get
ahead of the last offset processed. The alternative blocking approach
wouldn't be any better in this regard. Overall, it seems like this might
introduce a bigger problem than it solves.

Perhaps the simpler solution is to provide a way to set the maximum number
of messages returned. This could either be a new configuration option or a
second argument in poll, but it would let you handle messages one-by-one if
you needed to. You'd then be able to set the session timeout according to
the expected time to handle a single message. It'd be a bit more work to
implement this, but if the use case is common enough, it might be
worthwhile.

-Jason

On Tue, Dec 15, 2015 at 10:31 AM, Jason Gustafson 
wrote:

> Hey Jens,
>
> The purpose of pause() is to stop fetches for a set of partitions. This
> lets you continue calling poll() to send heartbeats. Also note that poll()
> generally only blocks for rebalances. In code, something like this is what
> I was thinking:
>
> while (running) {
>   ConsumerRecords records = consumer.poll(1000);
>   if (queue.offer(records))
> continue;
>
>   TopicPartition[] assignment = toArray(consumer.assignment());
>   consumer.pause(assignment);
>   while (!queue.offer(records, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
> consumer.poll(0);
>   consumer.resume(assignment);
> }
>
> The tricky thing is handling rebalances since they might occur in either
> call to poll(). In a rebalance, you have to 1) drain the queue, 2) commit
> current offsets, and 3) maybe break from the inner poll loop. If the
> processing thread is busy when the rebalance is triggered, then you may
> have to discard the results when it's finished. It's also a little
> difficult communicating completion to the poll loop, which is where the
> offset commit needs to take place. I suppose another queue would work,
> sigh.
>
> Well, I think you can make that work, but I tend to agree that it's pretty
> complicated. Perhaps instead of a queue, you should just submit the
> processor to an executor service for each record set returned and await its
> completion directly. For example:
>
> while (running) {
>   ConsumerRecords records = consumer.poll(1000);
>   Future future = executor.submit(new Processor(records));
>
>   TopicPartition[] assignment = toArray(consumer.assignment());
>   consumer.pause(assignment);
>   while (!complete(future, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
> consumer.poll(0);
>   consumer.resume(assignment);
>   consumer.commitSync();
> }
>
> This seems closer to the spirit of the poll loop, and it makes handling
> commits a lot easier. You still have to deal with the rebalance problem,
> but at least you don't have to deal with the queue. It's still a little
> complex though. Maybe the consumer needs a ping() API which does the same
> thing as poll() but doesn't send or return any fetches. That would simplify
> things a little more:
>
> while (running) {
>   ConsumerRecords records = consumer.poll(1000);
>   Future future = executor.submit(new Processor(records));
>   while (!complete(future, heartbeatIntervalMs, TimeUnit.MILLISECONDS))
> consumer.ping();
>   consumer.commitSync();
> }
>
> Anyway, I'll think about it a little more and see if any other approaches
> come to mind. I do agree that we should have a way to handle this case
> without too much extra work.
>
>
> -Jason
>
>
> On Tue, Dec 15, 2015 at 5:09 AM, Jens Rantil  wrote:
>
>> Hi Jason,
>>
>> Thanks for your response. See replies inline:
>>
>> On Tuesday, December 15, 2015, Jason Gustafson 
>> wrote:
>>
>> > Hey Jens,
>> >
>> > I'm not sure I understand why increasing the session timeout is not an
>> > option. Is the issue that there's too much uncertainly about processing
>> > time to set an upper bound for each round of the poll loop?
>> >
>>
>> Yes, that's the issue.
>>
>>
>> > One general workaround would be to move the processing into another
>> thread.
>> > For example, you could add messages to a blocking queue and have the
>> > processor poll them. We have a pause() API which can be used to prevent
>> > additional fetches when the queue is full. This would allow you to
>> continue
>> > the poll loop 

Re: Low-latency, high message size variance

2015-12-15 Thread Jens Rantil
Hi Jason,

Thanks for your response. See replies inline:

On Tuesday, December 15, 2015, Jason Gustafson  wrote:

> Hey Jens,
>
> I'm not sure I understand why increasing the session timeout is not an
> option. Is the issue that there's too much uncertainly about processing
> time to set an upper bound for each round of the poll loop?
>

Yes, that's the issue.


> One general workaround would be to move the processing into another thread.
> For example, you could add messages to a blocking queue and have the
> processor poll them. We have a pause() API which can be used to prevent
> additional fetches when the queue is full. This would allow you to continue
> the poll loop instead of blocking in offer(). The tricky thing would be
> handling rebalances since you'd need to clear the queue and get the last
> offset from the processors. Enough people probably will end up doing
> something like this that we should probably work through an example on how
> to do it properly.
>

Hm, as far as I've understood the consumer will only send heartbeats to the
broker when poll() is being called. If I would call pause() on a consumer
(from a separate thread) I understand poll() will block indefinitely. Will
the polling consumer still send heartbeats when blocked? Or would a pause
for too long (while my records are being processed) eventually lead to
session timeout? If the latter, that would sort of defeat the purpose since
I am trying to avoid unnecessary rebalancing of consumers when there is
high pressure on the consumers.

Regarding handling of rebalancing for a queue solution you describe; It
really sounds very complicated. It's probably doable, but doesn't this sort
of defeat the purpose of the high level consumer API? I mean, it sounds
like it should gracefully handle slow consumption of varying size. I might
be wrong.

Thanks,
Jens


-- 
Jens Rantil
Backend engineer
Tink AB

Email: jens.ran...@tink.se
Phone: +46 708 84 18 32
Web: www.tink.se



Re: compression-rate-avg returns 0 with compression enabled

2015-12-15 Thread tao xiao
created https://issues.apache.org/jira/browse/KAFKA-2993. I will submit a
patch for this

On Wed, 16 Dec 2015 at 00:53 Ismael Juma  wrote:

> Yes, please.
>
> Ismael
> On 15 Dec 2015 11:52, "tao xiao"  wrote:
>
> > Hi team,
> >
> > I found that the producer metric compression-rate-avg always returns 0
> even
> > with compression.type set to snappy. I drilled down the code and
> discovered
> > that the position of bytebuffer in
> > org.apache.kafka.common.record.Compressor is reset to 0 by
> > RecordAccumulator.drain() before calling metric updates in sender thread.
> >
> > Should I file a bug for this?
> >
>