[GitHub] [kafka] rodesai commented on a change in pull request #8254: KIP-557: Add Emit On Change Support
rodesai commented on a change in pull request #8254:
URL: https://github.com/apache/kafka/pull/8254#discussion_r424941145

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java ##

@@ -53,4 +56,48 @@ void initStoreSerde(final ProcessorContext context) {
             keySerde == null ? (Serde) context.keySerde() : keySerde,
             valueSerde == null ? new ValueAndTimestampSerde<>((Serde) context.valueSerde()) : valueSerde);
     }
-}
\ No newline at end of file
+
+    public RawAndDeserializedValue getWithBinary(final K key) {
+        try {
+            return maybeMeasureLatency(() -> {
+                final byte[] serializedValue = wrapped().get(keyBytes(key));
+                return new RawAndDeserializedValue(serializedValue, outerValue(serializedValue));
+            }, time, getSensor);
+        } catch (final ProcessorStateException e) {
+            final String message = String.format(e.getMessage(), key);
+            throw new ProcessorStateException(message, e);
+        }
+    }
+
+    public boolean putIfDifferentValues(final K key,
+                                        final ValueAndTimestamp newValue,
+                                        final byte[] oldSerializedValue) {
+        try {
+            return maybeMeasureLatency(
+                () -> {
+                    final byte[] newSerializedValue = serdes.rawValue(newValue);
+                    if (ValueAndTimestampSerializer.compareValuesAndCheckForIncreasingTimestamp(oldSerializedValue, newSerializedValue)) {
+                        return false;
+                    } else {
+                        wrapped().put(keyBytes(key), newSerializedValue);

Review comment:
@ConcurrencyPractitioner @vvcephei I'm trying to understand this to debug some broken tests in ksql. A couple of questions:

When the timestamp of the incoming value is lower than that of the stored value (regardless of the value itself), why do we want to put the new value into the store? Surely the store should keep the value with the newer timestamp? Otherwise we could wind up with a corrupt store.

Don't we still want to put the value in the store (even if we don't forward it downstream) if the values are the same but the timestamp is newer?
Otherwise, if we get an out-of-order update with a different value but a timestamp between those of the two rows with the same value, we'd incorrectly put that value into the store. For example, the following updates:

- TS: 1, K: X, V: A
- TS: 3, K: X, V: A
- TS: 2, K: X, V: B

would result in the table containing `K: X, V: B`, which is wrong: the record with the latest timestamp (TS: 3) has `V: A`.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
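The out-of-order scenario from the review comment can be replayed against a simplified model. This is a hypothetical sketch, not the Kafka Streams implementation: `EmitOnChangeReplay`, `Versioned`, `putAsReadFromDiff` and `putNewestTimestampWins` are invented names, values are compared deserialized with `equals()` rather than as serialized bytes, and it assumes (from the method name and the reviewer's reading) that `compareValuesAndCheckForIncreasingTimestamp` returns true only when the values are equal and the new timestamp is not lower. The second policy combines both points raised in the comment: an older timestamp never overwrites the store, and an unchanged value still refreshes the stored timestamp without being forwarded.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model: values are kept deserialized and compared with
// equals(), whereas the real store compares serialized bytes.
public class EmitOnChangeReplay {

    // Stand-in for ValueAndTimestamp: a value plus its record timestamp.
    static final class Versioned {
        final String value;
        final long timestamp;

        Versioned(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return "V: " + value + " @ TS " + timestamp;
        }
    }

    // Policy as the review comment reads putIfDifferentValues. ASSUMPTION: the
    // check skips the put only when the values are equal and the new timestamp is
    // not lower; every other update, including an out-of-order one, is written.
    static boolean putAsReadFromDiff(Map<String, Versioned> store, String key, Versioned update) {
        Versioned old = store.get(key);
        boolean skip = old != null
                && old.value.equals(update.value)
                && update.timestamp >= old.timestamp;
        if (skip) {
            return false; // store keeps the old (lower) timestamp
        }
        store.put(key, update);
        return true;
    }

    // Policy the review comment argues for: never replace a newer timestamp with
    // an older one, but refresh the timestamp even when the value is unchanged.
    // Returns true only when the update should be forwarded downstream.
    static boolean putNewestTimestampWins(Map<String, Versioned> store, String key, Versioned update) {
        Versioned old = store.get(key);
        if (old != null && update.timestamp < old.timestamp) {
            return false; // out-of-order: neither stored nor forwarded
        }
        store.put(key, update);
        return old == null || !old.value.equals(update.value);
    }

    // Replays TS 1 / A, TS 3 / A, TS 2 / B for key X and returns the final record.
    static Versioned replay(boolean newestTimestampWins) {
        Map<String, Versioned> store = new HashMap<>();
        List<Versioned> updates = List.of(
                new Versioned("A", 1),
                new Versioned("A", 3),
                new Versioned("B", 2));
        for (Versioned update : updates) {
            if (newestTimestampWins) {
                putNewestTimestampWins(store, "X", update);
            } else {
                putAsReadFromDiff(store, "X", update);
            }
        }
        return store.get("X");
    }

    public static void main(String[] args) {
        System.out.println("as read from the diff: " + replay(false));
        System.out.println("newest timestamp wins: " + replay(true));
    }
}
```

In this model the first policy ends with `V: B @ TS 2` and the second with `V: A @ TS 3`. Refreshing the timestamp alone would not be enough here: with `A @ TS 3` stored, `B @ TS 2` still differs in value and would be written unless older timestamps are also rejected, which is why the sketch applies both checks.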