This is an automated email from the ASF dual-hosted git repository.

quantranhong1999 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 5792107ba0 JAMES-4182 Implement ZstdBlobStoreDAO (#3016)
5792107ba0 is described below

commit 5792107ba06bbad84d9ffea1db378b16237f5b26
Author: Trần Hồng Quân <[email protected]>
AuthorDate: Wed Apr 22 15:43:31 2026 +0700

    JAMES-4182 Implement ZstdBlobStoreDAO (#3016)
---
 pom.xml                                            |  11 +
 server/blob/blob-zstd/pom.xml                      |  96 +++++
 .../james/blob/zstd/CompressionConfiguration.java  |  75 ++++
 .../apache/james/blob/zstd/ZstdBlobStoreDAO.java   | 465 +++++++++++++++++++++
 .../blob/zstd/CompressionConfigurationTest.java    |  97 +++++
 .../james/blob/zstd/ZstdBlobStoreDAOTest.java      | 453 ++++++++++++++++++++
 .../blob-zstd/src/test/resources/zstd/document.pdf | Bin 0 -> 49672 bytes
 .../src/test/resources/zstd/james-logo.jpg         | Bin 0 -> 22168 bytes
 .../blob-zstd/src/test/resources/zstd/mail1.eml    |  69 +++
 .../blob-zstd/src/test/resources/zstd/text.txt     | 249 +++++++++++
 server/blob/pom.xml                                |   1 +
 11 files changed, 1516 insertions(+)

diff --git a/pom.xml b/pom.xml
index c354a41ac8..3fae88f68a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -679,6 +679,7 @@
         <guice.version>7.0.0</guice.version>
         <logback.version>1.5.19</logback.version>
         <tink.version>1.17.0</tink.version>
+        <zstd-jni.version>1.5.7-7</zstd-jni.version>
         <lettuce.core.version>6.7.1.RELEASE</lettuce.core.version>
         <io.micrometer.core.version>1.15.1</io.micrometer.core.version>
         <io.micrometer.tracing.version>1.5.1</io.micrometer.tracing.version>
@@ -1233,6 +1234,11 @@
                 <version>${project.version}</version>
                 <type>test-jar</type>
             </dependency>
+            <dependency>
+                <groupId>${james.groupId}</groupId>
+                <artifactId>blob-zstd</artifactId>
+                <version>${project.version}</version>
+            </dependency>
             <dependency>
                 <groupId>${james.groupId}</groupId>
                 <artifactId>dead-letter-cassandra</artifactId>
@@ -2227,6 +2233,11 @@
                 <artifactId>throwing-lambdas</artifactId>
                 <version>0.5.0</version>
             </dependency>
+            <dependency>
+                <groupId>com.github.luben</groupId>
+                <artifactId>zstd-jni</artifactId>
+                <version>${zstd-jni.version}</version>
+            </dependency>
             <dependency>
                 <groupId>com.github.spullara.mustache.java</groupId>
                 <artifactId>compiler</artifactId>
diff --git a/server/blob/blob-zstd/pom.xml b/server/blob/blob-zstd/pom.xml
new file mode 100644
index 0000000000..053921f2cb
--- /dev/null
+++ b/server/blob/blob-zstd/pom.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.james</groupId>
+        <artifactId>james-project</artifactId>
+        <version>3.10.0-SNAPSHOT</version>
+        <relativePath>../../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>blob-zstd</artifactId>
+    <name>Apache James :: Server :: Blob :: Zstd compression</name>
+
+    <!-- No versions are declared below; presumably they are supplied by the parent POM (to be confirmed). -->
+    <dependencies>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>blob-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>blob-api</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>blob-s3</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>blob-s3</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>metrics-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>metrics-tests</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>testing-base</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.github.fge</groupId>
+            <artifactId>throwing-lambdas</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.github.luben</groupId>
+            <artifactId>zstd-jni</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git 
a/server/blob/blob-zstd/src/main/java/org/apache/james/blob/zstd/CompressionConfiguration.java
 
b/server/blob/blob-zstd/src/main/java/org/apache/james/blob/zstd/CompressionConfiguration.java
new file mode 100644
index 0000000000..2d35010ec6
--- /dev/null
+++ 
b/server/blob/blob-zstd/src/main/java/org/apache/james/blob/zstd/CompressionConfiguration.java
@@ -0,0 +1,75 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.blob.zstd;
+
+/**
+ * Settings controlling whether and when blobs get zstd-compressed on save.
+ *
+ * @param enabled   whether compression on save is switched on
+ * @param threshold size bound compared against the blob size before compression is attempted; strictly positive
+ * @param minRatio  upper bound on compressedSize / originalSize, within [0, 1]
+ * @throws IllegalArgumentException when {@code threshold} or {@code minRatio} is out of range (NaN included)
+ */
+public record CompressionConfiguration(boolean enabled, long threshold, float minRatio) {
+    public static final boolean DISABLED = false;
+    public static final long DEFAULT_THRESHOLD = 16 * 1024L;
+    public static final float DEFAULT_MIN_RATIO = 1F;
+    public static final CompressionConfiguration DEFAULT = builder().build();
+
+    /**
+     * Fluent builder starting from the default values; setters validate eagerly.
+     */
+    public static class Builder {
+        private boolean enabled = DISABLED;
+        private long threshold = DEFAULT_THRESHOLD;
+        private float minRatio = DEFAULT_MIN_RATIO;
+
+        public Builder enabled(boolean enabled) {
+            this.enabled = enabled;
+            return this;
+        }
+
+        /**
+         * @throws IllegalArgumentException when {@code threshold} is not strictly positive
+         */
+        public Builder threshold(long threshold) {
+            this.threshold = validateThreshold(threshold);
+            return this;
+        }
+
+        /**
+         * @throws IllegalArgumentException when {@code minRatio} is NaN or outside [0, 1]
+         */
+        public Builder minRatio(float minRatio) {
+            this.minRatio = validateMinRatio(minRatio);
+            return this;
+        }
+
+        public CompressionConfiguration build() {
+            return new CompressionConfiguration(enabled, threshold, minRatio);
+        }
+    }
+
+    public CompressionConfiguration {
+        validateThreshold(threshold);
+        validateMinRatio(minRatio);
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Returns {@link #DEFAULT}, whose {@code enabled} flag is {@link #DISABLED}.
+     */
+    public static CompressionConfiguration disabled() {
+        return DEFAULT;
+    }
+
+    // Single source of truth for the threshold rule, shared by the builder and the canonical constructor.
+    private static long validateThreshold(long threshold) {
+        if (threshold <= 0) {
+            throw new IllegalArgumentException("'threshold' needs to be strictly positive");
+        }
+        return threshold;
+    }
+
+    // Single source of truth for the ratio rule, shared by the builder and the canonical constructor.
+    private static float validateMinRatio(float minRatio) {
+        // Written as a negated in-range test: NaN fails every comparison, so it is rejected as well.
+        if (!(minRatio >= 0 && minRatio <= 1)) {
+            throw new IllegalArgumentException("'minRatio' needs to be between 0 and 1");
+        }
+        return minRatio;
+    }
+}
diff --git 
a/server/blob/blob-zstd/src/main/java/org/apache/james/blob/zstd/ZstdBlobStoreDAO.java
 
b/server/blob/blob-zstd/src/main/java/org/apache/james/blob/zstd/ZstdBlobStoreDAO.java
new file mode 100644
index 0000000000..1425b5c3b5
--- /dev/null
+++ 
b/server/blob/blob-zstd/src/main/java/org/apache/james/blob/zstd/ZstdBlobStoreDAO.java
@@ -0,0 +1,465 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.blob.zstd;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobStoreDAO;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.blob.api.ObjectNotFoundException;
+import org.apache.james.blob.api.ObjectStoreIOException;
+import org.apache.james.metrics.api.Metric;
+import org.apache.james.metrics.api.MetricFactory;
+import org.apache.james.metrics.api.TimeMetric;
+import org.reactivestreams.Publisher;
+
+import com.github.fge.lambdas.Throwing;
+import com.github.luben.zstd.Zstd;
+import com.github.luben.zstd.ZstdInputStream;
+import com.github.luben.zstd.ZstdOutputStream;
+import com.google.common.io.ByteSource;
+import com.google.common.io.CountingOutputStream;
+import com.google.common.io.FileBackedOutputStream;
+
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+public class ZstdBlobStoreDAO implements BlobStoreDAO {
+    /**
+     * Sizes of a payload before and after compression, used to judge whether the compressed form is kept.
+     */
+    private record CompressionDecision(long originalSize, long compressedSize) {
+        /**
+         * @return {@code true} when compressedSize / originalSize does not exceed {@code minCompressionRatio};
+         *         always {@code false} when {@code minCompressionRatio} is 0
+         */
+        boolean satisfyCompressionMinRatio(float minCompressionRatio) {
+            // minCompressionRatio == 0 means decompress-only mode: never compress on save
+            return minCompressionRatio != 0
+                && ((float) compressedSize / originalSize) <= minCompressionRatio;
+        }
+    }
+
+    static final class MetricRecorder {
+        static final String BLOB_ZSTD_COMPRESS_SAVE_COUNT_METRIC_NAME = 
"blobZstdCompressSaveCount";
+        static final String BLOB_ZSTD_THRESHOLD_SKIP_COUNT_METRIC_NAME = 
"blobZstdThresholdSkipCount";
+        static final String BLOB_ZSTD_RATIO_SKIP_COUNT_METRIC_NAME = 
"blobZstdRatioSkipCount";
+        static final String BLOB_ZSTD_DECOMPRESS_COUNT_METRIC_NAME = 
"blobZstdDecompressCount";
+        static final String BLOB_ZSTD_SAVED_BYTES_METRIC_NAME = 
"blobZstdSavedBytes";
+        static final String BLOB_ZSTD_COMPRESS_LATENCY_METRIC_NAME = 
"blobZstdCompressLatency";
+        static final String BLOB_ZSTD_DECOMPRESS_LATENCY_METRIC_NAME = 
"blobZstdDecompressLatency";
+
+        private final MetricFactory metricFactory;
+        private final Metric compressSaveCount;
+        private final Metric thresholdSkipCount;
+        private final Metric ratioSkipCount;
+        private final Metric decompressCount;
+        private final Metric savedBytes;
+
+        private MetricRecorder(MetricFactory metricFactory) {
+            this.metricFactory = metricFactory;
+            this.compressSaveCount = 
metricFactory.generate(BLOB_ZSTD_COMPRESS_SAVE_COUNT_METRIC_NAME);
+            this.thresholdSkipCount = 
metricFactory.generate(BLOB_ZSTD_THRESHOLD_SKIP_COUNT_METRIC_NAME);
+            this.ratioSkipCount = 
metricFactory.generate(BLOB_ZSTD_RATIO_SKIP_COUNT_METRIC_NAME);
+            this.decompressCount = 
metricFactory.generate(BLOB_ZSTD_DECOMPRESS_COUNT_METRIC_NAME);
+            this.savedBytes = 
metricFactory.generate(BLOB_ZSTD_SAVED_BYTES_METRIC_NAME);
+        }
+
+        TimeMetric startCompressionLatencyTimer() {
+            return metricFactory.timer(BLOB_ZSTD_COMPRESS_LATENCY_METRIC_NAME);
+        }
+
+        TimeMetric startDecompressionLatencyTimer() {
+            return 
metricFactory.timer(BLOB_ZSTD_DECOMPRESS_LATENCY_METRIC_NAME);
+        }
+
+        void recordCompressedSave(long originalSize, long compressedSize) {
+            compressSaveCount.increment();
+            savedBytes.add(Math.toIntExact(originalSize - compressedSize));
+        }
+
+        void recordThresholdSkip() {
+            thresholdSkipCount.increment();
+        }
+
+        void recordRatioSkip() {
+            ratioSkipCount.increment();
+        }
+
+        void recordDecompression() {
+            decompressCount.increment();
+        }
+    }
+
+    // Metadata entry carrying the uncompressed payload size; written with compressed blobs and
+    // read back when decompressing byte payloads.
+    public static final BlobMetadataName CONTENT_ORIGINAL_SIZE = new 
BlobMetadataName("content-original-size");
+    // Threshold (100 * 1024) handed to every FileBackedOutputStream created by this class.
+    private static final int FILE_THRESHOLD = 100 * 1024;
+    // Metadata names callers are not allowed to supply on save; see validateMetadata.
+    private static final Set<BlobMetadataName> RESERVED_METADATA_NAMES = 
Set.of(ContentTransferEncoding.NAME, CONTENT_ORIGINAL_SIZE);
+
+    private final BlobStoreDAO underlying;
+    private final CompressionConfiguration compressionConfiguration;
+    private final MetricRecorder metricRecorder;
+
+    /**
+     * Wraps {@code underlying}, applying {@code compressionConfiguration} on save and publishing
+     * metrics created from {@code metricFactory}.
+     */
+    public ZstdBlobStoreDAO(BlobStoreDAO underlying, CompressionConfiguration compressionConfiguration, MetricFactory metricFactory) {
+        this.metricRecorder = new MetricRecorder(metricFactory);
+        this.compressionConfiguration = compressionConfiguration;
+        this.underlying = underlying;
+    }
+
+    /**
+     * Reads the blob from the underlying DAO and wraps its payload in a zstd decoder when the
+     * metadata marks it as zstd-encoded; other blobs are returned untouched.
+     */
+    @Override
+    public InputStreamBlob read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException {
+        InputStreamBlob stored = underlying.read(bucketName, blobId);
+        return isCompressed(stored.metadata()) ? decompress(stored) : stored;
+    }
+
+    /**
+     * Reactive counterpart of {@code read}: zstd-encoded blobs are wrapped in a decoder, others pass through.
+     */
+    @Override
+    public Publisher<InputStreamBlob> readReactive(BucketName bucketName, BlobId blobId) {
+        Mono<InputStreamBlob> stored = Mono.from(underlying.readReactive(bucketName, blobId));
+        return stored.flatMap(candidate -> isCompressed(candidate.metadata())
+            ? decompressReactive(candidate)
+            : Mono.just(candidate));
+    }
+
+    /**
+     * Reads the blob as bytes, decompressing the payload when the metadata marks it as zstd-encoded.
+     */
+    @Override
+    public Publisher<BytesBlob> readBytes(BucketName bucketName, BlobId blobId) {
+        Mono<BytesBlob> stored = Mono.from(underlying.readBytes(bucketName, blobId));
+        return stored.flatMap(candidate -> isCompressed(candidate.metadata())
+            ? decompressBytesReactive(candidate)
+            : Mono.just(candidate));
+    }
+
+    /**
+     * Dispatches to the save overload matching the concrete kind of {@code blob}.
+     */
+    @Override
+    public Publisher<Void> save(BucketName bucketName, BlobId blobId, Blob blob) {
+        return switch (blob) {
+            case BytesBlob inMemory -> save(bucketName, blobId, inMemory);
+            case InputStreamBlob streamed -> save(bucketName, blobId, streamed);
+            case ByteSourceBlob sourced -> save(bucketName, blobId, sourced);
+        };
+    }
+
+    // Deletion does not depend on compression: forwarded unchanged to the underlying DAO.
+    @Override
+    public Publisher<Void> delete(BucketName bucketName, BlobId blobId) {
+        return underlying.delete(bucketName, blobId);
+    }
+
+    // Bulk deletion is forwarded unchanged to the underlying DAO.
+    @Override
+    public Publisher<Void> delete(BucketName bucketName, Collection<BlobId> 
blobIds) {
+        return underlying.delete(bucketName, blobIds);
+    }
+
+    // Bucket deletion is forwarded unchanged to the underlying DAO.
+    @Override
+    public Publisher<Void> deleteBucket(BucketName bucketName) {
+        return underlying.deleteBucket(bucketName);
+    }
+
+    // Bucket listing is forwarded unchanged to the underlying DAO.
+    @Override
+    public Publisher<BucketName> listBuckets() {
+        return underlying.listBuckets();
+    }
+
+    // Blob listing is forwarded unchanged to the underlying DAO.
+    @Override
+    public Publisher<BlobId> listBlobs(BucketName bucketName) {
+        return underlying.listBlobs(bucketName);
+    }
+
+    /**
+     * Wraps the blob payload in a {@link ZstdInputStream}, keeping the metadata as is.
+     * The decompression counter is incremented only once the wrapper exists, matching the reactive
+     * variant, and the source stream is closed when the wrapper cannot be created so it does not leak.
+     *
+     * @throws ObjectStoreIOException when the zstd stream cannot be initialized
+     */
+    private InputStreamBlob decompress(InputStreamBlob blob) throws ObjectStoreIOException {
+        InputStream compressedPayload = blob.payload();
+        try {
+            InputStreamBlob decompressed = InputStreamBlob.of(new ZstdInputStream(compressedPayload), blob.metadata());
+            metricRecorder.recordDecompression();
+            return decompressed;
+        } catch (IOException e) {
+            // The caller never receives the stream in this case, so nobody else can close it.
+            try {
+                compressedPayload.close();
+            } catch (IOException closeFailure) {
+                e.addSuppressed(closeFailure);
+            }
+            throw new ObjectStoreIOException("Failed to initialize zstd decompression", e);
+        }
+    }
+
+    /**
+     * Lazily wraps the blob payload in a {@link ZstdInputStream}; counts a decompression when the
+     * wrapper is emitted and maps {@link IOException} to {@link ObjectStoreIOException}.
+     */
+    private Mono<InputStreamBlob> decompressReactive(InputStreamBlob blob) {
+        Mono<InputStreamBlob> wrapping = Mono.fromCallable(() -> {
+            ZstdInputStream decoder = new ZstdInputStream(blob.payload());
+            return InputStreamBlob.of(decoder, blob.metadata());
+        });
+        return wrapping
+            .doOnNext(ignored -> metricRecorder.recordDecompression())
+            .onErrorMap(IOException.class, e -> new ObjectStoreIOException("Failed to initialize zstd decompression", e));
+    }
+
+    /**
+     * Decompresses an in-memory blob on the parallel scheduler; counts a decompression on success
+     * and maps {@link IOException} to {@link ObjectStoreIOException}.
+     */
+    private Mono<BytesBlob> decompressBytesReactive(BytesBlob blob) {
+        Mono<BytesBlob> inflating = Mono.fromCallable(() -> decompress(blob));
+        return inflating
+            .subscribeOn(Schedulers.parallel())
+            .doOnNext(ignored -> metricRecorder.recordDecompression())
+            .onErrorMap(IOException.class, e -> new ObjectStoreIOException("Failed to decompress blob", e));
+    }
+
+    /**
+     * Decompresses the payload into a buffer sized from the original-size metadata, timing the call.
+     *
+     * @throws IOException when the original-size metadata is missing or invalid
+     */
+    private BytesBlob decompress(BytesBlob blob) throws IOException {
+        TimeMetric latency = metricRecorder.startDecompressionLatencyTimer();
+        try {
+            byte[] compressed = blob.payload();
+            int expectedSize = originalSize(blob.metadata());
+            byte[] inflated = Zstd.decompress(compressed, expectedSize);
+            return BytesBlob.of(inflated, blob.metadata());
+        } finally {
+            // Published on failure as well.
+            latency.stopAndPublish();
+        }
+    }
+
+    /**
+     * Reads the uncompressed size recorded in the blob metadata.
+     *
+     * @return the size as a non-negative int
+     * @throws IOException when the entry is missing, is not a number, is negative, or does not fit an int
+     */
+    private int originalSize(BlobMetadata metadata) throws IOException {
+        BlobMetadataValue sizeMetadata = metadata.get(CONTENT_ORIGINAL_SIZE)
+            .orElseThrow(() -> new IOException("Missing " + CONTENT_ORIGINAL_SIZE.name() + " metadata for compressed blob"));
+
+        long parsedSize;
+        try {
+            parsedSize = Long.parseLong(sizeMetadata.value());
+        } catch (NumberFormatException e) {
+            throw new IOException("Invalid " + CONTENT_ORIGINAL_SIZE.name() + " metadata value: " + sizeMetadata.value(), e);
+        }
+
+        // The value is used as a buffer size: a negative number would otherwise escape as an
+        // unchecked exception instead of the IOException callers map to ObjectStoreIOException.
+        if (parsedSize < 0 || parsedSize > Integer.MAX_VALUE) {
+            throw new IOException("Invalid " + CONTENT_ORIGINAL_SIZE.name() + " metadata value: " + sizeMetadata.value());
+        }
+        return (int) parsedSize;
+    }
+
+    // Saves an in-memory blob. Metadata is validated synchronously, before any Mono is built.
+    // Payloads that qualify for compression are compressed on the parallel scheduler and then
+    // handed to saveCompressedIfWorthKeeping, which picks the compressed or the original form.
+    private Publisher<Void> save(BucketName bucketName, BlobId blobId, 
BytesBlob bytesBlob) {
+        validateMetadata(bytesBlob.metadata());
+
+        if (shouldAttemptCompression(bytesBlob.payload().length)) {
+            return Mono.fromCallable(() -> compress(bytesBlob.payload()))
+                .subscribeOn(Schedulers.parallel())
+                .flatMap(compressed -> 
saveCompressedIfWorthKeeping(bucketName, blobId,
+                    BytesBlob.of(compressed, 
withCompressionMetadata(bytesBlob.metadata(), bytesBlob.payload().length)),
+                    bytesBlob))
+                .onErrorMap(IOException.class, e -> new 
ObjectStoreIOException("Error saving blob " + blobId.asString(), e));
+        }
+
+        // No compression attempt: store as is. A threshold skip is counted only when compression
+        // is enabled and the payload was smaller than the threshold.
+        return Mono.from(underlying.save(bucketName, blobId, bytesBlob))
+            .doOnSuccess(ignored -> {
+                if (compressionEnabled() && bytesBlob.payload().length < 
compressionConfiguration.threshold()) {
+                    metricRecorder.recordThresholdSkip();
+                }
+            });
+    }
+
+    /**
+     * Stores {@code compressedBlob} when its size satisfies the configured ratio, otherwise stores
+     * {@code uncompressedBlob}; the matching metric is recorded once the save succeeds.
+     */
+    private Mono<Void> saveCompressedIfWorthKeeping(BucketName bucketName, BlobId blobId, BytesBlob compressedBlob,
+                                                    BytesBlob uncompressedBlob) {
+        int originalLength = uncompressedBlob.payload().length;
+        int compressedLength = compressedBlob.payload().length;
+        boolean worthKeeping = compressionDecision(originalLength, compressedLength)
+            .satisfyCompressionMinRatio(compressionConfiguration.minRatio());
+
+        if (!worthKeeping) {
+            return Mono.from(underlying.save(bucketName, blobId, uncompressedBlob))
+                .doOnSuccess(ignored -> metricRecorder.recordRatioSkip());
+        }
+        return Mono.from(underlying.save(bucketName, blobId, compressedBlob))
+            .doOnSuccess(ignored -> metricRecorder.recordCompressedSave(originalLength, compressedLength));
+    }
+
+    // Saves a streamed blob. Metadata is validated synchronously. When compression is not
+    // enabled the blob is forwarded untouched. Otherwise the payload is first copied into a
+    // FileBackedOutputStream, which yields its size (the value returned by transferTo):
+    //  - size >= threshold: compressAndSave decides between compressed and original content;
+    //  - size <  threshold: the buffered copy is stored as is and a threshold skip is counted.
+    private Publisher<Void> save(BucketName bucketName, BlobId blobId, 
InputStreamBlob inputStreamBlob) {
+        validateMetadata(inputStreamBlob.metadata());
+
+        if (!compressionEnabled()) {
+            return Mono.from(underlying.save(bucketName, blobId, 
inputStreamBlob));
+        }
+
+        return Mono.usingWhen(Mono.fromCallable(() -> new 
FileBackedOutputStream(FILE_THRESHOLD)),
+                originalContent -> Mono.fromCallable(() -> 
inputStreamBlob.payload().transferTo(originalContent))
+                    .flatMap(originalSize -> {
+                        if (originalSize >= 
compressionConfiguration.threshold()) {
+                            return compressAndSave(bucketName, blobId, 
originalContent, originalSize, inputStreamBlob.metadata());
+                        }
+
+                        return Mono.from(underlying.save(bucketName, blobId, 
byteSourceBlobWithSize(originalContent.asByteSource(), originalSize, 
inputStreamBlob.metadata())))
+                            .doOnSuccess(ignored -> 
metricRecorder.recordThresholdSkip());
+                    }),
+                // usingWhen cleanup: reset the buffer on the bounded-elastic scheduler.
+                originalContent -> 
Mono.fromRunnable(Throwing.runnable(originalContent::reset)).subscribeOn(Schedulers.boundedElastic()))
+            .subscribeOn(Schedulers.boundedElastic())
+            .onErrorMap(IOException.class, e -> new 
ObjectStoreIOException("Error saving blob " + blobId.asString(), e));
+    }
+
+    // Saves a ByteSource-backed blob. Metadata is validated synchronously. When compression is
+    // not enabled the blob is forwarded untouched. Otherwise the size is resolved first:
+    //  - size <  threshold: the source is stored as is (with its size pinned) and a threshold
+    //    skip is counted;
+    //  - size >= threshold: compressAndSave decides between compressed and original content.
+    private Publisher<Void> save(BucketName bucketName, BlobId blobId, 
ByteSourceBlob byteSourceBlob) {
+        validateMetadata(byteSourceBlob.metadata());
+
+        if (!compressionEnabled()) {
+            return Mono.from(underlying.save(bucketName, blobId, 
byteSourceBlob));
+        }
+
+        return Mono.fromCallable(() -> resolveSize(byteSourceBlob.payload()))
+            .flatMap(originalSize -> {
+                if (originalSize < compressionConfiguration.threshold()) {
+                    return Mono.from(underlying.save(bucketName, blobId, 
byteSourceBlobWithSize(byteSourceBlob.payload(), originalSize, 
byteSourceBlob.metadata())))
+                        .doOnSuccess(ignored -> 
metricRecorder.recordThresholdSkip());
+                }
+
+                return compressAndSave(bucketName, blobId, byteSourceBlob, 
originalSize);
+            })
+            .subscribeOn(Schedulers.boundedElastic())
+            .onErrorMap(IOException.class, e -> new 
ObjectStoreIOException("Error saving blob " + blobId.asString(), e));
+    }
+
+    // Variant for content already buffered in a FileBackedOutputStream: supplies the generic
+    // compressAndSave with a compression step reading from that buffer and a fallback that
+    // stores the buffered original and counts a ratio skip.
+    private Mono<Void> compressAndSave(BucketName bucketName, BlobId blobId, 
FileBackedOutputStream originalContent,
+                                       long originalSize, BlobMetadata 
metadata) {
+        return compressAndSave(bucketName, blobId, originalSize, metadata,
+            compressContent -> prepareCompressedContent(originalContent, 
originalSize, compressContent),
+            () -> Mono.from(underlying.save(bucketName, blobId, 
byteSourceBlobWithSize(originalContent.asByteSource(), originalSize, metadata)))
+                .doOnSuccess(ignored -> metricRecorder.recordRatioSkip()));
+    }
+
+    // Variant for ByteSource-backed blobs: supplies the generic compressAndSave with a
+    // compression step reading from the blob's ByteSource and a fallback that stores the
+    // original source (with its size pinned) and counts a ratio skip.
+    private Mono<Void> compressAndSave(BucketName bucketName, BlobId blobId, 
ByteSourceBlob byteSourceBlob, long originalSize) {
+        return compressAndSave(bucketName, blobId, originalSize, 
byteSourceBlob.metadata(),
+            compressContent -> 
prepareCompressedContent(byteSourceBlob.payload(), originalSize, 
compressContent),
+            () -> Mono.from(underlying.save(bucketName, blobId, 
byteSourceBlobWithSize(byteSourceBlob.payload(), originalSize, 
byteSourceBlob.metadata())))
+                .doOnSuccess(ignored -> metricRecorder.recordRatioSkip()));
+    }
+
+    // Generic compress-then-save step. A fresh FileBackedOutputStream receives the compressed
+    // bytes via prepareCompressedContent; the resulting CompressionDecision is passed to
+    // saveCompressedIfWorthKeeping, which stores either the compressed content or, through
+    // saveOriginal, the original one. The compressed buffer is reset by the usingWhen cleanup.
+    private Mono<Void> compressAndSave(BucketName bucketName, BlobId blobId, 
long originalSize, BlobMetadata metadata,
+                                       Function<FileBackedOutputStream, 
Mono<CompressionDecision>> prepareCompressedContent,
+                                       Supplier<Mono<Void>> saveOriginal) {
+        return Mono.usingWhen(Mono.fromCallable(() -> new 
FileBackedOutputStream(FILE_THRESHOLD)),
+                compressContent -> 
prepareCompressedContent.apply(compressContent)
+                    .flatMap(compressionDecision -> 
saveCompressedIfWorthKeeping(bucketName, blobId, originalSize, metadata,
+                        compressContent, compressionDecision, saveOriginal)),
+                compressContent -> 
Mono.fromRunnable(Throwing.runnable(compressContent::reset)).subscribeOn(Schedulers.boundedElastic()))
+            .subscribeOn(Schedulers.boundedElastic());
+    }
+
+    /**
+     * Stores the compressed buffer (tagged with compression metadata) when the decision satisfies
+     * the configured ratio and records a compressed save; otherwise defers to {@code saveOriginal}.
+     */
+    private Mono<Void> saveCompressedIfWorthKeeping(BucketName bucketName, BlobId blobId, long originalSize,
+                                                    BlobMetadata metadata, FileBackedOutputStream compressedContent,
+                                                    CompressionDecision compressionDecision, Supplier<Mono<Void>> saveOriginal) {
+        boolean worthKeeping = compressionDecision.satisfyCompressionMinRatio(compressionConfiguration.minRatio());
+        if (!worthKeeping) {
+            return saveOriginal.get();
+        }
+
+        ByteSourceBlob compressedBlob = byteSourceBlobWithSize(compressedContent.asByteSource(),
+            compressionDecision.compressedSize(), withCompressionMetadata(metadata, originalSize));
+        return Mono.from(underlying.save(bucketName, blobId, compressedBlob))
+            .doOnSuccess(ignored -> metricRecorder.recordCompressedSave(originalSize, compressionDecision.compressedSize()));
+    }
+
+    /**
+     * Lazily compresses the buffered original content into {@code compressContent} and reports
+     * both sizes as a {@link CompressionDecision}.
+     */
+    private Mono<CompressionDecision> prepareCompressedContent(FileBackedOutputStream originalContent, long originalSize,
+                                                               FileBackedOutputStream compressContent) {
+        return Mono.fromCallable(() -> {
+            ByteSource buffered = originalContent.asByteSource();
+            try (InputStream originalStream = buffered.openStream()) {
+                return compressionDecision(originalSize, compress(originalStream, compressContent));
+            }
+        });
+    }
+
+    /**
+     * Lazily compresses {@code byteSource} into {@code compressContent} and reports both sizes
+     * as a {@link CompressionDecision}.
+     */
+    private Mono<CompressionDecision> prepareCompressedContent(ByteSource byteSource, long originalSize,
+                                                               FileBackedOutputStream compressContent) {
+        return Mono.fromCallable(() ->
+            compressionDecision(originalSize, compress(byteSource, compressContent)));
+    }
+
+    /**
+     * Compresses a byte array with zstd, timing the call under the compression latency metric.
+     */
+    private byte[] compress(byte[] data) {
+        TimeMetric latency = metricRecorder.startCompressionLatencyTimer();
+        try {
+            byte[] compressed = Zstd.compress(data);
+            return compressed;
+        } finally {
+            // Published on failure as well.
+            latency.stopAndPublish();
+        }
+    }
+
+    private long compress(InputStream inputStream, FileBackedOutputStream 
compressedContent) throws IOException {
+        TimeMetric timeMetric = metricRecorder.startCompressionLatencyTimer();
+
+        try (CountingOutputStream countingOutputStream = new 
CountingOutputStream(compressedContent);
+             ZstdOutputStream zstdOutputStream = new 
ZstdOutputStream(countingOutputStream)) {
+            inputStream.transferTo(zstdOutputStream);
+            zstdOutputStream.close();
+            return countingOutputStream.getCount();
+        } finally {
+            timeMetric.stopAndPublish();
+        }
+    }
+
+    private long compress(ByteSource byteSource, FileBackedOutputStream 
compressedContent) throws IOException {
+        TimeMetric timeMetric = metricRecorder.startCompressionLatencyTimer();
+
+        try (CountingOutputStream countingOutputStream = new 
CountingOutputStream(compressedContent);
+             ZstdOutputStream zstdOutputStream = new 
ZstdOutputStream(countingOutputStream)) {
+            byteSource.copyTo(zstdOutputStream);
+            zstdOutputStream.close();
+            return countingOutputStream.getCount();
+        } finally {
+            timeMetric.stopAndPublish();
+        }
+    }
+
+    /**
+     * @return {@code true} when compression is enabled and {@code originalSize} reaches the configured threshold
+     */
+    private boolean shouldAttemptCompression(long originalSize) {
+        if (!compressionEnabled()) {
+            return false;
+        }
+        return originalSize >= compressionConfiguration.threshold();
+    }
+
+    /**
+     * @return {@code true} when the configuration is enabled and its minimum ratio is strictly positive
+     */
+    private boolean compressionEnabled() {
+        if (!compressionConfiguration.enabled()) {
+            return false;
+        }
+        // minRatio == 0 means decompress-only mode: never compress on save
+        return compressionConfiguration.minRatio() > 0;
+    }
+
+    // Factory shorthand pairing the original and compressed sizes.
+    private CompressionDecision compressionDecision(long originalSize, long 
compressedSize) {
+        return new CompressionDecision(originalSize, compressedSize);
+    }
+
+    /**
+     * @return {@code true} when the metadata declares the ZSTD content transfer encoding
+     */
+    private boolean isCompressed(BlobMetadata metadata) {
+        return metadata.contentTransferEncoding()
+            .map(ContentTransferEncoding.ZSTD::equals)
+            .orElse(false);
+    }
+
+    /**
+     * Returns the size of {@code byteSource}, using the advertised size when one is known and
+     * falling back to {@link ByteSource#size()} otherwise.
+     */
+    private long resolveSize(ByteSource byteSource) throws IOException {
+        boolean sizeKnown = byteSource.sizeIfKnown().isPresent();
+        return sizeKnown
+            ? byteSource.sizeIfKnown().get()
+            : byteSource.size();
+    }
+
+    /**
+     * Rejects metadata that uses any of the names reserved by this DAO.
+     *
+     * @throws IllegalArgumentException listing the reserved names found
+     */
+    private void validateMetadata(BlobMetadata metadata) {
+        Set<BlobMetadataName> offending = metadata.underlyingMap().keySet().stream()
+            .filter(RESERVED_METADATA_NAMES::contains)
+            .collect(Collectors.toSet());
+
+        if (offending.isEmpty()) {
+            return;
+        }
+        throw new IllegalArgumentException("Reserved zstd metadata are not allowed: " + offending);
+    }
+
+    /**
+     * Returns the metadata tagged with the ZSTD content transfer encoding and the original size entry.
+     */
+    private BlobMetadata withCompressionMetadata(BlobMetadata metadata, long originalSize) {
+        var zstdEncoded = metadata.withContentTransferEncoding(ContentTransferEncoding.ZSTD);
+        BlobMetadataValue sizeValue = new BlobMetadataValue(String.valueOf(originalSize));
+        return zstdEncoded.withMetadata(CONTENT_ORIGINAL_SIZE, sizeValue);
+    }
+
+    /**
+     * Builds a blob over a view of {@code byteSource} that reports {@code size} as its size,
+     * both for {@code size()} and {@code sizeIfKnown()}, while streams still come from {@code byteSource}.
+     */
+    private ByteSourceBlob byteSourceBlobWithSize(ByteSource byteSource, long size, BlobMetadata metadata) {
+        ByteSource sizedView = new ByteSource() {
+            @Override
+            public long size() {
+                return size;
+            }
+
+            @Override
+            public com.google.common.base.Optional<Long> sizeIfKnown() {
+                return com.google.common.base.Optional.of(size);
+            }
+
+            @Override
+            public InputStream openStream() throws IOException {
+                return byteSource.openStream();
+            }
+        };
+        return ByteSourceBlob.of(sizedView, metadata);
+    }
+
+}
diff --git 
a/server/blob/blob-zstd/src/test/java/org/apache/james/blob/zstd/CompressionConfigurationTest.java
 
b/server/blob/blob-zstd/src/test/java/org/apache/james/blob/zstd/CompressionConfigurationTest.java
new file mode 100644
index 0000000000..18a8f5cfb0
--- /dev/null
+++ 
b/server/blob/blob-zstd/src/test/java/org/apache/james/blob/zstd/CompressionConfigurationTest.java
@@ -0,0 +1,97 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.blob.zstd;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.SoftAssertions.assertSoftly;
+
+import org.junit.jupiter.api.Test;
+
+/**
+ * Checks the values exposed by a configuration built through {@code CompressionConfiguration.builder()}
+ * and the argument validation performed by the builder setters.
+ */
+class CompressionConfigurationTest {
+    private static final String INVALID_THRESHOLD_MESSAGE = "'threshold' needs to be strictly positive";
+    private static final String INVALID_MIN_RATIO_MESSAGE = "'minRatio' needs to be between 0 and 1";
+
+    @Test
+    void builderShouldReturnDefaultConfigurationByDefault() {
+        CompressionConfiguration defaults = CompressionConfiguration.builder().build();
+
+        assertSoftly(softly -> {
+            softly.assertThat(defaults.enabled()).isFalse();
+            // 16 * 1024
+            softly.assertThat(defaults.threshold()).isEqualTo(16_384L);
+            softly.assertThat(defaults.minRatio()).isEqualTo(1F);
+        });
+    }
+
+    @Test
+    void builderShouldSupportEnabledConfiguration() {
+        CompressionConfiguration customized = CompressionConfiguration.builder()
+            .enabled(true)
+            .threshold(42)
+            .minRatio(0.8F)
+            .build();
+
+        assertSoftly(softly -> {
+            softly.assertThat(customized.enabled()).isTrue();
+            softly.assertThat(customized.threshold()).isEqualTo(42L);
+            softly.assertThat(customized.minRatio()).isEqualTo(0.8F);
+        });
+    }
+
+    @Test
+    void builderShouldSupportZeroMinRatio() {
+        assertThat(CompressionConfiguration.builder().minRatio(0F).build().minRatio())
+            .isZero();
+    }
+
+    // The lambdas below stop at the setter: no build() call is needed to trigger the failure.
+
+    @Test
+    void builderShouldThrowWhenThresholdIsZero() {
+        assertThatThrownBy(() -> CompressionConfiguration.builder().threshold(0))
+            .isInstanceOf(IllegalArgumentException.class)
+            .hasMessage(INVALID_THRESHOLD_MESSAGE);
+    }
+
+    @Test
+    void builderShouldThrowWhenThresholdIsNegative() {
+        assertThatThrownBy(() -> CompressionConfiguration.builder().threshold(-1))
+            .isInstanceOf(IllegalArgumentException.class)
+            .hasMessage(INVALID_THRESHOLD_MESSAGE);
+    }
+
+    @Test
+    void builderShouldThrowWhenMinRatioIsNegative() {
+        assertThatThrownBy(() -> CompressionConfiguration.builder().minRatio(-0.1F))
+            .isInstanceOf(IllegalArgumentException.class)
+            .hasMessage(INVALID_MIN_RATIO_MESSAGE);
+    }
+
+    @Test
+    void builderShouldThrowWhenMinRatioIsAboveOne() {
+        assertThatThrownBy(() -> CompressionConfiguration.builder().minRatio(1.1F))
+            .isInstanceOf(IllegalArgumentException.class)
+            .hasMessage(INVALID_MIN_RATIO_MESSAGE);
+    }
+
+}
diff --git 
a/server/blob/blob-zstd/src/test/java/org/apache/james/blob/zstd/ZstdBlobStoreDAOTest.java
 
b/server/blob/blob-zstd/src/test/java/org/apache/james/blob/zstd/ZstdBlobStoreDAOTest.java
new file mode 100644
index 0000000000..67fd2e20f7
--- /dev/null
+++ 
b/server/blob/blob-zstd/src/test/java/org/apache/james/blob/zstd/ZstdBlobStoreDAOTest.java
@@ -0,0 +1,453 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.blob.zstd;
+
+import static org.apache.james.blob.api.BlobStoreDAOFixture.ELEVEN_KILOBYTES;
+import static org.apache.james.blob.api.BlobStoreDAOFixture.SHORT_BYTEARRAY;
+import static org.apache.james.blob.api.BlobStoreDAOFixture.TEST_BLOB_ID;
+import static org.apache.james.blob.api.BlobStoreDAOFixture.TEST_BUCKET_NAME;
+import static 
org.apache.james.blob.objectstorage.aws.JamesS3MetricPublisher.DEFAULT_S3_METRICS_PREFIX;
+import static 
org.apache.james.blob.objectstorage.aws.S3BlobStoreConfiguration.UPLOAD_RETRY_EXCEPTION_PREDICATE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.SoftAssertions.assertSoftly;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.Random;
+import java.util.stream.Stream;
+
+import org.apache.james.blob.api.BlobStoreDAO;
+import org.apache.james.blob.api.BlobStoreDAOContract;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.blob.api.MetadataAwareBlobStoreDAOContract;
+import org.apache.james.blob.api.TestBlobId;
+import org.apache.james.blob.objectstorage.aws.AwsS3AuthConfiguration;
+import org.apache.james.blob.objectstorage.aws.DockerAwsS3Container;
+import org.apache.james.blob.objectstorage.aws.DockerAwsS3Extension;
+import org.apache.james.blob.objectstorage.aws.JamesS3MetricPublisher;
+import org.apache.james.blob.objectstorage.aws.S3BlobStoreConfiguration;
+import org.apache.james.blob.objectstorage.aws.S3BlobStoreDAO;
+import org.apache.james.blob.objectstorage.aws.S3ClientFactory;
+import org.apache.james.blob.objectstorage.aws.S3RequestOption;
+import org.apache.james.metrics.api.NoopGaugeRegistry;
+import org.apache.james.metrics.tests.RecordingMetricFactory;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import com.google.common.io.ByteSource;
+
+import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
+
+/**
+ * Exercises {@link ZstdBlobStoreDAO} wrapped around an {@link S3BlobStoreDAO} that is built
+ * against the injected {@link DockerAwsS3Container}, in addition to the shared DAO contracts.
+ */
+@ExtendWith(DockerAwsS3Extension.class)
+class ZstdBlobStoreDAOTest implements BlobStoreDAOContract, MetadataAwareBlobStoreDAOContract {
+    private static final CompressionConfiguration DEFAULT_COMPRESSION_CONFIGURATION = CompressionConfiguration.builder()
+        .enabled(true)
+        .build();
+    private static final BucketName FALLBACK_BUCKET = BucketName.of("fallback");
+
+    private static S3BlobStoreDAO underlying;
+    private static S3ClientFactory s3ClientFactory;
+
+    private RecordingMetricFactory metricFactory;
+    private ZstdBlobStoreDAO testee;
+
+    @BeforeAll
+    static void setUp(DockerAwsS3Container dockerAwsS3) {
+        AwsS3AuthConfiguration authConfiguration = AwsS3AuthConfiguration.builder()
+            .endpoint(dockerAwsS3.getEndpoint())
+            .accessKeyId(DockerAwsS3Container.ACCESS_KEY_ID)
+            .secretKey(DockerAwsS3Container.SECRET_ACCESS_KEY)
+            .build();
+
+        S3BlobStoreConfiguration s3Configuration = S3BlobStoreConfiguration.builder()
+            .authConfiguration(authConfiguration)
+            .region(dockerAwsS3.dockerAwsS3().region())
+            .uploadRetrySpec(Optional.of(Retry.backoff(3, Duration.ofSeconds(1))
+                .filter(UPLOAD_RETRY_EXCEPTION_PREDICATE)))
+            .defaultBucketName(BucketName.DEFAULT)
+            .fallbackBucketName(Optional.of(FALLBACK_BUCKET))
+            .build();
+
+        s3ClientFactory = new S3ClientFactory(s3Configuration, () -> new JamesS3MetricPublisher(new RecordingMetricFactory(),
+            new NoopGaugeRegistry(), DEFAULT_S3_METRICS_PREFIX));
+        underlying = new S3BlobStoreDAO(s3ClientFactory, s3Configuration, new TestBlobId.Factory(), S3RequestOption.DEFAULT);
+    }
+
+    @AfterAll
+    static void tearDownClass() {
+        // Null when the class-level set up failed before the factory was created.
+        if (s3ClientFactory != null) {
+            s3ClientFactory.close();
+        }
+    }
+
+    @BeforeEach
+    void setUp() {
+        // A fresh recorder per test keeps metric counts independent between tests.
+        metricFactory = new RecordingMetricFactory();
+        testee = new ZstdBlobStoreDAO(underlying, DEFAULT_COMPRESSION_CONFIGURATION, metricFactory);
+    }
+
+    @AfterEach
+    void tearDown() {
+        if (underlying != null) {
+            underlying.deleteAllBuckets().block();
+        }
+    }
+
+    @Override
+    public BlobStoreDAO testee() {
+        return testee;
+    }
+
+    /**
+     * Builds a DAO over {@code underlying} with compression enabled and a threshold of 1,
+     * leaving the minimum ratio at the builder default.
+     */
+    private ZstdBlobStoreDAO newCompressingTestee() {
+        return new ZstdBlobStoreDAO(underlying,
+            CompressionConfiguration.builder()
+                .enabled(true)
+                .threshold(1)
+                .build(),
+            metricFactory);
+    }
+
+    /**
+     * Same as {@link #newCompressingTestee()} but with an explicit minimum ratio.
+     *
+     * @param minRatio value handed to the builder's {@code minRatio} setter
+     */
+    private ZstdBlobStoreDAO newCompressingTestee(float minRatio) {
+        return new ZstdBlobStoreDAO(underlying,
+            CompressionConfiguration.builder()
+                .enabled(true)
+                .threshold(1)
+                .minRatio(minRatio)
+                .build(),
+            metricFactory);
+    }
+
+    /**
+     * Reads the whole payload of a streamed blob and closes the stream afterwards, so the
+     * stream is released even when a later assertion fails.
+     *
+     * @param blob blob whose payload stream is consumed
+     * @return every byte produced by the payload stream
+     * @throws IOException when reading or closing the stream fails
+     */
+    private static byte[] drain(BlobStoreDAO.InputStreamBlob blob) throws IOException {
+        try (InputStream payload = blob.payload()) {
+            return payload.readAllBytes();
+        }
+    }
+
+    @Override
+    @Test
+    public void retrieveContentTransferEncodingShouldSucceed() {
+        ZstdBlobStoreDAO localTestee = newCompressingTestee();
+
+        // should compress and append content-transfer-encoding metadata, when threshold is met.
+        Mono.from(localTestee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES)).block();
+
+        assertThat(Mono.from(localTestee.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block().metadata().contentTransferEncoding())
+            .contains(BlobStoreDAO.ContentTransferEncoding.ZSTD);
+    }
+
+    @Test
+    void shouldPreserveCallerMetadataWhenCompressionHappens() {
+        ZstdBlobStoreDAO localTestee = newCompressingTestee();
+        BlobStoreDAO.BlobMetadata metadata = BlobStoreDAO.BlobMetadata.empty()
+            .withMetadata(new BlobStoreDAO.BlobMetadataName("name"), new BlobStoreDAO.BlobMetadataValue("value"))
+            .withMetadata(new BlobStoreDAO.BlobMetadataName("type"), new BlobStoreDAO.BlobMetadataValue("attachment"));
+        BlobStoreDAO.BytesBlob blob = BlobStoreDAO.BytesBlob.of(ELEVEN_KILOBYTES.payload(), metadata);
+
+        Mono.from(localTestee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, blob)).block();
+
+        BlobStoreDAO.BytesBlob readBlob = Mono.from(localTestee.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
+
+        assertSoftly(softly -> {
+            softly.assertThat(readBlob.metadata().underlyingMap())
+                .containsEntry(new BlobStoreDAO.BlobMetadataName("name"), new BlobStoreDAO.BlobMetadataValue("value"))
+                .containsEntry(new BlobStoreDAO.BlobMetadataName("type"), new BlobStoreDAO.BlobMetadataValue("attachment"))
+                .containsEntry(BlobStoreDAO.ContentTransferEncoding.NAME, BlobStoreDAO.ContentTransferEncoding.ZSTD.asValue())
+                .containsEntry(ZstdBlobStoreDAO.CONTENT_ORIGINAL_SIZE,
+                    new BlobStoreDAO.BlobMetadataValue(String.valueOf(ELEVEN_KILOBYTES.payload().length)));
+        });
+    }
+
+    @Test
+    void readExistingNonCompressedBlobShouldSucceed() {
+        // Saved through the wrapped DAO directly, bypassing the zstd layer.
+        Mono.from(underlying.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES)).block();
+
+        assertThat(Mono.from(testee.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block())
+            .isEqualTo(ELEVEN_KILOBYTES);
+    }
+
+    @Test
+    void shouldNotCompressBlobIfThresholdIsNotMet() {
+        ZstdBlobStoreDAO localTestee = new ZstdBlobStoreDAO(underlying,
+            CompressionConfiguration.builder()
+                .enabled(true)
+                .threshold(16 * 1024)
+                .build(),
+            metricFactory);
+
+        Mono.from(localTestee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block();
+
+        // Read through the wrapped DAO to observe what was physically stored.
+        BlobStoreDAO.BytesBlob storedBlob = Mono.from(underlying.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
+
+        assertSoftly(softly -> {
+            softly.assertThat(storedBlob.payload()).isEqualTo(SHORT_BYTEARRAY.payload());
+            softly.assertThat(storedBlob.metadata().contentTransferEncoding()).isEmpty();
+            softly.assertThat(storedBlob.metadata().get(ZstdBlobStoreDAO.CONTENT_ORIGINAL_SIZE)).isEmpty();
+        });
+    }
+
+    @Test
+    void readShouldDecompressCompressedBlob() throws IOException {
+        ZstdBlobStoreDAO localTestee = newCompressingTestee();
+
+        Mono.from(localTestee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES)).block();
+
+        BlobStoreDAO.InputStreamBlob readBlob = localTestee.read(TEST_BUCKET_NAME, TEST_BLOB_ID);
+        byte[] payload = drain(readBlob);
+
+        assertSoftly(softly -> {
+            softly.assertThat(payload).isEqualTo(ELEVEN_KILOBYTES.payload());
+            softly.assertThat(readBlob.metadata().contentTransferEncoding()).contains(BlobStoreDAO.ContentTransferEncoding.ZSTD);
+            softly.assertThat(readBlob.metadata().get(ZstdBlobStoreDAO.CONTENT_ORIGINAL_SIZE))
+                .contains(new BlobStoreDAO.BlobMetadataValue(String.valueOf(ELEVEN_KILOBYTES.payload().length)));
+        });
+    }
+
+    @Test
+    void readReactiveShouldDecompressCompressedBlob() throws IOException {
+        ZstdBlobStoreDAO localTestee = newCompressingTestee();
+
+        Mono.from(localTestee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES)).block();
+
+        BlobStoreDAO.InputStreamBlob readBlob = Mono.from(localTestee.readReactive(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
+        byte[] payload = drain(readBlob);
+
+        assertSoftly(softly -> {
+            softly.assertThat(payload).isEqualTo(ELEVEN_KILOBYTES.payload());
+            softly.assertThat(readBlob.metadata().contentTransferEncoding()).contains(BlobStoreDAO.ContentTransferEncoding.ZSTD);
+            softly.assertThat(readBlob.metadata().get(ZstdBlobStoreDAO.CONTENT_ORIGINAL_SIZE))
+                .contains(new BlobStoreDAO.BlobMetadataValue(String.valueOf(ELEVEN_KILOBYTES.payload().length)));
+        });
+    }
+
+    @Test
+    void shouldNotCompressBlobWhenMinRatioIsZero() {
+        ZstdBlobStoreDAO localTestee = newCompressingTestee(0F);
+
+        Mono.from(localTestee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES)).block();
+
+        BlobStoreDAO.BytesBlob storedBlob = Mono.from(underlying.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
+
+        assertSoftly(softly -> {
+            softly.assertThat(storedBlob.payload()).isEqualTo(ELEVEN_KILOBYTES.payload());
+            softly.assertThat(storedBlob.metadata().contentTransferEncoding()).isEmpty();
+            softly.assertThat(storedBlob.metadata().get(ZstdBlobStoreDAO.CONTENT_ORIGINAL_SIZE)).isEmpty();
+        });
+    }
+
+    @Test
+    void shouldStillDecompressBlobWhenMinRatioIsZero() {
+        ZstdBlobStoreDAO compressingTestee = newCompressingTestee();
+        ZstdBlobStoreDAO uncompressingOnlyTestee = newCompressingTestee(0F);
+
+        Mono.from(compressingTestee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES)).block();
+
+        BlobStoreDAO.BytesBlob readBlob = Mono.from(uncompressingOnlyTestee.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
+
+        assertSoftly(softly -> {
+            softly.assertThat(readBlob.payload()).isEqualTo(ELEVEN_KILOBYTES.payload());
+            softly.assertThat(readBlob.metadata().contentTransferEncoding()).contains(BlobStoreDAO.ContentTransferEncoding.ZSTD);
+            softly.assertThat(readBlob.metadata().get(ZstdBlobStoreDAO.CONTENT_ORIGINAL_SIZE))
+                .contains(new BlobStoreDAO.BlobMetadataValue(String.valueOf(ELEVEN_KILOBYTES.payload().length)));
+        });
+    }
+
+    @Test
+    void shouldNotCompressBlobWhenMinRatioIsNotMet() {
+        ZstdBlobStoreDAO localTestee = newCompressingTestee(0.5F);
+        // Fixed seed: the pseudo-random payload is identical on every run.
+        byte[] randomPayload = new byte[4096];
+        new Random(1).nextBytes(randomPayload);
+        BlobStoreDAO.BytesBlob randomBlob = BlobStoreDAO.BytesBlob.of(randomPayload);
+
+        Mono.from(localTestee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, randomBlob)).block();
+
+        BlobStoreDAO.BytesBlob storedBlob = Mono.from(underlying.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
+
+        assertSoftly(softly -> {
+            softly.assertThat(storedBlob.payload()).isEqualTo(randomPayload);
+            softly.assertThat(storedBlob.metadata().contentTransferEncoding()).isEmpty();
+            softly.assertThat(storedBlob.metadata().get(ZstdBlobStoreDAO.CONTENT_ORIGINAL_SIZE)).isEmpty();
+        });
+    }
+
+    @Test
+    void shouldRecordMetrics() {
+        ZstdBlobStoreDAO localTestee = newCompressingTestee();
+
+        Mono.from(localTestee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES)).block();
+        Mono.from(localTestee.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
+        // The stored (wrapped DAO) payload length is needed to compute the expected saved bytes.
+        BlobStoreDAO.BytesBlob storedBlob = Mono.from(underlying.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
+
+        assertSoftly(softly -> {
+            softly.assertThat(metricFactory.countFor(ZstdBlobStoreDAO.MetricRecorder.BLOB_ZSTD_COMPRESS_SAVE_COUNT_METRIC_NAME)).isEqualTo(1);
+            softly.assertThat(metricFactory.countFor(ZstdBlobStoreDAO.MetricRecorder.BLOB_ZSTD_DECOMPRESS_COUNT_METRIC_NAME)).isEqualTo(1);
+            softly.assertThat(metricFactory.countFor(ZstdBlobStoreDAO.MetricRecorder.BLOB_ZSTD_SAVED_BYTES_METRIC_NAME))
+                .isEqualTo(ELEVEN_KILOBYTES.payload().length - storedBlob.payload().length);
+            softly.assertThat(metricFactory.executionTimesFor(ZstdBlobStoreDAO.MetricRecorder.BLOB_ZSTD_COMPRESS_LATENCY_METRIC_NAME)).hasSize(1);
+            softly.assertThat(metricFactory.executionTimesFor(ZstdBlobStoreDAO.MetricRecorder.BLOB_ZSTD_DECOMPRESS_LATENCY_METRIC_NAME)).hasSize(1);
+        });
+    }
+
+    @Test
+    void shouldRecordThresholdSkipMetricIfThresholdNotMatch() {
+        Mono.from(testee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block();
+
+        assertThat(metricFactory.countFor(ZstdBlobStoreDAO.MetricRecorder.BLOB_ZSTD_THRESHOLD_SKIP_COUNT_METRIC_NAME))
+            .isEqualTo(1);
+    }
+
+    @ParameterizedTest
+    @MethodSource("blobsWithReservedCompressionMetadata")
+    void saveShouldRejectReservedCompressionMetadata(BlobStoreDAO.Blob blob) {
+        assertThatThrownBy(() -> Mono.from(testee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, blob)).block())
+            .isInstanceOf(IllegalArgumentException.class)
+            .hasMessageContaining("Reserved zstd metadata are not allowed");
+    }
+
+    @ParameterizedTest
+    @MethodSource("compressionSamples")
+    void readBytesShouldRoundTripCompressedResourcesWithoutCorruption(String resourcePath) throws IOException {
+        ZstdBlobStoreDAO localTestee = newCompressingTestee();
+        byte[] resourceBytes = readResource(resourcePath);
+
+        Mono.from(localTestee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, BlobStoreDAO.BytesBlob.of(resourceBytes))).block();
+
+        BlobStoreDAO.BytesBlob readBlob = Mono.from(localTestee.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
+
+        assertSoftly(softly -> {
+            softly.assertThat(readBlob.payload()).isEqualTo(resourceBytes);
+            softly.assertThat(readBlob.metadata().contentTransferEncoding()).contains(BlobStoreDAO.ContentTransferEncoding.ZSTD);
+            softly.assertThat(readBlob.metadata().get(ZstdBlobStoreDAO.CONTENT_ORIGINAL_SIZE))
+                .contains(new BlobStoreDAO.BlobMetadataValue(String.valueOf(resourceBytes.length)));
+        });
+    }
+
+    @ParameterizedTest
+    @MethodSource("compressionSamples")
+    void readShouldRoundTripCompressedResourcesWithoutCorruption(String resourcePath) throws IOException {
+        ZstdBlobStoreDAO localTestee = newCompressingTestee();
+        byte[] resourceBytes = readResource(resourcePath);
+
+        Mono.from(localTestee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, BlobStoreDAO.BytesBlob.of(resourceBytes))).block();
+
+        BlobStoreDAO.InputStreamBlob readBlob = localTestee.read(TEST_BUCKET_NAME, TEST_BLOB_ID);
+        byte[] payload = drain(readBlob);
+
+        assertSoftly(softly -> {
+            softly.assertThat(payload).isEqualTo(resourceBytes);
+            softly.assertThat(readBlob.metadata().contentTransferEncoding()).contains(BlobStoreDAO.ContentTransferEncoding.ZSTD);
+            softly.assertThat(readBlob.metadata().get(ZstdBlobStoreDAO.CONTENT_ORIGINAL_SIZE))
+                .contains(new BlobStoreDAO.BlobMetadataValue(String.valueOf(resourceBytes.length)));
+        });
+    }
+
+    @ParameterizedTest
+    @MethodSource("compressionSamples")
+    void readReactiveShouldRoundTripCompressedResourcesWithoutCorruption(String resourcePath) throws IOException {
+        ZstdBlobStoreDAO localTestee = newCompressingTestee();
+        byte[] resourceBytes = readResource(resourcePath);
+
+        Mono.from(localTestee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, BlobStoreDAO.BytesBlob.of(resourceBytes))).block();
+
+        BlobStoreDAO.InputStreamBlob readBlob = Mono.from(localTestee.readReactive(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
+        byte[] payload = drain(readBlob);
+
+        assertSoftly(softly -> {
+            softly.assertThat(payload).isEqualTo(resourceBytes);
+            softly.assertThat(readBlob.metadata().contentTransferEncoding()).contains(BlobStoreDAO.ContentTransferEncoding.ZSTD);
+            softly.assertThat(readBlob.metadata().get(ZstdBlobStoreDAO.CONTENT_ORIGINAL_SIZE))
+                .contains(new BlobStoreDAO.BlobMetadataValue(String.valueOf(resourceBytes.length)));
+        });
+    }
+
+    /** Classpath locations of the sample files used by the round-trip tests. */
+    private static Stream<Arguments> compressionSamples() {
+        return Stream.of(
+            Arguments.of("zstd/text.txt"),
+            Arguments.of("zstd/james-logo.jpg"),
+            Arguments.of("zstd/mail1.eml"),
+            Arguments.of("zstd/document.pdf"));
+    }
+
+    /** One blob per blob flavour, each carrying the content transfer encoding metadata entry. */
+    private static Stream<Arguments> blobsWithReservedCompressionMetadata() {
+        BlobStoreDAO.BlobMetadata reservedMetadata = BlobStoreDAO.BlobMetadata.empty()
+            .withMetadata(BlobStoreDAO.ContentTransferEncoding.NAME, BlobStoreDAO.ContentTransferEncoding.ZSTD.asValue());
+
+        return Stream.of(
+            Arguments.of(BlobStoreDAO.BytesBlob.of(ELEVEN_KILOBYTES.payload(), reservedMetadata)),
+            Arguments.of(BlobStoreDAO.InputStreamBlob.of(new ByteArrayInputStream(ELEVEN_KILOBYTES.payload()), reservedMetadata)),
+            Arguments.of(BlobStoreDAO.ByteSourceBlob.of(ByteSource.wrap(ELEVEN_KILOBYTES.payload()), reservedMetadata)));
+    }
+
+    /**
+     * Loads a classpath resource fully into memory.
+     *
+     * @param resourcePath path resolved through the system class loader
+     * @return the bytes of the resource
+     * @throws IOException when reading or closing the resource stream fails
+     */
+    private byte[] readResource(String resourcePath) throws IOException {
+        try (InputStream inputStream = ClassLoader.getSystemClassLoader().getResourceAsStream(resourcePath)) {
+            // Fails with a readable message instead of a NullPointerException when the resource is missing.
+            assertThat(inputStream).describedAs("resource %s should exist", resourcePath).isNotNull();
+            return inputStream.readAllBytes();
+        }
+    }
+}
diff --git a/server/blob/blob-zstd/src/test/resources/zstd/document.pdf 
b/server/blob/blob-zstd/src/test/resources/zstd/document.pdf
new file mode 100644
index 0000000000..4603bd39c1
Binary files /dev/null and 
b/server/blob/blob-zstd/src/test/resources/zstd/document.pdf differ
diff --git a/server/blob/blob-zstd/src/test/resources/zstd/james-logo.jpg 
b/server/blob/blob-zstd/src/test/resources/zstd/james-logo.jpg
new file mode 100644
index 0000000000..8299376397
Binary files /dev/null and 
b/server/blob/blob-zstd/src/test/resources/zstd/james-logo.jpg differ
diff --git a/server/blob/blob-zstd/src/test/resources/zstd/mail1.eml 
b/server/blob/blob-zstd/src/test/resources/zstd/mail1.eml
new file mode 100644
index 0000000000..4b89f591bb
--- /dev/null
+++ b/server/blob/blob-zstd/src/test/resources/zstd/mail1.eml
@@ -0,0 +1,69 @@
+Return-Path: <[email protected]>
+Received: from mx1.minet.net (mx1.minet.net [192.168.102.25])
+        by imap (Cyrus v2.4.16-Debian-2.4.16-4+deb7u1) with LMTPA;
+        Thu, 04 Jun 2015 11:23:39 +0200
+X-Sieve: CMU Sieve 2.4
+Received: from localhost (spam.minet.net [192.168.102.97])
+       by mx1.minet.net (Postfix) with ESMTP id 0113F385C15
+       for <[email protected]>; Thu,  4 Jun 2015 11:23:43 +0200 (CEST)
+X-Virus-Scanned: by amavisd-new using ClamAV at minet.net
+X-Spam-Flag: NO
+X-Spam-Score: -1.51
+X-Spam-Level:
+X-Spam-Status: No, score=-1.51 required=1 tests=[BAYES_00=-1.5,
+       T_RP_MATCHES_RCVD=-0.01] autolearn=ham
+Received: from mx2.minet.net ([IPv6:::ffff:192.168.102.26])
+       by localhost (spam.minet.net [::ffff:192.168.102.97]) (amavisd-new, 
port 10024)
+       with ESMTP id IeILbadS9lo5 for <[email protected]>;
+       Thu,  4 Jun 2015 09:23:42 +0000 (UTC)
+Received-SPF: Pass (sender SPF authorized) identity=mailfrom; 
client-ip=140.211.11.3; helo=mail.apache.org; 
envelope-from=server-dev-return-56862-benwa=minet....@james.apache.org; 
[email protected]
+Received: from mail.apache.org (hermes.apache.org [140.211.11.3])
+       by mx2.minet.net (Postfix) with SMTP id CDE83A1C7FC
+       for <[email protected]>; Thu,  4 Jun 2015 11:23:40 +0200 (CEST)
+Received: (qmail 37249 invoked by uid 500); 4 Jun 2015 09:23:38 -0000
+Mailing-List: contact [email protected]; run by ezmlm
+Precedence: bulk
+List-Unsubscribe: <mailto:[email protected]>
+List-Help: <mailto:[email protected]>
+List-Post: <mailto:[email protected]>
+List-Id: "James Developers List" <server-dev.james.apache.org>
+Reply-To: "James Developers List" <[email protected]>
+Delivered-To: mailing list [email protected]
+Received: (qmail 37236 invoked by uid 99); 4 Jun 2015 09:23:38 -0000
+Received: from arcas.apache.org (HELO arcas.apache.org) (140.211.11.28)
+    by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 04 Jun 2015 09:23:38 +0000
+Date: Thu, 4 Jun 2015 09:23:37 +0000 (UTC)
+From: "Tellier Benoit (JIRA)" <[email protected]>
+To: "abc" <[email protected]>
+Message-ID: <[email protected]>
+In-Reply-To: <[email protected]>
+References: <[email protected]> 
<JIRA.12835341.1433409792972@arcas>
+Subject: [jira] [Created] (MAILBOX-234) Convert Message into JSON
+MIME-Version: 1.0
+Content-Type: text/plain; charset=utf-8
+Content-Transfer-Encoding: 7bit
+X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394
+
+Tellier Benoit created MAILBOX-234:
+--------------------------------------
+
+             Summary: Convert Message into JSON
+                 Key: MAILBOX-234
+                 URL: https://issues.apache.org/jira/browse/MAILBOX-234
+             Project: James Mailbox
+          Issue Type: New Feature
+            Reporter: Tellier Benoit
+
+
+This would give us the ability to index e-mails in ElasticSearch.
+
+
+
+--
+This message was sent by Atlassian JIRA
+(v6.3.4#6332)
+
+---------------------------------------------------------------------
+To unsubscribe, e-mail: [email protected]
+For additional commands, e-mail: [email protected]
+
diff --git a/server/blob/blob-zstd/src/test/resources/zstd/text.txt 
b/server/blob/blob-zstd/src/test/resources/zstd/text.txt
new file mode 100644
index 0000000000..8427b45a26
--- /dev/null
+++ b/server/blob/blob-zstd/src/test/resources/zstd/text.txt
@@ -0,0 +1,249 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.    
+-->
+<document>
+
+    <properties>
+        <title>Apache James Server 3 - Blob Configuration</title>
+    </properties>
+
+    <body>
+
+        <section name="BlobStore Configuration">
+            <p>
+                BlobStore is the dedicated component to store blobs, 
non-indexable content.
+                James uses the BlobStore for storing blobs which are usually 
mail contents, attachments, deleted mails...
+                You can choose the underlying implementation of BlobStore to 
fit with your James setup.
+                It could be the implementation on top of Cassandra or file 
storage service like Openstack Swift, AWS S3.
+
+                This configuration is only applicable with Guice products.
+            </p>
+            <p>
+                Consult <a 
href="https://github.com/apache/james-project/blob/master/server/apps/distributed-app/sample-configuration/blob.properties">blob.properties</a>
+                in GIT to get some examples and hints.
+            </p>
+
+            <p>
+                Blobs storing configuration
+            </p>
+            <dl>
+                <dt><strong>implementation</strong></dt>
+                <dd>cassandra: use cassandra based BlobStore</dd>
+                <dd>s3: use AWS S3 based BlobStore</dd>
+                <dd>file: (experimental) use directly the file system. Useful 
for legacy architecture based on shared iSCSI SANs and/or
+                distributed file system with no object store available.</dd>
+                <dd><b>WARNING</b>: JAMES-3591 Cassandra is not made to store 
large binary content, its use will be suboptimal compared to
+                    alternatives (namely S3 compatible BlobStores backed by 
for instance S3, MinIO or Ozone)
+                </dd>
+                <dd>
+                    The generated startup warning log can be deactivated via 
the <code>cassandra.blob.store.disable.startup.warning</code> environment
+                    variable being positioned to <code>false</code>.
+                </dd>
+                <dt><strong>deduplication.enable</strong></dt>
+                <dd>Mandatory. Supported value: true and false.</dd>
+                <dd>If you choose to enable deduplication, the mails with the 
same content will be stored only once.</dd>
+                <dd>Warning: Once this feature is enabled, there is no turning 
back as turning it off will lead to the deletion of all</dd>
+                <dd>the mails sharing the same content once one is 
deleted.</dd>
+                <dd>This feature also requires a garbage collector mechanism 
to effectively drop blobs. A first implementation
+                    based on bloom filters can be used and triggered using the 
WebAdmin REST API. See
+                    <a 
href="manage-webadmin.html#Running_blob_garbage_collection">Running blob 
garbage collection</a>.
+                    In order to avoid concurrency issues upon garbage 
collection, we slice the blobs in generation, the two more recent
+                    generations are not garbage collected.</dd>
+                <dd><strong>deduplication.gc.generation.duration</strong></dd>
+                <dd>Allow controlling the duration of one generation. Longer 
implies better deduplication
+                    but deleted blobs will live longer. Duration, defaults on 
30 days, the default unit is in days.</dd>
+                <dd><strong>deduplication.gc.generation.family</strong></dd>
+                <dd>Every time the duration is changed, this integer counter 
must be incremented to avoid
+                    conflicts. Defaults to 1.</dd>
+                <dd>Upgrade note: If you are upgrading from James 3.5 or 
older, the deduplication was enabled.</dd>
+            </dl>
+
+
+            <subsection name="Cassandra BlobStore Cache">
+                <p>A Cassandra cache can be enabled to reduce latency when 
reading small blobs frequently.
+                    A dedicated keyspace with a replication factor of one is 
then used.
+                    Cache eviction policy is TTL based.
+                    Only blobs below a given threshold will be stored.
+                    To be noted that blobs are stored within a single 
Cassandra row, hence a low threshold should be used.
+                </p>
+                <dl>
+                    <dt><strong>cache.enable</strong></dt>
+                    <dd>DEFAULT: false, optional, must be a boolean. Whether 
the cache should be enabled.</dd>
+                </dl>
+                <dl>
+                    <dt><strong>cache.cassandra.ttl</strong></dt>
+                    <dd>DEFAULT: 7 days, optional, must be a duration. Cache 
eviction policy is TTL based. </dd>
+                </dl>
+                <dl>
+                    <dt><strong>cache.sizeThresholdInBytes</strong></dt>
+                    <dd>DEFAULT: 8192, optional, must be a positive integer. 
Unit: bytes.
+                        Supported units: bytes, Kib, MiB, GiB, TiB
+                        Maximum size of stored objects expressed in bytes.</dd>
+                </dl>
+            </subsection>
+            <subsection name="Encryption choice">
+                <p>
+                    Data can be optionally encrypted with a symmetric key 
using AES before being stored in the blobStore. As many users rely
+                    on third party for object storage, a compromised third 
party will not escalate to a data disclosure. Of course, a
+                    performance price has to be paid, as encryption takes 
resources.
+                </p>
+                <dl>
+                    <dt><strong>encryption.aes.enable</strong></dt>
+                    <dd>Optional boolean, defaults to false</dd>
+                </dl>
+                <p>If AES encryption is enabled, then the following properties 
MUST be present:</p>
+                <dl>
+                    <dt><strong>encryption.aes.password</strong></dt>
+                    <dd>String</dd>
+                </dl>
+                <dl>
+                    <dt><strong>encryption.aes.salt</strong></dt>
+                    <dd>Hexadecimal string.</dd>
+                </dl>
+                <p>If AES encryption is enabled, then the following properties 
COULD be present:</p>
+                <dl>
+                    
<dt><strong>encryption.aes.private.key.algorithm</strong></dt>
+                    <dd>String, defaulting to PBKDF2WithHmacSHA512. Previously 
was
+                        PBKDF2WithHmacSHA1.</dd>
+                </dl>
+
+                <p><b>WARNING:</b> Once chosen this choice can not be 
reverted, all the data is either clear or encrypted. Mixed encryption
+                    is not supported.</p>
+                <p>
+                    Here is an example of how you can generate the above 
values (be mindful to customize the byte lengths in order to add
+                    enough entropy).
+                </p>
+                <pre>
+                    <code>
+# Password generation
+openssl rand -base64 64
+
+# Salt generation
+generate salt with : openssl rand -hex 16
+                    </code>
+                </pre>
+            </subsection>
+            <subsection name="ObjectStorage BlobStore Buckets Configuration">
+                <dl>
+                    <dt><strong>objectstorage.bucketPrefix</strong></dt>
+                    <dd>
+                        Bucket is a concept in James and similar to 
Containers in Swift or Buckets in AWS S3.
+                        BucketPrefix is the prefix of bucket names in James 
BlobStore
+                    </dd>
+
+                    <dt><strong>objectstorage.namespace</strong></dt>
+                    <dd>
+                        BlobStore default bucket name. Most of blobs storing 
in BlobStore are inside the default bucket.
+                        Unless a special case like storing blobs of deleted 
messages.
+                    </dd>
+                </dl>
+            </subsection>
+            <subsection name="ObjectStorage Underlying Service Configuration">
+                <subsection name="ObjectStorage AWS S3 Configuration">
+                    <dl>
+                        <dt><strong>objectstorage.s3.endPoint</strong></dt>
+                        <dd>S3 service endpoint</dd>
+
+                        <dt><strong>objectstorage.s3.region</strong></dt>
+                        <dd>S3 region</dd>
+
+                        <dt><strong>objectstorage.s3.accessKeyId</strong></dt>
+                        <dd><a 
href="https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys">S3
 access key id</a></dd>
+
+                        <dt><strong>objectstorage.s3.secretKey</strong></dt>
+                        <dd><a 
href="https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys">S3
 access key secret</a></dd>
+
+                        
<dt><strong>objectstorage.s3.http.concurrency</strong></dt>
+                        <dd>Allow setting the number of concurrent HTTP 
requests allowed by the Netty driver.</dd>
+
+                        
<dt><strong>objectstorage.s3.truststore.path</strong></dt>
+                        <dd><i>optional:</i> Verify the S3 server certificate 
against this trust store file.</dd>
+
+                        
<dt><strong>objectstorage.s3.truststore.type</strong></dt>
+                        <dd><i>optional:</i> Specify the type of the trust 
store, e.g. JKS, PKCS12</dd>
+
+                        
<dt><strong>objectstorage.s3.truststore.secret</strong></dt>
+                        <dd><i>optional:</i> Use this secret/password to 
access the trust store; default none</dd>
+
+                        
<dt><strong>objectstorage.s3.truststore.algorithm</strong></dt>
+                        <dd><i>optional:</i> Use this specific trust store 
algorithm; default SunX509</dd>
+
+                        <dt><strong>objectstorage.s3.trustall</strong></dt>
+                        <dd><i>optional:</i>  boolean. Defaults to false. 
Cannot be set to true with other truststore options. Whether James should validate
+                        S3 endpoint SSL certificates.</dd>
+
+                        <dt><strong>objectstorage.s3.read.timeout</strong></dt>
+                        <dd><i>optional:</i> HTTP read timeout. duration, 
default value being second. Leaving it empty relies on S3 driver defaults.</dd>
+
+                        
<dt><strong>objectstorage.s3.write.timeout</strong></dt>
+                        <dd><i>optional:</i> HTTP write timeout. duration, 
default value being second. Leaving it empty relies on S3 driver defaults.</dd>
+
+                        
<dt><strong>objectstorage.s3.connection.timeout</strong></dt>
+                        <dd><i>optional:</i> HTTP connection timeout. 
duration, default value being second. Leaving it empty relies on S3 driver 
defaults.</dd>
+
+                        
<dt><strong>objectstorage.s3.in.read.limit</strong></dt>
+                        <dd><i>optional:</i> Object read in memory will be 
rejected if they exceed the size limit exposed here. Size, example `100M`.
+                            Supported units: K, M, G, defaults to B if no unit 
is specified. If unspecified, big object won't be prevented
+                            from being loaded in memory. This setting 
complements protocol limits.</dd>
+
+                        
<dt><strong>objectstorage.s3.upload.retry.maxAttempts</strong></dt>
+                        <dd><i>optional:</i> Integer. Default is zero. This 
property specifies the maximum number of retry attempts allowed for failed 
upload operations.</dd>
+
+                        
<dt><strong>objectstorage.s3.upload.retry.backoffDurationMillis</strong></dt>
+                        <dd><i>optional:</i> Long (Milliseconds). Default is 
10 (milliseconds).
+                            Only takes effect when the 
"objectstorage.s3.upload.retry.maxAttempts" property is declared.
+                            This property determines the duration (in 
milliseconds) to wait between retry attempts for failed upload operations.
+                            This delay is known as backoff. The jitter factor 
is 0.5</dd>
+
+                    </dl>
+                </subsection>
+            </subsection>
+            <subsection name="SSE-C Configuration">
+                <dl>
+                    <dt><strong>encryption.s3.sse.c.enable</strong></dt>
+                    <dd>
+                        optional: Boolean. Default is false.
+                        Controls whether to use Server-Side Encryption with 
Customer-Provided Keys (SSE-C) for S3 blobs
+                    </dd>
+
+                    
<dt><strong>encryption.s3.sse.c.master.key.algorithm</strong></dt>
+                    <dd>
+                        String. Required if `encryption.s3.sse.c.enable` is 
true.
+                        The algorithm used to derive the master key from the 
provided password. Eg: AES256
+                    </dd>
+
+                    
<dt><strong>encryption.s3.sse.c.master.key.password</strong></dt>
+                    <dd>
+                        String. Required if `encryption.s3.sse.c.enable` is 
true.
+                        The password used to generate the customer key.
+                    </dd>
+
+                    
<dt><strong>encryption.s3.sse.c.master.key.salt</strong></dt>
+                    <dd>
+                        String. Required if `encryption.s3.sse.c.enable` is 
true.
+                        The salt used to generate the customer key.
+                    </dd>
+
+                </dl>
+            </subsection>
+        </section>
+
+    </body>
+
+</document>
diff --git a/server/blob/pom.xml b/server/blob/pom.xml
index 1644a0e15a..508296036d 100644
--- a/server/blob/pom.xml
+++ b/server/blob/pom.xml
@@ -44,6 +44,7 @@
         <module>blob-postgres</module>
         <module>blob-s3</module>
         <module>blob-storage-strategy</module>
+        <module>blob-zstd</module>
 
         <module>mail-store</module>
     </modules>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to