This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 30a0046944 use UnsynchronizedByteArrayInputStream (#2300)
30a0046944 is described below

commit 30a00469440b89072c77bdb6b441222e727c8ba8
Author: PJ Fanning <[email protected]>
AuthorDate: Tue Oct 7 16:36:15 2025 +0100

    use UnsynchronizedByteArrayInputStream (#2300)
    
    * use UnsynchronizedByteArrayInputStream
    
    * javafmt
    
    * add test
    
    * Update UnsynchronizedByteArrayInputStreamSpec.scala
    
    * Update UnsynchronizedByteArrayInputStreamSpec.scala
    
    * Update UnsynchronizedByteArrayInputStream.java
---
 LICENSE                                            |   6 +-
 .../UnsynchronizedByteArrayInputStreamSpec.scala   |  53 ++++++
 .../pekko/util/ByteStringInputStreamSpec.scala     |   4 +-
 .../org/apache/pekko/util/ByteStringSpec.scala     |   9 +-
 .../io/UnsynchronizedByteArrayInputStream.java     | 180 +++++++++++++++++++++
 .../apache/pekko/serialization/Serializer.scala    |  10 +-
 .../scala/org/apache/pekko/util/ByteString.scala   |   8 +-
 .../util/ByteString_asInputStream_Benchmark.scala  |   7 +-
 .../metrics/protobuf/MessageSerializer.scala       |   8 +-
 .../ClusterShardingMessageSerializer.scala         |   5 +-
 .../DistributedPubSubMessageSerializer.scala       |   5 +-
 .../protobuf/ClusterMessageSerializer.scala        |   5 +-
 .../ddata/protobuf/SerializationSupport.scala      |   4 +-
 legal/pekko-actor-jar-license.txt                  |   7 +
 .../serialization/SnapshotSerializer.scala         |   5 +-
 .../serialization/jackson/JacksonSerializer.scala  |   5 +-
 .../pekko/stream/io/compression/CoderSpec.scala    |   4 +-
 .../apache/pekko/testkit/TestJavaSerializer.scala  |  11 +-
 18 files changed, 294 insertions(+), 42 deletions(-)

diff --git a/LICENSE b/LICENSE
index 8622a26ba3..f4b62dab0e 100644
--- a/LICENSE
+++ b/LICENSE
@@ -306,8 +306,10 @@ Copyright 2015 Ben Manes. All Rights Reserved.
 
 ---------------
 
-pekko-actor contains code in `org.apache.pekko.io.ByteBufferCleaner` which was based on code
-from Apache commons-io which was developed under the Apache 2.0 license.
+pekko-actor contains code from Apache commons-io which was developed under the
+Apache 2.0 license.
+- actor/src/main/java/org/apache/pekko/io/ByteBufferCleaner.java
+- actor/src/main/java/org/apache/pekko/io/UnsynchronizedByteArrayInputStream.java
 
 ---------------
 
diff --git 
a/actor-tests/src/test/scala/org/apache/pekko/io/UnsynchronizedByteArrayInputStreamSpec.scala
 
b/actor-tests/src/test/scala/org/apache/pekko/io/UnsynchronizedByteArrayInputStreamSpec.scala
new file mode 100644
index 0000000000..0fdff98c5a
--- /dev/null
+++ 
b/actor-tests/src/test/scala/org/apache/pekko/io/UnsynchronizedByteArrayInputStreamSpec.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.util
+
+import java.nio.charset.StandardCharsets
+
+import org.apache.pekko
+import pekko.io.UnsynchronizedByteArrayInputStream
+
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+
+/**
+ * Behavioural checks for `UnsynchronizedByteArrayInputStream`: mark/reset and skip,
+ * each exercised against a fresh stream over the UTF-8 bytes of "abc".
+ */
+class UnsynchronizedByteArrayInputStreamSpec extends AnyWordSpec with Matchers {
+
+  // Builds a new stream positioned at the first byte of "abc".
+  private def abcStream(): UnsynchronizedByteArrayInputStream =
+    new UnsynchronizedByteArrayInputStream("abc".getBytes(StandardCharsets.UTF_8))
+
+  "UnsynchronizedByteArrayInputStream" must {
+    "support mark and reset" in {
+      val in = abcStream()
+      in.markSupported() should ===(true)
+      in.read() should ===('a')
+      // The readAheadLimit argument is ignored, just as ByteArrayInputStream ignores it.
+      in.mark(1)
+      in.read() should ===('b')
+      in.reset()
+      // After reset the stream is back at the marked position, so 'b' is read again.
+      in.read() should ===('b')
+      in.close()
+    }
+    "support skip" in {
+      val in = abcStream()
+      in.skip(1) should ===(1)
+      in.read() should ===('b')
+      in.close()
+    }
+    "support skip with large value" in {
+      val in = abcStream()
+      // Only 3 bytes exist, so that is the most that can be skipped.
+      in.skip(50) should ===(3)
+      in.read() should ===(-1)
+      in.close()
+    }
+  }
+}
diff --git 
a/actor-tests/src/test/scala/org/apache/pekko/util/ByteStringInputStreamSpec.scala
 
b/actor-tests/src/test/scala/org/apache/pekko/util/ByteStringInputStreamSpec.scala
index b49f4de017..3bab079071 100644
--- 
a/actor-tests/src/test/scala/org/apache/pekko/util/ByteStringInputStreamSpec.scala
+++ 
b/actor-tests/src/test/scala/org/apache/pekko/util/ByteStringInputStreamSpec.scala
@@ -30,7 +30,7 @@ class ByteStringInputStreamSpec extends AnyWordSpec with 
Matchers {
   "ByteString1" must {
     "support asInputStream" in {
       ByteString1.empty.asInputStream.read() shouldEqual -1
-      ByteString1.empty.asInputStream.read(Array.empty) shouldEqual -1
+      ByteString1.empty.asInputStream.read(Array(1)) shouldEqual -1
       toUtf8String(ByteString1.empty.asInputStream) shouldEqual ""
       toUtf8String(ByteString1.fromString("abc").asInputStream) shouldEqual 
"abc"
     }
@@ -47,7 +47,7 @@ class ByteStringInputStreamSpec extends AnyWordSpec with 
Matchers {
     "support asInputStream" in {
       val empty = ByteStrings(ByteString1.fromString(""), 
ByteString1.fromString(""))
       empty.asInputStream.read() shouldEqual -1
-      empty.asInputStream.read(Array.empty) shouldEqual -1
+      empty.asInputStream.read(Array(1)) shouldEqual -1
       toUtf8String(empty.asInputStream) shouldEqual ""
       val abc = ByteStrings(ByteString1.fromString("a"), 
ByteString1.fromString("bc"))
       toUtf8String(abc.asInputStream) shouldEqual "abc"
diff --git 
a/actor-tests/src/test/scala/org/apache/pekko/util/ByteStringSpec.scala 
b/actor-tests/src/test/scala/org/apache/pekko/util/ByteStringSpec.scala
index 0aa077ffa1..54044cf98d 100644
--- a/actor-tests/src/test/scala/org/apache/pekko/util/ByteStringSpec.scala
+++ b/actor-tests/src/test/scala/org/apache/pekko/util/ByteStringSpec.scala
@@ -13,7 +13,7 @@
 
 package org.apache.pekko.util
 
-import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, 
ObjectInputStream, ObjectOutputStream }
+import java.io.{ ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream }
 import java.lang.Double.doubleToRawLongBits
 import java.lang.Float.floatToRawIntBits
 import java.nio.{ ByteBuffer, ByteOrder }
@@ -29,6 +29,7 @@ import org.scalacheck.Arbitrary.arbitrary
 import org.scalatestplus.scalacheck.Checkers
 
 import org.apache.pekko
+import pekko.io.UnsynchronizedByteArrayInputStream
 import pekko.util.ByteString.{ ByteString1, ByteString1C, ByteStrings }
 
 import org.scalatest.matchers.should.Matchers
@@ -96,9 +97,9 @@ class ByteStringSpec extends AnyWordSpec with Matchers with 
Checkers {
   }
 
   def deserialize(bytes: Array[Byte]): AnyRef = {
-    val is = new ObjectInputStream(new ByteArrayInputStream(bytes))
-
-    is.readObject
+    val is = new ObjectInputStream(new 
UnsynchronizedByteArrayInputStream(bytes))
+    try is.readObject
+    finally is.close()
   }
 
   def testSer(obj: AnyRef) = {
diff --git 
a/actor/src/main/java/org/apache/pekko/io/UnsynchronizedByteArrayInputStream.java
 
b/actor/src/main/java/org/apache/pekko/io/UnsynchronizedByteArrayInputStream.java
new file mode 100644
index 0000000000..3ac6b0cc58
--- /dev/null
+++ 
b/actor/src/main/java/org/apache/pekko/io/UnsynchronizedByteArrayInputStream.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.io;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.Objects;
+import org.apache.pekko.annotation.InternalApi;
+
+/**
+ * Internal API: An unsynchronized byte array input stream. This class does 
not copy the provided
+ * byte array, and it is not thread safe.
+ *
+ * @see ByteArrayInputStream
+ * @since 2.0.0
+ */
+// @NotThreadSafe
+// copied from
+// 
https://github.com/apache/commons-io/blob/26e5aa9661a72bfd9697fb384ca72f58e5d672e9/src/main/java/org/apache/commons/io/input/UnsynchronizedByteArrayInputStream.java
+@InternalApi
+public class UnsynchronizedByteArrayInputStream extends InputStream {
+
+  /** The end of stream marker. */
+  private static final int END_OF_STREAM = -1;
+
+  /**
+   * Clamps a requested start position to the length of the array.
+   *
+   * <p>For an empty array the requested value is returned unchanged, so the result can be larger
+   * than {@code data.length}; all readers of {@code offset} therefore tolerate {@code offset >
+   * eod}.
+   *
+   * @param data the backing array
+   * @param defaultValue the requested position
+   * @return the clamped position
+   * @throws IllegalArgumentException if {@code defaultValue} is negative
+   */
+  private static int minPosLen(final byte[] data, final int defaultValue) {
+    requireNonNegative(defaultValue, "defaultValue");
+    return Math.min(defaultValue, data.length > 0 ? data.length : defaultValue);
+  }
+
+  /**
+   * Returns {@code value} if it is not negative.
+   *
+   * @param value the value to check
+   * @param name the name used in the exception message
+   * @return {@code value}
+   * @throws IllegalArgumentException if {@code value} is negative
+   */
+  private static int requireNonNegative(final int value, final String name) {
+    if (value < 0) {
+      throw new IllegalArgumentException(name + " cannot be negative");
+    }
+    return value;
+  }
+
+  /**
+   * Verifies that the range {@code [off, off + len)} lies inside {@code array}.
+   *
+   * @throws NullPointerException if {@code array} is null
+   * @throws IndexOutOfBoundsException if {@code off} or {@code len} is negative, or the range
+   *     extends past the end of the array
+   */
+  private static void checkFromIndexSize(final byte[] array, final int off, final int len) {
+    final int arrayLength = Objects.requireNonNull(array, "byte array").length;
+    // OR-ing the values detects any negative operand with a single sign check.
+    if ((off | len | arrayLength) < 0 || arrayLength - len < off) {
+      throw new IndexOutOfBoundsException(
+          String.format("Range [%s, %<s + %s) out of bounds for length %s", off, len, arrayLength));
+    }
+  }
+
+  /** The underlying data buffer. */
+  private final byte[] data;
+
+  /**
+   * End Of Data.
+   *
+   * <p>Similar to data.length, which is the last readable offset + 1.
+   */
+  private final int eod;
+
+  /** Current offset in the data buffer. */
+  private int offset;
+
+  /** The current mark (if any). */
+  private int markedOffset;
+
+  /**
+   * Constructs a new byte array input stream over the whole array. The array is not copied.
+   *
+   * @param data the buffer
+   * @throws NullPointerException if {@code data} is null
+   */
+  public UnsynchronizedByteArrayInputStream(final byte[] data) {
+    this.data = Objects.requireNonNull(data, "data");
+    this.offset = 0;
+    this.markedOffset = 0;
+    this.eod = data.length;
+  }
+
+  /**
+   * Constructs a new byte array input stream over a window of the array. The array is not copied.
+   * The window is clamped to the end of the array, and the initial mark is the start of the
+   * window.
+   *
+   * @param data the buffer
+   * @param offset the offset into the buffer
+   * @param length the length of the buffer
+   * @throws IllegalArgumentException if the offset or length less than zero
+   * @throws NullPointerException if {@code data} is null
+   */
+  public UnsynchronizedByteArrayInputStream(final byte[] data, final int offset, final int length) {
+    requireNonNegative(offset, "offset");
+    requireNonNegative(length, "length");
+    this.data = Objects.requireNonNull(data, "data");
+    final int start = minPosLen(data, offset);
+    // Add in long arithmetic: start + length may exceed Integer.MAX_VALUE, and an int overflow
+    // would produce a negative end-of-data that makes the stream look empty.
+    this.eod = (int) Math.min((long) start + length, data.length);
+    this.offset = start;
+    this.markedOffset = start;
+  }
+
+  /**
+   * Returns the number of bytes that can still be read.
+   *
+   * @return the remaining byte count, never negative
+   */
+  @Override
+  public int available() {
+    return offset < eod ? eod - offset : 0;
+  }
+
+  /**
+   * Remembers the current position so that {@link #reset()} can return to it.
+   *
+   * @param readLimit ignored
+   */
+  @SuppressWarnings("sync-override")
+  @Override
+  public void mark(final int readLimit) {
+    this.markedOffset = this.offset;
+  }
+
+  /**
+   * Mark and reset are supported.
+   *
+   * @return always {@code true}
+   */
+  @Override
+  public boolean markSupported() {
+    return true;
+  }
+
+  /**
+   * Reads the next byte.
+   *
+   * @return the byte as a value in {@code 0..255}, or {@code -1} at end of stream
+   */
+  @Override
+  public int read() {
+    return offset < eod ? data[offset++] & 0xff : END_OF_STREAM;
+  }
+
+  /**
+   * Reads up to {@code dest.length} bytes into {@code dest}.
+   *
+   * @param dest the destination array
+   * @return the number of bytes read, {@code 0} if {@code dest} is empty, or {@code -1} at end of
+   *     stream
+   * @throws NullPointerException if {@code dest} is null
+   */
+  @Override
+  public int read(final byte[] dest) {
+    Objects.requireNonNull(dest, "dest");
+    return readLocal(dest, 0, dest.length);
+  }
+
+  /**
+   * Reads up to {@code len} bytes into {@code dest} starting at {@code off}.
+   *
+   * @param dest the destination array
+   * @param off the first index in {@code dest} to write to
+   * @param len the maximum number of bytes to read
+   * @return the number of bytes read, {@code 0} if {@code len} is zero, or {@code -1} at end of
+   *     stream
+   * @throws NullPointerException if {@code dest} is null
+   * @throws IndexOutOfBoundsException if the range does not fit inside {@code dest}
+   */
+  @Override
+  public int read(final byte[] dest, final int off, final int len) {
+    checkFromIndexSize(dest, off, len);
+    return readLocal(dest, off, len);
+  }
+
+  /**
+   * Copies bytes to {@code dest}; the caller has already validated the range.
+   *
+   * <p>A zero-length request returns {@code 0} even at end of stream, because the length check
+   * comes first.
+   */
+  private int readLocal(final byte[] dest, final int off, final int len) {
+    if (len == 0) {
+      return 0;
+    }
+
+    if (offset >= eod) {
+      return END_OF_STREAM;
+    }
+
+    // Here len > 0 and offset < eod, so at least one byte is always copied.
+    final int actualLen = Math.min(len, eod - offset);
+    System.arraycopy(data, offset, dest, off, actualLen);
+    offset += actualLen;
+    return actualLen;
+  }
+
+  /** Moves the read position back to the most recent mark (initially the start of the data). */
+  @SuppressWarnings("sync-override")
+  @Override
+  public void reset() {
+    this.offset = this.markedOffset;
+  }
+
+  /**
+   * Skips over up to {@code n} bytes, stopping at the end of the data.
+   *
+   * @param n the number of bytes to skip
+   * @return the number of bytes actually skipped, never negative
+   * @throws IllegalArgumentException if {@code n} is negative
+   */
+  @Override
+  public long skip(final long n) {
+    if (n < 0) {
+      throw new IllegalArgumentException("Skipping backward is not supported");
+    }
+
+    // offset can start beyond eod (empty array with a positive start offset), so clamp at zero.
+    final long remaining = Math.max(0, eod - offset);
+    final long actualSkip = Math.min(n, remaining);
+
+    // Advance by what was really skipped, not by the request: the position never runs past the
+    // end of the data and no int conversion of a huge n is needed.
+    offset += (int) actualSkip;
+    return actualSkip;
+  }
+}
diff --git 
a/actor/src/main/scala/org/apache/pekko/serialization/Serializer.scala 
b/actor/src/main/scala/org/apache/pekko/serialization/Serializer.scala
index 95627fb831..1e3a06f5f6 100644
--- a/actor/src/main/scala/org/apache/pekko/serialization/Serializer.scala
+++ b/actor/src/main/scala/org/apache/pekko/serialization/Serializer.scala
@@ -13,7 +13,6 @@
 
 package org.apache.pekko.serialization
 
-import java.io.ByteArrayInputStream
 import java.io.ByteArrayOutputStream
 import java.io.NotSerializableException
 import java.io.ObjectOutputStream
@@ -28,6 +27,7 @@ import pekko.actor.ExtendedActorSystem
 import pekko.annotation.InternalApi
 import pekko.event.LogMarker
 import pekko.event.Logging
+import pekko.io.UnsynchronizedByteArrayInputStream
 import pekko.util.ClassLoaderObjectInputStream
 
 /**
@@ -354,10 +354,10 @@ class JavaSerializer(val system: ExtendedActorSystem) 
extends BaseSerializer {
 
   @throws(classOf[NotSerializableException])
   def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
-    val in = new 
ClassLoaderObjectInputStream(system.dynamicAccess.classLoader, new 
ByteArrayInputStream(bytes))
-    val obj = JavaSerializer.currentSystem.withValue(system) { in.readObject }
-    in.close()
-    obj
+    val in =
+      new ClassLoaderObjectInputStream(system.dynamicAccess.classLoader, new 
UnsynchronizedByteArrayInputStream(bytes))
+    try JavaSerializer.currentSystem.withValue(system) { in.readObject }
+    finally in.close()
   }
 }
 
diff --git a/actor/src/main/scala/org/apache/pekko/util/ByteString.scala 
b/actor/src/main/scala/org/apache/pekko/util/ByteString.scala
index 83bd21418f..dfcce42376 100644
--- a/actor/src/main/scala/org/apache/pekko/util/ByteString.scala
+++ b/actor/src/main/scala/org/apache/pekko/util/ByteString.scala
@@ -13,12 +13,14 @@
 
 package org.apache.pekko.util
 
-import java.io.{ ByteArrayInputStream, InputStream, ObjectInputStream, 
ObjectOutputStream, SequenceInputStream }
+import java.io.{ InputStream, ObjectInputStream, ObjectOutputStream, 
SequenceInputStream }
 import java.lang.{ Iterable => JIterable }
 import java.nio.{ ByteBuffer, ByteOrder }
 import java.nio.charset.{ Charset, StandardCharsets }
 import java.util.Base64
 
+import org.apache.pekko.io.UnsynchronizedByteArrayInputStream
+
 import scala.annotation.{ nowarn, tailrec, varargs }
 import scala.collection.{ immutable, mutable }
 import scala.collection.immutable.{ IndexedSeq, IndexedSeqOps, 
StrictOptimizedSeqOps, VectorBuilder }
@@ -347,7 +349,7 @@ object ByteString {
 
     override def toArrayUnsafe(): Array[Byte] = bytes
 
-    override def asInputStream: InputStream = new ByteArrayInputStream(bytes)
+    override def asInputStream: InputStream = new 
UnsynchronizedByteArrayInputStream(bytes)
   }
 
   /** INTERNAL API: ByteString backed by exactly one array, with start / end 
markers */
@@ -590,7 +592,7 @@ object ByteString {
     }
 
     override def asInputStream: InputStream =
-      new ByteArrayInputStream(bytes, startIndex, length)
+      new UnsynchronizedByteArrayInputStream(bytes, startIndex, length)
   }
 
   private[pekko] object ByteStrings extends Companion {
diff --git 
a/bench-jmh/src/main/scala/org/apache/pekko/util/ByteString_asInputStream_Benchmark.scala
 
b/bench-jmh/src/main/scala/org/apache/pekko/util/ByteString_asInputStream_Benchmark.scala
index 36fb2b9e2d..0cc29106bf 100644
--- 
a/bench-jmh/src/main/scala/org/apache/pekko/util/ByteString_asInputStream_Benchmark.scala
+++ 
b/bench-jmh/src/main/scala/org/apache/pekko/util/ByteString_asInputStream_Benchmark.scala
@@ -17,9 +17,10 @@
 
 package org.apache.pekko.util
 
-import java.io.{ ByteArrayInputStream, InputStream }
+import java.io.InputStream
 import java.util.concurrent.TimeUnit
 
+import org.apache.pekko.io.UnsynchronizedByteArrayInputStream
 import org.openjdk.jmh.annotations._
 import org.openjdk.jmh.infra.Blackhole
 
@@ -67,12 +68,12 @@ class ByteString_asInputStream_Benchmark {
 
   @Benchmark
   def single_bs_bytes_to_input_stream(blackhole: Blackhole): Unit = {
-    blackhole.consume(countBytes(new ByteArrayInputStream(bs.toArray)))
+    blackhole.consume(countBytes(new 
UnsynchronizedByteArrayInputStream(bs.toArray)))
   }
 
   @Benchmark
   def composed_bs_bytes_to_input_stream(blackhole: Blackhole): Unit = {
-    blackhole.consume(countBytes(new ByteArrayInputStream(composed.toArray)))
+    blackhole.consume(countBytes(new 
UnsynchronizedByteArrayInputStream(composed.toArray)))
   }
 
   @Benchmark
diff --git 
a/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/protobuf/MessageSerializer.scala
 
b/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/protobuf/MessageSerializer.scala
index b658576754..434f61aa7a 100644
--- 
a/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/protobuf/MessageSerializer.scala
+++ 
b/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/protobuf/MessageSerializer.scala
@@ -14,8 +14,7 @@
 package org.apache.pekko.cluster.metrics.protobuf
 
 import java.{ lang => jl }
-import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, 
ObjectOutputStream }
-import java.io.NotSerializableException
+import java.io.{ ByteArrayOutputStream, NotSerializableException, 
ObjectOutputStream }
 import java.util.zip.{ GZIPInputStream, GZIPOutputStream }
 
 import scala.annotation.tailrec
@@ -27,6 +26,7 @@ import pekko.actor.{ Address, ExtendedActorSystem }
 import pekko.cluster.metrics._
 import pekko.cluster.metrics.protobuf.msg.{ ClusterMetricsMessages => cm }
 import pekko.dispatch.Dispatchers
+import pekko.io.UnsynchronizedByteArrayInputStream
 import pekko.protobufv3.internal.MessageLite
 import pekko.remote.ByteStringUtils
 import pekko.serialization.{ BaseSerializer, SerializationExtension, 
SerializerWithStringManifest, Serializers }
@@ -78,7 +78,7 @@ class MessageSerializer(val system: ExtendedActorSystem) 
extends SerializerWithS
   }
 
   def decompress(bytes: Array[Byte]): Array[Byte] = {
-    val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
+    val in = new GZIPInputStream(new UnsynchronizedByteArrayInputStream(bytes))
     val out = new ByteArrayOutputStream()
     val buffer = new Array[Byte](BufferSize)
 
@@ -264,7 +264,7 @@ class MessageSerializer(val system: ExtendedActorSystem) 
extends SerializerWithS
         case NumberType.Serialized_VALUE =>
           val in = new NumberInputStream(
             system.dynamicAccess.classLoader,
-            new ByteArrayInputStream(number.getSerialized.toByteArray))
+            new 
UnsynchronizedByteArrayInputStream(number.getSerialized.toByteArray))
           val obj = in.readObject
           in.close()
           obj.asInstanceOf[jl.Number]
diff --git 
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala
 
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala
index cec6c3a9d1..6951a109a8 100644
--- 
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala
+++ 
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala
@@ -13,7 +13,7 @@
 
 package org.apache.pekko.cluster.sharding.protobuf
 
-import java.io.{ ByteArrayInputStream, ByteArrayOutputStream }
+import java.io.ByteArrayOutputStream
 import java.io.NotSerializableException
 import java.util.zip.GZIPInputStream
 import java.util.zip.GZIPOutputStream
@@ -38,6 +38,7 @@ import 
pekko.cluster.sharding.internal.EventSourcedRememberEntitiesShardStore.{
 import 
pekko.cluster.sharding.internal.EventSourcedRememberEntitiesShardStore.{ State 
=> EntityState }
 import pekko.cluster.sharding.protobuf.msg.{ ClusterShardingMessages => sm }
 import pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages
+import pekko.io.UnsynchronizedByteArrayInputStream
 import pekko.protobufv3.internal.MessageLite
 import pekko.serialization.BaseSerializer
 import pekko.serialization.Serialization
@@ -623,7 +624,7 @@ private[pekko] class ClusterShardingMessageSerializer(val 
system: ExtendedActorS
   }
 
   private def decompress(bytes: Array[Byte]): Array[Byte] = {
-    val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
+    val in = new GZIPInputStream(new UnsynchronizedByteArrayInputStream(bytes))
     val out = new ByteArrayOutputStream()
     val buffer = new Array[Byte](BufferSize)
 
diff --git 
a/cluster-tools/src/main/scala/org/apache/pekko/cluster/pubsub/protobuf/DistributedPubSubMessageSerializer.scala
 
b/cluster-tools/src/main/scala/org/apache/pekko/cluster/pubsub/protobuf/DistributedPubSubMessageSerializer.scala
index a17b3a7f37..b4a3a1a14e 100644
--- 
a/cluster-tools/src/main/scala/org/apache/pekko/cluster/pubsub/protobuf/DistributedPubSubMessageSerializer.scala
+++ 
b/cluster-tools/src/main/scala/org/apache/pekko/cluster/pubsub/protobuf/DistributedPubSubMessageSerializer.scala
@@ -13,7 +13,7 @@
 
 package org.apache.pekko.cluster.pubsub.protobuf
 
-import java.io.{ ByteArrayInputStream, ByteArrayOutputStream }
+import java.io.ByteArrayOutputStream
 import java.io.NotSerializableException
 import java.util.zip.GZIPInputStream
 import java.util.zip.GZIPOutputStream
@@ -28,6 +28,7 @@ import pekko.actor.ActorRef
 import pekko.cluster.pubsub.DistributedPubSubMediator._
 import pekko.cluster.pubsub.DistributedPubSubMediator.Internal._
 import pekko.cluster.pubsub.protobuf.msg.{ DistributedPubSubMessages => dm }
+import pekko.io.UnsynchronizedByteArrayInputStream
 import pekko.protobufv3.internal.{ ByteString, MessageLite }
 import pekko.remote.ByteStringUtils
 import pekko.serialization._
@@ -97,7 +98,7 @@ private[pekko] class DistributedPubSubMessageSerializer(val 
system: ExtendedActo
   }
 
   private def decompress(bytes: Array[Byte]): Array[Byte] = {
-    val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
+    val in = new GZIPInputStream(new UnsynchronizedByteArrayInputStream(bytes))
     val out = new ByteArrayOutputStream()
     val buffer = new Array[Byte](BufferSize)
 
diff --git 
a/cluster/src/main/scala/org/apache/pekko/cluster/protobuf/ClusterMessageSerializer.scala
 
b/cluster/src/main/scala/org/apache/pekko/cluster/protobuf/ClusterMessageSerializer.scala
index 299211886a..13cf1a09e0 100644
--- 
a/cluster/src/main/scala/org/apache/pekko/cluster/protobuf/ClusterMessageSerializer.scala
+++ 
b/cluster/src/main/scala/org/apache/pekko/cluster/protobuf/ClusterMessageSerializer.scala
@@ -13,7 +13,7 @@
 
 package org.apache.pekko.cluster.protobuf
 
-import java.io.{ ByteArrayInputStream, ByteArrayOutputStream }
+import java.io.ByteArrayOutputStream
 import java.util.zip.{ GZIPInputStream, GZIPOutputStream }
 
 import scala.annotation.tailrec
@@ -28,6 +28,7 @@ import pekko.cluster._
 import pekko.cluster.InternalClusterAction._
 import pekko.cluster.protobuf.msg.{ ClusterMessages => cm }
 import pekko.cluster.routing.{ ClusterRouterPool, ClusterRouterPoolSettings }
+import pekko.io.UnsynchronizedByteArrayInputStream
 import pekko.protobufv3.internal.MessageLite
 import pekko.remote.ByteStringUtils
 import pekko.routing.Pool
@@ -165,7 +166,7 @@ final class ClusterMessageSerializer(val system: 
ExtendedActorSystem)
   }
 
   def decompress(bytes: Array[Byte]): Array[Byte] = {
-    val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
+    val in = new GZIPInputStream(new UnsynchronizedByteArrayInputStream(bytes))
     val out = new ByteArrayOutputStream()
     val buffer = new Array[Byte](BufferSize)
 
diff --git 
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/protobuf/SerializationSupport.scala
 
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/protobuf/SerializationSupport.scala
index f6861cbaf9..c526e0af0a 100644
--- 
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/protobuf/SerializationSupport.scala
+++ 
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/protobuf/SerializationSupport.scala
@@ -13,7 +13,6 @@
 
 package org.apache.pekko.cluster.ddata.protobuf
 
-import java.io.ByteArrayInputStream
 import java.io.ByteArrayOutputStream
 import java.util.zip.GZIPInputStream
 import java.util.zip.GZIPOutputStream
@@ -29,6 +28,7 @@ import pekko.actor.ExtendedActorSystem
 import pekko.cluster.UniqueAddress
 import pekko.cluster.ddata.VersionVector
 import pekko.cluster.ddata.protobuf.msg.{ ReplicatorMessages => dm }
+import pekko.io.UnsynchronizedByteArrayInputStream
 import pekko.protobufv3.internal.ByteString
 import pekko.protobufv3.internal.MessageLite
 import pekko.remote.ByteStringUtils
@@ -76,7 +76,7 @@ trait SerializationSupport {
   }
 
   def decompress(bytes: Array[Byte]): Array[Byte] = {
-    val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
+    val in = new GZIPInputStream(new UnsynchronizedByteArrayInputStream(bytes))
     val out = new ByteArrayOutputStream()
     val buffer = new Array[Byte](BufferSize)
 
diff --git a/legal/pekko-actor-jar-license.txt 
b/legal/pekko-actor-jar-license.txt
index deca4cefa1..fe77d08cc3 100644
--- a/legal/pekko-actor-jar-license.txt
+++ b/legal/pekko-actor-jar-license.txt
@@ -303,3 +303,10 @@ For more information, please refer to 
<http://unlicense.org/>
 pekko-actor contains code in `org.apache.pekko.util.FrequencySketch.scala` which was based on code from
 Caffeine <https://github.com/ben-manes/caffeine> which was developed under the Apache 2.0 license.
 Copyright 2015 Ben Manes. All Rights Reserved.
+
+---------------
+
+pekko-actor contains code from Apache commons-io which was developed under the
+Apache 2.0 license.
+- actor/src/main/java/org/apache/pekko/io/ByteBufferCleaner.java
+- actor/src/main/java/org/apache/pekko/io/UnsynchronizedByteArrayInputStream.java
diff --git 
a/persistence/src/main/scala/org/apache/pekko/persistence/serialization/SnapshotSerializer.scala
 
b/persistence/src/main/scala/org/apache/pekko/persistence/serialization/SnapshotSerializer.scala
index 531fb07e63..5270369cf2 100644
--- 
a/persistence/src/main/scala/org/apache/pekko/persistence/serialization/SnapshotSerializer.scala
+++ 
b/persistence/src/main/scala/org/apache/pekko/persistence/serialization/SnapshotSerializer.scala
@@ -17,6 +17,7 @@ import java.io._
 
 import org.apache.pekko
 import pekko.actor._
+import pekko.io.UnsynchronizedByteArrayInputStream
 import pekko.serialization._
 import pekko.util.ByteString.UTF_8
 
@@ -89,7 +90,7 @@ class SnapshotSerializer(val system: ExtendedActorSystem) 
extends BaseSerializer
   }
 
   private def headerFromBinary(bytes: Array[Byte]): (Int, String) = {
-    val in = new ByteArrayInputStream(bytes)
+    val in = new UnsynchronizedByteArrayInputStream(bytes)
     val serializerId = readInt(in)
 
     if ((serializerId & 0xEDAC) == 0xEDAC) // Java Serialization magic value
@@ -163,7 +164,7 @@ class SnapshotSerializer(val system: ExtendedActorSystem) 
extends BaseSerializer
   }
 
   private def snapshotFromBinary(bytes: Array[Byte]): AnyRef = {
-    val headerLength = readInt(new ByteArrayInputStream(bytes))
+    val headerLength = readInt(new UnsynchronizedByteArrayInputStream(bytes))
     val headerBytes = bytes.slice(4, headerLength + 4)
     val snapshotBytes = bytes.drop(headerLength + 4)
 
diff --git 
a/serialization-jackson/src/main/scala/org/apache/pekko/serialization/jackson/JacksonSerializer.scala
 
b/serialization-jackson/src/main/scala/org/apache/pekko/serialization/jackson/JacksonSerializer.scala
index 2cdb7717fb..689f5f89b9 100644
--- 
a/serialization-jackson/src/main/scala/org/apache/pekko/serialization/jackson/JacksonSerializer.scala
+++ 
b/serialization-jackson/src/main/scala/org/apache/pekko/serialization/jackson/JacksonSerializer.scala
@@ -13,7 +13,7 @@
 
 package org.apache.pekko.serialization.jackson
 
-import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, 
NotSerializableException }
+import java.io.{ ByteArrayOutputStream, NotSerializableException }
 import java.nio.ByteBuffer
 import java.util.zip.{ GZIPInputStream, GZIPOutputStream }
 
@@ -30,6 +30,7 @@ import org.apache.pekko
 import pekko.actor.ExtendedActorSystem
 import pekko.annotation.InternalApi
 import pekko.event.{ LogMarker, Logging }
+import pekko.io.UnsynchronizedByteArrayInputStream
 import pekko.serialization.{ BaseSerializer, SerializationExtension, 
SerializerWithStringManifest }
 import pekko.util.Helpers.toRootLowerCase
 import pekko.util.OptionVal
@@ -533,7 +534,7 @@ import pekko.util.OptionVal
 
   def decompress(bytes: Array[Byte]): Array[Byte] = {
     if (isGZipped(bytes)) {
-      val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
+      val in = new GZIPInputStream(new 
UnsynchronizedByteArrayInputStream(bytes))
       val out = new ByteArrayOutputStream()
       val buffer = new Array[Byte](BufferSize)
 
diff --git 
a/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/CoderSpec.scala
 
b/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/CoderSpec.scala
index 476550fa4a..bed6fef7d9 100644
--- 
a/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/CoderSpec.scala
+++ 
b/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/CoderSpec.scala
@@ -13,7 +13,7 @@
 
 package org.apache.pekko.stream.io.compression
 
-import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, InputStream, 
OutputStream }
+import java.io.{ ByteArrayOutputStream, InputStream, OutputStream }
 import java.util.concurrent.ThreadLocalRandom
 import java.util.zip.DataFormatException
 
@@ -186,7 +186,7 @@ abstract class CoderSpec(codecName: String) extends 
AnyWordSpec with CodecSpecSu
 
   def streamDecode(bytes: ByteString): ByteString = {
     val output = new ByteArrayOutputStream()
-    val input = newDecodedInputStream(new ByteArrayInputStream(bytes.toArray))
+    val input = newDecodedInputStream(bytes.asInputStream)
 
     val buffer = new Array[Byte](500)
     @tailrec def copy(from: InputStream, to: OutputStream): Unit = {
diff --git 
a/testkit/src/main/scala/org/apache/pekko/testkit/TestJavaSerializer.scala 
b/testkit/src/main/scala/org/apache/pekko/testkit/TestJavaSerializer.scala
index 4dac31c04b..4978b21738 100644
--- a/testkit/src/main/scala/org/apache/pekko/testkit/TestJavaSerializer.scala
+++ b/testkit/src/main/scala/org/apache/pekko/testkit/TestJavaSerializer.scala
@@ -13,10 +13,11 @@
 
 package org.apache.pekko.testkit
 
-import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, 
ObjectOutputStream }
+import java.io.{ ByteArrayOutputStream, ObjectOutputStream }
 
 import org.apache.pekko
 import pekko.actor.ExtendedActorSystem
+import pekko.io.UnsynchronizedByteArrayInputStream
 import pekko.serialization.{ BaseSerializer, JavaSerializer }
 import pekko.util.ClassLoaderObjectInputStream
 
@@ -43,9 +44,9 @@ class TestJavaSerializer(val system: ExtendedActorSystem) 
extends BaseSerializer
   }
 
   def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
-    val in = new 
ClassLoaderObjectInputStream(system.dynamicAccess.classLoader, new 
ByteArrayInputStream(bytes))
-    val obj = JavaSerializer.currentSystem.withValue(system) { in.readObject }
-    in.close()
-    obj
+    val in =
+      new ClassLoaderObjectInputStream(system.dynamicAccess.classLoader, new 
UnsynchronizedByteArrayInputStream(bytes))
+    try JavaSerializer.currentSystem.withValue(system) { in.readObject }
+    finally in.close()
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to