blambov commented on code in PR #3764:
URL: https://github.com/apache/cassandra/pull/3764#discussion_r1900963333


##########
src/java/org/apache/cassandra/db/NativeClustering.java:
##########
@@ -3,7 +3,7 @@
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
+* to you under the Apache License, Version 2.0 (thec

Review Comment:
   Unwanted change?



##########
src/java/org/apache/cassandra/db/marshal/NativeAccessor.java:
##########
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.marshal;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.FloatBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.util.UUID;
+
+import org.apache.cassandra.db.Digest;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.service.paxos.Ballot;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FastByteOperations;
+import org.apache.cassandra.utils.TimeUUID;
+import org.apache.cassandra.utils.UUIDGen;
+
+/**
+ * ValueAccessor has a lot of different methods that are grouped together in a
single interface.
+ * Technically the methods can be classified into 4 categories:
+ * 1) basic methods to deal with the existing data as an abstract read-only 
container of bytes
+ * 2) deserialization methods to decode the data into different data types
+ * 3) serialization methods to encode and write different data types into the 
value entity
+ * 4) Value object creation methods
+ *
+ *  NativeAccessor provides support for real NativeData objects (on top of
off-heap memory) for categories 1-3,
+ *  with a focus on category 1), and only emulates the 4th category using
ByteBufferSliceNativeData on top of heap ByteBuffers.
+ *  We expect NativeData to be used only to store data in Memtables, with an
explicit allocator and memory-region lifecycle,
+ *  and not to be used to create short-lived Mutation requests or transfer them
between nodes.
+ */
+public class NativeAccessor implements ValueAccessor<NativeData>
+{
+    public static final ValueAccessor<NativeData> instance = new 
NativeAccessor();
+
+    // 
-----------------------------------------------------------------------------
+    // basic methods to deal with data as a read-only container of bytes
+
+    @Override
+    public int size(NativeData value)
+    {
+        return value.nativeDataSize();
+    }
+
+    @Override
+    public void write(NativeData sourceValue, DataOutputPlus out) throws 
IOException
+    {
+        sourceValue.writeTo(out);
+    }
+
+    @Override
+    public ByteBuffer toBuffer(NativeData value)
+    {
+        if (value == null)
+            return null;
+        return value.asByteBuffer();
+    }
+
+    @Override
+    public void write(NativeData value, ByteBuffer out)
+    {
+        out.put(value.asByteBuffer().duplicate()); // ByteBufferSliceNativeData.asByteBuffer() returns a re-usable byte buffer
+    }
+
+    @Override
+    public <V2> int copyTo(NativeData src, int srcOffset, V2 dst, 
ValueAccessor<V2> dstAccessor, int dstOffset, int size)
+    {
+        dstAccessor.copyByteBufferTo(src.asByteBuffer(), srcOffset, dst, 
dstOffset, size);
+        return size;
+    }
+
+    @Override
+    public int copyByteArrayTo(byte[] src, int srcOffset, NativeData 
dstNative, int dstOffset, int size)
+    {
+        ByteBuffer dst = dstNative.asByteBuffer();
+        FastByteOperations.copy(src, srcOffset, dst, dst.position() + 
dstOffset, size);
+        return size;
+    }
+
+    @Override
+    public int copyByteBufferTo(ByteBuffer src, int srcOffset, NativeData 
dstNative, int dstOffset, int size)
+    {
+        ByteBuffer dst = dstNative.asByteBuffer();
+        FastByteOperations.copy(src, src.position() + srcOffset, dst, 
dst.position() + dstOffset, size);
+        return size;
+    }
+
+    @Override
+    public void digest(NativeData value, int offset, int size, Digest digest)
+    {
+        ByteBuffer byteBuffer = value.asByteBuffer();
+        digest.update(byteBuffer, byteBuffer.position() + offset, size);
+    }
+
+    @Override
+    public NativeData slice(NativeData input, int offset, int length)
+    {
+        return input.slice(offset, length);
+    }
+
+    @Override
+    public <VR> int compare(NativeData left, VR right, ValueAccessor<VR> 
accessorR)
+    {
+        if (right instanceof NativeData)
+        {
+            return left.compareTo((NativeData) right);
+        }
+        return left.compareTo(accessorR.toBuffer(right));
+    }
+
+    @Override
+    public int compareByteArrayTo(byte[] left, NativeData right)
+    {
+        return ByteBufferUtil.compare(left, right.asByteBuffer());
+    }
+
+    @Override
+    public int compareByteBufferTo(ByteBuffer left, NativeData right)
+    {
+        return -right.compareTo(left); // we want to avoid ByteBuffer 
retrieval from NativeData
+    }
+
+     // 
-----------------------------------------------------------------------------
+     // Data deserialization methods
+
+    @Override
+    public byte[] toArray(NativeData value)
+    {
+        if (value == null)
+            return null;
+        return ByteBufferUtil.getArray(value.asByteBuffer());
+    }
+
+    @Override
+    public byte[] toArray(NativeData value, int offset, int length)
+    {
+        if (value == null)
+            return null;
+        ByteBuffer byteBuffer = value.asByteBuffer();
+        return ByteBufferUtil.getArray(byteBuffer, byteBuffer.position() + 
offset, length);
+    }
+
+    @Override
+    public String toString(NativeData value, Charset charset) throws 
CharacterCodingException
+    {
+        return ByteBufferUtil.string(value.asByteBuffer(), charset);
+    }
+
+    @Override
+    public String toHex(NativeData value)
+    {
+        return ByteBufferUtil.bytesToHex(value.asByteBuffer());
+    }
+
+    @Override
+    public byte toByte(NativeData value)
+    {
+        return ByteBufferUtil.toByte(value.asByteBuffer());
+    }
+
+    @Override
+    public byte getByte(NativeData value, int offset)

Review Comment:
   `NativeData` should have an address and length, and then all these can be 
implemented using `MemoryUtil`'s methods.



##########
src/java/org/apache/cassandra/db/NativeClustering.java:
##########
@@ -93,10 +100,172 @@ public int size()
         return MemoryUtil.getShort(peer);
     }
 
-    public ByteBuffer get(int i)
+    private static class NativeClusteringValue implements NativeData {
+        private final long peer;
+        private final int i;
+
+        private NativeClusteringValue(long peer, int i)
+        {
+            this.peer = peer;
+            this.i = i;
+        }
+
+        @Override
+        public int nativeDataSize()
+        {
+            int size = parentSize();
+            return NativeClustering.nativeDataSize(peer, size, i);
+        }
+
+        @Override
+        public ByteBuffer asByteBuffer()
+        {
+            return getByteBuffer((address, length) -> 
MemoryUtil.getByteBuffer(address, length, ByteOrder.BIG_ENDIAN));
+        }
+
+        @Override
+        public NativeData slice(int offset, int length)
+        {
+            ByteBuffer byteBuffer = asByteBuffer(); // we get a new buffer 
here each time, so duplicate() is not needed
+            byteBuffer.position(byteBuffer.position() + offset);
+            byteBuffer.limit(byteBuffer.position() + length);
+            return new ByteBufferSliceNativeData(byteBuffer);
+        }
+
+        private int parentSize() {
+            return MemoryUtil.getShort(peer);
+        }
+
+        private static final FastThreadLocal<ByteBuffer> 
REUSABLE_WRITE_BUFFER_1 = new FastThreadLocal<ByteBuffer>()
+        {
+            @Override
+            protected ByteBuffer initialValue()
+            {
+                return 
MemoryUtil.getHollowDirectByteBuffer(ByteOrder.BIG_ENDIAN);
+            }
+        };
+
+        /**
+         * we need two re-usable buffers to be able to do a compare operation
without memory allocations
+         */
+        private static final FastThreadLocal<ByteBuffer> 
REUSABLE_WRITE_BUFFER_2 = new FastThreadLocal<ByteBuffer>()
+        {
+            @Override
+            protected ByteBuffer initialValue()
+            {
+                return 
MemoryUtil.getHollowDirectByteBuffer(ByteOrder.BIG_ENDIAN);
+            }
+        };
+
+        @Override
+        public void writeTo(DataOutputPlus out) throws IOException
+        {
+            ByteBuffer byteBuffer = getByteBuffer((address, length) -> {
+                ByteBuffer result = REUSABLE_WRITE_BUFFER_1.get();
+                MemoryUtil.setDirectByteBuffer(result, address, length);
+                return result;
+            });
+            out.write(byteBuffer);

Review Comment:
   I would much prefer to add a method to `DataOutputPlus` to write from a 
memory location rather than use thread locals.
   
   This method could be implemented e.g. as
   ```
   Subject: [PATCH] Make CorruptedSSTablesCompactionsTest not flaky
   
   by corrupting sstables until they are verifiably corrupted
   ---
   Index: src/java/org/apache/cassandra/utils/memory/MemoryUtil.java
   IDEA additional info:
   Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
   <+>UTF-8
   ===================================================================
   diff --git a/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java 
b/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java
   --- a/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java     
(revision a88ff2046983f733a2b6ddf160f4578ae78add73)
   +++ b/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java     (date 
1735837568258)
   @@ -424,4 +424,38 @@
    
            unsafe.copyMemory(null, address, buffer, BYTE_ARRAY_BASE_OFFSET + 
bufferOffset, count);
        }
   +
   +
   +    /**
   +     * Transfers length bytes from native memory starting at address into buffer
   +     *
   +     * @param address start address in memory
   +     * @param length number of bytes to transfer
   +     * @param buffer the destination buffer
   +     */
   +    public static void getBytes(long address, int length, ByteBuffer buffer)
   +    {
   +        if (buffer == null)
   +            throw new NullPointerException();
   +        else if (length < 0 || length > buffer.remaining())
   +            throw new IndexOutOfBoundsException();
   +        else if (length == 0)
   +            return;
   +
   +        Object obj;
   +        long offset;
   +        if (buffer.hasArray())
   +        {
   +            obj = buffer.array();
   +            offset = BYTE_ARRAY_BASE_OFFSET + buffer.arrayOffset();
   +        }
   +        else
   +        {
   +            obj = null;
   +            offset = unsafe.getLong(buffer, 
DIRECT_BYTE_BUFFER_ADDRESS_OFFSET);
   +        }
   +        offset += buffer.position();
   +
   +        unsafe.copyMemory(null, address, obj, offset, length);
   +    }
    }
   Index: src/java/org/apache/cassandra/io/util/DataOutputPlus.java
   IDEA additional info:
   Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
   <+>UTF-8
   ===================================================================
   diff --git a/src/java/org/apache/cassandra/io/util/DataOutputPlus.java 
b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
   --- a/src/java/org/apache/cassandra/io/util/DataOutputPlus.java      
(revision a88ff2046983f733a2b6ddf160f4578ae78add73)
   +++ b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java      (date 
1735836867730)
   @@ -18,6 +18,7 @@
    package org.apache.cassandra.io.util;
    
    import org.apache.cassandra.utils.Shared;
   +import org.apache.cassandra.utils.memory.MemoryUtil;
    import org.apache.cassandra.utils.vint.VIntCoding;
    
    import java.io.DataOutput;
   @@ -44,6 +45,11 @@
                write(buffer);
        }
    
   +    default void writeMemory(long address, int length) throws IOException
   +    {
   +        write(MemoryUtil.getByteBuffer(address, length));
   +    }
   +
        default void writeVInt(long i) throws IOException
        {
            VIntCoding.writeVInt(i, this);
   Index: 
src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
   IDEA additional info:
   Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
   <+>UTF-8
   ===================================================================
   diff --git 
a/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java 
b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
   --- 
a/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java       
 (revision a88ff2046983f733a2b6ddf160f4578ae78add73)
   +++ 
b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java       
 (date 1735837800186)
   @@ -27,6 +27,7 @@
    
    import net.nicoulaj.compilecommand.annotations.DontInline;
    import org.apache.cassandra.utils.FastByteOperations;
   +import org.apache.cassandra.utils.memory.MemoryUtil;
    
    import static 
org.apache.cassandra.config.CassandraRelevantProperties.NIO_DATA_OUTPUT_STREAM_PLUS_BUFFER_SIZE;
    
   @@ -137,6 +138,25 @@
            FastByteOperations.copy(src, srcPos, buffer, buffer.position(), 
srcCount);
            buffer.position(buffer.position() + srcCount);
        }
   +
   +    @Override
   +    public void writeMemory(long address, int length) throws IOException
   +    {
   +        assert buffer != null : "Attempt to use a closed data output";
   +        long srcPos = address;
   +        int srcCount = length;
   +        int trgAvailable;
   +        while (srcCount > (trgAvailable = buffer.remaining()))
   +        {
   +            MemoryUtil.getBytes(srcPos, trgAvailable, buffer);
   +            buffer.position(buffer.position() + trgAvailable);
   +            srcPos += trgAvailable;
   +            srcCount -= trgAvailable;
   +            doFlush(srcCount);
   +        }
   +        MemoryUtil.getBytes(srcPos, srcCount, buffer);
   +        buffer.position(buffer.position() + srcCount);
   +    }
    
        @Override
        public void write(int b) throws IOException
   ```



##########
src/java/org/apache/cassandra/db/marshal/ByteBufferSliceNativeData.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.marshal;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * Temporary object created as a part of slicing (usually to parse collection
parts)
+ */
+public class ByteBufferSliceNativeData implements NativeData

Review Comment:
   This should be native-memory-based to make sure we are testing the right 
code. We can add a subclass that allocates and holds a reference to a direct 
memory buffer to handle lifecycle for instances that need it (e.g. the 
`valueOf` methods used for testing).



##########
src/java/org/apache/cassandra/db/NativeClustering.java:
##########
@@ -93,10 +100,172 @@ public int size()
         return MemoryUtil.getShort(peer);
     }
 
-    public ByteBuffer get(int i)
+    private static class NativeClusteringValue implements NativeData {
+        private final long peer;
+        private final int i;
+
+        private NativeClusteringValue(long peer, int i)
+        {
+            this.peer = peer;
+            this.i = i;
+        }
+
+        @Override
+        public int nativeDataSize()
+        {
+            int size = parentSize();
+            return NativeClustering.nativeDataSize(peer, size, i);
+        }
+
+        @Override
+        public ByteBuffer asByteBuffer()
+        {
+            return getByteBuffer((address, length) -> 
MemoryUtil.getByteBuffer(address, length, ByteOrder.BIG_ENDIAN));
+        }
+
+        @Override
+        public NativeData slice(int offset, int length)
+        {
+            ByteBuffer byteBuffer = asByteBuffer(); // we get a new buffer 
here each time, so duplicate() is not needed
+            byteBuffer.position(byteBuffer.position() + offset);
+            byteBuffer.limit(byteBuffer.position() + length);
+            return new ByteBufferSliceNativeData(byteBuffer);
+        }
+
+        private int parentSize() {
+            return MemoryUtil.getShort(peer);
+        }
+
+        private static final FastThreadLocal<ByteBuffer> 
REUSABLE_WRITE_BUFFER_1 = new FastThreadLocal<ByteBuffer>()
+        {
+            @Override
+            protected ByteBuffer initialValue()
+            {
+                return 
MemoryUtil.getHollowDirectByteBuffer(ByteOrder.BIG_ENDIAN);
+            }
+        };
+
+        /**
+         * we need two re-usable buffers to be able to do a compare operation
without memory allocations
+         */
+        private static final FastThreadLocal<ByteBuffer> 
REUSABLE_WRITE_BUFFER_2 = new FastThreadLocal<ByteBuffer>()
+        {
+            @Override
+            protected ByteBuffer initialValue()
+            {
+                return 
MemoryUtil.getHollowDirectByteBuffer(ByteOrder.BIG_ENDIAN);
+            }
+        };
+
+        @Override
+        public void writeTo(DataOutputPlus out) throws IOException
+        {
+            ByteBuffer byteBuffer = getByteBuffer((address, length) -> {
+                ByteBuffer result = REUSABLE_WRITE_BUFFER_1.get();
+                MemoryUtil.setDirectByteBuffer(result, address, length);
+                return result;
+            });
+            out.write(byteBuffer);
+        }
+
+        private ByteBuffer getByteBuffer(ByteBufferProvider provider)
+        {
+            int size = parentSize();
+            if (i >= size)
+                throw new IndexOutOfBoundsException();
+
+            return NativeClustering.getByteBuffer(peer, i, size, provider);
+        }
+
+        private interface ByteBufferProvider
+        {
+            ByteBuffer get(long address, int length);
+        }
+
+        @Override
+        public int compareTo(NativeData right)
+        {
+            ByteBuffer leftByteBuffer = getByteBuffer((address, length) -> {

Review Comment:
   As above, I would much prefer to add the relevant method(s) to 
`FastByteOperations` (via the optimized implementation in 
`UnsafeOperations.compareTo(null, address, length, null, address, length)`) to 
avoid the thread locals.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to