This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 30a0046944 use UnsynchronizedByteArrayInputStream (#2300)
30a0046944 is described below
commit 30a00469440b89072c77bdb6b441222e727c8ba8
Author: PJ Fanning <[email protected]>
AuthorDate: Tue Oct 7 16:36:15 2025 +0100
use UnsynchronizedByteArrayInputStream (#2300)
* use UnsynchronizedByteArrayInputStream
* javafmt
* add test
* Update UnsynchronizedByteArrayInputStreamSpec.scala
* Update UnsynchronizedByteArrayInputStreamSpec.scala
* Update UnsynchronizedByteArrayInputStream.java
---
LICENSE | 6 +-
.../UnsynchronizedByteArrayInputStreamSpec.scala | 53 ++++++
.../pekko/util/ByteStringInputStreamSpec.scala | 4 +-
.../org/apache/pekko/util/ByteStringSpec.scala | 9 +-
.../io/UnsynchronizedByteArrayInputStream.java | 180 +++++++++++++++++++++
.../apache/pekko/serialization/Serializer.scala | 10 +-
.../scala/org/apache/pekko/util/ByteString.scala | 8 +-
.../util/ByteString_asInputStream_Benchmark.scala | 7 +-
.../metrics/protobuf/MessageSerializer.scala | 8 +-
.../ClusterShardingMessageSerializer.scala | 5 +-
.../DistributedPubSubMessageSerializer.scala | 5 +-
.../protobuf/ClusterMessageSerializer.scala | 5 +-
.../ddata/protobuf/SerializationSupport.scala | 4 +-
legal/pekko-actor-jar-license.txt | 7 +
.../serialization/SnapshotSerializer.scala | 5 +-
.../serialization/jackson/JacksonSerializer.scala | 5 +-
.../pekko/stream/io/compression/CoderSpec.scala | 4 +-
.../apache/pekko/testkit/TestJavaSerializer.scala | 11 +-
18 files changed, 294 insertions(+), 42 deletions(-)
diff --git a/LICENSE b/LICENSE
index 8622a26ba3..f4b62dab0e 100644
--- a/LICENSE
+++ b/LICENSE
@@ -306,8 +306,10 @@ Copyright 2015 Ben Manes. All Rights Reserved.
---------------
-pekko-actor contains code in `org.apache.pekko.io.ByteBufferCleaner` which was
based on code
-from Apache commons-io which was developed under the Apache 2.0 license.
+pekko-actor contains code from Apache commons-io which was developed under the
+Apache 2.0 license.
+- actor/src/main/java/org/apache/pekko/io/ByteBufferCleaner.java
+-
actor/src/main/java/org/apache/pekko/io/UnsynchronizedByteArrayInputStream.java
---------------
diff --git
a/actor-tests/src/test/scala/org/apache/pekko/io/UnsynchronizedByteArrayInputStreamSpec.scala
b/actor-tests/src/test/scala/org/apache/pekko/io/UnsynchronizedByteArrayInputStreamSpec.scala
new file mode 100644
index 0000000000..0fdff98c5a
--- /dev/null
+++
b/actor-tests/src/test/scala/org/apache/pekko/io/UnsynchronizedByteArrayInputStreamSpec.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.util
+
+import java.nio.charset.StandardCharsets
+
+import org.apache.pekko
+import pekko.io.UnsynchronizedByteArrayInputStream
+
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+
+class UnsynchronizedByteArrayInputStreamSpec extends AnyWordSpec with Matchers
{
+ "UnsynchronizedByteArrayInputStream" must {
+ "support mark and reset" in {
+ val stream = new
UnsynchronizedByteArrayInputStream("abc".getBytes(StandardCharsets.UTF_8))
+ stream.markSupported() should ===(true)
+ stream.read() should ===('a')
+ stream.mark(1) // the parameter value (a readAheadLimit) is ignored as
it is in ByteArrayInputStream too
+ stream.read() should ===('b')
+ stream.reset()
+ stream.read() should ===('b')
+ stream.close()
+ }
+ "support skip" in {
+ val stream = new
UnsynchronizedByteArrayInputStream("abc".getBytes(StandardCharsets.UTF_8))
+ stream.skip(1) should ===(1)
+ stream.read() should ===('b')
+ stream.close()
+ }
+ "support skip with large value" in {
+ val stream = new
UnsynchronizedByteArrayInputStream("abc".getBytes(StandardCharsets.UTF_8))
+ stream.skip(50) should ===(3) // only 3 bytes to skip
+ stream.read() should ===(-1)
+ stream.close()
+ }
+ }
+}
diff --git
a/actor-tests/src/test/scala/org/apache/pekko/util/ByteStringInputStreamSpec.scala
b/actor-tests/src/test/scala/org/apache/pekko/util/ByteStringInputStreamSpec.scala
index b49f4de017..3bab079071 100644
---
a/actor-tests/src/test/scala/org/apache/pekko/util/ByteStringInputStreamSpec.scala
+++
b/actor-tests/src/test/scala/org/apache/pekko/util/ByteStringInputStreamSpec.scala
@@ -30,7 +30,7 @@ class ByteStringInputStreamSpec extends AnyWordSpec with
Matchers {
"ByteString1" must {
"support asInputStream" in {
ByteString1.empty.asInputStream.read() shouldEqual -1
- ByteString1.empty.asInputStream.read(Array.empty) shouldEqual -1
+ ByteString1.empty.asInputStream.read(Array(1)) shouldEqual -1
toUtf8String(ByteString1.empty.asInputStream) shouldEqual ""
toUtf8String(ByteString1.fromString("abc").asInputStream) shouldEqual
"abc"
}
@@ -47,7 +47,7 @@ class ByteStringInputStreamSpec extends AnyWordSpec with
Matchers {
"support asInputStream" in {
val empty = ByteStrings(ByteString1.fromString(""),
ByteString1.fromString(""))
empty.asInputStream.read() shouldEqual -1
- empty.asInputStream.read(Array.empty) shouldEqual -1
+ empty.asInputStream.read(Array(1)) shouldEqual -1
toUtf8String(empty.asInputStream) shouldEqual ""
val abc = ByteStrings(ByteString1.fromString("a"),
ByteString1.fromString("bc"))
toUtf8String(abc.asInputStream) shouldEqual "abc"
diff --git
a/actor-tests/src/test/scala/org/apache/pekko/util/ByteStringSpec.scala
b/actor-tests/src/test/scala/org/apache/pekko/util/ByteStringSpec.scala
index 0aa077ffa1..54044cf98d 100644
--- a/actor-tests/src/test/scala/org/apache/pekko/util/ByteStringSpec.scala
+++ b/actor-tests/src/test/scala/org/apache/pekko/util/ByteStringSpec.scala
@@ -13,7 +13,7 @@
package org.apache.pekko.util
-import java.io.{ ByteArrayInputStream, ByteArrayOutputStream,
ObjectInputStream, ObjectOutputStream }
+import java.io.{ ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream }
import java.lang.Double.doubleToRawLongBits
import java.lang.Float.floatToRawIntBits
import java.nio.{ ByteBuffer, ByteOrder }
@@ -29,6 +29,7 @@ import org.scalacheck.Arbitrary.arbitrary
import org.scalatestplus.scalacheck.Checkers
import org.apache.pekko
+import pekko.io.UnsynchronizedByteArrayInputStream
import pekko.util.ByteString.{ ByteString1, ByteString1C, ByteStrings }
import org.scalatest.matchers.should.Matchers
@@ -96,9 +97,9 @@ class ByteStringSpec extends AnyWordSpec with Matchers with
Checkers {
}
def deserialize(bytes: Array[Byte]): AnyRef = {
- val is = new ObjectInputStream(new ByteArrayInputStream(bytes))
-
- is.readObject
+ val is = new ObjectInputStream(new
UnsynchronizedByteArrayInputStream(bytes))
+ try is.readObject
+ finally is.close()
}
def testSer(obj: AnyRef) = {
diff --git
a/actor/src/main/java/org/apache/pekko/io/UnsynchronizedByteArrayInputStream.java
b/actor/src/main/java/org/apache/pekko/io/UnsynchronizedByteArrayInputStream.java
new file mode 100644
index 0000000000..3ac6b0cc58
--- /dev/null
+++
b/actor/src/main/java/org/apache/pekko/io/UnsynchronizedByteArrayInputStream.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.io;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.Objects;
+import org.apache.pekko.annotation.InternalApi;
+
+/**
+ * Internal API: An unsynchronized byte array input stream. This class does
not copy the provided
+ * byte array, and it is not thread safe.
+ *
+ * @see ByteArrayInputStream
+ * @since 2.0.0
+ */
+// @NotThreadSafe
+// copied from
+//
https://github.com/apache/commons-io/blob/26e5aa9661a72bfd9697fb384ca72f58e5d672e9/src/main/java/org/apache/commons/io/input/UnsynchronizedByteArrayInputStream.java
+@InternalApi
+public class UnsynchronizedByteArrayInputStream extends InputStream {
+
+ /** The end of stream marker. */
+ private static final int END_OF_STREAM = -1;
+
+ private static int minPosLen(final byte[] data, final int defaultValue) {
+ requireNonNegative(defaultValue, "defaultValue");
+ return Math.min(defaultValue, data.length > 0 ? data.length :
defaultValue);
+ }
+
+ private static int requireNonNegative(final int value, final String name) {
+ if (value < 0) {
+ throw new IllegalArgumentException(name + " cannot be negative");
+ }
+ return value;
+ }
+
+ private static void checkFromIndexSize(final byte[] array, final int off,
final int len) {
+ final int arrayLength = Objects.requireNonNull(array, "byte array").length;
+ if ((off | len | arrayLength) < 0 || arrayLength - len < off) {
+ throw new IndexOutOfBoundsException(
+ String.format("Range [%s, %<s + %s) out of bounds for length %s",
off, len, arrayLength));
+ }
+ }
+
+ /** The underlying data buffer. */
+ private final byte[] data;
+
+ /**
+ * End Of Data.
+ *
+ * <p>Similar to data.length, which is the last readable offset + 1.
+ */
+ private final int eod;
+
+ /** Current offset in the data buffer. */
+ private int offset;
+
+ /** The current mark (if any). */
+ private int markedOffset;
+
+ /**
+ * Constructs a new byte array input stream.
+ *
+ * @param data the buffer
+ */
+ public UnsynchronizedByteArrayInputStream(final byte[] data) {
+ this.data = Objects.requireNonNull(data, "data");
+ this.offset = 0;
+ this.markedOffset = 0;
+ this.eod = data.length;
+ }
+
+ /**
+ * Constructs a new byte array input stream.
+ *
+ * @param data the buffer
+ * @param offset the offset into the buffer
+ * @param length the length of the buffer
+ * @throws IllegalArgumentException if the offset or length is less than zero
+ */
+ public UnsynchronizedByteArrayInputStream(final byte[] data, final int
offset, final int length) {
+ requireNonNegative(offset, "offset");
+ requireNonNegative(length, "length");
+ this.data = Objects.requireNonNull(data, "data");
+ this.eod = Math.min(minPosLen(data, offset) + length, data.length);
+ this.offset = minPosLen(data, offset);
+ this.markedOffset = minPosLen(data, offset);
+ }
+
+ @Override
+ public int available() {
+ return offset < eod ? eod - offset : 0;
+ }
+
+ @SuppressWarnings("sync-override")
+ @Override
+ public void mark(final int readLimit) {
+ this.markedOffset = this.offset;
+ }
+
+ @Override
+ public boolean markSupported() {
+ return true;
+ }
+
+ @Override
+ public int read() {
+ return offset < eod ? data[offset++] & 0xff : END_OF_STREAM;
+ }
+
+ @Override
+ public int read(final byte[] dest) {
+ Objects.requireNonNull(dest, "dest");
+ return readLocal(dest, 0, dest.length);
+ }
+
+ @Override
+ public int read(final byte[] dest, final int off, final int len) {
+ checkFromIndexSize(dest, off, len);
+ return readLocal(dest, off, len);
+ }
+
+ private final int readLocal(final byte[] dest, final int off, final int len)
{
+ if (len == 0) {
+ return 0;
+ }
+
+ if (offset >= eod) {
+ return END_OF_STREAM;
+ }
+
+ int actualLen = eod - offset;
+ if (len < actualLen) {
+ actualLen = len;
+ }
+ if (actualLen <= 0) {
+ return 0;
+ }
+ System.arraycopy(data, offset, dest, off, actualLen);
+ offset += actualLen;
+ return actualLen;
+ }
+
+ @SuppressWarnings("sync-override")
+ @Override
+ public void reset() {
+ this.offset = this.markedOffset;
+ }
+
+ @Override
+ public long skip(final long n) {
+ if (n < 0) {
+ throw new IllegalArgumentException("Skipping backward is not supported");
+ }
+
+ long actualSkip = eod - offset;
+ if (n < actualSkip) {
+ actualSkip = n;
+ }
+
+ offset = Math.addExact(offset, Math.toIntExact(n));
+ return actualSkip;
+ }
+}
diff --git
a/actor/src/main/scala/org/apache/pekko/serialization/Serializer.scala
b/actor/src/main/scala/org/apache/pekko/serialization/Serializer.scala
index 95627fb831..1e3a06f5f6 100644
--- a/actor/src/main/scala/org/apache/pekko/serialization/Serializer.scala
+++ b/actor/src/main/scala/org/apache/pekko/serialization/Serializer.scala
@@ -13,7 +13,6 @@
package org.apache.pekko.serialization
-import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.NotSerializableException
import java.io.ObjectOutputStream
@@ -28,6 +27,7 @@ import pekko.actor.ExtendedActorSystem
import pekko.annotation.InternalApi
import pekko.event.LogMarker
import pekko.event.Logging
+import pekko.io.UnsynchronizedByteArrayInputStream
import pekko.util.ClassLoaderObjectInputStream
/**
@@ -354,10 +354,10 @@ class JavaSerializer(val system: ExtendedActorSystem)
extends BaseSerializer {
@throws(classOf[NotSerializableException])
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
- val in = new
ClassLoaderObjectInputStream(system.dynamicAccess.classLoader, new
ByteArrayInputStream(bytes))
- val obj = JavaSerializer.currentSystem.withValue(system) { in.readObject }
- in.close()
- obj
+ val in =
+ new ClassLoaderObjectInputStream(system.dynamicAccess.classLoader, new
UnsynchronizedByteArrayInputStream(bytes))
+ try JavaSerializer.currentSystem.withValue(system) { in.readObject }
+ finally in.close()
}
}
diff --git a/actor/src/main/scala/org/apache/pekko/util/ByteString.scala
b/actor/src/main/scala/org/apache/pekko/util/ByteString.scala
index 83bd21418f..dfcce42376 100644
--- a/actor/src/main/scala/org/apache/pekko/util/ByteString.scala
+++ b/actor/src/main/scala/org/apache/pekko/util/ByteString.scala
@@ -13,12 +13,14 @@
package org.apache.pekko.util
-import java.io.{ ByteArrayInputStream, InputStream, ObjectInputStream,
ObjectOutputStream, SequenceInputStream }
+import java.io.{ InputStream, ObjectInputStream, ObjectOutputStream,
SequenceInputStream }
import java.lang.{ Iterable => JIterable }
import java.nio.{ ByteBuffer, ByteOrder }
import java.nio.charset.{ Charset, StandardCharsets }
import java.util.Base64
+import org.apache.pekko.io.UnsynchronizedByteArrayInputStream
+
import scala.annotation.{ nowarn, tailrec, varargs }
import scala.collection.{ immutable, mutable }
import scala.collection.immutable.{ IndexedSeq, IndexedSeqOps,
StrictOptimizedSeqOps, VectorBuilder }
@@ -347,7 +349,7 @@ object ByteString {
override def toArrayUnsafe(): Array[Byte] = bytes
- override def asInputStream: InputStream = new ByteArrayInputStream(bytes)
+ override def asInputStream: InputStream = new
UnsynchronizedByteArrayInputStream(bytes)
}
/** INTERNAL API: ByteString backed by exactly one array, with start / end
markers */
@@ -590,7 +592,7 @@ object ByteString {
}
override def asInputStream: InputStream =
- new ByteArrayInputStream(bytes, startIndex, length)
+ new UnsynchronizedByteArrayInputStream(bytes, startIndex, length)
}
private[pekko] object ByteStrings extends Companion {
diff --git
a/bench-jmh/src/main/scala/org/apache/pekko/util/ByteString_asInputStream_Benchmark.scala
b/bench-jmh/src/main/scala/org/apache/pekko/util/ByteString_asInputStream_Benchmark.scala
index 36fb2b9e2d..0cc29106bf 100644
---
a/bench-jmh/src/main/scala/org/apache/pekko/util/ByteString_asInputStream_Benchmark.scala
+++
b/bench-jmh/src/main/scala/org/apache/pekko/util/ByteString_asInputStream_Benchmark.scala
@@ -17,9 +17,10 @@
package org.apache.pekko.util
-import java.io.{ ByteArrayInputStream, InputStream }
+import java.io.InputStream
import java.util.concurrent.TimeUnit
+import org.apache.pekko.io.UnsynchronizedByteArrayInputStream
import org.openjdk.jmh.annotations._
import org.openjdk.jmh.infra.Blackhole
@@ -67,12 +68,12 @@ class ByteString_asInputStream_Benchmark {
@Benchmark
def single_bs_bytes_to_input_stream(blackhole: Blackhole): Unit = {
- blackhole.consume(countBytes(new ByteArrayInputStream(bs.toArray)))
+ blackhole.consume(countBytes(new
UnsynchronizedByteArrayInputStream(bs.toArray)))
}
@Benchmark
def composed_bs_bytes_to_input_stream(blackhole: Blackhole): Unit = {
- blackhole.consume(countBytes(new ByteArrayInputStream(composed.toArray)))
+ blackhole.consume(countBytes(new
UnsynchronizedByteArrayInputStream(composed.toArray)))
}
@Benchmark
diff --git
a/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/protobuf/MessageSerializer.scala
b/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/protobuf/MessageSerializer.scala
index b658576754..434f61aa7a 100644
---
a/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/protobuf/MessageSerializer.scala
+++
b/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/protobuf/MessageSerializer.scala
@@ -14,8 +14,7 @@
package org.apache.pekko.cluster.metrics.protobuf
import java.{ lang => jl }
-import java.io.{ ByteArrayInputStream, ByteArrayOutputStream,
ObjectOutputStream }
-import java.io.NotSerializableException
+import java.io.{ ByteArrayOutputStream, NotSerializableException,
ObjectOutputStream }
import java.util.zip.{ GZIPInputStream, GZIPOutputStream }
import scala.annotation.tailrec
@@ -27,6 +26,7 @@ import pekko.actor.{ Address, ExtendedActorSystem }
import pekko.cluster.metrics._
import pekko.cluster.metrics.protobuf.msg.{ ClusterMetricsMessages => cm }
import pekko.dispatch.Dispatchers
+import pekko.io.UnsynchronizedByteArrayInputStream
import pekko.protobufv3.internal.MessageLite
import pekko.remote.ByteStringUtils
import pekko.serialization.{ BaseSerializer, SerializationExtension,
SerializerWithStringManifest, Serializers }
@@ -78,7 +78,7 @@ class MessageSerializer(val system: ExtendedActorSystem)
extends SerializerWithS
}
def decompress(bytes: Array[Byte]): Array[Byte] = {
- val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
+ val in = new GZIPInputStream(new UnsynchronizedByteArrayInputStream(bytes))
val out = new ByteArrayOutputStream()
val buffer = new Array[Byte](BufferSize)
@@ -264,7 +264,7 @@ class MessageSerializer(val system: ExtendedActorSystem)
extends SerializerWithS
case NumberType.Serialized_VALUE =>
val in = new NumberInputStream(
system.dynamicAccess.classLoader,
- new ByteArrayInputStream(number.getSerialized.toByteArray))
+ new
UnsynchronizedByteArrayInputStream(number.getSerialized.toByteArray))
val obj = in.readObject
in.close()
obj.asInstanceOf[jl.Number]
diff --git
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala
index cec6c3a9d1..6951a109a8 100644
---
a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala
+++
b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala
@@ -13,7 +13,7 @@
package org.apache.pekko.cluster.sharding.protobuf
-import java.io.{ ByteArrayInputStream, ByteArrayOutputStream }
+import java.io.ByteArrayOutputStream
import java.io.NotSerializableException
import java.util.zip.GZIPInputStream
import java.util.zip.GZIPOutputStream
@@ -38,6 +38,7 @@ import
pekko.cluster.sharding.internal.EventSourcedRememberEntitiesShardStore.{
import
pekko.cluster.sharding.internal.EventSourcedRememberEntitiesShardStore.{ State
=> EntityState }
import pekko.cluster.sharding.protobuf.msg.{ ClusterShardingMessages => sm }
import pekko.cluster.sharding.protobuf.msg.ClusterShardingMessages
+import pekko.io.UnsynchronizedByteArrayInputStream
import pekko.protobufv3.internal.MessageLite
import pekko.serialization.BaseSerializer
import pekko.serialization.Serialization
@@ -623,7 +624,7 @@ private[pekko] class ClusterShardingMessageSerializer(val
system: ExtendedActorS
}
private def decompress(bytes: Array[Byte]): Array[Byte] = {
- val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
+ val in = new GZIPInputStream(new UnsynchronizedByteArrayInputStream(bytes))
val out = new ByteArrayOutputStream()
val buffer = new Array[Byte](BufferSize)
diff --git
a/cluster-tools/src/main/scala/org/apache/pekko/cluster/pubsub/protobuf/DistributedPubSubMessageSerializer.scala
b/cluster-tools/src/main/scala/org/apache/pekko/cluster/pubsub/protobuf/DistributedPubSubMessageSerializer.scala
index a17b3a7f37..b4a3a1a14e 100644
---
a/cluster-tools/src/main/scala/org/apache/pekko/cluster/pubsub/protobuf/DistributedPubSubMessageSerializer.scala
+++
b/cluster-tools/src/main/scala/org/apache/pekko/cluster/pubsub/protobuf/DistributedPubSubMessageSerializer.scala
@@ -13,7 +13,7 @@
package org.apache.pekko.cluster.pubsub.protobuf
-import java.io.{ ByteArrayInputStream, ByteArrayOutputStream }
+import java.io.ByteArrayOutputStream
import java.io.NotSerializableException
import java.util.zip.GZIPInputStream
import java.util.zip.GZIPOutputStream
@@ -28,6 +28,7 @@ import pekko.actor.ActorRef
import pekko.cluster.pubsub.DistributedPubSubMediator._
import pekko.cluster.pubsub.DistributedPubSubMediator.Internal._
import pekko.cluster.pubsub.protobuf.msg.{ DistributedPubSubMessages => dm }
+import pekko.io.UnsynchronizedByteArrayInputStream
import pekko.protobufv3.internal.{ ByteString, MessageLite }
import pekko.remote.ByteStringUtils
import pekko.serialization._
@@ -97,7 +98,7 @@ private[pekko] class DistributedPubSubMessageSerializer(val
system: ExtendedActo
}
private def decompress(bytes: Array[Byte]): Array[Byte] = {
- val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
+ val in = new GZIPInputStream(new UnsynchronizedByteArrayInputStream(bytes))
val out = new ByteArrayOutputStream()
val buffer = new Array[Byte](BufferSize)
diff --git
a/cluster/src/main/scala/org/apache/pekko/cluster/protobuf/ClusterMessageSerializer.scala
b/cluster/src/main/scala/org/apache/pekko/cluster/protobuf/ClusterMessageSerializer.scala
index 299211886a..13cf1a09e0 100644
---
a/cluster/src/main/scala/org/apache/pekko/cluster/protobuf/ClusterMessageSerializer.scala
+++
b/cluster/src/main/scala/org/apache/pekko/cluster/protobuf/ClusterMessageSerializer.scala
@@ -13,7 +13,7 @@
package org.apache.pekko.cluster.protobuf
-import java.io.{ ByteArrayInputStream, ByteArrayOutputStream }
+import java.io.ByteArrayOutputStream
import java.util.zip.{ GZIPInputStream, GZIPOutputStream }
import scala.annotation.tailrec
@@ -28,6 +28,7 @@ import pekko.cluster._
import pekko.cluster.InternalClusterAction._
import pekko.cluster.protobuf.msg.{ ClusterMessages => cm }
import pekko.cluster.routing.{ ClusterRouterPool, ClusterRouterPoolSettings }
+import pekko.io.UnsynchronizedByteArrayInputStream
import pekko.protobufv3.internal.MessageLite
import pekko.remote.ByteStringUtils
import pekko.routing.Pool
@@ -165,7 +166,7 @@ final class ClusterMessageSerializer(val system:
ExtendedActorSystem)
}
def decompress(bytes: Array[Byte]): Array[Byte] = {
- val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
+ val in = new GZIPInputStream(new UnsynchronizedByteArrayInputStream(bytes))
val out = new ByteArrayOutputStream()
val buffer = new Array[Byte](BufferSize)
diff --git
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/protobuf/SerializationSupport.scala
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/protobuf/SerializationSupport.scala
index f6861cbaf9..c526e0af0a 100644
---
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/protobuf/SerializationSupport.scala
+++
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/protobuf/SerializationSupport.scala
@@ -13,7 +13,6 @@
package org.apache.pekko.cluster.ddata.protobuf
-import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.util.zip.GZIPInputStream
import java.util.zip.GZIPOutputStream
@@ -29,6 +28,7 @@ import pekko.actor.ExtendedActorSystem
import pekko.cluster.UniqueAddress
import pekko.cluster.ddata.VersionVector
import pekko.cluster.ddata.protobuf.msg.{ ReplicatorMessages => dm }
+import pekko.io.UnsynchronizedByteArrayInputStream
import pekko.protobufv3.internal.ByteString
import pekko.protobufv3.internal.MessageLite
import pekko.remote.ByteStringUtils
@@ -76,7 +76,7 @@ trait SerializationSupport {
}
def decompress(bytes: Array[Byte]): Array[Byte] = {
- val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
+ val in = new GZIPInputStream(new UnsynchronizedByteArrayInputStream(bytes))
val out = new ByteArrayOutputStream()
val buffer = new Array[Byte](BufferSize)
diff --git a/legal/pekko-actor-jar-license.txt
b/legal/pekko-actor-jar-license.txt
index deca4cefa1..fe77d08cc3 100644
--- a/legal/pekko-actor-jar-license.txt
+++ b/legal/pekko-actor-jar-license.txt
@@ -303,3 +303,10 @@ For more information, please refer to
<http://unlicense.org/>
pekko-actor contains code in `org.apache.pekko.util.FrequencySketch.scala`
which was based on code from
Caffeine <https://github.com/ben-manes/caffeine> which was developed under the
Apache 2.0 license.
Copyright 2015 Ben Manes. All Rights Reserved.
+
+---------------
+
+pekko-actor contains code from Apache commons-io which was developed under the
+Apache 2.0 license.
+- actor/src/main/java/org/apache/pekko/io/ByteBufferCleaner.java
+-
actor/src/main/java/org/apache/pekko/io/UnsynchronizedByteArrayInputStream.java
diff --git
a/persistence/src/main/scala/org/apache/pekko/persistence/serialization/SnapshotSerializer.scala
b/persistence/src/main/scala/org/apache/pekko/persistence/serialization/SnapshotSerializer.scala
index 531fb07e63..5270369cf2 100644
---
a/persistence/src/main/scala/org/apache/pekko/persistence/serialization/SnapshotSerializer.scala
+++
b/persistence/src/main/scala/org/apache/pekko/persistence/serialization/SnapshotSerializer.scala
@@ -17,6 +17,7 @@ import java.io._
import org.apache.pekko
import pekko.actor._
+import pekko.io.UnsynchronizedByteArrayInputStream
import pekko.serialization._
import pekko.util.ByteString.UTF_8
@@ -89,7 +90,7 @@ class SnapshotSerializer(val system: ExtendedActorSystem)
extends BaseSerializer
}
private def headerFromBinary(bytes: Array[Byte]): (Int, String) = {
- val in = new ByteArrayInputStream(bytes)
+ val in = new UnsynchronizedByteArrayInputStream(bytes)
val serializerId = readInt(in)
if ((serializerId & 0xEDAC) == 0xEDAC) // Java Serialization magic value
@@ -163,7 +164,7 @@ class SnapshotSerializer(val system: ExtendedActorSystem)
extends BaseSerializer
}
private def snapshotFromBinary(bytes: Array[Byte]): AnyRef = {
- val headerLength = readInt(new ByteArrayInputStream(bytes))
+ val headerLength = readInt(new UnsynchronizedByteArrayInputStream(bytes))
val headerBytes = bytes.slice(4, headerLength + 4)
val snapshotBytes = bytes.drop(headerLength + 4)
diff --git
a/serialization-jackson/src/main/scala/org/apache/pekko/serialization/jackson/JacksonSerializer.scala
b/serialization-jackson/src/main/scala/org/apache/pekko/serialization/jackson/JacksonSerializer.scala
index 2cdb7717fb..689f5f89b9 100644
---
a/serialization-jackson/src/main/scala/org/apache/pekko/serialization/jackson/JacksonSerializer.scala
+++
b/serialization-jackson/src/main/scala/org/apache/pekko/serialization/jackson/JacksonSerializer.scala
@@ -13,7 +13,7 @@
package org.apache.pekko.serialization.jackson
-import java.io.{ ByteArrayInputStream, ByteArrayOutputStream,
NotSerializableException }
+import java.io.{ ByteArrayOutputStream, NotSerializableException }
import java.nio.ByteBuffer
import java.util.zip.{ GZIPInputStream, GZIPOutputStream }
@@ -30,6 +30,7 @@ import org.apache.pekko
import pekko.actor.ExtendedActorSystem
import pekko.annotation.InternalApi
import pekko.event.{ LogMarker, Logging }
+import pekko.io.UnsynchronizedByteArrayInputStream
import pekko.serialization.{ BaseSerializer, SerializationExtension,
SerializerWithStringManifest }
import pekko.util.Helpers.toRootLowerCase
import pekko.util.OptionVal
@@ -533,7 +534,7 @@ import pekko.util.OptionVal
def decompress(bytes: Array[Byte]): Array[Byte] = {
if (isGZipped(bytes)) {
- val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
+ val in = new GZIPInputStream(new
UnsynchronizedByteArrayInputStream(bytes))
val out = new ByteArrayOutputStream()
val buffer = new Array[Byte](BufferSize)
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/CoderSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/CoderSpec.scala
index 476550fa4a..bed6fef7d9 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/CoderSpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/CoderSpec.scala
@@ -13,7 +13,7 @@
package org.apache.pekko.stream.io.compression
-import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, InputStream,
OutputStream }
+import java.io.{ ByteArrayOutputStream, InputStream, OutputStream }
import java.util.concurrent.ThreadLocalRandom
import java.util.zip.DataFormatException
@@ -186,7 +186,7 @@ abstract class CoderSpec(codecName: String) extends
AnyWordSpec with CodecSpecSu
def streamDecode(bytes: ByteString): ByteString = {
val output = new ByteArrayOutputStream()
- val input = newDecodedInputStream(new ByteArrayInputStream(bytes.toArray))
+ val input = newDecodedInputStream(bytes.asInputStream)
val buffer = new Array[Byte](500)
@tailrec def copy(from: InputStream, to: OutputStream): Unit = {
diff --git
a/testkit/src/main/scala/org/apache/pekko/testkit/TestJavaSerializer.scala
b/testkit/src/main/scala/org/apache/pekko/testkit/TestJavaSerializer.scala
index 4dac31c04b..4978b21738 100644
--- a/testkit/src/main/scala/org/apache/pekko/testkit/TestJavaSerializer.scala
+++ b/testkit/src/main/scala/org/apache/pekko/testkit/TestJavaSerializer.scala
@@ -13,10 +13,11 @@
package org.apache.pekko.testkit
-import java.io.{ ByteArrayInputStream, ByteArrayOutputStream,
ObjectOutputStream }
+import java.io.{ ByteArrayOutputStream, ObjectOutputStream }
import org.apache.pekko
import pekko.actor.ExtendedActorSystem
+import pekko.io.UnsynchronizedByteArrayInputStream
import pekko.serialization.{ BaseSerializer, JavaSerializer }
import pekko.util.ClassLoaderObjectInputStream
@@ -43,9 +44,9 @@ class TestJavaSerializer(val system: ExtendedActorSystem)
extends BaseSerializer
}
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
- val in = new
ClassLoaderObjectInputStream(system.dynamicAccess.classLoader, new
ByteArrayInputStream(bytes))
- val obj = JavaSerializer.currentSystem.withValue(system) { in.readObject }
- in.close()
- obj
+ val in =
+ new ClassLoaderObjectInputStream(system.dynamicAccess.classLoader, new
UnsynchronizedByteArrayInputStream(bytes))
+ try JavaSerializer.currentSystem.withValue(system) { in.readObject }
+ finally in.close()
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]