[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14625837#comment-14625837 ]

Ewen Cheslack-Postava commented on KAFKA-1835:
----------------------------------------------

[~becket_qin] Agreed that guaranteeing an error on first send is awkward. 
That's why I said that behavior would be perversely "good" behavior, only 
because it forces them to handle that type of error :) Then again, if you do 
something like start a metadata fetch upon instantiation, the time between 
instantiation and first send could be arbitrary, and might often be extremely 
small. So even starting a fetch then may still commonly result in the same 
error and wouldn't significantly change the behavior.

Your response to the stale metadata question is interesting because the end 
result is "enqueue, but notify of error". I think that is behavior that 
[~stevenz3wu] would probably also be happy with in the case of first send -- 
enqueue the data without partitioning, but notify of the error. Not saying 
that's the *right* solution, just that it's a solution that would be symmetric 
in both cases and satisfy the non-blocking constraint.

The point about unkeyed messages is really interesting -- it's a good point 
that there's no good reason to indefinitely delay those messages just because 
we chose their partitions arbitrarily and that partition happens to be offline. 
But I'm not sure that tracking that subset of messages and separately 
re-partitioning them so they can get sent out is worth the extra overhead and 
complexity. Then again, if your application is only sending unkeyed messages, 
it could be pretty beneficial to enable resending to other partitions (and to 
support a random partitioner that ignores known-unavailable partitions). In any 
case, this is a giant tangent (my bad...).
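
Just to make that tangent concrete, such a partitioner would be pretty simple 
to sketch against the pluggable Partitioner interface in the new clients 
(assuming a client version where partitioner.class is supported -- the class 
name here is made up and this is only an illustration, not an actual patch):

{code:java}
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

/**
 * Illustrative sketch only: picks a random *available* partition, so
 * arbitrarily-partitioned (unkeyed) data never lands on a partition we
 * already know is offline. Keys are ignored, so this only makes sense for
 * applications sending unkeyed messages.
 */
public class AvailableOnlyRandomPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // Only consider partitions that currently have a leader.
        List<PartitionInfo> available = cluster.availablePartitionsForTopic(topic);
        if (!available.isEmpty()) {
            int i = ThreadLocalRandom.current().nextInt(available.size());
            return available.get(i).partition();
        }
        // Nothing is available; fall back to any known partition and let the
        // normal error handling kick in. (A real implementation would also
        // handle the case of no metadata at all.)
        List<PartitionInfo> all = cluster.partitionsForTopic(topic);
        return all.get(ThreadLocalRandom.current().nextInt(all.size())).partition();
    }

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public void close() {}
}
{code}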

Coming back to the original issue, I think with the proper explanation, the 
behavior of failing on the first send isn't that unintuitive. The short version 
is:
* KafkaProducer will only queue records when it knows the partition (and 
therefore, indirectly, the broker) the data is destined for. When it starts, 
the producer has no information about any topics and therefore cannot enqueue 
any data. Initial requests to send records will fail, but they trigger a 
request for this metadata, and once it has been received, subsequent send() 
calls will succeed as long as there is enough queue space.

The long version requires explaining that:
* Figuring out which partition a message should be sent to requires some 
information about the topic (such as number of partitions).
* By setting max.block.ms to 0 or a very small value, you have given us 
basically no time to look this information up.
* Queuing records before we know what partition they are destined for adds an 
extra layer of queuing and complexity.
* If you just created the producer, we've had little time to get the info we 
need. Therefore, to avoid an extra layer of queuing, you will see an error. If 
you are willing to accept a small *potential* delay, which might average XX ms 
for common configurations, you would not normally see this error. If you 
absolutely cannot block for XX ms, then you need to handle this error, e.g. as 
in the sketch below.
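
To spell out what "handle this error" means in application code, here's a 
rough, untested sketch. The broker address and topic are placeholders, and it 
assumes a client where max.block.ms is the knob (on 0.8.2 the equivalent wait 
is governed by metadata.fetch.timeout.ms); depending on the client version the 
timeout may surface synchronously from send() or through the callback/future, 
so the sketch handles both:

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TimeoutException;

public class NonBlockingFirstSend {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        // Give the producer essentially no time to block waiting for metadata.
        props.put("max.block.ms", "0");

        Producer<String, String> producer = new KafkaProducer<>(props);
        try {
            // The very first send has no topic metadata to partition with, so
            // it will typically fail right away -- but it still kicks off the
            // metadata fetch in the background.
            producer.send(new ProducerRecord<>("my-topic", "some value"),
                          (metadata, exception) -> {
                              if (exception instanceof TimeoutException) {
                                  // Metadata wasn't available and we refused to
                                  // wait: drop, buffer, or retry -- app's choice.
                              }
                          });
        } catch (TimeoutException e) {
            // Same decision if the error is thrown directly from send().
        } finally {
            producer.close();
        }
    }
}
{code}

Once that first metadata response arrives, subsequent send() calls behave as 
described in the short version above.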

I think that in practice, this is probably a good compromise. People who 
*really* understand what's going on can get the behavior they want, but have to 
jump through a couple of extra hoops, including setting the right configs and 
handling errors that most users would be unlikely to see. The vast majority of 
users, who don't mind blocking a bit, just leave the default settings and never 
notice that the producer blocks on the first send unless they hit a really long 
outage where metadata can't be fetched. In other words, while the completely 
non-blocking case isn't ideal, I think it requires such a specific configuration 
change that it won't affect most users, and so the somewhat odd behavior is 
acceptable given clear documentation.


> Kafka new producer needs options to make blocking behavior explicit
> -------------------------------------------------------------------
>
>                 Key: KAFKA-1835
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1835
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 0.8.2.0, 0.8.3, 0.9.0
>            Reporter: Paul Pearcy
>             Fix For: 0.8.3
>
>         Attachments: KAFKA-1835-New-producer--blocking_v0.patch, 
> KAFKA-1835.patch
>
>   Original Estimate: 504h
>  Remaining Estimate: 504h
>
> The new (0.8.2 standalone) producer will block the first time it attempts to 
> retrieve metadata for a topic. This is not the desired behavior in some use 
> cases where async non-blocking guarantees are required and message loss is 
> acceptable in known cases. Also, most developers will assume an API that 
> returns a future is safe to call in a critical request path. 
> From discussion on the mailing list, the most viable option is to have the 
> following settings:
>  pre.initialize.topics=x,y,z
>  pre.initialize.timeout=x
>  
> This moves potential blocking to the init of the producer and outside of some 
> random request. The potential for blocking will still exist in the corner case 
> where connectivity with Kafka is lost and a message is sent for the first time 
> to a topic not included in pre-init. 
> There is the question of what to do when initialization fails. There are a 
> couple of options that I'd like available:
> - Fail creation of the client 
> - Fail all sends until the meta is available 
> Open to input on how the above option should be expressed. 
> It is also worth noting that more nuanced solutions exist that could work 
> without the extra settings; they just end up having extra complications and 
> at the end of the day not adding much value. For instance, the producer could 
> accept and queue messages (note: more complicated than I am making it sound 
> due to storing all accepted messages in pre-partitioned compact binary form), 
> but you're still going to be forced to choose between blocking and dropping 
> messages at some point. 
> I have some test cases I am going to port over to the Kafka producer 
> integration ones and start from there. My current impl is in Scala, but 
> porting to Java shouldn't be a big deal (was using a promise to track init 
> status, but will likely need to make that an atomic bool). 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
