[ https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax resolved KAFKA-15602.
-------------------------------------
    Fix Version/s: 3.4.2
                   3.5.2
                   3.7.0
                   3.6.1
         Assignee: Matthias J. Sax
       Resolution: Fixed

As discussed, reverted this in all applicable branches.

> Breaking change in 3.4.0 ByteBufferSerializer
> ---------------------------------------------
>
>                 Key: KAFKA-15602
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15602
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
>            Reporter: Luke Kirby
>            Assignee: Matthias J. Sax
>            Priority: Critical
>             Fix For: 3.4.2, 3.5.2, 3.7.0, 3.6.1
>
>
> [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have 
> solved the situation described by KAFKA-4852, namely, to have 
> ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 
> offsets (or, put another way, to honor the buffer's position() as the start 
> point to consume bytes from). Unfortunately, it failed to actually do this, 
> and instead changed the expectations for how an input ByteBuffer's limit and 
> position should be set before being provided to send() on a producer 
> configured with ByteBufferSerializer. Code that worked with pre-3.4.0 
> releases now produces 0-length messages instead of the intended messages, 
> effectively introducing a breaking change for existing users of the 
> serializer in the wild.
> Here are a few different inputs and serialized outputs under pre-3.4.0 and 
> 3.4.0+ to summarize the breaking change:
> ||buffer argument||3.3.2 serialized output||3.4.0+ serialized output||
> |ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 val=test|len=4 val=test|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 val=test|len=0 val=|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 val=test<0><0><0><0>|len=4 val=test|
> |ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8)); buff.limit(buff.position());|len=4 val=test|len=4 val=test|
> |ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t|
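> To make the contract shift concrete, here is a minimal stand-alone model of 
> the two behaviors (Java 9+ for the covariant flip()). This is my own sketch: 
> the 3.3.2 path is paraphrased from the linked source, and the 3.4.0+ path is 
> inferred from the observed outputs above; neither is the actual Kafka class.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative models of the two serializer behaviors; not the real Kafka code.
class ByteBufferSerializerModels {

    // 3.3.2-style behavior: rewind(), then read everything up to the limit.
    static byte[] serialize332(ByteBuffer data) {
        data.rewind();
        if (data.hasArray() && data.arrayOffset() == 0
                && data.array().length == data.remaining()) {
            return data.array(); // special case: buffer spans the whole backing array
        }
        final byte[] out = new byte[data.remaining()];
        data.get(out);
        return out;
    }

    // 3.4.0+-style behavior: flip(), i.e. assume the buffer is still in write
    // mode, then read [0, original position).
    static byte[] serialize340(ByteBuffer data) {
        if (data.hasArray() && data.arrayOffset() == 0
                && data.array().length == data.remaining()) {
            return data.array(); // same plain-wrap special case
        }
        data.flip();
        final byte[] out = new byte[data.remaining()];
        data.get(out);
        return out;
    }

    public static void main(String[] args) {
        final byte[] test = "test".getBytes(StandardCharsets.UTF_8);
        // Second row of the table: a flipped buffer serializes correctly under
        // the old behavior but becomes a 0-length message under the new one.
        ByteBuffer flipped = ByteBuffer.allocate(8).put(test).flip();
        System.out.println(serialize332(flipped.duplicate()).length); // 4
        System.out.println(serialize340(flipped.duplicate()).length); // 0
    }
}
```

> Running each table row through these two methods reproduces the outputs 
> above.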
> Notably, plain-wrappers of byte arrays continue to work under both versions 
> due to the special case in the serializer for them. I suspect that this is 
> the dominant use-case, which is why this has apparently gone un-reported to 
> this point. The wrapped-with-offset case fails under both versions, for 
> different reasons (the expected value would be "est"). As the table shows, 
> you can ensure that a manually assembled ByteBuffer will work under both 
> versions by making position == limit == message-length (with an actual 
> desired start position of 0). Clearly, though, behavior has 
> changed dramatically for the second and third case there, with the 3.3.2 
> behavior, in my experience, aligning better with naive expectations.
> [Previously|https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java],
>  the serializer would just rewind() the buffer and respect the limit as the 
> indicator as to how much data was in the buffer. So, essentially, the 
> prevailing contract was that the data from position 0 (always!) up to the 
> limit on the buffer would be serialized; so it was really just the limit that 
> was honored. So if, per the original issue, you have a byte[] array wrapped 
> with, say, ByteBuffer.wrap(bytes, 3, 5), then that will yield a ByteBuffer 
> with position = 3 indicating the desired start point to read from, which is 
> then effectively ignored by the serializer due to the rewind().
> So while the serializer didn't work when presenting a ByteBuffer view onto a 
> sub-view of a backing array, it did however follow expected behavior when 
> employing standard patterns to populate ByteBuffers backed by 
> larger-than-necessary arrays and using limit() to identify the end of actual 
> data, consistent with conventional usage of flip() to switch from writing to 
> a buffer to setting it up to be read from (e.g., to be passed into a 
> producer.send() call). E.g.,
> {code:java}
> ByteBuffer bb = ByteBuffer.allocate(TOO_MUCH);
> // ... some sequence of bb.put(...) calls, populating the buffer with some
> // number of bytes less than TOO_MUCH ...
> bb.flip(); // logically: "I am done writing, let's set this up for reading";
>            // pragmatically: sets the limit to the current position so that
>            // whoever reads the buffer knows when to stop reading, and sets
>            // the position to zero so it knows where to start reading from
> producer.send(bb); {code}
> Technically, you wouldn't even need to use flip() there, since position is 
> ignored; it would be sufficient to just call {{bb.limit(bb.position())}}. 
> Notably, a buffer constructed using any variant of ByteBuffer.wrap() is 
> essentially immediately in read-mode with position indicating the start and 
> limit the end.
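> The three buffer states just described can be checked directly with the JDK 
> alone (a small demonstration of mine, independent of Kafka):

```java
import java.nio.ByteBuffer;
import static java.nio.charset.StandardCharsets.UTF_8;

class BufferStates {
    public static void main(String[] args) {
        // flip(): position -> 0, limit -> old position (read mode).
        ByteBuffer a = ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip();
        System.out.println(a.position() + "/" + a.limit()); // 0/4

        // limit(position()): leaves position alone, so position == limit == 4;
        // equivalent for a reader that ignores position, as pre-3.4.0 did.
        ByteBuffer b = ByteBuffer.allocate(8).put("test".getBytes(UTF_8));
        b.limit(b.position());
        System.out.println(b.position() + "/" + b.limit()); // 4/4

        // wrap(): immediately in read mode, with position = offset and
        // limit = offset + length.
        ByteBuffer c = ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3);
        System.out.println(c.position() + "/" + c.limit()); // 1/4
    }
}
```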
> With the change introduced in 3.4.0, however, the contract changes 
> dramatically, and the code just presented produces a 0-byte message. As 
> indicated above, it also continues to fail if you just passed in an 
> offset-specified ByteBuffer.wrap()ped message, too, i.e., the case described 
> by KAFKA-4852:
> {code:java}
> @Test
> public void testByteBufferSerializerOnOffsetWrappedBytes() {
>     final byte[] bytes = "Hello".getBytes(UTF_8);
>     try (final ByteBufferSerializer serializer = new ByteBufferSerializer()) {
>         assertArrayEquals("ello".getBytes(UTF_8), 
>                 // FAILS: this will yield "H", not "ello"
>                 serializer.serialize(topic, ByteBuffer.wrap(bytes, 1, 
> bytes.length - 1)));
>     }
> }
>  {code}
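> Why "H"? Judging by the outputs above (this is an inference from observed 
> behavior, not a quote of the implementation), the 3.4.0+ code effectively 
> flip()s the incoming buffer, which clobbers a buffer that is already in read 
> mode:

```java
import java.nio.ByteBuffer;
import static java.nio.charset.StandardCharsets.UTF_8;

class OffsetWrapFlip {
    public static void main(String[] args) {
        ByteBuffer b = ByteBuffer.wrap("Hello".getBytes(UTF_8), 1, 4);
        System.out.println(b.position() + "/" + b.limit()); // 1/5
        // Flipping a buffer that is already in read mode sets the limit to
        // the old position (1) and the position to 0, so only 'H' remains
        // readable instead of "ello".
        b.flip();
        System.out.println(b.position() + "/" + b.limit()); // 0/1
        byte[] out = new byte[b.remaining()];
        b.get(out);
        System.out.println(new String(out, UTF_8)); // H
    }
}
```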
> What happened here?
> The resulting PR, it seems, focused on a flawed proposed test case in the 
> first comment of KAFKA-4852 that failed against pre-3.4.0 Kafka. I reproduce 
> it here with my annotations in comments: 
> {code:java}
> @Test // flawed proposed test case
> public void testByteBufferSerializer() {
>     final byte[] bytes = "Hello".getBytes(UTF_8);
>     final ByteBuffer buffer = ByteBuffer.allocate(7);
>     buffer.put(bytes);
>     // buffer.flip();   <-- would make the test work
>     try (final ByteBufferSerializer serializer = new ByteBufferSerializer()) {
>         assertArrayEquals(bytes, serializer.serialize(topic, buffer));
>     }
> }  {code}
> In pre-3.4.0, this would yield a 7-byte serialization: "Hello" followed by 
> two 0-value bytes. I contend that this was actually expected and correct 
> behavior: the ByteBuffer never had its limit set, so the prevailing contract 
> (the limit marks the end of the data) was never actually followed. If the 
> calling code had applied buffer.flip() after the .put(bytes) call, as it 
> _should_ have, the test would have succeeded. In short, by trying to make 
> this test case pass, I think this PR represented nothing more than a 
> misunderstanding of how one should prepare ByteBuffers for reading, but has 
> managed to result in a breaking change. The breaking nature of this was 
> actually briefly noted in PR comments but kind of shrugged off with some test 
> changes and explanatory comments on the class.
> Obviously a correction to restore 3.3.2 behavior would represent another 
> breaking change for users that are running on 3.4+, unless they were also 
> somewhat surprisingly configuring buffers with position() == limit() before 
> passing them to send(). Arguably, it would also be a breaking change (though 
> possibly not one of great consequence) if either version was changed to 
> correctly handle the wrapped-with-offset case as represented in the original 
> ticket.
> I do not have much experience contending with a situation like this, but at 
> the risk of jumping to a solution here, I wonder if the only way to really 
> move forward safely and unambiguously here is to remove ByteBufferSerializer 
> as it stands and replace it with a differently named substitute that handles 
> both the plain-wrapped special case and just serializes content from 
> position() to limit(), forcing an evaluation by users when upgrading as to 
> whether the provided byte buffer is correctly configured or not. Of course, a 
> change like that would have to be released at an appropriate version level, too, 
> so I don't know exactly what the desired interim behavior would be 
> (deprecation?). I believe I would be eager to contribute to a fix, but 
> obviously I would need guidance from maintainers regarding the correct path 
> forward semantically.
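A position()-to-limit() serializer of the kind floated above might look like the following sketch (the class name and details are entirely my own illustration, not a proposed Kafka API; note the plain-wrap case then falls out naturally rather than needing a special case):

```java
import java.nio.ByteBuffer;
import static java.nio.charset.StandardCharsets.UTF_8;

// Hypothetical replacement sketch: serialize exactly [position, limit).
class PositionToLimitSerializer {
    static byte[] serialize(ByteBuffer data) {
        if (data == null) return null;
        final byte[] out = new byte[data.remaining()];
        data.duplicate().get(out); // duplicate() so the caller's buffer is untouched
        return out;
    }

    public static void main(String[] args) {
        // The KAFKA-4852 case now behaves as originally requested:
        byte[] bytes = "Hello".getBytes(UTF_8);
        System.out.println(
                new String(serialize(ByteBuffer.wrap(bytes, 1, 4)), UTF_8)); // ello
    }
}
```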



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
