vcrfxia commented on code in PR #13442:
URL: https://github.com/apache/kafka/pull/13442#discussion_r1151109249


##########
streams/src/main/java/org/apache/kafka/streams/state/Stores.java:
##########
@@ -110,6 +116,73 @@ public static KeyValueBytesStoreSupplier persistentTimestampedKeyValueStore(fina
         return new RocksDbKeyValueBytesStoreSupplier(name, true);
     }
 
+    /**
+     * Create a persistent versioned key-value store {@link VersionedBytesStoreSupplier}.
+     * <p>
+     * This store supplier can be passed into a
+     * {@link #versionedKeyValueStoreBuilder(VersionedBytesStoreSupplier, Serde, Serde)}.
+     *
+     * @param name             name of the store (cannot be {@code null})
+     * @param historyRetention length of time that old record versions are available for query
+     *                         (cannot be negative). If a timestamp bound provided to
+     *                         {@link VersionedKeyValueStore#get(Object, long)} is older than this
+     *                         specified history retention, then the get operation will not return data.
+     *                         This parameter also determines the "grace period" after which
+     *                         out-of-order writes will no longer be accepted.
+     * @return an instance of {@link VersionedBytesStoreSupplier}
+     * @throws IllegalArgumentException if {@code historyRetention} can't be represented as {@code long milliseconds}
+     */
+    public static VersionedBytesStoreSupplier persistentVersionedKeyValueStore(final String name,
+                                                                               final Duration historyRetention) {
+        Objects.requireNonNull(name, "name cannot be null");
+        final String hrMsgPrefix = prepareMillisCheckFailMsgPrefix(historyRetention, "historyRetention");
+        final long historyRetentionMs = validateMillisecondDuration(historyRetention, hrMsgPrefix);
+        if (historyRetentionMs < 0L) {
+            throw new IllegalArgumentException("historyRetention cannot be negative");
+        }
+        return new RocksDbVersionedKeyValueBytesStoreSupplier(name, historyRetentionMs);
+    }
+
+    /**
+     * Create a persistent versioned key-value store {@link VersionedBytesStoreSupplier}.
+     * <p>
+     * This store supplier can be passed into a
+     * {@link #versionedKeyValueStoreBuilder(VersionedBytesStoreSupplier, Serde, Serde)}.
+     *
+     * @param name             name of the store (cannot be {@code null})
+     * @param historyRetention length of time that old record versions are available for query
+     *                         (cannot be negative). If a timestamp bound provided to
+     *                         {@link VersionedKeyValueStore#get(Object, long)} is older than this
+     *                         specified history retention, then the get operation will not return data.
+     *                         This parameter also determines the "grace period" after which
+     *                         out-of-order writes will no longer be accepted.
+     * @param segmentInterval  size of segments for storing old record versions (must be positive). Old record versions
+     *                         for the same key in a single segment are stored (updated and accessed) together.
+     *                         The only impact of this parameter is performance. If segments are large
+     *                         and a workload results in many record versions for the same key being collected
+     *                         in a single segment, performance may degrade as a result. On the other hand,
+     *                         reads and out-of-order writes which access older segments may slow down if

Review Comment:
   Yeah, that's correct. Updated.
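
   For anyone following along, here is a minimal standalone sketch of the
   argument checks the first overload in the hunk above performs on
   `historyRetention`. It uses only the JDK. The class name
   `HistoryRetentionCheck` and the method `toNonNegativeMillis` are
   hypothetical stand-ins for illustration; they are not the helpers the PR
   actually calls (`prepareMillisCheckFailMsgPrefix`,
   `validateMillisecondDuration`), whose exact behavior is not shown in this
   hunk.

```java
import java.time.Duration;
import java.util.Objects;

public class HistoryRetentionCheck {

    // Hypothetical stand-in for the validation shown in the hunk: convert a
    // Duration to milliseconds, rejecting values that overflow a long or are negative.
    public static long toNonNegativeMillis(final Duration duration, final String paramName) {
        Objects.requireNonNull(duration, paramName + " cannot be null");
        final long millis;
        try {
            // Duration#toMillis throws ArithmeticException on numeric overflow.
            millis = duration.toMillis();
        } catch (final ArithmeticException e) {
            throw new IllegalArgumentException(
                paramName + " can't be represented as long milliseconds", e);
        }
        if (millis < 0L) {
            throw new IllegalArgumentException(paramName + " cannot be negative");
        }
        return millis;
    }

    public static void main(final String[] args) {
        // Zero passes, since the check in the hunk only rejects values below 0.
        System.out.println(toNonNegativeMillis(Duration.ZERO, "historyRetention"));
        System.out.println(toNonNegativeMillis(Duration.ofMinutes(5), "historyRetention"));
        try {
            toNonNegativeMillis(Duration.ofMillis(-1), "historyRetention");
        } catch (final IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

   Note that a zero duration passes this check, which matches the `< 0L`
   comparison in the diff.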


