This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 1e2a6904d0 JAMES-3763 Generics for BlobIdProvider (#2540)
1e2a6904d0 is described below
commit 1e2a6904d03bb2299d2769960937355ccb769059
Author: Benoit TELLIER <[email protected]>
AuthorDate: Tue Dec 3 14:37:45 2024 +0100
JAMES-3763 Generics for BlobIdProvider (#2540)
It avoids buffering byte arrays onto temporary files.
Triggering IOs is a MAJOR performance hit in itself, but pushing this work onto a
dedicated large pool for IOs when we are compute bound also skyrockets the
thread count. Not to mention that my experience with resource management
in Reactor leads me to think that edge cases like cancellation could trigger a
resource leak (been there).
We can beautifully avoid these inefficiencies by making BlobIdProvider
generic.
A disaster waiting to happen...
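To illustrate the shape of the fix: BlobIdProvider is now parameterized by the
payload type, so a byte[] payload can be hashed and saved entirely in memory.
A minimal sketch, assuming a BlobId.Factory named blobIdFactory is in scope
(hex encoding here is illustrative; the real code uses a configurable base
encoding):

    import com.google.common.hash.Hashing;
    import reactor.core.publisher.Mono;
    import reactor.util.function.Tuples;

    // Sketch only: derive the blob id from a SHA-256 hash of the bytes,
    // without wrapping them in an InputStream or spilling to a temp file.
    BlobStore.BlobIdProvider<byte[]> sha256Provider = bytes ->
        Mono.just(Tuples.of(
            blobIdFactory.of(Hashing.sha256().hashBytes(bytes).toString()),
            bytes));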
---
.../java/org/apache/james/blob/api/BlobStore.java | 10 ++--
.../apache/james/blob/api/MetricableBlobStore.java | 6 +--
.../blob/cassandra/cache/CachedBlobStore.java | 6 +--
.../deduplication/DeDuplicationBlobStore.scala | 55 +++++++++++++---------
.../blob/deduplication/PassThroughBlobStore.scala | 37 ++++++++-------
5 files changed, 63 insertions(+), 51 deletions(-)
diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
index 22f08167e0..8d5101e0db 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
@@ -37,8 +37,8 @@ public interface BlobStore {
}
@FunctionalInterface
- interface BlobIdProvider {
- Publisher<Tuple2<BlobId, InputStream>> apply(InputStream stream);
+ interface BlobIdProvider<T> {
+ Publisher<Tuple2<BlobId, T>> apply(T stream);
}
Publisher<BlobId> save(BucketName bucketName, byte[] data, StoragePolicy storagePolicy);
@@ -47,11 +47,11 @@ public interface BlobStore {
Publisher<BlobId> save(BucketName bucketName, ByteSource data, StoragePolicy storagePolicy);
- Publisher<BlobId> save(BucketName bucketName, byte[] data, BlobIdProvider blobIdProvider, StoragePolicy storagePolicy);
+ Publisher<BlobId> save(BucketName bucketName, byte[] data, BlobIdProvider<byte[]> blobIdProvider, StoragePolicy storagePolicy);
- Publisher<BlobId> save(BucketName bucketName, InputStream data, BlobIdProvider blobIdProvider, StoragePolicy storagePolicy);
+ Publisher<BlobId> save(BucketName bucketName, InputStream data, BlobIdProvider<InputStream> blobIdProvider, StoragePolicy storagePolicy);
- Publisher<BlobId> save(BucketName bucketName, ByteSource data, BlobIdProvider blobIdProvider, StoragePolicy storagePolicy);
+ Publisher<BlobId> save(BucketName bucketName, ByteSource data, BlobIdProvider<ByteSource> blobIdProvider, StoragePolicy storagePolicy);
default Publisher<BlobId> save(BucketName bucketName, String data, StoragePolicy storagePolicy) {
return save(bucketName, data.getBytes(StandardCharsets.UTF_8), storagePolicy);
diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java
index e5abf56c28..0d60274937 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java
@@ -66,17 +66,17 @@ public class MetricableBlobStore implements BlobStore {
}
@Override
- public Publisher<BlobId> save(BucketName bucketName, byte[] data, BlobIdProvider blobIdProvider, StoragePolicy storagePolicy) {
+ public Publisher<BlobId> save(BucketName bucketName, byte[] data, BlobIdProvider<byte[]> blobIdProvider, StoragePolicy storagePolicy) {
return metricFactory.decoratePublisherWithTimerMetric(SAVE_BYTES_TIMER_NAME,
blobStoreImpl.save(bucketName, data, blobIdProvider, storagePolicy));
}
@Override
- public Publisher<BlobId> save(BucketName bucketName, InputStream data, BlobIdProvider blobIdProvider, StoragePolicy storagePolicy) {
+ public Publisher<BlobId> save(BucketName bucketName, InputStream data, BlobIdProvider<InputStream> blobIdProvider, StoragePolicy storagePolicy) {
return metricFactory.decoratePublisherWithTimerMetric(SAVE_INPUT_STREAM_TIMER_NAME,
blobStoreImpl.save(bucketName, data, blobIdProvider, storagePolicy));
}
@Override
- public Publisher<BlobId> save(BucketName bucketName, ByteSource data, BlobIdProvider blobIdProvider, StoragePolicy storagePolicy) {
+ public Publisher<BlobId> save(BucketName bucketName, ByteSource data, BlobIdProvider<ByteSource> blobIdProvider, StoragePolicy storagePolicy) {
return metricFactory.decoratePublisherWithTimerMetric(SAVE_INPUT_STREAM_TIMER_NAME,
blobStoreImpl.save(bucketName, data, blobIdProvider, storagePolicy));
}
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
index 063c71fad1..e3c95f7dce 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
@@ -253,7 +253,7 @@ public class CachedBlobStore implements BlobStore {
}
@Override
- public Mono<BlobId> save(BucketName bucketName, byte[] bytes, BlobIdProvider blobIdProvider, StoragePolicy storagePolicy) {
+ public Mono<BlobId> save(BucketName bucketName, byte[] bytes, BlobIdProvider<byte[]> blobIdProvider, StoragePolicy storagePolicy) {
return Mono.from(backend.save(bucketName, bytes, blobIdProvider, storagePolicy))
.flatMap(blobId -> {
if (isAbleToCache(bucketName, bytes, storagePolicy)) {
@@ -264,7 +264,7 @@ public class CachedBlobStore implements BlobStore {
}
@Override
- public Publisher<BlobId> save(BucketName bucketName, InputStream inputStream, BlobIdProvider blobIdProvider, StoragePolicy storagePolicy) {
+ public Publisher<BlobId> save(BucketName bucketName, InputStream inputStream, BlobIdProvider<InputStream> blobIdProvider, StoragePolicy storagePolicy) {
Preconditions.checkNotNull(inputStream, "InputStream must not be null");
if (isAbleToCache(bucketName, storagePolicy)) {
@@ -280,7 +280,7 @@ public class CachedBlobStore implements BlobStore {
}
@Override
- public Publisher<BlobId> save(BucketName bucketName, ByteSource byteSource, BlobIdProvider blobIdProvider, StoragePolicy storagePolicy) {
+ public Publisher<BlobId> save(BucketName bucketName, ByteSource byteSource, BlobIdProvider<ByteSource> blobIdProvider, StoragePolicy storagePolicy) {
Preconditions.checkNotNull(byteSource, "ByteSource must not be null");
if (isAbleToCache(bucketName, storagePolicy)) {
diff --git a/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala b/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala
index 3ca823f45b..ace4cd5510 100644
--- a/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala
+++ b/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala
@@ -28,11 +28,11 @@ import org.apache.james.blob.api.BlobStore.BlobIdProvider
import org.apache.james.blob.api.{BlobId, BlobStore, BlobStoreDAO, BucketName}
import org.reactivestreams.Publisher
import reactor.core.publisher.{Flux, Mono}
-import reactor.core.scala.publisher.{SMono, tupleTwo2ScalaTuple2}
+import reactor.core.scala.publisher.SMono
import reactor.core.scheduler.Schedulers
-import reactor.util.function.{Tuple2, Tuples}
+import reactor.util.function.Tuples
-import java.io.{ByteArrayInputStream, InputStream}
+import java.io.InputStream
import java.util.concurrent.Callable
import scala.compat.java8.FunctionConverters._
@@ -67,7 +67,7 @@ class DeDuplicationBlobStore @Inject()(blobStoreDAO: BlobStoreDAO,
private val baseEncoding = Option(System.getProperty(HASH_BLOB_ID_ENCODING_TYPE_PROPERTY)).map(DeDuplicationBlobStore.baseEncodingFrom).getOrElse(HASH_BLOB_ID_ENCODING_DEFAULT)
override def save(bucketName: BucketName, data: Array[Byte], storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
- save(bucketName, data, withBlobId, storagePolicy)
+ save(bucketName, data, withBlobIdFromArray, storagePolicy)
}
override def save(bucketName: BucketName, data: InputStream, storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
@@ -75,32 +75,30 @@ class DeDuplicationBlobStore @Inject()(blobStoreDAO: BlobStoreDAO,
}
override def save(bucketName: BucketName, data: ByteSource, storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
- save(bucketName, data, withBlobId, storagePolicy)
+ save(bucketName, data, withBlobIdFromByteSource, storagePolicy)
}
- override def save(bucketName: BucketName, data: Array[Byte], blobIdProvider: BlobIdProvider, storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
+ override def save(bucketName: BucketName, data: Array[Byte], blobIdProvider: BlobIdProvider[Array[Byte]], storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
Preconditions.checkNotNull(bucketName)
Preconditions.checkNotNull(data)
- save(bucketName, new ByteArrayInputStream(data), blobIdProvider, storagePolicy)
+ SMono(blobIdProvider.apply(data))
+ .map(_.getT1)
+ .flatMap(blobId => SMono(blobStoreDAO.save(bucketName, blobId, data))
+ .`then`(SMono.just(blobId)))
}
- override def save(bucketName: BucketName, data: ByteSource, blobIdProvider: BlobIdProvider, storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
+ override def save(bucketName: BucketName, data: ByteSource, blobIdProvider: BlobIdProvider[ByteSource], storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
Preconditions.checkNotNull(bucketName)
Preconditions.checkNotNull(data)
- SMono.fromCallable(() => data.openStream())
- .using(
- use = stream => SMono(blobIdProvider.apply(stream))
- .subscribeOn(Schedulers.boundedElastic())
- .map(tupleTwo2ScalaTuple2)
- .flatMap { case (blobId, inputStream) =>
- SMono(blobStoreDAO.save(bucketName, blobId, inputStream))
- .`then`(SMono.just(blobId))
- })(
- release = _.close())
+ SMono(blobIdProvider.apply(data))
+ .map(_.getT1)
+ .flatMap(blobId => SMono(blobStoreDAO.save(bucketName, blobId, data))
+ .`then`(SMono.just(blobId)))
+ .subscribeOn(Schedulers.boundedElastic())
}
- private def withBlobId(data: InputStream): Publisher[Tuple2[BlobId, InputStream]] = {
+ private def withBlobId: BlobIdProvider[InputStream] = data => {
val hashingInputStream = new HashingInputStream(Hashing.sha256, data)
val ressourceSupplier: Callable[FileBackedOutputStream] = () => new FileBackedOutputStream(DeDuplicationBlobStore.FILE_THRESHOLD)
val sourceSupplier: FileBackedOutputStream => Mono[(BlobId, InputStream)] =
@@ -114,11 +112,24 @@ class DeDuplicationBlobStore @Inject()(blobStoreDAO: BlobStoreDAO,
ressourceSupplier,
sourceSupplier.asJava,
((fileBackedOutputStream: FileBackedOutputStream) => fileBackedOutputStream.reset()).asJava,
- DeDuplicationBlobStore.LAZY_RESOURCE_CLEANUP
- ) .subscribeOn(Schedulers.boundedElastic())
+ DeDuplicationBlobStore.LAZY_RESOURCE_CLEANUP)
+ .subscribeOn(Schedulers.boundedElastic())
.map{ case (blobId, data) => Tuples.of(blobId, data)}
}
+ private def withBlobIdFromByteSource: BlobIdProvider[ByteSource] =
+ data => Mono.fromCallable(() => data.hash(Hashing.sha256()))
+ .subscribeOn(Schedulers.boundedElastic())
+ .map(base64)
+ .map(blobIdFactory.of)
+ .map(blobId => Tuples.of(blobId, data))
+
+ private def withBlobIdFromArray: BlobIdProvider[Array[Byte]] = data => {
+ val code = Hashing.sha256.hashBytes(data)
+ val blobId = blobIdFactory.of(base64(code))
+ Mono.just(Tuples.of(blobId, data))
+ }
+
private def base64(hashCode: HashCode) = {
val bytes = hashCode.asBytes
baseEncoding.encode(bytes)
@@ -126,7 +137,7 @@ class DeDuplicationBlobStore @Inject()(blobStoreDAO: BlobStoreDAO,
override def save(bucketName: BucketName,
data: InputStream,
- blobIdProvider: BlobIdProvider,
+ blobIdProvider: BlobIdProvider[InputStream],
storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
Preconditions.checkNotNull(bucketName)
Preconditions.checkNotNull(data)
diff --git a/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/PassThroughBlobStore.scala b/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/PassThroughBlobStore.scala
index ec451d75dc..26de0c5bde 100644
--- a/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/PassThroughBlobStore.scala
+++ b/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/PassThroughBlobStore.scala
@@ -38,7 +38,7 @@ class PassThroughBlobStore @Inject()(blobStoreDAO: BlobStoreDAO,
blobIdFactory: BlobId.Factory) extends BlobStore {
override def save(bucketName: BucketName, data: Array[Byte], storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
- save(bucketName, data, withBlobId, storagePolicy)
+ save(bucketName, data, withBlobIdByteArray, storagePolicy)
}
override def save(bucketName: BucketName, data: InputStream, storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
@@ -46,36 +46,33 @@ class PassThroughBlobStore @Inject()(blobStoreDAO: BlobStoreDAO,
}
override def save(bucketName: BucketName, data: ByteSource, storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
- save(bucketName, data, withBlobId, storagePolicy)
+ save(bucketName, data, withBlobIdByteSource, storagePolicy)
}
- override def save(bucketName: BucketName, data: Array[Byte], blobIdProvider: BlobIdProvider, storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
+ override def save(bucketName: BucketName, data: Array[Byte], blobIdProvider: BlobIdProvider[Array[Byte]], storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
Preconditions.checkNotNull(bucketName)
Preconditions.checkNotNull(data)
- save(bucketName, new ByteArrayInputStream(data), blobIdProvider, storagePolicy)
+ SMono(blobIdProvider.apply(data))
+ .map(_.getT1)
+ .flatMap(blobId => SMono(blobStoreDAO.save(bucketName, blobId, data))
+ .`then`(SMono.just(blobId)))
}
- override def save(bucketName: BucketName, data: ByteSource, blobIdProvider: BlobIdProvider, storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
+ override def save(bucketName: BucketName, data: ByteSource, blobIdProvider: BlobIdProvider[ByteSource], storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
Preconditions.checkNotNull(bucketName)
Preconditions.checkNotNull(data)
- SMono.fromCallable(() => data.openStream())
- .using(
- use = stream => SMono(blobIdProvider.apply(stream))
- .subscribeOn(Schedulers.boundedElastic())
- .map(tupleTwo2ScalaTuple2)
- .flatMap { case (blobId, inputStream) =>
- SMono(blobStoreDAO.save(bucketName, blobId, inputStream))
- .`then`(SMono.just(blobId))
- })(
- release = _.close())
-
+ SMono(blobIdProvider.apply(data))
+ .map(_.getT1)
+ .flatMap(blobId => SMono(blobStoreDAO.save(bucketName, blobId, data))
+ .`then`(SMono.just(blobId)))
+ .subscribeOn(Schedulers.boundedElastic())
}
override def save(
bucketName: BucketName,
data: InputStream,
- blobIdProvider: BlobIdProvider,
+ blobIdProvider: BlobIdProvider[InputStream],
storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
Preconditions.checkNotNull(bucketName)
Preconditions.checkNotNull(data)
@@ -88,7 +85,11 @@ class PassThroughBlobStore @Inject()(blobStoreDAO: BlobStoreDAO,
}
}
- private def withBlobId(data: InputStream): Publisher[Tuple2[BlobId, InputStream]] =
+ private def withBlobId: BlobIdProvider[InputStream] = data =>
+ SMono.just(Tuples.of(blobIdFactory.of(UUID.randomUUID.toString), data))
+ private def withBlobIdByteArray: BlobIdProvider[Array[Byte]] = data =>
+ SMono.just(Tuples.of(blobIdFactory.of(UUID.randomUUID.toString), data))
+ private def withBlobIdByteSource: BlobIdProvider[ByteSource] = data =>
SMono.just(Tuples.of(blobIdFactory.of(UUID.randomUUID.toString), data))
override def readBytes(bucketName: BucketName, blobId: BlobId): Publisher[Array[Byte]] = {
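For completeness, a usage sketch of the typed API, reusing the sha256Provider
sketched above (bucket name and storage policy value below are illustrative):
the payload type and the provider's type parameter must now agree at compile
time, so handing a BlobIdProvider<InputStream> to the byte[] overload no
longer compiles.

    byte[] payload = "hello".getBytes(java.nio.charset.StandardCharsets.UTF_8);
    // Compiles because BlobIdProvider<byte[]> matches the byte[] overload.
    Mono.from(blobStore.save(BucketName.of("default-bucket"), payload,
            sha256Provider, BlobStore.StoragePolicy.LOW_COST))
        .block();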
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]