vcrfxia commented on code in PR #13292:
URL: https://github.com/apache/kafka/pull/13292#discussion_r1136249663

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -1293,12 +1306,16 @@ private void setRegexMatchedTopicToStateStore() {
     private <S extends StateStore> InternalTopicConfig createChangelogTopicConfig(final StateStoreFactory<S> factory, final String name) {
-        if (factory.isWindowStore()) {
+        if (factory.isVersionedStore()) {
+            final VersionedChangelogTopicConfig config = new VersionedChangelogTopicConfig(name, factory.logConfig());
+            config.setMinCompactionLagMs(factory.historyRetention());

Review Comment:
   > Yes, I think it would be cleaner this way.

   OK, made the updates.

   > `delete.retention.ms` is not for retention based topics, but it's for compacted topic

   Ah, I see. My Scala isn't the best, but it looks like `min.compaction.lag.ms` guarantees that any record within `min.compaction.lag.ms` of "now" will not be compacted, regardless of `delete.retention.ms`. That is the guarantee we need so that older record versions are not prematurely compacted.

   Interestingly, we might have a case for setting `delete.retention.ms = 0` (rather than using the default of 24 hours), since we know that we no longer need the older tombstones once `min.compaction.lag.ms` has elapsed. It's not strictly necessary, though. Do you think we should set it for completeness? I'm fine either way.
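
   To make the two settings concrete, here is a minimal sketch of the topic-level overrides being discussed. It is not the PR's `VersionedChangelogTopicConfig`: the class name, the helper method, and the `dropTombstonesEagerly` flag are hypothetical and only illustrate how `min.compaction.lag.ms` would track the store's history retention, with `delete.retention.ms = 0` as the optional extra. The config keys themselves are the standard Kafka topic configs.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; it is not the PR's VersionedChangelogTopicConfig.
final class VersionedChangelogConfigSketch {

    // Builds topic-level overrides for a compacted changelog topic.
    // historyRetentionMs: how long older record versions must survive compaction (assumed input).
    // dropTombstonesEagerly: if true, also sets delete.retention.ms=0, the optional idea from the comment.
    static Map<String, String> versionedChangelogConfig(final long historyRetentionMs,
                                                        final boolean dropTombstonesEagerly) {
        if (historyRetentionMs < 0) {
            throw new IllegalArgumentException("historyRetentionMs must be non-negative");
        }
        final Map<String, String> config = new LinkedHashMap<>();
        // Changelog topics are compacted rather than deleted by retention time.
        config.put("cleanup.policy", "compact");
        // Records younger than this lag are not eligible for compaction.
        config.put("min.compaction.lag.ms", Long.toString(historyRetentionMs));
        if (dropTombstonesEagerly) {
            // Overrides the 24-hour default so tombstones are not kept longer than needed.
            config.put("delete.retention.ms", "0");
        }
        return config;
    }

    public static void main(final String[] args) {
        // Example: one minute of history retention, with the optional tombstone override.
        System.out.println(versionedChangelogConfig(60_000L, true));
    }
}
```

   Leaving `dropTombstonesEagerly` as `false` keeps the broker default for `delete.retention.ms`, which matches the "not strictly necessary" option above.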