ableegoldman commented on a change in pull request #11424: URL: https://github.com/apache/kafka/pull/11424#discussion_r798399133
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java ##########
@@ -116,14 +123,30 @@ public TopologyConfig(final String topologyName, final StreamsConfig globalAppCo
             maxBufferedSize = getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
             log.info("Topology {} is overriding {} to {}", topologyName, BUFFERED_RECORDS_PER_PARTITION_CONFIG, maxBufferedSize);
         } else {
-            maxBufferedSize = globalAppConfigs.getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
+            maxBufferedSize = globalAppConfigs.originals().containsKey(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG)
+                ? globalAppConfigs.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG) : -1;
         }
-        if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
-            cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
-            log.info("Topology {} is overriding {} to {}", topologyName, CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize);
+        if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides) ||
+            isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
+
+            if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides) && isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
+                cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
+                log.info("Topology {} is using both {} and deprecated config {}. overriding {} to {}",
+                    topologyName,
+                    STATESTORE_CACHE_MAX_BYTES_CONFIG,
+                    CACHE_MAX_BYTES_BUFFERING_CONFIG,
+                    STATESTORE_CACHE_MAX_BYTES_CONFIG,
+                    cacheSize);
+            } else if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
+                cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
+                log.info("Topology {} is using deprecated config {}. overriding {} to {}", topologyName, CACHE_MAX_BYTES_BUFFERING_CONFIG, CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize);
+            } else {
+                cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
+                log.info("Topology {} is overriding {} to {}", topologyName, STATESTORE_CACHE_MAX_BYTES_CONFIG, cacheSize);
+            }
         } else {
-            cacheSize = globalAppConfigs.getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
+            cacheSize = globalAppConfigs.getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);

Review comment:
Ah, I see the confusion. The `#isTopologyOverride` method checks whether the config has been overridden for the specific topology, i.e., whether it has been set in the `Properties` passed in to `StreamsBuilder#build`. It's not looking at what we call the `globalAppConfigs`, which are the actual application configs, i.e., those passed in to the `KafkaStreams` constructor.

So basically there are two sets of configs. The value should be taken from the first of these to be set by the user, in the following order:
1) `statestore.cache.max.bytes` in `topologyOverrides`
2) `cache.max.bytes.buffering` in `topologyOverrides`
3) `statestore.cache.max.bytes` in `globalAppConfigs`
4) `cache.max.bytes.buffering` in `globalAppConfigs`

Essentially, we should call `#getTotalCacheSize` on the `topologyOverrides` if either of the two configs is set there (which this PR is doing), and on the `globalAppConfigs` if neither is (which is where the regression is).

On that note, we also need to move `#getTotalCacheSize` out of `StreamsConfig`, because it's a public class and the method wasn't listed as a public API in the KIP (nor should it be, imo). I recommend creating a new static utility class for things like this, e.g. `StreamsConfigUtils` in the `org.apache.kafka.streams.internals` package.
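The four-step resolution order could look roughly like the minimal Java sketch below. It assumes each set of configs is modelled as a plain map holding only the values the user explicitly set (standing in for `topologyOverrides` and `globalAppConfigs.originals()`). The class name follows the suggested `StreamsConfigUtils`, but the map-based signatures, the `defaultValue` parameter, and the `resolveCacheSize` helper are hypothetical and are not the actual Kafka Streams API.

```java
import java.util.Map;

// Hypothetical sketch: a static-only internal utility class in the spirit of the
// suggested org.apache.kafka.streams.internals.StreamsConfigUtils. Configs are plain
// maps of explicitly set values, not real StreamsConfig objects.
final class StreamsConfigUtils {
    static final String STATESTORE_CACHE_MAX_BYTES = "statestore.cache.max.bytes";
    static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering"; // deprecated

    private StreamsConfigUtils() { } // static helpers only, never instantiated

    // Cache size for a single set of configs: the new config wins over the
    // deprecated one; if neither was set, fall back to the supplied default.
    static long getTotalCacheSize(final Map<String, Long> userConfigs, final long defaultValue) {
        if (userConfigs.containsKey(STATESTORE_CACHE_MAX_BYTES)) {
            return userConfigs.get(STATESTORE_CACHE_MAX_BYTES);
        }
        if (userConfigs.containsKey(CACHE_MAX_BYTES_BUFFERING)) {
            return userConfigs.get(CACHE_MAX_BYTES_BUFFERING);
        }
        return defaultValue;
    }

    // Hypothetical helper applying the four-step order: consult the topology
    // overrides only if either cache config was set there; otherwise use the
    // application-level configs.
    static long resolveCacheSize(final Map<String, Long> topologyOverrides,
                                 final Map<String, Long> globalAppConfigs,
                                 final long defaultValue) {
        final boolean overriddenForTopology =
            topologyOverrides.containsKey(STATESTORE_CACHE_MAX_BYTES)
                || topologyOverrides.containsKey(CACHE_MAX_BYTES_BUFFERING);
        return getTotalCacheSize(overriddenForTopology ? topologyOverrides : globalAppConfigs, defaultValue);
    }
}
```

For example, `StreamsConfigUtils.resolveCacheSize(Map.of(), Map.of("cache.max.bytes.buffering", 4096L), 10L)` returns `4096` (step 4). That is the case a plain `globalAppConfigs.getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG)` lookup misses, because it ignores the deprecated config at the application level.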
Some other methods would belong in `StreamsConfigUtils` as well; for example, the `StreamThread` methods `#processingMode` and `#eosEnabled` should be moved there too.

Hope that all makes sense, and lmk if you don't think you'll have the time to put out a full patch; I or another Streams dev can help out 🙂