This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new f193ec111e move readLong, readInt, readShort from ByteStringParser to
ByteString so they can be optimized (#2847)
f193ec111e is described below
commit f193ec111ee580b73cc1bae6574afd659c1583dc
Author: PJ Fanning <[email protected]>
AuthorDate: Wed Apr 8 13:18:08 2026 +0200
move readLong, readInt, readShort from ByteStringParser to ByteString so
they can be optimized (#2847)
* Add bounds checking and @throws Javadoc to ByteString read methods
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko/sessions/0eb96aa0-0ec3-47d6-a26f-732a0367640e
Co-authored-by: pjfanning <[email protected]>
* Add bounds checking, @throws Javadoc, and bounds tests to ByteString read
methods
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko/sessions/0eb96aa0-0ec3-47d6-a26f-732a0367640e
Co-authored-by: pjfanning <[email protected]>
* Add explicit Short casts to SWARUtil VarHandle getShort calls
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko/sessions/0eb96aa0-0ec3-47d6-a26f-732a0367640e
Co-authored-by: pjfanning <[email protected]>
* Add unchecked read methods, remove semicolons, document VarHandle JDK
usage in SWARUtil
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko/sessions/6c70b3dc-cb53-4f8c-a1e3-692d6e6265c7
Co-authored-by: pjfanning <[email protected]>
* Convert {@code} tags to backticks in SWARUtil Scaladoc
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko/sessions/6c70b3dc-cb53-4f8c-a1e3-692d6e6265c7
Co-authored-by: pjfanning <[email protected]>
* Override ByteArrayIterator getShort/getInt/getLong to use SWARUtil
VarHandle reads
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko/sessions/db26e214-8a2d-41ef-8720-86247c45c74c
Co-authored-by: pjfanning <[email protected]>
* Refactor SWARUtil to make the ByteOrder param explicit
* scaladoc
* javafmt
* Create ByteStringParser_readNum_Benchmark.scala
* Update ByteStringParser_readNum_Benchmark.scala
* Update ByteStringParser_readNum_Benchmark.scala
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
.../org/apache/pekko/util/ByteStringSpec.scala | 130 +++++++++++++++
.../scala/org/apache/pekko/util/SWARUtilSpec.scala | 32 ++--
.../org/apache/pekko/util/ByteIterator.scala | 21 +++
.../org/apache/pekko/util/ByteIterator.scala | 21 +++
.../scala/org/apache/pekko/util/ByteString.scala | 185 ++++++++++++++++++++-
.../scala/org/apache/pekko/util/SWARUtil.scala | 138 ++++++++++-----
.../util/ByteStringParser_readNum_Benchmark.scala | 86 ++++++++++
.../serialization/SnapshotSerializer.scala | 5 +-
.../remote/artery/compress/CountMinSketch.java | 3 +-
.../pekko/stream/impl/io/ByteStringParser.scala | 42 ++++-
10 files changed, 601 insertions(+), 62 deletions(-)
diff --git
a/actor-tests/src/test/scala/org/apache/pekko/util/ByteStringSpec.scala
b/actor-tests/src/test/scala/org/apache/pekko/util/ByteStringSpec.scala
index e98e6016dd..1fb4102878 100644
--- a/actor-tests/src/test/scala/org/apache/pekko/util/ByteStringSpec.scala
+++ b/actor-tests/src/test/scala/org/apache/pekko/util/ByteStringSpec.scala
@@ -1397,6 +1397,136 @@ class ByteStringSpec extends AnyWordSpec with Matchers
with Checkers {
val combinedBytes = combinedBs.toArrayUnsafe()
combinedBytes should ===(bytes ++ bytes)
}
+
+ "read short values" in {
+ val data = Array[Byte](1, 2, 3, 4)
+ val byteString1C = ByteString1C(data)
+ byteString1C.readShortBE(0) should ===(0x0102.toShort)
+ byteString1C.readShortLE(0) should ===(0x0201.toShort)
+ byteString1C.readShortBE(2) should ===(0x0304.toShort)
+ byteString1C.readShortLE(2) should ===(0x0403.toShort)
+
+ val arr = Array[Byte](0, 1, 2, 3, 4, 5)
+ val byteString1 = ByteString1(arr, 2, 4)
+ byteString1.readShortBE(0) should ===(0x0203.toShort)
+ byteString1.readShortLE(0) should ===(0x0302.toShort)
+ byteString1.readShortBE(2) should ===(0x0405.toShort)
+ byteString1.readShortLE(2) should ===(0x0504.toShort)
+
+ val byteStrings = ByteStrings(ByteString1.fromString("ab"),
ByteString1.fromString("cd"))
+ byteStrings.readShortBE(0) should ===(0x6162.toShort)
+ byteStrings.readShortLE(0) should ===(0x6261.toShort)
+ byteStrings.readShortBE(2) should ===(0x6364.toShort)
+ byteStrings.readShortLE(2) should ===(0x6463.toShort)
+ }
+
+ "read int values" in {
+ val data = Array[Byte](1, 2, 3, 4, 5, 6, 7, 8)
+ val byteString1C = ByteString1C(data)
+ byteString1C.readIntBE(0) should ===(0x01020304)
+ byteString1C.readIntLE(0) should ===(0x04030201)
+ byteString1C.readIntBE(4) should ===(0x05060708)
+ byteString1C.readIntLE(4) should ===(0x08070605)
+
+ val arr = Array[Byte](0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
+ val byteString1 = ByteString1(arr, 1, 8)
+ byteString1.readIntBE(0) should ===(0x01020304)
+ byteString1.readIntLE(0) should ===(0x04030201)
+ byteString1.readIntBE(4) should ===(0x05060708)
+ byteString1.readIntLE(4) should ===(0x08070605)
+
+ val byteStrings = ByteStrings(
+ ByteString1(Array[Byte](1, 2), 0, 2),
+ ByteString1(Array[Byte](3, 4, 5, 6, 7, 8), 0, 6))
+ byteStrings.readIntBE(0) should ===(0x01020304)
+ byteStrings.readIntLE(0) should ===(0x04030201)
+ byteStrings.readIntBE(4) should ===(0x05060708)
+ byteStrings.readIntLE(4) should ===(0x08070605)
+ }
+
+ "read long values" in {
+ val data = Array[Byte](1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14,
15, 16)
+ val byteString1C = ByteString1C(data)
+ byteString1C.readLongBE(0) should ===(0x0102030405060708L)
+ byteString1C.readLongLE(0) should ===(0x0807060504030201L)
+ byteString1C.readLongBE(8) should ===(0x090A0B0C0D0E0F10L)
+ byteString1C.readLongLE(8) should ===(0x100F0E0D0C0B0A09L)
+
+ val arr = Array[Byte](0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14,
15, 16, 17)
+ val byteString1 = ByteString1(arr, 1, 16)
+ byteString1.readLongBE(0) should ===(0x0102030405060708L)
+ byteString1.readLongLE(0) should ===(0x0807060504030201L)
+ byteString1.readLongBE(8) should ===(0x090A0B0C0D0E0F10L)
+ byteString1.readLongLE(8) should ===(0x100F0E0D0C0B0A09L)
+
+ val byteStrings = ByteStrings(
+ ByteString1(Array[Byte](1, 2, 3, 4), 0, 4),
+ ByteString1(Array[Byte](5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16), 0,
12))
+ byteStrings.readLongBE(0) should ===(0x0102030405060708L)
+ byteStrings.readLongLE(0) should ===(0x0807060504030201L)
+ byteStrings.readLongBE(8) should ===(0x090A0B0C0D0E0F10L)
+ byteStrings.readLongLE(8) should ===(0x100F0E0D0C0B0A09L)
+ }
+
+ "throw IndexOutOfBoundsException for readShortBE/LE with insufficient
data" in {
+ val bs1C = ByteString1C(Array[Byte](1))
+ an[IndexOutOfBoundsException] should be thrownBy bs1C.readShortBE(0)
+ an[IndexOutOfBoundsException] should be thrownBy bs1C.readShortLE(0)
+ an[IndexOutOfBoundsException] should be thrownBy bs1C.readShortBE(-1)
+ an[IndexOutOfBoundsException] should be thrownBy bs1C.readShortLE(-1)
+
+ val bs1 = ByteString1(Array[Byte](0, 1, 2), 1, 1)
+ an[IndexOutOfBoundsException] should be thrownBy bs1.readShortBE(0)
+ an[IndexOutOfBoundsException] should be thrownBy bs1.readShortLE(0)
+ an[IndexOutOfBoundsException] should be thrownBy bs1.readShortBE(-1)
+ an[IndexOutOfBoundsException] should be thrownBy bs1.readShortLE(-1)
+
+ val bss = ByteStrings(ByteString1.fromString("a"),
ByteString1.fromString("b"))
+ an[IndexOutOfBoundsException] should be thrownBy bss.readShortBE(1)
+ an[IndexOutOfBoundsException] should be thrownBy bss.readShortLE(1)
+ an[IndexOutOfBoundsException] should be thrownBy bss.readShortBE(-1)
+ an[IndexOutOfBoundsException] should be thrownBy bss.readShortLE(-1)
+ }
+
+ "throw IndexOutOfBoundsException for readIntBE/LE with insufficient data"
in {
+ val bs1C = ByteString1C(Array[Byte](1, 2, 3))
+ an[IndexOutOfBoundsException] should be thrownBy bs1C.readIntBE(0)
+ an[IndexOutOfBoundsException] should be thrownBy bs1C.readIntLE(0)
+ an[IndexOutOfBoundsException] should be thrownBy bs1C.readIntBE(-1)
+ an[IndexOutOfBoundsException] should be thrownBy bs1C.readIntLE(-1)
+
+ val bs1 = ByteString1(Array[Byte](0, 1, 2, 3, 4), 1, 3)
+ an[IndexOutOfBoundsException] should be thrownBy bs1.readIntBE(0)
+ an[IndexOutOfBoundsException] should be thrownBy bs1.readIntLE(0)
+ an[IndexOutOfBoundsException] should be thrownBy bs1.readIntBE(-1)
+ an[IndexOutOfBoundsException] should be thrownBy bs1.readIntLE(-1)
+
+ val bss = ByteStrings(ByteString1.fromString("abc"),
ByteString1.fromString("d"))
+ an[IndexOutOfBoundsException] should be thrownBy bss.readIntBE(1)
+ an[IndexOutOfBoundsException] should be thrownBy bss.readIntLE(1)
+ an[IndexOutOfBoundsException] should be thrownBy bss.readIntBE(-1)
+ an[IndexOutOfBoundsException] should be thrownBy bss.readIntLE(-1)
+ }
+
+ "throw IndexOutOfBoundsException for readLongBE/LE with insufficient data"
in {
+ val bs1C = ByteString1C(Array[Byte](1, 2, 3, 4, 5, 6, 7))
+ an[IndexOutOfBoundsException] should be thrownBy bs1C.readLongBE(0)
+ an[IndexOutOfBoundsException] should be thrownBy bs1C.readLongLE(0)
+ an[IndexOutOfBoundsException] should be thrownBy bs1C.readLongBE(-1)
+ an[IndexOutOfBoundsException] should be thrownBy bs1C.readLongLE(-1)
+
+ val bs1 = ByteString1(Array[Byte](0, 1, 2, 3, 4, 5, 6, 7, 8), 1, 7)
+ an[IndexOutOfBoundsException] should be thrownBy bs1.readLongBE(0)
+ an[IndexOutOfBoundsException] should be thrownBy bs1.readLongLE(0)
+ an[IndexOutOfBoundsException] should be thrownBy bs1.readLongBE(-1)
+ an[IndexOutOfBoundsException] should be thrownBy bs1.readLongLE(-1)
+
+ val bss = ByteStrings(ByteString1.fromString("abcdef"),
ByteString1.fromString("g"))
+ an[IndexOutOfBoundsException] should be thrownBy bss.readLongBE(0)
+ an[IndexOutOfBoundsException] should be thrownBy bss.readLongLE(0)
+ an[IndexOutOfBoundsException] should be thrownBy bss.readLongBE(-1)
+ an[IndexOutOfBoundsException] should be thrownBy bss.readLongLE(-1)
+ }
}
"A ByteStringIterator" must {
diff --git
a/actor-tests/src/test/scala/org/apache/pekko/util/SWARUtilSpec.scala
b/actor-tests/src/test/scala/org/apache/pekko/util/SWARUtilSpec.scala
index b660e37406..9a9688ded6 100644
--- a/actor-tests/src/test/scala/org/apache/pekko/util/SWARUtilSpec.scala
+++ b/actor-tests/src/test/scala/org/apache/pekko/util/SWARUtilSpec.scala
@@ -17,6 +17,8 @@
package org.apache.pekko.util
+import java.nio.ByteOrder
+
import org.scalatest.wordspec.AnyWordSpec
import org.scalatest.matchers.should.Matchers
@@ -26,29 +28,35 @@ class SWARUtilSpec extends AnyWordSpec with Matchers {
"SWARUtil" must {
"getLong" in {
- SWARUtil.getLong(testData, 0) should ===(0x0001020304050607L)
- SWARUtil.getLong(testData, 0, true) should ===(0x0001020304050607L)
- SWARUtil.getLong(testData, 0, false) should ===(0x0706050403020100L)
+ SWARUtil.getLong(testData, 0, ByteOrder.BIG_ENDIAN) should
===(0x0001020304050607L)
+ SWARUtil.getLong(testData, 0, ByteOrder.LITTLE_ENDIAN) should
===(0x0706050403020100L)
SWARUtil.getLongBEWithoutMethodHandle(testData, 0) should
===(0x0001020304050607L)
SWARUtil.getLongLEWithoutMethodHandle(testData, 0) should
===(0x0706050403020100L)
- SWARUtil.getLong(testData, 8) should ===(0x08090A0B0C0D0E0FL)
- SWARUtil.getLong(testData, 8, true) should ===(0x08090A0B0C0D0E0FL)
- SWARUtil.getLong(testData, 8, false) should ===(0x0F0E0D0C0B0A0908L)
+ SWARUtil.getLong(testData, 8, ByteOrder.BIG_ENDIAN) should
===(0x08090A0B0C0D0E0FL)
+ SWARUtil.getLong(testData, 8, ByteOrder.LITTLE_ENDIAN) should
===(0x0F0E0D0C0B0A0908L)
SWARUtil.getLongBEWithoutMethodHandle(testData, 8) should
===(0x08090A0B0C0D0E0FL)
SWARUtil.getLongLEWithoutMethodHandle(testData, 8) should
===(0x0F0E0D0C0B0A0908L)
}
"getInt" in {
- SWARUtil.getInt(testData, 0) should ===(0x00010203)
- SWARUtil.getInt(testData, 0, true) should ===(0x00010203)
- SWARUtil.getInt(testData, 0, false) should ===(0x03020100)
+ SWARUtil.getInt(testData, 0, ByteOrder.BIG_ENDIAN) should ===(0x00010203)
+ SWARUtil.getInt(testData, 0, ByteOrder.LITTLE_ENDIAN) should
===(0x03020100)
SWARUtil.getIntBEWithoutMethodHandle(testData, 0) should ===(0x00010203)
SWARUtil.getIntLEWithoutMethodHandle(testData, 0) should ===(0x03020100)
- SWARUtil.getInt(testData, 4) should ===(0x04050607)
- SWARUtil.getInt(testData, 4, true) should ===(0x04050607)
- SWARUtil.getInt(testData, 4, false) should ===(0x07060504)
+ SWARUtil.getInt(testData, 4, ByteOrder.BIG_ENDIAN) should ===(0x04050607)
+ SWARUtil.getInt(testData, 4, ByteOrder.LITTLE_ENDIAN) should
===(0x07060504)
SWARUtil.getIntBEWithoutMethodHandle(testData, 4) should ===(0x04050607)
SWARUtil.getIntLEWithoutMethodHandle(testData, 4) should ===(0x07060504)
}
+ "getShort" in {
+ SWARUtil.getShort(testData, 0, ByteOrder.BIG_ENDIAN) should
===(0x0001.toShort)
+ SWARUtil.getShort(testData, 0, ByteOrder.LITTLE_ENDIAN) should
===(0x0100.toShort)
+ SWARUtil.getShortBEWithoutMethodHandle(testData, 0) should
===(0x0001.toShort)
+ SWARUtil.getShortLEWithoutMethodHandle(testData, 0) should
===(0x0100.toShort)
+ SWARUtil.getShort(testData, 2, ByteOrder.BIG_ENDIAN) should
===(0x0203.toShort)
+ SWARUtil.getShort(testData, 2, ByteOrder.LITTLE_ENDIAN) should
===(0x0302.toShort)
+ SWARUtil.getShortBEWithoutMethodHandle(testData, 2) should
===(0x0203.toShort)
+ SWARUtil.getShortLEWithoutMethodHandle(testData, 2) should
===(0x0302.toShort)
+ }
}
}
diff --git a/actor/src/main/scala-2.13/org/apache/pekko/util/ByteIterator.scala
b/actor/src/main/scala-2.13/org/apache/pekko/util/ByteIterator.scala
index ae56c30df0..75eb67ace4 100644
--- a/actor/src/main/scala-2.13/org/apache/pekko/util/ByteIterator.scala
+++ b/actor/src/main/scala-2.13/org/apache/pekko/util/ByteIterator.scala
@@ -141,6 +141,27 @@ object ByteIterator {
} else throw new NoSuchElementException("next on empty iterator")
}
+ override def getShort(implicit byteOrder: ByteOrder): Short = {
+ if (len < 2) throw new NoSuchElementException("next on empty iterator")
+ val result = SWARUtil.getShort(array, from, byteOrder)
+ from += 2
+ result
+ }
+
+ override def getInt(implicit byteOrder: ByteOrder): Int = {
+ if (len < 4) throw new NoSuchElementException("next on empty iterator")
+ val result = SWARUtil.getInt(array, from, byteOrder)
+ from += 4
+ result
+ }
+
+ override def getLong(implicit byteOrder: ByteOrder): Long = {
+ if (len < 8) throw new NoSuchElementException("next on empty iterator")
+ val result = SWARUtil.getLong(array, from, byteOrder)
+ from += 8
+ result
+ }
+
private def wrappedByteBuffer: ByteBuffer = ByteBuffer.wrap(array, from,
len).asReadOnlyBuffer
def getShorts(xs: Array[Short], offset: Int, n: Int)(implicit byteOrder:
ByteOrder): this.type = {
diff --git a/actor/src/main/scala-3/org/apache/pekko/util/ByteIterator.scala
b/actor/src/main/scala-3/org/apache/pekko/util/ByteIterator.scala
index 42d1d83e6a..31518cd81d 100644
--- a/actor/src/main/scala-3/org/apache/pekko/util/ByteIterator.scala
+++ b/actor/src/main/scala-3/org/apache/pekko/util/ByteIterator.scala
@@ -137,6 +137,27 @@ object ByteIterator {
} else throw new NoSuchElementException("next on empty iterator")
}
+ override def getShort(implicit byteOrder: ByteOrder): Short = {
+ if (len < 2) throw new NoSuchElementException("next on empty iterator")
+ val result = SWARUtil.getShort(array, from, byteOrder)
+ from += 2
+ result
+ }
+
+ override def getInt(implicit byteOrder: ByteOrder): Int = {
+ if (len < 4) throw new NoSuchElementException("next on empty iterator")
+ val result = SWARUtil.getInt(array, from, byteOrder)
+ from += 4
+ result
+ }
+
+ override def getLong(implicit byteOrder: ByteOrder): Long = {
+ if (len < 8) throw new NoSuchElementException("next on empty iterator")
+ val result = SWARUtil.getLong(array, from, byteOrder)
+ from += 8
+ result
+ }
+
private def wrappedByteBuffer: ByteBuffer = ByteBuffer.wrap(array, from,
len).asReadOnlyBuffer
def getShorts(xs: Array[Short], offset: Int, n: Int)(implicit byteOrder:
ByteOrder): this.type = {
diff --git a/actor/src/main/scala/org/apache/pekko/util/ByteString.scala
b/actor/src/main/scala/org/apache/pekko/util/ByteString.scala
index ed647d33b4..a74f3dc246 100644
--- a/actor/src/main/scala/org/apache/pekko/util/ByteString.scala
+++ b/actor/src/main/scala/org/apache/pekko/util/ByteString.scala
@@ -262,7 +262,7 @@ object ByteString {
val pattern = SWARUtil.compilePattern(elem)
var i = 0
while (i < longCount) {
- val word = SWARUtil.getLong(bytes, offset)
+ val word = SWARUtil.getLong(bytes, offset, ByteOrder.BIG_ENDIAN)
val result = SWARUtil.applyPattern(word, pattern)
if (result != 0) return offset + SWARUtil.getIndex(result)
offset += java.lang.Long.BYTES
@@ -288,7 +288,7 @@ object ByteString {
val pattern = SWARUtil.compilePattern(elem)
var i = 0
while (i < longCount) {
- val word = SWARUtil.getLong(bytes, offset)
+ val word = SWARUtil.getLong(bytes, offset, ByteOrder.BIG_ENDIAN)
val result = SWARUtil.applyPattern(word, pattern)
if (result != 0) return offset + SWARUtil.getIndex(result)
offset += java.lang.Long.BYTES
@@ -350,6 +350,19 @@ object ByteString {
override def toArrayUnsafe(): Array[Byte] = bytes
override def asInputStream: InputStream = new
UnsynchronizedByteArrayInputStream(bytes)
+
+ private[pekko] override def readShortBEUnchecked(offset: Int): Short =
+ SWARUtil.getShort(bytes, offset, ByteOrder.BIG_ENDIAN)
+ private[pekko] override def readShortLEUnchecked(offset: Int): Short =
+ SWARUtil.getShort(bytes, offset, ByteOrder.LITTLE_ENDIAN)
+ private[pekko] override def readIntBEUnchecked(offset: Int): Int =
+ SWARUtil.getInt(bytes, offset, ByteOrder.BIG_ENDIAN)
+ private[pekko] override def readIntLEUnchecked(offset: Int): Int =
+ SWARUtil.getInt(bytes, offset, ByteOrder.LITTLE_ENDIAN)
+ private[pekko] override def readLongBEUnchecked(offset: Int): Long =
+ SWARUtil.getLong(bytes, offset, ByteOrder.BIG_ENDIAN)
+ private[pekko] override def readLongLEUnchecked(offset: Int): Long =
+ SWARUtil.getLong(bytes, offset, ByteOrder.LITTLE_ENDIAN)
}
/** INTERNAL API: ByteString backed by exactly one array, with start / end
markers */
@@ -521,7 +534,7 @@ object ByteString {
val pattern = SWARUtil.compilePattern(elem)
var i = 0
while (i < longCount) {
- val word = SWARUtil.getLong(bytes, startIndex + offset)
+ val word = SWARUtil.getLong(bytes, startIndex + offset,
ByteOrder.BIG_ENDIAN)
val result = SWARUtil.applyPattern(word, pattern)
if (result != 0) return offset + SWARUtil.getIndex(result)
offset += java.lang.Long.BYTES
@@ -547,7 +560,7 @@ object ByteString {
val pattern = SWARUtil.compilePattern(elem)
var i = 0
while (i < longCount) {
- val word = SWARUtil.getLong(bytes, startIndex + offset)
+ val word = SWARUtil.getLong(bytes, startIndex + offset,
ByteOrder.BIG_ENDIAN)
val result = SWARUtil.applyPattern(word, pattern)
if (result != 0) return offset + SWARUtil.getIndex(result)
offset += java.lang.Long.BYTES
@@ -593,6 +606,19 @@ object ByteString {
override def asInputStream: InputStream =
new UnsynchronizedByteArrayInputStream(bytes, startIndex, length)
+
+ private[pekko] override def readShortBEUnchecked(offset: Int): Short =
+ SWARUtil.getShort(bytes, startIndex + offset, ByteOrder.BIG_ENDIAN)
+ private[pekko] override def readShortLEUnchecked(offset: Int): Short =
+ SWARUtil.getShort(bytes, startIndex + offset, ByteOrder.LITTLE_ENDIAN)
+ private[pekko] override def readIntBEUnchecked(offset: Int): Int =
+ SWARUtil.getInt(bytes, startIndex + offset, ByteOrder.BIG_ENDIAN)
+ private[pekko] override def readIntLEUnchecked(offset: Int): Int =
+ SWARUtil.getInt(bytes, startIndex + offset, ByteOrder.LITTLE_ENDIAN)
+ private[pekko] override def readLongBEUnchecked(offset: Int): Long =
+ SWARUtil.getLong(bytes, startIndex + offset, ByteOrder.BIG_ENDIAN)
+ private[pekko] override def readLongLEUnchecked(offset: Int): Long =
+ SWARUtil.getLong(bytes, startIndex + offset, ByteOrder.LITTLE_ENDIAN)
}
private[pekko] object ByteStrings extends Companion {
@@ -1337,6 +1363,157 @@ sealed abstract class ByteString
*/
final def mapI(f: Byte => Int): ByteString = map(f.andThen(_.toByte))
+ protected final def checkReadBounds(offset: Int, size: Int): Unit =
+ if (offset < 0 || offset + size > length)
+ throw new IndexOutOfBoundsException(
+ s"offset $offset with required size $size exceeds ByteString length
$length")
+
+ /**
+ * Read a short from this ByteString at the given offset in big-endian byte
order.
+ *
+ * @param offset the offset to read from
+ * @return the short value
+ * @throws IndexOutOfBoundsException if the offset is negative or there are
fewer than 2 bytes available from offset
+ * @since 2.0.0
+ */
+ def readShortBE(offset: Int): Short = {
+ checkReadBounds(offset, 2)
+ readShortBEUnchecked(offset)
+ }
+
+ /**
+ * Read a short from this ByteString at the given offset in little-endian
byte order.
+ *
+ * @param offset the offset to read from
+ * @return the short value
+ * @throws IndexOutOfBoundsException if the offset is negative or there are
fewer than 2 bytes available from offset
+ * @since 2.0.0
+ */
+ def readShortLE(offset: Int): Short = {
+ checkReadBounds(offset, 2)
+ readShortLEUnchecked(offset)
+ }
+
+ /**
+ * Read an int from this ByteString at the given offset in big-endian byte
order.
+ *
+ * @param offset the offset to read from
+ * @return the int value
+ * @throws IndexOutOfBoundsException if the offset is negative or there are
fewer than 4 bytes available from offset
+ * @since 2.0.0
+ */
+ def readIntBE(offset: Int): Int = {
+ checkReadBounds(offset, 4)
+ readIntBEUnchecked(offset)
+ }
+
+ /**
+ * Read an int from this ByteString at the given offset in little-endian
byte order.
+ *
+ * @param offset the offset to read from
+ * @return the int value
+ * @throws IndexOutOfBoundsException if the offset is negative or there are
fewer than 4 bytes available from offset
+ * @since 2.0.0
+ */
+ def readIntLE(offset: Int): Int = {
+ checkReadBounds(offset, 4)
+ readIntLEUnchecked(offset)
+ }
+
+ /**
+ * Read a long from this ByteString at the given offset in big-endian byte
order.
+ *
+ * @param offset the offset to read from
+ * @return the long value
+ * @throws IndexOutOfBoundsException if the offset is negative or there are
fewer than 8 bytes available from offset
+ * @since 2.0.0
+ */
+ def readLongBE(offset: Int): Long = {
+ checkReadBounds(offset, 8)
+ readLongBEUnchecked(offset)
+ }
+
+ /**
+ * Read a long from this ByteString at the given offset in little-endian
byte order.
+ *
+ * @param offset the offset to read from
+ * @return the long value
+ * @throws IndexOutOfBoundsException if the offset is negative or there are
fewer than 8 bytes available from offset
+ * @since 2.0.0
+ */
+ def readLongLE(offset: Int): Long = {
+ checkReadBounds(offset, 8)
+ readLongLEUnchecked(offset)
+ }
+
+ /**
+ * INTERNAL API
+ * Optimized in subclasses when we have byte arrays where we can use `SWARUtil`
+ * methods.
+ */
+ private[pekko] def readShortBEUnchecked(offset: Int): Short =
+ ((apply(offset) & 0xFF) << 8 | (apply(offset + 1) & 0xFF)).toShort
+
+ /**
+ * INTERNAL API
+ * Optimized in subclasses when we have byte arrays where we can use `SWARUtil`
+ * methods.
+ */
+ private[pekko] def readShortLEUnchecked(offset: Int): Short =
+ ((apply(offset) & 0xFF) | (apply(offset + 1) & 0xFF) << 8).toShort
+
+ /**
+ * INTERNAL API
+ * Optimized in subclasses when we have byte arrays where we can use `SWARUtil`
+ * methods.
+ */
+ private[pekko] def readIntBEUnchecked(offset: Int): Int =
+ (apply(offset) & 0xFF) << 24 |
+ (apply(offset + 1) & 0xFF) << 16 |
+ (apply(offset + 2) & 0xFF) << 8 |
+ (apply(offset + 3) & 0xFF)
+
+ /**
+ * INTERNAL API
+ * Optimized in subclasses when we have byte arrays where we can use `SWARUtil`
+ * methods.
+ */
+ private[pekko] def readIntLEUnchecked(offset: Int): Int =
+ (apply(offset) & 0xFF) |
+ (apply(offset + 1) & 0xFF) << 8 |
+ (apply(offset + 2) & 0xFF) << 16 |
+ (apply(offset + 3) & 0xFF) << 24
+
+ /**
+ * INTERNAL API
+ * Optimized in subclasses when we have byte arrays where we can use `SWARUtil`
+ * methods.
+ */
+ private[pekko] def readLongBEUnchecked(offset: Int): Long =
+ (apply(offset).toLong & 0xFF) << 56 |
+ (apply(offset + 1).toLong & 0xFF) << 48 |
+ (apply(offset + 2).toLong & 0xFF) << 40 |
+ (apply(offset + 3).toLong & 0xFF) << 32 |
+ (apply(offset + 4).toLong & 0xFF) << 24 |
+ (apply(offset + 5).toLong & 0xFF) << 16 |
+ (apply(offset + 6).toLong & 0xFF) << 8 |
+ (apply(offset + 7).toLong & 0xFF)
+
+ /**
+ * INTERNAL API
+ * Optimized in subclasses when we have byte arrays where we can use `SWARUtil`
+ * methods.
+ */
+ private[pekko] def readLongLEUnchecked(offset: Int): Long =
+ (apply(offset).toLong & 0xFF) |
+ (apply(offset + 1).toLong & 0xFF) << 8 |
+ (apply(offset + 2).toLong & 0xFF) << 16 |
+ (apply(offset + 3).toLong & 0xFF) << 24 |
+ (apply(offset + 4).toLong & 0xFF) << 32 |
+ (apply(offset + 5).toLong & 0xFF) << 40 |
+ (apply(offset + 6).toLong & 0xFF) << 48 |
+ (apply(offset + 7).toLong & 0xFF) << 56
+
def map[A](f: Byte => Byte): ByteString = fromSpecific(super.map(f))
}
diff --git a/actor/src/main/scala/org/apache/pekko/util/SWARUtil.scala
b/actor/src/main/scala/org/apache/pekko/util/SWARUtil.scala
index bbc8fc22c8..a0d44fd52d 100644
--- a/actor/src/main/scala/org/apache/pekko/util/SWARUtil.scala
+++ b/actor/src/main/scala/org/apache/pekko/util/SWARUtil.scala
@@ -16,6 +16,7 @@
package org.apache.pekko.util
import java.lang.invoke.MethodHandles
+import java.nio.ByteOrder
import org.apache.pekko.annotation.InternalApi
@@ -25,6 +26,38 @@ import org.apache.pekko.annotation.InternalApi
* Copied from the Netty Project.
*
https://github.com/netty/netty/blob/d28a0fc6598b50fbe8f296831777cf4b653a475f/common/src/main/java/io/netty/util/internal/SWARUtil.java
* </p>
+ * <p>
+ * Multi-byte reads use
[[java.lang.invoke.MethodHandles#byteArrayViewVarHandle]], which allows
+ * reading several bytes from a byte array as a single typed value (e.g.
`short`, `int`,
+ * or `long`) in one operation rather than reading and shifting each byte
individually.
+ * </p>
+ * <p>
+ * The JDK itself uses the same technique. Since JDK 21,
`jdk.internal.util.ByteArray` (big
+ * endian) and `jdk.internal.util.ByteArrayLittleEndian` (little endian) use
+ * `MethodHandles.byteArrayViewVarHandle` for every primitive type, and those
helpers back the
+ * public APIs of `java.io.DataInputStream` (`readShort`, `readInt`,
+ * `readLong`, etc.) and `java.util.UUID` construction from bytes.
+ * </p>
+ * <h3>Why this is faster than byte-by-byte shifts</h3>
+ * <ul>
+ * <li><b>Single native load instruction</b> – on x86/x64 and AArch64 the HotSpot JIT intrinsifies
+ * the VarHandle access into a single load instruction such as `MOV` (x86) or `LDR` (AArch64)
+ * that reads the full value directly from memory (followed by a byte swap such as `BSWAP`/`REV`
+ * only when the requested byte order differs from the platform's native order), whereas manual
+ * byte-shift code requires multiple load-and-shift-and-or sequences that are harder for the JIT
+ * to collapse.</li>
+ * <li><b>Consolidated bounds check</b> – a single range check covers the
entire multi-byte read;
+ * individual `array(i)` accesses each carry their own implicit bounds
check.</li>
+ * <li><b>No alignment requirement</b> – unlike `sun.misc.Unsafe` the
VarHandle variant
+ * works correctly on unaligned offsets, so callers do not need to pad
or copy data to satisfy
+ * alignment constraints.</li>
+ * <li><b>SWAR arithmetic</b> – reading a full `long` with a single
VarHandle call means
+ * eight bytes arrive in one register, enabling SWAR patterns that test
all eight bytes in
+ * parallel (see [[applyPattern]]).</li>
+ * </ul>
+ * <p>
+ * A runtime `try/catch` guards each VarHandle creation; if the JVM does not
support the API
+ * (e.g. older Android runtimes) the code falls back to explicit byte-by-byte
shift implementations
+ * (`getLongBEWithoutMethodHandle`, etc.) so behaviour is always correct.
+ * </p>
*/
@InternalApi
private[pekko] object SWARUtil {
@@ -65,6 +98,24 @@ private[pekko] object SWARUtil {
case _: Throwable => (null, false)
}
+ private val (shortBeArrayView, shortBeArrayViewSupported) =
+ try {
+ (MethodHandles.byteArrayViewVarHandle(
+ classOf[Array[Short]], java.nio.ByteOrder.BIG_ENDIAN),
+ true)
+ } catch {
+ case _: Throwable => (null, false)
+ }
+
+ private val (shortLeArrayView, shortLeArrayViewSupported) =
+ try {
+ (MethodHandles.byteArrayViewVarHandle(
+ classOf[Array[Short]], java.nio.ByteOrder.LITTLE_ENDIAN),
+ true)
+ } catch {
+ case _: Throwable => (null, false)
+ }
+
/**
* Compiles given byte into a long pattern suitable for SWAR operations.
*/
@@ -102,32 +153,22 @@ private[pekko] object SWARUtil {
*
* @param array the byte array to read from
* @param index the index to read from
+ * @param byteOrder the byte order to use (big-endian or little-endian)
* @return the long value at the specified index
*/
- def getLong(array: Array[Byte], index: Int): Long = {
- if (longBeArrayViewSupported) {
- longBeArrayView.get(array, index)
+ def getLong(array: Array[Byte], index: Int, byteOrder: ByteOrder): Long = {
+ if (byteOrder == ByteOrder.BIG_ENDIAN) {
+ if (longBeArrayViewSupported) {
+ longBeArrayView.get(array, index)
+ } else {
+ getLongBEWithoutMethodHandle(array, index)
+ }
} else {
- getLongBEWithoutMethodHandle(array, index)
- }
- }
-
- /**
- * Returns the long value at the specified index in the given byte array.
- * Uses a VarHandle byte array view if supported.
- * Does not range check - assumes caller has checked bounds.
- *
- * @param array the byte array to read from
- * @param index the index to read from
- * @return the long value at the specified index
- */
- def getLong(array: Array[Byte], index: Int, bigEndian: Boolean): Long = {
- if (bigEndian) {
- getLong(array, index)
- } else if (longLeArrayViewSupported) {
- longLeArrayView.get(array, index)
- } else {
- getLongLEWithoutMethodHandle(array, index)
+ if (longLeArrayViewSupported) {
+ longLeArrayView.get(array, index)
+ } else {
+ getLongLEWithoutMethodHandle(array, index)
+ }
}
}
@@ -138,36 +179,53 @@ private[pekko] object SWARUtil {
*
* @param array the byte array to read from
* @param index the index to read from
+ * @param byteOrder the byte order to use (big-endian or little-endian)
* @return the int value at the specified index
*/
- def getInt(array: Array[Byte], index: Int): Int = {
- if (intBeArrayViewSupported) {
- intBeArrayView.get(array, index)
+ def getInt(array: Array[Byte], index: Int, byteOrder: ByteOrder): Int = {
+ if (byteOrder == ByteOrder.BIG_ENDIAN) {
+ if (intBeArrayViewSupported) {
+ intBeArrayView.get(array, index)
+ } else {
+ getIntBEWithoutMethodHandle(array, index)
+ }
} else {
- getIntBEWithoutMethodHandle(array, index)
+ if (intLeArrayViewSupported) {
+ intLeArrayView.get(array, index)
+ } else {
+ getIntLEWithoutMethodHandle(array, index)
+ }
}
}
/**
- * Returns the int value at the specified index in the given byte array.
- * Uses a VarHandle byte array view if supported.
+ * Returns the short value at the specified index in the given byte array.
+ * Uses the given byte order. Uses a VarHandle byte array view if supported.
* Does not range check - assumes caller has checked bounds.
*
* @param array the byte array to read from
* @param index the index to read from
- * @param bigEndian whether to use big-endian or little-endian byte order
- * @return the int value at the specified index
+ * @param byteOrder the byte order to use (big-endian or little-endian)
+ * @return the short value at the specified index
*/
- def getInt(array: Array[Byte], index: Int, bigEndian: Boolean): Int = {
- if (bigEndian) {
- getInt(array, index)
- } else if (intLeArrayViewSupported) {
- intLeArrayView.get(array, index)
+ def getShort(array: Array[Byte], index: Int, byteOrder: ByteOrder): Short = {
+ if (byteOrder == ByteOrder.BIG_ENDIAN) {
+ if (shortBeArrayViewSupported) {
+ shortBeArrayView.get(array, index).asInstanceOf[Short]
+ } else {
+ getShortBEWithoutMethodHandle(array, index)
+ }
} else {
- getIntLEWithoutMethodHandle(array, index)
+ if (shortLeArrayViewSupported) {
+ shortLeArrayView.get(array, index).asInstanceOf[Short]
+ } else {
+ getShortLEWithoutMethodHandle(array, index)
+ }
}
}
+ // Fallback implementations for environments that do not support
MethodHandles.byteArrayViewVarHandle
+
private[pekko] def getLongBEWithoutMethodHandle(array: Array[Byte], index:
Int): Long = {
(array(index).toLong & 0xFF) << 56 |
(array(index + 1).toLong & 0xFF) << 48 |
@@ -204,4 +262,10 @@ private[pekko] object SWARUtil {
(array(index + 3) & 0xFF) << 24
}
+ private[pekko] def getShortBEWithoutMethodHandle(array: Array[Byte], index:
Int): Short =
+ ((array(index) & 0xFF) << 8 | (array(index + 1) & 0xFF)).toShort
+
+ private[pekko] def getShortLEWithoutMethodHandle(array: Array[Byte], index:
Int): Short =
+ ((array(index) & 0xFF) | (array(index + 1) & 0xFF) << 8).toShort
+
}
diff --git a/bench-jmh/src/main/scala/org/apache/pekko/util/ByteStringParser_readNum_Benchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/util/ByteStringParser_readNum_Benchmark.scala
new file mode 100644
index 0000000000..a018c10980
--- /dev/null
+++ b/bench-jmh/src/main/scala/org/apache/pekko/util/ByteStringParser_readNum_Benchmark.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.util
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.pekko
+import pekko.stream.impl.io.ByteStringParser
+
+import org.openjdk.jmh.annotations._
+
+@State(Scope.Benchmark)
+@Measurement(timeUnit = TimeUnit.MILLISECONDS)
+class ByteStringParser_readNum_Benchmark {
+ val start = ByteString("abcdefg") ++ ByteString("hijklmno") ++ ByteString("pqrstuv")
+ val bss = start ++ start ++ start ++ start ++ start ++ ByteString("xyz")
+ val bs = bss.compact
+
+ @Benchmark
+ def readIntBE: Int = {
+ val reader = new ByteStringParser.ByteReader(bs)
+ var i: Int = 0
+ try {
+ while (true) i = reader.readIntBE()
+ } catch {
+ case _: Exception => 0
+ }
+ i
+ }
+
+ @Benchmark
+ def readLongBE: Long = {
+ val reader = new ByteStringParser.ByteReader(bs)
+ var l: Long = 0L
+ try {
+ while (true) l = reader.readLongBE()
+ } catch {
+ case _: Exception => 0L
+ }
+ l
+ }
+
+ // bss is a worst case scenario for the ByteReader/ByteString because we cannot optimize
+ // the readIntBE/LongBE by reading directly from the ByteString's
+ // internal array, but have to read byte by byte.
+
+ @Benchmark
+ def readIntBE_ConcatString: Int = {
+ val reader = new ByteStringParser.ByteReader(bss)
+ var i: Int = 0
+ try {
+ while (true) i = reader.readIntBE()
+ } catch {
+ case _: Exception => 0
+ }
+ i
+ }
+
+ @Benchmark
+ def readLongBE_ConcatString: Long = {
+ val reader = new ByteStringParser.ByteReader(bss)
+ var l: Long = 0L
+ try {
+ while (true) l = reader.readLongBE()
+ } catch {
+ case _: Exception => 0L
+ }
+ l
+ }
+
+}
diff --git a/persistence/src/main/scala/org/apache/pekko/persistence/serialization/SnapshotSerializer.scala b/persistence/src/main/scala/org/apache/pekko/persistence/serialization/SnapshotSerializer.scala
index 44c354ddbc..200c835ff6 100644
--- a/persistence/src/main/scala/org/apache/pekko/persistence/serialization/SnapshotSerializer.scala
+++ b/persistence/src/main/scala/org/apache/pekko/persistence/serialization/SnapshotSerializer.scala
@@ -14,6 +14,7 @@
package org.apache.pekko.persistence.serialization
import java.io._
+import java.nio.ByteOrder
import org.apache.pekko
import pekko.actor._
@@ -91,7 +92,7 @@ class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer
private def headerFromBinary(bytes: Array[Byte]): (Int, String) = {
if (bytes.length < 4) throw new IllegalArgumentException("Invalid snapshot header, too short")
- val serializerId = SWARUtil.getInt(bytes, 0, false)
+ val serializerId = SWARUtil.getInt(bytes, 0, ByteOrder.LITTLE_ENDIAN)
if ((serializerId & 0xEDAC) == 0xEDAC) // Java Serialization magic value
throw new NotSerializableException(s"Replaying snapshot from akka 2.3.x version is not supported any more")
@@ -164,7 +165,7 @@ class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer
}
private def snapshotFromBinary(bytes: Array[Byte]): AnyRef = {
- val headerLength = SWARUtil.getInt(bytes, 0, false)
+ val headerLength = SWARUtil.getInt(bytes, 0, ByteOrder.LITTLE_ENDIAN)
val headerBytes = bytes.slice(4, headerLength + 4)
val snapshotBytes = bytes.drop(headerLength + 4)
diff --git a/remote/src/main/java/org/apache/pekko/remote/artery/compress/CountMinSketch.java b/remote/src/main/java/org/apache/pekko/remote/artery/compress/CountMinSketch.java
index 222ab2b925..737841dff3 100644
--- a/remote/src/main/java/org/apache/pekko/remote/artery/compress/CountMinSketch.java
+++ b/remote/src/main/java/org/apache/pekko/remote/artery/compress/CountMinSketch.java
@@ -13,6 +13,7 @@
package org.apache.pekko.remote.artery.compress;
+import java.nio.ByteOrder;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.util.SWARUtil;
@@ -199,7 +200,7 @@ public class CountMinSketch {
// Body
int i = 0;
while (len >= 4) {
- int k = SWARUtil.getInt(data, i, false);
+ int k = SWARUtil.getInt(data, i, ByteOrder.LITTLE_ENDIAN);
h = mix(h, k);
i += 4;
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/io/ByteStringParser.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/io/ByteStringParser.scala
index c1516351a7..bef6e0e7d4 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/io/ByteStringParser.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/io/ByteStringParser.scala
@@ -219,13 +219,43 @@ import pekko.util.ByteString
off += 1
x & 0xFF
} else throw NeedMoreData
- def readShortLE(): Int = readByte() | (readByte() << 8)
- def readIntLE(): Int = readShortLE() | (readShortLE() << 16)
- def readLongLE(): Long = (readIntLE() & 0xFFFFFFFFL) | ((readIntLE() & 0xFFFFFFFFL) << 32)
+ def readShortLE(): Int = {
+ if (off + 2 > input.length) throw NeedMoreData
+ val result = input.readShortLEUnchecked(off) & 0xFFFF
+ off += 2
+ result
+ }
+ def readIntLE(): Int = {
+ if (off + 4 > input.length) throw NeedMoreData
+ val result = input.readIntLEUnchecked(off)
+ off += 4
+ result
+ }
+ def readLongLE(): Long = {
+ if (off + 8 > input.length) throw NeedMoreData
+ val result = input.readLongLEUnchecked(off)
+ off += 8
+ result
+ }
- def readShortBE(): Int = (readByte() << 8) | readByte()
- def readIntBE(): Int = (readShortBE() << 16) | readShortBE()
- def readLongBE(): Long = ((readIntBE() & 0xFFFFFFFFL) << 32) | (readIntBE() & 0xFFFFFFFFL)
+ def readShortBE(): Int = {
+ if (off + 2 > input.length) throw NeedMoreData
+ val result = input.readShortBEUnchecked(off) & 0xFFFF
+ off += 2
+ result
+ }
+ def readIntBE(): Int = {
+ if (off + 4 > input.length) throw NeedMoreData
+ val result = input.readIntBEUnchecked(off)
+ off += 4
+ result
+ }
+ def readLongBE(): Long = {
+ if (off + 8 > input.length) throw NeedMoreData
+ val result = input.readLongBEUnchecked(off)
+ off += 8
+ result
+ }
def skip(numBytes: Int): Unit =
if (off + numBytes <= input.length) off += numBytes
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]