[ https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17778511#comment-17778511 ]

Matthias J. Sax commented on KAFKA-15602:
-----------------------------------------

Hey, [~pnee] pointed me to this ticket. I am not a ByteBuffer expert, but it 
seems to me that a reasonable API contract would be that the user is 
responsible for preparing the buffer for reading, and the returned byte[] 
array contains the buffer's content from position to limit. Any other behavior 
seems odd to me. Thoughts? (Not sure whether the position should be modified 
by the serializer or not, though? I tend to think yes?)
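
For illustration, a minimal sketch of what that contract could look like 
(assuming the position-advancing variant; this is a proposal sketch, not the 
actual implementation):
{code:java}
public byte[] serialize(String topic, ByteBuffer data) {
    if (data == null)
        return null;
    byte[] out = new byte[data.remaining()]; // remaining() == limit() - position()
    data.get(out); // copies [position, limit) and advances the position
    return out;
}
{code}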

Overall, it seems like a simple case to fix forward? Given that the standard 
use case (position = 0 and limit = max) works in all releases (example (1) in 
the ticket description), I am not really worried about introducing a breaking 
change. If the contract proposed above makes sense, it's rather a bug fix and 
we should just push it out? (In the end, KAFKA-4852 was done without a KIP, 
too, which was kinda borderline to begin with...)

For example (2), pre-3.4 seems to be correct, while 3.4+ is broken.

For example (3), pre-3.4 seems to be correct, while 3.4+ seems to be 
incorrect: the user did not limit the buffer to 4 bytes, so all 8 bytes should 
be returned IMHO?

For example (4), pre-3.4 seems to be incorrect, and 3.4+ seems to be 
incorrect, too? In the end, the position was not set back to zero, hence a 
zero-length array should be returned?

And the last example (5), as already discussed, is broken in all versions, 
but should result in "est".
{quote}I can see the problem comes from the serializer doesn't know if the 
position is an offset or just the next byte to be written.  These are two 
different definitions of the position so it doesn't really make sense to handle 
both cases in a single API call.
{quote}
Not sure if I can follow here? What do you mean by "is an offset or just the 
next byte to be written"? Why would position have two different definitions?

\cc [~showuon] as RM for the 3.5.2 release; might be worth getting this fix 
in, too?

> Breaking change in 3.4.0 ByteBufferSerializer
> ---------------------------------------------
>
>                 Key: KAFKA-15602
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15602
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
>            Reporter: Luke Kirby
>            Priority: Critical
>
> [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have 
> solved the situation described by KAFKA-4852, namely, to have 
> ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 
> offsets (or, put another way, to honor the buffer's position() as the start 
> point to consume bytes from). Unfortunately, it failed to actually do this, 
> and instead changed the expectations for how an input ByteBuffer's limit and 
> position should be set before being provided to send() on a producer 
> configured with ByteBufferSerializer. Code that worked with pre-3.4.0 
> releases now produces 0-length messages instead of the intended messages, 
> effectively introducing a breaking change for existing users of the 
> serializer in the wild.
> Here are a few different inputs and serialized outputs under pre-3.4.0 and 
> 3.4.0+ to summarize the breaking change:
> ||buffer argument||3.3.2 serialized output||3.4.0+ serialized output||
> |ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 val=test|len=4 val=test|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 val=test|len=0 val=|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 val=test<0><0><0><0>|len=4 val=test|
> |ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8)); buff.limit(buff.position());|len=4 val=test|len=4 val=test|
> |ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t|
> Notably, plain wrappers of byte arrays continue to work under both versions 
> due to the special case in the serializer for them. I suspect that this is 
> the dominant use case, which is why this has apparently gone unreported until 
> now. The wrapped-with-offset case fails under both versions, for different 
> reasons (the expected value would be "est"). As demonstrated here, you can 
> ensure that a manually assembled ByteBuffer will work under both versions by 
> ensuring that your buffers have position == limit == message-length (and an 
> actual desired start position of 0). Clearly, though, behavior has changed 
> dramatically for the second and third cases there, with the 3.3.2 behavior, 
> in my experience, aligning better with naive expectations.
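> The cross-version-safe preparation just described looks like this 
> (illustrative snippet; the version-specific reasoning in the comments is my 
> reading of the behaviors shown in the table above):
> {code:java}
> ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8));
> buff.limit(buff.position()); // position == limit == 4
> // pre-3.4.0: the serializer rewinds and reads [0, limit) -> 4 bytes, "test"
> // 3.4.0+:    the serializer effectively flip()s the buffer, giving
> //            position = 0 and limit = 4 -> the same 4 bytes, "test"
> {code}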
> [Previously|https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java],
>  the serializer would just rewind() the buffer and respect the limit as the 
> indicator as to how much data was in the buffer. So, essentially, the 
> prevailing contract was that the data from position 0 (always!) up to the 
> limit on the buffer would be serialized; so it was really just the limit that 
> was honored. So if, per the original issue, you have a byte[] array wrapped 
> with, say, ByteBuffer.wrap(bytes, 3, 5), then that will yield a ByteBuffer 
> with position = 3 indicating the desired start point to read from, which is 
> effectively ignored by the serializer due to the rewind().
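> Roughly, the pre-3.4.0 behavior amounts to the following (simplified sketch 
> based on the description above; the real implementation also special-cased 
> plain array wraps):
> {code:java}
> public byte[] serialize(String topic, ByteBuffer data) {
>     if (data == null)
>         return null;
>     data.rewind();                           // position forced to 0, whatever the input
>     byte[] out = new byte[data.remaining()]; // after rewind, remaining() == limit()
>     data.get(out);                           // copies bytes [0, limit)
>     return out;
> }
> {code}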
> So while the serializer didn't work when presented with a ByteBuffer view 
> onto a sub-range of a backing array, it did follow expected behavior when 
> employing standard patterns to populate ByteBuffers backed by 
> larger-than-necessary arrays and using limit() to identify the end of actual 
> data, consistent with conventional usage of flip() to switch from writing to 
> a buffer to setting it up to be read from (e.g., to be passed into a 
> producer.send() call). E.g.,
> {code:java}
> ByteBuffer bb = ByteBuffer.allocate(TOO_MUCH);
> // ... some sequence of bb.put(...) calls, writing fewer than TOO_MUCH bytes ...
> bb.flip(); /* logically, this says "I am done writing, let's set this up for
> reading"; pragmatically, it sets the limit to the current position so that
> whoever reads the buffer knows when to stop reading, and sets the position to
> zero so it knows where to start reading from */
> producer.send(bb); {code}
> Technically, you wouldn't even need to use flip() there, since the position 
> is ignored; it would be sufficient to just call {{bb.limit(bb.position())}}. 
> Notably, a buffer constructed using any variant of ByteBuffer.wrap() is 
> essentially immediately in read mode, with position indicating the start and 
> limit the end.
> With the change introduced in 3.4.0, however, the contract changes 
> dramatically, and the code just presented produces a 0-byte message. As 
> indicated above, it also continues to fail if you pass in an offset-specified 
> ByteBuffer.wrap()ped message, i.e., the case described by KAFKA-4852:
> {code:java}
> @Test
> public void testByteBufferSerializerOnOffsetWrappedBytes() {
>     final byte[] bytes = "Hello".getBytes(UTF_8);
>     try (final ByteBufferSerializer serializer = new ByteBufferSerializer()) {
>         assertArrayEquals("ello".getBytes(UTF_8),
>                 // FAILS: this will yield "H", not "ello"
>                 serializer.serialize(topic, ByteBuffer.wrap(bytes, 1, bytes.length - 1)));
>     }
> }
> {code}
> What happened here?
> The resulting PR, it seems, focused on a flawed proposed test case in the 
> first comment of KAFKA-4852 that failed against pre-3.4.0 Kafka. I reproduce 
> it here with commented annotations from me: 
> {code:java}
> @Test // flawed proposed test case
> public void testByteBufferSerializer() {
>     final byte[] bytes = "Hello".getBytes(UTF_8);
>     final ByteBuffer buffer = ByteBuffer.allocate(7);
>     buffer.put(bytes);
>     // buffer.flip();   <-- would make the test work
>     try (final ByteBufferSerializer serializer = new ByteBufferSerializer()) {
>         assertArrayEquals(bytes, serializer.serialize(topic, buffer));
>     }
> }  {code}
> In pre-3.4.0, this would yield a 7-byte serialization: "Hello" followed by 
> two 0-value bytes. I contend that this was actually expected and correct 
> behavior, as the ByteBuffer never had its limit set to mark the end of the 
> data, so the implicit and mildly expected contract was never actually abided 
> by. If there had been a buffer.flip() after the .put(bytes) call, as the 
> calling code _should_ have applied, then the test would have succeeded. In 
> short, by trying to make this test case succeed, I think this PR represented 
> nothing more than a misunderstanding of how one should prepare ByteBuffers 
> for reading, but it has managed to result in a breaking change. The breaking 
> nature of this was actually briefly noted in PR comments but kind of shrugged 
> off with some test changes and explanatory comments on the class.
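> For reference, here is the flipped variant that would have passed against 
> pre-3.4.0 (note that, per example (2) in the table above, the same flipped 
> buffer yields a 0-length message on 3.4.0+):
> {code:java}
> @Test // corrected version of the proposed test case
> public void testByteBufferSerializer() {
>     final byte[] bytes = "Hello".getBytes(UTF_8);
>     final ByteBuffer buffer = ByteBuffer.allocate(7);
>     buffer.put(bytes);
>     buffer.flip(); // limit = 5, position = 0: done writing, ready for reading
>     try (final ByteBufferSerializer serializer = new ByteBufferSerializer()) {
>         assertArrayEquals(bytes, serializer.serialize(topic, buffer));
>     }
> }
> {code}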
> Obviously a correction to restore 3.3.2 behavior would represent another 
> breaking change for users that are running on 3.4+, unless they were also 
> somewhat surprisingly configuring buffers for position() == limit() before 
> passing them to send. Arguably, it would also be a breaking change (though 
> possibly not one of great consequence) if either version was changed to 
> correctly handle the wrapped-with-offset case as represented in the original 
> ticket.
> I do not have much experience contending with a situation like this, but at 
> the risk of jumping to a solution here, I wonder if the only way to really 
> move forward safely and unambiguously here is to remove ByteBufferSerializer 
> as it stands and replace it with a differently named substitute that handles 
> both the plain-wrapped special case and just serializes content from 
> position() to limit(), forcing an evaluation by users when upgrading as to 
> whether the provided byte buffer is correctly configured or not. Of course, a 
> change like that would have to be released at an appropriate version level, too, 
> so I don't know exactly what the desired interim behavior would be 
> (deprecation?). I believe I would be eager to contribute to a fix, but 
> obviously I would need guidance from maintainers regarding the correct path 
> forward semantically.
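> For concreteness, a sketch of what such a replacement could look like (the 
> class name is invented for illustration; this is a proposal sketch, not an 
> actual implementation):
> {code:java}
> import java.nio.ByteBuffer;
> import org.apache.kafka.common.serialization.Serializer;
> 
> public class PositionRespectingByteBufferSerializer implements Serializer<ByteBuffer> {
>     @Override
>     public byte[] serialize(String topic, ByteBuffer data) {
>         if (data == null)
>             return null;
>         // Fast path: a plain wrap of a whole array needs no copy.
>         if (data.hasArray() && data.arrayOffset() == 0
>                 && data.position() == 0 && data.limit() == data.array().length) {
>             return data.array();
>         }
>         byte[] out = new byte[data.remaining()];
>         data.duplicate().get(out); // copy [position, limit) without mutating the input
>         return out;
>     }
> }
> {code}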



--
This message was sent by Atlassian Jira
(v8.20.10#820010)