showuon commented on a change in pull request #11959: URL: https://github.com/apache/kafka/pull/11959#discussion_r837110177
########## File path: streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java ########## @@ -136,37 +137,54 @@ public TopologyConfig(final String topologyName, final StreamsConfig globalAppCo maxBufferedSize = getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG); log.info("Topology {} is overriding {} to {}", topologyName, BUFFERED_RECORDS_PER_PARTITION_CONFIG, maxBufferedSize); } else { + // If the user hasn't explicitly set the buffered.records.per.partition config, then leave it unbounded + // and rely on the input.buffer.max.bytes instead to keep the memory usage under control maxBufferedSize = globalAppConfigs.originals().containsKey(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG) ? globalAppConfigs.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG) : -1; } - if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides) && isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) { - cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG); - log.info("Topology {} is using both deprecated config {} and new config {}, hence {} is ignored and the new config {} (value {}) is used", - topologyName, - CACHE_MAX_BYTES_BUFFERING_CONFIG, - STATESTORE_CACHE_MAX_BYTES_CONFIG, - CACHE_MAX_BYTES_BUFFERING_CONFIG, - STATESTORE_CACHE_MAX_BYTES_CONFIG, - cacheSize); - } else if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) { - cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG); - log.info("Topology {} is using only deprecated config {}, and will be used to set cache size to {}; " + - "we suggest setting the new config {} instead as deprecated {} would be removed in the future.", - topologyName, - CACHE_MAX_BYTES_BUFFERING_CONFIG, - cacheSize, - STATESTORE_CACHE_MAX_BYTES_CONFIG, - CACHE_MAX_BYTES_BUFFERING_CONFIG); - } else if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides)) { - cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG); + final boolean 
stateStoreCacheMaxBytesOverridden = isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides); + final boolean cacheMaxBytesBufferingOverridden = isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides); + + if (!stateStoreCacheMaxBytesOverridden && !cacheMaxBytesBufferingOverridden) { + cacheSize = getTotalCacheSize(globalAppConfigs); } else { - cacheSize = globalAppConfigs.getTotalCacheSize(); + if (stateStoreCacheMaxBytesOverridden && cacheMaxBytesBufferingOverridden) { + cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG); + log.info("Topology {} is using both deprecated config {} and new config {}, hence {} is ignored and the new config {} (value {}) is used", + topologyName, + CACHE_MAX_BYTES_BUFFERING_CONFIG, + STATESTORE_CACHE_MAX_BYTES_CONFIG, + CACHE_MAX_BYTES_BUFFERING_CONFIG, + STATESTORE_CACHE_MAX_BYTES_CONFIG, + cacheSize); + } else if (cacheMaxBytesBufferingOverridden) { + cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG); + log.info("Topology {} is using only deprecated config {}, and will be used to set cache size to {}; " + + "we suggest setting the new config {} instead as deprecated {} would be removed in the future.", + topologyName, + CACHE_MAX_BYTES_BUFFERING_CONFIG, + cacheSize, + STATESTORE_CACHE_MAX_BYTES_CONFIG, + CACHE_MAX_BYTES_BUFFERING_CONFIG); + } else { + cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG); + } + + if (cacheSize != 0) { + log.warn("Topology {} is overriding cache size to {} but this will not have any effect as the " + + "topology-level cache size config only controls whether record buffering is enabled " + + "or disabled, thus the only valid override value is 0", + topologyName, cacheSize); Review comment: nit: call `maybeLogWarningForAllowedCacheSizeValue` instead? 
########## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ########## @@ -1047,9 +1048,9 @@ private static Metrics getMetrics(final StreamsConfig config, final Time time, f // and then resize them later streamThread = createAndAddStreamThread(0L, 0L, threadIdx); final int numLiveThreads = getNumLiveStreamThreads(); - resizeThreadCacheAndBufferMemory(numLiveThreads + 1); + resizeThreadCacheAndBufferMemory(numLiveThreads); log.info("Adding StreamThread-{}, there are now {} threads with cache size/max buffer size values as {} per thread.", - threadIdx, numLiveThreads + 1, getThreadCacheAndBufferMemoryString()); + threadIdx, numLiveThreads, getThreadCacheAndBufferMemoryString()); Review comment: I think we should add tests for this, to make sure that after adding a thread, the cache size and buffer memory are set to what we expect. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org