Fine with most of the changes. Just a few questions/comments.

1. Would it be better to change Callback to ProducerCallback to distinguish
it from the controller callback and other potential future callback types
(e.g. consumer)?
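
Concretely, that would just be the same interface from your proposal under a
new name, e.g.

  interface ProducerCallback {
    public void onCompletion(RecordPosition position, Exception exception);
  }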

2. If no key is present AND no partition id is present, partitions will be
chosen in a round robin fashion.

Currently, our default event handler picks a random available partition.
This is probably better than round robin because (1) if the replication
factor is 1 and there is a broker failure, we can still route the message,
and (2) if a bunch of producers are started at the same time, this prevents
them all from picking the same partition in a synchronized way.
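
Roughly, what I'm describing could look like the sketch below, written
against the proposed partitionsForTopic() API. The partition()/leader()
accessors on PartitionInfo (with leader() being null when the partition is
unavailable) are assumed here for illustration, not something spelled out in
the proposal.

  import java.util.ArrayList;
  import java.util.List;
  import java.util.Random;

  // Sketch only: pick a random partition among those whose leader is up.
  public class RandomAvailablePartitioner {
    private final Random random = new Random();

    public Integer choosePartition(List<PartitionInfo> partitions) {
      List<Integer> available = new ArrayList<Integer>();
      for (PartitionInfo p : partitions) {
        if (p.leader() != null)        // assumed: null leader == unavailable
          available.add(p.partition());
      }
      if (available.isEmpty())
        return null;                   // nothing routable right now
      return available.get(random.nextInt(available.size()));
    }
  }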

3. I think this will still make it possible to implement any partitioning
strategy you would want. The "challenge case" I considered was the
partitioner that minimizes the number of TCP connections. This partitioner
would choose the partition hosted on the node it had most recently chosen
in hopes that it would still have a connection to that node.

To support this mode, we probably need a method in the producer to close
all existing sockets.
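
To make the challenge case concrete, such a partitioner might look roughly
like the sketch below (same assumed PartitionInfo accessors as above, plus
an assumed int-valued Node.id()):

  import java.util.List;

  // Sketch only: stick to the node we most recently produced to, so an
  // existing TCP connection can likely be reused.
  public class StickyNodePartitioner {
    private volatile int lastNodeId = -1;

    public Integer choosePartition(List<PartitionInfo> partitions) {
      if (lastNodeId >= 0) {
        for (PartitionInfo p : partitions) {
          if (p.leader() != null && p.leader().id() == lastNodeId)
            return p.partition();      // reuse the node we already talked to
        }
      }
      // Otherwise fall back to the first available partition and remember
      // which node hosts it.
      for (PartitionInfo p : partitions) {
        if (p.leader() != null) {
          lastNodeId = p.leader().id();
          return p.partition();
        }
      }
      return null;                     // no partition with an available leader
    }
  }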

Thanks,

Jun



On Fri, Jan 31, 2014 at 3:04 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

> Hey folks,
>
> Thanks for all the excellent suggestions on the producer API, I think this
> really made things better. We'll do a similar thing for the consumer as we
> get a proposal together. I wanted to summarize everything I heard and the
> proposed changes I plan to do versus ignore. I'd like to get feedback from
> the committers or anyone else interested in this as a proposal (+1/0/-1)
> before I go make the changes just to avoid churn at code review time.
>
> 1. Change send to use java.util.concurrent.Future in send():
>   Future<RecordPosition> send(ProducerRecord record, Callback callback)
> The cancel method will always return false and not do anything. Callback
> will then be changed to
>   interface Callback {
>     public void onCompletion(RecordPosition position, Exception exception);
>   }
> so that the exception is available there too and will be null if no error
> occurred. I'm not planning on changing the name Callback because I haven't
> thought of a better one.
>
> 2. We will change the way serialization works to proposal 1A in the
> previous discussion. That is the Partitioner and Serializer interfaces will
> disappear. ProducerRecord will change to:
>   class ProducerRecord {
>     public byte[] key() {...}
>     public byte[] value() {...}
>     public Integer partition() {...} // can be null
>   }
> So key and value are now byte[]; partitionKey will be replaced by an
> optional partition. The behavior will be the following:
> 1. If partition is present it will be used.
> 2. If no partition is present but a key is present, a partition will be
> chosen by a hash of the key
> 3. If no key is present AND no partition id is present, partitions will be
> chosen in a round robin fashion
> In other words what is currently the DefaultPartitioner will now be hard
> coded as the behavior whenever no partition is provided.
>
> In order to allow correctly choosing a partition the Producer interface
> will include a new method:
>   List<PartitionInfo> partitionsForTopic(String topic);
> PartitionInfo will be changed to include the actual Node objects not just
> the Node ids.
>
> I think this will still make it possible to implement any partitioning
> strategy you would want. The "challenge case" I considered was the
> partitioner that minimizes the number of TCP connections. This partitioner
> would choose the partition hosted on the node it had most recently chosen in
> hopes that it would still have a connection to that node.
>
> It will be slightly more awkward to package partitioners in this model but
> the simple cases are simpler so hopefully it's worth it.
>
> 3. I will make the producer implement java.io.Closeable but not throw any
> exceptions as there doesn't really seem to be any disadvantage to this and
> the interface may remind people to call close.
>
> 4. I am going to break the config stuff into a separate discussion as I
> don't think I have done enough to document and name the configs well and I
> need to do a pass on that first.
>
> 5. There were a number of comments about internals, mostly right on, which
> I am going to handle separately.
>
> Non-changes
>
> 1. I don't plan to change the build system. The SBT=>gradle change is
> basically orthogonal and we should debate it in the context of its ticket.
> 2. I'm going to stick with my oddball kafka.* rather than
> org.apache.kafka.* package name and non-getter methods unless everyone
> complains.
> 3. I'm not going to introduce a zookeeper dependency in the client as I
> don't see any advantage.
> 4. There were a number of reasonable suggestions on the Callback execution
> model. I'm going to leave it as is, though. Because we are moving to use
> Java Future we can't include functionality like ListenableFuture. I think
> the simplistic callback model where callbacks are executed in the i/o
> thread should be good enough for most cases and other cases can use their
> own threading.
>
