[ 
https://issues.apache.org/jira/browse/STORM-2914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16347294#comment-16347294
 ] 

Stig Rohde Døssing commented on STORM-2914:
-------------------------------------------

[~hmclouro]
 I think my reasoning for not using/allowing enable.auto.commit basically 
matches the reasoning for why we switched away from the subscribe API: it fits 
the spout poorly, it increases complexity for no reason, and we can do better 
because we're likely not the use case the setting was designed for. I'll add 
that there's no reason for us to use a feature of the consumer just because 
it's there; we should have a reason to want to support it.

My guess is that enable.auto.commit exists because it is useful for simple 
processors that do something like
{code:java}
while (true) {
  ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
  processRecords(records);
}
{code}
The setting is a great idea here because the (async) commit for the previous 
iteration's records happens during poll (assuming auto.commit.interval.ms has 
expired; otherwise the commit may be postponed a bit), so you get nice 
at-least-once semantics "for free", without having to commit manually. You 
could get basically the same result by doing
{code:java}
while (true) {
  ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
  processRecords(records);
  if (timer.isExpired()) {
    kafkaConsumer.commitAsync();
    timer.reset();
  }
}
{code}
but enabling auto commit by default provides a nice out-of-the-box experience 
for the consumer, where the user doesn't have to remember to commit.
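
For reference, the consumer settings involved look something like the sketch below. The property names are real Kafka consumer configs; the broker address and group id are placeholders, and the values shown are the consumer's documented defaults:
{code:java}
import java.util.Properties;

public class AutoCommitConfigSketch {
    // Consumer configuration for the auto-commit scenario above.
    static Properties autoCommitProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "example-group");           // placeholder group id
        props.put("enable.auto.commit", "true");          // consumer default is true
        props.put("auto.commit.interval.ms", "5000");     // default commit interval
        return props;
    }

    public static void main(String[] args) {
        System.out.println("enable.auto.commit="
                + autoCommitProps().getProperty("enable.auto.commit"));
    }
}
{code}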

If processRecords is asynchronous, you end up in a situation like ours, where 
autocommit costs you the ability to guarantee processing. The only reason to 
keep it around is for people who really don't care whether tuples are 
processed, because there may be a performance difference between doing
{code:java}
while (true) {
  ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
  processRecords(records);
  if (timer.isExpired()) {
    kafkaConsumer.commitAsync();
    timer.reset();
  }
}
{code}
and doing at-most-once
{code:java}
while (true) {
  ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
  kafkaConsumer.commitSync();
  processRecords(records);
}
{code}
due to the greater number of blocking calls in the at-most-once case.

The reason I think we should remove enable.auto.commit support and provide our 
own NONE ("no guarantees, but probably the fastest option") implementation is 
that https://issues.apache.org/jira/browse/STORM-2844 slightly changed the 
semantics of EARLIEST and LATEST. For the at-least-once case, EARLIEST now 
means "start over when the topology is redeployed", where it previously meant 
"start over when the worker is restarted". I think the new semantics are much 
more useful than the old ones. We can get this behavior because we include 
metadata in the offsets we commit to Kafka, which tells us which topology 
committed the offset.
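
A minimal sketch of the idea, using plain strings (the JSON field name and layout are assumptions here, not the exact format storm-kafka-client uses; in the real spout the string would be attached to the commit via Kafka's OffsetAndMetadata):
{code:java}
public class CommitMetadataSketch {
    // Build the metadata string to attach to a commit.
    static String buildMetadata(String topologyId) {
        return "{\"topologyId\":\"" + topologyId + "\"}";
    }

    // On startup, decide whether a committed offset belongs to this deployment
    // of the topology. If it doesn't, EARLIEST/LATEST can "start over".
    static boolean committedByThisTopology(String metadata, String currentTopologyId) {
        return metadata != null
                && metadata.contains("\"topologyId\":\"" + currentTopologyId + "\"");
    }

    public static void main(String[] args) {
        String meta = buildMetadata("topo-1");
        System.out.println(committedByThisTopology(meta, "topo-1")); // same topology
        System.out.println(committedByThisTopology(meta, "topo-2")); // redeployed
    }
}
{code}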

We didn't update the at-most-once or NONE cases to store metadata, which is 
causing STORM-2913. We can (and should) support the same behavior for EARLIEST 
and LATEST for at-most-once, and we can do it very easily by adding metadata to 
those commits.

For the NONE/enable.auto.commit case, we can't support the same behavior: it's 
not possible for us to specify metadata for the consumer to include in 
autocommits. This means we'd have to special-case EARLIEST/LATEST so that they 
do one thing for at-least-once/at-most-once and a completely different (and 
much less useful) thing for autocommit, and we'd have to add special-case code 
to support that. This was the initial reason I wanted to get rid of 
autocommit; it would add more complexity than I thought it was worth.

Since there isn't really anything complicated about implementing NONE 
ourselves without enable.auto.commit (we even have the timer already), I think 
it's much more attractive to do so, because it means we can make 
EARLIEST/LATEST behave consistently regardless of ProcessingGuarantee, rather 
than special-casing autocommit and creating a trap for users.
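
The timer-driven commit pattern a spout-side NONE implementation would follow can be sketched without any Kafka dependency. This is only an illustration of the loop structure, with a fake clock standing in for wall time and a counter standing in for the commitAsync call; the interval values are placeholders:
{code:java}
import java.util.function.LongSupplier;

public class NoneGuaranteeLoopSketch {
    // Minimal timer matching the pattern in the loops above.
    static class Timer {
        private final long intervalMs;
        private final LongSupplier clock;
        private long start;
        Timer(long intervalMs, LongSupplier clock) {
            this.intervalMs = intervalMs;
            this.clock = clock;
            this.start = clock.getAsLong();
        }
        boolean isExpired() { return clock.getAsLong() - start >= intervalMs; }
        void reset() { start = clock.getAsLong(); }
    }

    // Simulate the NONE loop: each "poll" advances a fake clock by 1000 ms,
    // and a commit fires whenever the 5000 ms interval has expired. In the
    // real spout the commit would be commitAsync with metadata attached.
    static int commitsAfterPolls(int polls) {
        long[] now = {0};
        Timer timer = new Timer(5000, () -> now[0]);
        int commits = 0;
        for (int i = 0; i < polls; i++) {
            now[0] += 1000; // stand-in for the time one poll/emit cycle takes
            if (timer.isExpired()) {
                commits++;  // real code: kafkaConsumer.commitAsync(offsetsWithMetadata, callback)
                timer.reset();
            }
        }
        return commits;
    }

    public static void main(String[] args) {
        System.out.println(commitsAfterPolls(10)); // prints 2
    }
}
{code}
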
{quote}If the goal is to avoid the WARN exception that was getting printed when 
enable.auto.commit=true, the obvious thing to do in my opinion is to simply not 
log the message if the processing guarantee is not AT_LEAST_ONCE.
{quote}
I hope I've now explained why I don't think it's good enough to just ignore 
the warning when we're in autocommit mode: we'd still be making 
EARLIEST/LATEST inconsistent across ProcessingGuarantees. Besides, if we just 
swallow the error, we're still throwing an exception per emitted message, 
which I don't think we want.
{quote}As far as the WARN level, initially when I created the PR with that 
change I had it with level DEBUG and I never felt that it should be WARN to 
begin with. During reviews someone asked for it to be WARN and I did it, but 
never felt it should be WARN, and I still don't think it should be.
{quote}
Yes, that was me. I disagree. The log is in a place where we've caught an 
exception because we couldn't deserialize data we expected to be there. 
Hitting it during initialization also changes the spout's behavior quite a bit 
(the committed offset is ignored and EARLIEST/LATEST is applied instead, if 
those strategies are in use), and ideally we never want to hit the error, so I 
really think DEBUG and INFO are too low. With this patch and STORM-2913, the 
warning should only show up the first time you start the spout on a partition 
for which there were previous commits from the same consumer group (barring 
bugs).
{quote}That leads me to ask why are we removing the option to set the Kafka 
property enable.auto.commit and throwing an exception
{quote}
The exception isn't new. It's there because I don't want people to set 
enable.auto.commit manually. The spout needs to know whether the consumer is 
autocommitting, because it has to change its behavior accordingly. The best 
way to keep the consumer and spout settings in sync is to make people set 
ProcessingGuarantee.NONE and use that setting to control enable.auto.commit. 
In 2.x we throw an error if people try to set enable.auto.commit themselves; 
in 1.x we warn them to set the processing guarantee to NONE instead of using 
the property, and then overwrite their setting.
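
The 2.x check described above amounts to something like this sketch (the class and message are hypothetical; only the rejected property name is from the discussion):
{code:java}
import java.util.Properties;

public class KafkaPropsValidatorSketch {
    // Hypothetical version of the 2.x validation: reject consumer configs
    // where the user set enable.auto.commit directly, since the spout must
    // control that property itself via ProcessingGuarantee.
    static void validate(Properties consumerProps) {
        if (consumerProps.containsKey("enable.auto.commit")) {
            throw new IllegalStateException(
                "Set ProcessingGuarantee.NONE instead of enable.auto.commit");
        }
    }

    public static void main(String[] args) {
        Properties ok = new Properties();
        ok.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        validate(ok); // passes: enable.auto.commit not set

        Properties bad = new Properties();
        bad.put("enable.auto.commit", "true");
        try {
            validate(bad);
            System.out.println("accepted");
        } catch (IllegalStateException e) {
            System.out.println("rejected");
        }
    }
}
{code}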

All this being said, I haven't checked the performance impact of at-most-once 
vs. NONE, but I suspect it varies with latency to Kafka. If we want to drop 
NONE entirely, I'd be okay with that, provided we're sure the performance 
penalty for using at-most-once in cases where NONE would otherwise do is not 
too great.

Sorry about the wall of text, hope this addresses your issues.

[~kabhwan] [~avermeerbergen] you might also be interested to read this.

> Remove enable.auto.commit support from storm-kafka-client
> ---------------------------------------------------------
>
>                 Key: STORM-2914
>                 URL: https://issues.apache.org/jira/browse/STORM-2914
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-kafka-client
>    Affects Versions: 2.0.0, 1.2.0
>            Reporter: Stig Rohde Døssing
>            Assignee: Stig Rohde Døssing
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> The enable.auto.commit option causes the KafkaConsumer to periodically commit 
> the latest offsets it has returned from poll(). It is convenient for use 
> cases where messages are polled from Kafka and processed synchronously, in a 
> loop. 
> Due to https://issues.apache.org/jira/browse/STORM-2913 we'd really like to 
> store some metadata in Kafka when the spout commits. This is not possible 
> with enable.auto.commit. I took a look at what that setting actually does, 
> and it just causes the KafkaConsumer to call commitAsync during poll (and 
> during a few other operations, e.g. close and assign) with some interval. 
> Ideally I'd like to get rid of ProcessingGuarantee.NONE, since I think 
> ProcessingGuarantee.AT_MOST_ONCE covers the same use cases, and is likely 
> almost as fast. The primary difference between them is that AT_MOST_ONCE 
> commits synchronously.
> If we really want to keep ProcessingGuarantee.NONE, I think we should make 
> our ProcessingGuarantee.NONE setting cause the spout to call commitAsync 
> after poll, and never use the enable.auto.commit option. This allows us to 
> include metadata in the commit.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
