cadonna commented on code in PR #13756:
URL: https://github.com/apache/kafka/pull/13756#discussion_r1227894014


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+
+public class RocksDBTimeOrderedKeyValueBuffer<K, V, T> extends 
WrappedStateStore<RocksDBTimeOrderedKeyValueSegmentedBytesStore, Object, 
Object> implements TimeOrderedKeyValueBuffer<K, V, V> {

Review Comment:
   @wcarlson5 Sorry for commenting again on the segmented state store but I 
really want to understand since they are quite new to me. 
   As far as I understand, you extend the 
`AbstractRocksDBTimeOrderedSegmentedBytesStore` which is a state store that 
stores records in timestamp-order into segments and it has an index on the 
record key. It seems to me that you do not use the segments nor the index. I 
understand that `AbstractRocksDBTimeOrderedSegmentedBytesStore` implements 
interface `SegmentedByteStore` which gives you the ability to fetch data by 
time ranges. Furthermore, `AbstractRocksDBTimeOrderedSegmentedBytesStore` 
maintains the observed stream time which is also something that comes in handy 
for your PR.
   
   My proposal is to hide the segmented aspect of the state store as much as 
possible. More precisely: 
   - Rename `RocksDBTimeOrderedKeyValueSegmentedBytesStore` to 
`RocksDBTimeOrderedKeyValueBytesStore`
   - Do not expose `retention` and `segmentInterval` in 
`RocksDBTimeOrderedKeyValueBytesStore`'s constructor and in the corresponding 
supplier.
   - Set both `retention` and `segmentInterval` in the call to the super 
constructor to high values as @vcrfxia suggested.
   
   In such a way, you can use the features you want to use from the time 
ordered segmented state store and at the same hide all the aspects that you do 
not need behind your abstraction. WDYT? 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to