This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new a7e771c KAFKA-8452: Compressed BufferValue review follow-up (#6940) a7e771c is described below commit a7e771c6da72bb7f3c5c5cbab3dc9c4fd403f866 Author: John Roesler <vvcep...@users.noreply.github.com> AuthorDate: Wed Jun 19 11:45:35 2019 -0500 KAFKA-8452: Compressed BufferValue review follow-up (#6940) Belatedly address a few code review comments from #6848 Reviewers: Bill Bejeck <bbej...@gmail.com> --- .../kafka/streams/kstream/internals/Change.java | 2 +- .../streams/kstream/internals/FullChangeSerde.java | 4 ++-- .../kafka/streams/state/internals/BufferValue.java | 5 ++++- .../internals/InMemoryTimeOrderedKeyValueBuffer.java | 4 ++-- .../kstream/internals/FullChangeSerdeTest.java | 20 ++++++++++---------- .../streams/state/internals/BufferValueTest.java | 4 ++++ .../internals/TimeOrderedKeyValueBufferTest.java | 14 +++++++------- 7 files changed, 30 insertions(+), 23 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java index f28a16d..c9a18de 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java @@ -30,7 +30,7 @@ public class Change<T> { @Override public String toString() { - return "(" + String.valueOf(newValue) + "<-" + String.valueOf(oldValue) + ")"; + return "(" + newValue + "<-" + oldValue + ")"; } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java index f28f9e7..5d7c7e3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java @@ -72,7 +72,7 @@ public final class FullChangeSerde<T> { * We used to serialize a Change into a single byte[]. Now, we don't anymore, but we still keep this logic here * so that we can produce the legacy format to test that we can still deserialize it. */ - public static byte[] composeLegacyFormat(final Change<byte[]> serialChange) { + public static byte[] mergeChangeArraysIntoSingleLegacyFormattedArray(final Change<byte[]> serialChange) { if (serialChange == null) { return null; } @@ -99,7 +99,7 @@ public final class FullChangeSerde<T> { * We used to serialize a Change into a single byte[]. Now, we don't anymore, but we still * need to be able to read it (so that we can load the state store from previously-written changelog records). */ - public static Change<byte[]> decomposeLegacyFormat(final byte[] data) { + public static Change<byte[]> decomposeLegacyFormattedArrayIntoChangeArrays(final byte[] data) { if (data == null) { return null; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java index f1990c7..b52ec24 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java @@ -30,7 +30,10 @@ public final class BufferValue { private final byte[] newValue; private final ProcessorRecordContext recordContext; - BufferValue(final byte[] priorValue, final byte[] oldValue, final byte[] newValue, final ProcessorRecordContext recordContext) { + BufferValue(final byte[] priorValue, + final byte[] oldValue, + final byte[] newValue, + final ProcessorRecordContext recordContext) { this.oldValue = oldValue; this.newValue = newValue; this.recordContext = recordContext; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java index 6c6ef36..cecfa4d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java @@ -296,7 +296,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere final byte[] changelogValue = new byte[record.value().length - 8]; timeAndValue.get(changelogValue); - final Change<byte[]> change = requireNonNull(FullChangeSerde.decomposeLegacyFormat(changelogValue)); + final Change<byte[]> change = requireNonNull(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(changelogValue)); final ProcessorRecordContext recordContext = new ProcessorRecordContext( record.timestamp(), @@ -326,7 +326,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere timeAndValue.get(changelogValue); final ContextualRecord contextualRecord = ContextualRecord.deserialize(ByteBuffer.wrap(changelogValue)); - final Change<byte[]> change = requireNonNull(FullChangeSerde.decomposeLegacyFormat(contextualRecord.value())); + final Change<byte[]> change = requireNonNull(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(contextualRecord.value())); cleanPut( time, diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java index 97e6c06..ac6762f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java @@ -29,8 +29,8 @@ public class FullChangeSerdeTest { @Test public void shouldRoundTripNull() { assertThat(serde.serializeParts(null, null), nullValue()); - assertThat(FullChangeSerde.composeLegacyFormat(null), nullValue()); - assertThat(FullChangeSerde.decomposeLegacyFormat(null), nullValue()); + assertThat(FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(null), nullValue()); + assertThat(FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(null), nullValue()); assertThat(serde.deserializeParts(null, null), nullValue()); } @@ -47,9 +47,9 @@ public class FullChangeSerdeTest { is(new Change<String>(null, null)) ); - final byte[] legacyFormat = FullChangeSerde.composeLegacyFormat(new Change<>(null, null)); + final byte[] legacyFormat = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(new Change<>(null, null)); assertThat( - FullChangeSerde.decomposeLegacyFormat(legacyFormat), + FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(legacyFormat), is(new Change<byte[]>(null, null)) ); } @@ -57,8 +57,8 @@ public class FullChangeSerdeTest { @Test public void shouldRoundTripOldNull() { final Change<byte[]> serialized = serde.serializeParts(null, new Change<>("new", null)); - final byte[] legacyFormat = FullChangeSerde.composeLegacyFormat(serialized); - final Change<byte[]> decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormat(legacyFormat); + final byte[] legacyFormat = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serialized); + final Change<byte[]> decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(legacyFormat); assertThat( serde.deserializeParts(null, decomposedLegacyFormat), is(new Change<>("new", null)) @@ -68,8 +68,8 @@ public class FullChangeSerdeTest { @Test public void shouldRoundTripNewNull() { final Change<byte[]> serialized = serde.serializeParts(null, new Change<>(null, "old")); - final byte[] legacyFormat = FullChangeSerde.composeLegacyFormat(serialized); - final Change<byte[]> decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormat(legacyFormat); + final byte[] legacyFormat = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serialized); + final Change<byte[]> decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(legacyFormat); assertThat( serde.deserializeParts(null, decomposedLegacyFormat), is(new Change<>(null, "old")) @@ -79,8 +79,8 @@ public class FullChangeSerdeTest { @Test public void shouldRoundTripChange() { final Change<byte[]> serialized = serde.serializeParts(null, new Change<>("new", "old")); - final byte[] legacyFormat = FullChangeSerde.composeLegacyFormat(serialized); - final Change<byte[]> decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormat(legacyFormat); + final byte[] legacyFormat = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serialized); + final Change<byte[]> decomposedLegacyFormat = FullChangeSerde.decomposeLegacyFormattedArrayIntoChangeArrays(legacyFormat); assertThat( serde.deserializeParts(null, decomposedLegacyFormat), is(new Change<>("new", "old")) diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/BufferValueTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/BufferValueTest.java index d663461..ad9b5f8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/BufferValueTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/BufferValueTest.java @@ -25,6 +25,7 @@ import java.util.Arrays; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; @@ -55,6 +56,7 @@ public class BufferValueTest { final BufferValue bufferValue = new BufferValue(priorValue, oldValue, null, null); assertSame(priorValue, bufferValue.priorValue()); assertSame(oldValue, bufferValue.oldValue()); + assertNotEquals(bufferValue.priorValue(), bufferValue.oldValue()); } @Test @@ -64,6 +66,7 @@ public class BufferValueTest { final BufferValue bufferValue = new BufferValue(priorValue, oldValue, null, null); assertNull(bufferValue.priorValue()); assertSame(oldValue, bufferValue.oldValue()); + assertNotEquals(bufferValue.priorValue(), bufferValue.oldValue()); } @Test @@ -73,6 +76,7 @@ public class BufferValueTest { final BufferValue bufferValue = new BufferValue(priorValue, oldValue, null, null); assertSame(priorValue, bufferValue.priorValue()); assertNull(bufferValue.oldValue()); + assertNotEquals(bufferValue.priorValue(), bufferValue.oldValue()); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java index 5c9cbf9..57816c7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java @@ -362,10 +362,10 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S final FullChangeSerde<String> serializer = FullChangeSerde.wrap(Serdes.String()); - final byte[] todeleteValue = FullChangeSerde.composeLegacyFormat(serializer.serializeParts(null, new Change<>("doomed", null))); - final byte[] asdfValue = FullChangeSerde.composeLegacyFormat(serializer.serializeParts(null, new Change<>("qwer", null))); - final byte[] zxcvValue1 = FullChangeSerde.composeLegacyFormat(serializer.serializeParts(null, new Change<>("eo4im", "previous"))); - final byte[] zxcvValue2 = FullChangeSerde.composeLegacyFormat(serializer.serializeParts(null, new Change<>("next", "eo4im"))); + final byte[] todeleteValue = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serializer.serializeParts(null, new Change<>("doomed", null))); + final byte[] asdfValue = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serializer.serializeParts(null, new Change<>("qwer", null))); + final byte[] zxcvValue1 = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serializer.serializeParts(null, new Change<>("eo4im", "previous"))); + final byte[] zxcvValue2 = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serializer.serializeParts(null, new Change<>("next", "eo4im"))); stateRestoreCallback.restoreBatch(asList( new ConsumerRecord<>("changelog-topic", 0, @@ -478,11 +478,11 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S final byte[] asdfValue = getContextualRecord("qwer", 1).serialize(0).array(); final FullChangeSerde<String> fullChangeSerde = FullChangeSerde.wrap(Serdes.String()); final byte[] zxcvValue1 = new ContextualRecord( - FullChangeSerde.composeLegacyFormat(fullChangeSerde.serializeParts(null, new Change<>("3o4im", "previous"))), + FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(fullChangeSerde.serializeParts(null, new Change<>("3o4im", "previous"))), getContext(2L) ).serialize(0).array(); final byte[] zxcvValue2 = new ContextualRecord( - FullChangeSerde.composeLegacyFormat(fullChangeSerde.serializeParts(null, new Change<>("next", "3o4im"))), + FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(fullChangeSerde.serializeParts(null, new Change<>("next", "3o4im"))), getContext(3L) ).serialize(0).array(); stateRestoreCallback.restoreBatch(asList( @@ -773,7 +773,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S private static ContextualRecord getContextualRecord(final String value, final long timestamp) { final FullChangeSerde<String> fullChangeSerde = FullChangeSerde.wrap(Serdes.String()); return new ContextualRecord( - FullChangeSerde.composeLegacyFormat(fullChangeSerde.serializeParts(null, new Change<>(value, null))), + FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(fullChangeSerde.serializeParts(null, new Change<>(value, null))), getContext(timestamp) ); }