Luke Kirby created KAFKA-15602:
----------------------------------
Summary: Breaking change in 3.4.0 ByteBufferSerializer
Key: KAFKA-15602
URL: https://issues.apache.org/jira/browse/KAFKA-15602
Project: Kafka
Issue Type: Bug
Components: producer
Affects Versions: 3.5.1, 3.6.0, 3.4.1, 3.5.0, 3.4.0
Reporter: Luke Kirby
[This PR|https://github.com/apache/kafka/pull/12683/files] claims to have
solved the situation described by KAFKA-4852, namely, to have
ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0
offsets (or, put another way, to honor the buffer's position() as the start
point to consume bytes from). Unfortunately, it failed to actually do this, and
instead changed the expectations for how an input ByteBuffer's limit and
position should be set before being provided to send() on a producer configured
with ByteBufferSerializer. Code that worked with pre-3.4.0 releases now produces
0-length messages instead of the intended messages, effectively introducing a
breaking change for existing users of the serializer in the wild.
Here are a few different inputs and serialized outputs under pre-3.4.0 and
3.4.0+ to summarize the breaking change:
||buffer argument||3.3.2 serialized output||3.4.0+ serialized output||
|ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 val=test|len=4 val=test|
|ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 val=test|len=0 val=|
|ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 val=test<0><0><0><0>|len=4 val=test|
|ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8)); buff.limit(buff.position());|len=4 val=test|len=4 val=test|
|ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t|
Notably, plain wrappers of byte arrays continue to work under both versions due
to the special case in the serializer for them. I suspect that this is the
dominant use case, which is why this has apparently gone unreported until now.
The wrapped-with-offset case fails under both versions, for different reasons
(the expected value would be "est"). As the table demonstrates, you can ensure
that a manually assembled ByteBuffer will work under both versions by ensuring
that your buffers have position == limit == message-length (and an actual
desired start position of 0). Clearly, though, behavior has changed
dramatically for the second and third cases, with the 3.3.2 behavior, in
my experience, aligning better with naive expectations.
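The divergence in the table follows directly from each buffer's position/limit state before it ever reaches the serializer. The following plain-JDK sketch (class and variable names are mine; no Kafka code involved) prints those states for three of the rows:
{code:java}
import java.nio.ByteBuffer;
import static java.nio.charset.StandardCharsets.UTF_8;

// Illustrative only: inspects the position/limit state of the table's
// buffer arguments as they would be handed to producer.send().
public class BufferStateDemo {
    public static void main(String[] args) {
        // Row 2: write then flip(); read window is [position=0, limit=4)
        ByteBuffer flipped = ByteBuffer.allocate(8).put("test".getBytes(UTF_8));
        flipped.flip();
        System.out.println(flipped.position() + ".." + flipped.limit()); // prints 0..4

        // Row 3: never flipped; position=4 is the write cursor, limit=8
        ByteBuffer unflipped = ByteBuffer.allocate(8).put("test".getBytes(UTF_8));
        System.out.println(unflipped.position() + ".." + unflipped.limit()); // prints 4..8

        // Row 5: offset wrap; position=1 (intended start), limit=4 (offset + length)
        ByteBuffer offsetWrap = ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3);
        System.out.println(offsetWrap.position() + ".." + offsetWrap.limit()); // prints 1..4
    }
}
{code}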
[Previously|https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java],
the serializer would just rewind() the buffer and respect the limit as the
indicator of how much data was in the buffer. Essentially, the prevailing
contract was that the data from position 0 (always!) up to the buffer's limit
would be serialized; it was really just the limit that was honored. So if, per
the original issue, you have a byte[] array wrapped with, say,
ByteBuffer.wrap(bytes, 3, 5), that yields a ByteBuffer with position = 3
indicating the desired start point to read from, but the position is
effectively ignored by the serializer due to the rewind().
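Concretely, for that KAFKA-4852 shape, a plain-JDK sketch (names mine, illustrative only) shows exactly what rewind() discards:
{code:java}
import java.nio.ByteBuffer;
import static java.nio.charset.StandardCharsets.UTF_8;

// Illustrative only: why the pre-3.4.0 rewind() loses a wrap offset.
public class RewindDemo {
    public static void main(String[] args) {
        byte[] bytes = "Hello, world".getBytes(UTF_8); // 12 bytes
        // wrap(bytes, 3, 5): position = 3 (intended start), limit = 8
        ByteBuffer bb = ByteBuffer.wrap(bytes, 3, 5);
        System.out.println(bb.position() + ".." + bb.limit()); // prints 3..8

        // rewind() resets position to 0 but leaves limit at 8, so a
        // serializer that rewinds reads bytes[0..8) instead of bytes[3..8)
        bb.rewind();
        System.out.println(bb.position() + ".." + bb.limit()); // prints 0..8
    }
}
{code}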
So while the serializer didn't work when presenting a ByteBuffer view onto a
sub-view of a backing array, it did however follow expected behavior when
employing standard patterns to populate ByteBuffers backed by
larger-than-necessary arrays and using limit() to identify the end of actual
data, consistent with conventional usage of flip() to switch from writing to a
buffer to setting it up to be read from (e.g., to be passed into a
producer.send() call). E.g.,
{code:java}
ByteBuffer bb = ByteBuffer.allocate(TOO_MUCH);
// ... some sequence of
bb.put(...); // populate buffer with some number of bytes less than TOO_MUCH
// ...
bb.flip(); /* logically, this says "I am done writing, let's set this up for
              reading"; pragmatically, it sets the limit to the current
              position so that whoever reads the buffer knows when to stop
              reading, and sets the position to zero so it knows where to
              start reading from */
producer.send(bb);
{code}
Technically, you wouldn't even need to use flip() there, since position is
ignored; it would be sufficient to just call {{bb.limit(bb.position())}}.
Notably, a buffer constructed using any variant of ByteBuffer.wrap() is
essentially immediately in read-mode with position indicating the start and
limit the end.
With the change introduced in 3.4.0, however, the contract changes
dramatically, and the code just presented produces a 0-byte message. As
indicated above, it also continues to fail if you just passed in an
offset-specified ByteBuffer.wrap()ped message, too, i.e., the case described by
KAFKA-4852:
{code:java}
@Test
public void testByteBufferSerializerOnOffsetWrappedBytes() {
    final byte[] bytes = "Hello".getBytes(UTF_8);
    try (final ByteBufferSerializer serializer = new ByteBufferSerializer()) {
        assertArrayEquals("ello".getBytes(UTF_8),
                // FAILS: this will yield "H", not "ello"
                serializer.serialize(topic, ByteBuffer.wrap(bytes, 1, bytes.length - 1)));
    }
}
{code}
What happened here?
The resulting PR, it seems, focused on a flawed test case proposed in the
first comment of KAFKA-4852, which failed against pre-3.4.0 Kafka. I reproduce
it here with my own commented annotations:
{code:java}
@Test // flawed proposed test case
public void testByteBufferSerializer() {
    final byte[] bytes = "Hello".getBytes(UTF_8);
    final ByteBuffer buffer = ByteBuffer.allocate(7);
    buffer.put(bytes);
    // buffer.flip(); <-- would make the test work
    try (final ByteBufferSerializer serializer = new ByteBufferSerializer()) {
        assertArrayEquals(bytes, serializer.serialize(topic, buffer));
    }
}
{code}
In pre-3.4.0, this would yield a 7-byte serialization: "Hello" followed by two
0-value bytes. I contend that this was actually expected and correct behavior,
as the ByteBuffer had never had its limit set, so the implicit and mildly
expected contract was never actually abided by. Had there been a buffer.flip()
after the .put(bytes) call, as the calling code _should_ have applied, the test
would have succeeded. In short, by trying to make this test case succeed, I
think this PR represented nothing more than a misunderstanding of how one
should prepare ByteBuffers for reading, yet it has managed to result in a
breaking change. The breaking nature of this was actually briefly noted in PR
comments but kind of shrugged off with some test changes and explanatory
comments on the class.
Obviously a correction to restore the 3.3.2 behavior would represent another
breaking change for users running on 3.4+, unless they were also, somewhat
surprisingly, configuring buffers with position() == limit() before passing
them to send(). Arguably, it would also be a breaking change (though possibly
not one of great consequence) if either version were changed to correctly
handle the wrapped-with-offset case as represented in the original ticket.
I do not have much experience contending with a situation like this, but at the
risk of jumping to a solution here, I wonder if the only way to really move
forward safely and unambiguously here is to remove ByteBufferSerializer as it
stands and replace it with a differently named substitute that handles both the
plain-wrapped special case and just serializes content from position() to
limit(), forcing an evaluation by users when upgrading as to whether the
provided byte buffer is correctly configured or not. Of course, a change like
that would have to be released at an appropriate version level, too, so I don't
know exactly what the desired interim behavior would be (deprecation?). I would
be eager to contribute a fix, but I would obviously need guidance from
maintainers regarding the correct path forward semantically.
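For illustration only, a position-to-limit replacement along those lines might look like the following sketch. This is a standalone static method, not a real Kafka Serializer implementation; the class and method names are mine, and an actual replacement would implement the Serializer<ByteBuffer> interface:
{code:java}
import java.nio.ByteBuffer;

// Hypothetical sketch of the proposed replacement contract: serialize
// exactly the bytes between position() and limit(), keeping a zero-copy
// special case for a plain full-array wrap.
public class PositionToLimitSerializer {
    public static byte[] serializeByteBuffer(ByteBuffer data) {
        if (data == null) {
            return null;
        }
        // Special case: a plain ByteBuffer.wrap(bytes) whose window covers
        // the whole backing array can be returned without copying.
        if (data.hasArray() && data.arrayOffset() == 0 && data.position() == 0
                && data.limit() == data.array().length) {
            return data.array();
        }
        // General case: copy position()..limit() from a duplicate so the
        // caller's position and limit are left untouched.
        ByteBuffer view = data.duplicate();
        byte[] out = new byte[view.remaining()];
        view.get(out);
        return out;
    }
}
{code}
Under this contract, both the conventional flip()-style usage and the offset-wrap case from KAFKA-4852 serialize the intended bytes (e.g., wrapping "Hello" with offset 1 and length 4 yields "ello").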
--
This message was sent by Atlassian Jira
(v8.20.10#820010)