This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e2a6904d0 JAMES-3763 Generics for BlobIdProvider (#2540)
1e2a6904d0 is described below

commit 1e2a6904d03bb2299d2769960937355ccb769059
Author: Benoit TELLIER <[email protected]>
AuthorDate: Tue Dec 3 14:37:45 2024 +0100

    JAMES-3763 Generics for BlobIdProvider (#2540)
    
    It avoids buffering byte arrays into temporary files.
    
    Triggering IO is a MAJOR performance hit in itself, but pushing this work onto a dedicated, large IO pool when we are compute bound also skyrockets the thread count. Not to mention that my experience with resource management in Reactor leads me to think that edge cases like cancellation could trigger a resource leak (been there).
    
    We can beautifully avoid these inefficiencies by making BlobIdProvider generic.
    
    A disaster waiting to happen...
---
 .../java/org/apache/james/blob/api/BlobStore.java  | 10 ++--
 .../apache/james/blob/api/MetricableBlobStore.java |  6 +--
 .../blob/cassandra/cache/CachedBlobStore.java      |  6 +--
 .../deduplication/DeDuplicationBlobStore.scala     | 55 +++++++++++++---------
 .../blob/deduplication/PassThroughBlobStore.scala  | 37 ++++++++-------
 5 files changed, 63 insertions(+), 51 deletions(-)
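
For illustration, a minimal self-contained Java sketch of the new contract (illustration only, not part of the commit; the BlobId record below is a stand-in for org.apache.james.blob.api.BlobId):

    import com.google.common.hash.Hashing;
    import org.reactivestreams.Publisher;
    import reactor.core.publisher.Mono;
    import reactor.util.function.Tuple2;
    import reactor.util.function.Tuples;

    // Stand-in for org.apache.james.blob.api.BlobId, for this sketch only.
    record BlobId(String asString) {}

    // The generic contract: the provider hands back an id paired with the
    // payload it was given, with the payload's type preserved.
    @FunctionalInterface
    interface BlobIdProvider<T> {
        Publisher<Tuple2<BlobId, T>> apply(T data);
    }

    class Providers {
        // byte[] specialisation: hashing is pure CPU work, so no IO,
        // no extra thread pool, no temporary file.
        static final BlobIdProvider<byte[]> SHA256_BYTES = data ->
            Mono.just(Tuples.of(
                new BlobId(Hashing.sha256().hashBytes(data).toString()),
                data));
    }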

diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
index 22f08167e0..8d5101e0db 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
@@ -37,8 +37,8 @@ public interface BlobStore {
     }
 
     @FunctionalInterface
-    interface BlobIdProvider {
-        Publisher<Tuple2<BlobId, InputStream>> apply(InputStream stream);
+    interface BlobIdProvider<T> {
+        Publisher<Tuple2<BlobId, T>> apply(T stream);
     }
 
     Publisher<BlobId> save(BucketName bucketName, byte[] data, StoragePolicy storagePolicy);
@@ -47,11 +47,11 @@ public interface BlobStore {
 
     Publisher<BlobId> save(BucketName bucketName, ByteSource data, StoragePolicy storagePolicy);
 
-    Publisher<BlobId> save(BucketName bucketName, byte[] data, BlobIdProvider blobIdProvider, StoragePolicy storagePolicy);
+    Publisher<BlobId> save(BucketName bucketName, byte[] data, BlobIdProvider<byte[]> blobIdProvider, StoragePolicy storagePolicy);
 
-    Publisher<BlobId> save(BucketName bucketName, InputStream data, BlobIdProvider blobIdProvider, StoragePolicy storagePolicy);
+    Publisher<BlobId> save(BucketName bucketName, InputStream data, BlobIdProvider<InputStream> blobIdProvider, StoragePolicy storagePolicy);
 
-    Publisher<BlobId> save(BucketName bucketName, ByteSource data, BlobIdProvider blobIdProvider, StoragePolicy storagePolicy);
+    Publisher<BlobId> save(BucketName bucketName, ByteSource data, BlobIdProvider<ByteSource> blobIdProvider, StoragePolicy storagePolicy);
 
     default Publisher<BlobId> save(BucketName bucketName, String data, StoragePolicy storagePolicy) {
         return save(bucketName, data.getBytes(StandardCharsets.UTF_8), storagePolicy);
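
With the type parameter in place, the payload and its provider can no longer disagree. A hypothetical caller sketch (blobStore, bucketName and storagePolicy are assumed to be in scope; Providers.SHA256_BYTES is the sketch provider above):

    byte[] payload = "hello".getBytes(java.nio.charset.StandardCharsets.UTF_8);
    Publisher<BlobId> id =
        blobStore.save(bucketName, payload, Providers.SHA256_BYTES, storagePolicy);
    // Passing a BlobIdProvider<InputStream> here no longer compiles, so the
    // byte[] can never be silently rewrapped and buffered through a stream.
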
diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java
index e5abf56c28..0d60274937 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java
@@ -66,17 +66,17 @@ public class MetricableBlobStore implements BlobStore {
     }
 
     @Override
-    public Publisher<BlobId> save(BucketName bucketName, byte[] data, BlobIdProvider blobIdProvider, StoragePolicy storagePolicy) {
+    public Publisher<BlobId> save(BucketName bucketName, byte[] data, BlobIdProvider<byte[]> blobIdProvider, StoragePolicy storagePolicy) {
         return metricFactory.decoratePublisherWithTimerMetric(SAVE_BYTES_TIMER_NAME, blobStoreImpl.save(bucketName, data, blobIdProvider, storagePolicy));
     }
 
     @Override
-    public Publisher<BlobId> save(BucketName bucketName, InputStream data, BlobIdProvider blobIdProvider, StoragePolicy storagePolicy) {
+    public Publisher<BlobId> save(BucketName bucketName, InputStream data, BlobIdProvider<InputStream> blobIdProvider, StoragePolicy storagePolicy) {
         return metricFactory.decoratePublisherWithTimerMetric(SAVE_INPUT_STREAM_TIMER_NAME, blobStoreImpl.save(bucketName, data, blobIdProvider, storagePolicy));
     }
 
     @Override
-    public Publisher<BlobId> save(BucketName bucketName, ByteSource data, BlobIdProvider blobIdProvider, StoragePolicy storagePolicy) {
+    public Publisher<BlobId> save(BucketName bucketName, ByteSource data, BlobIdProvider<ByteSource> blobIdProvider, StoragePolicy storagePolicy) {
         return metricFactory.decoratePublisherWithTimerMetric(SAVE_INPUT_STREAM_TIMER_NAME, blobStoreImpl.save(bucketName, data, blobIdProvider, storagePolicy));
     }
 
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
index 063c71fad1..e3c95f7dce 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
@@ -253,7 +253,7 @@ public class CachedBlobStore implements BlobStore {
     }
 
     @Override
-    public Mono<BlobId> save(BucketName bucketName, byte[] bytes, BlobIdProvider blobIdProvider, StoragePolicy storagePolicy) {
+    public Mono<BlobId> save(BucketName bucketName, byte[] bytes, BlobIdProvider<byte[]> blobIdProvider, StoragePolicy storagePolicy) {
         return Mono.from(backend.save(bucketName, bytes, blobIdProvider, storagePolicy))
             .flatMap(blobId -> {
                 if (isAbleToCache(bucketName, bytes, storagePolicy)) {
@@ -264,7 +264,7 @@ public class CachedBlobStore implements BlobStore {
     }
 
     @Override
-    public Publisher<BlobId> save(BucketName bucketName, InputStream inputStream, BlobIdProvider blobIdProvider, StoragePolicy storagePolicy) {
+    public Publisher<BlobId> save(BucketName bucketName, InputStream inputStream, BlobIdProvider<InputStream> blobIdProvider, StoragePolicy storagePolicy) {
         Preconditions.checkNotNull(inputStream, "InputStream must not be null");
 
         if (isAbleToCache(bucketName, storagePolicy)) {
@@ -280,7 +280,7 @@ public class CachedBlobStore implements BlobStore {
     }
 
     @Override
-    public Publisher<BlobId> save(BucketName bucketName, ByteSource byteSource, BlobIdProvider blobIdProvider, StoragePolicy storagePolicy) {
+    public Publisher<BlobId> save(BucketName bucketName, ByteSource byteSource, BlobIdProvider<ByteSource> blobIdProvider, StoragePolicy storagePolicy) {
         Preconditions.checkNotNull(byteSource, "ByteSource must not be null");
 
         if (isAbleToCache(bucketName, storagePolicy)) {
diff --git a/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala b/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala
index 3ca823f45b..ace4cd5510 100644
--- a/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala
+++ b/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala
@@ -28,11 +28,11 @@ import org.apache.james.blob.api.BlobStore.BlobIdProvider
 import org.apache.james.blob.api.{BlobId, BlobStore, BlobStoreDAO, BucketName}
 import org.reactivestreams.Publisher
 import reactor.core.publisher.{Flux, Mono}
-import reactor.core.scala.publisher.{SMono, tupleTwo2ScalaTuple2}
+import reactor.core.scala.publisher.SMono
 import reactor.core.scheduler.Schedulers
-import reactor.util.function.{Tuple2, Tuples}
+import reactor.util.function.Tuples
 
-import java.io.{ByteArrayInputStream, InputStream}
+import java.io.InputStream
 import java.util.concurrent.Callable
 import scala.compat.java8.FunctionConverters._
 
@@ -67,7 +67,7 @@ class DeDuplicationBlobStore @Inject()(blobStoreDAO: BlobStoreDAO,
   private val baseEncoding = Option(System.getProperty(HASH_BLOB_ID_ENCODING_TYPE_PROPERTY)).map(DeDuplicationBlobStore.baseEncodingFrom).getOrElse(HASH_BLOB_ID_ENCODING_DEFAULT)
 
   override def save(bucketName: BucketName, data: Array[Byte], storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
-    save(bucketName, data, withBlobId, storagePolicy)
+    save(bucketName, data, withBlobIdFromArray, storagePolicy)
   }
 
   override def save(bucketName: BucketName, data: InputStream, storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
@@ -75,32 +75,30 @@ class DeDuplicationBlobStore @Inject()(blobStoreDAO: BlobStoreDAO,
   }
 
   override def save(bucketName: BucketName, data: ByteSource, storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
-    save(bucketName, data, withBlobId, storagePolicy)
+    save(bucketName, data, withBlobIdFromByteSource, storagePolicy)
   }
 
-  override def save(bucketName: BucketName, data: Array[Byte], blobIdProvider: BlobIdProvider, storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
+  override def save(bucketName: BucketName, data: Array[Byte], blobIdProvider: BlobIdProvider[Array[Byte]], storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
     Preconditions.checkNotNull(bucketName)
     Preconditions.checkNotNull(data)
-    save(bucketName, new ByteArrayInputStream(data), blobIdProvider, storagePolicy)
+    SMono(blobIdProvider.apply(data))
+      .map(_.getT1)
+      .flatMap(blobId => SMono(blobStoreDAO.save(bucketName, blobId, data))
+      .`then`(SMono.just(blobId)))
   }
 
-  override def save(bucketName: BucketName, data: ByteSource, blobIdProvider: BlobIdProvider, storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
+  override def save(bucketName: BucketName, data: ByteSource, blobIdProvider: BlobIdProvider[ByteSource], storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
     Preconditions.checkNotNull(bucketName)
     Preconditions.checkNotNull(data)
 
-    SMono.fromCallable(() => data.openStream())
-      .using(
-        use = stream => SMono(blobIdProvider.apply(stream))
-          .subscribeOn(Schedulers.boundedElastic())
-          .map(tupleTwo2ScalaTuple2)
-          .flatMap { case (blobId, inputStream) =>
-            SMono(blobStoreDAO.save(bucketName, blobId, inputStream))
-              .`then`(SMono.just(blobId))
-          })(
-        release = _.close())
+    SMono(blobIdProvider.apply(data))
+      .map(_.getT1)
+      .flatMap(blobId => SMono(blobStoreDAO.save(bucketName, blobId, data))
+          .`then`(SMono.just(blobId)))
+      .subscribeOn(Schedulers.boundedElastic())
   }
 
-  private def withBlobId(data: InputStream): Publisher[Tuple2[BlobId, InputStream]] = {
+  private def withBlobId: BlobIdProvider[InputStream] = data => {
     val hashingInputStream = new HashingInputStream(Hashing.sha256, data)
     val ressourceSupplier: Callable[FileBackedOutputStream] = () => new FileBackedOutputStream(DeDuplicationBlobStore.FILE_THRESHOLD)
     val sourceSupplier: FileBackedOutputStream => Mono[(BlobId, InputStream)] =
@@ -114,11 +112,24 @@ class DeDuplicationBlobStore @Inject()(blobStoreDAO: BlobStoreDAO,
       ressourceSupplier,
       sourceSupplier.asJava,
       ((fileBackedOutputStream: FileBackedOutputStream) => fileBackedOutputStream.reset()).asJava,
-      DeDuplicationBlobStore.LAZY_RESOURCE_CLEANUP
-    ) .subscribeOn(Schedulers.boundedElastic())
+      DeDuplicationBlobStore.LAZY_RESOURCE_CLEANUP)
+      .subscribeOn(Schedulers.boundedElastic())
       .map{ case (blobId, data) => Tuples.of(blobId, data)}
   }
 
+  private def withBlobIdFromByteSource: BlobIdProvider[ByteSource] =
+    data => Mono.fromCallable(() => data.hash(Hashing.sha256()))
+      .subscribeOn(Schedulers.boundedElastic())
+      .map(base64)
+      .map(blobIdFactory.of)
+      .map(blobId => Tuples.of(blobId, data))
+
+  private def withBlobIdFromArray: BlobIdProvider[Array[Byte]] = data => {
+    val code = Hashing.sha256.hashBytes(data)
+    val blobId = blobIdFactory.of(base64(code))
+    Mono.just(Tuples.of(blobId, data))
+  }
+
   private def base64(hashCode: HashCode) = {
     val bytes = hashCode.asBytes
     baseEncoding.encode(bytes)
@@ -126,7 +137,7 @@ class DeDuplicationBlobStore @Inject()(blobStoreDAO: BlobStoreDAO,
 
   override def save(bucketName: BucketName,
                     data: InputStream,
-                    blobIdProvider: BlobIdProvider,
+                    blobIdProvider: BlobIdProvider[InputStream],
                     storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
     Preconditions.checkNotNull(bucketName)
     Preconditions.checkNotNull(data)
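
The ByteSource path still has to read the source once to hash it, so that single read stays on boundedElastic; the byte[] path needs no scheduler hop at all. A rough Java transliteration of withBlobIdFromByteSource (a sketch only, reusing the stand-in types from the earlier snippet):

    // Hash the ByteSource once (a blocking read, hence boundedElastic),
    // then pair the resulting id with the untouched, re-readable source.
    // Id encoding simplified: the real code runs the hash through a
    // configurable base encoding and the BlobId factory.
    static final BlobIdProvider<com.google.common.io.ByteSource> SHA256_SOURCE =
        data -> Mono.fromCallable(() -> data.hash(Hashing.sha256()))
            .subscribeOn(reactor.core.scheduler.Schedulers.boundedElastic())
            .map(hash -> new BlobId(hash.toString()))
            .map(blobId -> Tuples.of(blobId, data));
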
diff --git a/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/PassThroughBlobStore.scala b/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/PassThroughBlobStore.scala
index ec451d75dc..26de0c5bde 100644
--- a/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/PassThroughBlobStore.scala
+++ b/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/PassThroughBlobStore.scala
@@ -38,7 +38,7 @@ class PassThroughBlobStore @Inject()(blobStoreDAO: BlobStoreDAO,
                                      blobIdFactory: BlobId.Factory) extends BlobStore {
 
   override def save(bucketName: BucketName, data: Array[Byte], storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
-    save(bucketName, data, withBlobId, storagePolicy)
+    save(bucketName, data, withBlobIdByteArray, storagePolicy)
   }
 
   override def save(bucketName: BucketName, data: InputStream, storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
@@ -46,36 +46,33 @@ class PassThroughBlobStore @Inject()(blobStoreDAO: BlobStoreDAO,
   }
 
   override def save(bucketName: BucketName, data: ByteSource, storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
-    save(bucketName, data, withBlobId, storagePolicy)
+    save(bucketName, data, withBlobIdByteSource, storagePolicy)
   }
 
-  override def save(bucketName: BucketName, data: Array[Byte], blobIdProvider: BlobIdProvider, storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
+  override def save(bucketName: BucketName, data: Array[Byte], blobIdProvider: BlobIdProvider[Array[Byte]], storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
     Preconditions.checkNotNull(bucketName)
     Preconditions.checkNotNull(data)
-    save(bucketName, new ByteArrayInputStream(data), blobIdProvider, storagePolicy)
+    SMono(blobIdProvider.apply(data))
+      .map(_.getT1)
+      .flatMap(blobId => SMono(blobStoreDAO.save(bucketName, blobId, data))
+        .`then`(SMono.just(blobId)))
   }
 
-  override def save(bucketName: BucketName, data: ByteSource, blobIdProvider: BlobIdProvider, storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
+  override def save(bucketName: BucketName, data: ByteSource, blobIdProvider: BlobIdProvider[ByteSource], storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
     Preconditions.checkNotNull(bucketName)
     Preconditions.checkNotNull(data)
 
-    SMono.fromCallable(() => data.openStream())
-      .using(
-        use = stream => SMono(blobIdProvider.apply(stream))
-          .subscribeOn(Schedulers.boundedElastic())
-          .map(tupleTwo2ScalaTuple2)
-          .flatMap { case (blobId, inputStream) =>
-            SMono(blobStoreDAO.save(bucketName, blobId, inputStream))
-              .`then`(SMono.just(blobId))
-          })(
-        release = _.close())
-
+    SMono(blobIdProvider.apply(data))
+      .map(_.getT1)
+      .flatMap(blobId => SMono(blobStoreDAO.save(bucketName, blobId, data))
+        .`then`(SMono.just(blobId)))
+      .subscribeOn(Schedulers.boundedElastic())
   }
 
   override def save(
                      bucketName: BucketName,
                      data: InputStream,
-                     blobIdProvider: BlobIdProvider,
+                     blobIdProvider: BlobIdProvider[InputStream],
                      storagePolicy: BlobStore.StoragePolicy): Publisher[BlobId] = {
     Preconditions.checkNotNull(bucketName)
     Preconditions.checkNotNull(data)
@@ -88,7 +85,11 @@ class PassThroughBlobStore @Inject()(blobStoreDAO: BlobStoreDAO,
       }
   }
 
-  private def withBlobId(data: InputStream): Publisher[Tuple2[BlobId, InputStream]] =
+  private def withBlobId: BlobIdProvider[InputStream] = data =>
+    SMono.just(Tuples.of(blobIdFactory.of(UUID.randomUUID.toString), data))
+  private def withBlobIdByteArray: BlobIdProvider[Array[Byte]] = data =>
+    SMono.just(Tuples.of(blobIdFactory.of(UUID.randomUUID.toString), data))
+  private def withBlobIdByteSource: BlobIdProvider[ByteSource] = data =>
    SMono.just(Tuples.of(blobIdFactory.of(UUID.randomUUID.toString), data))
 
   override def readBytes(bucketName: BucketName, blobId: BlobId): Publisher[Array[Byte]] = {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
