fqaiser94 commented on code in PR #13533: URL: https://github.com/apache/kafka/pull/13533#discussion_r1174622973
########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java: ########## @@ -50,64 +50,76 @@ public void setIfUnset(final SerdeGetter getter) { @Override public Change<T> deserialize(final String topic, final Headers headers, final byte[] data) { // The format we need to deserialize is: - // {BYTE_ARRAY oldValue}{BYTE newOldFlag=0} - // {BYTE_ARRAY newValue}{BYTE newOldFlag=1} - // {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE newOldFlag=2} - // {BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=3} - // {BYTE_ARRAY newValue}{BYTE isLatest}{BYTE newOldFlag=4} - // {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=5} + // {BYTE_ARRAY oldValue}{BYTE encodingFlag=0} + // {BYTE_ARRAY newValue}{BYTE encodingFlag=1} + // {VARINT newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE newOlvencodingFlagdFlag=2} Review Comment: ```suggestion // {VARINT newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE encodingFlag=2} ``` ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java: ########## @@ -50,64 +50,76 @@ public void setIfUnset(final SerdeGetter getter) { @Override public Change<T> deserialize(final String topic, final Headers headers, final byte[] data) { // The format we need to deserialize is: - // {BYTE_ARRAY oldValue}{BYTE newOldFlag=0} - // {BYTE_ARRAY newValue}{BYTE newOldFlag=1} - // {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE newOldFlag=2} - // {BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=3} - // {BYTE_ARRAY newValue}{BYTE isLatest}{BYTE newOldFlag=4} - // {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE newOldFlag=5} + // {BYTE_ARRAY oldValue}{BYTE encodingFlag=0} + // {BYTE_ARRAY newValue}{BYTE encodingFlag=1} + // {VARINT newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE newOlvencodingFlagdFlag=2} + // 
{BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE encodingFlag=3} + // {BYTE_ARRAY newValue}{BYTE isLatest}{BYTE encodingFlag=4} + // {VARINT newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE vencodingFlag=5} Review Comment: ```suggestion // {VARINT newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE isLatest}{BYTE encodingFlag=5} ``` ########## streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java: ########## @@ -543,6 +543,16 @@ public void shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMul @Test public void shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing() throws Exception { + /* + Gradle Test Run :streams:unitTest > Gradle Test Executor 19 > NamedTopologyIntegrationTest > shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing() FAILED + java.lang.AssertionError: + Expected: <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 2)]> + but: was <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 1)]> + at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) + at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) + at org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing(NamedTopologyIntegrationTest.java:563) + + */ Review Comment: Is there an issue with this test? It passes for me locally on your PR and on trunk. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org