Luke Kirby created KAFKA-15602:
----------------------------------

             Summary: Breaking change in 3.4.0 ByteBufferSerializer
                 Key: KAFKA-15602
                 URL: https://issues.apache.org/jira/browse/KAFKA-15602
             Project: Kafka
          Issue Type: Bug
          Components: producer 
    Affects Versions: 3.5.1, 3.6.0, 3.4.1, 3.5.0, 3.4.0
            Reporter: Luke Kirby


[This PR|https://github.com/apache/kafka/pull/12683/files] claims to have 
solved the situation described by KAFKA-4852, namely, to have 
ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 
offsets (or, put another way, to honor the buffer's position() as the start 
point to consume bytes from). Unfortunately, it failed to actually do this, and 
instead changed the expectations for how an input ByteBuffer's limit and 
position should be set before being provided to send() on a producer configured 
with ByteBufferSerializer. Code that worked with pre-3.4.0 releases now produce 
0-length messages instead of the intended messages, effectively introducing a 
breaking change for existing users of the serializer in the wild.

Here are a few different inputs and serialized outputs under pre-3.4.0 and 
3.4.0+ to summarize the breaking change:
||buffer argument||3.3.2 serialized output||3.4.0+ serialized output||
|ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 
val=test|len=4 val=test|
|ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 
val=test|len=0 val=|
|ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 
val=test<0><0><0><0>|len=4 val=test|
|ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8));
buff.limit(buff.position());|len=4 
val=test|len=4 val=test|
|ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t|

Notably, plain-wrappers of byte arrays continue to work under both versions due 
to the special case in the serializer for them. I suspect that this is the 
dominant use-case, which is why this has apparently gone un-reported to this 
point. The wrapped-with-offset case fails for both cases for different reasons 
(the expected value would be "est"). As demonstrated here, you can ensure that 
a manually assembled ByteBuffer will work under both versions by ensuring that 
your buffers start have position == limit == message-length (and an actual 
desired start position of 0). Clearly, though, behavior has changed 
dramatically for the second and third case there, with the 3.3.2 behavior, in 
my experience, aligning better with naive expectations.

[Previously|https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java],
 the serializer would just rewind() the buffer and respect the limit as the 
indicator as to how much data was in the buffer. So, essentially, the 
prevailing contract was that the data from position 0 (always!) up to the limit 
on the buffer would be serialized; so it was really just the limit that was 
honored. So if, per the original issue, you have a byte[] array wrapped with, 
say, ByteBuffer.wrap(bytes, 3, 5) then that will yield a ByteBuffer() with 
position = 3 indicating the desired start point to read from, but effectively 
ignored by the serializer due to the rewind().

So while the serializer didn't work when presenting a ByteBuffer view onto a 
sub-view of a backing array, it did however follow expected behavior when 
employing standard patterns to populate ByteBuffers backed by 
larger-than-necessary arrays and using limit() to identify the end of actual 
data, consistent with conventional usage of flip() to switch from writing to a 
buffer to setting it up to be read from (e.g., to be passed into a 
producer.send() call). E.g.,
{code:java}
ByteBuffer bb = ByteBuffer.allocate(TOO_MUCH);
... // some sequence of 
bb.put(...); // populate buffer with some number of bytes less than TOO_MUCH 
... 
bb.flip(); /* logically, this says "I am done writing, let's set this up for 
reading"; pragmatically, it sets the limit to the current position so that 
whoever reads the buffer knows when to stop reading, and sets the position to 
zero so it knows where to start reading from */ 
producer.send(bb); {code}
Technically, you wouldn't even need to use flip() there, since position is 
ignored; it would sufficient to just call {{{}bb.limit(bb.position()){}}}. 
Notably, a buffer constructed using any variant of ByteBuffer.wrap() is 
essentially immediately in read-mode with position indicating the start and 
limit the end.

With the change introduced in 3.4.0, however, the contract changes 
dramatically, and the code just presented produces a 0-byte message. As 
indicated above, it also continues to fail if you just passed in an 
offset-specified ByteBuffer.wrap()ped message, too, i.e., the case described by 
KAFKA-4852:
{code:java}
@Test
public void testByteBufferSerializerOnOffsetWrappedBytes() {
    final byte[] bytes = "Hello".getBytes(UTF_8);
    try (final ByteBufferSerializer serializer = new ByteBufferSerializer()) {
        assertArrayEquals("ello".getBytes(UTF_8), 
                // FAILS: this will yield "H", not "ello"
                serializer.serialize(topic, ByteBuffer.wrap(bytes, 1, 
bytes.length - 1)));
    }
}
 {code}
What happened here?

The resulting PR, it seems, focussed on a flawed proposed test case in the 
first comment of KAFKA-4852 that failed against pre-3.4.0 Kafka. I reproduce 
that here with commented annotations from me: 
{code:java}
@Test // flawed proposed test case
public void testByteBufferSerializer() {
    final byte[] bytes = "Hello".getBytes(UTF_8);
    final ByteBuffer buffer = ByteBuffer.allocate(7);
    buffer.put(bytes);
    // buffer.flip();   <-- would make the test work

    try (final ByteBufferSerializer serializer = new ByteBufferSerializer()) {
        assertArrayEquals(bytes, serializer.serialize(topic, buffer));
    }
}  {code}
In pre-3.4.0, this would yield a 7-byte serialization of "Hello" followed by 2 
0-value bytes. I contend that this was actually expected and correct behavior, 
as the ByteBuffer had never had its limit set, so the implicit and mildly 
expected contract was never actually abided by. If there was a buffer.flip() 
after the .put(bytes) call, as the calling code _should_ have applied, however, 
then the test would have succeeded. In short, by trying to make this test case 
succeed, I think this PR represented nothing more than a misunderstanding for 
how one should prepare ByteBuffers for reading, but has managed to result in a 
breaking change. The breaking nature of this was actually briefly noted in PR 
comments but kind of shrugged off with some test changes and explanatory 
comments on the class.

Obviously a correction to restore 3.3.2 behavior would represent another 
breaking change for users that are running on 3.4+, unless they were also 
somewhat surprisingly configuring buffers for position() == limit() before 
passing them to send. Arguably, it would also be a breaking change (though 
possibly not one of great consequence) if either version was changed to 
correctly handle the wrapped-with-offset case as represented in the original 
ticket.

I do not have much experience contending with a situation like this, but at the 
risk of jumping to a solution here, I wonder if the only way to really move 
forward safely and unambiguously here is to remove ByteBufferSerializer as it 
stands and replace it with a differently named substitute that handles both the 
plain-wrapped special case and just serializes content from position() to 
limit(), forcing an evaluation by users when upgrading as to whether the 
provided byte buffer is correctly configured or not. Of course, a change like 
that would have be released at an appropriate version level, too, so I don't 
know exactly what the desired interim behavior would be (deprecation?). I 
believe I would be eager to contribute to a fix, but obviously I would need 
guidance from maintainers regarding the correct path forward semantically.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to