mjsax commented on code in PR #13264:
URL: https://github.com/apache/kafka/pull/13264#discussion_r1122527645


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * A wrapper class for non-windowed key-value stores used within the DSL. All 
such stores are
+ * instances of either {@link TimestampedKeyValueStore} or {@link 
VersionedKeyValueStore}.
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ */
+public class KeyValueStoreWrapper<K, V> implements StateStore {
+
+    private final StateStore store;
+    private final StoreType storeType;
+
+    private enum StoreType {
+        TIMESTAMPED,
+        VERSIONED;
+    }
+
+    public KeyValueStoreWrapper(final ProcessorContext<?, ?> context, final 
String storeName) {
+        store = context.getStateStore(storeName);
+
+        if (store instanceof TimestampedKeyValueStore) {

Review Comment:
   I think this is fine. -- Even if we have the cast as in the original code, I 
don't think it would be safer. The compile check cannot verify the generic 
types anyway, because `getStateStore` does not contain any information about it 
-- and at runtime, the generic types are gone and the actually "cast" if 
necessary from `Object` to `K` (or `V`) would happen somewhere else in the code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to