vcrfxia commented on code in PR #13292:
URL: https://github.com/apache/kafka/pull/13292#discussion_r1136249663


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -1293,12 +1306,16 @@ private void setRegexMatchedTopicToStateStore() {
 
     private <S extends StateStore> InternalTopicConfig 
createChangelogTopicConfig(final StateStoreFactory<S> factory,
                                                                                
   final String name) {
-        if (factory.isWindowStore()) {
+        if (factory.isVersionedStore()) {
+            final VersionedChangelogTopicConfig config = new 
VersionedChangelogTopicConfig(name, factory.logConfig());
+            config.setMinCompactionLagMs(factory.historyRetention());

Review Comment:
   > Yes, I think it would be cleaner this way.
   
   OK, made the updates.
   
   > `delete.retetion.ms` is not for retention based topics, but it's for 
compacted topic
   
   Ah I see. My scala's not the best but it looks like `min.compaction.lag.ms` 
guarantees that any record within `min.compaction.lag.ms` of "now" will not be 
compacted, regardless of `delete.retention.ms`, which is the important point 
that we need to guarantee that older record versions are not prematurely 
compacted.
   
   Interestingly, we might have a case for setting `delete.retention.ms = 0` 
(rather than using the default of 24 hours) since we know that we no longer 
need the older tombstones once `min.compaction.lag.ms` is expired. It's not 
strictly necessary though. Do you think we should set it for completeness? I'm 
fine either way.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to