vcrfxia commented on code in PR #13442: URL: https://github.com/apache/kafka/pull/13442#discussion_r1151109249
########## streams/src/main/java/org/apache/kafka/streams/state/Stores.java: ########## @@ -110,6 +116,73 @@ public static KeyValueBytesStoreSupplier persistentTimestampedKeyValueStore(fina return new RocksDbKeyValueBytesStoreSupplier(name, true); } + /** + * Create a persistent versioned key-value store {@link VersionedBytesStoreSupplier}. + * <p> + * This store supplier can be passed into a + * {@link #versionedKeyValueStoreBuilder(VersionedBytesStoreSupplier, Serde, Serde)}. + * + * @param name name of the store (cannot be {@code null}) + * @param historyRetention length of time that old record versions are available for query + * (cannot be negative). If a timestamp bound provided to + * {@link VersionedKeyValueStore#get(Object, long)} is older than this + * specified history retention, then the get operation will not return data. + * This parameter also determines the "grace period" after which + * out-of-order writes will no longer be accepted. + * @return an instance of {@link VersionedBytesStoreSupplier} + * @throws IllegalArgumentException if {@code historyRetention} can't be represented as {@code long milliseconds} + */ + public static VersionedBytesStoreSupplier persistentVersionedKeyValueStore(final String name, + final Duration historyRetention) { + Objects.requireNonNull(name, "name cannot be null"); + final String hrMsgPrefix = prepareMillisCheckFailMsgPrefix(historyRetention, "historyRetention"); + final long historyRetentionMs = validateMillisecondDuration(historyRetention, hrMsgPrefix); + if (historyRetentionMs < 0L) { + throw new IllegalArgumentException("historyRetention cannot be negative"); + } + return new RocksDbVersionedKeyValueBytesStoreSupplier(name, historyRetentionMs); + } + + /** + * Create a persistent versioned key-value store {@link VersionedBytesStoreSupplier}. + * <p> + * This store supplier can be passed into a + * {@link #versionedKeyValueStoreBuilder(VersionedBytesStoreSupplier, Serde, Serde)}. + * + * @param name name of the store (cannot be {@code null}) + * @param historyRetention length of time that old record versions are available for query + * (cannot be negative). If a timestamp bound provided to + * {@link VersionedKeyValueStore#get(Object, long)} is older than this + * specified history retention, then the get operation will not return data. + * This parameter also determines the "grace period" after which + * out-of-order writes will no longer be accepted. + * @param segmentInterval size of segments for storing old record versions (must be positive). Old record versions + * for the same key in a single segment are stored (updated and accessed) together. + * The only impact of this parameter is performance. If segments are large + * and a workload results in many record versions for the same key being collected + * in a single segment, performance may degrade as a result. On the other hand, + * reads and out-of-order writes which access older segments may slow down if Review Comment: Yeah, that's correct. Updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org