quantranhong1999 commented on code in PR #3016: URL: https://github.com/apache/james-project/pull/3016#discussion_r3108228184
########## server/blob/blob-zstd/src/main/java/org/apache/james/blob/zstd/ZstdBlobStoreDAO.java: ########## @@ -0,0 +1,377 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.blob.zstd; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.james.blob.api.BlobId; +import org.apache.james.blob.api.BlobStoreDAO; +import org.apache.james.blob.api.BucketName; +import org.apache.james.blob.api.ObjectNotFoundException; +import org.apache.james.blob.api.ObjectStoreIOException; +import org.apache.james.metrics.api.Metric; +import org.apache.james.metrics.api.MetricFactory; +import org.apache.james.metrics.api.TimeMetric; +import org.reactivestreams.Publisher; + +import com.github.fge.lambdas.Throwing; +import com.github.luben.zstd.Zstd; +import com.github.luben.zstd.ZstdInputStream; +import com.github.luben.zstd.ZstdOutputStream; +import com.google.common.io.ByteSource; +import com.google.common.io.CountingOutputStream; +import com.google.common.io.FileBackedOutputStream; + +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +public class ZstdBlobStoreDAO implements BlobStoreDAO { + private record CompressionDecision(long originalSize, long compressedSize) { + boolean satisfyCompressionMinRatio(float minRatio) { + return ((float) compressedSize / originalSize) <= minRatio; + } + } + + public static final BlobMetadataName CONTENT_ORIGINAL_SIZE = new BlobMetadataName("content-original-size"); + public static final String BLOB_ZSTD_COMPRESS_SAVE_COUNT_METRIC_NAME = "blobZstdCompressSaveCount"; + public static final String BLOB_ZSTD_THRESHOLD_SKIP_COUNT_METRIC_NAME = "blobZstdThresholdSkipCount"; + public static final String BLOB_ZSTD_RATIO_SKIP_COUNT_METRIC_NAME = "blobZstdRatioSkipCount"; + public static final String BLOB_ZSTD_DECOMPRESS_COUNT_METRIC_NAME = "blobZstdDecompressCount"; + public static final String BLOB_ZSTD_SAVED_BYTES_METRIC_NAME = "blobZstdSavedBytes"; + public static 
final String BLOB_ZSTD_COMPRESS_LATENCY_METRIC_NAME = "blobZstdCompressLatency"; + public static final String BLOB_ZSTD_DECOMPRESS_LATENCY_METRIC_NAME = "blobZstdDecompressLatency"; + private static final int FILE_THRESHOLD = 100 * 1024; + private static final Set<BlobMetadataName> RESERVED_METADATA_NAMES = Set.of(ContentTransferEncoding.NAME, CONTENT_ORIGINAL_SIZE); + + private final BlobStoreDAO underlying; + private final CompressionConfiguration compressionConfiguration; + private final MetricFactory metricFactory; + private final Metric compressSaveCount; + private final Metric thresholdSkipCount; + private final Metric ratioSkipCount; + private final Metric decompressCount; + private final Metric savedBytes; + + public ZstdBlobStoreDAO(BlobStoreDAO underlying, CompressionConfiguration compressionConfiguration, MetricFactory metricFactory) { + this.underlying = underlying; + this.compressionConfiguration = compressionConfiguration; + this.metricFactory = metricFactory; + this.compressSaveCount = metricFactory.generate(BLOB_ZSTD_COMPRESS_SAVE_COUNT_METRIC_NAME); + this.thresholdSkipCount = metricFactory.generate(BLOB_ZSTD_THRESHOLD_SKIP_COUNT_METRIC_NAME); + this.ratioSkipCount = metricFactory.generate(BLOB_ZSTD_RATIO_SKIP_COUNT_METRIC_NAME); + this.decompressCount = metricFactory.generate(BLOB_ZSTD_DECOMPRESS_COUNT_METRIC_NAME); + this.savedBytes = metricFactory.generate(BLOB_ZSTD_SAVED_BYTES_METRIC_NAME); + } + + @Override + public InputStreamBlob read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException { + InputStreamBlob blob = underlying.read(bucketName, blobId); + if (isCompressed(blob.metadata())) { + return decompress(blob); + } + return blob; + } + + @Override + public Publisher<InputStreamBlob> readReactive(BucketName bucketName, BlobId blobId) { + return Mono.from(underlying.readReactive(bucketName, blobId)) + .flatMap(blob -> { + if (isCompressed(blob.metadata())) { + return decompressReactive(blob); + } + 
return Mono.just(blob); + }); + } + + @Override + public Publisher<BytesBlob> readBytes(BucketName bucketName, BlobId blobId) { + return Mono.from(underlying.readBytes(bucketName, blobId)) + .flatMap(blob -> { + if (isCompressed(blob.metadata())) { + return decompressBytesReactive(blob); + } + return Mono.just(blob); + }); + } + + @Override + public Publisher<Void> save(BucketName bucketName, BlobId blobId, Blob blob) { + return switch (blob) { + case BytesBlob bytesBlob -> save(bucketName, blobId, bytesBlob.payload(), bytesBlob.metadata()); + case InputStreamBlob inputStreamBlob -> save(bucketName, blobId, inputStreamBlob.payload(), inputStreamBlob.metadata()); + case ByteSourceBlob byteSourceBlob -> save(bucketName, blobId, byteSourceBlob.payload(), byteSourceBlob.metadata()); + }; + } + + @Override + public Publisher<Void> delete(BucketName bucketName, BlobId blobId) { + return underlying.delete(bucketName, blobId); + } + + @Override + public Publisher<Void> delete(BucketName bucketName, Collection<BlobId> blobIds) { + return underlying.delete(bucketName, blobIds); + } + + @Override + public Publisher<Void> deleteBucket(BucketName bucketName) { + return underlying.deleteBucket(bucketName); + } + + @Override + public Publisher<BucketName> listBuckets() { + return underlying.listBuckets(); + } + + @Override + public Publisher<BlobId> listBlobs(BucketName bucketName) { + return underlying.listBlobs(bucketName); + } + + private InputStreamBlob decompress(InputStreamBlob blob) throws ObjectStoreIOException { + try { + decompressCount.increment(); + return InputStreamBlob.of(new ZstdInputStream(blob.payload()), blob.metadata()); + } catch (IOException e) { + throw new ObjectStoreIOException("Failed to initialize zstd decompression", e); + } + } + + private Mono<InputStreamBlob> decompressReactive(InputStreamBlob blob) { + return Mono.fromCallable(() -> InputStreamBlob.of(new ZstdInputStream(blob.payload()), blob.metadata())) + .doOnNext(ignored -> 
decompressCount.increment()) + .onErrorMap(IOException.class, e -> new ObjectStoreIOException("Failed to initialize zstd decompression", e)); + } + + private Mono<BytesBlob> decompressBytesReactive(BytesBlob blob) { + return Mono.fromCallable(() -> decompress(blob)) + .subscribeOn(Schedulers.parallel()) + .doOnNext(ignored -> decompressCount.increment())
Review Comment:
Likely not. It is a bit more complicated than that. `blobZstdDecompressLatency` only measures eager decompression in `readBytes()`. It does not cover:
- `read()`
- `readReactive()`

because those two return a `ZstdInputStream`, and the actual decompression happens later when the caller consumes the stream.

So for the current implementation:
- `BLOB_ZSTD_DECOMPRESS_COUNT_METRIC_NAME` counts all decompression paths:
  - `read()`
  - `readReactive()`
  - `readBytes()`
- `BLOB_ZSTD_DECOMPRESS_LATENCY_METRIC_NAME` only measures `readBytes()`
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
