chibenwa commented on code in PR #3016: URL: https://github.com/apache/james-project/pull/3016#discussion_r3115712121
########## server/blob/blob-zstd/src/main/java/org/apache/james/blob/zstd/ZstdBlobStoreDAO.java: ########## @@ -0,0 +1,452 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.blob.zstd; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import org.apache.james.blob.api.BlobId; +import org.apache.james.blob.api.BlobStoreDAO; +import org.apache.james.blob.api.BucketName; +import org.apache.james.blob.api.ObjectNotFoundException; +import org.apache.james.blob.api.ObjectStoreIOException; +import org.apache.james.metrics.api.Metric; +import org.apache.james.metrics.api.MetricFactory; +import org.apache.james.metrics.api.TimeMetric; +import org.reactivestreams.Publisher; + +import com.github.fge.lambdas.Throwing; +import com.github.luben.zstd.Zstd; +import com.github.luben.zstd.ZstdInputStream; +import com.github.luben.zstd.ZstdOutputStream; +import com.google.common.io.ByteSource; +import com.google.common.io.CountingOutputStream; +import com.google.common.io.FileBackedOutputStream; + +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +public class ZstdBlobStoreDAO implements BlobStoreDAO { + private record CompressionDecision(long originalSize, long compressedSize) { + boolean satisfyCompressionMinRatio(float minCompressionRatio) { + // minCompressionRatio == 0 means decompress-only mode: never compress on save + if (minCompressionRatio == 0) { + return false; + } + + return ((float) compressedSize / originalSize) <= minCompressionRatio; + } + } + + public static final BlobMetadataName CONTENT_ORIGINAL_SIZE = new BlobMetadataName("content-original-size"); + public static final String BLOB_ZSTD_COMPRESS_SAVE_COUNT_METRIC_NAME = "blobZstdCompressSaveCount"; + public static final String BLOB_ZSTD_THRESHOLD_SKIP_COUNT_METRIC_NAME = "blobZstdThresholdSkipCount"; + public static final String 
BLOB_ZSTD_RATIO_SKIP_COUNT_METRIC_NAME = "blobZstdRatioSkipCount"; + public static final String BLOB_ZSTD_DECOMPRESS_COUNT_METRIC_NAME = "blobZstdDecompressCount"; + public static final String BLOB_ZSTD_SAVED_BYTES_METRIC_NAME = "blobZstdSavedBytes"; + public static final String BLOB_ZSTD_COMPRESS_LATENCY_METRIC_NAME = "blobZstdCompressLatency"; + public static final String BLOB_ZSTD_DECOMPRESS_LATENCY_METRIC_NAME = "blobZstdDecompressLatency"; + private static final int FILE_THRESHOLD = 100 * 1024; + private static final Set<BlobMetadataName> RESERVED_METADATA_NAMES = Set.of(ContentTransferEncoding.NAME, CONTENT_ORIGINAL_SIZE); + + private final BlobStoreDAO underlying; + private final CompressionConfiguration compressionConfiguration; + private final MetricFactory metricFactory; + private final Metric compressSaveCount; + private final Metric thresholdSkipCount; + private final Metric ratioSkipCount; + private final Metric decompressCount; + private final Metric savedBytes; + + public ZstdBlobStoreDAO(BlobStoreDAO underlying, CompressionConfiguration compressionConfiguration, MetricFactory metricFactory) { + this.underlying = underlying; + this.compressionConfiguration = compressionConfiguration; + this.metricFactory = metricFactory; + this.compressSaveCount = metricFactory.generate(BLOB_ZSTD_COMPRESS_SAVE_COUNT_METRIC_NAME); + this.thresholdSkipCount = metricFactory.generate(BLOB_ZSTD_THRESHOLD_SKIP_COUNT_METRIC_NAME); + this.ratioSkipCount = metricFactory.generate(BLOB_ZSTD_RATIO_SKIP_COUNT_METRIC_NAME); + this.decompressCount = metricFactory.generate(BLOB_ZSTD_DECOMPRESS_COUNT_METRIC_NAME); + this.savedBytes = metricFactory.generate(BLOB_ZSTD_SAVED_BYTES_METRIC_NAME); + } + + @Override + public InputStreamBlob read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException { + InputStreamBlob blob = underlying.read(bucketName, blobId); + if (isCompressed(blob.metadata())) { + return decompress(blob); + } + return blob; + } + + 
@Override + public Publisher<InputStreamBlob> readReactive(BucketName bucketName, BlobId blobId) { + return Mono.from(underlying.readReactive(bucketName, blobId)) + .flatMap(blob -> { + if (isCompressed(blob.metadata())) { + return decompressReactive(blob); + } + return Mono.just(blob); + }); + } + + @Override + public Publisher<BytesBlob> readBytes(BucketName bucketName, BlobId blobId) { + return Mono.from(underlying.readBytes(bucketName, blobId)) + .flatMap(blob -> { + if (isCompressed(blob.metadata())) { + return decompressBytesReactive(blob); + } + return Mono.just(blob); + }); + } + + @Override + public Publisher<Void> save(BucketName bucketName, BlobId blobId, Blob blob) { + return switch (blob) { + case BytesBlob bytesBlob -> save(bucketName, blobId, bytesBlob); + case InputStreamBlob inputStreamBlob -> save(bucketName, blobId, inputStreamBlob); + case ByteSourceBlob byteSourceBlob -> save(bucketName, blobId, byteSourceBlob); + }; + } + + @Override + public Publisher<Void> delete(BucketName bucketName, BlobId blobId) { + return underlying.delete(bucketName, blobId); + } + + @Override + public Publisher<Void> delete(BucketName bucketName, Collection<BlobId> blobIds) { + return underlying.delete(bucketName, blobIds); + } + + @Override + public Publisher<Void> deleteBucket(BucketName bucketName) { + return underlying.deleteBucket(bucketName); + } + + @Override + public Publisher<BucketName> listBuckets() { + return underlying.listBuckets(); + } + + @Override + public Publisher<BlobId> listBlobs(BucketName bucketName) { + return underlying.listBlobs(bucketName); + } + + private InputStreamBlob decompress(InputStreamBlob blob) throws ObjectStoreIOException { + try { + decompressCount.increment(); + return InputStreamBlob.of(new ZstdInputStream(blob.payload()), blob.metadata()); + } catch (IOException e) { + throw new ObjectStoreIOException("Failed to initialize zstd decompression", e); + } + } + + private Mono<InputStreamBlob> decompressReactive(InputStreamBlob 
blob) { + return Mono.fromCallable(() -> InputStreamBlob.of(new ZstdInputStream(blob.payload()), blob.metadata())) + .doOnNext(ignored -> decompressCount.increment()) + .onErrorMap(IOException.class, e -> new ObjectStoreIOException("Failed to initialize zstd decompression", e)); + } + + private Mono<BytesBlob> decompressBytesReactive(BytesBlob blob) { + return Mono.fromCallable(() -> decompress(blob)) + .subscribeOn(Schedulers.parallel()) + .doOnNext(ignored -> decompressCount.increment()) + .onErrorMap(IOException.class, e -> new ObjectStoreIOException("Failed to decompress blob", e)); + } + + private BytesBlob decompress(BytesBlob blob) throws IOException { + TimeMetric timeMetric = metricFactory.timer(BLOB_ZSTD_DECOMPRESS_LATENCY_METRIC_NAME); + try { + return BytesBlob.of(Zstd.decompress(blob.payload(), originalSize(blob.metadata())), blob.metadata()); + } finally { + timeMetric.stopAndPublish(); + } + } + + private int originalSize(BlobMetadata metadata) throws IOException { + BlobMetadataValue sizeMetadata = metadata.get(CONTENT_ORIGINAL_SIZE) + .orElseThrow(() -> new IOException("Missing " + CONTENT_ORIGINAL_SIZE.name() + " metadata for compressed blob")); + + try { + long originalSize = Long.parseLong(sizeMetadata.value()); + return Math.toIntExact(originalSize); + } catch (NumberFormatException | ArithmeticException e) { + throw new IOException("Invalid " + CONTENT_ORIGINAL_SIZE.name() + " metadata value: " + sizeMetadata.value(), e); + } + } + + private Publisher<Void> save(BucketName bucketName, BlobId blobId, BytesBlob bytesBlob) { + byte[] data = bytesBlob.payload(); + BlobMetadata sanitizedMetadata = sanitizeMetadata(bytesBlob.metadata()); + BytesBlob uncompressedBlob = BytesBlob.of(data, sanitizedMetadata); + + if (shouldAttemptCompression(data.length)) { + return Mono.fromCallable(() -> compress(data)) + .subscribeOn(Schedulers.parallel()) + .flatMap(compressed -> { + CompressionDecision compressionDecision = compressionDecision(data.length, 
compressed.length); + if (compressionDecision.satisfyCompressionMinRatio(compressionConfiguration.minRatio())) { + return saveCompressed(bucketName, blobId, + BytesBlob.of(compressed, withCompressionMetadata(sanitizedMetadata, data.length)), + uncompressedBlob); + } + + return saveOriginal(bucketName, blobId, uncompressedBlob) + .doOnSuccess(ignored -> ratioSkipCount.increment()); + }) + .onErrorMap(IOException.class, e -> new ObjectStoreIOException("Error saving blob " + blobId.asString(), e)); + } + + recordThresholdSkipIfNeeded(data.length); + return Mono.from(underlying.save(bucketName, blobId, uncompressedBlob)); Review Comment: ```suggestion recordThresholdSkipIfNeeded(data.length); return Mono.from(underlying.save(bucketName, blobId, uncompressedBlob)) .doOnSuccess(ignored -> metricRecorder.recordSkip()); ``` Use `doOnSuccess` for consistency. Get rid of this "if needed": if compression is enabled and we get in here, it means the size criterion is not met; otherwise we would have been handled in saveCompressedIfWorthWhile. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
