It absolutely is important that the partitioning logic for a single topic be 
the same across an entire cluster. IOW, if a topology has a single sink, then 
no matter where that topology is run in the cluster, it had better use the same 
partitioning logic. I would argue that when the partitioning logic varies from 
the default logic, it’s far better to encapsulate it within the topology’s 
definition, and adding it to the sink is a very easy way to do this (and very 
natural for the developer using Kafka Streams).

However, centralizing the partitioning logic for all streams is certainly not 
ideal, primarily because different topics will likely need to be partitioned in 
different ways. This is especially true for stateful stream processing, which 
depends on messages with the same key going to the same processor instance that 
owns that keyed data. IOW, the partitioning logic used by a producer is 
strongly informed by how the *downstream stateful consumers* are 
organized/clustered. It gets far more complicated when considering built-in 
topics used by offset management, state storage, and metrics. 

The bottom line is that *different* topics will likely need to be partitioned 
differently.

On October 14, 2015 at 12:57:37 PM, Yasuhiro Matsuda 
(yasuhiro.mats...@gmail.com) wrote:

A partitioning scheme should be a cluster wide thing. Letting each sink  
have a different partitioning scheme does not make sense to me. A  
partitioning scheme is not specific to a stream job, each task or a sink. I  
think specifying it at sink level is more error prone.  

If a user wants to customize a partitioning scheme, he/she also want to  
manage it at some central place, maybe a code repo, or a jar file. All  
application must use the same logic, otherwise data will be messed up.  
Thus, a single class representing all partitioning logic is not a bad thing  
at all. (The code organization wise, all logic does not necessarily in the  
single class, of course.)  


On Wed, Oct 14, 2015 at 8:47 AM, Randall Hauch <rha...@gmail.com> wrote:  

> Created https://issues.apache.org/jira/browse/KAFKA-2649 and attached a  
> PR with the proposed change.  
>  
> Thanks!  
>  
>  
> On October 14, 2015 at 3:12:34 AM, Guozhang Wang (wangg...@gmail.com)  
> wrote:  
>  
> Thanks!  
>  
> On Tue, Oct 13, 2015 at 9:34 PM, Randall Hauch <rha...@gmail.com> wrote:  
> Ok, cool. I agree we want something simple. I'll create an issue and  
> create a pull request with a proposal. Look for it tomorrow.  
>  
> On Oct 13, 2015, at 10:25 PM, Guozhang Wang <wangg...@gmail.com> wrote:  
>  
> I see your point. Yeah I think it is a good way to add a Partitioner into  
> addSink(...) but the Partitioner interface in producer is a bit overkill:  
>  
> "partition(String topic, Object key, byte[] keyBytes, Object value, byte[]  
> valueBytes, Cluster cluster)"  
>  
> whereas for us we only want to partition on (K key, V value).  
>  
> Perhaps we should add a new Partitioner interface in Kafka Streams?  
>  
> Guozhang  
>  
> On Tue, Oct 13, 2015 at 6:38 PM, Randall Hauch <rha...@gmail.com> wrote:  
> This overrides the partitioning logic for all topics, right? That means I  
> have to explicitly call the default partitioning logic for all topics  
> except those that my Producer forwards. I’m guess the best way to do by  
> extending org.apache.kafka.clients.producer.DefaultProducer. Of course,  
> with multiple sinks in my topology, I have to put all of the partitioning  
> logic inside a single class.  
>  
> What would you think about adding an overloaded TopologyBuilder.addSink(…)  
> method that takes a Partitioner (or better yet a smaller functional  
> interface). The resulting SinkProcessor could use that Partitioner instance  
> to set the partition number? That’d be super convenient for users, would  
> keep the logic where it belongs (where the topology defines the sinks), and  
> best of all the implementations won't have to worry about any other topics,  
> such as those used by stores, metrics, or other sinks.  
>  
> Best regards,  
>  
> Randall  
>  
>  
> On October 13, 2015 at 8:09:41 PM, Guozhang Wang (wangg...@gmail.com)  
> wrote:  
>  
> Hi Randall,  
>  
> You can try to set the partitioner class as  
> ProducerConfig.PARTITIONER_CLASS_CONFIG in the StreamsConfig, its interface  
> can be found in  
>  
> org.apache.kafka.clients.producer.Partitioner  
>  
> Let me know if it works for you.  
>  
> Guozhang  
>  
> On Tue, Oct 13, 2015 at 10:59 AM, Randall Hauch <rha...@gmail.com> wrote:  
>  
> > The new streams API added with KIP-28 is great. I’ve been using it on a  
> > prototype for a few weeks, and I’m looking forward to it being included  
> in  
> > 0.9.0. However, at the moment, a Processor implementation is not able to  
> > specify the partition number when it outputs messages.  
> >  
> > I’d be happy to log a JIRA and create a PR to add it to the API, but  
> > without knowing all of the history I’m wondering if leaving it out of the  
> > API was intentional.  
> >  
> > Thoughts?  
> >  
> > Best regards,  
> >  
> > Randall Hauch  
> >  
>  
>  
>  
> --  
> -- Guozhang  
>  
>  
>  
> --  
> -- Guozhang  
>  
>  
>  
> --  
> -- Guozhang  
>  

Reply via email to