cadonna commented on a change in pull request #8254:
URL: https://github.com/apache/kafka/pull/8254#discussion_r424984599



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
##########
@@ -53,4 +56,48 @@ void initStoreSerde(final ProcessorContext context) {
             keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
             valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) 
context.valueSerde()) : valueSerde);
     }
-}
\ No newline at end of file
+
+    public RawAndDeserializedValue<V> getWithBinary(final K key) {
+        try {
+            return maybeMeasureLatency(() -> { 
+                final byte[] serializedValue = wrapped().get(keyBytes(key));
+                return new RawAndDeserializedValue<V>(serializedValue, 
outerValue(serializedValue));
+            }, time, getSensor);
+        } catch (final ProcessorStateException e) {
+            final String message = String.format(e.getMessage(), key);
+            throw new ProcessorStateException(message, e);
+        }
+    }
+
+    public boolean putIfDifferentValues(final K key,
+                                        final ValueAndTimestamp<V> newValue,
+                                        final byte[] oldSerializedValue) {
+        try {
+            return maybeMeasureLatency(
+                () -> {
+                    final byte[] newSerializedValue = 
serdes.rawValue(newValue);
+                    if 
(ValueAndTimestampSerializer.compareValuesAndCheckForIncreasingTimestamp(oldSerializedValue,
 newSerializedValue)) {
+                        return false;
+                    } else {
+                        wrapped().put(keyBytes(key), newSerializedValue);

Review comment:
       > Don't we still want to put the value in the store (even if we don't 
forward it on to the next context) if the values are the same but the timestamp 
is newer?
   
   If we just put the value in the store but did not forward it, then the store 
would actually be corrupted, because the local state would not be consistent 
with downstream anymore.
   
   Not putting a record with the same value but a newer timestamp in the store 
and not forwarding it was the main point of this KIP.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to