chibenwa commented on code in PR #3016:
URL: https://github.com/apache/james-project/pull/3016#discussion_r3100359780


##########
server/blob/blob-zstd/src/main/java/org/apache/james/blob/zstd/ZstdBlobStoreDAO.java:
##########
@@ -0,0 +1,377 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.blob.zstd;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobStoreDAO;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.blob.api.ObjectNotFoundException;
+import org.apache.james.blob.api.ObjectStoreIOException;
+import org.apache.james.metrics.api.Metric;
+import org.apache.james.metrics.api.MetricFactory;
+import org.apache.james.metrics.api.TimeMetric;
+import org.reactivestreams.Publisher;
+
+import com.github.fge.lambdas.Throwing;
+import com.github.luben.zstd.Zstd;
+import com.github.luben.zstd.ZstdInputStream;
+import com.github.luben.zstd.ZstdOutputStream;
+import com.google.common.io.ByteSource;
+import com.google.common.io.CountingOutputStream;
+import com.google.common.io.FileBackedOutputStream;
+
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+public class ZstdBlobStoreDAO implements BlobStoreDAO {
+    /**
+     * Pairs a payload's size before compression with its size after compression.
+     */
+    private record CompressionDecision(long originalSize, long compressedSize) {
+        /**
+         * Checks whether the achieved compression ratio is good enough.
+         *
+         * @param minRatio highest acceptable value of compressedSize / originalSize
+         * @return true when the float ratio compressedSize / originalSize is at most minRatio.
+         *         A zero originalSize yields NaN or Infinity, so this returns false for any finite minRatio.
+         */
+        boolean satisfyCompressionMinRatio(float minRatio) {
+            float achievedRatio = (float) compressedSize / originalSize;
+            return achievedRatio <= minRatio;
+        }
+    }
+
+    // Metadata key "content-original-size"; judging by its name it records the uncompressed size
+    // (the write path is outside this excerpt, so confirm there).
+    public static final BlobMetadataName CONTENT_ORIGINAL_SIZE = new BlobMetadataName("content-original-size");
+
+    // Names of the metrics exposed by this DAO.
+    public static final String BLOB_ZSTD_COMPRESS_SAVE_COUNT_METRIC_NAME = "blobZstdCompressSaveCount";
+    public static final String BLOB_ZSTD_THRESHOLD_SKIP_COUNT_METRIC_NAME = "blobZstdThresholdSkipCount";
+    public static final String BLOB_ZSTD_RATIO_SKIP_COUNT_METRIC_NAME = "blobZstdRatioSkipCount";
+    public static final String BLOB_ZSTD_DECOMPRESS_COUNT_METRIC_NAME = "blobZstdDecompressCount";
+    public static final String BLOB_ZSTD_SAVED_BYTES_METRIC_NAME = "blobZstdSavedBytes";
+    public static final String BLOB_ZSTD_COMPRESS_LATENCY_METRIC_NAME = "blobZstdCompressLatency";
+    public static final String BLOB_ZSTD_DECOMPRESS_LATENCY_METRIC_NAME = "blobZstdDecompressLatency";
+
+    // 100 KiB. Usage is outside this excerpt; presumably the in-memory limit of a FileBackedOutputStream (confirm).
+    private static final int FILE_THRESHOLD = 100 * 1024;
+    // Metadata names handled by this DAO itself; declared after CONTENT_ORIGINAL_SIZE, which it references.
+    private static final Set<BlobMetadataName> RESERVED_METADATA_NAMES = Set.of(ContentTransferEncoding.NAME, CONTENT_ORIGINAL_SIZE);
+
+    // Decorated DAO that actually stores the (possibly compressed) bytes.
+    private final BlobStoreDAO underlying;
+    private final CompressionConfiguration compressionConfiguration;
+    private final MetricFactory metricFactory;
+    // Counters generated once in the constructor and reused afterwards.
+    private final Metric compressSaveCount;
+    private final Metric thresholdSkipCount;
+    private final Metric ratioSkipCount;
+    private final Metric decompressCount;
+    private final Metric savedBytes;
+
+    /**
+     * Creates a zstd-compressing decorator around another {@link BlobStoreDAO}.
+     *
+     * @param underlying DAO receiving the stored bytes and serving reads
+     * @param compressionConfiguration compression settings
+     * @param metricFactory factory used to create the counters exposed by this DAO
+     */
+    public ZstdBlobStoreDAO(BlobStoreDAO underlying, CompressionConfiguration compressionConfiguration, MetricFactory metricFactory) {
+        this.underlying = underlying;
+        this.compressionConfiguration = compressionConfiguration;
+        this.metricFactory = metricFactory;
+
+        compressSaveCount = metricFactory.generate(BLOB_ZSTD_COMPRESS_SAVE_COUNT_METRIC_NAME);
+        thresholdSkipCount = metricFactory.generate(BLOB_ZSTD_THRESHOLD_SKIP_COUNT_METRIC_NAME);
+        ratioSkipCount = metricFactory.generate(BLOB_ZSTD_RATIO_SKIP_COUNT_METRIC_NAME);
+        decompressCount = metricFactory.generate(BLOB_ZSTD_DECOMPRESS_COUNT_METRIC_NAME);
+        savedBytes = metricFactory.generate(BLOB_ZSTD_SAVED_BYTES_METRIC_NAME);
+    }
+
+    /**
+     * Reads the blob from the underlying DAO. When its metadata marks it as compressed,
+     * the payload is wrapped in a zstd decompressing stream; otherwise the blob is returned untouched.
+     */
+    @Override
+    public InputStreamBlob read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException {
+        InputStreamBlob stored = underlying.read(bucketName, blobId);
+        return isCompressed(stored.metadata()) ? decompress(stored) : stored;
+    }
+
+    /**
+     * Reactive stream read: compressed blobs are decompressed, other blobs are emitted unchanged.
+     */
+    @Override
+    public Publisher<InputStreamBlob> readReactive(BucketName bucketName, BlobId blobId) {
+        Mono<InputStreamBlob> stored = Mono.from(underlying.readReactive(bucketName, blobId));
+        return stored.flatMap(blob -> isCompressed(blob.metadata())
+            ? decompressReactive(blob)
+            : Mono.just(blob));
+    }
+
+    /**
+     * Reactive byte-array read: compressed blobs are decompressed, other blobs are emitted unchanged.
+     */
+    @Override
+    public Publisher<BytesBlob> readBytes(BucketName bucketName, BlobId blobId) {
+        Mono<BytesBlob> stored = Mono.from(underlying.readBytes(bucketName, blobId));
+        return stored.flatMap(blob -> isCompressed(blob.metadata())
+            ? decompressBytesReactive(blob)
+            : Mono.just(blob));
+    }
+
+    /**
+     * Dispatches on the concrete {@link Blob} variant and forwards its payload and metadata
+     * to the matching {@code save} overload.
+     */
+    @Override
+    public Publisher<Void> save(BucketName bucketName, BlobId blobId, Blob blob) {
+        return switch (blob) {
+            case BytesBlob bytes -> save(bucketName, blobId, bytes.payload(), bytes.metadata());
+            case InputStreamBlob stream -> save(bucketName, blobId, stream.payload(), stream.metadata());
+            case ByteSourceBlob source -> save(bucketName, blobId, source.payload(), source.metadata());
+        };
+    }
+
+    // Operations below never touch payload bytes, so they are forwarded verbatim to the underlying DAO.
+
+    /** Deletes a single blob through the underlying DAO. */
+    @Override
+    public Publisher<Void> delete(BucketName bucketName, BlobId blobId) {
+        return underlying.delete(bucketName, blobId);
+    }
+
+    /** Deletes several blobs through the underlying DAO. */
+    @Override
+    public Publisher<Void> delete(BucketName bucketName, Collection<BlobId> blobIds) {
+        return underlying.delete(bucketName, blobIds);
+    }
+
+    /** Deletes a whole bucket through the underlying DAO. */
+    @Override
+    public Publisher<Void> deleteBucket(BucketName bucketName) {
+        return underlying.deleteBucket(bucketName);
+    }
+
+    /** Lists buckets as reported by the underlying DAO. */
+    @Override
+    public Publisher<BucketName> listBuckets() {
+        return underlying.listBuckets();
+    }
+
+    /** Lists blob ids of a bucket as reported by the underlying DAO. */
+    @Override
+    public Publisher<BlobId> listBlobs(BucketName bucketName) {
+        return underlying.listBlobs(bucketName);
+    }
+
+    /**
+     * Wraps the payload of a compressed blob in a {@link ZstdInputStream}; metadata is kept as is.
+     * Bytes are decompressed as the returned stream is read.
+     *
+     * @param blob blob whose metadata marks it as zstd-compressed
+     * @return a blob whose payload yields the decompressed bytes
+     * @throws ObjectStoreIOException if the zstd stream cannot be initialized. The compressed
+     *         payload stream is closed before throwing so it is not leaked.
+     */
+    private InputStreamBlob decompress(InputStreamBlob blob) throws ObjectStoreIOException {
+        InputStream compressed = blob.payload();
+        try {
+            InputStreamBlob decompressed = InputStreamBlob.of(new ZstdInputStream(compressed), blob.metadata());
+            // Count only successful initializations, like decompressReactive does.
+            // NOTE(review): this count may be redundant with the count of the decompress latency metric.
+            decompressCount.increment();
+            return decompressed;
+        } catch (IOException e) {
+            // Nobody else will close the underlying stream once we fail here.
+            try {
+                compressed.close();
+            } catch (IOException closeFailure) {
+                e.addSuppressed(closeFailure);
+            }
+            throw new ObjectStoreIOException("Failed to initialize zstd decompression", e);
+        }
+    }
+
+    /**
+     * Reactive variant of {@link #decompress(InputStreamBlob)}: wraps the payload in a
+     * {@link ZstdInputStream} and counts the decompression once the wrapper is created.
+     * An initialization failure closes the compressed payload stream and surfaces as an
+     * {@link ObjectStoreIOException}.
+     *
+     * @param blob blob whose metadata marks it as zstd-compressed
+     * @return a Mono emitting a blob whose payload yields the decompressed bytes
+     */
+    private Mono<InputStreamBlob> decompressReactive(InputStreamBlob blob) {
+        return Mono.fromCallable(() -> {
+                InputStream compressed = blob.payload();
+                try {
+                    return InputStreamBlob.of(new ZstdInputStream(compressed), blob.metadata());
+                } catch (IOException e) {
+                    // Do not leak the underlying stream when zstd initialization fails.
+                    try {
+                        compressed.close();
+                    } catch (IOException closeFailure) {
+                        e.addSuppressed(closeFailure);
+                    }
+                    throw e;
+                }
+            })
+            // NOTE(review): this count may be redundant with the count of the decompress latency metric.
+            .doOnNext(ignored -> decompressCount.increment())
+            .onErrorMap(IOException.class, e -> new ObjectStoreIOException("Failed to initialize zstd decompression", e));
+    }
+
+    private Mono<BytesBlob> decompressBytesReactive(BytesBlob blob) {
+        return Mono.fromCallable(() -> decompress(blob))
+            .subscribeOn(Schedulers.parallel())
+            .doOnNext(ignored -> decompressCount.increment())

Review Comment:
   The decompress count metric can be obtained through the count property of the
   BLOB_ZSTD_DECOMPRESS_LATENCY_METRIC_NAME metric.
   
   I suggest we remove it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to