Re: [Discussion] KIP-34 Add Partitioner Change Listener to Partitioner Interface

2015-09-24 Thread Bhavesh Mistry
Hi Becket,

Thanks for answering and providing feedback. I will withdraw the KIP and
move it to the rejected section.

Thanks,

Bhavesh


Re: [Discussion] KIP-34 Add Partitioner Change Listener to Partitioner Interface

2015-09-21 Thread Jiangjie Qin
Hey Bhavesh,

I think this metadata change capture logic should be implemented by
each user themselves, for the following reasons:

1. Most users do not really care about partition changes. Adding the
logic/interface to the default partitioner means that users who don't care
about partition changes still pay the price of a cluster diff on every
metadata update. For a big cluster, this metadata diff could be costly,
depending on how frequently the metadata is refreshed.

2. In some cases, a user might only care about partition changes for some
specific topics. In that case, there is no need to do a cluster diff for all
the topics a producer is producing data to. If the cluster diff is
implemented in user code, it is more efficient because the user can check
only the topics they are interested in. Also, different users might care
about different changes in the metadata, e.g. topic create/delete, node
change, etc. So it seems better to leave the actual metadata change capture
logic to the user instead of doing it in the producer.

3. The cluster diff code itself is short and not complicated, so even if
users do it on their own it should be simple, e.g.:
{
  if (this.cachedCluster.hashCode() != cluster.hashCode()) {
    for (String topic : cluster.topics()) {
      // Only compare topics that were already known to the cached cluster.
      if (this.cachedCluster.topics().contains(topic) &&
          this.cachedCluster.partitionsForTopic(topic).size() !=
              cluster.partitionsForTopic(topic).size()) {
        // handle partition change.
      }
    }
    this.cachedCluster = cluster;  // remember the latest metadata
  }
}
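
To make points 2 and 3 concrete, here is a minimal sketch of a diff restricted to the topics a user cares about. It is an illustration only: a plain `Map<String, Integer>` (topic name to partition count) stands in for `Cluster`, and `TopicPartitionDiff` and `changedTopics` are hypothetical names, not Kafka APIs.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper; the maps stand in for the cached and the fresh cluster
// metadata (topic name -> number of partitions).
final class TopicPartitionDiff {
    private TopicPartitionDiff() {}

    // Returns the watched topics whose partition count differs between the two views.
    // Topics that are missing from either view are skipped.
    static Set<String> changedTopics(Map<String, Integer> cached,
                                     Map<String, Integer> fresh,
                                     Set<String> watched) {
        Set<String> changed = new TreeSet<>();
        if (cached.equals(fresh)) {
            return changed; // cheap early exit: nothing changed at all
        }
        for (String topic : watched) {
            Integer before = cached.get(topic);
            Integer after = fresh.get(topic);
            if (before != null && after != null && !before.equals(after)) {
                changed.add(topic);
            }
        }
        return changed;
    }
}
```

Because only the watched topics are visited, the cost of the check depends on how many topics the user is interested in rather than on the size of the cluster.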

Thanks,

Jiangjie (Becket) Qin


Re: [Discussion] KIP-34 Add Partitioner Change Listener to Partitioner Interface

2015-09-21 Thread Bhavesh Mistry
Hi Jiangjie,

Thanks for the valuable feedback.

1) Thread coordination for partition changes could be an issue.

I agree that coordination between the application thread and the sender
thread would be a tough one. My only concern was sharing the logic you
described among all Partitioner interface implementations, and letting
the Kafka framework take care of doing the diff exactly as you describe.

In a multithreaded environment, the change listener would be called from
the same thread that has just finished the metadata update.

Metadata Listener:

https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L163

producer.send()

https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L370

But the behavior with onChange() would be no different from what it is today.

For example,

public Future<RecordMetadata> send(ProducerRecord<K, V> record,
                                   Callback callback) {

    // Determine the partition for the message.
    int partition = partition(record, serializedKey, serializedValue,
                              metadata.fetch());

    /*
     * A metadata update (cluster change) can occur here: after the
     * application thread has determined the partition for the message,
     * but before the message is added to the record queue.
     * So in my opinion the behavior is the same.
     */
    RecordAccumulator.RecordAppendResult result =
        accumulator.append(tp, serializedKey, serializedValue, callback);

    // (rest of send() omitted)
}



What do you think of adding the diff logic you describe to the default
Partitioner implementation, or to another implementation class (call it
ChangePartitioner), which calls an onChange() method from within
partition()? Whoever cares or needs to know can then do what they like
(log the event, use it to change the partitioning strategy, etc.).

This makes it possible to share the diff code, so that not every
implementation has to implement the diff logic, which is my main concern.
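
A minimal sketch of what such a shared base class might look like is given below. It is only an assumption about the shape of the idea: `ChangeAwarePartitioner`, its method signatures, and the `Map<String, Integer>` metadata stand-in (topic name to partition count) are hypothetical and are not part of Kafka's Partitioner interface.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical base class: does the metadata diff once so subclasses need not.
// The Map (topic name -> partition count) stands in for Kafka's Cluster.
abstract class ChangeAwarePartitioner {
    private Map<String, Integer> cached = new HashMap<>();

    // Called from partition(), on the calling thread, when a known topic's
    // partition count differs from the cached value.
    protected abstract void onChange(String topic, int oldCount, int newCount);

    // Subclasses implement the actual partitioning strategy here.
    protected abstract int choosePartition(String topic, byte[] key, int numPartitions);

    public final synchronized int partition(String topic, byte[] key,
                                            Map<String, Integer> metadata) {
        if (!cached.equals(metadata)) {
            for (Map.Entry<String, Integer> e : metadata.entrySet()) {
                Integer old = cached.get(e.getKey());
                if (old != null && !old.equals(e.getValue())) {
                    onChange(e.getKey(), old, e.getValue());
                }
            }
            cached = new HashMap<>(metadata); // keep a private copy as the new cache
        }
        Integer n = metadata.get(topic);
        if (n == null || n <= 0) {
            throw new IllegalArgumentException("unknown topic: " + topic);
        }
        return choosePartition(topic, key, n);
    }
}
```

In this sketch the hook runs on whichever thread calls partition(), so a subclass sees the change before it picks a partition for that call. Every call still pays for the map comparison, whether or not the subclass cares about changes.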


Thanks,

Bhavesh




Re: [Discussion] KIP-34 Add Partitioner Change Listener to Partitioner Interface

2015-09-18 Thread Jiangjie Qin
Hey Bhavesh,

I think it is useful to notify the user about the partition change.

The problem with having a listener in the producer is that it is hard to
guarantee synchronization. For example, consider the following sequence:
1. The producer sender thread refreshes the metadata with a partition change.
2. A user thread calls send() with a customized partitioner; the partitioner
decides the partition using the new metadata refreshed in step 1.
3. The producer sender thread calls onPartitionsChange().

At that point, the message sent in step 2 has already been sent using the
new metadata. If we instead update the metadata after invoking
onPartitionsChange(), it is a little strange because the metadata has not
changed yet.
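
The ordering problem can be replayed deterministically in a single thread. The sketch below is a toy model, not producer code: `ListenerRace`, its fields, and the integer partition count are hypothetical stand-ins, and the three methods are meant to be called in the order of the three steps listed above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model of the sequence: the metadata is swapped first, the listener fires later.
final class ListenerRace {
    // Stand-in for the producer's metadata: the partition count of one topic.
    final AtomicInteger partitionCount = new AtomicInteger(4);
    // What the listener (and so the custom partitioner's own state) believes.
    int countKnownToListener = 4;
    final List<String> log = new ArrayList<>();

    // Step 1: the sender thread refreshes the metadata.
    void refreshMetadata(int newCount) {
        partitionCount.set(newCount);
        log.add("refresh->" + newCount);
    }

    // Step 2: a user thread sends; partitioning already sees the new metadata.
    int send(int keyHash) {
        int visible = partitionCount.get();
        log.add("send sees " + visible + ", listener knows " + countKnownToListener);
        return Math.floorMod(keyHash, visible);
    }

    // Step 3: the sender thread finally invokes the listener.
    void fireListener() {
        countKnownToListener = partitionCount.get();
        log.add("listener notified of " + countKnownToListener);
    }
}
```

Calling `refreshMetadata(6)`, then `send(5)`, then `fireListener()` shows the gap: the send is partitioned over 6 partitions while the listener still believes there are 4.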

Also, the metadata refresh can happen in the caller thread as well; I am not
sure how it would work with multiple caller threads.

It seems the user can actually tell whether the cluster has changed, because
the partition() method takes a cluster parameter. So if a user cares about
partition number changes, they can do the following:
1. Store a copy of the cluster as a cache in the partitioner.
2. When partition() is called, check whether the hash of the passed-in
cluster is the same as that of the cached cluster.
3. If the hashes differ, a metadata refresh has occurred, and the user can
check whether there is a partition change before doing the partitioning.
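
The three steps above could be sketched roughly as follows. This is an illustration under stated assumptions, not Kafka code: `ClusterSnapshot` is a hypothetical stand-in for `Cluster` (just topic name to partition count) with a value-based `hashCode()`, and `CachingPartitioner` is an assumed name whose `partition()` only mirrors the idea of receiving the cluster as a parameter.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Kafka's Cluster: topic name -> partition count.
final class ClusterSnapshot {
    private final Map<String, Integer> partitionCounts;

    ClusterSnapshot(Map<String, Integer> partitionCounts) {
        this.partitionCounts = new HashMap<>(partitionCounts);
    }

    int partitionCount(String topic) {
        return partitionCounts.getOrDefault(topic, 0);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof ClusterSnapshot
            && partitionCounts.equals(((ClusterSnapshot) o).partitionCounts);
    }

    @Override
    public int hashCode() {
        return partitionCounts.hashCode();
    }
}

// Assumed name; not the Kafka Partitioner interface.
class CachingPartitioner {
    private ClusterSnapshot cached;       // step 1: cached copy of the cluster
    private int partitionChangesSeen = 0;

    synchronized int partition(String topic, byte[] keyBytes, ClusterSnapshot cluster) {
        // Step 2: compare the hash of the passed-in cluster with the cached one.
        if (cached == null || cached.hashCode() != cluster.hashCode()) {
            // Step 3: a refresh occurred; check for a partition count change.
            if (cached != null
                && cached.partitionCount(topic) != cluster.partitionCount(topic)) {
                partitionChangesSeen++;
            }
            cached = cluster;
        }
        int n = cluster.partitionCount(topic);
        if (n <= 0) {
            throw new IllegalStateException("no partitions for " + topic);
        }
        return Math.floorMod(Arrays.hashCode(keyBytes), n);
    }

    synchronized int partitionChangesSeen() {
        return partitionChangesSeen;
    }
}
```

A hash comparison can in principle miss a change when two different snapshots collide, so a stricter variant would compare with `equals()` instead.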

Thanks,

Jiangjie (Becket) Qin



[Discussion] KIP-34 Add Partitioner Change Listener to Partitioner Interface

2015-09-16 Thread Bhavesh Mistry
Hi Kafka Dev Team,

I would like to get your feedback on adding another method,
onPartitionsChange(), to the Partitioner interface so that implementations
are notified of partition changes upon metadata refresh.

This will allow custom logic (an implementor of Partitioner) to be notified
when partition ownership changes, a partition goes online or offline, or the
number of partitions increases or decreases, and when those changes were
propagated to the individual producer instance.

Please note this is my first KIP, so if the process has not been followed
correctly, please do let me know. I will be more than happy to follow up or
correct anything that I may have missed.

Thanks in advance and looking forward to your feedback.

Thanks,

Bhavesh