This is an automated email from the ASF dual-hosted git repository.

viktor pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new def8d724c82 KAFKA-14540: Fix DataOutputStreamWritable#writeByteBuffer 
(#13032)
def8d724c82 is described below

commit def8d724c82f84964f353f8cf45ef8d52247b2cc
Author: Michael Marshall <[email protected]>
AuthorDate: Tue Jan 10 04:50:07 2023 -0600

    KAFKA-14540: Fix DataOutputStreamWritable#writeByteBuffer (#13032)
    
    When writing a ByteBuffer backed by a HeapBuffer to a DataOutputStream, it 
is necessary to pass in the array offset plus the position, not just the 
position. It is also necessary to pass the remaining length, not the limit. The 
current code results in writing the wrong data to DataOutputStream. While the 
DataOutputStreamWritable is used in the project, I do not see any references 
that would utilize this code path, so this bug fix is relatively minor.
    
    I added a new test to cover the exact bug. The test fails without this 
change.
---
 .../common/protocol/DataOutputStreamWritable.java  |  2 +-
 .../protocol/DataOutputStreamWritableTest.java     | 67 ++++++++++++++++++++++
 2 files changed, 68 insertions(+), 1 deletion(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/DataOutputStreamWritable.java
 
b/clients/src/main/java/org/apache/kafka/common/protocol/DataOutputStreamWritable.java
index f484016a530..dcf53c4e52b 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/protocol/DataOutputStreamWritable.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/protocol/DataOutputStreamWritable.java
@@ -99,7 +99,7 @@ public class DataOutputStreamWritable implements Writable, 
Closeable {
     public void writeByteBuffer(ByteBuffer buf) {
         try {
             if (buf.hasArray()) {
-                out.write(buf.array(), buf.position(), buf.limit());
+                out.write(buf.array(), buf.arrayOffset() + buf.position(), 
buf.remaining());
             } else {
                 byte[] bytes = Utils.toArray(buf);
                 out.write(bytes);
diff --git 
a/clients/src/test/java/org/apache/kafka/common/protocol/DataOutputStreamWritableTest.java
 
b/clients/src/test/java/org/apache/kafka/common/protocol/DataOutputStreamWritableTest.java
new file mode 100644
index 00000000000..9e06d13a9ac
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/common/protocol/DataOutputStreamWritableTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.protocol;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.junit.jupiter.api.Test;
+
+public class DataOutputStreamWritableTest {
+    @Test
+    public void testWritingSlicedByteBuffer() {
+        byte[] expectedArray = new byte[]{2, 3, 0, 0};
+        ByteBuffer sourceBuffer = ByteBuffer.wrap(new byte[]{0, 1, 2, 3});
+        ByteBuffer resultBuffer = ByteBuffer.allocate(4);
+
+        // Move position forward to ensure slice is not whole buffer
+        sourceBuffer.position(2);
+        ByteBuffer slicedBuffer = sourceBuffer.slice();
+
+        Writable writable = new DataOutputStreamWritable(
+                new DataOutputStream(new 
ByteBufferOutputStream(resultBuffer)));
+
+        writable.writeByteBuffer(slicedBuffer);
+
+        assertEquals(2, resultBuffer.position(), "Writing to the buffer moves 
the position forward");
+        assertArrayEquals(expectedArray, resultBuffer.array(), "Result buffer 
should have expected elements");
+    }
+
+    @Test
+    public void testWritingSlicedByteBufferWithNonZeroPosition() {
+        byte[] expectedArray = new byte[]{3, 0, 0, 0};
+        ByteBuffer originalBuffer = ByteBuffer.wrap(new byte[]{0, 1, 2, 3});
+        ByteBuffer resultBuffer = ByteBuffer.allocate(4);
+
+        // Move position forward to ensure slice is backed by heap buffer with 
non-zero offset
+        originalBuffer.position(2);
+        ByteBuffer slicedBuffer = originalBuffer.slice();
+        // Move the slice's position forward to ensure the writer starts 
reading at that position
+        slicedBuffer.position(1);
+
+        Writable writable = new DataOutputStreamWritable(
+                new DataOutputStream(new 
ByteBufferOutputStream(resultBuffer)));
+
+        writable.writeByteBuffer(slicedBuffer);
+
+        assertEquals(1, resultBuffer.position(), "Writing to the buffer moves 
the position forward");
+        assertArrayEquals(expectedArray, resultBuffer.array(), "Result buffer 
should have expected elements");
+    }
+}
\ No newline at end of file

Reply via email to