This is an automated email from the ASF dual-hosted git repository.

charlesconnell pushed a commit to branch HBASE-29688/varhandle
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 11b8b7d65b9fa41e4dfb41ee411072295cfd3fb3
Author: Charles Connell <[email protected]>
AuthorDate: Mon Oct 27 14:33:28 2025 -0400

    Remove many uses of Unsafe
---
 .../apache/hadoop/hbase/io/ByteBuffAllocator.java  |   8 +-
 .../apache/hadoop/hbase/nio/SingleByteBuff.java    |  35 +-
 .../apache/hadoop/hbase/util/ByteBufferUtils.java  | 545 +++---------
 .../java/org/apache/hadoop/hbase/util/Bytes.java   | 359 ++------
 .../org/apache/hadoop/hbase/util/UnsafeAccess.java | 293 -------
 .../hadoop/hbase/util/ByteBufferUtilsTestBase.java | 930 +++++++++++++++++++++
 .../apache/hadoop/hbase/util/BytesTestBase.java    | 341 ++++++++
 .../hbase/util/TestByteBufferUtilsWoUnsafe.java    |   1 -
 .../hadoop/hbase/util/TestBytesWoUnsafe.java       |  41 -
 .../hbase/util/TestFromClientSide3WoUnsafe.java    |   1 -
 10 files changed, 1460 insertions(+), 1094 deletions(-)

diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java
index 737d93207cb..2954720ebe7 100644
--- 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java
+++ 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
-import org.apache.hadoop.hbase.util.UnsafeAccess;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -349,12 +348,7 @@ public class ByteBuffAllocator {
    * Free all direct buffers if allocated, mainly used for testing.
    */
   public void clean() {
-    while (!buffers.isEmpty()) {
-      ByteBuffer b = buffers.poll();
-      if (b.isDirect()) {
-        UnsafeAccess.freeDirectBuffer(b);
-      }
-    }
+    this.buffers.clear();
     this.usedBufCount.set(0);
     this.maxPoolSizeInfoLevelLogged = false;
     this.poolAllocationBytes.reset();
diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
index 320696ada70..b49fdd243ff 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
@@ -24,10 +24,8 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.ReadableByteChannel;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
-import org.apache.hadoop.hbase.unsafe.HBasePlatformDependent;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.ObjectIntPair;
-import org.apache.hadoop.hbase.util.UnsafeAccess;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -37,16 +35,9 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 public class SingleByteBuff extends ByteBuff {
 
-  private static final boolean UNSAFE_AVAIL = 
HBasePlatformDependent.isUnsafeAvailable();
-  private static final boolean UNSAFE_UNALIGNED = 
HBasePlatformDependent.unaligned();
-
   // Underlying BB
   private final ByteBuffer buf;
 
-  // To access primitive values from underlying ByteBuffer using Unsafe
-  private long unsafeOffset;
-  private Object unsafeRef = null;
-
   public SingleByteBuff(ByteBuffer buf) {
     this(NONE, buf);
   }
@@ -58,12 +49,6 @@ public class SingleByteBuff extends ByteBuff {
   SingleByteBuff(RefCnt refCnt, ByteBuffer buf) {
     this.refCnt = refCnt;
     this.buf = buf;
-    if (buf.hasArray()) {
-      this.unsafeOffset = UnsafeAccess.BYTE_ARRAY_BASE_OFFSET + 
buf.arrayOffset();
-      this.unsafeRef = buf.array();
-    } else {
-      this.unsafeOffset = UnsafeAccess.directBufferAddress(buf);
-    }
   }
 
   @Override
@@ -181,10 +166,7 @@ public class SingleByteBuff extends ByteBuff {
   @Override
   public byte get(int index) {
     checkRefCount();
-    if (UNSAFE_AVAIL) {
-      return UnsafeAccess.toByte(this.unsafeRef, this.unsafeOffset + index);
-    }
-    return this.buf.get(index);
+    return ByteBufferUtils.toByte(buf, index);
   }
 
   @Override
@@ -284,10 +266,7 @@ public class SingleByteBuff extends ByteBuff {
   @Override
   public short getShort(int index) {
     checkRefCount();
-    if (UNSAFE_UNALIGNED) {
-      return UnsafeAccess.toShort(unsafeRef, unsafeOffset + index);
-    }
-    return this.buf.getShort(index);
+    return ByteBufferUtils.toShort(this.buf, index);
   }
 
   @Override
@@ -312,10 +291,7 @@ public class SingleByteBuff extends ByteBuff {
   @Override
   public int getInt(int index) {
     checkRefCount();
-    if (UNSAFE_UNALIGNED) {
-      return UnsafeAccess.toInt(unsafeRef, unsafeOffset + index);
-    }
-    return this.buf.getInt(index);
+    return ByteBufferUtils.toInt(this.buf, index);
   }
 
   @Override
@@ -340,10 +316,7 @@ public class SingleByteBuff extends ByteBuff {
   @Override
   public long getLong(int index) {
     checkRefCount();
-    if (UNSAFE_UNALIGNED) {
-      return UnsafeAccess.toLong(unsafeRef, unsafeOffset + index);
-    }
-    return this.buf.getLong(index);
+    return ByteBufferUtils.toLong(this.buf, index);
   }
 
   @Override
diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
index d6b93632340..d2c3b7cc320 100644
--- 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
+++ 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
@@ -24,9 +24,12 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.util.Arrays;
 import org.apache.hadoop.hbase.io.ByteBufferWriter;
 import org.apache.hadoop.hbase.io.util.StreamUtils;
@@ -47,7 +50,19 @@ public final class ByteBufferUtils {
   public final static int NEXT_BIT_SHIFT = 7;
   public final static int NEXT_BIT_MASK = 1 << 7;
   final static boolean UNSAFE_AVAIL = 
HBasePlatformDependent.isUnsafeAvailable();
-  public final static boolean UNSAFE_UNALIGNED = 
HBasePlatformDependent.unaligned();
+
+  /*
+   * The VarHandles below must be used directly, not via non-static aliases. 
Non-static usage of a
+   * VarHandle is much slower than static usage.
+   */
+  private static final VarHandle BYTE_BUFFER_SHORT_VIEW_BIG_ENDIAN_VAR_HANDLE =
+    MethodHandles.byteBufferViewVarHandle(short[].class, ByteOrder.BIG_ENDIAN);
+  private static final VarHandle BYTE_BUFFER_INT_VIEW_BIG_ENDIAN_VAR_HANDLE =
+    MethodHandles.byteBufferViewVarHandle(int[].class, ByteOrder.BIG_ENDIAN);
+  private static final VarHandle BYTE_BUFFER_LONG_VIEW_BIG_ENDIAN_VAR_HANDLE =
+    MethodHandles.byteBufferViewVarHandle(long[].class, ByteOrder.BIG_ENDIAN);
+  private static final VarHandle BYTE_ARRAY_LONG_VIEW_BIG_ENDIAN_VAR_HANDLE =
+    MethodHandles.byteArrayViewVarHandle(long[].class, ByteOrder.BIG_ENDIAN);
 
   private ByteBufferUtils() {
   }
@@ -58,28 +73,6 @@ public final class ByteBufferUtils {
     abstract int compareTo(ByteBuffer buf1, int o1, int l1, ByteBuffer buf2, 
int o2, int l2);
   }
 
-  static abstract class Converter {
-    abstract short toShort(ByteBuffer buffer, int offset);
-
-    abstract int toInt(ByteBuffer buffer);
-
-    abstract int toInt(ByteBuffer buffer, int offset);
-
-    abstract long toLong(ByteBuffer buffer, int offset);
-
-    abstract void putInt(ByteBuffer buffer, int val);
-
-    abstract int putInt(ByteBuffer buffer, int index, int val);
-
-    abstract void putShort(ByteBuffer buffer, short val);
-
-    abstract int putShort(ByteBuffer buffer, int index, short val);
-
-    abstract void putLong(ByteBuffer buffer, long val);
-
-    abstract int putLong(ByteBuffer buffer, int index, long val);
-  }
-
   static abstract class CommonPrefixer {
     abstract int findCommonPrefix(ByteBuffer left, int leftOffset, int 
leftLength, byte[] right,
       int rightOffset, int rightLength);
@@ -88,349 +81,150 @@ public final class ByteBufferUtils {
       int rightOffset, int rightLength);
   }
 
+  /**
+   * <a href=
+   * 
"https://github.com/google/guava/blob/v21.0/guava/src/com/google/common/primitives/UnsignedBytes.java#L362">Adapted
+   * from Guava</a>
+   */
   static class ComparerHolder {
-    static final String UNSAFE_COMPARER_NAME = ComparerHolder.class.getName() 
+ "$UnsafeComparer";
-
     static final Comparer BEST_COMPARER = getBestComparer();
 
     static Comparer getBestComparer() {
-      try {
-        Class<? extends Comparer> theClass =
-          Class.forName(UNSAFE_COMPARER_NAME).asSubclass(Comparer.class);
-
-        return theClass.getConstructor().newInstance();
-      } catch (Throwable t) { // ensure we really catch *everything*
-        return PureJavaComparer.INSTANCE;
-      }
+      return VarHandleComparer.INSTANCE;
     }
 
-    static final class PureJavaComparer extends Comparer {
-      static final PureJavaComparer INSTANCE = new PureJavaComparer();
+    static final class VarHandleComparer extends Comparer {
 
-      private PureJavaComparer() {
-      }
+      static final VarHandleComparer INSTANCE = new VarHandleComparer();
+      private static final int STRIDE = Long.BYTES;
 
       @Override
-      public int compareTo(byte[] buf1, int o1, int l1, ByteBuffer buf2, int 
o2, int l2) {
-        int end1 = o1 + l1;
-        int end2 = o2 + l2;
-        for (int i = o1, j = o2; i < end1 && j < end2; i++, j++) {
-          int a = buf1[i] & 0xFF;
-          int b = buf2.get(j) & 0xFF;
-          if (a != b) {
-            return a - b;
+      int compareTo(byte[] buf1, int o1, int l1, ByteBuffer buf2, int o2, int 
l2) {
+        final int minLength = Math.min(l1, l2);
+        int strideLimit = minLength & ~(STRIDE - 1);
+        int i;
+
+        for (i = 0; i < strideLimit; i += STRIDE) {
+          // big-endian fetches are faster than little-endian because the 
bytes don't need to get
+          // reversed
+          long lw = (long) 
BYTE_ARRAY_LONG_VIEW_BIG_ENDIAN_VAR_HANDLE.get(buf1, o1 + i);
+          long rw = (long) 
BYTE_BUFFER_LONG_VIEW_BIG_ENDIAN_VAR_HANDLE.get(buf2, o2 + i);
+          if (lw != rw) {
+            return ((lw + Long.MIN_VALUE) < (rw + Long.MIN_VALUE)) ? -1 : 1;
           }
         }
-        return l1 - l2;
-      }
 
-      @Override
-      public int compareTo(ByteBuffer buf1, int o1, int l1, ByteBuffer buf2, 
int o2, int l2) {
-        int end1 = o1 + l1;
-        int end2 = o2 + l2;
-        for (int i = o1, j = o2; i < end1 && j < end2; i++, j++) {
-          int a = buf1.get(i) & 0xFF;
-          int b = buf2.get(j) & 0xFF;
-          if (a != b) {
-            return a - b;
+        // The epilogue to cover the last (minLength % stride) elements.
+        for (; i < minLength; i++) {
+          int il = buf1[o1 + i] & 0xFF;
+          int ir = buf2.get(o2 + i) & 0xFF;
+          if (il != ir) {
+            return il - ir;
           }
         }
         return l1 - l2;
       }
-    }
-
-    static final class UnsafeComparer extends Comparer {
-
-      public UnsafeComparer() {
-      }
-
-      static {
-        if (!UNSAFE_UNALIGNED) {
-          throw new Error();
-        }
-      }
-
-      @Override
-      public int compareTo(byte[] buf1, int o1, int l1, ByteBuffer buf2, int 
o2, int l2) {
-        long offset2Adj;
-        Object refObj2 = null;
-        if (buf2.isDirect()) {
-          offset2Adj = o2 + UnsafeAccess.directBufferAddress(buf2);
-        } else {
-          offset2Adj = o2 + buf2.arrayOffset() + 
UnsafeAccess.BYTE_ARRAY_BASE_OFFSET;
-          refObj2 = buf2.array();
-        }
-        return compareToUnsafe(buf1, o1 + UnsafeAccess.BYTE_ARRAY_BASE_OFFSET, 
l1, refObj2,
-          offset2Adj, l2);
-      }
 
       @Override
       public int compareTo(ByteBuffer buf1, int o1, int l1, ByteBuffer buf2, 
int o2, int l2) {
-        long offset1Adj, offset2Adj;
-        Object refObj1 = null, refObj2 = null;
-        if (buf1.isDirect()) {
-          offset1Adj = o1 + UnsafeAccess.directBufferAddress(buf1);
-        } else {
-          offset1Adj = o1 + buf1.arrayOffset() + 
UnsafeAccess.BYTE_ARRAY_BASE_OFFSET;
-          refObj1 = buf1.array();
-        }
-        if (buf2.isDirect()) {
-          offset2Adj = o2 + UnsafeAccess.directBufferAddress(buf2);
-        } else {
-          offset2Adj = o2 + buf2.arrayOffset() + 
UnsafeAccess.BYTE_ARRAY_BASE_OFFSET;
-          refObj2 = buf2.array();
+        final int minLength = Math.min(l1, l2);
+        int strideLimit = minLength & ~(STRIDE - 1);
+        int i;
+
+        for (i = 0; i < strideLimit; i += STRIDE) {
+          // big-endian fetches are faster than little-endian because the 
bytes don't need to get
+          // reversed
+          long lw = (long) 
BYTE_BUFFER_LONG_VIEW_BIG_ENDIAN_VAR_HANDLE.get(buf1, o1 + i);
+          long rw = (long) 
BYTE_BUFFER_LONG_VIEW_BIG_ENDIAN_VAR_HANDLE.get(buf2, o2 + i);
+          if (lw != rw) {
+            return ((lw + Long.MIN_VALUE) < (rw + Long.MIN_VALUE)) ? -1 : 1;
+          }
         }
-        return compareToUnsafe(refObj1, offset1Adj, l1, refObj2, offset2Adj, 
l2);
-      }
-    }
-  }
-
-  static class ConverterHolder {
-    static final String UNSAFE_CONVERTER_NAME =
-      ConverterHolder.class.getName() + "$UnsafeConverter";
-    static final Converter BEST_CONVERTER = getBestConverter();
-
-    static Converter getBestConverter() {
-      try {
-        Class<? extends Converter> theClass =
-          Class.forName(UNSAFE_CONVERTER_NAME).asSubclass(Converter.class);
-
-        // yes, UnsafeComparer does implement Comparer<byte[]>
-        return theClass.getConstructor().newInstance();
-      } catch (Throwable t) { // ensure we really catch *everything*
-        return PureJavaConverter.INSTANCE;
-      }
-    }
-
-    static final class PureJavaConverter extends Converter {
-      static final PureJavaConverter INSTANCE = new PureJavaConverter();
-
-      private PureJavaConverter() {
-      }
-
-      @Override
-      short toShort(ByteBuffer buffer, int offset) {
-        return buffer.getShort(offset);
-      }
-
-      @Override
-      int toInt(ByteBuffer buffer) {
-        return buffer.getInt();
-      }
-
-      @Override
-      int toInt(ByteBuffer buffer, int offset) {
-        return buffer.getInt(offset);
-      }
-
-      @Override
-      long toLong(ByteBuffer buffer, int offset) {
-        return buffer.getLong(offset);
-      }
-
-      @Override
-      void putInt(ByteBuffer buffer, int val) {
-        buffer.putInt(val);
-      }
-
-      @Override
-      int putInt(ByteBuffer buffer, int index, int val) {
-        buffer.putInt(index, val);
-        return index + Bytes.SIZEOF_INT;
-      }
 
-      @Override
-      void putShort(ByteBuffer buffer, short val) {
-        buffer.putShort(val);
-      }
-
-      @Override
-      int putShort(ByteBuffer buffer, int index, short val) {
-        buffer.putShort(index, val);
-        return index + Bytes.SIZEOF_SHORT;
-      }
-
-      @Override
-      void putLong(ByteBuffer buffer, long val) {
-        buffer.putLong(val);
-      }
-
-      @Override
-      int putLong(ByteBuffer buffer, int index, long val) {
-        buffer.putLong(index, val);
-        return index + Bytes.SIZEOF_LONG;
-      }
-    }
-
-    static final class UnsafeConverter extends Converter {
-
-      public UnsafeConverter() {
-      }
-
-      static {
-        if (!UNSAFE_UNALIGNED) {
-          throw new Error();
+        // The epilogue to cover the last (minLength % stride) elements.
+        for (; i < minLength; i++) {
+          int il = buf1.get(o1 + i) & 0xFF;
+          int ir = buf2.get(o2 + i) & 0xFF;
+          if (il != ir) {
+            return il - ir;
+          }
         }
-      }
-
-      @Override
-      short toShort(ByteBuffer buffer, int offset) {
-        return UnsafeAccess.toShort(buffer, offset);
-      }
-
-      @Override
-      int toInt(ByteBuffer buffer) {
-        int i = UnsafeAccess.toInt(buffer, buffer.position());
-        buffer.position(buffer.position() + Bytes.SIZEOF_INT);
-        return i;
-      }
-
-      @Override
-      int toInt(ByteBuffer buffer, int offset) {
-        return UnsafeAccess.toInt(buffer, offset);
-      }
-
-      @Override
-      long toLong(ByteBuffer buffer, int offset) {
-        return UnsafeAccess.toLong(buffer, offset);
-      }
-
-      @Override
-      void putInt(ByteBuffer buffer, int val) {
-        int newPos = UnsafeAccess.putInt(buffer, buffer.position(), val);
-        buffer.position(newPos);
-      }
-
-      @Override
-      int putInt(ByteBuffer buffer, int index, int val) {
-        return UnsafeAccess.putInt(buffer, index, val);
-      }
-
-      @Override
-      void putShort(ByteBuffer buffer, short val) {
-        int newPos = UnsafeAccess.putShort(buffer, buffer.position(), val);
-        buffer.position(newPos);
-      }
-
-      @Override
-      int putShort(ByteBuffer buffer, int index, short val) {
-        return UnsafeAccess.putShort(buffer, index, val);
-      }
-
-      @Override
-      void putLong(ByteBuffer buffer, long val) {
-        int newPos = UnsafeAccess.putLong(buffer, buffer.position(), val);
-        buffer.position(newPos);
-      }
-
-      @Override
-      int putLong(ByteBuffer buffer, int index, long val) {
-        return UnsafeAccess.putLong(buffer, index, val);
+        return l1 - l2;
       }
     }
   }
 
   static class CommonPrefixerHolder {
-    static final String UNSAFE_COMMON_PREFIXER_NAME =
-      CommonPrefixerHolder.class.getName() + "$UnsafeCommonPrefixer";
-
     static final CommonPrefixer BEST_COMMON_PREFIXER = getBestCommonPrefixer();
 
     static CommonPrefixer getBestCommonPrefixer() {
-      try {
-        Class<? extends CommonPrefixer> theClass =
-          
Class.forName(UNSAFE_COMMON_PREFIXER_NAME).asSubclass(CommonPrefixer.class);
-
-        return theClass.getConstructor().newInstance();
-      } catch (Throwable t) { // ensure we really catch *everything*
-        return PureJavaCommonPrefixer.INSTANCE;
-      }
+      return VarHandleCommonPrefixer.INSTANCE;
     }
 
-    static final class PureJavaCommonPrefixer extends CommonPrefixer {
-      static final PureJavaCommonPrefixer INSTANCE = new 
PureJavaCommonPrefixer();
+    static final class VarHandleCommonPrefixer extends CommonPrefixer {
 
-      private PureJavaCommonPrefixer() {
-      }
+      static final VarHandleCommonPrefixer INSTANCE = new 
VarHandleCommonPrefixer();
 
-      @Override
-      public int findCommonPrefix(ByteBuffer left, int leftOffset, int 
leftLength, byte[] right,
-        int rightOffset, int rightLength) {
-        int length = Math.min(leftLength, rightLength);
-        int result = 0;
-
-        while (
-          result < length
-            && ByteBufferUtils.toByte(left, leftOffset + result) == 
right[rightOffset + result]
-        ) {
-          result++;
-        }
-
-        return result;
-      }
+      private static final int STRIDE = Long.BYTES;
 
       @Override
-      int findCommonPrefix(ByteBuffer left, int leftOffset, int leftLength, 
ByteBuffer right,
+      public int findCommonPrefix(ByteBuffer left, int leftOffset, int 
leftLength, byte[] right,
         int rightOffset, int rightLength) {
-        int length = Math.min(leftLength, rightLength);
-        int result = 0;
-
-        while (
-          result < length && ByteBufferUtils.toByte(left, leftOffset + result)
-              == ByteBufferUtils.toByte(right, rightOffset + result)
-        ) {
-          result++;
+        final int minLength = Math.min(leftLength, rightLength);
+        int strideLimit = minLength & ~(STRIDE - 1);
+        int i;
+
+        for (i = 0; i < strideLimit; i += STRIDE) {
+          // big-endian fetches are faster than little-endian because the 
bytes don't need to get
+          // reversed
+          long lw = (long) 
BYTE_BUFFER_LONG_VIEW_BIG_ENDIAN_VAR_HANDLE.get(left, leftOffset + i);
+          long rw = (long) 
BYTE_ARRAY_LONG_VIEW_BIG_ENDIAN_VAR_HANDLE.get(right, rightOffset + i);
+
+          if (lw != rw) {
+            return i + (Long.numberOfLeadingZeros(lw ^ rw) / 
Bytes.SIZEOF_LONG);
+          }
         }
 
-        return result;
-      }
-    }
-
-    static final class UnsafeCommonPrefixer extends CommonPrefixer {
-
-      static {
-        if (!UNSAFE_UNALIGNED) {
-          throw new Error();
+        // The epilogue to cover the last (minLength % stride) elements.
+        for (; i < minLength; i++) {
+          byte il = left.get(leftOffset + i);
+          byte ir = right[rightOffset + i];
+          if (il != ir) {
+            return i;
+          }
         }
-      }
 
-      public UnsafeCommonPrefixer() {
-      }
-
-      @Override
-      public int findCommonPrefix(ByteBuffer left, int leftOffset, int 
leftLength, byte[] right,
-        int rightOffset, int rightLength) {
-        long offset1Adj;
-        Object refObj1 = null;
-        if (left.isDirect()) {
-          offset1Adj = leftOffset + UnsafeAccess.directBufferAddress(left);
-        } else {
-          offset1Adj = leftOffset + left.arrayOffset() + 
UnsafeAccess.BYTE_ARRAY_BASE_OFFSET;
-          refObj1 = left.array();
-        }
-        return findCommonPrefixUnsafe(refObj1, offset1Adj, leftLength, right,
-          rightOffset + UnsafeAccess.BYTE_ARRAY_BASE_OFFSET, rightLength);
+        return i;
       }
 
       @Override
       public int findCommonPrefix(ByteBuffer left, int leftOffset, int 
leftLength, ByteBuffer right,
         int rightOffset, int rightLength) {
-        long offset1Adj, offset2Adj;
-        Object refObj1 = null, refObj2 = null;
-        if (left.isDirect()) {
-          offset1Adj = leftOffset + UnsafeAccess.directBufferAddress(left);
-        } else {
-          offset1Adj = leftOffset + left.arrayOffset() + 
UnsafeAccess.BYTE_ARRAY_BASE_OFFSET;
-          refObj1 = left.array();
+        final int minLength = Math.min(leftLength, rightLength);
+        int strideLimit = minLength & ~(STRIDE - 1);
+        int i;
+
+        for (i = 0; i < strideLimit; i += STRIDE) {
+          // big-endian fetches are faster than little-endian because the 
bytes don't need to get
+          // reversed
+          long lw = (long) 
BYTE_BUFFER_LONG_VIEW_BIG_ENDIAN_VAR_HANDLE.get(left, leftOffset + i);
+          long rw = (long) 
BYTE_BUFFER_LONG_VIEW_BIG_ENDIAN_VAR_HANDLE.get(right, rightOffset + i);
+
+          if (lw != rw) {
+            return i + (Long.numberOfLeadingZeros(lw ^ rw) / 
Bytes.SIZEOF_LONG);
+          }
         }
-        if (right.isDirect()) {
-          offset2Adj = rightOffset + UnsafeAccess.directBufferAddress(right);
-        } else {
-          offset2Adj = rightOffset + right.arrayOffset() + 
UnsafeAccess.BYTE_ARRAY_BASE_OFFSET;
-          refObj2 = right.array();
+
+        // The epilogue to cover the last (minLength % stride) elements.
+        for (; i < minLength; i++) {
+          byte il = left.get(leftOffset + i);
+          byte ir = right.get(rightOffset + i);
+          if (il != ir) {
+            return i;
+          }
         }
-        return findCommonPrefixUnsafe(refObj1, offset1Adj, leftLength, 
refObj2, offset2Adj,
-          rightLength);
+
+        return i;
       }
     }
   }
@@ -580,11 +374,9 @@ public final class ByteBufferUtils {
   }
 
   public static byte toByte(ByteBuffer buffer, int offset) {
-    if (UNSAFE_AVAIL) {
-      return UnsafeAccess.toByte(buffer, offset);
-    } else {
-      return buffer.get(offset);
-    }
+    // For some reason there is no way to get one byte from a ByteBuffer via a 
VarHandle, so
+    // for now just do things the slow way.
+    return buffer.get(offset);
   }
 
   /**
@@ -1055,85 +847,6 @@ public final class ByteBufferUtils {
     return compareTo(buf2, o2, l2, buf1, o1, l1) * -1;
   }
 
-  static int compareToUnsafe(Object obj1, long o1, int l1, Object obj2, long 
o2, int l2) {
-    final int stride = 8;
-    final int minLength = Math.min(l1, l2);
-    int strideLimit = minLength & ~(stride - 1);
-    int i;
-
-    /*
-     * Compare 8 bytes at a time. Benchmarking shows comparing 8 bytes at a 
time is no slower than
-     * comparing 4 bytes at a time even on 32-bit. On the other hand, it is 
substantially faster on
-     * 64-bit.
-     */
-    for (i = 0; i < strideLimit; i += stride) {
-      long lw = HBasePlatformDependent.getLong(obj1, o1 + (long) i);
-      long rw = HBasePlatformDependent.getLong(obj2, o2 + (long) i);
-      if (lw != rw) {
-        if (!UnsafeAccess.LITTLE_ENDIAN) {
-          return ((lw + Long.MIN_VALUE) < (rw + Long.MIN_VALUE)) ? -1 : 1;
-        }
-
-        /*
-         * We want to compare only the first index where left[index] != 
right[index]. This
-         * corresponds to the least significant nonzero byte in lw ^ rw, since 
lw and rw are
-         * little-endian. Long.numberOfTrailingZeros(diff) tells us the least 
significant nonzero
-         * bit, and zeroing out the first three bits of L.nTZ gives us the 
shift to get that least
-         * significant nonzero byte. This comparison logic is based on 
UnsignedBytes from guava v21
-         */
-        int n = Long.numberOfTrailingZeros(lw ^ rw) & ~0x7;
-        return ((int) ((lw >>> n) & 0xFF)) - ((int) ((rw >>> n) & 0xFF));
-      }
-    }
-
-    // The epilogue to cover the last (minLength % stride) elements.
-    for (; i < minLength; i++) {
-      int il = (HBasePlatformDependent.getByte(obj1, o1 + i) & 0xFF);
-      int ir = (HBasePlatformDependent.getByte(obj2, o2 + i) & 0xFF);
-      if (il != ir) {
-        return il - ir;
-      }
-    }
-    return l1 - l2;
-  }
-
-  static int findCommonPrefixUnsafe(Object left, long leftOffset, int 
leftLength, Object right,
-    long rightOffset, int rightLength) {
-    final int stride = 8;
-    final int minLength = Math.min(leftLength, rightLength);
-    int strideLimit = minLength & ~(stride - 1);
-    int result = 0;
-    int i;
-
-    for (i = 0; i < strideLimit; i += stride) {
-      long lw = HBasePlatformDependent.getLong(left, leftOffset + (long) i);
-      long rw = HBasePlatformDependent.getLong(right, rightOffset + (long) i);
-
-      if (lw != rw) {
-        if (!UnsafeAccess.LITTLE_ENDIAN) {
-          return result + (Long.numberOfLeadingZeros(lw ^ rw) / 
Bytes.SIZEOF_LONG);
-        } else {
-          return result + (Long.numberOfTrailingZeros(lw ^ rw) / 
Bytes.SIZEOF_LONG);
-        }
-      } else {
-        result += Bytes.SIZEOF_LONG;
-      }
-    }
-
-    // The epilogue to cover the last (minLength % stride) elements.
-    for (; i < minLength; i++) {
-      byte il = HBasePlatformDependent.getByte(left, leftOffset + i);
-      byte ir = HBasePlatformDependent.getByte(right, rightOffset + i);
-      if (il != ir) {
-        return result;
-      } else {
-        result++;
-      }
-    }
-
-    return result;
-  }
-
   /**
    * Reads a short value at the given buffer's offset.
    * @param buffer input byte buffer to read
@@ -1141,14 +854,16 @@ public final class ByteBufferUtils {
    * @return short value at offset
    */
   public static short toShort(ByteBuffer buffer, int offset) {
-    return ConverterHolder.BEST_CONVERTER.toShort(buffer, offset);
+    return (short) BYTE_BUFFER_SHORT_VIEW_BIG_ENDIAN_VAR_HANDLE.get(buffer, 
offset);
   }
 
   /**
    * Reads an int value at the given buffer's current position. Also advances 
the buffer's position
    */
   public static int toInt(ByteBuffer buffer) {
-    return ConverterHolder.BEST_CONVERTER.toInt(buffer);
+    int i = (int) BYTE_BUFFER_INT_VIEW_BIG_ENDIAN_VAR_HANDLE.get(buffer, 
buffer.position());
+    buffer.position(buffer.position() + Integer.BYTES);
+    return i;
   }
 
   /**
@@ -1158,7 +873,7 @@ public final class ByteBufferUtils {
    * @return int value at offset
    */
   public static int toInt(ByteBuffer buffer, int offset) {
-    return ConverterHolder.BEST_CONVERTER.toInt(buffer, offset);
+    return (int) BYTE_BUFFER_INT_VIEW_BIG_ENDIAN_VAR_HANDLE.get(buffer, 
offset);
   }
 
   /**
@@ -1188,7 +903,7 @@ public final class ByteBufferUtils {
    * @return long value at offset
    */
   public static long toLong(ByteBuffer buffer, int offset) {
-    return ConverterHolder.BEST_CONVERTER.toLong(buffer, offset);
+    return (long) BYTE_BUFFER_LONG_VIEW_BIG_ENDIAN_VAR_HANDLE.get(buffer, 
offset);
   }
 
   /**
@@ -1198,11 +913,13 @@ public final class ByteBufferUtils {
    * @param val    int to write out
    */
   public static void putInt(ByteBuffer buffer, int val) {
-    ConverterHolder.BEST_CONVERTER.putInt(buffer, val);
+    BYTE_BUFFER_INT_VIEW_BIG_ENDIAN_VAR_HANDLE.set(buffer, buffer.position(), 
val);
+    buffer.position(buffer.position() + Integer.BYTES);
   }
 
   public static int putInt(ByteBuffer buffer, int index, int val) {
-    return ConverterHolder.BEST_CONVERTER.putInt(buffer, index, val);
+    BYTE_BUFFER_INT_VIEW_BIG_ENDIAN_VAR_HANDLE.set(buffer, index, val); // write at index
+    return index + Integer.BYTES;
   }
 
   /**
@@ -1239,11 +956,13 @@ public final class ByteBufferUtils {
    * @param val    short to write out
    */
   public static void putShort(ByteBuffer buffer, short val) {
-    ConverterHolder.BEST_CONVERTER.putShort(buffer, val);
+    BYTE_BUFFER_SHORT_VIEW_BIG_ENDIAN_VAR_HANDLE.set(buffer, 
buffer.position(), val);
+    buffer.position(buffer.position() + Short.BYTES);
   }
 
   public static int putShort(ByteBuffer buffer, int index, short val) {
-    return ConverterHolder.BEST_CONVERTER.putShort(buffer, index, val);
+    BYTE_BUFFER_SHORT_VIEW_BIG_ENDIAN_VAR_HANDLE.set(buffer, index, val); // write at index
+    return index + Short.BYTES;
   }
 
   public static int putAsShort(ByteBuffer buf, int index, int val) {
@@ -1260,11 +979,13 @@ public final class ByteBufferUtils {
    * @param val    long to write out
    */
   public static void putLong(ByteBuffer buffer, long val) {
-    ConverterHolder.BEST_CONVERTER.putLong(buffer, val);
+    BYTE_BUFFER_LONG_VIEW_BIG_ENDIAN_VAR_HANDLE.set(buffer, buffer.position(), 
val);
+    buffer.position(buffer.position() + Long.BYTES);
   }
 
   public static int putLong(ByteBuffer buffer, int index, long val) {
-    return ConverterHolder.BEST_CONVERTER.putLong(buffer, index, val);
+    BYTE_BUFFER_LONG_VIEW_BIG_ENDIAN_VAR_HANDLE.set(buffer, index, val); // write at index
+    return index + Long.BYTES;
   }
 
   /**
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
index 96b3dbd4a8a..9ccf6447dcf 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
@@ -25,9 +25,12 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.nio.charset.StandardCharsets;
 import java.security.SecureRandom;
 import java.util.ArrayList;
@@ -40,7 +43,6 @@ import java.util.List;
 import java.util.Random;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.unsafe.HBasePlatformDependent;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.WritableUtils;
@@ -124,7 +126,18 @@ public class Bytes implements Comparable<Bytes> {
   // SizeOf which uses java.lang.instrument says 24 bytes. (3 longs?)
   public static final int ESTIMATED_HEAP_TAX = 16;
 
-  static final boolean UNSAFE_UNALIGNED = HBasePlatformDependent.unaligned();
+  /*
+   * The VarHandles below must be used directly, not via non-static aliases. Non-static usage of a
+   * VarHandle is much slower than static usage. Only big-endian views are declared here: they are
+   * used both where the numeric value matters and for the stride-wise array comparisons, which
+   * rely on big-endian longs ordering (as unsigned numbers) the same way as the underlying bytes.
+   */
+  private static final VarHandle BYTE_ARRAY_SHORT_VIEW_BIG_ENDIAN_VAR_HANDLE =
+    MethodHandles.byteArrayViewVarHandle(short[].class, ByteOrder.BIG_ENDIAN);
+  private static final VarHandle BYTE_ARRAY_INT_VIEW_BIG_ENDIAN_VAR_HANDLE =
+    MethodHandles.byteArrayViewVarHandle(int[].class, ByteOrder.BIG_ENDIAN);
+  private static final VarHandle BYTE_ARRAY_LONG_VIEW_BIG_ENDIAN_VAR_HANDLE =
+    MethodHandles.byteArrayViewVarHandle(long[].class, ByteOrder.BIG_ENDIAN);
 
   /**
    * Returns length of the byte array, returning 0 if the array is null. 
Useful for calculating
@@ -718,7 +731,7 @@ public class Bytes implements Comparable<Bytes> {
     if (length != SIZEOF_LONG || offset + length > bytes.length) {
       throw explainWrongLengthOrOffset(bytes, offset, length, SIZEOF_LONG);
     }
-    return ConverterHolder.BEST_CONVERTER.toLong(bytes, offset, length);
+    return (long) BYTE_ARRAY_LONG_VIEW_BIG_ENDIAN_VAR_HANDLE.get(bytes, 
offset);
   }
 
   private static IllegalArgumentException explainWrongLengthOrOffset(final 
byte[] bytes,
@@ -747,7 +760,8 @@ public class Bytes implements Comparable<Bytes> {
       throw new IllegalArgumentException("Not enough room to put a long at" + 
" offset " + offset
         + " in a " + bytes.length + " byte array");
     }
-    return ConverterHolder.BEST_CONVERTER.putLong(bytes, offset, val);
+    BYTE_ARRAY_LONG_VIEW_BIG_ENDIAN_VAR_HANDLE.set(bytes, offset, val);
+    return offset + Long.BYTES;
   }
 
   /**
@@ -867,7 +881,7 @@ public class Bytes implements Comparable<Bytes> {
     if (length != SIZEOF_INT || offset + length > bytes.length) {
       throw explainWrongLengthOrOffset(bytes, offset, length, SIZEOF_INT);
     }
-    return ConverterHolder.BEST_CONVERTER.toInt(bytes, offset, length);
+    return (int) BYTE_ARRAY_INT_VIEW_BIG_ENDIAN_VAR_HANDLE.get(bytes, offset);
   }
 
   /**
@@ -906,7 +920,8 @@ public class Bytes implements Comparable<Bytes> {
       throw new IllegalArgumentException("Not enough room to put an int at" + 
" offset " + offset
         + " in a " + bytes.length + " byte array");
     }
-    return ConverterHolder.BEST_CONVERTER.putInt(bytes, offset, val);
+    BYTE_ARRAY_INT_VIEW_BIG_ENDIAN_VAR_HANDLE.set(bytes, offset, val);
+    return offset + Integer.BYTES;
   }
 
   /**
@@ -954,7 +969,7 @@ public class Bytes implements Comparable<Bytes> {
     if (length != SIZEOF_SHORT || offset + length > bytes.length) {
       throw explainWrongLengthOrOffset(bytes, offset, length, SIZEOF_SHORT);
     }
-    return ConverterHolder.BEST_CONVERTER.toShort(bytes, offset, length);
+    return (short) BYTE_ARRAY_SHORT_VIEW_BIG_ENDIAN_VAR_HANDLE.get(bytes, 
offset);
   }
 
   /**
@@ -982,7 +997,8 @@ public class Bytes implements Comparable<Bytes> {
       throw new IllegalArgumentException("Not enough room to put a short at" + 
" offset " + offset
         + " in a " + bytes.length + " byte array");
     }
-    return ConverterHolder.BEST_CONVERTER.putShort(bytes, offset, val);
+    BYTE_ARRAY_SHORT_VIEW_BIG_ENDIAN_VAR_HANDLE.set(bytes, offset, val);
+    return offset + Short.BYTES;
   }
 
   /**
@@ -1164,236 +1180,27 @@ public class Bytes implements Comparable<Bytes> {
     int compareTo(T buffer1, int offset1, int length1, T buffer2, int offset2, 
int length2);
   }
 
-  static abstract class Converter {
-    abstract long toLong(byte[] bytes, int offset, int length);
-
-    abstract int putLong(byte[] bytes, int offset, long val);
-
-    abstract int toInt(byte[] bytes, int offset, final int length);
-
-    abstract int putInt(byte[] bytes, int offset, int val);
-
-    abstract short toShort(byte[] bytes, int offset, final int length);
-
-    abstract int putShort(byte[] bytes, int offset, short val);
-
-  }
-
   static abstract class CommonPrefixer {
     abstract int findCommonPrefix(byte[] left, int leftOffset, int leftLength, 
byte[] right,
       int rightOffset, int rightLength);
   }
 
-  static Comparer<byte[]> lexicographicalComparerJavaImpl() {
-    return LexicographicalComparerHolder.PureJavaComparer.INSTANCE;
-  }
-
-  static class ConverterHolder {
-    static final String UNSAFE_CONVERTER_NAME =
-      ConverterHolder.class.getName() + "$UnsafeConverter";
-
-    static final Converter BEST_CONVERTER = getBestConverter();
-
-    /**
-     * Returns the Unsafe-using Converter, or falls back to the pure-Java 
implementation if unable
-     * to do so.
-     */
-    static Converter getBestConverter() {
-      try {
-        Class<?> theClass = Class.forName(UNSAFE_CONVERTER_NAME);
-
-        // yes, UnsafeComparer does implement Comparer<byte[]>
-        @SuppressWarnings("unchecked")
-        Converter converter = (Converter) 
theClass.getConstructor().newInstance();
-        return converter;
-      } catch (Throwable t) { // ensure we really catch *everything*
-        return PureJavaConverter.INSTANCE;
-      }
-    }
-
-    protected static final class PureJavaConverter extends Converter {
-      static final PureJavaConverter INSTANCE = new PureJavaConverter();
-
-      private PureJavaConverter() {
-      }
-
-      @Override
-      long toLong(byte[] bytes, int offset, int length) {
-        long l = 0;
-        for (int i = offset; i < offset + length; i++) {
-          l <<= 8;
-          l ^= bytes[i] & 0xFF;
-        }
-        return l;
-      }
-
-      @Override
-      int putLong(byte[] bytes, int offset, long val) {
-        for (int i = offset + 7; i > offset; i--) {
-          bytes[i] = (byte) val;
-          val >>>= 8;
-        }
-        bytes[offset] = (byte) val;
-        return offset + SIZEOF_LONG;
-      }
-
-      @Override
-      int toInt(byte[] bytes, int offset, int length) {
-        int n = 0;
-        for (int i = offset; i < (offset + length); i++) {
-          n <<= 8;
-          n ^= bytes[i] & 0xFF;
-        }
-        return n;
-      }
-
-      @Override
-      int putInt(byte[] bytes, int offset, int val) {
-        for (int i = offset + 3; i > offset; i--) {
-          bytes[i] = (byte) val;
-          val >>>= 8;
-        }
-        bytes[offset] = (byte) val;
-        return offset + SIZEOF_INT;
-      }
-
-      @Override
-      short toShort(byte[] bytes, int offset, int length) {
-        short n = 0;
-        n = (short) ((n ^ bytes[offset]) & 0xFF);
-        n = (short) (n << 8);
-        n ^= (short) (bytes[offset + 1] & 0xFF);
-        return n;
-      }
-
-      @Override
-      int putShort(byte[] bytes, int offset, short val) {
-        bytes[offset + 1] = (byte) val;
-        val >>= 8;
-        bytes[offset] = (byte) val;
-        return offset + SIZEOF_SHORT;
-      }
-    }
-
-    protected static final class UnsafeConverter extends Converter {
-
-      public UnsafeConverter() {
-      }
-
-      static {
-        if (!UNSAFE_UNALIGNED) {
-          // It doesn't matter what we throw;
-          // it's swallowed in getBestComparer().
-          throw new Error();
-        }
-
-        // sanity check - this should never fail
-        if (HBasePlatformDependent.arrayIndexScale(byte[].class) != 1) {
-          throw new AssertionError();
-        }
-      }
-
-      @Override
-      long toLong(byte[] bytes, int offset, int length) {
-        return UnsafeAccess.toLong(bytes, offset);
-      }
-
-      @Override
-      int putLong(byte[] bytes, int offset, long val) {
-        return UnsafeAccess.putLong(bytes, offset, val);
-      }
-
-      @Override
-      int toInt(byte[] bytes, int offset, int length) {
-        return UnsafeAccess.toInt(bytes, offset);
-      }
-
-      @Override
-      int putInt(byte[] bytes, int offset, int val) {
-        return UnsafeAccess.putInt(bytes, offset, val);
-      }
-
-      @Override
-      short toShort(byte[] bytes, int offset, int length) {
-        return UnsafeAccess.toShort(bytes, offset);
-      }
-
-      @Override
-      int putShort(byte[] bytes, int offset, short val) {
-        return UnsafeAccess.putShort(bytes, offset, val);
-      }
-    }
-  }
-
   /**
-   * Provides a lexicographical comparer implementation; either a Java 
implementation or a faster
-   * implementation based on {@code Unsafe}.
-   * <p>
-   * Uses reflection to gracefully fall back to the Java implementation if 
{@code Unsafe} isn't
-   * available.
+   * <a href=
+   * "https://github.com/google/guava/blob/v21.0/guava/src/com/google/common/primitives/UnsignedBytes.java#L362">Adapted
+   * from Guava</a>
    */
   static class LexicographicalComparerHolder {
-    static final String UNSAFE_COMPARER_NAME =
-      LexicographicalComparerHolder.class.getName() + "$UnsafeComparer";
-
     static final Comparer<byte[]> BEST_COMPARER = getBestComparer();
 
-    /**
-     * Returns the Unsafe-using Comparer, or falls back to the pure-Java 
implementation if unable to
-     * do so.
-     */
     static Comparer<byte[]> getBestComparer() {
-      try {
-        Class<?> theClass = Class.forName(UNSAFE_COMPARER_NAME);
-
-        // yes, UnsafeComparer does implement Comparer<byte[]>
-        @SuppressWarnings("unchecked")
-        Comparer<byte[]> comparer = (Comparer<byte[]>) 
theClass.getEnumConstants()[0];
-        return comparer;
-      } catch (Throwable t) { // ensure we really catch *everything*
-        return lexicographicalComparerJavaImpl();
-      }
-    }
-
-    enum PureJavaComparer implements Comparer<byte[]> {
-      INSTANCE;
-
-      @Override
-      public int compareTo(byte[] buffer1, int offset1, int length1, byte[] 
buffer2, int offset2,
-        int length2) {
-        // Short circuit equal case
-        if (buffer1 == buffer2 && offset1 == offset2 && length1 == length2) {
-          return 0;
-        }
-        // Bring WritableComparator code local
-        int end1 = offset1 + length1;
-        int end2 = offset2 + length2;
-        for (int i = offset1, j = offset2; i < end1 && j < end2; i++, j++) {
-          int a = (buffer1[i] & 0xff);
-          int b = (buffer2[j] & 0xff);
-          if (a != b) {
-            return a - b;
-          }
-        }
-        return length1 - length2;
-      }
+      return VarHandleComparer.INSTANCE;
     }
 
-    enum UnsafeComparer implements Comparer<byte[]> {
+    enum VarHandleComparer implements Comparer<byte[]> {
       INSTANCE;
 
-      static {
-        if (!UNSAFE_UNALIGNED) {
-          // It doesn't matter what we throw;
-          // it's swallowed in getBestComparer().
-          throw new Error();
-        }
-
-        // sanity check - this should never fail
-        if (HBasePlatformDependent.arrayIndexScale(byte[].class) != 1) {
-          throw new AssertionError();
-        }
-      }
+      private static final int STRIDE = Long.BYTES;
 
       /**
        * Lexicographically compare two arrays.
@@ -1410,38 +1217,24 @@ public class Bytes implements Comparable<Bytes> {
         int length2) {
 
         // Short circuit equal case
-        if (buffer1 == buffer2 && offset1 == offset2 && length1 == length2) {
+        if ((buffer1 == buffer2) && (offset1 == offset2) && (length1 == 
length2)) {
           return 0;
         }
-        final int stride = 8;
         final int minLength = Math.min(length1, length2);
-        int strideLimit = minLength & ~(stride - 1);
-        final long offset1Adj = offset1 + UnsafeAccess.BYTE_ARRAY_BASE_OFFSET;
-        final long offset2Adj = offset2 + UnsafeAccess.BYTE_ARRAY_BASE_OFFSET;
+        int strideLimit = minLength & ~(STRIDE - 1);
         int i;
 
         /*
          * Compare 8 bytes at a time. Benchmarking on x86 shows a stride of 8 
bytes is no slower
          * than 4 bytes even on 32-bit. On the other hand, it is substantially 
faster on 64-bit.
          */
-        for (i = 0; i < strideLimit; i += stride) {
-          long lw = HBasePlatformDependent.getLong(buffer1, offset1Adj + i);
-          long rw = HBasePlatformDependent.getLong(buffer2, offset2Adj + i);
+        for (i = 0; i < strideLimit; i += STRIDE) {
+          // fetch big-endian so that the first differing byte is the most significant differing
+          // byte; comparing the two longs as unsigned values then gives lexicographic order
+          long lw = (long) 
BYTE_ARRAY_LONG_VIEW_BIG_ENDIAN_VAR_HANDLE.get(buffer1, offset1 + i);
+          long rw = (long) 
BYTE_ARRAY_LONG_VIEW_BIG_ENDIAN_VAR_HANDLE.get(buffer2, offset2 + i);
           if (lw != rw) {
-            if (!UnsafeAccess.LITTLE_ENDIAN) {
-              return ((lw + Long.MIN_VALUE) < (rw + Long.MIN_VALUE)) ? -1 : 1;
-            }
-
-            /*
-             * We want to compare only the first index where left[index] != 
right[index]. This
-             * corresponds to the least significant nonzero byte in lw ^ rw, 
since lw and rw are
-             * little-endian. Long.numberOfTrailingZeros(diff) tells us the 
least significant
-             * nonzero bit, and zeroing out the first three bits of L.nTZ 
gives us the shift to get
-             * that least significant nonzero byte. This comparison logic is 
based on UnsignedBytes
-             * comparator from guava v21
-             */
-            int n = Long.numberOfTrailingZeros(lw ^ rw) & ~0x7;
-            return ((int) ((lw >>> n) & 0xFF)) - ((int) ((rw >>> n) & 0xFF));
+            return ((lw + Long.MIN_VALUE) < (rw + Long.MIN_VALUE)) ? -1 : 1;
           }
         }
 
@@ -1459,79 +1252,31 @@ public class Bytes implements Comparable<Bytes> {
   }
 
   static class CommonPrefixerHolder {
-    static final String UNSAFE_COMMON_PREFIXER_NAME =
-      CommonPrefixerHolder.class.getName() + "$UnsafeCommonPrefixer";
-
     static final CommonPrefixer BEST_COMMON_PREFIXER = getBestCommonPrefixer();
 
     static CommonPrefixer getBestCommonPrefixer() {
-      try {
-        Class<? extends CommonPrefixer> theClass =
-          
Class.forName(UNSAFE_COMMON_PREFIXER_NAME).asSubclass(CommonPrefixer.class);
-
-        return theClass.getConstructor().newInstance();
-      } catch (Throwable t) { // ensure we really catch *everything*
-        return CommonPrefixerHolder.PureJavaCommonPrefixer.INSTANCE;
-      }
+      return VarHandleCommonPrefixer.INSTANCE;
     }
 
-    static final class PureJavaCommonPrefixer extends CommonPrefixer {
-      static final PureJavaCommonPrefixer INSTANCE = new 
PureJavaCommonPrefixer();
+    static final class VarHandleCommonPrefixer extends CommonPrefixer {
+      static final VarHandleCommonPrefixer INSTANCE = new 
VarHandleCommonPrefixer();
 
-      private PureJavaCommonPrefixer() {
-      }
+      private static final int STRIDE = Long.BYTES;
 
       @Override
       public int findCommonPrefix(byte[] left, int leftOffset, int leftLength, 
byte[] right,
         int rightOffset, int rightLength) {
-        int length = Math.min(leftLength, rightLength);
-        int result = 0;
-
-        while (result < length && left[leftOffset + result] == 
right[rightOffset + result]) {
-          result++;
-        }
-        return result;
-      }
-    }
-
-    static final class UnsafeCommonPrefixer extends CommonPrefixer {
-
-      static {
-        if (!UNSAFE_UNALIGNED) {
-          throw new Error();
-        }
-
-        // sanity check - this should never fail
-        if (HBasePlatformDependent.arrayIndexScale(byte[].class) != 1) {
-          throw new AssertionError();
-        }
-      }
-
-      public UnsafeCommonPrefixer() {
-      }
-
-      @Override
-      public int findCommonPrefix(byte[] left, int leftOffset, int leftLength, 
byte[] right,
-        int rightOffset, int rightLength) {
-        final int stride = 8;
         final int minLength = Math.min(leftLength, rightLength);
-        int strideLimit = minLength & ~(stride - 1);
-        final long leftOffsetAdj = leftOffset + 
UnsafeAccess.BYTE_ARRAY_BASE_OFFSET;
-        final long rightOffsetAdj = rightOffset + 
UnsafeAccess.BYTE_ARRAY_BASE_OFFSET;
-        int result = 0;
+        int strideLimit = minLength & ~(STRIDE - 1);
         int i;
 
-        for (i = 0; i < strideLimit; i += stride) {
-          long lw = HBasePlatformDependent.getLong(left, leftOffsetAdj + i);
-          long rw = HBasePlatformDependent.getLong(right, rightOffsetAdj + i);
+        for (i = 0; i < strideLimit; i += STRIDE) {
+          // fetch big-endian so that the first differing byte is the most significant differing
+          // byte; the number of leading zero bits in lw ^ rw then locates that byte
+          long lw = (long) 
BYTE_ARRAY_LONG_VIEW_BIG_ENDIAN_VAR_HANDLE.get(left, leftOffset + i);
+          long rw = (long) 
BYTE_ARRAY_LONG_VIEW_BIG_ENDIAN_VAR_HANDLE.get(right, rightOffset + i);
           if (lw != rw) {
-            if (!UnsafeAccess.LITTLE_ENDIAN) {
-              return result + (Long.numberOfLeadingZeros(lw ^ rw) / 
Bytes.SIZEOF_LONG);
-            } else {
-              return result + (Long.numberOfTrailingZeros(lw ^ rw) / 
Bytes.SIZEOF_LONG);
-            }
-          } else {
-            result += Bytes.SIZEOF_LONG;
+            return i + (Long.numberOfLeadingZeros(lw ^ rw) / 
Bytes.SIZEOF_LONG);
           }
         }
 
@@ -1540,13 +1285,11 @@ public class Bytes implements Comparable<Bytes> {
           int il = (left[leftOffset + i]);
           int ir = (right[rightOffset + i]);
           if (il != ir) {
-            return result;
-          } else {
-            result++;
+            return i;
           }
         }
 
-        return result;
+        return i;
       }
     }
   }
diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java
index 3aa8a6ec123..fb46058210d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.util;
 
 import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
 import org.apache.hadoop.hbase.unsafe.HBasePlatformDependent;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
@@ -32,9 +31,6 @@ public final class UnsafeAccess {
   /** The offset to the first element in a byte array. */
   public static final long BYTE_ARRAY_BASE_OFFSET;
 
-  public static final boolean LITTLE_ENDIAN =
-    ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);
-
   // This number limits the number of bytes to copy per call to Unsafe's
   // copyMemory method. A limit is imposed to allow for safepoint polling
   // during a large copy
@@ -50,249 +46,6 @@ public final class UnsafeAccess {
   private UnsafeAccess() {
   }
 
-  // APIs to read primitive data from a byte[] using Unsafe way
-  /**
-   * Converts a byte array to a short value considering it was written in 
big-endian format.
-   * @param bytes  byte array
-   * @param offset offset into array
-   * @return the short value
-   */
-  public static short toShort(byte[] bytes, int offset) {
-    if (LITTLE_ENDIAN) {
-      return Short
-        .reverseBytes(HBasePlatformDependent.getShort(bytes, offset + 
BYTE_ARRAY_BASE_OFFSET));
-    } else {
-      return HBasePlatformDependent.getShort(bytes, offset + 
BYTE_ARRAY_BASE_OFFSET);
-    }
-  }
-
-  /**
-   * Converts a byte array to an int value considering it was written in 
big-endian format.
-   * @param bytes  byte array
-   * @param offset offset into array
-   * @return the int value
-   */
-  public static int toInt(byte[] bytes, int offset) {
-    if (LITTLE_ENDIAN) {
-      return Integer
-        .reverseBytes(HBasePlatformDependent.getInt(bytes, offset + 
BYTE_ARRAY_BASE_OFFSET));
-    } else {
-      return HBasePlatformDependent.getInt(bytes, offset + 
BYTE_ARRAY_BASE_OFFSET);
-    }
-  }
-
-  /**
-   * Converts a byte array to a long value considering it was written in 
big-endian format.
-   * @param bytes  byte array
-   * @param offset offset into array
-   * @return the long value
-   */
-  public static long toLong(byte[] bytes, int offset) {
-    if (LITTLE_ENDIAN) {
-      return Long
-        .reverseBytes(HBasePlatformDependent.getLong(bytes, offset + 
BYTE_ARRAY_BASE_OFFSET));
-    } else {
-      return HBasePlatformDependent.getLong(bytes, offset + 
BYTE_ARRAY_BASE_OFFSET);
-    }
-  }
-
-  // APIs to write primitive data to a byte[] using Unsafe way
-  /**
-   * Put a short value out to the specified byte array position in big-endian 
format.
-   * @param bytes  the byte array
-   * @param offset position in the array
-   * @param val    short to write out
-   * @return incremented offset
-   */
-  public static int putShort(byte[] bytes, int offset, short val) {
-    if (LITTLE_ENDIAN) {
-      val = Short.reverseBytes(val);
-    }
-    HBasePlatformDependent.putShort(bytes, offset + BYTE_ARRAY_BASE_OFFSET, 
val);
-    return offset + Bytes.SIZEOF_SHORT;
-  }
-
-  /**
-   * Put an int value out to the specified byte array position in big-endian 
format.
-   * @param bytes  the byte array
-   * @param offset position in the array
-   * @param val    int to write out
-   * @return incremented offset
-   */
-  public static int putInt(byte[] bytes, int offset, int val) {
-    if (LITTLE_ENDIAN) {
-      val = Integer.reverseBytes(val);
-    }
-    HBasePlatformDependent.putInt(bytes, offset + BYTE_ARRAY_BASE_OFFSET, val);
-    return offset + Bytes.SIZEOF_INT;
-  }
-
-  /**
-   * Put a long value out to the specified byte array position in big-endian 
format.
-   * @param bytes  the byte array
-   * @param offset position in the array
-   * @param val    long to write out
-   * @return incremented offset
-   */
-  public static int putLong(byte[] bytes, int offset, long val) {
-    if (LITTLE_ENDIAN) {
-      val = Long.reverseBytes(val);
-    }
-    HBasePlatformDependent.putLong(bytes, offset + BYTE_ARRAY_BASE_OFFSET, 
val);
-    return offset + Bytes.SIZEOF_LONG;
-  }
-
-  // APIs to read primitive data from a ByteBuffer using Unsafe way
-  /**
-   * Reads a short value at the given buffer's offset considering it was 
written in big-endian
-   * format.
-   * @return short value at offset
-   */
-  public static short toShort(ByteBuffer buf, int offset) {
-    if (LITTLE_ENDIAN) {
-      return Short.reverseBytes(getAsShort(buf, offset));
-    }
-    return getAsShort(buf, offset);
-  }
-
-  /**
-   * Reads a short value at the given Object's offset considering it was 
written in big-endian
-   * format.
-   * @return short value at offset
-   */
-  public static short toShort(Object ref, long offset) {
-    if (LITTLE_ENDIAN) {
-      return Short.reverseBytes(HBasePlatformDependent.getShort(ref, offset));
-    }
-    return HBasePlatformDependent.getShort(ref, offset);
-  }
-
-  /**
-   * Reads bytes at the given offset as a short value.
-   * @return short value at offset
-   */
-  private static short getAsShort(ByteBuffer buf, int offset) {
-    if (buf.isDirect()) {
-      return HBasePlatformDependent.getShort(directBufferAddress(buf) + 
offset);
-    }
-    return HBasePlatformDependent.getShort(buf.array(),
-      BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset);
-  }
-
-  /**
-   * Reads an int value at the given buffer's offset considering it was 
written in big-endian
-   * format.
-   * @return int value at offset
-   */
-  public static int toInt(ByteBuffer buf, int offset) {
-    if (LITTLE_ENDIAN) {
-      return Integer.reverseBytes(getAsInt(buf, offset));
-    }
-    return getAsInt(buf, offset);
-  }
-
-  /**
-   * Reads a int value at the given Object's offset considering it was written 
in big-endian format.
-   * @return int value at offset
-   */
-  public static int toInt(Object ref, long offset) {
-    if (LITTLE_ENDIAN) {
-      return Integer.reverseBytes(HBasePlatformDependent.getInt(ref, offset));
-    }
-    return HBasePlatformDependent.getInt(ref, offset);
-  }
-
-  /**
-   * Reads bytes at the given offset as an int value.
-   * @return int value at offset
-   */
-  private static int getAsInt(ByteBuffer buf, int offset) {
-    if (buf.isDirect()) {
-      return HBasePlatformDependent.getInt(directBufferAddress(buf) + offset);
-    }
-    return HBasePlatformDependent.getInt(buf.array(),
-      BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset);
-  }
-
-  /**
-   * Reads a long value at the given buffer's offset considering it was 
written in big-endian
-   * format.
-   * @return long value at offset
-   */
-  public static long toLong(ByteBuffer buf, int offset) {
-    if (LITTLE_ENDIAN) {
-      return Long.reverseBytes(getAsLong(buf, offset));
-    }
-    return getAsLong(buf, offset);
-  }
-
-  /**
-   * Reads a long value at the given Object's offset considering it was 
written in big-endian
-   * format.
-   * @return long value at offset
-   */
-  public static long toLong(Object ref, long offset) {
-    if (LITTLE_ENDIAN) {
-      return Long.reverseBytes(HBasePlatformDependent.getLong(ref, offset));
-    }
-    return HBasePlatformDependent.getLong(ref, offset);
-  }
-
-  /**
-   * Reads bytes at the given offset as a long value.
-   * @return long value at offset
-   */
-  private static long getAsLong(ByteBuffer buf, int offset) {
-    if (buf.isDirect()) {
-      return HBasePlatformDependent.getLong(directBufferAddress(buf) + offset);
-    }
-    return HBasePlatformDependent.getLong(buf.array(),
-      BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset);
-  }
-
-  /**
-   * Returns the byte at the given offset
-   * @param buf    the buffer to read
-   * @param offset the offset at which the byte has to be read
-   * @return the byte at the given offset
-   */
-  public static byte toByte(ByteBuffer buf, int offset) {
-    if (buf.isDirect()) {
-      return HBasePlatformDependent.getByte(directBufferAddress(buf) + offset);
-    } else {
-      return HBasePlatformDependent.getByte(buf.array(),
-        BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset);
-    }
-  }
-
-  /**
-   * Returns the byte at the given offset of the object
-   * @return the byte at the given offset
-   */
-  public static byte toByte(Object ref, long offset) {
-    return HBasePlatformDependent.getByte(ref, offset);
-  }
-
-  /**
-   * Put an int value out to the specified ByteBuffer offset in big-endian 
format.
-   * @param buf    the ByteBuffer to write to
-   * @param offset offset in the ByteBuffer
-   * @param val    int to write out
-   * @return incremented offset
-   */
-  public static int putInt(ByteBuffer buf, int offset, int val) {
-    if (LITTLE_ENDIAN) {
-      val = Integer.reverseBytes(val);
-    }
-    if (buf.isDirect()) {
-      HBasePlatformDependent.putInt(directBufferAddress(buf) + offset, val);
-    } else {
-      HBasePlatformDependent.putInt(buf.array(),
-        offset + buf.arrayOffset() + BYTE_ARRAY_BASE_OFFSET, val);
-    }
-    return offset + Bytes.SIZEOF_INT;
-  }
-
   // APIs to copy data. This will be direct memory location copy and will be 
much faster
   /**
    * Copies the bytes from given array's offset to length part into the given 
buffer.
@@ -375,47 +128,6 @@ public final class UnsafeAccess {
     unsafeCopy(srcBase, srcAddress, destBase, destAddress, length);
   }
 
-  // APIs to add primitives to BBs
-  /**
-   * Put a short value out to the specified BB position in big-endian format.
-   * @param buf    the byte buffer
-   * @param offset position in the buffer
-   * @param val    short to write out
-   * @return incremented offset
-   */
-  public static int putShort(ByteBuffer buf, int offset, short val) {
-    if (LITTLE_ENDIAN) {
-      val = Short.reverseBytes(val);
-    }
-    if (buf.isDirect()) {
-      HBasePlatformDependent.putShort(directBufferAddress(buf) + offset, val);
-    } else {
-      HBasePlatformDependent.putShort(buf.array(),
-        BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset, val);
-    }
-    return offset + Bytes.SIZEOF_SHORT;
-  }
-
-  /**
-   * Put a long value out to the specified BB position in big-endian format.
-   * @param buf    the byte buffer
-   * @param offset position in the buffer
-   * @param val    long to write out
-   * @return incremented offset
-   */
-  public static int putLong(ByteBuffer buf, int offset, long val) {
-    if (LITTLE_ENDIAN) {
-      val = Long.reverseBytes(val);
-    }
-    if (buf.isDirect()) {
-      HBasePlatformDependent.putLong(directBufferAddress(buf) + offset, val);
-    } else {
-      HBasePlatformDependent.putLong(buf.array(),
-        BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset, val);
-    }
-    return offset + Bytes.SIZEOF_LONG;
-  }
-
   /**
    * Put a byte value out to the specified BB position in big-endian format.
    * @param buf    the byte buffer
@@ -436,9 +148,4 @@ public final class UnsafeAccess {
   public static long directBufferAddress(ByteBuffer buf) {
     return PlatformDependent.directBufferAddress(buf);
   }
-
-  public static void freeDirectBuffer(ByteBuffer buffer) {
-    // here we just use the method in netty
-    PlatformDependent.freeDirectBuffer(buffer);
-  }
 }
diff --git 
a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/ByteBufferUtilsTestBase.java
 
b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/ByteBufferUtilsTestBase.java
index 0c7c00fbfd6..80b701601dc 100644
--- 
a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/ByteBufferUtilsTestBase.java
+++ 
b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/ByteBufferUtilsTestBase.java
@@ -503,6 +503,174 @@ public class ByteBufferUtilsTestBase {
     assertEquals(0, result);
   }
 
+  /**
+   * Checks ordering with zero-length ranges: empty equals empty, and an empty range is smaller
+   * than a non-empty one, with either a ByteBuffer or a byte[] on the right-hand side.
+   */
+  @Test
+  public void testCompareToEmptyBuffers() {
+    final ByteBuffer emptyA = ByteBuffer.allocate(0);
+    final ByteBuffer emptyB = ByteBuffer.allocate(0);
+    final byte[] noBytes = new byte[0];
+
+    final ByteBuffer onesBuf = ByteBuffer.allocate(10);
+    fillBB(onesBuf, (byte) 1);
+    final byte[] onesArr = new byte[10];
+    fillArray(onesArr, (byte) 1);
+
+    // ByteBuffer on both sides
+    int cmp = ByteBufferUtils.compareTo(emptyA, 0, 0, emptyB, 0, 0);
+    assertEquals(0, cmp);
+    cmp = ByteBufferUtils.compareTo(onesBuf, 0, 10, emptyA, 0, 0);
+    assertTrue(cmp > 0);
+    cmp = ByteBufferUtils.compareTo(emptyA, 0, 0, onesBuf, 0, 10);
+    assertTrue(cmp < 0);
+
+    // ByteBuffer on the left, byte[] on the right
+    cmp = ByteBufferUtils.compareTo(emptyA, 0, 0, noBytes, 0, 0);
+    assertEquals(0, cmp);
+    cmp = ByteBufferUtils.compareTo(onesBuf, 0, 10, noBytes, 0, 0);
+    assertTrue(cmp > 0);
+    cmp = ByteBufferUtils.compareTo(emptyA, 0, 0, onesArr, 0, 10);
+    assertTrue(cmp < 0);
+  }
+
+  /**
+   * Compares one-byte ranges holding 5 and 10, buffer-vs-buffer and buffer-vs-array, and expects
+   * the usual less-than / greater-than / equal results.
+   */
+  @Test
+  public void testCompareToSingleByte() {
+    final ByteBuffer fiveBuf = ByteBuffer.allocate(1);
+    fiveBuf.put(0, (byte) 5);
+    final ByteBuffer tenBuf = ByteBuffer.allocate(1);
+    tenBuf.put(0, (byte) 10);
+    final byte[] fiveArr = { (byte) 5 };
+    final byte[] tenArr = { (byte) 10 };
+
+    // buffer(5) vs buffer(10)
+    assertTrue(ByteBufferUtils.compareTo(fiveBuf, 0, 1, tenBuf, 0, 1) < 0);
+    // buffer(5) vs array(10)
+    assertTrue(ByteBufferUtils.compareTo(fiveBuf, 0, 1, tenArr, 0, 1) < 0);
+    // buffer(10) vs array(5)
+    assertTrue(ByteBufferUtils.compareTo(tenBuf, 0, 1, fiveArr, 0, 1) > 0);
+    // buffer(5) vs array(5)
+    assertEquals(0, ByteBufferUtils.compareTo(fiveBuf, 0, 1, fiveArr, 0, 1));
+  }
+
+  /**
+   * With identical content, expects the longer range to compare greater than the shorter one and
+   * equal-length slices to compare equal, for buffer-vs-buffer and buffer-vs-array.
+   */
+  @Test
+  public void testCompareToLengthDifferences() {
+    final ByteBuffer twentyBuf = ByteBuffer.allocate(20);
+    final ByteBuffer tenBuf = ByteBuffer.allocate(10);
+    fillBB(twentyBuf, (byte) 5);
+    fillBB(tenBuf, (byte) 5);
+
+    // 20 bytes vs 10 bytes of the same content
+    int cmp = ByteBufferUtils.compareTo(twentyBuf, 0, 20, tenBuf, 0, 10);
+    assertTrue(cmp > 0);
+    // and the mirror image
+    cmp = ByteBufferUtils.compareTo(tenBuf, 0, 10, twentyBuf, 0, 20);
+    assertTrue(cmp < 0);
+    // first 10 bytes of the long buffer vs the whole short buffer
+    cmp = ByteBufferUtils.compareTo(twentyBuf, 0, 10, tenBuf, 0, 10);
+    assertEquals(0, cmp);
+
+    final byte[] twentyArr = new byte[20];
+    final byte[] tenArr = new byte[10];
+    fillArray(twentyArr, (byte) 5);
+    fillArray(tenArr, (byte) 5);
+
+    // long buffer vs short array
+    cmp = ByteBufferUtils.compareTo(twentyBuf, 0, 20, tenArr, 0, 10);
+    assertTrue(cmp > 0);
+    // short buffer vs long array
+    cmp = ByteBufferUtils.compareTo(tenBuf, 0, 10, twentyArr, 0, 20);
+    assertTrue(cmp < 0);
+  }
+
+  /**
+   * Uses a 30-byte buffer holding the repeating pattern 0..9 to compare slices taken at
+   * different offsets, against the same buffer and against a 10-byte array holding 0..9.
+   */
+  @Test
+  public void testCompareToWithOffsets() {
+    final ByteBuffer cyclic = ByteBuffer.allocate(30);
+    int idx = 0;
+    while (idx < 30) {
+      cyclic.put(idx, (byte) (idx % 10));
+      idx++;
+    }
+
+    // same offset, and offsets exactly one period apart, see identical bytes
+    assertEquals(0, ByteBufferUtils.compareTo(cyclic, 5, 10, cyclic, 5, 10));
+    assertEquals(0, ByteBufferUtils.compareTo(cyclic, 0, 10, cyclic, 10, 10));
+
+    // offset 11 starts the pattern at 1 instead of 0, so the slices differ at the first byte
+    assertTrue(ByteBufferUtils.compareTo(cyclic, 0, 10, cyclic, 11, 10) < 0);
+    assertTrue(ByteBufferUtils.compareTo(cyclic, 11, 10, cyclic, 0, 10) > 0);
+
+    final byte[] digits = new byte[10];
+    for (int d = 0; d < digits.length; d++) {
+      digits[d] = (byte) d;
+    }
+    // buffer slices at offsets 0 and 10 both match the array
+    assertEquals(0, ByteBufferUtils.compareTo(cyclic, 0, 10, digits, 0, 10));
+    assertEquals(0, ByteBufferUtils.compareTo(cyclic, 10, 10, digits, 0, 10));
+  }
+
+  /**
+   * Compares a heap buffer with a direct buffer in both argument orders: first with identical
+   * content, then with byte 50 of the heap buffer lowered to 41 and raised to 43.
+   */
+  @Test
+  public void testCompareToDirectVsHeapBuffers() {
+    final ByteBuffer onHeap = ByteBuffer.allocate(100);
+    final ByteBuffer offHeap = ByteBuffer.allocateDirect(100);
+    fillBB(onHeap, (byte) 42);
+    fillBB(offHeap, (byte) 42);
+
+    // identical content
+    int heapFirst = ByteBufferUtils.compareTo(onHeap, 0, 100, offHeap, 0, 100);
+    assertEquals(0, heapFirst);
+    int directFirst = ByteBufferUtils.compareTo(offHeap, 0, 100, onHeap, 0, 100);
+    assertEquals(0, directFirst);
+
+    // heap buffer smaller at index 50
+    onHeap.put(50, (byte) 41);
+    heapFirst = ByteBufferUtils.compareTo(onHeap, 0, 100, offHeap, 0, 100);
+    assertTrue(heapFirst < 0);
+    directFirst = ByteBufferUtils.compareTo(offHeap, 0, 100, onHeap, 0, 100);
+    assertTrue(directFirst > 0);
+
+    // heap buffer larger at index 50
+    onHeap.put(50, (byte) 43);
+    heapFirst = ByteBufferUtils.compareTo(onHeap, 0, 100, offHeap, 0, 100);
+    assertTrue(heapFirst > 0);
+    directFirst = ByteBufferUtils.compareTo(offHeap, 0, 100, onHeap, 0, 100);
+    assertTrue(directFirst < 0);
+  }
+
+  /**
+   * Raises a single byte of an otherwise identical buffer at indexes 7, 8, 16, 24 and 31 and
+   * expects the modified buffer to compare greater each time. The test name suggests these
+   * indexes sit around multi-byte comparison strides; that is not visible from this test itself.
+   */
+  @Test
+  public void testCompareToStrideBoundaries() {
+    final ByteBuffer reference = ByteBuffer.allocate(100);
+    final ByteBuffer modified = ByteBuffer.allocate(100);
+    fillBB(reference, (byte) 5);
+
+    for (int bumpedIndex : new int[] { 7, 8, 16, 24, 31 }) {
+      // reset, then make exactly one byte larger than the reference
+      fillBB(modified, (byte) 5);
+      modified.put(bumpedIndex, (byte) 6);
+      assertTrue(ByteBufferUtils.compareTo(modified, 0, 100, reference, 0, 100) > 0);
+    }
+  }
+
+  /**
+   * Expects bytes to be ordered as unsigned values: 0x80 (-128) after 0x7F (127), and 0xFF (-1)
+   * after 0x00, for buffer-vs-buffer and for buffer-vs-array in both directions.
+   */
+  @Test
+  public void testCompareToUnsignedBytes() {
+    ByteBuffer bb1 = ByteBuffer.allocate(10);
+    ByteBuffer bb2 = ByteBuffer.allocate(10);
+    fillBB(bb1, (byte) 0);
+    fillBB(bb2, (byte) 0);
+
+    // 0x7F vs 0x80: smaller when unsigned, although 127 > -128 when signed
+    bb1.put(5, (byte) 127);
+    bb2.put(5, (byte) -128);
+    assertTrue(ByteBufferUtils.compareTo(bb1, 0, 10, bb2, 0, 10) < 0);
+
+    // 0xFF vs 0x00: larger when unsigned, although -1 < 0 when signed
+    fillBB(bb1, (byte) 0);
+    fillBB(bb2, (byte) 0);
+    bb1.put(5, (byte) -1);
+    bb2.put(5, (byte) 0);
+    assertTrue(ByteBufferUtils.compareTo(bb1, 0, 10, bb2, 0, 10) > 0);
+
+    // new arrays are already zero-filled, so only the interesting byte is set
+    byte[] b1 = new byte[10];
+    byte[] b2 = new byte[10];
+    b1[5] = (byte) -1;
+    // buffer holding 0xFF vs array holding 0x00
+    assertTrue(ByteBufferUtils.compareTo(bb1, 0, 10, b2, 0, 10) > 0);
+    // buffer holding 0x00 vs array holding 0xFF
+    assertTrue(ByteBufferUtils.compareTo(bb2, 0, 10, b1, 0, 10) < 0);
+  }
+
   @Test
   public void testEquals() {
     byte[] a = Bytes.toBytes("http://A";);
@@ -558,6 +726,768 @@ public class ByteBufferUtilsTestBase {
       ByteBufferUtils.findCommonPrefix(bb1, 0, bb1.remaining(), bb2, 0, 
bb2.remaining()));
   }
 
+  /**
+   * Writes each sample with the JDK's absolute {@code putShort} at offsets 0, 5 and 10 of a heap
+   * and a direct buffer, then expects {@code ByteBufferUtils.toShort} to read it back.
+   */
+  @Test
+  public void testConverterToShort() {
+    final short[] samples =
+      { 0, 1, -1, Short.MIN_VALUE, Short.MAX_VALUE, 127, -128, 255, -255, 1000, -1000 };
+    final int[] offsets = { 0, 5, 10 };
+    final ByteBuffer[] targets = { ByteBuffer.allocate(20), ByteBuffer.allocateDirect(20) };
+
+    for (short expected : samples) {
+      for (ByteBuffer buf : targets) {
+        buf.clear();
+        for (int off : offsets) {
+          buf.putShort(off, expected);
+        }
+        for (int off : offsets) {
+          assertEquals(expected, ByteBufferUtils.toShort(buf, off));
+        }
+      }
+    }
+  }
+
+  /**
+   * Stores three consecutive ints (value, value + 1, value + 2) and reads the first two through
+   * the position-relative {@code ByteBufferUtils.toInt(buffer)}, checking that each read
+   * advances the position by four bytes.
+   */
+  @Test
+  public void testConverterToIntWithPosition() {
+    final int[] samples = { 0, 1, -1, Integer.MIN_VALUE, Integer.MAX_VALUE, 127, -128, 65535,
+      -65535, 1000000, -1000000 };
+    final ByteBuffer[] targets = { ByteBuffer.allocate(20), ByteBuffer.allocateDirect(20) };
+
+    for (int base : samples) {
+      for (ByteBuffer buf : targets) {
+        buf.clear();
+        for (int slot = 0; slot < 3; slot++) {
+          buf.putInt(slot * 4, base + slot);
+        }
+
+        buf.position(0);
+        assertEquals(base, ByteBufferUtils.toInt(buf));
+        assertEquals(4, buf.position());
+        assertEquals(base + 1, ByteBufferUtils.toInt(buf));
+        assertEquals(8, buf.position());
+      }
+    }
+  }
+
+  /**
+   * Writes each sample with the JDK's absolute {@code putInt} at offsets 0, 4 and 12 of a heap
+   * and a direct buffer, then expects {@code ByteBufferUtils.toInt(buffer, offset)} to match.
+   */
+  @Test
+  public void testConverterToIntWithOffset() {
+    final int[] samples = { 0, 1, -1, Integer.MIN_VALUE, Integer.MAX_VALUE, 127, -128, 65535,
+      -65535, 1000000, -1000000 };
+    final int[] offsets = { 0, 4, 12 };
+    final ByteBuffer[] targets = { ByteBuffer.allocate(20), ByteBuffer.allocateDirect(20) };
+
+    for (int expected : samples) {
+      for (ByteBuffer buf : targets) {
+        buf.clear();
+        for (int off : offsets) {
+          buf.putInt(off, expected);
+        }
+        for (int off : offsets) {
+          assertEquals(expected, ByteBufferUtils.toInt(buf, off));
+        }
+      }
+    }
+  }
+
+  /**
+   * Writes each sample with the JDK's absolute {@code putLong} at offsets 0, 8 and 16 of a heap
+   * and a direct buffer, then expects {@code ByteBufferUtils.toLong(buffer, offset)} to match.
+   */
+  @Test
+  public void testConverterToLong() {
+    final long[] samples = { 0L, 1L, -1L, Long.MIN_VALUE, Long.MAX_VALUE, 127L, -128L, 65535L,
+      -65535L, Integer.MAX_VALUE + 1L, Integer.MIN_VALUE - 1L, 1000000000000L, -1000000000000L };
+    final int[] offsets = { 0, 8, 16 };
+    final ByteBuffer[] targets = { ByteBuffer.allocate(32), ByteBuffer.allocateDirect(32) };
+
+    for (long expected : samples) {
+      for (ByteBuffer buf : targets) {
+        buf.clear();
+        for (int off : offsets) {
+          buf.putLong(off, expected);
+        }
+        for (int off : offsets) {
+          assertEquals(expected, ByteBufferUtils.toLong(buf, off));
+        }
+      }
+    }
+  }
+
+  /**
+   * Writes two shorts through the position-relative {@code ByteBufferUtils.putShort(buffer,
+   * value)}, checks the position advances by two bytes per write, and reads both values back
+   * with the JDK's relative {@code getShort()}.
+   */
+  @Test
+  public void testConverterPutShortWithPosition() {
+    final short[] samples =
+      { 0, 1, -1, Short.MIN_VALUE, Short.MAX_VALUE, 127, -128, 255, -255, 1000, -1000 };
+    final ByteBuffer[] targets = { ByteBuffer.allocate(20), ByteBuffer.allocateDirect(20) };
+
+    for (short first : samples) {
+      // the increment wraps around for Short.MAX_VALUE, exactly as the narrowing cast dictates
+      final short second = (short) (first + 1);
+      for (ByteBuffer buf : targets) {
+        buf.clear();
+
+        buf.position(0);
+        ByteBufferUtils.putShort(buf, first);
+        assertEquals(2, buf.position());
+        ByteBufferUtils.putShort(buf, second);
+        assertEquals(4, buf.position());
+
+        buf.position(0);
+        assertEquals(first, buf.getShort());
+        assertEquals(second, buf.getShort());
+      }
+    }
+  }
+
+  /**
+   * Verifies the absolute-index {@code ByteBufferUtils.putShort(buffer, index, value)}: it must
+   * return the index just past the written value, leave the buffer position untouched, and
+   * store the value at the requested index.
+   */
+  @Test
+  public void testConverterPutShortWithIndex() {
+    ByteBuffer heap = ByteBuffer.allocate(20);
+    ByteBuffer direct = ByteBuffer.allocateDirect(20);
+
+    short[] testValues =
+      { 0, 1, -1, Short.MIN_VALUE, Short.MAX_VALUE, 127, -128, 255, -255, 1000, -1000 };
+
+    for (short value : testValues) {
+      heap.clear();
+      direct.clear();
+
+      // position is set to 5 so the assertion below can show an absolute put does not move it
+      heap.position(5);
+      int newIndex = ByteBufferUtils.putShort(heap, 0, value);
+      assertEquals(0 + Short.BYTES, newIndex);
+      assertEquals(5, heap.position());
+      // read back from the index that was written (0), not from the buffer position
+      assertEquals(value, heap.getShort(0));
+
+      direct.position(5);
+      newIndex = ByteBufferUtils.putShort(direct, 0, value);
+      assertEquals(0 + Short.BYTES, newIndex);
+      assertEquals(5, direct.position());
+      assertEquals(value, direct.getShort(0));
+    }
+  }
+
+  /**
+   * Writes two ints through the position-relative {@code ByteBufferUtils.putInt(buffer, value)},
+   * checks the position advances by four bytes per write, and reads both values back with the
+   * JDK's relative {@code getInt()}.
+   */
+  @Test
+  public void testConverterPutIntWithPosition() {
+    final int[] samples = { 0, 1, -1, Integer.MIN_VALUE, Integer.MAX_VALUE, 127, -128, 65535,
+      -65535, 1000000, -1000000 };
+    final ByteBuffer[] targets = { ByteBuffer.allocate(20), ByteBuffer.allocateDirect(20) };
+
+    for (int first : samples) {
+      final int second = first + 1;
+      for (ByteBuffer buf : targets) {
+        buf.clear();
+
+        buf.position(0);
+        ByteBufferUtils.putInt(buf, first);
+        assertEquals(4, buf.position());
+        ByteBufferUtils.putInt(buf, second);
+        assertEquals(8, buf.position());
+
+        buf.position(0);
+        assertEquals(first, buf.getInt());
+        assertEquals(second, buf.getInt());
+      }
+    }
+  }
+
+  /**
+   * Verifies the absolute-index {@code ByteBufferUtils.putInt(buffer, index, value)}: it must
+   * return the index just past the written value, leave the buffer position untouched, and
+   * store the value at the requested index.
+   */
+  @Test
+  public void testConverterPutIntWithIndex() {
+    ByteBuffer heap = ByteBuffer.allocate(20);
+    ByteBuffer direct = ByteBuffer.allocateDirect(20);
+
+    int[] testValues = { 0, 1, -1, Integer.MIN_VALUE, Integer.MAX_VALUE, 127, -128, 65535,
+      -65535, 1000000, -1000000 };
+
+    for (int value : testValues) {
+      heap.clear();
+      direct.clear();
+
+      // position is set to 5 so the assertion below can show an absolute put does not move it
+      heap.position(5);
+      int newIndex = ByteBufferUtils.putInt(heap, 0, value);
+      assertEquals(0 + Integer.BYTES, newIndex);
+      assertEquals(5, heap.position());
+      // read back from the index that was written (0), not from the buffer position
+      assertEquals(value, heap.getInt(0));
+
+      direct.position(5);
+      newIndex = ByteBufferUtils.putInt(direct, 0, value);
+      assertEquals(0 + Integer.BYTES, newIndex);
+      assertEquals(5, direct.position());
+      assertEquals(value, direct.getInt(0));
+    }
+  }
+
+  /**
+   * Writes two longs through the position-relative {@code ByteBufferUtils.putLong(buffer,
+   * value)}, checks the position advances by eight bytes per write, and reads both values back
+   * with the JDK's relative {@code getLong()}.
+   */
+  @Test
+  public void testConverterPutLongWithPosition() {
+    final long[] samples = { 0L, 1L, -1L, Long.MIN_VALUE, Long.MAX_VALUE, 127L, -128L, 65535L,
+      -65535L, Integer.MAX_VALUE + 1L, Integer.MIN_VALUE - 1L, 1000000000000L, -1000000000000L };
+    final ByteBuffer[] targets = { ByteBuffer.allocate(32), ByteBuffer.allocateDirect(32) };
+
+    for (long first : samples) {
+      final long second = first + 1;
+      for (ByteBuffer buf : targets) {
+        buf.clear();
+
+        buf.position(0);
+        ByteBufferUtils.putLong(buf, first);
+        assertEquals(8, buf.position());
+        ByteBufferUtils.putLong(buf, second);
+        assertEquals(16, buf.position());
+
+        buf.position(0);
+        assertEquals(first, buf.getLong());
+        assertEquals(second, buf.getLong());
+      }
+    }
+  }
+
+  /**
+   * Verifies the absolute-index {@code ByteBufferUtils.putLong(buffer, index, value)}: it must
+   * return the index just past the written value, leave the buffer position untouched, and
+   * store the value at the requested index.
+   */
+  @Test
+  public void testConverterPutLongWithIndex() {
+    ByteBuffer heap = ByteBuffer.allocate(32);
+    ByteBuffer direct = ByteBuffer.allocateDirect(32);
+
+    long[] testValues = { 0L, 1L, -1L, Long.MIN_VALUE, Long.MAX_VALUE, 127L, -128L, 65535L,
+      -65535L, Integer.MAX_VALUE + 1L, Integer.MIN_VALUE - 1L, 1000000000000L, -1000000000000L };
+
+    for (long value : testValues) {
+      heap.clear();
+      direct.clear();
+
+      // position is set to 8 so the assertion below can show an absolute put does not move it
+      heap.position(8);
+      int newIndex = ByteBufferUtils.putLong(heap, 0, value);
+      assertEquals(0 + Long.BYTES, newIndex);
+      assertEquals(8, heap.position());
+      // read back from the index that was written (0), not from the buffer position
+      assertEquals(value, heap.getLong(0));
+
+      direct.position(8);
+      newIndex = ByteBufferUtils.putLong(direct, 0, value);
+      assertEquals(0 + Long.BYTES, newIndex);
+      assertEquals(8, direct.position());
+      assertEquals(value, direct.getLong(0));
+    }
+  }
+
+  /**
+   * Round trip: writes a short at position 5 with the relative {@code ByteBufferUtils.putShort}
+   * and reads it back with {@code ByteBufferUtils.toShort(buffer, 5)}.
+   */
+  @Test
+  public void testConverterRoundTripShort() {
+    final short[] samples =
+      { 0, 1, -1, Short.MIN_VALUE, Short.MAX_VALUE, 127, -128, 255, -255, 1000, -1000 };
+    final ByteBuffer[] targets = { ByteBuffer.allocate(20), ByteBuffer.allocateDirect(20) };
+
+    for (short expected : samples) {
+      for (ByteBuffer buf : targets) {
+        buf.clear();
+        buf.position(5);
+        ByteBufferUtils.putShort(buf, expected);
+        assertEquals(expected, ByteBufferUtils.toShort(buf, 5));
+      }
+    }
+  }
+
+  /**
+   * Round trip: writes an int at position 5 with the relative {@code ByteBufferUtils.putInt} and
+   * reads it back both by absolute offset and, after rewinding to 5, by position.
+   */
+  @Test
+  public void testConverterRoundTripInt() {
+    final int[] samples = { 0, 1, -1, Integer.MIN_VALUE, Integer.MAX_VALUE, 127, -128, 65535,
+      -65535, 1000000, -1000000 };
+    final ByteBuffer[] targets = { ByteBuffer.allocate(20), ByteBuffer.allocateDirect(20) };
+
+    for (int expected : samples) {
+      for (ByteBuffer buf : targets) {
+        buf.clear();
+        buf.position(5);
+        ByteBufferUtils.putInt(buf, expected);
+        // absolute read
+        assertEquals(expected, ByteBufferUtils.toInt(buf, 5));
+        // relative read from the same spot
+        buf.position(5);
+        assertEquals(expected, ByteBufferUtils.toInt(buf));
+      }
+    }
+  }
+
+  /**
+   * Round trip: writes a long at position 8 with the relative {@code ByteBufferUtils.putLong}
+   * and reads it back with {@code ByteBufferUtils.toLong(buffer, 8)}.
+   */
+  @Test
+  public void testConverterRoundTripLong() {
+    final long[] samples = { 0L, 1L, -1L, Long.MIN_VALUE, Long.MAX_VALUE, 127L, -128L, 65535L,
+      -65535L, Integer.MAX_VALUE + 1L, Integer.MIN_VALUE - 1L, 1000000000000L, -1000000000000L };
+    final ByteBuffer[] targets = { ByteBuffer.allocate(32), ByteBuffer.allocateDirect(32) };
+
+    for (long expected : samples) {
+      for (ByteBuffer buf : targets) {
+        buf.clear();
+        buf.position(8);
+        ByteBufferUtils.putLong(buf, expected);
+        assertEquals(expected, ByteBufferUtils.toLong(buf, 8));
+      }
+    }
+  }
+
+  /**
+   * Writes a short/int/long sequence twice with the position-relative put methods, checks the
+   * final position (2 + 4 + 8 + 2 + 4 + 8 = 28), and reads every value back by absolute offset,
+   * plus one position-relative int read.
+   */
+  @Test
+  public void testConverterMultipleOperations() {
+    final short firstShort = (short) 100;
+    final int firstInt = 200000;
+    final long firstLong = 3000000000L;
+    final ByteBuffer[] targets = { ByteBuffer.allocate(50), ByteBuffer.allocateDirect(50) };
+
+    for (ByteBuffer buf : targets) {
+      buf.clear();
+      buf.position(0);
+
+      ByteBufferUtils.putShort(buf, firstShort);
+      ByteBufferUtils.putInt(buf, firstInt);
+      ByteBufferUtils.putLong(buf, firstLong);
+      ByteBufferUtils.putShort(buf, Short.MIN_VALUE);
+      ByteBufferUtils.putInt(buf, Integer.MAX_VALUE);
+      ByteBufferUtils.putLong(buf, Long.MIN_VALUE);
+
+      assertEquals(28, buf.position());
+
+      // absolute reads at the offsets implied by the write order
+      assertEquals(firstShort, ByteBufferUtils.toShort(buf, 0));
+      assertEquals(firstInt, ByteBufferUtils.toInt(buf, 2));
+      assertEquals(firstLong, ByteBufferUtils.toLong(buf, 6));
+      assertEquals(Short.MIN_VALUE, ByteBufferUtils.toShort(buf, 14));
+      assertEquals(Integer.MAX_VALUE, ByteBufferUtils.toInt(buf, 16));
+      assertEquals(Long.MIN_VALUE, ByteBufferUtils.toLong(buf, 20));
+
+      // mix absolute reads with a relative int read starting at position 2
+      assertEquals(firstShort, ByteBufferUtils.toShort(buf, 0));
+      buf.position(2);
+      assertEquals(firstInt, ByteBufferUtils.toInt(buf));
+      assertEquals(firstLong, ByteBufferUtils.toLong(buf, 6));
+    }
+  }
+
+  /**
+   * Writes values whose bytes are 0x01, 0x02, ... in most-significant-first order and expects
+   * byte {@code i} of the buffer to be {@code i + 1}, i.e. a big-endian layout, for int, short
+   * and long on heap and direct buffers.
+   */
+  @Test
+  public void testConverterBigEndianByteOrder() {
+    final ByteBuffer[] targets = { ByteBuffer.allocate(20), ByteBuffer.allocateDirect(20) };
+
+    for (ByteBuffer buf : targets) {
+      buf.clear();
+
+      buf.position(0);
+      ByteBufferUtils.putInt(buf, 0x01020304);
+      for (int i = 0; i < 4; i++) {
+        assertEquals((byte) (i + 1), buf.get(i));
+      }
+
+      buf.position(0);
+      ByteBufferUtils.putShort(buf, (short) 0x0102);
+      for (int i = 0; i < 2; i++) {
+        assertEquals((byte) (i + 1), buf.get(i));
+      }
+
+      buf.position(0);
+      ByteBufferUtils.putLong(buf, 0x0102030405060708L);
+      for (int i = 0; i < 8; i++) {
+        assertEquals((byte) (i + 1), buf.get(i));
+      }
+    }
+  }
+
+  /**
+   * Expects {@code ByteBufferUtils.findCommonPrefix} to report the full length (100) when both
+   * sides hold the same bytes, across heap, direct and array combinations.
+   */
+  @Test
+  public void testCommonPrefixerIdenticalSequences() {
+    final byte[] arr = new byte[100];
+    final ByteBuffer onHeap = ByteBuffer.allocate(100);
+    final ByteBuffer offHeap = ByteBuffer.allocateDirect(100);
+    for (int i = 0; i < arr.length; i++) {
+      arr[i] = (byte) i;
+      onHeap.put(i, arr[i]);
+      offHeap.put(i, arr[i]);
+    }
+
+    int prefix = ByteBufferUtils.findCommonPrefix(onHeap, 0, 100, onHeap, 0, 100);
+    assertEquals(100, prefix);
+    prefix = ByteBufferUtils.findCommonPrefix(offHeap, 0, 100, offHeap, 0, 100);
+    assertEquals(100, prefix);
+    prefix = ByteBufferUtils.findCommonPrefix(onHeap, 0, 100, offHeap, 0, 100);
+    assertEquals(100, prefix);
+    prefix = ByteBufferUtils.findCommonPrefix(onHeap, 0, 100, arr, 0, 100);
+    assertEquals(100, prefix);
+    prefix = ByteBufferUtils.findCommonPrefix(offHeap, 0, 100, arr, 0, 100);
+    assertEquals(100, prefix);
+  }
+
+  /**
+   * Expects a common prefix of 0 when both ranges have length 0, for heap, direct and array
+   * operands.
+   */
+  @Test
+  public void testCommonPrefixerEmptySequences() {
+    final ByteBuffer onHeap = ByteBuffer.allocate(10);
+    final ByteBuffer offHeap = ByteBuffer.allocateDirect(10);
+    final byte[] arr = new byte[10];
+
+    int prefix = ByteBufferUtils.findCommonPrefix(onHeap, 0, 0, onHeap, 0, 0);
+    assertEquals(0, prefix);
+    prefix = ByteBufferUtils.findCommonPrefix(offHeap, 0, 0, offHeap, 0, 0);
+    assertEquals(0, prefix);
+    prefix = ByteBufferUtils.findCommonPrefix(onHeap, 0, 0, arr, 0, 0);
+    assertEquals(0, prefix);
+    prefix = ByteBufferUtils.findCommonPrefix(offHeap, 0, 0, arr, 0, 0);
+    assertEquals(0, prefix);
+  }
+
+  /**
+   * With identical leading bytes but ranges of length 100 and 50, expects the common prefix to
+   * be the shorter length (50), whichever side is shorter.
+   */
+  @Test
+  public void testCommonPrefixerDifferentLengths() {
+    final byte[] fullArr = new byte[100];
+    final ByteBuffer onHeap = ByteBuffer.allocate(100);
+    final ByteBuffer offHeap = ByteBuffer.allocateDirect(100);
+    for (int i = 0; i < fullArr.length; i++) {
+      fullArr[i] = (byte) i;
+      onHeap.put(i, fullArr[i]);
+      offHeap.put(i, fullArr[i]);
+    }
+    // the first 50 bytes of the same sequence
+    final byte[] halfArr = Arrays.copyOf(fullArr, 50);
+
+    // array is the shorter side
+    int prefix = ByteBufferUtils.findCommonPrefix(onHeap, 0, 100, halfArr, 0, 50);
+    assertEquals(50, prefix);
+    prefix = ByteBufferUtils.findCommonPrefix(offHeap, 0, 100, halfArr, 0, 50);
+    assertEquals(50, prefix);
+    // buffer range is the shorter side
+    prefix = ByteBufferUtils.findCommonPrefix(onHeap, 0, 50, fullArr, 0, 100);
+    assertEquals(50, prefix);
+    prefix = ByteBufferUtils.findCommonPrefix(offHeap, 0, 50, fullArr, 0, 100);
+    assertEquals(50, prefix);
+  }
+
+  /**
+   * Introduces a single differing byte at a set of indexes between 0 and 32 and expects the
+   * common prefix between buffers to equal that index, for heap/heap, direct/direct and
+   * heap/direct pairs.
+   */
+  @Test
+  public void testCommonPrefixerDifferenceAtStrideBoundaries() {
+    final ByteBuffer plainHeap = ByteBuffer.allocate(100);
+    final ByteBuffer alteredHeap = ByteBuffer.allocate(100);
+    final ByteBuffer plainDirect = ByteBuffer.allocateDirect(100);
+    final ByteBuffer alteredDirect = ByteBuffer.allocateDirect(100);
+
+    final int[] mismatchIndexes = { 0, 1, 7, 8, 9, 15, 16, 17, 23, 24, 31, 32 };
+
+    for (int k = 0; k < mismatchIndexes.length; k++) {
+      final int mismatchAt = mismatchIndexes[k];
+
+      // reset all four buffers, then alter one byte in the second buffer of each kind
+      fillBB(plainHeap, (byte) 5);
+      fillBB(alteredHeap, (byte) 5);
+      fillBB(plainDirect, (byte) 5);
+      fillBB(alteredDirect, (byte) 5);
+      alteredHeap.put(mismatchAt, (byte) 99);
+      alteredDirect.put(mismatchAt, (byte) 99);
+
+      int prefix = ByteBufferUtils.findCommonPrefix(plainHeap, 0, 100, alteredHeap, 0, 100);
+      assertEquals(mismatchAt, prefix);
+      prefix = ByteBufferUtils.findCommonPrefix(plainDirect, 0, 100, alteredDirect, 0, 100);
+      assertEquals(mismatchAt, prefix);
+      prefix = ByteBufferUtils.findCommonPrefix(plainHeap, 0, 100, alteredDirect, 0, 100);
+      assertEquals(mismatchAt, prefix);
+    }
+  }
+
+  /**
+   * Same idea as the buffer-only variant, but the differing byte is placed in a byte[] that is
+   * compared against a heap and a direct buffer.
+   */
+  @Test
+  public void testCommonPrefixerDifferenceAtStrideBoundariesWithArray() {
+    final ByteBuffer onHeap = ByteBuffer.allocate(100);
+    final ByteBuffer offHeap = ByteBuffer.allocateDirect(100);
+    final byte[] alteredArr = new byte[100];
+
+    final int[] mismatchIndexes = { 0, 1, 7, 8, 9, 15, 16, 17, 23, 24, 31, 32 };
+
+    for (int k = 0; k < mismatchIndexes.length; k++) {
+      final int mismatchAt = mismatchIndexes[k];
+
+      fillBB(onHeap, (byte) 5);
+      fillBB(offHeap, (byte) 5);
+      fillArray(alteredArr, (byte) 5);
+      alteredArr[mismatchAt] = (byte) 99;
+
+      int prefix = ByteBufferUtils.findCommonPrefix(onHeap, 0, 100, alteredArr, 0, 100);
+      assertEquals(mismatchAt, prefix);
+      prefix = ByteBufferUtils.findCommonPrefix(offHeap, 0, 100, alteredArr, 0, 100);
+      assertEquals(mismatchAt, prefix);
+    }
+  }
+
+  /**
+   * All bytes are 42 except index 70, which is 99. The left range starts at 10 and the right at
+   * 20, both 60 long, so the right range reaches index 70 after 50 matching bytes.
+   */
+  @Test
+  public void testCommonPrefixerWithOffsets() {
+    final byte[] arr = new byte[120];
+    Arrays.fill(arr, (byte) 42);
+    arr[70] = (byte) 99;
+
+    final ByteBuffer onHeap = ByteBuffer.allocate(120);
+    final ByteBuffer offHeap = ByteBuffer.allocateDirect(120);
+    for (int i = 0; i < arr.length; i++) {
+      onHeap.put(i, arr[i]);
+      offHeap.put(i, arr[i]);
+    }
+
+    int prefix = ByteBufferUtils.findCommonPrefix(onHeap, 10, 60, onHeap, 20, 60);
+    assertEquals(50, prefix);
+    prefix = ByteBufferUtils.findCommonPrefix(offHeap, 10, 60, offHeap, 20, 60);
+    assertEquals(50, prefix);
+    prefix = ByteBufferUtils.findCommonPrefix(onHeap, 10, 60, arr, 20, 60);
+    assertEquals(50, prefix);
+    prefix = ByteBufferUtils.findCommonPrefix(offHeap, 10, 60, arr, 20, 60);
+    assertEquals(50, prefix);
+  }
+
+  /**
+   * Walks a single differing byte through every index of a 50-byte range and expects the common
+   * prefix to equal that index for every operand combination.
+   */
+  @Test
+  public void testCommonPrefixerSingleByteDifferences() {
+    final ByteBuffer plainHeap = ByteBuffer.allocate(50);
+    final ByteBuffer alteredHeap = ByteBuffer.allocate(50);
+    final ByteBuffer plainDirect = ByteBuffer.allocateDirect(50);
+    final ByteBuffer alteredDirect = ByteBuffer.allocateDirect(50);
+    final byte[] alteredArr = new byte[50];
+
+    int mismatchAt = 0;
+    while (mismatchAt < 50) {
+      // reset everything to 10, then alter one byte on the "altered" operands
+      fillBB(plainHeap, (byte) 10);
+      fillBB(alteredHeap, (byte) 10);
+      fillBB(plainDirect, (byte) 10);
+      fillBB(alteredDirect, (byte) 10);
+      fillArray(alteredArr, (byte) 10);
+      alteredHeap.put(mismatchAt, (byte) 20);
+      alteredDirect.put(mismatchAt, (byte) 20);
+      alteredArr[mismatchAt] = (byte) 20;
+
+      int prefix = ByteBufferUtils.findCommonPrefix(plainHeap, 0, 50, alteredHeap, 0, 50);
+      assertEquals(mismatchAt, prefix);
+      prefix = ByteBufferUtils.findCommonPrefix(plainDirect, 0, 50, alteredDirect, 0, 50);
+      assertEquals(mismatchAt, prefix);
+      prefix = ByteBufferUtils.findCommonPrefix(plainHeap, 0, 50, alteredDirect, 0, 50);
+      assertEquals(mismatchAt, prefix);
+      prefix = ByteBufferUtils.findCommonPrefix(plainHeap, 0, 50, alteredArr, 0, 50);
+      assertEquals(mismatchAt, prefix);
+      prefix = ByteBufferUtils.findCommonPrefix(plainDirect, 0, 50, alteredArr, 0, 50);
+      assertEquals(mismatchAt, prefix);
+
+      mismatchAt++;
+    }
+  }
+
+  /**
+   * For every length from 1 to 15, compares identical sequences and expects the common prefix
+   * to equal the full length.
+   */
+  @Test
+  public void testCommonPrefixerEpilogueBytes() {
+    for (int len = 1; len <= 15; len++) {
+      final byte[] arr = new byte[len];
+      final ByteBuffer heapA = ByteBuffer.allocate(len);
+      final ByteBuffer heapB = ByteBuffer.allocate(len);
+      final ByteBuffer directA = ByteBuffer.allocateDirect(len);
+      final ByteBuffer directB = ByteBuffer.allocateDirect(len);
+
+      for (int i = 0; i < len; i++) {
+        arr[i] = (byte) i;
+        heapA.put(i, arr[i]);
+        heapB.put(i, arr[i]);
+        directA.put(i, arr[i]);
+        directB.put(i, arr[i]);
+      }
+
+      int prefix = ByteBufferUtils.findCommonPrefix(heapA, 0, len, heapB, 0, len);
+      assertEquals(len, prefix);
+      prefix = ByteBufferUtils.findCommonPrefix(directA, 0, len, directB, 0, len);
+      assertEquals(len, prefix);
+      prefix = ByteBufferUtils.findCommonPrefix(heapA, 0, len, arr, 0, len);
+      assertEquals(len, prefix);
+      prefix = ByteBufferUtils.findCommonPrefix(directA, 0, len, arr, 0, len);
+      assertEquals(len, prefix);
+    }
+  }
+
+  /**
+   * For lengths 9 to 15, places one differing byte at each index from 8 to length - 1 and
+   * expects the common prefix to equal that index.
+   */
+  @Test
+  public void testCommonPrefixerEpilogueDifference() {
+    for (int len = 9; len <= 15; len++) {
+      for (int mismatchAt = 8; mismatchAt < len; mismatchAt++) {
+        // the altered content: all 5s with a single 99
+        final byte[] alteredArr = new byte[len];
+        Arrays.fill(alteredArr, (byte) 5);
+        alteredArr[mismatchAt] = (byte) 99;
+
+        final ByteBuffer plainHeap = ByteBuffer.allocate(len);
+        final ByteBuffer alteredHeap = ByteBuffer.allocate(len);
+        final ByteBuffer plainDirect = ByteBuffer.allocateDirect(len);
+        final ByteBuffer alteredDirect = ByteBuffer.allocateDirect(len);
+        for (int i = 0; i < len; i++) {
+          plainHeap.put(i, (byte) 5);
+          plainDirect.put(i, (byte) 5);
+          alteredHeap.put(i, alteredArr[i]);
+          alteredDirect.put(i, alteredArr[i]);
+        }
+
+        int prefix = ByteBufferUtils.findCommonPrefix(plainHeap, 0, len, alteredHeap, 0, len);
+        assertEquals(mismatchAt, prefix);
+        prefix = ByteBufferUtils.findCommonPrefix(plainDirect, 0, len, alteredDirect, 0, len);
+        assertEquals(mismatchAt, prefix);
+        prefix = ByteBufferUtils.findCommonPrefix(plainHeap, 0, len, alteredArr, 0, len);
+        assertEquals(mismatchAt, prefix);
+        prefix = ByteBufferUtils.findCommonPrefix(plainDirect, 0, len, alteredArr, 0, len);
+        assertEquals(mismatchAt, prefix);
+      }
+    }
+  }
+
+  /**
+   * Fills both operands with the same boundary byte value (minimum, maximum, 0, -1, ...) and
+   * expects the common prefix to be the full 50 bytes every time.
+   */
+  @Test
+  public void testCommonPrefixerExtremeValues() {
+    final ByteBuffer heapA = ByteBuffer.allocate(50);
+    final ByteBuffer heapB = ByteBuffer.allocate(50);
+    final ByteBuffer directA = ByteBuffer.allocateDirect(50);
+    final ByteBuffer directB = ByteBuffer.allocateDirect(50);
+    final byte[] arr = new byte[50];
+
+    final byte[] fillers = { Byte.MIN_VALUE, Byte.MAX_VALUE, 0, -1, 1, 127, -128, (byte) 0xFF };
+
+    for (int k = 0; k < fillers.length; k++) {
+      final byte filler = fillers[k];
+      fillBB(heapA, filler);
+      fillBB(heapB, filler);
+      fillBB(directA, filler);
+      fillBB(directB, filler);
+      fillArray(arr, filler);
+
+      int prefix = ByteBufferUtils.findCommonPrefix(heapA, 0, 50, heapB, 0, 50);
+      assertEquals(50, prefix);
+      prefix = ByteBufferUtils.findCommonPrefix(directA, 0, 50, directB, 0, 50);
+      assertEquals(50, prefix);
+      prefix = ByteBufferUtils.findCommonPrefix(heapA, 0, 50, directB, 0, 50);
+      assertEquals(50, prefix);
+      prefix = ByteBufferUtils.findCommonPrefix(heapA, 0, 50, arr, 0, 50);
+      assertEquals(50, prefix);
+      prefix = ByteBufferUtils.findCommonPrefix(directA, 0, 50, arr, 0, 50);
+      assertEquals(50, prefix);
+    }
+  }
+
+  /**
+   * On 100-byte sequences holding the pattern {@code i % 10}, temporarily alters one byte at
+   * each index from 72 to 80 and expects the common prefix to equal that index; the byte is
+   * restored before the next index is tried.
+   */
+  @Test
+  public void testCommonPrefixerMultipleStrides() {
+    final ByteBuffer plainHeap = ByteBuffer.allocate(100);
+    final ByteBuffer alteredHeap = ByteBuffer.allocate(100);
+    final ByteBuffer plainDirect = ByteBuffer.allocateDirect(100);
+    final ByteBuffer alteredDirect = ByteBuffer.allocateDirect(100);
+    final byte[] alteredArr = new byte[100];
+
+    for (int i = 0; i < 100; i++) {
+      final byte digit = (byte) (i % 10);
+      plainHeap.put(i, digit);
+      alteredHeap.put(i, digit);
+      plainDirect.put(i, digit);
+      alteredDirect.put(i, digit);
+      alteredArr[i] = digit;
+    }
+
+    for (int mismatchAt = 72; mismatchAt <= 80; mismatchAt++) {
+      alteredHeap.put(mismatchAt, (byte) 99);
+      alteredDirect.put(mismatchAt, (byte) 99);
+      alteredArr[mismatchAt] = (byte) 99;
+
+      int prefix = ByteBufferUtils.findCommonPrefix(plainHeap, 0, 100, alteredHeap, 0, 100);
+      assertEquals(mismatchAt, prefix);
+      prefix = ByteBufferUtils.findCommonPrefix(plainDirect, 0, 100, alteredDirect, 0, 100);
+      assertEquals(mismatchAt, prefix);
+      prefix = ByteBufferUtils.findCommonPrefix(plainHeap, 0, 100, alteredArr, 0, 100);
+      assertEquals(mismatchAt, prefix);
+      prefix = ByteBufferUtils.findCommonPrefix(plainDirect, 0, 100, alteredArr, 0, 100);
+      assertEquals(mismatchAt, prefix);
+
+      // put the original pattern byte back so only one index differs per iteration
+      final byte original = (byte) (mismatchAt % 10);
+      alteredHeap.put(mismatchAt, original);
+      alteredDirect.put(mismatchAt, original);
+      alteredArr[mismatchAt] = original;
+    }
+  }
+
+  /**
+   * Computes the common prefix for heap/heap, direct/direct and both mixed pairings of the same
+   * data (differing at index 42) and expects all four results to agree and to equal 42.
+   */
+  @Test
+  public void testCommonPrefixerHeapVsDirectConsistency() {
+    final ByteBuffer plainHeap = ByteBuffer.allocate(100);
+    final ByteBuffer alteredHeap = ByteBuffer.allocate(100);
+    final ByteBuffer plainDirect = ByteBuffer.allocateDirect(100);
+    final ByteBuffer alteredDirect = ByteBuffer.allocateDirect(100);
+
+    for (int i = 0; i < 100; i++) {
+      final byte patternByte = (byte) (i * 3 % 256);
+      plainHeap.put(i, patternByte);
+      alteredHeap.put(i, patternByte);
+      plainDirect.put(i, patternByte);
+      alteredDirect.put(i, patternByte);
+    }
+
+    // the pattern byte at index 42 is 126, so writing 0 creates the first difference there
+    alteredHeap.put(42, (byte) 0);
+    alteredDirect.put(42, (byte) 0);
+
+    final int heapOnly = ByteBufferUtils.findCommonPrefix(plainHeap, 0, 100, alteredHeap, 0, 100);
+    final int directOnly =
+      ByteBufferUtils.findCommonPrefix(plainDirect, 0, 100, alteredDirect, 0, 100);
+    final int heapThenDirect =
+      ByteBufferUtils.findCommonPrefix(plainHeap, 0, 100, alteredDirect, 0, 100);
+    final int directThenHeap =
+      ByteBufferUtils.findCommonPrefix(plainDirect, 0, 100, alteredHeap, 0, 100);
+
+    assertEquals(heapOnly, directOnly);
+    assertEquals(heapOnly, heapThenDirect);
+    assertEquals(heapOnly, directThenHeap);
+    assertEquals(42, heapOnly);
+  }
+
+  /**
+   * Builds two 64-byte data sets that share their first 32 bytes and differ in every byte after
+   * that (offset by 100 versus 200), and expects a common prefix of 32.
+   */
+  @Test
+  public void testCommonPrefixerPartialMatches() {
+    final ByteBuffer heap = ByteBuffer.allocate(64);
+    final ByteBuffer direct = ByteBuffer.allocateDirect(64);
+    final byte[] array = new byte[64];
+
+    final ByteBuffer heap2 = ByteBuffer.allocate(64);
+    final ByteBuffer direct2 = ByteBuffer.allocateDirect(64);
+    final byte[] array2 = new byte[64];
+
+    for (int i = 0; i < 64; i++) {
+      final boolean inSharedHalf = i < 32;
+      final byte firstSet = (byte) (inSharedHalf ? i : i + 100);
+      final byte secondSet = (byte) (inSharedHalf ? i : i + 200);
+
+      heap.put(i, firstSet);
+      direct.put(i, firstSet);
+      array[i] = firstSet;
+
+      heap2.put(i, secondSet);
+      direct2.put(i, secondSet);
+      array2[i] = secondSet;
+    }
+
+    int prefix = ByteBufferUtils.findCommonPrefix(heap, 0, 64, heap2, 0, 64);
+    assertEquals(32, prefix);
+    prefix = ByteBufferUtils.findCommonPrefix(direct, 0, 64, direct2, 0, 64);
+    assertEquals(32, prefix);
+    prefix = ByteBufferUtils.findCommonPrefix(heap, 0, 64, array2, 0, 64);
+    assertEquals(32, prefix);
+    prefix = ByteBufferUtils.findCommonPrefix(direct, 0, 64, array2, 0, 64);
+    assertEquals(32, prefix);
+  }
+
   // Below are utility methods invoked from test methods
   private static void testCompressedInt(int value) throws IOException {
     ByteArrayOutputStream bos = new ByteArrayOutputStream();
diff --git 
a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/BytesTestBase.java 
b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/BytesTestBase.java
index 96df8bc3939..098a5e95916 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/BytesTestBase.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/BytesTestBase.java
@@ -575,4 +575,345 @@ public class BytesTestBase {
     assertEquals(2, Bytes.findCommonPrefix(hellohello, hellohellohi, hellohello.length,
       hellohellohi.length, 0, 0));
   }
+
+  @Test
+  public void testConverterToShort() {
+    byte[] array = new byte[20];
+
+    short[] testValues =
+      { 0, 1, -1, Short.MIN_VALUE, Short.MAX_VALUE, 127, -128, 255, -255, 1000, -1000 };
+
+    for (short value : testValues) {
+      Arrays.fill(array, (byte) 0);
+
+      Bytes.putShort(array, 0, value);
+      Bytes.putShort(array, 5, value);
+      Bytes.putShort(array, 10, value);
+
+      assertEquals(value, Bytes.toShort(array, 0));
+      assertEquals(value, Bytes.toShort(array, 5));
+      assertEquals(value, Bytes.toShort(array, 10));
+    }
+  }
+
+  @Test
+  public void testConverterToInt() {
+    byte[] array = new byte[20];
+
+    int[] testValues = { 0, 1, -1, Integer.MIN_VALUE, Integer.MAX_VALUE, 127, -128, 65535, -65535,
+      1000000, -1000000 };
+
+    for (int value : testValues) {
+      Arrays.fill(array, (byte) 0);
+
+      Bytes.putInt(array, 0, value);
+      Bytes.putInt(array, 4, value);
+      Bytes.putInt(array, 12, value);
+
+      assertEquals(value, Bytes.toInt(array, 0));
+      assertEquals(value, Bytes.toInt(array, 4));
+      assertEquals(value, Bytes.toInt(array, 12));
+    }
+  }
+
+  @Test
+  public void testConverterToLong() {
+    byte[] array = new byte[32];
+
+    long[] testValues = { 0L, 1L, -1L, Long.MIN_VALUE, Long.MAX_VALUE, 127L, -128L, 65535L, -65535L,
+      Integer.MAX_VALUE + 1L, Integer.MIN_VALUE - 1L, 1000000000000L, -1000000000000L };
+
+    for (long value : testValues) {
+      Arrays.fill(array, (byte) 0);
+
+      Bytes.putLong(array, 0, value);
+      Bytes.putLong(array, 8, value);
+      Bytes.putLong(array, 16, value);
+
+      assertEquals(value, Bytes.toLong(array, 0));
+      assertEquals(value, Bytes.toLong(array, 8));
+      assertEquals(value, Bytes.toLong(array, 16));
+    }
+  }
+
+  @Test
+  public void testConverterPutShort() {
+    byte[] array = new byte[20];
+
+    short[] testValues =
+      { 0, 1, -1, Short.MIN_VALUE, Short.MAX_VALUE, 127, -128, 255, -255, 1000, -1000 };
+
+    for (short value : testValues) {
+      Arrays.fill(array, (byte) 0);
+
+      int newIndex = Bytes.putShort(array, 0, value);
+      assertEquals(Short.BYTES, newIndex);
+      assertEquals(value, Bytes.toShort(array, 0));
+
+      newIndex = Bytes.putShort(array, 5, (short) (value + 1));
+      assertEquals(5 + Short.BYTES, newIndex);
+      assertEquals((short) (value + 1), Bytes.toShort(array, 5));
+    }
+  }
+
+  @Test
+  public void testConverterPutInt() {
+    byte[] array = new byte[20];
+
+    int[] testValues = { 0, 1, -1, Integer.MIN_VALUE, Integer.MAX_VALUE, 127, -128, 65535, -65535,
+      1000000, -1000000 };
+
+    for (int value : testValues) {
+      Arrays.fill(array, (byte) 0);
+
+      int newIndex = Bytes.putInt(array, 0, value);
+      assertEquals(Integer.BYTES, newIndex);
+      assertEquals(value, Bytes.toInt(array, 0));
+
+      newIndex = Bytes.putInt(array, 5, value + 1);
+      assertEquals(5 + Integer.BYTES, newIndex);
+      assertEquals(value + 1, Bytes.toInt(array, 5));
+    }
+  }
+
+  @Test
+  public void testConverterPutLong() {
+    byte[] array = new byte[32];
+
+    long[] testValues = { 0L, 1L, -1L, Long.MIN_VALUE, Long.MAX_VALUE, 127L, -128L, 65535L, -65535L,
+      Integer.MAX_VALUE + 1L, Integer.MIN_VALUE - 1L, 1000000000000L, -1000000000000L };
+
+    for (long value : testValues) {
+      Arrays.fill(array, (byte) 0);
+
+      int newIndex = Bytes.putLong(array, 0, value);
+      assertEquals(Long.BYTES, newIndex);
+      assertEquals(value, Bytes.toLong(array, 0));
+
+      newIndex = Bytes.putLong(array, 8, value + 1);
+      assertEquals(8 + Long.BYTES, newIndex);
+      assertEquals(value + 1, Bytes.toLong(array, 8));
+    }
+  }
+
+  @Test
+  public void testConverterRoundTrip() {
+    byte[] array = new byte[50];
+
+    short shortValue = 12345;
+    int intValue = 200000;
+    long longValue = 3000000000L;
+
+    int offset = 0;
+    offset = Bytes.putShort(array, offset, shortValue);
+    offset = Bytes.putInt(array, offset, intValue);
+    offset = Bytes.putLong(array, offset, longValue);
+    offset = Bytes.putShort(array, offset, Short.MIN_VALUE);
+    offset = Bytes.putInt(array, offset, Integer.MAX_VALUE);
+    offset = Bytes.putLong(array, offset, Long.MIN_VALUE);
+
+    assertEquals(28, offset);
+
+    assertEquals(shortValue, Bytes.toShort(array, 0));
+    assertEquals(intValue, Bytes.toInt(array, 2));
+    assertEquals(longValue, Bytes.toLong(array, 6));
+    assertEquals(Short.MIN_VALUE, Bytes.toShort(array, 14));
+    assertEquals(Integer.MAX_VALUE, Bytes.toInt(array, 16));
+    assertEquals(Long.MIN_VALUE, Bytes.toLong(array, 20));
+  }
+
+  @Test
+  public void testConverterBigEndianByteOrder() {
+    byte[] array = new byte[20];
+
+    Bytes.putInt(array, 0, 0x01020304);
+    assertEquals((byte) 0x01, array[0]);
+    assertEquals((byte) 0x02, array[1]);
+    assertEquals((byte) 0x03, array[2]);
+    assertEquals((byte) 0x04, array[3]);
+
+    Bytes.putShort(array, 0, (short) 0x0102);
+    assertEquals((byte) 0x01, array[0]);
+    assertEquals((byte) 0x02, array[1]);
+
+    Bytes.putLong(array, 0, 0x0102030405060708L);
+    assertEquals((byte) 0x01, array[0]);
+    assertEquals((byte) 0x02, array[1]);
+    assertEquals((byte) 0x03, array[2]);
+    assertEquals((byte) 0x04, array[3]);
+    assertEquals((byte) 0x05, array[4]);
+    assertEquals((byte) 0x06, array[5]);
+    assertEquals((byte) 0x07, array[6]);
+    assertEquals((byte) 0x08, array[7]);
+  }
+
+  @Test
+  public void testCommonPrefixerIdenticalSequences() {
+    byte[] array1 = new byte[100];
+    byte[] array2 = new byte[100];
+
+    for (int i = 0; i < 100; i++) {
+      array1[i] = (byte) i;
+      array2[i] = (byte) i;
+    }
+
+    assertEquals(100, Bytes.findCommonPrefix(array1, array2, array1.length, array2.length, 0, 0));
+  }
+
+  @Test
+  public void testCommonPrefixerEmptySequences() {
+    byte[] array1 = new byte[10];
+    byte[] array2 = new byte[10];
+
+    assertEquals(0, Bytes.findCommonPrefix(array1, array2, 0, 0, 0, 0));
+  }
+
+  @Test
+  public void testCommonPrefixerDifferentLengths() {
+    byte[] shortArray = new byte[50];
+    byte[] longArray = new byte[100];
+
+    for (int i = 0; i < 100; i++) {
+      byte value = (byte) i;
+      if (i < 50) {
+        shortArray[i] = value;
+      }
+      longArray[i] = value;
+    }
+
+    assertEquals(50, Bytes.findCommonPrefix(longArray, shortArray, 100, 50, 0, 0));
+    assertEquals(50, Bytes.findCommonPrefix(shortArray, longArray, 50, 100, 0, 0));
+  }
+
+  @Test
+  public void testCommonPrefixerDifferenceAtStrideBoundaries() {
+    byte[] array1 = new byte[100];
+    byte[] array2 = new byte[100];
+
+    int[] boundaryPositions = { 0, 1, 7, 8, 9, 15, 16, 17, 23, 24, 31, 32 };
+
+    for (int diffPos : boundaryPositions) {
+      Arrays.fill(array1, (byte) 5);
+      Arrays.fill(array2, (byte) 5);
+
+      array2[diffPos] = (byte) 99;
+
+      assertEquals(diffPos, Bytes.findCommonPrefix(array1, array2, 100, 100, 0, 0));
+    }
+  }
+
+  @Test
+  public void testCommonPrefixerWithOffsets() {
+    byte[] array = new byte[120];
+
+    for (int i = 0; i < 120; i++) {
+      array[i] = (byte) 42;
+    }
+
+    array[70] = (byte) 99;
+
+    assertEquals(50, Bytes.findCommonPrefix(array, array, 60, 60, 10, 20));
+  }
+
+  @Test
+  public void testCommonPrefixerSingleByteDifferences() {
+    byte[] array1 = new byte[50];
+    byte[] array2 = new byte[50];
+
+    for (int diffPos = 0; diffPos < 50; diffPos++) {
+      Arrays.fill(array1, (byte) 10);
+      Arrays.fill(array2, (byte) 10);
+
+      array2[diffPos] = (byte) 20;
+
+      assertEquals(diffPos, Bytes.findCommonPrefix(array1, array2, 50, 50, 0, 0));
+    }
+  }
+
+  @Test
+  public void testCommonPrefixerEpilogueBytes() {
+    for (int length = 1; length <= 15; length++) {
+      byte[] array1 = new byte[length];
+      byte[] array2 = new byte[length];
+
+      for (int i = 0; i < length; i++) {
+        array1[i] = (byte) i;
+        array2[i] = (byte) i;
+      }
+
+      assertEquals(length, Bytes.findCommonPrefix(array1, array2, length, length, 0, 0));
+    }
+  }
+
+  @Test
+  public void testCommonPrefixerEpilogueDifference() {
+    for (int length = 9; length <= 15; length++) {
+      for (int diffPos = 8; diffPos < length; diffPos++) {
+        byte[] array1 = new byte[length];
+        byte[] array2 = new byte[length];
+
+        for (int i = 0; i < length; i++) {
+          array1[i] = (byte) 5;
+          array2[i] = (byte) 5;
+        }
+
+        array2[diffPos] = (byte) 99;
+
+        assertEquals(diffPos, Bytes.findCommonPrefix(array1, array2, length, length, 0, 0));
+      }
+    }
+  }
+
+  @Test
+  public void testCommonPrefixerExtremeValues() {
+    byte[] array1 = new byte[50];
+    byte[] array2 = new byte[50];
+
+    byte[] extremeValues = { Byte.MIN_VALUE, Byte.MAX_VALUE, 0, -1, 1, 127, -128, (byte) 0xFF };
+
+    for (byte value : extremeValues) {
+      Arrays.fill(array1, value);
+      Arrays.fill(array2, value);
+
+      assertEquals(50, Bytes.findCommonPrefix(array1, array2, 50, 50, 0, 0));
+    }
+  }
+
+  @Test
+  public void testCommonPrefixerMultipleStrides() {
+    byte[] array1 = new byte[100];
+    byte[] array2 = new byte[100];
+
+    for (int i = 0; i < 100; i++) {
+      array1[i] = (byte) (i % 10);
+      array2[i] = (byte) (i % 10);
+    }
+
+    int[] diffPositions = { 72, 73, 74, 75, 76, 77, 78, 79, 80 };
+
+    for (int diffPos : diffPositions) {
+      array2[diffPos] = (byte) 99;
+
+      assertEquals(diffPos, Bytes.findCommonPrefix(array1, array2, 100, 100, 0, 0));
+
+      array2[diffPos] = (byte) (diffPos % 10);
+    }
+  }
+
+  @Test
+  public void testCommonPrefixerPartialMatches() {
+    byte[] array1 = new byte[64];
+    byte[] array2 = new byte[64];
+
+    for (int i = 0; i < 32; i++) {
+      array1[i] = (byte) i;
+      array2[i] = (byte) i;
+    }
+    for (int i = 32; i < 64; i++) {
+      array1[i] = (byte) (i + 100);
+      array2[i] = (byte) (i + 200);
+    }
+
+    assertEquals(32, Bytes.findCommonPrefix(array1, array2, 64, 64, 0, 0));
+  }
 }
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferUtilsWoUnsafe.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferUtilsWoUnsafe.java
index b77cfe55dee..c937f0503bb 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferUtilsWoUnsafe.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferUtilsWoUnsafe.java
@@ -36,7 +36,6 @@ public class TestByteBufferUtilsWoUnsafe extends ByteBufferUtilsTestBase {
       mocked.when(HBasePlatformDependent::isUnsafeAvailable).thenReturn(false);
       mocked.when(HBasePlatformDependent::unaligned).thenReturn(false);
       assertFalse(ByteBufferUtils.UNSAFE_AVAIL);
-      assertFalse(ByteBufferUtils.UNSAFE_UNALIGNED);
     }
   }
 
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestBytesWoUnsafe.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestBytesWoUnsafe.java
deleted file mode 100644
index 8aacab4b851..00000000000
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestBytesWoUnsafe.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.util;
-
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.mockito.Mockito.mockStatic;
-
-import org.apache.hadoop.hbase.testclassification.MiscTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.unsafe.HBasePlatformDependent;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Tag;
-import org.mockito.MockedStatic;
-
-@Tag(MiscTests.TAG)
-@Tag(SmallTests.TAG)
-public class TestBytesWoUnsafe extends BytesTestBase {
-
-  @BeforeAll
-  public static void disableUnsafe() {
-    try (MockedStatic<HBasePlatformDependent> mocked = mockStatic(HBasePlatformDependent.class)) {
-      mocked.when(HBasePlatformDependent::unaligned).thenReturn(false);
-      assertFalse(Bytes.UNSAFE_UNALIGNED);
-    }
-  }
-}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFromClientSide3WoUnsafe.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFromClientSide3WoUnsafe.java
index 70b2f3d5f29..7014f56b2ea 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFromClientSide3WoUnsafe.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFromClientSide3WoUnsafe.java
@@ -50,7 +50,6 @@ public class TestFromClientSide3WoUnsafe extends FromClientSideTest3 {
       mocked.when(HBasePlatformDependent::isUnsafeAvailable).thenReturn(false);
       mocked.when(HBasePlatformDependent::unaligned).thenReturn(false);
       assertFalse(ByteBufferUtils.UNSAFE_AVAIL);
-      assertFalse(ByteBufferUtils.UNSAFE_UNALIGNED);
     }
     startCluster(MultiRowMutationEndpoint.class);
   }

Reply via email to