[GitHub] [kafka] rodesai commented on a change in pull request #8254: KIP-557: Add Emit On Change Support

2020-05-14 GitBox


rodesai commented on a change in pull request #8254:
URL: https://github.com/apache/kafka/pull/8254#discussion_r424941145



##
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
##
@@ -53,4 +56,48 @@ void initStoreSerde(final ProcessorContext context) {
 keySerde == null ? (Serde) context.keySerde() : keySerde,
 valueSerde == null ? new ValueAndTimestampSerde<>((Serde) context.valueSerde()) : valueSerde);
 }
-}
\ No newline at end of file
+
+public RawAndDeserializedValue getWithBinary(final K key) {
+try {
+return maybeMeasureLatency(() -> { 
+final byte[] serializedValue = wrapped().get(keyBytes(key));
+return new RawAndDeserializedValue(serializedValue, outerValue(serializedValue));
+}, time, getSensor);
+} catch (final ProcessorStateException e) {
+final String message = String.format(e.getMessage(), key);
+throw new ProcessorStateException(message, e);
+}
+}
+
+public boolean putIfDifferentValues(final K key,
+final ValueAndTimestamp newValue,
+final byte[] oldSerializedValue) {
+try {
+return maybeMeasureLatency(
+() -> {
+final byte[] newSerializedValue = serdes.rawValue(newValue);
+if (ValueAndTimestampSerializer.compareValuesAndCheckForIncreasingTimestamp(oldSerializedValue, newSerializedValue)) {
+return false;
+} else {
+wrapped().put(keyBytes(key), newSerializedValue);

Review comment:
   @ConcurrencyPractitioner @vvcephei I'm trying to understand this to debug some broken tests in ksql. A couple of questions:
   
   When the incoming value's timestamp is lower than the stored one (regardless of the value), why do we put the incoming value into the store? Surely the store should keep the value with the newer timestamp; otherwise we could wind up with a corrupt store.
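To make the first question concrete, here is a minimal model of the write decision as the comment reads it. It is not the Kafka Streams code: `Rec`, `reviewedRuleWrites`, `newestWinsWrites`, and `afterWrite` are hypothetical names, the rule "skip only when the values match and the incoming timestamp is strictly newer" is an assumed reading of `compareValuesAndCheckForIncreasingTimestamp`, and records need Java 16+.

```java
import java.util.Objects;

public class OutOfOrderWriteSketch {
    // Hypothetical stand-in for ValueAndTimestamp: a value plus its record timestamp.
    record Rec(String value, long timestamp) {}

    // Assumed reading of the reviewed write rule (not the actual Kafka Streams code):
    // skip the write only when the values match and the incoming timestamp is strictly
    // newer; in every other case, including an older timestamp, overwrite.
    static boolean reviewedRuleWrites(Rec stored, Rec incoming) {
        if (stored == null) {
            return true;
        }
        boolean sameValue = Objects.equals(stored.value(), incoming.value());
        return !(sameValue && incoming.timestamp() > stored.timestamp());
    }

    // The alternative the question implies: an older timestamp never replaces a newer one.
    static boolean newestWinsWrites(Rec stored, Rec incoming) {
        return stored == null || incoming.timestamp() >= stored.timestamp();
    }

    // Returns the record the store holds after the write decision.
    static Rec afterWrite(Rec stored, Rec incoming, boolean write) {
        return write ? incoming : stored;
    }

    public static void main(String[] args) {
        Rec stored = new Rec("A", 3);
        Rec late = new Rec("B", 2); // arrives after TS 3 but carries an older timestamp

        System.out.println(afterWrite(stored, late, reviewedRuleWrites(stored, late))); // Rec[value=B, timestamp=2]
        System.out.println(afterWrite(stored, late, newestWinsWrites(stored, late)));   // Rec[value=A, timestamp=3]
    }
}
```

Under the assumed rule, the late record with timestamp 2 replaces the stored record with timestamp 3, which is the overwrite the question is asking about.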
   
   Don't we still want to put the value in the store (even if we don't forward it downstream) when the values are the same but the timestamp is newer? Otherwise, if we then get an out-of-order update with a different value and a timestamp between those of the two same-valued rows, we'd incorrectly put that value into the store. For example, the following updates:
   
   TS: 1, K: X, V: A
   TS: 3, K: X, V: A
   TS: 2, K: X, V: B
   
   would result in the table containing `K: X, V: B`, which is wrong.
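The trace only goes wrong once older timestamps are rejected: if the TS: 3 write is skipped, the stored timestamp is still 1, so the late TS: 2 record passes the check. This sketch replays the three updates against two hypothetical write policies. Neither is the actual Kafka Streams code; `Rec`, `skipEqualValues`, `storeNewerTimestamps`, and `replay` are illustrative names, records need Java 16+, and treating an equal timestamp as "not older" in the second policy is an assumption.

```java
import java.util.List;
import java.util.Objects;
import java.util.function.BinaryOperator;

public class EmitOnChangeReplaySketch {
    // Hypothetical stand-in for ValueAndTimestamp: a value plus its record timestamp.
    record Rec(String value, long timestamp) {}

    // Policy 1: reject older timestamps, but also skip the write when the value is
    // unchanged, so a same-valued record with a newer timestamp is never stored.
    static Rec skipEqualValues(Rec stored, Rec incoming) {
        if (stored == null) {
            return incoming;
        }
        if (incoming.timestamp() < stored.timestamp()) {
            return stored; // out-of-order record: keep the stored one
        }
        if (Objects.equals(stored.value(), incoming.value())) {
            return stored; // unchanged value: write skipped, stored timestamp stays old
        }
        return incoming;
    }

    // Policy 2 (what the comment argues for): store every record whose timestamp is not
    // older; whether to forward downstream would be decided separately.
    static Rec storeNewerTimestamps(Rec stored, Rec incoming) {
        if (stored == null || incoming.timestamp() >= stored.timestamp()) {
            return incoming;
        }
        return stored;
    }

    // Applies the updates in arrival order and returns what the store holds at the end.
    static Rec replay(List<Rec> updates, BinaryOperator<Rec> policy) {
        Rec stored = null;
        for (Rec update : updates) {
            stored = policy.apply(stored, update);
        }
        return stored;
    }

    public static void main(String[] args) {
        // The three updates for key X from the comment, in arrival order.
        List<Rec> updates = List.of(new Rec("A", 1), new Rec("A", 3), new Rec("B", 2));

        System.out.println(replay(updates, EmitOnChangeReplaySketch::skipEqualValues));      // Rec[value=B, timestamp=2]
        System.out.println(replay(updates, EmitOnChangeReplaySketch::storeNewerTimestamps)); // Rec[value=A, timestamp=3]
    }
}
```

Under the first policy the store ends at `B` with timestamp 2, the wrong result described in the comment; under the second, the TS: 3 write lands, so the late `B` is rejected and the store keeps `A`.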





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



