[ https://issues.apache.org/jira/browse/KAFKA-496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13458030#comment-13458030 ]

Jay Kreps commented on KAFKA-496:
---------------------------------

I would propose we work back from what the user code would look like.

One point I would like to bring up is that the current producer only allows a
single request at a time. This is a huge hit on throughput, since a single
producer can use only one partition on one broker at any given time. For some
uses, where the number of producers is large compared to the number of brokers,
this is fine, but that is not universally true. The fix is to make the send()
call non-blocking and support multiplexing requests over the connection. This
is not hard to do: we already have the correlation id in the requests, so the
only change is in the network layer, to avoid reordering requests from the same
connection. If we made this change, the producer request would ALWAYS be a sort
of future.
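A rough sketch of the multiplexing idea, assuming the network layer preserves per-connection ordering as described above. Request, Response, and InFlightTracker are hypothetical names, not Kafka classes; the sketch only shows how correlation ids let several requests be outstanding on one connection, and it is not thread-safe.

```scala
import scala.collection.mutable

// Hypothetical sketch: several requests in flight on one connection,
// matched to responses by correlation id. Not thread-safe.
final case class Request(correlationId: Int, payload: String)
final case class Response(correlationId: Int, body: String)

class InFlightTracker {
  private var nextId = 0
  // Pending callbacks in send order. We assume responses on one
  // connection come back in the same order the requests were sent.
  private val pending = mutable.Queue.empty[(Int, Response => Unit)]

  /* Non-blocking send: tags the request and records its callback. */
  def send(payload: String)(onResponse: Response => Unit): Request = {
    val req = Request(nextId, payload)
    nextId += 1
    pending.enqueue((req.correlationId, onResponse))
    req // a real client would write req to the socket here
  }

  /* Called for each response read off the socket. */
  def receive(resp: Response): Unit = {
    val (expectedId, callback) = pending.dequeue()
    // Since ordering is preserved, the head of the queue must match.
    if (resp.correlationId != expectedId)
      throw new IllegalStateException(
        s"expected correlation id $expectedId but got ${resp.correlationId}")
    callback(resp)
  }

  def inFlight: Int = pending.size
}
```

The queue is enough here only because of the ordering assumption; without it, a map keyed by correlation id would be needed instead.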

The state of futures in Java and Scala seems to be a little complex. Java has a
Future, but it doesn't allow registering a callback. Finagle and Akka both have
custom versions of Future. There is a proposal to unify all these, though I
don't know its status
(http://docs.scala-lang.org/sips/pending/futures-promises.html).

For our purpose I recommend we just make our own. It would support three methods:
trait Future[T] {
  /* returns true if the result is ready */
  def complete: Boolean
  /* add a function to be called when the result is ready. The function takes
     the result of the execution: either an exception or an object of type T.
     Note you can call this more than once to register multiple actions. */
  def onComplete(f: Either[T, Exception] => Unit): Future[T]
  /* await completion and return the result or throw the exception */
  def result: T
}
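To make the proposal concrete, here is a minimal sketch of one way the trait might be implemented. SimpleFuture and satisfy are hypothetical names, not existing Kafka code; the sketch keeps the Either[T, Exception] ordering proposed above and uses plain synchronized/wait/notifyAll for blocking.

```scala
// The proposed trait, plus one possible thread-safe implementation.
trait Future[T] {
  def complete: Boolean
  def onComplete(f: Either[T, Exception] => Unit): Future[T]
  def result: T
}

// Hypothetical implementation; completed exactly once via satisfy().
class SimpleFuture[T] extends Future[T] {
  private var outcome: Option[Either[T, Exception]] = None
  private var callbacks: List[Either[T, Exception] => Unit] = Nil

  def complete: Boolean = synchronized { outcome.isDefined }

  def onComplete(f: Either[T, Exception] => Unit): Future[T] = {
    // Decide under the lock whether to queue f or run it now,
    // but invoke the callback outside the lock.
    val ready = synchronized {
      if (outcome.isEmpty) callbacks = f :: callbacks
      outcome
    }
    ready.foreach(f)
    this
  }

  def result: T = {
    // Block the calling thread until satisfy() has run.
    val r = synchronized {
      while (outcome.isEmpty) wait()
      outcome.get
    }
    r match {
      case Left(value) => value
      case Right(e)    => throw e
    }
  }

  /* Completes the future and fires callbacks in registration order. */
  def satisfy(r: Either[T, Exception]): Unit = {
    val toRun = synchronized {
      require(outcome.isEmpty, "future already completed")
      outcome = Some(r)
      notifyAll() // wake any threads blocked in result
      val cs = callbacks.reverse
      callbacks = Nil
      cs
    }
    toRun.foreach(_(r))
  }
}
```

Running callbacks outside the lock avoids holding it while user code executes, at the cost of callbacks possibly running on whichever thread calls satisfy.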

So the function prototype would be
   def send(data: ProduceData*): Future[ProduceResponse]

In the current code the future would immediately be satisfied for the sync 
producer. When we have fully implemented the non-blocking client it wouldn't. 
But this change would be transparent to the user.
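A sketch of how the sync producer could satisfy the future immediately, so callers written against Future would not need to change once a non-blocking client exists. ProduceData and ProduceResponse here are simplified stand-ins for the real classes; SyncProducer, CompletedFuture, and the doSend parameter (representing whatever blocking send the current code performs) are hypothetical.

```scala
// The proposed Future trait (see the definition above).
trait Future[T] {
  def complete: Boolean
  def onComplete(f: Either[T, Exception] => Unit): Future[T]
  def result: T
}

/* A future whose outcome is already known when it is created. */
class CompletedFuture[T](outcome: Either[T, Exception]) extends Future[T] {
  def complete: Boolean = true
  // Nothing to wait for, so the callback runs right away.
  def onComplete(f: Either[T, Exception] => Unit): Future[T] = { f(outcome); this }
  def result: T = outcome match {
    case Left(value) => value
    case Right(e)    => throw e
  }
}

// Simplified stand-ins for the real request/response classes.
final case class ProduceData(topic: String, message: String)
final case class ProduceResponse(acked: Int)

class SyncProducer(doSend: Seq[ProduceData] => ProduceResponse) {
  // Blocks in doSend, then wraps the outcome in a satisfied future.
  def send(data: ProduceData*): Future[ProduceResponse] =
    try new CompletedFuture[ProduceResponse](Left(doSend(data)))
    catch { case e: Exception => new CompletedFuture[ProduceResponse](Right(e)) }
}
```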

I think there are three use cases:
1. You don't really care what the result is (basically "fire and forget"), in
which case you use this API as you do today:
    send(...)
2. You want to make sure the send succeeded or do some follow-up action, but you
don't mind blocking the current thread:
    val response = send(...).result
3. You want to do something more complicated. This could be sending out lots of
requests without blocking and then handling the responses, or asynchronously
handling failures, or whatever. In this case you would use
    send(...).onComplete { (result: Either[ProduceResponse, Exception]) =>
      result match {
        case Left(response) => // do something with the response
        case Right(e) => // handle the exception
      }
    }
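As an illustration of case 3, the sketch below issues several sends without blocking and tallies outcomes in callbacks as responses arrive. PendingFuture is a hypothetical, single-threaded stand-in for the proposed Future: it does no locking, and its result throws rather than blocks if called too early. The responses are simulated rather than read from a broker.

```scala
import scala.collection.mutable.ArrayBuffer

// The proposed Future trait (see the definition above).
trait Future[T] {
  def complete: Boolean
  def onComplete(f: Either[T, Exception] => Unit): Future[T]
  def result: T
}

// Hypothetical single-threaded future; no locking, no blocking.
class PendingFuture[T] extends Future[T] {
  private var outcome: Option[Either[T, Exception]] = None
  private val callbacks = ArrayBuffer.empty[Either[T, Exception] => Unit]

  def complete: Boolean = outcome.isDefined
  def onComplete(f: Either[T, Exception] => Unit): Future[T] = {
    outcome match {
      case Some(r) => f(r)           // already done: run now
      case None    => callbacks += f // otherwise run on satisfy
    }
    this
  }
  def result: T = outcome match {
    case Some(Left(v))  => v
    case Some(Right(e)) => throw e
    case None => throw new IllegalStateException("not complete (this sketch cannot block)")
  }
  def satisfy(r: Either[T, Exception]): Unit = {
    outcome = Some(r)
    callbacks.foreach(_(r))
  }
}

// Fire off a batch of sends, counting outcomes as they arrive.
var succeeded = 0
var failed = 0
val inFlight = (1 to 5).map(_ => new PendingFuture[String])
inFlight.foreach { fut =>
  fut.onComplete {
    case Left(_)  => succeeded += 1
    case Right(_) => failed += 1
  }
}
// Later the network layer completes them; simulated here.
inFlight.zipWithIndex.foreach { case (fut, i) =>
  if (i % 2 == 0) fut.satisfy(Left(s"ack-$i"))
  else fut.satisfy(Right(new RuntimeException(s"error-$i")))
}
```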
      


  
                
> high level producer send should return a response
> -------------------------------------------------
>
>                 Key: KAFKA-496
>                 URL: https://issues.apache.org/jira/browse/KAFKA-496
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>            Reporter: Jun Rao
>            Priority: Blocker
>              Labels: features
>             Fix For: 0.8
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> Currently, Producer.send() doesn't return any value. In 0.8, since each
> produce request will be acked, we should pass the response back. If the
> producer is in sync mode, we can return a map of
> (topic, partitionId) -> (errorcode, offset). If the producer is in async mode,
> we can just return null.

