ableegoldman commented on a change in pull request #11424:
URL: https://github.com/apache/kafka/pull/11424#discussion_r798399133



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java
##########
@@ -116,14 +123,30 @@ public TopologyConfig(final String topologyName, final 
StreamsConfig globalAppCo
             maxBufferedSize = getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
             log.info("Topology {} is overriding {} to {}", topologyName, 
BUFFERED_RECORDS_PER_PARTITION_CONFIG, maxBufferedSize);
         } else {
-            maxBufferedSize = 
globalAppConfigs.getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
+            maxBufferedSize = 
globalAppConfigs.originals().containsKey(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG)
+                    ? 
globalAppConfigs.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG) : 
-1;
         }
 
-        if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, 
topologyOverrides)) {
-            cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
-            log.info("Topology {} is overriding {} to {}", topologyName, 
CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize);
+        if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, 
topologyOverrides) ||
+                isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, 
topologyOverrides)) {
+
+            if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, 
topologyOverrides) && isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, 
topologyOverrides)) {
+                cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
+                log.info("Topology {} is using both {} and deprecated config 
{}. overriding {} to {}",
+                        topologyName,
+                        STATESTORE_CACHE_MAX_BYTES_CONFIG,
+                        CACHE_MAX_BYTES_BUFFERING_CONFIG,
+                        STATESTORE_CACHE_MAX_BYTES_CONFIG,
+                        cacheSize);
+            } else if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, 
topologyOverrides)) {
+                cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
+                log.info("Topology {} is using deprecated config {}. 
overriding {} to {}", topologyName, CACHE_MAX_BYTES_BUFFERING_CONFIG, 
CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize);
+            } else {
+                cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
+                log.info("Topology {} is overriding {} to {}", topologyName, 
STATESTORE_CACHE_MAX_BYTES_CONFIG, cacheSize);
+            }
         } else {
-            cacheSize = 
globalAppConfigs.getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
+            cacheSize = 
globalAppConfigs.getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);

Review comment:
       Ah, I see the confusion. The `#isTopologyOverride` method checks whether 
the config has been overridden for the specific topology, i.e. whether it has 
been set in the Properties passed in to `StreamsBuilder#build`. It does not 
look at what we call the `globalAppConfigs`, which are the actual application 
configs, i.e. those passed in to the `KafkaStreams` constructor.
   
   So basically there are two sets of configs. The value should be taken from 
the first of these to be set by the user, in the following order:
   1) `statestore.cache.max.bytes` in `topologyOverrides`
   2) `cache.max.bytes.buffering` in `topologyOverrides`
   3) `statestore.cache.max.bytes` in `globalAppConfigs`
   4) `cache.max.bytes.buffering` in `globalAppConfigs`
   
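   Essentially, that means using `#getTotalCacheSize` on the 
`topologyOverrides` if either of the two configs is set there (which this PR 
is doing) and on the `globalAppConfigs` if neither is (which is the regression 
here: the final `else` branch reads only `STATESTORE_CACHE_MAX_BYTES_CONFIG` 
from `globalAppConfigs`).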
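
   To make the lookup order above concrete, here is a minimal sketch that 
uses plain maps in place of the real config classes. The class 
`CacheSizeResolution`, both method names, and the fallback default are 
hypothetical and only for illustration; this is not the actual 
`#getTotalCacheSize` implementation.

```java
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical illustration only; not part of Kafka Streams.
final class CacheSizeResolution {
    static final String STATESTORE_CACHE_MAX_BYTES_CONFIG = "statestore.cache.max.bytes";
    static final String CACHE_MAX_BYTES_BUFFERING_CONFIG = "cache.max.bytes.buffering";

    // Assumption: fall back to 10 MB when the user set neither config anywhere.
    static final long ASSUMED_DEFAULT_CACHE_SIZE = 10 * 1024 * 1024L;

    private CacheSizeResolution() { }

    // Looks at ONE set of configs: the new key wins over the deprecated key.
    static OptionalLong explicitCacheSize(final Map<String, ?> configs) {
        final String[] keysInPriorityOrder = {
            STATESTORE_CACHE_MAX_BYTES_CONFIG,
            CACHE_MAX_BYTES_BUFFERING_CONFIG
        };
        for (final String key : keysInPriorityOrder) {
            final Object value = configs.get(key);
            if (value != null) {
                return OptionalLong.of(Long.parseLong(value.toString()));
            }
        }
        return OptionalLong.empty();
    }

    // Steps 1-2 consult the topology overrides, steps 3-4 the global app configs.
    static long resolveCacheSize(final Map<String, ?> topologyOverrides,
                                 final Map<String, ?> globalAppConfigs) {
        final OptionalLong fromTopology = explicitCacheSize(topologyOverrides);
        if (fromTopology.isPresent()) {
            return fromTopology.getAsLong();
        }
        return explicitCacheSize(globalAppConfigs).orElse(ASSUMED_DEFAULT_CACHE_SIZE);
    }

    public static void main(final String[] args) {
        final Map<String, String> topologyOverrides = Map.of();
        final Map<String, String> globalAppConfigs =
            Map.of(CACHE_MAX_BYTES_BUFFERING_CONFIG, "2048");
        // No topology override, so the deprecated global config is still honored.
        System.out.println(resolveCacheSize(topologyOverrides, globalAppConfigs));
    }
}
```

   The point of the sketch is that the deprecated key has to be consulted in 
both sets of configs, not just in the overrides.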
   
   On that note -- we also need to move `#getTotalCacheSize` out of 
`StreamsConfig`, because it's a public class and the method wasn't listed as a 
public API in the KIP (nor should it be, imo). I recommend creating a new 
static utility class for things like this, e.g. `StreamsConfigUtils` in the 
`org.apache.kafka.streams.internals` package. There are some other methods that 
would belong there; for example, the `StreamThread` methods `#processingMode` 
and `#eosEnabled` should be moved as well.
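
   For illustration, a rough sketch of what such a static utility class could 
look like. Everything here is an assumption: the class name 
`StreamsConfigUtilsSketch`, the use of a plain map instead of `StreamsConfig`, 
and the mapping from `processing.guarantee` values to modes are stand-ins, not 
the existing `StreamThread` code.

```java
import java.util.Map;

// Hypothetical sketch of an internal, non-instantiable utility class.
final class StreamsConfigUtilsSketch {

    enum ProcessingMode { AT_LEAST_ONCE, EXACTLY_ONCE_ALPHA, EXACTLY_ONCE_V2 }

    // Private constructor: static helpers only, never instantiated.
    private StreamsConfigUtilsSketch() { }

    // Assumed mapping from the "processing.guarantee" value to a mode.
    static ProcessingMode processingMode(final Map<String, ?> configs) {
        final Object value = configs.get("processing.guarantee");
        final String guarantee = value == null ? "at_least_once" : value.toString();
        switch (guarantee) {
            case "at_least_once":
                return ProcessingMode.AT_LEAST_ONCE;
            case "exactly_once":
                return ProcessingMode.EXACTLY_ONCE_ALPHA;
            case "exactly_once_beta":
            case "exactly_once_v2":
                return ProcessingMode.EXACTLY_ONCE_V2;
            default:
                throw new IllegalArgumentException("Unknown processing guarantee: " + guarantee);
        }
    }

    // EOS is enabled for every mode other than at-least-once.
    static boolean eosEnabled(final Map<String, ?> configs) {
        return processingMode(configs) != ProcessingMode.AT_LEAST_ONCE;
    }
}
```

   Keeping helpers like these in an internal package means they can change 
without going through a KIP.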
   
   Hope that all makes sense -- and let me know if you don't think you'll have 
the time to put out a full patch, and I or another Streams dev can help out 🙂




