Hey there,
over the weekend I was debugging why a Streams configuration value was not
being passed down to the stream threads. I noticed that one of the code paths
in KafkaConsumer (L743) initializes the StreamPartitionAssignor:
    this.assignors = config.getConfiguredInstances(
            ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
            PartitionAssignor.class);
However, this call uses the ConsumerConfig instance that is passed in, so if I
want to change a configuration value seen by the assignor, I need to add the
consumer prefix to it. To make debugging even harder, the logAll() function in
AbstractConfig prints "StreamsConfig values" at the beginning of its output,
because the assignor does build a StreamsConfig of its own:
    @Override
    public void configure(final Map<String, ?> configs) {
        final StreamsConfig streamsConfig = new StreamsConfig(configs);
(L190 in StreamPartitionAssignor)
This further confuses developers, as they see two different sets of
StreamsConfig values in the logs: one from the top level, and one from this
derived level, per thread.
My point is that we could either:
1. make developers aware that they need to add the consumer prefix to pass
   configs to StreamPartitionAssignor, or
2. find a way to pass in the original StreamsConfig.
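To make option 1 concrete, here is a rough sketch of the behavior as I
understand it. This is a simplified model written with plain Java maps, not
Kafka's actual code: the class ConsumerPrefixSketch, the method consumerView,
and the "my.assignor.*" keys are all hypothetical names I made up for
illustration. The only things taken from Kafka are the "consumer." prefix and
the general idea that un-prefixed keys the consumer does not recognize never
reach the assignor.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Simplified model of the prefix behavior described above; NOT Kafka's code.
final class ConsumerPrefixSketch {
    // Assumed to match the "consumer." prefix used by Kafka Streams.
    static final String CONSUMER_PREFIX = "consumer.";

    // Builds the view of the properties that the embedded consumer
    // (and therefore the assignor) would see in this simplified model.
    static Map<String, String> consumerView(Map<String, String> streamsProps,
                                            Set<String> knownConsumerKeys) {
        Map<String, String> view = new HashMap<>();
        // Un-prefixed keys are kept only if the consumer knows them.
        for (Map.Entry<String, String> e : streamsProps.entrySet()) {
            if (knownConsumerKeys.contains(e.getKey())) {
                view.put(e.getKey(), e.getValue());
            }
        }
        // Prefixed keys are always kept, with the prefix stripped.
        for (Map.Entry<String, String> e : streamsProps.entrySet()) {
            if (e.getKey().startsWith(CONSUMER_PREFIX)) {
                String stripped = e.getKey().substring(CONSUMER_PREFIX.length());
                view.put(stripped, e.getValue());
            }
        }
        return view;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("max.poll.records", "100");            // known consumer key
        props.put("my.assignor.flag", "true");           // hypothetical, no prefix
        props.put("consumer.my.assignor.timeout", "30"); // hypothetical, prefixed
        System.out.println(
            consumerView(props, Collections.singleton("max.poll.records")));
    }
}
```

In this model "my.assignor.flag" is silently dropped, while
"my.assignor.timeout" reaches the assignor because it was given the prefix. In
real code I believe StreamsConfig.consumerPrefix(...) is the helper that
builds such a prefixed key.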
I know this is a somewhat lengthy description. Let me know if anything about
my proposal is unclear, or if this is not a concern because most people
already know the trick here. Thank you!