[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14286967#comment-14286967 ]

Ewen Cheslack-Postava commented on KAFKA-1835:
----------------------------------------------

[~ppearcy] These are easier to review if they're on Review Board; it might help 
to use the patch submission tool in the future. Here are some notes on the 
current patch:

KafkaProducer.java
* No need to use the object forms of primitive types, change Boolean -> 
boolean, Long -> long, etc.
* initialized should be an AtomicBoolean or volatile boolean since it's 
read/written from different threads
* Error handling when waiting for the Future to finish seems wrong. If there 
is an exception, we probably want to pass it along (or throw another one) to 
indicate the problem to the caller. Currently it just falls through and then 
only throws an exception when send() is called, so the error ends up 
disconnected from the source of the problem. It would be better to handle the 
error immediately.
* Similarly, I don't think send() should check initialized if preinitialization 
is handled in the constructor. If failure to preinitialize also threw an 
exception, then it would be impossible to call send() unless preinitialization 
was complete.
* If you follow the above approach, you can avoid making initialized a field in 
the class. It would only need to be a local variable since it would only be 
used in the constructor.
* Do we even need the ExecutorService? Since the thread creating the producer 
is going to block by calling Future.get(), what does having the executor 
accomplish?
* initializeProducer() doesn't need a return value since it only ever returns true.
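A minimal sketch of the constructor-only approach described in these notes. It assumes a hypothetical MetadataWaiter interface standing in for the producer's internal metadata wait (it is not a Kafka API) and a hypothetical PreInitializationException. The constructing thread waits directly, so there is no ExecutorService. There is no initialized field, because returning normally already means preinitialization succeeded. The sketch also treats the timeout as one deadline shared by all listed topics. That is an assumption; a per-topic timeout would be another reasonable reading.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the producer's metadata wait; not a Kafka API.
interface MetadataWaiter {
    // Blocks up to maxWaitMs for metadata on the topic; returns true if it arrived.
    boolean awaitMetadata(String topic, long maxWaitMs) throws InterruptedException;
}

// Hypothetical exception type thrown when preinitialization does not complete.
class PreInitializationException extends RuntimeException {
    PreInitializationException(String message) { super(message); }
    PreInitializationException(String message, Throwable cause) { super(message, cause); }
}

class PreInitializingProducer {
    private final MetadataWaiter metadata;

    PreInitializingProducer(MetadataWaiter metadata, List<String> preInitTopics, long preInitTimeoutMs) {
        this.metadata = metadata;
        // No executor: the caller would block on Future.get() anyway, so wait inline.
        // No initialized field: if the constructor returns, preinitialization succeeded.
        if (preInitTimeoutMs > 0) {
            preInitialize(preInitTopics, preInitTimeoutMs);
        }
    }

    // Returns nothing: success is a normal return, failure is an exception.
    private void preInitialize(List<String> topics, long timeoutMs) {
        // Assumption: a single deadline bounds the whole preinitialization.
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        for (String topic : topics) {
            long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
            try {
                if (remainingMs <= 0 || !metadata.awaitMetadata(topic, remainingMs)) {
                    // Report the failure here, at its source, instead of deferring it to send().
                    throw new PreInitializationException(
                        "Metadata for topic " + topic + " not available within " + timeoutMs + " ms");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                throw new PreInitializationException("Interrupted while preinitializing " + topic, e);
            }
        }
    }
}
```

Because the constructor either returns with metadata loaded or throws, send() needs no initialized check of its own.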

ProducerConfig.java
* Config has a getList() method and ConfigDef has a LIST type. Use those for 
pre.initialize.topics instead of parsing the list yourself.
* I think the docstrings could be better, e.g.:
pre.initialize.topics: "List of topics to preload metadata for when creating 
the producer so subsequent calls to send are guaranteed not to block. If 
metadata for these topics cannot be loaded within 
<code>pre.initialize.timeout.ms</code> milliseconds, the producer constructor 
will throw an exception."
pre.initialize.timeout.ms: "The producer blocks when sending the first message 
to a topic if metadata is not yet available for that topic. When this 
configuration is greater than 0, metadata for the topics specified by 
<code>pre.initialize.topics</code> is prefetched during construction, throwing 
an exception after <code>pre.initialize.timeout.ms</code> milliseconds if the 
metadata has not been populated."

> Kafka new producer needs options to make blocking behavior explicit
> -------------------------------------------------------------------
>
>                 Key: KAFKA-1835
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1835
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 0.8.2, 0.8.3, 0.9.0
>            Reporter: Paul Pearcy
>             Fix For: 0.8.2
>
>         Attachments: KAFKA-1835-New-producer--blocking_v0.patch
>
>   Original Estimate: 504h
>  Remaining Estimate: 504h
>
> The new (0.8.2 standalone) producer will block the first time it attempts to 
> retrieve metadata for a topic. This is not the desired behavior in some use 
> cases where async non-blocking guarantees are required and message loss is 
> acceptable in known cases. Also, most developers will assume an API that 
> returns a future is safe to call in a critical request path. 
> Discussing on the mailing list, the most viable option is to have the 
> following settings:
>  pre.initialize.topics=x,y,z
>  pre.initialize.timeout=x
>  
> This moves potential blocking to the init of the producer and outside of some 
> random request. The potential will still exist for blocking in a corner case 
> where connectivity with Kafka is lost and a topic not included in pre-init 
> has a message sent for the first time. 
> There is the question of what to do when initialization fails. There are a 
> couple of options that I'd like available:
> - Fail creation of the client 
> - Fail all sends until the meta is available 
> Open to input on how the choice between these options should be expressed. 
> It is also worth noting more nuanced solutions exist that could work without 
> the extra settings, they just end up having extra complications and at the 
> end of the day not adding much value. For instance, the producer could accept 
> and queue messages (note: more complicated than I am making it sound due to 
> storing all accepted messages in pre-partitioned compact binary form), but 
> you're still going to be forced to choose to either start blocking or 
> dropping messages at some point. 
> I have some test cases I am going to port over to the Kafka producer 
> integration ones and start from there. My current impl is in Scala, but 
> porting to Java shouldn't be a big deal (I was using a promise to track init 
> status, but will likely need to make that an atomic bool). 
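The two failure behaviors listed in the issue description could be expressed as a single setting. Below is a rough sketch of that idea. It assumes a hypothetical PreInitFailurePolicy enum and a hypothetical ReadinessGate helper; neither exists in the Kafka client. Under FAIL_CREATION, the constructor throws. Under FAIL_SENDS_UNTIL_READY, construction succeeds, but send() is rejected immediately, without blocking, until metadata arrives. The readiness flag is an AtomicBoolean because it is written and read from different threads.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical policy names for the two behaviors requested in the issue.
enum PreInitFailurePolicy { FAIL_CREATION, FAIL_SENDS_UNTIL_READY }

// Hypothetical helper; not part of the Kafka producer.
class ReadinessGate {
    // Set by whichever thread sees metadata arrive, read by threads calling send().
    private final AtomicBoolean metadataReady = new AtomicBoolean(false);

    ReadinessGate(PreInitFailurePolicy policy, boolean preInitSucceeded) {
        if (preInitSucceeded) {
            metadataReady.set(true);
        } else if (policy == PreInitFailurePolicy.FAIL_CREATION) {
            // Option 1: fail creation of the client.
            throw new IllegalStateException("Preinitialization failed; refusing to create producer");
        }
        // Option 2: construction succeeds, but sends fail until markReady() is called.
    }

    // Called once metadata becomes available (e.g. from the producer's I/O thread).
    void markReady() {
        metadataReady.set(true);
    }

    // Called at the top of send(): rejects immediately rather than blocking.
    void checkCanSend() {
        if (!metadataReady.get()) {
            throw new IllegalStateException("Metadata not yet available; send rejected");
        }
    }
}
```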



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
