vcrfxia commented on code in PR #13274:
URL: https://github.com/apache/kafka/pull/13274#discussion_r1110456636


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java:
##########
@@ -48,20 +55,30 @@ public StoreBuilder<TimestampedKeyValueStore<K, V>> materialize() {
             }
         }
 
-        final StoreBuilder<TimestampedKeyValueStore<K, V>> builder = Stores.timestampedKeyValueStoreBuilder(
-            supplier,
-            materialized.keySerde(),
-            materialized.valueSerde());
+        final StoreBuilder<?> builder;
+        if (supplier instanceof VersionedBytesStoreSupplier) {
+            builder = new VersionedKeyValueStoreBuilder<>(
+                (VersionedBytesStoreSupplier) supplier,
+                materialized.keySerde(),
+                materialized.valueSerde(),
+                Time.SYSTEM);
+        } else {
+            builder = Stores.timestampedKeyValueStoreBuilder(
+                supplier,
+                materialized.keySerde(),
+                materialized.valueSerde());
+        }
 
         if (materialized.loggingEnabled()) {
             builder.withLoggingEnabled(materialized.logConfig());
         } else {
             builder.withLoggingDisabled();
         }
 
-        if (materialized.cachingEnabled()) {
+        // versioned stores do not support caching

Review Comment:
   When a versioned supplier is passed, the resulting store never has caching
enabled, regardless of whether the `Materialized` instance has caching enabled,
because versioned stores don't support caching. This could be confusing to
users, but I don't think we have a good way around it since `Materialized`
instances default to having caching enabled. If `Materialized` defaulted to
caching disabled, we could throw an error when `withCachingEnabled()` is
called for `Materialized` instances with versioned suppliers. Given that
caching is enabled by default, though, it seems harsh/inconvenient to
require users to explicitly call `withCachingDisabled()` in order to use
versioned stores.
   
   I think the most we should do is log a warning. Curious to hear other 
opinions.
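
   To make the "log a warning" option concrete, here is a rough, self-contained sketch. It does not use the real Kafka Streams classes: `BytesStoreSupplier`, `VersionedBytesStoreSupplier`, and `SketchBuilder` below are simplified stand-ins defined only for this example, and `configureCaching`, the `warnings` list, and the warning text are all hypothetical. In the actual materializer the warning would presumably go through the class's logger instead of a list.

```java
import java.util.ArrayList;
import java.util.List;

class VersionedCachingSketch {
    // Simplified stand-ins for the supplier types (assumption: not the real Kafka interfaces).
    interface BytesStoreSupplier { String name(); }
    interface VersionedBytesStoreSupplier extends BytesStoreSupplier { }

    // Stand-in for the caching-related part of a store builder (hypothetical).
    static final class SketchBuilder {
        boolean cachingEnabled = false;
        void withCachingEnabled() { cachingEnabled = true; }
        void withCachingDisabled() { cachingEnabled = false; }
    }

    // Collected warnings; a real implementation would call a logger here instead.
    static final List<String> warnings = new ArrayList<>();

    // Applies the requested caching setting, except that versioned stores
    // always end up with caching disabled and a warning is recorded.
    static void configureCaching(final BytesStoreSupplier supplier,
                                 final boolean cachingRequested,
                                 final SketchBuilder builder) {
        if (supplier instanceof VersionedBytesStoreSupplier) {
            if (cachingRequested) {
                warnings.add("Store '" + supplier.name()
                    + "' is versioned; caching is not supported and will be disabled.");
            }
            builder.withCachingDisabled();
        } else if (cachingRequested) {
            builder.withCachingEnabled();
        } else {
            builder.withCachingDisabled();
        }
    }

    public static void main(final String[] args) {
        final VersionedBytesStoreSupplier versioned = () -> "versioned-store";
        final SketchBuilder builder = new SketchBuilder();
        // Caching is "requested" because it is the default on Materialized.
        configureCaching(versioned, true, builder);
        System.out.println("caching enabled: " + builder.cachingEnabled);
        warnings.forEach(System.out::println);
    }
}
```

   With this shape, users who never touch the caching setting still get a working versioned store, and the warning is the only signal that the default was overridden.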



