cadonna commented on a change in pull request #8254:
URL: https://github.com/apache/kafka/pull/8254#discussion_r424971223



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
##########
@@ -53,4 +56,48 @@ void initStoreSerde(final ProcessorContext context) {
             keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
             valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.valueSerde()) : valueSerde);
     }
-}
\ No newline at end of file
+
+    public RawAndDeserializedValue<V> getWithBinary(final K key) {
+        try {
+            return maybeMeasureLatency(() -> {
+                final byte[] serializedValue = wrapped().get(keyBytes(key));
+                return new RawAndDeserializedValue<V>(serializedValue, outerValue(serializedValue));
+            }, time, getSensor);
+        } catch (final ProcessorStateException e) {
+            final String message = String.format(e.getMessage(), key);
+            throw new ProcessorStateException(message, e);
+        }
+    }
+
+    public boolean putIfDifferentValues(final K key,
+                                        final ValueAndTimestamp<V> newValue,
+                                        final byte[] oldSerializedValue) {
+        try {
+            return maybeMeasureLatency(
+                () -> {
+                    final byte[] newSerializedValue = serdes.rawValue(newValue);
+                    if (ValueAndTimestampSerializer.compareValuesAndCheckForIncreasingTimestamp(oldSerializedValue, newSerializedValue)) {
+                        return false;
+                    } else {
+                        wrapped().put(keyBytes(key), newSerializedValue);

Review comment:
       > Why when the timestamp of the newer value is lower, do we want to put the new value into the store? Surely the store should have the value with the newer timestamp? Otherwise we could wind up with a corrupt store.
   
   This behavior existed before this PR as well. If an out-of-order record is encountered, a log message was written, but the record was nevertheless put into the state store (cf. https://github.com/apache/kafka/blob/7624e6247984223901aa34d7b7c2789c3e1d0c3f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java#L122). The only thing that changed is that if the serialized value of the new record is equal to the serialized value of the old record and the timestamp of the new record is equal or newer, we drop the record because it is an idempotent update.
   Could you elaborate on why the store would get corrupted because of this?
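
   To make the drop condition concrete, here is a minimal standalone sketch of the check described above. It is not the actual `ValueAndTimestampSerializer.compareValuesAndCheckForIncreasingTimestamp` implementation; the class name `IdempotentUpdateCheck` and the assumed wire layout (an 8-byte big-endian timestamp followed by the raw value bytes) are illustrative assumptions for this sketch.

   ```java
   import java.nio.ByteBuffer;
   import java.util.Arrays;

   // Illustrative sketch only: assumes serialized form is
   // [8-byte big-endian timestamp][value bytes].
   public class IdempotentUpdateCheck {

       // Returns true when the new record may be dropped: identical value
       // bytes and a timestamp that is equal to or newer than the stored one.
       public static boolean isIdempotentUpdate(final byte[] oldSerialized,
                                                final byte[] newSerialized) {
           if (oldSerialized == null || newSerialized == null) {
               return false; // nothing stored yet, or a tombstone: always forward
           }
           final long oldTs = ByteBuffer.wrap(oldSerialized).getLong();
           final long newTs = ByteBuffer.wrap(newSerialized).getLong();
           final byte[] oldValue = Arrays.copyOfRange(oldSerialized, 8, oldSerialized.length);
           final byte[] newValue = Arrays.copyOfRange(newSerialized, 8, newSerialized.length);
           return Arrays.equals(oldValue, newValue) && newTs >= oldTs;
       }

       // Helper to build the assumed serialized form for the examples below.
       public static byte[] serialize(final long timestamp, final byte[] value) {
           return ByteBuffer.allocate(8 + value.length).putLong(timestamp).put(value).array();
       }

       public static void main(final String[] args) {
           final byte[] v = {1, 2, 3};
           final byte[] w = {9, 9, 9};
           // same value, newer timestamp: idempotent update, drop
           System.out.println(isIdempotentUpdate(serialize(10L, v), serialize(20L, v)));
           // same value, older timestamp: out-of-order, still put into the store
           System.out.println(isIdempotentUpdate(serialize(20L, v), serialize(10L, v)));
           // different value: always put into the store
           System.out.println(isIdempotentUpdate(serialize(10L, v), serialize(20L, w)));
       }
   }
   ```

   Note that an out-of-order record with a *different* value is not dropped by this check, which matches the pre-existing behavior linked above.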




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

