This is an automated email from the ASF dual-hosted git repository.
quantranhong1999 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 5792107ba0 JAMES-4182 Implement ZstdBlobStoreDAO (#3016)
5792107ba0 is described below
commit 5792107ba06bbad84d9ffea1db378b16237f5b26
Author: Trần Hồng Quân <[email protected]>
AuthorDate: Wed Apr 22 15:43:31 2026 +0700
JAMES-4182 Implement ZstdBlobStoreDAO (#3016)
---
pom.xml | 11 +
server/blob/blob-zstd/pom.xml | 96 +++++
.../james/blob/zstd/CompressionConfiguration.java | 75 ++++
.../apache/james/blob/zstd/ZstdBlobStoreDAO.java | 465 +++++++++++++++++++++
.../blob/zstd/CompressionConfigurationTest.java | 97 +++++
.../james/blob/zstd/ZstdBlobStoreDAOTest.java | 453 ++++++++++++++++++++
.../blob-zstd/src/test/resources/zstd/document.pdf | Bin 0 -> 49672 bytes
.../src/test/resources/zstd/james-logo.jpg | Bin 0 -> 22168 bytes
.../blob-zstd/src/test/resources/zstd/mail1.eml | 69 +++
.../blob-zstd/src/test/resources/zstd/text.txt | 249 +++++++++++
server/blob/pom.xml | 1 +
11 files changed, 1516 insertions(+)
diff --git a/pom.xml b/pom.xml
index c354a41ac8..3fae88f68a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -679,6 +679,7 @@
<guice.version>7.0.0</guice.version>
<logback.version>1.5.19</logback.version>
<tink.version>1.17.0</tink.version>
+ <zstd-jni.version>1.5.7-7</zstd-jni.version>
<lettuce.core.version>6.7.1.RELEASE</lettuce.core.version>
<io.micrometer.core.version>1.15.1</io.micrometer.core.version>
<io.micrometer.tracing.version>1.5.1</io.micrometer.tracing.version>
@@ -1233,6 +1234,11 @@
<version>${project.version}</version>
<type>test-jar</type>
</dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
+ <artifactId>blob-zstd</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>dead-letter-cassandra</artifactId>
@@ -2227,6 +2233,11 @@
<artifactId>throwing-lambdas</artifactId>
<version>0.5.0</version>
</dependency>
+ <dependency>
+ <groupId>com.github.luben</groupId>
+ <artifactId>zstd-jni</artifactId>
+ <version>${zstd-jni.version}</version>
+ </dependency>
<dependency>
<groupId>com.github.spullara.mustache.java</groupId>
<artifactId>compiler</artifactId>
diff --git a/server/blob/blob-zstd/pom.xml b/server/blob/blob-zstd/pom.xml
new file mode 100644
index 0000000000..053921f2cb
--- /dev/null
+++ b/server/blob/blob-zstd/pom.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.james</groupId>
+        <artifactId>james-project</artifactId>
+        <version>3.10.0-SNAPSHOT</version>
+        <relativePath>../../../pom.xml</relativePath>
+    </parent>
+
+    <!-- BlobStoreDAO decorator compressing blob payloads with zstd (zstd-jni) -->
+    <artifactId>blob-zstd</artifactId>
+    <name>Apache James :: Server :: Blob :: Zstd compression</name>
+
+    <!-- Declaration order is preserved on purpose: Maven derives classpath order from it. -->
+    <dependencies>
+        <!-- Blob API, plus its shared contract tests -->
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>blob-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>blob-api</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <!-- S3 implementation, used only as the wrapped DAO in tests -->
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>blob-s3</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>blob-s3</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <!-- Metrics -->
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>metrics-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>metrics-tests</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>testing-base</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <!-- Third-party libraries -->
+        <dependency>
+            <groupId>com.github.fge</groupId>
+            <artifactId>throwing-lambdas</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.github.luben</groupId>
+            <artifactId>zstd-jni</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-core</artifactId>
+        </dependency>
+        <!-- Testcontainers, needed by the S3-backed tests -->
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git
a/server/blob/blob-zstd/src/main/java/org/apache/james/blob/zstd/CompressionConfiguration.java
b/server/blob/blob-zstd/src/main/java/org/apache/james/blob/zstd/CompressionConfiguration.java
new file mode 100644
index 0000000000..2d35010ec6
--- /dev/null
+++
b/server/blob/blob-zstd/src/main/java/org/apache/james/blob/zstd/CompressionConfiguration.java
@@ -0,0 +1,75 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.blob.zstd;
+
+/**
+ * Settings of the zstd blob compression layer.
+ *
+ * @param enabled   whether payloads may be compressed on save (compressed blobs are decompressed on read regardless)
+ * @param threshold minimum payload size, in bytes, for compression to be attempted; strictly positive
+ * @param minRatio  highest accepted {@code compressedSize / originalSize} for the compressed form to be kept,
+ *                  within [0, 1]; 0 means decompress-only mode (never compress on save)
+ */
+public record CompressionConfiguration(boolean enabled, long threshold, float minRatio) {
+    public static final boolean DISABLED = false;
+    public static final long DEFAULT_THRESHOLD = 16 * 1024L;
+    public static final float DEFAULT_MIN_RATIO = 1F;
+    public static final CompressionConfiguration DEFAULT = builder().build();
+
+    /**
+     * Fluent builder; each setter validates its argument eagerly, so misconfiguration fails at the faulty call.
+     */
+    public static class Builder {
+        private boolean enabled = DISABLED;
+        private long threshold = DEFAULT_THRESHOLD;
+        private float minRatio = DEFAULT_MIN_RATIO;
+
+        public Builder enabled(boolean enabled) {
+            this.enabled = enabled;
+            return this;
+        }
+
+        /**
+         * @throws IllegalArgumentException if {@code threshold} is not strictly positive
+         */
+        public Builder threshold(long threshold) {
+            checkThreshold(threshold);
+            this.threshold = threshold;
+            return this;
+        }
+
+        /**
+         * @throws IllegalArgumentException if {@code minRatio} is NaN or outside [0, 1]
+         */
+        public Builder minRatio(float minRatio) {
+            checkMinRatio(minRatio);
+            this.minRatio = minRatio;
+            return this;
+        }
+
+        public CompressionConfiguration build() {
+            return new CompressionConfiguration(enabled, threshold, minRatio);
+        }
+    }
+
+    public CompressionConfiguration {
+        checkThreshold(threshold);
+        checkMinRatio(minRatio);
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * @return the default configuration, which has compression turned off
+     */
+    public static CompressionConfiguration disabled() {
+        return DEFAULT;
+    }
+
+    // Shared by the builder and the canonical constructor so both reject exactly the same values.
+    private static void checkThreshold(long threshold) {
+        if (threshold <= 0) {
+            throw new IllegalArgumentException("'threshold' needs to be strictly positive");
+        }
+    }
+
+    private static void checkMinRatio(float minRatio) {
+        // Expressed as a negated in-range test so that NaN (for which every comparison is false) is rejected too.
+        if (!(minRatio >= 0 && minRatio <= 1)) {
+            throw new IllegalArgumentException("'minRatio' needs to be between 0 and 1");
+        }
+    }
+}
diff --git
a/server/blob/blob-zstd/src/main/java/org/apache/james/blob/zstd/ZstdBlobStoreDAO.java
b/server/blob/blob-zstd/src/main/java/org/apache/james/blob/zstd/ZstdBlobStoreDAO.java
new file mode 100644
index 0000000000..1425b5c3b5
--- /dev/null
+++
b/server/blob/blob-zstd/src/main/java/org/apache/james/blob/zstd/ZstdBlobStoreDAO.java
@@ -0,0 +1,465 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.blob.zstd;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobStoreDAO;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.blob.api.ObjectNotFoundException;
+import org.apache.james.blob.api.ObjectStoreIOException;
+import org.apache.james.metrics.api.Metric;
+import org.apache.james.metrics.api.MetricFactory;
+import org.apache.james.metrics.api.TimeMetric;
+import org.reactivestreams.Publisher;
+
+import com.github.fge.lambdas.Throwing;
+import com.github.luben.zstd.Zstd;
+import com.github.luben.zstd.ZstdInputStream;
+import com.github.luben.zstd.ZstdOutputStream;
+import com.google.common.io.ByteSource;
+import com.google.common.io.CountingOutputStream;
+import com.google.common.io.FileBackedOutputStream;
+
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+/**
+ * {@link BlobStoreDAO} decorator that stores payloads zstd-compressed when this pays off, and transparently
+ * decompresses blobs whose metadata marks them as zstd-encoded on read.
+ *
+ * <p>A compressed blob carries the {@code ZSTD} content-transfer-encoding and the {@link #CONTENT_ORIGINAL_SIZE}
+ * metadata. Both names are reserved: callers supplying them are rejected on save.</p>
+ */
+public class ZstdBlobStoreDAO implements BlobStoreDAO {
+    /**
+     * Sizes of a payload before and after zstd compression, used to decide which form gets stored.
+     */
+    private record CompressionDecision(long originalSize, long compressedSize) {
+        /**
+         * @param minCompressionRatio highest accepted {@code compressedSize / originalSize}
+         * @return {@code true} when the compressed form should be stored instead of the original
+         */
+        boolean satisfyCompressionMinRatio(float minCompressionRatio) {
+            // minCompressionRatio == 0 means decompress-only mode: never compress on save
+            if (minCompressionRatio == 0) {
+                return false;
+            }
+
+            // Decisions are only built for payloads of at least the (strictly positive) threshold,
+            // so originalSize is never zero here.
+            return ((float) compressedSize / originalSize) <= minCompressionRatio;
+        }
+    }
+
+    /**
+     * Publishes the zstd-related counters and latency timers through the supplied {@link MetricFactory}.
+     */
+    static final class MetricRecorder {
+        static final String BLOB_ZSTD_COMPRESS_SAVE_COUNT_METRIC_NAME = "blobZstdCompressSaveCount";
+        static final String BLOB_ZSTD_THRESHOLD_SKIP_COUNT_METRIC_NAME = "blobZstdThresholdSkipCount";
+        static final String BLOB_ZSTD_RATIO_SKIP_COUNT_METRIC_NAME = "blobZstdRatioSkipCount";
+        static final String BLOB_ZSTD_DECOMPRESS_COUNT_METRIC_NAME = "blobZstdDecompressCount";
+        static final String BLOB_ZSTD_SAVED_BYTES_METRIC_NAME = "blobZstdSavedBytes";
+        static final String BLOB_ZSTD_COMPRESS_LATENCY_METRIC_NAME = "blobZstdCompressLatency";
+        static final String BLOB_ZSTD_DECOMPRESS_LATENCY_METRIC_NAME = "blobZstdDecompressLatency";
+
+        private final MetricFactory metricFactory;
+        private final Metric compressSaveCount;
+        private final Metric thresholdSkipCount;
+        private final Metric ratioSkipCount;
+        private final Metric decompressCount;
+        private final Metric savedBytes;
+
+        private MetricRecorder(MetricFactory metricFactory) {
+            this.metricFactory = metricFactory;
+            this.compressSaveCount = metricFactory.generate(BLOB_ZSTD_COMPRESS_SAVE_COUNT_METRIC_NAME);
+            this.thresholdSkipCount = metricFactory.generate(BLOB_ZSTD_THRESHOLD_SKIP_COUNT_METRIC_NAME);
+            this.ratioSkipCount = metricFactory.generate(BLOB_ZSTD_RATIO_SKIP_COUNT_METRIC_NAME);
+            this.decompressCount = metricFactory.generate(BLOB_ZSTD_DECOMPRESS_COUNT_METRIC_NAME);
+            this.savedBytes = metricFactory.generate(BLOB_ZSTD_SAVED_BYTES_METRIC_NAME);
+        }
+
+        TimeMetric startCompressionLatencyTimer() {
+            return metricFactory.timer(BLOB_ZSTD_COMPRESS_LATENCY_METRIC_NAME);
+        }
+
+        TimeMetric startDecompressionLatencyTimer() {
+            return metricFactory.timer(BLOB_ZSTD_DECOMPRESS_LATENCY_METRIC_NAME);
+        }
+
+        /**
+         * Counts a compressed save and adds the bytes it saved to the saved-bytes metric.
+         */
+        void recordCompressedSave(long originalSize, long compressedSize) {
+            compressSaveCount.increment();
+            long saved = originalSize - compressedSize;
+            // Metric#add takes an int. Math.toIntExact would throw for a saving outside the int range (possible
+            // on the streaming save paths, whose sizes are longs), turning an already-successful save into an
+            // error from doOnSuccess. Feed oversized values in int-sized chunks instead.
+            while (saved > Integer.MAX_VALUE || saved < Integer.MIN_VALUE) {
+                int chunk = saved > 0 ? Integer.MAX_VALUE : Integer.MIN_VALUE;
+                savedBytes.add(chunk);
+                saved -= chunk;
+            }
+            savedBytes.add((int) saved);
+        }
+
+        void recordThresholdSkip() {
+            thresholdSkipCount.increment();
+        }
+
+        void recordRatioSkip() {
+            ratioSkipCount.increment();
+        }
+
+        void recordDecompression() {
+            decompressCount.increment();
+        }
+    }
+
+    /** Metadata entry holding the uncompressed payload size of a zstd-compressed blob. */
+    public static final BlobMetadataName CONTENT_ORIGINAL_SIZE = new BlobMetadataName("content-original-size");
+    // Threshold of the FileBackedOutputStream buffers: content beyond it is buffered in a temporary file.
+    private static final int FILE_THRESHOLD = 100 * 1024;
+    // Metadata written by this DAO itself; callers may not supply these names (see validateMetadata).
+    private static final Set<BlobMetadataName> RESERVED_METADATA_NAMES = Set.of(ContentTransferEncoding.NAME, CONTENT_ORIGINAL_SIZE);
+
+    private final BlobStoreDAO underlying;
+    private final CompressionConfiguration compressionConfiguration;
+    private final MetricRecorder metricRecorder;
+
+    /**
+     * @param underlying               DAO actually persisting the (possibly compressed) blobs
+     * @param compressionConfiguration when and how aggressively to compress on save
+     * @param metricFactory            source of the zstd counters and timers
+     */
+    public ZstdBlobStoreDAO(BlobStoreDAO underlying, CompressionConfiguration compressionConfiguration, MetricFactory metricFactory) {
+        this.underlying = underlying;
+        this.compressionConfiguration = compressionConfiguration;
+        this.metricRecorder = new MetricRecorder(metricFactory);
+    }
+
+    /**
+     * Reads the blob from the underlying DAO, wrapping its payload in a zstd decompressing stream when its
+     * metadata marks it as zstd-encoded.
+     */
+    @Override
+    public InputStreamBlob read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException {
+        InputStreamBlob blob = underlying.read(bucketName, blobId);
+        if (isCompressed(blob.metadata())) {
+            return decompress(blob);
+        }
+        return blob;
+    }
+
+    /**
+     * Reactive variant of {@link #read(BucketName, BlobId)}.
+     */
+    @Override
+    public Publisher<InputStreamBlob> readReactive(BucketName bucketName, BlobId blobId) {
+        return Mono.from(underlying.readReactive(bucketName, blobId))
+            .flatMap(blob -> {
+                if (isCompressed(blob.metadata())) {
+                    return decompressReactive(blob);
+                }
+                return Mono.just(blob);
+            });
+    }
+
+    /**
+     * Reads the whole blob in memory, decompressing it when it is zstd-encoded.
+     */
+    @Override
+    public Publisher<BytesBlob> readBytes(BucketName bucketName, BlobId blobId) {
+        return Mono.from(underlying.readBytes(bucketName, blobId))
+            .flatMap(blob -> {
+                if (isCompressed(blob.metadata())) {
+                    return decompressBytesReactive(blob);
+                }
+                return Mono.just(blob);
+            });
+    }
+
+    /**
+     * Saves the blob, compressing it first when compression is enabled, the payload reaches the configured
+     * threshold and the compressed form satisfies the configured ratio.
+     *
+     * @throws IllegalArgumentException (synchronously) if the metadata uses a reserved zstd metadata name
+     */
+    @Override
+    public Publisher<Void> save(BucketName bucketName, BlobId blobId, Blob blob) {
+        return switch (blob) {
+            case BytesBlob bytesBlob -> save(bucketName, blobId, bytesBlob);
+            case InputStreamBlob inputStreamBlob -> save(bucketName, blobId, inputStreamBlob);
+            case ByteSourceBlob byteSourceBlob -> save(bucketName, blobId, byteSourceBlob);
+        };
+    }
+
+    @Override
+    public Publisher<Void> delete(BucketName bucketName, BlobId blobId) {
+        return underlying.delete(bucketName, blobId);
+    }
+
+    @Override
+    public Publisher<Void> delete(BucketName bucketName, Collection<BlobId> blobIds) {
+        return underlying.delete(bucketName, blobIds);
+    }
+
+    @Override
+    public Publisher<Void> deleteBucket(BucketName bucketName) {
+        return underlying.deleteBucket(bucketName);
+    }
+
+    @Override
+    public Publisher<BucketName> listBuckets() {
+        return underlying.listBuckets();
+    }
+
+    @Override
+    public Publisher<BlobId> listBlobs(BucketName bucketName) {
+        return underlying.listBlobs(bucketName);
+    }
+
+    private InputStreamBlob decompress(InputStreamBlob blob) throws ObjectStoreIOException {
+        try {
+            InputStreamBlob decompressed = openDecompressingStream(blob);
+            metricRecorder.recordDecompression();
+            return decompressed;
+        } catch (IOException e) {
+            throw new ObjectStoreIOException("Failed to initialize zstd decompression", e);
+        }
+    }
+
+    private Mono<InputStreamBlob> decompressReactive(InputStreamBlob blob) {
+        return Mono.fromCallable(() -> openDecompressingStream(blob))
+            .doOnNext(ignored -> metricRecorder.recordDecompression())
+            .onErrorMap(IOException.class, e -> new ObjectStoreIOException("Failed to initialize zstd decompression", e));
+    }
+
+    /**
+     * Wraps the compressed payload in a {@link ZstdInputStream}. If that fails, the underlying payload stream
+     * is closed before the failure is rethrown, as no caller holds it anymore.
+     * NOTE(review): the returned blob keeps the original metadata, i.e. it still advertises the ZSTD
+     * encoding and original size although its payload is decompressed — confirm this is intended.
+     */
+    private InputStreamBlob openDecompressingStream(InputStreamBlob blob) throws IOException {
+        InputStream compressedPayload = blob.payload();
+        try {
+            return InputStreamBlob.of(new ZstdInputStream(compressedPayload), blob.metadata());
+        } catch (IOException | RuntimeException e) {
+            try {
+                compressedPayload.close();
+            } catch (IOException closeFailure) {
+                e.addSuppressed(closeFailure);
+            }
+            throw e;
+        }
+    }
+
+    private Mono<BytesBlob> decompressBytesReactive(BytesBlob blob) {
+        return Mono.fromCallable(() -> decompress(blob))
+            .subscribeOn(Schedulers.parallel())
+            .doOnNext(ignored -> metricRecorder.recordDecompression())
+            .onErrorMap(IOException.class, e -> new ObjectStoreIOException("Failed to decompress blob", e));
+    }
+
+    /**
+     * Decompresses an in-memory payload, sizing the result from the {@link #CONTENT_ORIGINAL_SIZE} metadata.
+     *
+     * @throws IOException if the size metadata is missing or invalid, or the payload is not valid zstd data
+     */
+    private BytesBlob decompress(BytesBlob blob) throws IOException {
+        TimeMetric timeMetric = metricRecorder.startDecompressionLatencyTimer();
+        try {
+            return BytesBlob.of(Zstd.decompress(blob.payload(), originalSize(blob.metadata())), blob.metadata());
+        } catch (com.github.luben.zstd.ZstdException e) {
+            // zstd-jni reports corrupt input with an unchecked exception, which would escape the
+            // onErrorMap(IOException.class, ...) mapping of the callers.
+            throw new IOException("Failed to decompress zstd payload", e);
+        } finally {
+            timeMetric.stopAndPublish();
+        }
+    }
+
+    /**
+     * Parses the {@link #CONTENT_ORIGINAL_SIZE} metadata of a compressed blob.
+     *
+     * @throws IOException if the entry is missing, not a number, negative, or does not fit in an int
+     */
+    private int originalSize(BlobMetadata metadata) throws IOException {
+        BlobMetadataValue sizeMetadata = metadata.get(CONTENT_ORIGINAL_SIZE)
+            .orElseThrow(() -> new IOException("Missing " + CONTENT_ORIGINAL_SIZE.name() + " metadata for compressed blob"));
+
+        int originalSize;
+        try {
+            originalSize = Math.toIntExact(Long.parseLong(sizeMetadata.value()));
+        } catch (NumberFormatException | ArithmeticException e) {
+            throw new IOException("Invalid " + CONTENT_ORIGINAL_SIZE.name() + " metadata value: " + sizeMetadata.value(), e);
+        }
+        // zstd-jni sizes its output buffer from this value: a negative size would otherwise fail there
+        // with an unchecked exception instead of this IOException.
+        if (originalSize < 0) {
+            throw new IOException("Invalid " + CONTENT_ORIGINAL_SIZE.name() + " metadata value: " + sizeMetadata.value());
+        }
+        return originalSize;
+    }
+
+    private Publisher<Void> save(BucketName bucketName, BlobId blobId, BytesBlob bytesBlob) {
+        validateMetadata(bytesBlob.metadata());
+
+        if (shouldAttemptCompression(bytesBlob.payload().length)) {
+            // Compression is CPU bound: run it on the parallel scheduler.
+            return Mono.fromCallable(() -> compress(bytesBlob.payload()))
+                .subscribeOn(Schedulers.parallel())
+                .flatMap(compressed -> saveCompressedIfWorthKeeping(bucketName, blobId,
+                    BytesBlob.of(compressed, withCompressionMetadata(bytesBlob.metadata(), bytesBlob.payload().length)),
+                    bytesBlob))
+                .onErrorMap(IOException.class, e -> new ObjectStoreIOException("Error saving blob " + blobId.asString(), e));
+        }
+
+        return Mono.from(underlying.save(bucketName, blobId, bytesBlob))
+            .doOnSuccess(ignored -> {
+                if (compressionEnabled() && bytesBlob.payload().length < compressionConfiguration.threshold()) {
+                    metricRecorder.recordThresholdSkip();
+                }
+            });
+    }
+
+    /**
+     * Stores {@code compressedBlob} when it satisfies the configured ratio, {@code uncompressedBlob} otherwise.
+     */
+    private Mono<Void> saveCompressedIfWorthKeeping(BucketName bucketName, BlobId blobId, BytesBlob compressedBlob,
+                                                    BytesBlob uncompressedBlob) {
+        CompressionDecision compressionDecision = compressionDecision(uncompressedBlob.payload().length, compressedBlob.payload().length);
+        if (compressionDecision.satisfyCompressionMinRatio(compressionConfiguration.minRatio())) {
+            return Mono.from(underlying.save(bucketName, blobId, compressedBlob))
+                .doOnSuccess(ignored -> metricRecorder.recordCompressedSave(uncompressedBlob.payload().length, compressedBlob.payload().length));
+        }
+        return Mono.from(underlying.save(bucketName, blobId, uncompressedBlob))
+            .doOnSuccess(ignored -> metricRecorder.recordRatioSkip());
+    }
+
+    private Publisher<Void> save(BucketName bucketName, BlobId blobId, InputStreamBlob inputStreamBlob) {
+        validateMetadata(inputStreamBlob.metadata());
+
+        if (!compressionEnabled()) {
+            return Mono.from(underlying.save(bucketName, blobId, inputStreamBlob));
+        }
+
+        // The stream is consumed as it is read: buffer it so that its size is known and its content can be
+        // read again, either for compression or for an uncompressed save.
+        return Mono.usingWhen(Mono.fromCallable(() -> new FileBackedOutputStream(FILE_THRESHOLD)),
+                originalContent -> Mono.fromCallable(() -> inputStreamBlob.payload().transferTo(originalContent))
+                    .flatMap(originalSize -> {
+                        if (originalSize >= compressionConfiguration.threshold()) {
+                            return compressAndSave(bucketName, blobId, originalContent, originalSize, inputStreamBlob.metadata());
+                        }
+
+                        return Mono.from(underlying.save(bucketName, blobId, byteSourceBlobWithSize(originalContent.asByteSource(), originalSize, inputStreamBlob.metadata())))
+                            .doOnSuccess(ignored -> metricRecorder.recordThresholdSkip());
+                    }),
+                // reset() releases the buffer, including its temporary file if one was created
+                originalContent -> Mono.fromRunnable(Throwing.runnable(originalContent::reset)).subscribeOn(Schedulers.boundedElastic()))
+            .subscribeOn(Schedulers.boundedElastic())
+            .onErrorMap(IOException.class, e -> new ObjectStoreIOException("Error saving blob " + blobId.asString(), e));
+    }
+
+    private Publisher<Void> save(BucketName bucketName, BlobId blobId, ByteSourceBlob byteSourceBlob) {
+        validateMetadata(byteSourceBlob.metadata());
+
+        if (!compressionEnabled()) {
+            return Mono.from(underlying.save(bucketName, blobId, byteSourceBlob));
+        }
+
+        // Sizing may read the whole source when its size is not known up front: run it on boundedElastic.
+        return Mono.fromCallable(() -> resolveSize(byteSourceBlob.payload()))
+            .flatMap(originalSize -> {
+                if (originalSize < compressionConfiguration.threshold()) {
+                    return Mono.from(underlying.save(bucketName, blobId, byteSourceBlobWithSize(byteSourceBlob.payload(), originalSize, byteSourceBlob.metadata())))
+                        .doOnSuccess(ignored -> metricRecorder.recordThresholdSkip());
+                }
+
+                return compressAndSave(bucketName, blobId, byteSourceBlob, originalSize);
+            })
+            .subscribeOn(Schedulers.boundedElastic())
+            .onErrorMap(IOException.class, e -> new ObjectStoreIOException("Error saving blob " + blobId.asString(), e));
+    }
+
+    // Compresses content already buffered from an InputStreamBlob; falls back to saving that buffer as is.
+    private Mono<Void> compressAndSave(BucketName bucketName, BlobId blobId, FileBackedOutputStream originalContent,
+                                       long originalSize, BlobMetadata metadata) {
+        return compressAndSave(bucketName, blobId, originalSize, metadata,
+            compressContent -> prepareCompressedContent(originalContent, originalSize, compressContent),
+            () -> Mono.from(underlying.save(bucketName, blobId, byteSourceBlobWithSize(originalContent.asByteSource(), originalSize, metadata)))
+                .doOnSuccess(ignored -> metricRecorder.recordRatioSkip()));
+    }
+
+    // Compresses a ByteSourceBlob; falls back to saving the original source as is.
+    private Mono<Void> compressAndSave(BucketName bucketName, BlobId blobId, ByteSourceBlob byteSourceBlob, long originalSize) {
+        return compressAndSave(bucketName, blobId, originalSize, byteSourceBlob.metadata(),
+            compressContent -> prepareCompressedContent(byteSourceBlob.payload(), originalSize, compressContent),
+            () -> Mono.from(underlying.save(bucketName, blobId, byteSourceBlobWithSize(byteSourceBlob.payload(), originalSize, byteSourceBlob.metadata())))
+                .doOnSuccess(ignored -> metricRecorder.recordRatioSkip()));
+    }
+
+    /**
+     * Compresses into a temporary buffer, then stores either the compressed buffer or, through
+     * {@code saveOriginal}, the original content. The buffer is reset once the save completes.
+     */
+    private Mono<Void> compressAndSave(BucketName bucketName, BlobId blobId, long originalSize, BlobMetadata metadata,
+                                       Function<FileBackedOutputStream, Mono<CompressionDecision>> prepareCompressedContent,
+                                       Supplier<Mono<Void>> saveOriginal) {
+        return Mono.usingWhen(Mono.fromCallable(() -> new FileBackedOutputStream(FILE_THRESHOLD)),
+                compressContent -> prepareCompressedContent.apply(compressContent)
+                    .flatMap(compressionDecision -> saveCompressedIfWorthKeeping(bucketName, blobId, originalSize, metadata,
+                        compressContent, compressionDecision, saveOriginal)),
+                compressContent -> Mono.fromRunnable(Throwing.runnable(compressContent::reset)).subscribeOn(Schedulers.boundedElastic()))
+            .subscribeOn(Schedulers.boundedElastic());
+    }
+
+    /**
+     * Stores the compressed buffer, with compression metadata, when it satisfies the configured ratio;
+     * otherwise delegates to {@code saveOriginal}.
+     */
+    private Mono<Void> saveCompressedIfWorthKeeping(BucketName bucketName, BlobId blobId, long originalSize,
+                                                    BlobMetadata metadata, FileBackedOutputStream compressedContent,
+                                                    CompressionDecision compressionDecision, Supplier<Mono<Void>> saveOriginal) {
+        if (compressionDecision.satisfyCompressionMinRatio(compressionConfiguration.minRatio())) {
+            return Mono.from(underlying.save(bucketName, blobId,
+                    byteSourceBlobWithSize(compressedContent.asByteSource(), compressionDecision.compressedSize(), withCompressionMetadata(metadata, originalSize))))
+                .doOnSuccess(ignored -> metricRecorder.recordCompressedSave(originalSize, compressionDecision.compressedSize()));
+        }
+
+        return saveOriginal.get();
+    }
+
+    private Mono<CompressionDecision> prepareCompressedContent(FileBackedOutputStream originalContent, long originalSize,
+                                                               FileBackedOutputStream compressContent) {
+        return Mono.fromCallable(() -> {
+            try (InputStream originalStream = originalContent.asByteSource().openStream()) {
+                long compressedSize = compress(originalStream, compressContent);
+                return compressionDecision(originalSize, compressedSize);
+            }
+        });
+    }
+
+    private Mono<CompressionDecision> prepareCompressedContent(ByteSource byteSource, long originalSize,
+                                                               FileBackedOutputStream compressContent) {
+        return Mono.fromCallable(() -> {
+            long compressedSize = compress(byteSource, compressContent);
+            return compressionDecision(originalSize, compressedSize);
+        });
+    }
+
+    private byte[] compress(byte[] data) {
+        TimeMetric timeMetric = metricRecorder.startCompressionLatencyTimer();
+        try {
+            return Zstd.compress(data);
+        } finally {
+            timeMetric.stopAndPublish();
+        }
+    }
+
+    /**
+     * Streams {@code inputStream} through a zstd encoder into {@code compressedContent}.
+     *
+     * @return the number of compressed bytes written
+     */
+    private long compress(InputStream inputStream, FileBackedOutputStream compressedContent) throws IOException {
+        TimeMetric timeMetric = metricRecorder.startCompressionLatencyTimer();
+
+        try (CountingOutputStream countingOutputStream = new CountingOutputStream(compressedContent);
+             ZstdOutputStream zstdOutputStream = new ZstdOutputStream(countingOutputStream)) {
+            inputStream.transferTo(zstdOutputStream);
+            // Close explicitly so the end of the zstd frame is written before the byte count is read.
+            zstdOutputStream.close();
+            return countingOutputStream.getCount();
+        } finally {
+            timeMetric.stopAndPublish();
+        }
+    }
+
+    /**
+     * {@link ByteSource} flavour of {@link #compress(InputStream, FileBackedOutputStream)}.
+     */
+    private long compress(ByteSource byteSource, FileBackedOutputStream compressedContent) throws IOException {
+        try (InputStream inputStream = byteSource.openStream()) {
+            return compress(inputStream, compressedContent);
+        }
+    }
+
+    private boolean shouldAttemptCompression(long originalSize) {
+        return compressionEnabled()
+            && originalSize >= compressionConfiguration.threshold();
+    }
+
+    private boolean compressionEnabled() {
+        return compressionConfiguration.enabled()
+            && compressionConfiguration.minRatio() > 0; // minRatio == 0 means decompress-only mode: never compress on save
+    }
+
+    private CompressionDecision compressionDecision(long originalSize, long compressedSize) {
+        return new CompressionDecision(originalSize, compressedSize);
+    }
+
+    private boolean isCompressed(BlobMetadata metadata) {
+        return metadata.contentTransferEncoding()
+            .filter(ContentTransferEncoding.ZSTD::equals)
+            .isPresent();
+    }
+
+    private long resolveSize(ByteSource byteSource) throws IOException {
+        if (byteSource.sizeIfKnown().isPresent()) {
+            return byteSource.sizeIfKnown().get();
+        }
+
+        return byteSource.size();
+    }
+
+    /**
+     * @throws IllegalArgumentException if the caller-provided metadata uses a name this DAO writes itself
+     */
+    private void validateMetadata(BlobMetadata metadata) {
+        Set<BlobMetadataName> reservedNames = metadata.underlyingMap()
+            .keySet()
+            .stream()
+            .filter(RESERVED_METADATA_NAMES::contains)
+            .collect(Collectors.toSet());
+
+        if (!reservedNames.isEmpty()) {
+            throw new IllegalArgumentException("Reserved zstd metadata are not allowed: " + reservedNames);
+        }
+    }
+
+    private BlobMetadata withCompressionMetadata(BlobMetadata metadata, long originalSize) {
+        return metadata
+            .withContentTransferEncoding(ContentTransferEncoding.ZSTD)
+            .withMetadata(CONTENT_ORIGINAL_SIZE, new BlobMetadataValue(String.valueOf(originalSize)));
+    }
+
+    /**
+     * Wraps {@code byteSource} so that {@code sizeIfKnown()} and {@code size()} report {@code size} without
+     * reading the content (presumably so the underlying DAO learns the length up front — confirm).
+     */
+    private ByteSourceBlob byteSourceBlobWithSize(ByteSource byteSource, long size, BlobMetadata metadata) {
+        return ByteSourceBlob.of(new ByteSource() {
+            @Override
+            public InputStream openStream() throws IOException {
+                return byteSource.openStream();
+            }
+
+            @Override
+            public com.google.common.base.Optional<Long> sizeIfKnown() {
+                return com.google.common.base.Optional.of(size);
+            }
+
+            @Override
+            public long size() {
+                return size;
+            }
+        }, metadata);
+    }
+
+}
diff --git
a/server/blob/blob-zstd/src/test/java/org/apache/james/blob/zstd/CompressionConfigurationTest.java
b/server/blob/blob-zstd/src/test/java/org/apache/james/blob/zstd/CompressionConfigurationTest.java
new file mode 100644
index 0000000000..18a8f5cfb0
--- /dev/null
+++
b/server/blob/blob-zstd/src/test/java/org/apache/james/blob/zstd/CompressionConfigurationTest.java
@@ -0,0 +1,97 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.blob.zstd;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.SoftAssertions.assertSoftly;
+
+import org.junit.jupiter.api.Test;
+
+class CompressionConfigurationTest {
+    // Messages thrown by CompressionConfiguration's validation.
+    private static final String THRESHOLD_NOT_POSITIVE = "'threshold' needs to be strictly positive";
+    private static final String MIN_RATIO_OUT_OF_RANGE = "'minRatio' needs to be between 0 and 1";
+
+    @Test
+    void builderShouldReturnDefaultConfigurationByDefault() {
+        CompressionConfiguration defaults = CompressionConfiguration.builder().build();
+
+        assertSoftly(softly -> {
+            softly.assertThat(defaults.enabled()).isFalse();
+            softly.assertThat(defaults.threshold()).isEqualTo(16 * 1024L);
+            softly.assertThat(defaults.minRatio()).isEqualTo(1F);
+        });
+    }
+
+    @Test
+    void builderShouldSupportEnabledConfiguration() {
+        CompressionConfiguration custom = CompressionConfiguration.builder()
+            .enabled(true)
+            .threshold(42)
+            .minRatio(0.8F)
+            .build();
+
+        assertSoftly(softly -> {
+            softly.assertThat(custom.enabled()).isTrue();
+            softly.assertThat(custom.threshold()).isEqualTo(42L);
+            softly.assertThat(custom.minRatio()).isEqualTo(0.8F);
+        });
+    }
+
+    @Test
+    void builderShouldSupportZeroMinRatio() {
+        CompressionConfiguration decompressOnly = CompressionConfiguration.builder()
+            .minRatio(0F)
+            .build();
+
+        assertThat(decompressOnly.minRatio()).isZero();
+    }
+
+    @Test
+    void builderShouldThrowWhenThresholdIsZero() {
+        CompressionConfiguration.Builder builder = CompressionConfiguration.builder();
+
+        assertThatThrownBy(() -> builder.threshold(0))
+            .isInstanceOf(IllegalArgumentException.class)
+            .hasMessage(THRESHOLD_NOT_POSITIVE);
+    }
+
+    @Test
+    void builderShouldThrowWhenThresholdIsNegative() {
+        CompressionConfiguration.Builder builder = CompressionConfiguration.builder();
+
+        assertThatThrownBy(() -> builder.threshold(-1))
+            .isInstanceOf(IllegalArgumentException.class)
+            .hasMessage(THRESHOLD_NOT_POSITIVE);
+    }
+
+    @Test
+    void builderShouldThrowWhenMinRatioIsNegative() {
+        CompressionConfiguration.Builder builder = CompressionConfiguration.builder();
+
+        assertThatThrownBy(() -> builder.minRatio(-0.1F))
+            .isInstanceOf(IllegalArgumentException.class)
+            .hasMessage(MIN_RATIO_OUT_OF_RANGE);
+    }
+
+    @Test
+    void builderShouldThrowWhenMinRatioIsAboveOne() {
+        CompressionConfiguration.Builder builder = CompressionConfiguration.builder();
+
+        assertThatThrownBy(() -> builder.minRatio(1.1F))
+            .isInstanceOf(IllegalArgumentException.class)
+            .hasMessage(MIN_RATIO_OUT_OF_RANGE);
+    }
+
+}
diff --git
a/server/blob/blob-zstd/src/test/java/org/apache/james/blob/zstd/ZstdBlobStoreDAOTest.java
b/server/blob/blob-zstd/src/test/java/org/apache/james/blob/zstd/ZstdBlobStoreDAOTest.java
new file mode 100644
index 0000000000..67fd2e20f7
--- /dev/null
+++
b/server/blob/blob-zstd/src/test/java/org/apache/james/blob/zstd/ZstdBlobStoreDAOTest.java
@@ -0,0 +1,453 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.blob.zstd;
+
+import static org.apache.james.blob.api.BlobStoreDAOFixture.ELEVEN_KILOBYTES;
+import static org.apache.james.blob.api.BlobStoreDAOFixture.SHORT_BYTEARRAY;
+import static org.apache.james.blob.api.BlobStoreDAOFixture.TEST_BLOB_ID;
+import static org.apache.james.blob.api.BlobStoreDAOFixture.TEST_BUCKET_NAME;
+import static
org.apache.james.blob.objectstorage.aws.JamesS3MetricPublisher.DEFAULT_S3_METRICS_PREFIX;
+import static
org.apache.james.blob.objectstorage.aws.S3BlobStoreConfiguration.UPLOAD_RETRY_EXCEPTION_PREDICATE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.SoftAssertions.assertSoftly;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.Random;
+import java.util.stream.Stream;
+
+import org.apache.james.blob.api.BlobStoreDAO;
+import org.apache.james.blob.api.BlobStoreDAOContract;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.blob.api.MetadataAwareBlobStoreDAOContract;
+import org.apache.james.blob.api.TestBlobId;
+import org.apache.james.blob.objectstorage.aws.AwsS3AuthConfiguration;
+import org.apache.james.blob.objectstorage.aws.DockerAwsS3Container;
+import org.apache.james.blob.objectstorage.aws.DockerAwsS3Extension;
+import org.apache.james.blob.objectstorage.aws.JamesS3MetricPublisher;
+import org.apache.james.blob.objectstorage.aws.S3BlobStoreConfiguration;
+import org.apache.james.blob.objectstorage.aws.S3BlobStoreDAO;
+import org.apache.james.blob.objectstorage.aws.S3ClientFactory;
+import org.apache.james.blob.objectstorage.aws.S3RequestOption;
+import org.apache.james.metrics.api.NoopGaugeRegistry;
+import org.apache.james.metrics.tests.RecordingMetricFactory;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import com.google.common.io.ByteSource;
+
+import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
+
+/**
+ * Tests {@link ZstdBlobStoreDAO} decorating an {@link S3BlobStoreDAO} backed by the Docker S3 container
+ * provided by {@link DockerAwsS3Extension}. Besides the zstd-specific tests below, the generic
+ * {@link BlobStoreDAOContract} and {@link MetadataAwareBlobStoreDAOContract} suites run against {@link #testee()}.
+ */
+@ExtendWith(DockerAwsS3Extension.class)
+class ZstdBlobStoreDAOTest implements BlobStoreDAOContract, MetadataAwareBlobStoreDAOContract {
+    private static final CompressionConfiguration DEFAULT_COMPRESSION_CONFIGURATION = CompressionConfiguration.builder()
+        .enabled(true)
+        .build();
+    private static final BucketName FALLBACK_BUCKET = BucketName.of("fallback");
+
+    // Created once in @BeforeAll and shared by every test; buckets are wiped after each test.
+    private static S3BlobStoreDAO underlying;
+    private static S3ClientFactory s3ClientFactory;
+
+    private RecordingMetricFactory metricFactory;
+    private ZstdBlobStoreDAO testee;
+
+    @BeforeAll
+    static void setUp(DockerAwsS3Container dockerAwsS3) {
+        AwsS3AuthConfiguration authConfiguration = AwsS3AuthConfiguration.builder()
+            .endpoint(dockerAwsS3.getEndpoint())
+            .accessKeyId(DockerAwsS3Container.ACCESS_KEY_ID)
+            .secretKey(DockerAwsS3Container.SECRET_ACCESS_KEY)
+            .build();
+
+        S3BlobStoreConfiguration s3Configuration = S3BlobStoreConfiguration.builder()
+            .authConfiguration(authConfiguration)
+            .region(dockerAwsS3.dockerAwsS3().region())
+            .uploadRetrySpec(Optional.of(Retry.backoff(3, Duration.ofSeconds(1))
+                .filter(UPLOAD_RETRY_EXCEPTION_PREDICATE)))
+            .defaultBucketName(BucketName.DEFAULT)
+            .fallbackBucketName(Optional.of(FALLBACK_BUCKET))
+            .build();
+
+        s3ClientFactory = new S3ClientFactory(s3Configuration, () -> new JamesS3MetricPublisher(new RecordingMetricFactory(),
+            new NoopGaugeRegistry(), DEFAULT_S3_METRICS_PREFIX));
+        underlying = new S3BlobStoreDAO(s3ClientFactory, s3Configuration, new TestBlobId.Factory(), S3RequestOption.DEFAULT);
+    }
+
+    @AfterAll
+    static void tearDownClass() {
+        if (s3ClientFactory != null) {
+            s3ClientFactory.close();
+        }
+    }
+
+    @BeforeEach
+    void setUp() {
+        metricFactory = new RecordingMetricFactory();
+        testee = testeeWith(DEFAULT_COMPRESSION_CONFIGURATION);
+    }
+
+    @AfterEach
+    void tearDown() {
+        if (underlying != null) {
+            underlying.deleteAllBuckets().block();
+        }
+    }
+
+    @Override
+    public BlobStoreDAO testee() {
+        return testee;
+    }
+
+    @Override
+    @Test
+    public void retrieveContentTransferEncodingShouldSucceed() {
+        ZstdBlobStoreDAO localTestee = lowThresholdTestee();
+
+        // should compress and append content-transfer-encoding metadata, when threshold is met.
+        Mono.from(localTestee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES)).block();
+
+        assertThat(Mono.from(localTestee.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block().metadata().contentTransferEncoding())
+            .contains(BlobStoreDAO.ContentTransferEncoding.ZSTD);
+    }
+
+    @Test
+    void shouldPreserveCallerMetadataWhenCompressionHappens() {
+        ZstdBlobStoreDAO localTestee = lowThresholdTestee();
+        BlobStoreDAO.BlobMetadata metadata = BlobStoreDAO.BlobMetadata.empty()
+            .withMetadata(new BlobStoreDAO.BlobMetadataName("name"), new BlobStoreDAO.BlobMetadataValue("value"))
+            .withMetadata(new BlobStoreDAO.BlobMetadataName("type"), new BlobStoreDAO.BlobMetadataValue("attachment"));
+        BlobStoreDAO.BytesBlob blob = BlobStoreDAO.BytesBlob.of(ELEVEN_KILOBYTES.payload(), metadata);
+
+        Mono.from(localTestee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, blob)).block();
+
+        BlobStoreDAO.BytesBlob readBlob = Mono.from(localTestee.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
+
+        assertSoftly(softly -> {
+            softly.assertThat(readBlob.metadata().underlyingMap())
+                .containsEntry(new BlobStoreDAO.BlobMetadataName("name"), new BlobStoreDAO.BlobMetadataValue("value"))
+                .containsEntry(new BlobStoreDAO.BlobMetadataName("type"), new BlobStoreDAO.BlobMetadataValue("attachment"))
+                .containsEntry(BlobStoreDAO.ContentTransferEncoding.NAME, BlobStoreDAO.ContentTransferEncoding.ZSTD.asValue())
+                .containsEntry(ZstdBlobStoreDAO.CONTENT_ORIGINAL_SIZE,
+                    new BlobStoreDAO.BlobMetadataValue(String.valueOf(ELEVEN_KILOBYTES.payload().length)));
+        });
+    }
+
+    @Test
+    void readExistingNonCompressedBlobShouldSucceed() {
+        Mono.from(underlying.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES)).block();
+
+        assertThat(Mono.from(testee.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block())
+            .isEqualTo(ELEVEN_KILOBYTES);
+    }
+
+    @Test
+    void shouldNotCompressBlobIfThresholdIsNotMet() {
+        ZstdBlobStoreDAO localTestee = testeeWith(CompressionConfiguration.builder()
+            .enabled(true)
+            .threshold(16 * 1024)
+            .build());
+
+        Mono.from(localTestee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block();
+
+        assertStoredUncompressed(SHORT_BYTEARRAY.payload());
+    }
+
+    @Test
+    void readShouldDecompressCompressedBlob() throws IOException {
+        ZstdBlobStoreDAO localTestee = lowThresholdTestee();
+
+        Mono.from(localTestee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES)).block();
+
+        BlobStoreDAO.InputStreamBlob readBlob = localTestee.read(TEST_BUCKET_NAME, TEST_BLOB_ID);
+        byte[] payload = readAndClose(readBlob);
+
+        assertDecompressed(payload, readBlob.metadata(), ELEVEN_KILOBYTES.payload());
+    }
+
+    @Test
+    void readReactiveShouldDecompressCompressedBlob() throws IOException {
+        ZstdBlobStoreDAO localTestee = lowThresholdTestee();
+
+        Mono.from(localTestee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES)).block();
+
+        BlobStoreDAO.InputStreamBlob readBlob = Mono.from(localTestee.readReactive(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
+        byte[] payload = readAndClose(readBlob);
+
+        assertDecompressed(payload, readBlob.metadata(), ELEVEN_KILOBYTES.payload());
+    }
+
+    @Test
+    void shouldNotCompressBlobWhenMinRatioIsZero() {
+        ZstdBlobStoreDAO localTestee = testeeWith(CompressionConfiguration.builder()
+            .enabled(true)
+            .threshold(1)
+            .minRatio(0F)
+            .build());
+
+        Mono.from(localTestee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES)).block();
+
+        assertStoredUncompressed(ELEVEN_KILOBYTES.payload());
+    }
+
+    @Test
+    void shouldStillDecompressBlobWhenMinRatioIsZero() {
+        ZstdBlobStoreDAO compressingTestee = lowThresholdTestee();
+        ZstdBlobStoreDAO uncompressingOnlyTestee = testeeWith(CompressionConfiguration.builder()
+            .enabled(true)
+            .threshold(1)
+            .minRatio(0F)
+            .build());
+
+        Mono.from(compressingTestee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES)).block();
+
+        BlobStoreDAO.BytesBlob readBlob = Mono.from(uncompressingOnlyTestee.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
+
+        assertDecompressed(readBlob.payload(), readBlob.metadata(), ELEVEN_KILOBYTES.payload());
+    }
+
+    @Test
+    void shouldNotCompressBlobWhenMinRatioIsNotMet() {
+        ZstdBlobStoreDAO localTestee = testeeWith(CompressionConfiguration.builder()
+            .enabled(true)
+            .threshold(1)
+            .minRatio(0.5F)
+            .build());
+        // Seeded pseudo-random bytes: deterministic, and practically incompressible.
+        byte[] randomPayload = new byte[4096];
+        new Random(1).nextBytes(randomPayload);
+        BlobStoreDAO.BytesBlob randomBlob = BlobStoreDAO.BytesBlob.of(randomPayload);
+
+        Mono.from(localTestee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, randomBlob)).block();
+
+        assertStoredUncompressed(randomPayload);
+    }
+
+    @Test
+    void shouldRecordMetrics() {
+        ZstdBlobStoreDAO localTestee = lowThresholdTestee();
+
+        Mono.from(localTestee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES)).block();
+        Mono.from(localTestee.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
+        BlobStoreDAO.BytesBlob storedBlob = Mono.from(underlying.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
+
+        assertSoftly(softly -> {
+            softly.assertThat(metricFactory.countFor(ZstdBlobStoreDAO.MetricRecorder.BLOB_ZSTD_COMPRESS_SAVE_COUNT_METRIC_NAME)).isEqualTo(1);
+            softly.assertThat(metricFactory.countFor(ZstdBlobStoreDAO.MetricRecorder.BLOB_ZSTD_DECOMPRESS_COUNT_METRIC_NAME)).isEqualTo(1);
+            softly.assertThat(metricFactory.countFor(ZstdBlobStoreDAO.MetricRecorder.BLOB_ZSTD_SAVED_BYTES_METRIC_NAME))
+                .isEqualTo(ELEVEN_KILOBYTES.payload().length - storedBlob.payload().length);
+            softly.assertThat(metricFactory.executionTimesFor(ZstdBlobStoreDAO.MetricRecorder.BLOB_ZSTD_COMPRESS_LATENCY_METRIC_NAME)).hasSize(1);
+            softly.assertThat(metricFactory.executionTimesFor(ZstdBlobStoreDAO.MetricRecorder.BLOB_ZSTD_DECOMPRESS_LATENCY_METRIC_NAME)).hasSize(1);
+        });
+    }
+
+    @Test
+    void shouldRecordThresholdSkipMetricIfThresholdNotMatch() {
+        Mono.from(testee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block();
+
+        assertThat(metricFactory.countFor(ZstdBlobStoreDAO.MetricRecorder.BLOB_ZSTD_THRESHOLD_SKIP_COUNT_METRIC_NAME))
+            .isEqualTo(1);
+    }
+
+    @ParameterizedTest
+    @MethodSource("blobsWithReservedCompressionMetadata")
+    void saveShouldRejectReservedCompressionMetadata(BlobStoreDAO.Blob blob) {
+        assertThatThrownBy(() -> Mono.from(testee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, blob)).block())
+            .isInstanceOf(IllegalArgumentException.class)
+            .hasMessageContaining("Reserved zstd metadata are not allowed");
+    }
+
+    @ParameterizedTest
+    @MethodSource("compressionSamples")
+    void readBytesShouldRoundTripCompressedResourcesWithoutCorruption(String resourcePath) throws IOException {
+        ZstdBlobStoreDAO localTestee = lowThresholdTestee();
+        byte[] resourceBytes = readResource(resourcePath);
+
+        Mono.from(localTestee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, BlobStoreDAO.BytesBlob.of(resourceBytes))).block();
+
+        BlobStoreDAO.BytesBlob readBlob = Mono.from(localTestee.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
+
+        assertDecompressed(readBlob.payload(), readBlob.metadata(), resourceBytes);
+    }
+
+    @ParameterizedTest
+    @MethodSource("compressionSamples")
+    void readShouldRoundTripCompressedResourcesWithoutCorruption(String resourcePath) throws IOException {
+        ZstdBlobStoreDAO localTestee = lowThresholdTestee();
+        byte[] resourceBytes = readResource(resourcePath);
+
+        Mono.from(localTestee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, BlobStoreDAO.BytesBlob.of(resourceBytes))).block();
+
+        BlobStoreDAO.InputStreamBlob readBlob = localTestee.read(TEST_BUCKET_NAME, TEST_BLOB_ID);
+        byte[] payload = readAndClose(readBlob);
+
+        assertDecompressed(payload, readBlob.metadata(), resourceBytes);
+    }
+
+    @ParameterizedTest
+    @MethodSource("compressionSamples")
+    void readReactiveShouldRoundTripCompressedResourcesWithoutCorruption(String resourcePath) throws IOException {
+        ZstdBlobStoreDAO localTestee = lowThresholdTestee();
+        byte[] resourceBytes = readResource(resourcePath);
+
+        Mono.from(localTestee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, BlobStoreDAO.BytesBlob.of(resourceBytes))).block();
+
+        BlobStoreDAO.InputStreamBlob readBlob = Mono.from(localTestee.readReactive(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
+        byte[] payload = readAndClose(readBlob);
+
+        assertDecompressed(payload, readBlob.metadata(), resourceBytes);
+    }
+
+    private static Stream<Arguments> compressionSamples() {
+        return Stream.of(
+            Arguments.of("zstd/text.txt"),
+            Arguments.of("zstd/james-logo.jpg"),
+            Arguments.of("zstd/mail1.eml"),
+            Arguments.of("zstd/document.pdf"));
+    }
+
+    private static Stream<Arguments> blobsWithReservedCompressionMetadata() {
+        BlobStoreDAO.BlobMetadata reservedMetadata = BlobStoreDAO.BlobMetadata.empty()
+            .withMetadata(BlobStoreDAO.ContentTransferEncoding.NAME, BlobStoreDAO.ContentTransferEncoding.ZSTD.asValue());
+
+        return Stream.of(
+            Arguments.of(BlobStoreDAO.BytesBlob.of(ELEVEN_KILOBYTES.payload(), reservedMetadata)),
+            Arguments.of(BlobStoreDAO.InputStreamBlob.of(new ByteArrayInputStream(ELEVEN_KILOBYTES.payload()), reservedMetadata)),
+            Arguments.of(BlobStoreDAO.ByteSourceBlob.of(ByteSource.wrap(ELEVEN_KILOBYTES.payload()), reservedMetadata)));
+    }
+
+    /** Builds a DAO over the shared S3 backend that records into this test's metric factory. */
+    private ZstdBlobStoreDAO testeeWith(CompressionConfiguration configuration) {
+        return new ZstdBlobStoreDAO(underlying, configuration, metricFactory);
+    }
+
+    /** DAO with compression enabled and a 1-byte threshold, so every non-empty blob is a compression candidate. */
+    private ZstdBlobStoreDAO lowThresholdTestee() {
+        return testeeWith(CompressionConfiguration.builder()
+            .enabled(true)
+            .threshold(1)
+            .build());
+    }
+
+    /**
+     * Reads TEST_BLOB_ID directly from the underlying S3 DAO, bypassing the zstd layer, and checks it was stored
+     * verbatim with no zstd content-transfer-encoding nor original-size metadata.
+     */
+    private static void assertStoredUncompressed(byte[] expectedPayload) {
+        BlobStoreDAO.BytesBlob storedBlob = Mono.from(underlying.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
+
+        assertSoftly(softly -> {
+            softly.assertThat(storedBlob.payload()).isEqualTo(expectedPayload);
+            softly.assertThat(storedBlob.metadata().contentTransferEncoding()).isEmpty();
+            softly.assertThat(storedBlob.metadata().get(ZstdBlobStoreDAO.CONTENT_ORIGINAL_SIZE)).isEmpty();
+        });
+    }
+
+    /**
+     * Checks a blob read back through the zstd layer: the payload equals the original bytes, and the metadata
+     * advertises ZSTD content-transfer-encoding together with the original (uncompressed) size.
+     */
+    private static void assertDecompressed(byte[] payload, BlobStoreDAO.BlobMetadata metadata, byte[] expectedPayload) {
+        assertSoftly(softly -> {
+            softly.assertThat(payload).isEqualTo(expectedPayload);
+            softly.assertThat(metadata.contentTransferEncoding()).contains(BlobStoreDAO.ContentTransferEncoding.ZSTD);
+            softly.assertThat(metadata.get(ZstdBlobStoreDAO.CONTENT_ORIGINAL_SIZE))
+                .contains(new BlobStoreDAO.BlobMetadataValue(String.valueOf(expectedPayload.length)));
+        });
+    }
+
+    /**
+     * Drains the payload stream of a streamed blob and closes it. The stream may hold decompression or
+     * network resources, so it must not be left open once read.
+     */
+    private static byte[] readAndClose(BlobStoreDAO.InputStreamBlob blob) throws IOException {
+        try (InputStream payload = blob.payload()) {
+            return payload.readAllBytes();
+        }
+    }
+
+    /**
+     * Loads a test resource through this class's loader. This is more robust than the system class loader
+     * when tests run under an isolated class loader.
+     */
+    private static byte[] readResource(String resourcePath) throws IOException {
+        try (InputStream inputStream = ZstdBlobStoreDAOTest.class.getClassLoader().getResourceAsStream(resourcePath)) {
+            assertThat(inputStream).describedAs("resource %s should exist", resourcePath).isNotNull();
+            return inputStream.readAllBytes();
+        }
+    }
+}
diff --git a/server/blob/blob-zstd/src/test/resources/zstd/document.pdf
b/server/blob/blob-zstd/src/test/resources/zstd/document.pdf
new file mode 100644
index 0000000000..4603bd39c1
Binary files /dev/null and
b/server/blob/blob-zstd/src/test/resources/zstd/document.pdf differ
diff --git a/server/blob/blob-zstd/src/test/resources/zstd/james-logo.jpg
b/server/blob/blob-zstd/src/test/resources/zstd/james-logo.jpg
new file mode 100644
index 0000000000..8299376397
Binary files /dev/null and
b/server/blob/blob-zstd/src/test/resources/zstd/james-logo.jpg differ
diff --git a/server/blob/blob-zstd/src/test/resources/zstd/mail1.eml
b/server/blob/blob-zstd/src/test/resources/zstd/mail1.eml
new file mode 100644
index 0000000000..4b89f591bb
--- /dev/null
+++ b/server/blob/blob-zstd/src/test/resources/zstd/mail1.eml
@@ -0,0 +1,69 @@
+Return-Path: <[email protected]>
+Received: from mx1.minet.net (mx1.minet.net [192.168.102.25])
+ by imap (Cyrus v2.4.16-Debian-2.4.16-4+deb7u1) with LMTPA;
+ Thu, 04 Jun 2015 11:23:39 +0200
+X-Sieve: CMU Sieve 2.4
+Received: from localhost (spam.minet.net [192.168.102.97])
+ by mx1.minet.net (Postfix) with ESMTP id 0113F385C15
+ for <[email protected]>; Thu, 4 Jun 2015 11:23:43 +0200 (CEST)
+X-Virus-Scanned: by amavisd-new using ClamAV at minet.net
+X-Spam-Flag: NO
+X-Spam-Score: -1.51
+X-Spam-Level:
+X-Spam-Status: No, score=-1.51 required=1 tests=[BAYES_00=-1.5,
+ T_RP_MATCHES_RCVD=-0.01] autolearn=ham
+Received: from mx2.minet.net ([IPv6:::ffff:192.168.102.26])
+ by localhost (spam.minet.net [::ffff:192.168.102.97]) (amavisd-new,
port 10024)
+ with ESMTP id IeILbadS9lo5 for <[email protected]>;
+ Thu, 4 Jun 2015 09:23:42 +0000 (UTC)
+Received-SPF: Pass (sender SPF authorized) identity=mailfrom;
client-ip=140.211.11.3; helo=mail.apache.org;
envelope-from=server-dev-return-56862-benwa=minet....@james.apache.org;
[email protected]
+Received: from mail.apache.org (hermes.apache.org [140.211.11.3])
+ by mx2.minet.net (Postfix) with SMTP id CDE83A1C7FC
+ for <[email protected]>; Thu, 4 Jun 2015 11:23:40 +0200 (CEST)
+Received: (qmail 37249 invoked by uid 500); 4 Jun 2015 09:23:38 -0000
+Mailing-List: contact [email protected]; run by ezmlm
+Precedence: bulk
+List-Unsubscribe: <mailto:[email protected]>
+List-Help: <mailto:[email protected]>
+List-Post: <mailto:[email protected]>
+List-Id: "James Developers List" <server-dev.james.apache.org>
+Reply-To: "James Developers List" <[email protected]>
+Delivered-To: mailing list [email protected]
+Received: (qmail 37236 invoked by uid 99); 4 Jun 2015 09:23:38 -0000
+Received: from arcas.apache.org (HELO arcas.apache.org) (140.211.11.28)
+ by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 04 Jun 2015 09:23:38 +0000
+Date: Thu, 4 Jun 2015 09:23:37 +0000 (UTC)
+From: "Tellier Benoit (JIRA)" <[email protected]>
+To: "abc" <[email protected]>
+Message-ID: <[email protected]>
+In-Reply-To: <[email protected]>
+References: <[email protected]>
<JIRA.12835341.1433409792972@arcas>
+Subject: [jira] [Created] (MAILBOX-234) Convert Message into JSON
+MIME-Version: 1.0
+Content-Type: text/plain; charset=utf-8
+Content-Transfer-Encoding: 7bit
+X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394
+
+Tellier Benoit created MAILBOX-234:
+--------------------------------------
+
+ Summary: Convert Message into JSON
+ Key: MAILBOX-234
+ URL: https://issues.apache.org/jira/browse/MAILBOX-234
+ Project: James Mailbox
+ Issue Type: New Feature
+ Reporter: Tellier Benoit
+
+
+This would give us the ability to index e-mails in ElasticSearch.
+
+
+
+--
+This message was sent by Atlassian JIRA
+(v6.3.4#6332)
+
+---------------------------------------------------------------------
+To unsubscribe, e-mail: [email protected]
+For additional commands, e-mail: [email protected]
+
diff --git a/server/blob/blob-zstd/src/test/resources/zstd/text.txt
b/server/blob/blob-zstd/src/test/resources/zstd/text.txt
new file mode 100644
index 0000000000..8427b45a26
--- /dev/null
+++ b/server/blob/blob-zstd/src/test/resources/zstd/text.txt
@@ -0,0 +1,249 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<document>
+
+ <properties>
+ <title>Apache James Server 3 - Blob Configuration</title>
+ </properties>
+
+ <body>
+
+ <section name="BlobStore Configuration">
+ <p>
+ BlobStore is the dedicated component to store blobs,
non-indexable content.
+ James uses the BlobStore for storing blobs which are usually
mail contents, attachments, deleted mails...
+ You can choose the underlying implementation of BlobStore to
fit with your James setup.
+ It could be the implementation on top of Cassandra or file
storage service like Openstack Swift, AWS S3.
+
+ This configuration is only applicable with Guice products.
+ </p>
+ <p>
+ Consult <a
href="https://github.com/apache/james-project/blob/master/server/apps/distributed-app/sample-configuration/blob.properties">blob.properties</a>
+ in GIT to get some examples and hints.
+ </p>
+
+ <p>
+ Blobs storing configuration
+ </p>
+ <dl>
+ <dt><strong>implementation</strong></dt>
+ <dd>cassandra: use cassandra based BlobStore</dd>
+ <dd>s3: use AWS S3 based BlobStore</dd>
+ <dd>file: (experimental) use the file system directly. Useful for legacy architectures based on shared iSCSI SANs and/or
+ distributed file systems with no object store available.</dd>
+ <dd><b>WARNING</b>: JAMES-3591 Cassandra is not made to store
large binary content, its use will be suboptimal compared to
+ alternatives (namely S3 compatible BlobStores backed by
for instance S3, MinIO or Ozone)
+ </dd>
+ <dd>
+ The generated startup warning log can be deactivated via
the <code>cassandra.blob.store.disable.startup.warning</code> environment
+ variable being positioned to <code>false</code>.
+ </dd>
+ <dt><strong>deduplication.enable</strong></dt>
+ <dd>Mandatory. Supported value: true and false.</dd>
+ <dd>If you choose to enable deduplication, the mails with the
same content will be stored only once.</dd>
+ <dd>Warning: Once this feature is enabled, there is no turning
back as turning it off will lead to the deletion of all</dd>
+ <dd>the mails sharing the same content once one is
deleted.</dd>
+ <dd>This feature also requires a garbage collector mechanism
to effectively drop blobs. A first implementation
+ based on bloom filters can be used and triggered using the
WebAdmin REST API. See
+ <a
href="manage-webadmin.html#Running_blob_garbage_collection">Running blob
garbage collection</a>.
+ In order to avoid concurrency issues upon garbage collection, we slice the blobs into generations; the two most recent
+ generations are not garbage collected.</dd>
+ <dd><strong>deduplication.gc.generation.duration</strong></dd>
+ <dd>Allows controlling the duration of one generation. Longer implies better deduplication
+ but deleted blobs will live longer. Duration, defaults to 30 days, the default unit is days.</dd>
+ <dd><strong>deduplication.gc.generation.family</strong></dd>
+ <dd>Every time the duration is changed, this integer counter
must be incremented to avoid
+ conflicts. Defaults to 1.</dd>
+ <dd>Upgrade note: If you are upgrading from James 3.5 or
older, the deduplication was enabled.</dd>
+ </dl>
+
+
+ <subsection name="Cassandra BlobStore Cache">
+ <p>A Cassandra cache can be enabled to reduce latency when
reading small blobs frequently.
+ A dedicated keyspace with a replication factor of one is
then used.
+ Cache eviction policy is TTL based.
+ Only blobs below a given threshold will be stored.
+ To be noted that blobs are stored within a single
Cassandra row, hence a low threshold should be used.
+ </p>
+ <dl>
+ <dt><strong>cache.enable</strong></dt>
+ <dd>DEFAULT: false, optional, must be a boolean. Whether
the cache should be enabled.</dd>
+ </dl>
+ <dl>
+ <dt><strong>cache.cassandra.ttl</strong></dt>
+ <dd>DEFAULT: 7 days, optional, must be a duration. Cache
eviction policy is TTL based. </dd>
+ </dl>
+ <dl>
+ <dt><strong>cache.sizeThresholdInBytes</strong></dt>
+ <dd>DEFAULT: 8192, optional, must be a positive integer.
Unit: bytes.
+ Supported units: bytes, Kib, MiB, GiB, TiB
+ Maximum size of stored objects expressed in bytes.</dd>
+ </dl>
+ </subsection>
+ <subsection name="Encryption choice">
+ <p>
+ Data can optionally be encrypted with a symmetric key using AES before being stored in the blobStore. As many users rely
+ on third parties for object storage, a compromised third party will not escalate to a data disclosure. Of course, a
+ performance price has to be paid, as encryption takes resources.
+ </p>
+ <dl>
+ <dt><strong>encryption.aes.enable</strong></dt>
+ <dd>Optional boolean, defaults to false</dd>
+ </dl>
+ <p>If AES encryption is enabled, then the following properties
MUST be present:</p>
+ <dl>
+ <dt><strong>encryption.aes.password</strong></dt>
+ <dd>String</dd>
+ </dl>
+ <dl>
+ <dt><strong>encryption.aes.salt</strong></dt>
+ <dd>Hexadecimal string.</dd>
+ </dl>
+ <p>If AES encryption is enabled, then the following properties
COULD be present:</p>
+ <dl>
+
<dt><strong>encryption.aes.private.key.algorithm</strong></dt>
+ <dd>String, defaulting to PBKDF2WithHmacSHA512. Previously
was
+ PBKDF2WithHmacSHA1.</dd>
+ </dl>
+
+ <p><b>WARNING:</b> Once chosen this choice can not be
reverted, all the data is either clear or encrypted. Mixed encryption
+ is not supported.</p>
+ <p>
+ Here is an example of how you can generate the above values (be mindful to customize the byte lengths in order to add
+ enough entropy).
+ </p>
+ <pre>
+ <code>
+# Password generation
+openssl rand -base64 64
+
+# Salt generation
+openssl rand -hex 16
+ </code>
+ </pre>
+ </subsection>
+ <subsection name="ObjectStorage BlobStore Buckets Configuration">
+ <dl>
+ <dt><strong>objectstorage.bucketPrefix</strong></dt>
+ <dd>
+ Bucket is a concept in James similar to Containers in Swift or Buckets in AWS S3.
+ BucketPrefix is the prefix of bucket names in James BlobStore.
+ </dd>
+
+ <dt><strong>objectstorage.namespace</strong></dt>
+ <dd>
+ BlobStore default bucket name. Most of blobs storing
in BlobStore are inside the default bucket.
+ Unless a special case like storing blobs of deleted
messages.
+ </dd>
+ </dl>
+ </subsection>
+ <subsection name="ObjectStorage Underlying Service Configuration">
+ <subsection name="ObjectStorage AWS S3 Configuration">
+ <dl>
+ <dt><strong>objectstorage.s3.endPoint</strong></dt>
+ <dd>S3 service endpoint</dd>
+
+ <dt><strong>objectstorage.s3.region</strong></dt>
+ <dd>S3 region</dd>
+
+ <dt><strong>objectstorage.s3.accessKeyId</strong></dt>
+ <dd><a
href="https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys">S3
access key id</a></dd>
+
+ <dt><strong>objectstorage.s3.secretKey</strong></dt>
+ <dd><a
href="https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys">S3
access key secret</a></dd>
+
+
<dt><strong>objectstorage.s3.http.concurrency</strong></dt>
+ <dd>Allow setting the number of concurrent HTTP
requests allowed by the Netty driver.</dd>
+
+
<dt><strong>objectstorage.s3.truststore.path</strong></dt>
+ <dd><i>optional:</i> Verify the S3 server certificate
against this trust store file.</dd>
+
+
<dt><strong>objectstorage.s3.truststore.type</strong></dt>
+ <dd><i>optional:</i> Specify the type of the trust
store, e.g. JKS, PKCS12</dd>
+
+
<dt><strong>objectstorage.s3.truststore.secret</strong></dt>
+ <dd><i>optional:</i> Use this secret/password to
access the trust store; default none</dd>
+
+
<dt><strong>objectstorage.s3.truststore.algorithm</strong></dt>
+ <dd><i>optional:</i> Use this specific trust store
algorithm; default SunX509</dd>
+
+ <dt><strong>objectstorage.s3.trustall</strong></dt>
+ <dd><i>optional:</i> boolean. Defaults to false. Cannot be set to true together with other truststore options. Whether James should trust
+ all S3 endpoint SSL certificates without validating them.</dd>
+
+ <dt><strong>objectstorage.s3.read.timeout</strong></dt>
+ <dd><i>optional:</i> HTTP read timeout. duration,
default value being second. Leaving it empty relies on S3 driver defaults.</dd>
+
+
<dt><strong>objectstorage.s3.write.timeout</strong></dt>
+ <dd><i>optional:</i> HTTP write timeout. duration,
default value being second. Leaving it empty relies on S3 driver defaults.</dd>
+
+
<dt><strong>objectstorage.s3.connection.timeout</strong></dt>
+ <dd><i>optional:</i> HTTP connection timeout.
duration, default value being second. Leaving it empty relies on S3 driver
defaults.</dd>
+
+
<dt><strong>objectstorage.s3.in.read.limit</strong></dt>
+ <dd><i>optional:</i> Objects read in memory will be rejected if they exceed the size limit exposed here. Size, example `100M`.
+ Supported units: K, M, G, defaults to B if no unit is specified. If unspecified, big objects won't be prevented
+ from being loaded in memory. This setting complements protocol limits.</dd>
+
+
<dt><strong>objectstorage.s3.upload.retry.maxAttempts</strong></dt>
+ <dd><i>optional:</i> Integer. Default is zero. This
property specifies the maximum number of retry attempts allowed for failed
upload operations.</dd>
+
+
<dt><strong>objectstorage.s3.upload.retry.backoffDurationMillis</strong></dt>
+ <dd><i>optional:</i> Long (Milliseconds). Default is 10 (milliseconds).
+ Only takes effect when the
"objectstorage.s3.upload.retry.maxAttempts" property is declared.
+ This property determines the duration (in
milliseconds) to wait between retry attempts for failed upload operations.
+ This delay is known as backoff. The jitter factor
is 0.5</dd>
+
+ </dl>
+ </subsection>
+ </subsection>
+ <subsection name="SSE-C Configuration">
+ <dl>
+ <dt><strong>encryption.s3.sse.c.enable</strong></dt>
+ <dd>
+ optional: Boolean. Default is false.
+ Controls whether to use Server-Side Encryption with
Customer-Provided Keys (SSE-C) for S3 blobs
+ </dd>
+
+
<dt><strong>encryption.s3.sse.c.master.key.algorithm</strong></dt>
+ <dd>
+ String. Required if `encryption.s3.sse.c.enable` is
true.
+ The algorithm used to derive the master key from the
provided password. Eg: AES256
+ </dd>
+
+
<dt><strong>encryption.s3.sse.c.master.key.password</strong></dt>
+ <dd>
+ String. Required if `encryption.s3.sse.c.enable` is
true.
+ The password used to generate the customer key.
+ </dd>
+
+
<dt><strong>encryption.s3.sse.c.master.key.salt</strong></dt>
+ <dd>
+ String. Required if `encryption.s3.sse.c.enable` is
true.
+ The salt used to generate the customer key.
+ </dd>
+
+ </dl>
+ </subsection>
+ </section>
+
+ </body>
+
+</document>
diff --git a/server/blob/pom.xml b/server/blob/pom.xml
index 1644a0e15a..508296036d 100644
--- a/server/blob/pom.xml
+++ b/server/blob/pom.xml
@@ -44,6 +44,7 @@
<module>blob-postgres</module>
<module>blob-s3</module>
<module>blob-storage-strategy</module>
+ <module>blob-zstd</module>
<module>mail-store</module>
</modules>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]