As explained on https://issues.apache.org/jira/browse/KAFKA-8207, setting a custom partition assignor is not allowed in Kafka Streams.
There is work in progress to address the reported issue. Unfortunately, I am not aware of a workaround for now.

-Matthias

On 4/10/19 11:35 PM, Neeraj Bhatt wrote:
> Hi Bill
>
> Thanks for replying.
>
> We have a 5 node cluster of KStream which is reading from a Topic A. In
> node 1 consumer we are reading partitions from 1 to 50 of Topic A using
> KStream and aggregating the records and saving in the local state store
> (Rocks DB).
>
> In case node 1 goes down these 50 partitions will be assigned to node2,
> node3, node 4 and node 5 and they will start aggregating the records of
> these 50 partitions from starting. if node 1 comes back again after one day
> we want that only these partitions should be assigned to node 1 as it has
> already been aggregated to some extent.
>
> I am aware that we can use num.standby.replicas to replicate local state
> store across clusters, but what if the whole cluster goes down.
>
> Streams tasks will work only on those partitions as earlier so that we can
> save the replay time to build local state store from the changelog.
>
> Do I need to write my own partition assignor for the producer in this case?
>
> Thanks
>
>
>
> On Wed, Apr 10, 2019 at 9:31 PM Bill Bejeck <[email protected]> wrote:
>
>> Hi Neeraj,
>>
>> I have a couple of questions, are you trying to adjust the partition
>> assignment of records consumed by streams? In that case, you can assign a
>> custom partition assignor for the producer sending records to the topic
>> feeding the streams application. If you are trying to do custom partition
>> assignments for the topics streams is *producing to *then you can provide a
>> custom StreamPartitioner like so stream.to(topicName,
>> Produced.with(keySerde,
>> valueSerde).withStreamPartitiioner(YourCustomStreamPartitioner)
>>
>> Does this help?
>>
>> -Bill
>>
>> On Wed, Apr 10, 2019 at 7:15 AM Neeraj Bhatt <[email protected]>
>> wrote:
>>
>>> Hi
>>>
>>> Which partition strategy Kafka stream uses? Can we change the partition
>>> strategy in Kafka Stream as we can change in normal Kafka Consumer
>>>
>>>
>>> streamsConfiguration.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,Collections.singletonList(ColombiaStrictStickyAssignor.class));
>>> so not change the partition assignor
>>>
>>> Thanks
>>>
>>
>
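
For reference, the num.standby.replicas setting mentioned in the quoted thread can be sketched as plain configuration properties. This is only a minimal sketch, not a workaround for KAFKA-8207: the class name StandbyConfigSketch, the application id, and the broker address are placeholder assumptions, and the keys are written as string literals so the snippet runs without the Kafka client library on the classpath.

```java
import java.util.Properties;

public class StandbyConfigSketch {

    // Builds a Kafka Streams configuration as plain string properties.
    static Properties build(int standbyReplicas) {
        Properties props = new Properties();
        // Placeholder values (assumptions): use your own application id and brokers.
        props.setProperty("application.id", "aggregation-app");
        props.setProperty("bootstrap.servers", "localhost:9092");
        // Ask Streams to keep this many standby copies of each state store
        // on other instances, so a failover does not start from an empty store.
        props.setProperty("num.standby.replicas", Integer.toString(standbyReplicas));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build(1);
        System.out.println(props.getProperty("num.standby.replicas"));
    }
}
```

In a real application these properties would be passed to the KafkaStreams constructor together with the topology. Standby replicas only shorten recovery when a single instance fails; they do not help when the whole cluster goes down, which is the case raised in the thread.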
