This is an automated email from the ASF dual-hosted git repository. quantranhong1999 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 5e2e062384712f658bf830915afdc4103d8292e3 Author: Quan Tran <[email protected]> AuthorDate: Tue Apr 21 17:04:44 2026 +0700 JAMES-4182 Guice binding + integration tests + documentation for ZstdBlobStoreDAO --- .../servers/partials/configure/blobstore.adoc | 18 +++ server/apps/distributed-app/pom.xml | 5 + .../sample-configuration/blob.properties | 18 +++ .../james/WithEncryptedAndZstdBlobStoreTest.java | 137 +++++++++++++++++++++ .../org/apache/james/WithZstdBlobStoreTest.java | 115 +++++++++++++++++ .../org/apache/james/JpaToPgCoreDataMigration.java | 4 +- server/container/guice/distributed/pom.xml | 4 + .../modules/blobstore/BlobStoreConfiguration.java | 64 +++++++++- .../modules/blobstore/BlobStoreModulesChooser.java | 61 +++++++-- .../blobstore/BlobStoreConfigurationTest.java | 71 +++++++++++ .../blobstore/BlobStoreModulesChooserTest.java | 26 ++++ src/site/xdoc/server/config-blobstore.xml | 24 ++++ 12 files changed, 532 insertions(+), 15 deletions(-) diff --git a/docs/modules/servers/partials/configure/blobstore.adoc b/docs/modules/servers/partials/configure/blobstore.adoc index 223035ebec..a50c0f7fbb 100644 --- a/docs/modules/servers/partials/configure/blobstore.adoc +++ b/docs/modules/servers/partials/configure/blobstore.adoc @@ -67,6 +67,24 @@ james.blob.aes.file.threshold.decrypt=256K james.blob.aes.blob.max.size=100M .... +=== Compression choice + +Data can optionally be compressed using Zstd before being stored in the blobStore. This especially benefits +text-heavy mail content and attachments, notably on S3-compatible object stores where storage costs matter. + +*compression.enabled* : Optional boolean, defaults to false. + +*compression.threshold* : Optional size, defaults to 16K. Compression is only attempted for blobs whose original +size is greater than or equal to this threshold. Supported units: no suffix for bytes, or B, K, M, G. + +*compression.min-ratio* : Optional float between 0 and 1, defaults to 1. Despite its name, this value is an upper bound: the largest accepted ratio of compressed size to original size. The default of 1 keeps any compressed payload that is not larger than the original.
+ +- James keeps the compressed payload only when `compressedSize / originalSize <= compression.min-ratio` +- `compression.min-ratio=0` enables a "decompress-only" mode: James can still read previously compressed blobs but +will not compress new writes. + +If both compression and AES encryption are enabled, James compresses first and encrypts afterwards. + === Object storage configuration ==== AWS S3 Configuration diff --git a/server/apps/distributed-app/pom.xml b/server/apps/distributed-app/pom.xml index 4692033189..cca08e9bb6 100644 --- a/server/apps/distributed-app/pom.xml +++ b/server/apps/distributed-app/pom.xml @@ -118,6 +118,11 @@ <type>test-jar</type> <scope>test</scope> </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>blob-zstd</artifactId> + <scope>compile</scope> + </dependency> <dependency> <groupId>${james.groupId}</groupId> <artifactId>event-bus-distributed</artifactId> diff --git a/server/apps/distributed-app/sample-configuration/blob.properties b/server/apps/distributed-app/sample-configuration/blob.properties index 7d16de865b..bc956e55b6 100644 --- a/server/apps/distributed-app/sample-configuration/blob.properties +++ b/server/apps/distributed-app/sample-configuration/blob.properties @@ -40,6 +40,24 @@ encryption.aes.enable=false # Optional, defaults to PBKDF2WithHmacSHA512 #encryption.aes.private.key.algorithm=PBKDF2WithHmacSHA512 +# ========================================= Compression ======================================== +# If enabled, James will compress blobs before storing them whenever the blob is large enough +# and the resulting compressed payload satisfies the configured compression ratio. +# Optional, Allowed values are: true, false, defaults to false +compression.enabled=false + +# Minimum blob size before compression is attempted. +# Optional, defaults to 16K.
+# Supported units: no suffix for bytes, or B, K, M, G +#compression.threshold=16K + +# Keep compressed payloads only when compressedSize / originalSize <= compression.min-ratio. +# Optional float between 0 and 1, defaults to 1. +# Use 0 for "decompress-only" mode: James will still read previously compressed blobs but will not compress new writes. +#compression.min-ratio=1 + +# If both compression and AES encryption are enabled, James compresses first and encrypts afterwards. + # ========================================= Cassandra BlobStore Cache ====================================== # A cassandra cache can be enabled to reduce latency when reading small blobs frequently # A dedicated keyspace with a replication factor of one is then used diff --git a/server/apps/distributed-app/src/test/java/org/apache/james/WithEncryptedAndZstdBlobStoreTest.java b/server/apps/distributed-app/src/test/java/org/apache/james/WithEncryptedAndZstdBlobStoreTest.java new file mode 100644 index 0000000000..908338da07 --- /dev/null +++ b/server/apps/distributed-app/src/test/java/org/apache/james/WithEncryptedAndZstdBlobStoreTest.java @@ -0,0 +1,137 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. 
See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james; + +import static org.apache.james.blob.api.BlobStoreDAO.ContentTransferEncoding.ZSTD; +import static org.assertj.core.api.SoftAssertions.assertSoftly; + +import java.nio.charset.StandardCharsets; + +import jakarta.inject.Inject; +import jakarta.inject.Named; + +import org.apache.james.blob.aes.CryptoConfig; +import org.apache.james.blob.api.BlobId; +import org.apache.james.blob.api.BlobStore; +import org.apache.james.blob.api.BlobStoreDAO; +import org.apache.james.blob.zstd.CompressionConfiguration; +import org.apache.james.blob.zstd.ZstdBlobStoreDAO; +import org.apache.james.modules.AwsS3BlobStoreExtension; +import org.apache.james.modules.RabbitMQExtension; +import org.apache.james.modules.TestJMAPServerModule; +import org.apache.james.modules.blobstore.BlobStoreConfiguration; +import org.apache.james.utils.GuiceProbe; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import com.github.luben.zstd.Zstd; +import com.google.inject.multibindings.Multibinder; + +import reactor.core.publisher.Mono; + +public class WithEncryptedAndZstdBlobStoreTest implements MailsShouldBeWellReceivedConcreteContract { + private record BlobSnapshot(byte[] originalPayload, byte[] readPayload, BlobStoreDAO.BytesBlob encryptionLayerBlob, + BlobStoreDAO.BytesBlob rawS3StoredBlob) { + } + + public static class EncryptedZstdBlobStoreProbe implements GuiceProbe { + private final BlobStore blobStore; + private final BlobStoreDAO encryptionBlobStoreDAO; + private final BlobStoreDAO rawBlobStoreDAO; + + @Inject + public EncryptedZstdBlobStoreProbe(BlobStore blobStore, @Named("encryption") BlobStoreDAO encryptionBlobStoreDAO, + @Named("raw") BlobStoreDAO rawBlobStoreDAO) { + this.blobStore = blobStore; + this.encryptionBlobStoreDAO = 
encryptionBlobStoreDAO; + this.rawBlobStoreDAO = rawBlobStoreDAO; + } + + public BlobSnapshot saveAndRead(String payload) { + byte[] originalPayload = payload.getBytes(StandardCharsets.UTF_8); + BlobId blobId = Mono.from(blobStore.save(blobStore.getDefaultBucketName(), originalPayload, BlobStore.StoragePolicy.LOW_COST)) + .block(); + byte[] readPayload = Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), blobId)) + .block(); + BlobStoreDAO.BytesBlob encryptionLayerBlob = Mono.from(encryptionBlobStoreDAO.readBytes(blobStore.getDefaultBucketName(), blobId)) + .block(); + BlobStoreDAO.BytesBlob rawS3StoredBlob = Mono.from(rawBlobStoreDAO.readBytes(blobStore.getDefaultBucketName(), blobId)) + .block(); + + return new BlobSnapshot(originalPayload, readPayload, encryptionLayerBlob, rawS3StoredBlob); + } + } + + private static final String COMPRESSIBLE_PAYLOAD = "James encrypted zstd blob store integration payload.\n".repeat(2048); + + @RegisterExtension + static JamesServerExtension jamesServerExtension = new JamesServerBuilder<CassandraRabbitMQJamesConfiguration>(tmpDir -> + CassandraRabbitMQJamesConfiguration.builder() + .workingDirectory(tmpDir) + .configurationFromClasspath() + .blobStore(BlobStoreConfiguration.s3() + .disableCache() + .passthrough() + .withCryptoConfig(CryptoConfig.builder() + .password("myPass".toCharArray()) + .salt("73616c7479") + .build()) + .compressionConfig(CompressionConfiguration.builder() + .enabled(true) + .threshold(1) + .build())) + .searchConfiguration(SearchConfiguration.openSearch()) + .build()) + .extension(new DockerOpenSearchExtension()) + .extension(new CassandraExtension()) + .extension(new RabbitMQExtension()) + .server(configuration -> CassandraRabbitMQJamesServerMain.createServer(configuration) + .overrideWith(new TestJMAPServerModule()) + .overrideWith(binder -> Multibinder.newSetBinder(binder, GuiceProbe.class) + .addBinding() + .to(EncryptedZstdBlobStoreProbe.class))) + .extension(new 
AwsS3BlobStoreExtension()) + .lifeCycle(JamesServerExtension.Lifecycle.PER_TEST) + .build(); + + @Test + void blobStoreShouldCompressBeforeEncryptingBlobs(GuiceJamesServer server) { + BlobSnapshot blobSnapshot = server.getProbe(EncryptedZstdBlobStoreProbe.class).saveAndRead(COMPRESSIBLE_PAYLOAD); + + assertSoftly(softly -> { + // read should round trip to the original saving payload + softly.assertThat(blobSnapshot.readPayload()).isEqualTo(blobSnapshot.originalPayload()); + + // The intermediate encryption layer returns bytes that were zstd-compressed: + // they differ from the original payload, keep zstd metadata, and round-trip through zstd decompression back to the original payload. + softly.assertThat(blobSnapshot.encryptionLayerBlob().payload()).isNotEqualTo(blobSnapshot.originalPayload()); + softly.assertThat(blobSnapshot.encryptionLayerBlob().metadata().contentTransferEncoding()).contains(ZSTD); + softly.assertThat(Zstd.decompress(blobSnapshot.encryptionLayerBlob().payload(), blobSnapshot.originalPayload().length)) + .isEqualTo(blobSnapshot.originalPayload()); + + // Raw S3 storage must therefore be the encrypted form of those compressed bytes. 
+ softly.assertThat(blobSnapshot.rawS3StoredBlob().payload()).isNotEqualTo(blobSnapshot.originalPayload()); + softly.assertThat(blobSnapshot.rawS3StoredBlob().payload()).isNotEqualTo(blobSnapshot.encryptionLayerBlob().payload()); + softly.assertThat(blobSnapshot.rawS3StoredBlob().metadata().contentTransferEncoding()).contains(ZSTD); + softly.assertThat(blobSnapshot.rawS3StoredBlob().metadata().get(ZstdBlobStoreDAO.CONTENT_ORIGINAL_SIZE)) + .contains(new BlobStoreDAO.BlobMetadataValue(String.valueOf(blobSnapshot.originalPayload().length))); + }); + } +} diff --git a/server/apps/distributed-app/src/test/java/org/apache/james/WithZstdBlobStoreTest.java b/server/apps/distributed-app/src/test/java/org/apache/james/WithZstdBlobStoreTest.java new file mode 100644 index 0000000000..1a98eac5da --- /dev/null +++ b/server/apps/distributed-app/src/test/java/org/apache/james/WithZstdBlobStoreTest.java @@ -0,0 +1,115 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james; + +import static org.apache.james.blob.api.BlobStoreDAO.ContentTransferEncoding.ZSTD; +import static org.assertj.core.api.SoftAssertions.assertSoftly; + +import java.nio.charset.StandardCharsets; + +import jakarta.inject.Inject; +import jakarta.inject.Named; + +import org.apache.james.blob.api.BlobId; +import org.apache.james.blob.api.BlobStore; +import org.apache.james.blob.api.BlobStoreDAO; +import org.apache.james.blob.zstd.CompressionConfiguration; +import org.apache.james.blob.zstd.ZstdBlobStoreDAO; +import org.apache.james.modules.AwsS3BlobStoreExtension; +import org.apache.james.modules.RabbitMQExtension; +import org.apache.james.modules.TestJMAPServerModule; +import org.apache.james.modules.blobstore.BlobStoreConfiguration; +import org.apache.james.utils.GuiceProbe; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import com.google.inject.multibindings.Multibinder; + +import reactor.core.publisher.Mono; + +public class WithZstdBlobStoreTest { + private record BlobSnapshot(byte[] originalPayload, byte[] readPayload, BlobStoreDAO.BytesBlob storedBlob) { + } + + public static class ZstdBlobStoreProbe implements GuiceProbe { + private final BlobStore blobStore; + private final BlobStoreDAO rawBlobStoreDAO; + + @Inject + public ZstdBlobStoreProbe(BlobStore blobStore, @Named("raw") BlobStoreDAO rawBlobStoreDAO) { + this.blobStore = blobStore; + this.rawBlobStoreDAO = rawBlobStoreDAO; + } + + public BlobSnapshot saveAndRead(String payload) { + byte[] originalPayload = payload.getBytes(StandardCharsets.UTF_8); + BlobId blobId = Mono.from(blobStore.save(blobStore.getDefaultBucketName(), originalPayload, BlobStore.StoragePolicy.LOW_COST)) + .block(); + byte[] readPayload = Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), blobId)) + .block(); + BlobStoreDAO.BytesBlob storedBlob = 
Mono.from(rawBlobStoreDAO.readBytes(blobStore.getDefaultBucketName(), blobId)) + .block(); + + return new BlobSnapshot(originalPayload, readPayload, storedBlob); + } + } + + private static final String COMPRESSIBLE_PAYLOAD = "James zstd blob store integration payload.\n".repeat(2048); + + @RegisterExtension + static JamesServerExtension jamesServerExtension = new JamesServerBuilder<CassandraRabbitMQJamesConfiguration>(tmpDir -> + CassandraRabbitMQJamesConfiguration.builder() + .workingDirectory(tmpDir) + .configurationFromClasspath() + .blobStore(BlobStoreConfiguration.s3() + .disableCache() + .passthrough() + .withNoCryptoConfig() + .compressionConfig(CompressionConfiguration.builder() + .enabled(true) + .threshold(1) + .build())) + .searchConfiguration(SearchConfiguration.openSearch()) + .build()) + .extension(new DockerOpenSearchExtension()) + .extension(new CassandraExtension()) + .extension(new RabbitMQExtension()) + .server(configuration -> CassandraRabbitMQJamesServerMain.createServer(configuration) + .overrideWith(new TestJMAPServerModule()) + .overrideWith(binder -> Multibinder.newSetBinder(binder, GuiceProbe.class) + .addBinding() + .to(ZstdBlobStoreProbe.class))) + .extension(new AwsS3BlobStoreExtension()) + .lifeCycle(JamesServerExtension.Lifecycle.PER_TEST) + .build(); + + @Test + void blobStoreShouldPersistCompressedBlobsWithZstdMetadata(GuiceJamesServer server) { + BlobSnapshot blobSnapshot = server.getProbe(ZstdBlobStoreProbe.class).saveAndRead(COMPRESSIBLE_PAYLOAD); + + assertSoftly(softly -> { + softly.assertThat(blobSnapshot.readPayload()).isEqualTo(blobSnapshot.originalPayload()); + softly.assertThat(blobSnapshot.storedBlob().payload().length).isLessThan(blobSnapshot.originalPayload().length); + softly.assertThat(blobSnapshot.storedBlob().metadata().contentTransferEncoding()).contains(ZSTD); + softly.assertThat(blobSnapshot.storedBlob().metadata().get(ZstdBlobStoreDAO.CONTENT_ORIGINAL_SIZE)) + .contains(new 
BlobStoreDAO.BlobMetadataValue(String.valueOf(blobSnapshot.originalPayload().length))); + }); + } +} diff --git a/server/apps/migration/core-data-jpa-to-pg/src/main/java/org/apache/james/JpaToPgCoreDataMigration.java b/server/apps/migration/core-data-jpa-to-pg/src/main/java/org/apache/james/JpaToPgCoreDataMigration.java index c7fb918688..84c9008dc8 100644 --- a/server/apps/migration/core-data-jpa-to-pg/src/main/java/org/apache/james/JpaToPgCoreDataMigration.java +++ b/server/apps/migration/core-data-jpa-to-pg/src/main/java/org/apache/james/JpaToPgCoreDataMigration.java @@ -20,6 +20,7 @@ package org.apache.james; import static org.apache.james.modules.blobstore.BlobStoreModulesChooser.chooseBlobStoreDAOModule; +import static org.apache.james.modules.blobstore.BlobStoreModulesChooser.chooseCompressionModule; import static org.apache.james.modules.blobstore.BlobStoreModulesChooser.chooseEncryptionModule; import static org.apache.james.modules.blobstore.BlobStoreModulesChooser.chooseStoragePolicyModule; @@ -160,8 +161,9 @@ public class JpaToPgCoreDataMigration { public static List<Module> chooseModules(BlobStoreConfiguration choosingConfiguration) { return ImmutableList.<Module>builder() - .add(chooseEncryptionModule(choosingConfiguration.getCryptoConfig())) .add(chooseBlobStoreDAOModule(choosingConfiguration.getImplementation())) + .add(chooseEncryptionModule(choosingConfiguration.getCryptoConfig())) + .add(chooseCompressionModule(choosingConfiguration.getCompressionConfiguration())) .addAll(chooseStoragePolicyModule(choosingConfiguration.storageStrategy())) .add(binder -> binder.bind(BlobStoreConfiguration.class).toInstance(choosingConfiguration)) .build(); diff --git a/server/container/guice/distributed/pom.xml b/server/container/guice/distributed/pom.xml index 2e7c963171..5fb89fefa0 100644 --- a/server/container/guice/distributed/pom.xml +++ b/server/container/guice/distributed/pom.xml @@ -71,6 +71,10 @@ <groupId>${james.groupId}</groupId> 
<artifactId>blob-s3-guice</artifactId> </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>blob-zstd</artifactId> + </dependency> <dependency> <groupId>${james.groupId}</groupId> <artifactId>event-bus-distributed</artifactId> diff --git a/server/container/guice/distributed/src/main/java/org/apache/james/modules/blobstore/BlobStoreConfiguration.java b/server/container/guice/distributed/src/main/java/org/apache/james/modules/blobstore/BlobStoreConfiguration.java index 84d39ca0d5..b198c068e0 100644 --- a/server/container/guice/distributed/src/main/java/org/apache/james/modules/blobstore/BlobStoreConfiguration.java +++ b/server/container/guice/distributed/src/main/java/org/apache/james/modules/blobstore/BlobStoreConfiguration.java @@ -29,9 +29,11 @@ import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.ex.ConfigurationException; import org.apache.commons.lang3.StringUtils; import org.apache.james.blob.aes.CryptoConfig; +import org.apache.james.blob.zstd.CompressionConfiguration; import org.apache.james.modules.mailbox.ConfigurationComponent; import org.apache.james.server.blob.deduplication.StorageStrategy; import org.apache.james.server.core.filesystem.FileSystemImpl; +import org.apache.james.util.Size; import org.apache.james.utils.PropertiesProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,15 +97,32 @@ public class BlobStoreConfiguration { public interface RequireCryptoConfig { BlobStoreConfiguration cryptoConfig(Optional<CryptoConfig> cryptoConfig); + default RequireCompressionConfig withNoCryptoConfig() { + return compressionConfiguration -> noCryptoConfig().compressionConfig(compressionConfiguration); + } + default BlobStoreConfiguration noCryptoConfig() { return cryptoConfig(Optional.empty()); } + default RequireCompressionConfig withCryptoConfig(CryptoConfig cryptoConfig) { + return compressionConfiguration -> 
cryptoConfig(cryptoConfig).compressionConfig(compressionConfiguration); + } + default BlobStoreConfiguration cryptoConfig(CryptoConfig cryptoConfig) { return cryptoConfig(Optional.of(cryptoConfig)); } } + @FunctionalInterface + public interface RequireCompressionConfig { + BlobStoreConfiguration compressionConfig(CompressionConfiguration compressionConfiguration); + + default BlobStoreConfiguration noCompressionConfig() { + return compressionConfig(CompressionConfiguration.disabled()); + } + } + public static RequireImplementation builder() { return implementation -> enableCache -> storageStrategy -> cryptoConfig -> new BlobStoreConfiguration(implementation, enableCache, storageStrategy, cryptoConfig); @@ -145,6 +164,9 @@ public class BlobStoreConfiguration { static final String ENCRYPTION_ENABLE_PROPERTY = "encryption.aes.enable"; static final String ENCRYPTION_PASSWORD_PROPERTY = "encryption.aes.password"; static final String ENCRYPTION_SALT_PROPERTY = "encryption.aes.salt"; + static final String COMPRESSION_ENABLE_PROPERTY = "compression.enabled"; + static final String COMPRESSION_THRESHOLD_PROPERTY = "compression.threshold"; + static final String COMPRESSION_MIN_RATIO_PROPERTY = "compression.min-ratio"; static final boolean CACHE_ENABLED = true; static final String DEDUPLICATION_ENABLE_PROPERTY = "deduplication.enable"; @@ -189,19 +211,22 @@ public class BlobStoreConfiguration { "the mails sharing the same content once one is deleted.\n" + "Upgrade note: If you are upgrading from James 3.5 or older, the deduplication was enabled.")); Optional<CryptoConfig> cryptoConfig = parseCryptoConfig(configuration); + CompressionConfiguration compressionConfiguration = parseCompressionConfiguration(configuration); if (deduplicationEnabled) { return builder() .implementation(blobStoreImplName) .enableCache(cacheEnabled) .deduplication() - .cryptoConfig(cryptoConfig); + .cryptoConfig(cryptoConfig) + .compressionConfig(compressionConfiguration); } else { return builder() 
.implementation(blobStoreImplName) .enableCache(cacheEnabled) .passthrough() - .cryptoConfig(cryptoConfig); + .cryptoConfig(cryptoConfig) + .compressionConfig(compressionConfiguration); } } @@ -216,6 +241,20 @@ public class BlobStoreConfiguration { return Optional.empty(); } + private static CompressionConfiguration parseCompressionConfiguration(Configuration configuration) { + return CompressionConfiguration.builder() + .enabled(configuration.getBoolean(COMPRESSION_ENABLE_PROPERTY, CompressionConfiguration.DISABLED)) + .threshold(Optional.ofNullable(configuration.getString(COMPRESSION_THRESHOLD_PROPERTY, null)) + .map(StringUtils::trim) + .filter(StringUtils::isNotBlank) + .map(StringUtils::deleteWhitespace) + .map(Size::parse) + .map(Size::asBytes) + .orElse(CompressionConfiguration.DEFAULT_THRESHOLD)) + .minRatio(configuration.getFloat(COMPRESSION_MIN_RATIO_PROPERTY, CompressionConfiguration.DEFAULT_MIN_RATIO)) + .build(); + } + @VisibleForTesting public static RequireStoringStrategy cassandra() { return builder() @@ -231,12 +270,19 @@ public class BlobStoreConfiguration { private final boolean cacheEnabled; private final StorageStrategy storageStrategy; private final Optional<CryptoConfig> cryptoConfig; + private final CompressionConfiguration compressionConfiguration; BlobStoreConfiguration(BlobStoreImplName implementation, boolean cacheEnabled, StorageStrategy storageStrategy, Optional<CryptoConfig> cryptoConfig) { + this(implementation, cacheEnabled, storageStrategy, cryptoConfig, CompressionConfiguration.disabled()); + } + + BlobStoreConfiguration(BlobStoreImplName implementation, boolean cacheEnabled, StorageStrategy storageStrategy, + Optional<CryptoConfig> cryptoConfig, CompressionConfiguration compressionConfiguration) { this.implementation = implementation; this.cacheEnabled = cacheEnabled; this.storageStrategy = storageStrategy; this.cryptoConfig = cryptoConfig; + this.compressionConfiguration = compressionConfiguration; } public boolean 
cacheEnabled() { @@ -255,6 +301,14 @@ public class BlobStoreConfiguration { return cryptoConfig; } + public CompressionConfiguration getCompressionConfiguration() { + return compressionConfiguration; + } + + public BlobStoreConfiguration compressionConfig(CompressionConfiguration compressionConfiguration) { + return new BlobStoreConfiguration(implementation, cacheEnabled, storageStrategy, cryptoConfig, compressionConfiguration); + } + @Override public final boolean equals(Object o) { if (o instanceof BlobStoreConfiguration) { @@ -263,14 +317,15 @@ public class BlobStoreConfiguration { return Objects.equals(this.implementation, that.implementation) && Objects.equals(this.cacheEnabled, that.cacheEnabled) && Objects.equals(this.storageStrategy, that.storageStrategy) - && Objects.equals(this.cryptoConfig, that.cryptoConfig); + && Objects.equals(this.cryptoConfig, that.cryptoConfig) + && Objects.equals(this.compressionConfiguration, that.compressionConfiguration); } return false; } @Override public final int hashCode() { - return Objects.hash(implementation, cacheEnabled, storageStrategy, cryptoConfig); + return Objects.hash(implementation, cacheEnabled, storageStrategy, cryptoConfig, compressionConfiguration); } @Override @@ -280,6 +335,7 @@ public class BlobStoreConfiguration { .add("cacheEnabled", cacheEnabled) .add("storageStrategy", storageStrategy.name()) .add("cryptoConfig", cryptoConfig) + .add("compressionConfiguration", compressionConfiguration) .toString(); } } diff --git a/server/container/guice/distributed/src/main/java/org/apache/james/modules/blobstore/BlobStoreModulesChooser.java b/server/container/guice/distributed/src/main/java/org/apache/james/modules/blobstore/BlobStoreModulesChooser.java index a0c467bdb9..0499b0d79a 100644 --- a/server/container/guice/distributed/src/main/java/org/apache/james/modules/blobstore/BlobStoreModulesChooser.java +++ 
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/blobstore/BlobStoreModulesChooser.java @@ -39,7 +39,10 @@ import org.apache.james.blob.objectstorage.aws.sse.S3SSECConfiguration; import org.apache.james.blob.objectstorage.aws.sse.S3SSECustomerKeyFactory; import org.apache.james.blob.objectstorage.aws.sse.S3SSECustomerKeyFactory.SingleCustomerKeyFactory; import org.apache.james.blob.postgres.PostgresBlobStoreDAO; +import org.apache.james.blob.zstd.CompressionConfiguration; +import org.apache.james.blob.zstd.ZstdBlobStoreDAO; import org.apache.james.core.healthcheck.HealthCheck; +import org.apache.james.metrics.api.MetricFactory; import org.apache.james.modules.blobstore.validation.BlobStoreConfigurationValidationStartUpCheck.StorageStrategySupplier; import org.apache.james.modules.blobstore.validation.StoragePolicyConfigurationSanityEnforcementModule; import org.apache.james.modules.mailbox.BlobStoreAPIModule; @@ -65,7 +68,8 @@ import com.google.inject.name.Names; import modules.BlobPostgresModule; public class BlobStoreModulesChooser { - private static final String UNENCRYPTED = "unencrypted"; + private static final String RAW = "raw"; + private static final String ENCRYPTION = "encryption"; static class CassandraBlobStoreDAODeclarationModule extends AbstractModule { @Override @@ -73,7 +77,7 @@ public class BlobStoreModulesChooser { install(new CassandraBlobStoreDependenciesModule()); install(new DefaultBucketModule()); - bind(BlobStoreDAO.class).annotatedWith(Names.named(UNENCRYPTED)).to(CassandraBlobStoreDAO.class); + bind(BlobStoreDAO.class).annotatedWith(Names.named(RAW)).to(CassandraBlobStoreDAO.class); } } @@ -83,7 +87,7 @@ public class BlobStoreModulesChooser { install(new S3BlobStoreModule()); install(new S3BucketModule()); - bind(BlobStoreDAO.class).annotatedWith(Names.named(UNENCRYPTED)).to(S3BlobStoreDAO.class) + bind(BlobStoreDAO.class).annotatedWith(Names.named(RAW)).to(S3BlobStoreDAO.class) .in(Scopes.SINGLETON); 
Multibinder.newSetBinder(binder(), HealthCheck.class).addBinding().to(ObjectStorageHealthCheck.class); } @@ -107,7 +111,7 @@ public class BlobStoreModulesChooser { protected void configure() { install(new DefaultBucketModule()); - bind(BlobStoreDAO.class).annotatedWith(Names.named(UNENCRYPTED)).to(FileBlobStoreDAO.class); + bind(BlobStoreDAO.class).annotatedWith(Names.named(RAW)).to(FileBlobStoreDAO.class); } } @@ -118,15 +122,43 @@ public class BlobStoreModulesChooser { install(new DefaultBucketModule()); - bind(BlobStoreDAO.class).annotatedWith(Names.named(UNENCRYPTED)).to(PostgresBlobStoreDAO.class); + bind(BlobStoreDAO.class).annotatedWith(Names.named(RAW)).to(PostgresBlobStoreDAO.class); + } + } + + static class NoCompressionModule extends AbstractModule { + @Provides + @Singleton + BlobStoreDAO blobStoreDAO(@Named(ENCRYPTION) BlobStoreDAO encryption) { + return encryption; + } + } + + static class CompressionModule extends AbstractModule { + private final CompressionConfiguration compressionConfiguration; + + CompressionModule(CompressionConfiguration compressionConfiguration) { + this.compressionConfiguration = compressionConfiguration; + } + + @Provides + @Singleton + BlobStoreDAO blobStoreDAO(@Named(ENCRYPTION) BlobStoreDAO encryption, MetricFactory metricFactory) { + return new ZstdBlobStoreDAO(encryption, compressionConfiguration, metricFactory); + } + + @Provides + CompressionConfiguration compressionConfiguration() { + return compressionConfiguration; } } static class NoEncryptionModule extends AbstractModule { @Provides @Singleton - BlobStoreDAO blobStoreDAO(@Named(UNENCRYPTED) BlobStoreDAO unencrypted) { - return unencrypted; + @Named(ENCRYPTION) + BlobStoreDAO blobStoreDAO(@Named(RAW) BlobStoreDAO raw) { + return raw; } } @@ -139,8 +171,9 @@ public class BlobStoreModulesChooser { @Provides @Singleton - BlobStoreDAO blobStoreDAO(@Named(UNENCRYPTED) BlobStoreDAO unencrypted) { - return new AESBlobStoreDAO(unencrypted, cryptoConfig); + 
@Named(ENCRYPTION) + BlobStoreDAO blobStoreDAO(@Named(RAW) BlobStoreDAO raw) { + return new AESBlobStoreDAO(raw, cryptoConfig); } @Provides @@ -151,8 +184,9 @@ public class BlobStoreModulesChooser { public static List<Module> chooseModules(BlobStoreConfiguration choosingConfiguration) { return ImmutableList.<Module>builder() - .add(chooseEncryptionModule(choosingConfiguration.getCryptoConfig())) .add(chooseBlobStoreDAOModule(choosingConfiguration.getImplementation())) + .add(chooseEncryptionModule(choosingConfiguration.getCryptoConfig())) + .add(chooseCompressionModule(choosingConfiguration.getCompressionConfiguration())) .addAll(chooseStoragePolicyModule(choosingConfiguration.storageStrategy())) .add(new StoragePolicyConfigurationSanityEnforcementModule()) .add(binder -> binder.bind(BlobStoreConfiguration.class).toInstance(choosingConfiguration)) @@ -180,6 +214,13 @@ public class BlobStoreModulesChooser { return encryptionModule.orElse(new NoEncryptionModule()); } + public static Module chooseCompressionModule(CompressionConfiguration compressionConfiguration) { + if (compressionConfiguration.enabled()) { + return new CompressionModule(compressionConfiguration); + } + return new NoCompressionModule(); + } + public static List<Module> chooseStoragePolicyModule(StorageStrategy storageStrategy) { switch (storageStrategy) { case DEDUPLICATION: diff --git a/server/container/guice/distributed/src/test/java/org/apache/james/modules/blobstore/BlobStoreConfigurationTest.java b/server/container/guice/distributed/src/test/java/org/apache/james/modules/blobstore/BlobStoreConfigurationTest.java index 65aff8ff8b..2a2397deb3 100644 --- a/server/container/guice/distributed/src/test/java/org/apache/james/modules/blobstore/BlobStoreConfigurationTest.java +++ b/server/container/guice/distributed/src/test/java/org/apache/james/modules/blobstore/BlobStoreConfigurationTest.java @@ -26,6 +26,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import 
org.apache.commons.configuration2.PropertiesConfiguration; import org.apache.james.FakePropertiesProvider; import org.apache.james.blob.aes.CryptoConfig; +import org.apache.james.blob.zstd.CompressionConfiguration; import org.apache.james.modules.mailbox.ConfigurationComponent; import org.apache.james.server.blob.deduplication.StorageStrategy; import org.junit.jupiter.api.Test; @@ -157,6 +158,76 @@ class BlobStoreConfigurationTest { .build())); } + @Test + void compressionShouldBeDisabledByDefault() throws Exception { + PropertiesConfiguration configuration = new PropertiesConfiguration(); + configuration.addProperty("implementation", "cassandra"); + configuration.addProperty("deduplication.enable", false); + FakePropertiesProvider propertyProvider = FakePropertiesProvider.builder() + .register(ConfigurationComponent.NAME, configuration) + .build(); + + assertThat(parse(propertyProvider).getCompressionConfiguration()) + .isEqualTo(CompressionConfiguration.disabled()); + } + + @Test + void compressionCanBeActivated() throws Exception { + PropertiesConfiguration configuration = new PropertiesConfiguration(); + configuration.addProperty("implementation", "cassandra"); + configuration.addProperty("deduplication.enable", false); + configuration.addProperty("compression.enabled", true); + configuration.addProperty("compression.threshold", "32K"); + configuration.addProperty("compression.min-ratio", 0.8F); + FakePropertiesProvider propertyProvider = FakePropertiesProvider.builder() + .register(ConfigurationComponent.NAME, configuration) + .build(); + + assertThat(parse(propertyProvider).getCompressionConfiguration()) + .isEqualTo(CompressionConfiguration.builder() + .enabled(true) + .threshold(32 * 1024L) + .minRatio(0.8F) + .build()); + } + + @Test + void compressionThresholdShouldAcceptWhitespaceAroundSizeUnit() throws Exception { + PropertiesConfiguration configuration = new PropertiesConfiguration(); + configuration.addProperty("implementation", "cassandra"); + 
configuration.addProperty("deduplication.enable", false); + configuration.addProperty("compression.enabled", true); + configuration.addProperty("compression.threshold", "32 K"); + FakePropertiesProvider propertyProvider = FakePropertiesProvider.builder() + .register(ConfigurationComponent.NAME, configuration) + .build(); + + assertThat(parse(propertyProvider).getCompressionConfiguration().threshold()) + .isEqualTo(32 * 1024L); + } + + @Test + void builderShouldAllowFluentCompressionConfigurationAfterCryptoChoice() { + CompressionConfiguration compressionConfiguration = CompressionConfiguration.builder() + .enabled(true) + .threshold(32 * 1024L) + .minRatio(0.8F) + .build(); + + assertThat(BlobStoreConfiguration.builder() + .cassandra() + .disableCache() + .passthrough() + .withNoCryptoConfig() + .compressionConfig(compressionConfiguration)) + .isEqualTo(BlobStoreConfiguration.builder() + .cassandra() + .disableCache() + .passthrough() + .noCryptoConfig() + .compressionConfig(compressionConfiguration)); + } + @Test void provideChoosingConfigurationShouldThrowWhenPropertyFieldIsNotInSupportedList() { PropertiesConfiguration configuration = new PropertiesConfiguration(); diff --git a/server/container/guice/distributed/src/test/java/org/apache/james/modules/blobstore/BlobStoreModulesChooserTest.java b/server/container/guice/distributed/src/test/java/org/apache/james/modules/blobstore/BlobStoreModulesChooserTest.java index 60b8d6de91..53040161e2 100644 --- a/server/container/guice/distributed/src/test/java/org/apache/james/modules/blobstore/BlobStoreModulesChooserTest.java +++ b/server/container/guice/distributed/src/test/java/org/apache/james/modules/blobstore/BlobStoreModulesChooserTest.java @@ -22,6 +22,7 @@ package org.apache.james.modules.blobstore; import static org.assertj.core.api.Assertions.assertThat; import org.apache.james.blob.aes.CryptoConfig; +import org.apache.james.blob.zstd.CompressionConfiguration; import org.junit.jupiter.api.Test; class 
BlobStoreModulesChooserTest { @@ -73,4 +74,29 @@ class BlobStoreModulesChooserTest { .filteredOn(module -> module instanceof BlobStoreModulesChooser.EncryptionModule) .hasSize(1); } + + @Test + void provideBlobStoreShouldReturnNoCompressionWhenCompressionDisabled() { + assertThat(BlobStoreModulesChooser.chooseModules(BlobStoreConfiguration.builder() + .s3() + .disableCache() + .deduplication() + .noCryptoConfig())) + .filteredOn(module -> module instanceof BlobStoreModulesChooser.NoCompressionModule) + .hasSize(1); + } + + @Test + void provideBlobStoreShouldReturnCompressionWhenConfigured() { + assertThat(BlobStoreModulesChooser.chooseModules(BlobStoreConfiguration.builder() + .cassandra() + .disableCache() + .passthrough() + .noCryptoConfig() + .compressionConfig(CompressionConfiguration.builder() + .enabled(true) + .build()))) + .filteredOn(module -> module instanceof BlobStoreModulesChooser.CompressionModule) + .hasSize(1); + } } \ No newline at end of file diff --git a/src/site/xdoc/server/config-blobstore.xml b/src/site/xdoc/server/config-blobstore.xml index 8427b45a26..7fe3f5932b 100644 --- a/src/site/xdoc/server/config-blobstore.xml +++ b/src/site/xdoc/server/config-blobstore.xml @@ -139,6 +139,30 @@ generate salt with : openssl rand -hex 16 </code> </pre> </subsection> + <subsection name="Compression choice"> + <p> + Data can be optionally compressed with zstd before being stored in the blobStore. This especially + benefits text-heavy mail contents and attachments when using S3 compatible object stores where + storage costs matter. + </p> + <dl> + <dt><strong>compression.enabled</strong></dt> + <dd>Optional boolean, defaults to false.</dd> + </dl> + <dl> + <dt><strong>compression.threshold</strong></dt> + <dd>Optional size, defaults to 16K. Compression is only attempted for blobs whose original + size is greater than or equal to this threshold. 
Supported units: no suffix for bytes, or B, K, M, G.</dd> + </dl> + <dl> + <dt><strong>compression.min-ratio</strong></dt> + <dd>Optional float between 0 and 1, defaults to 1. James keeps the compressed payload only when + compressedSize / originalSize &lt;= compression.min-ratio.</dd> + <dd>Use <code>0</code> for a "decompress-only" mode: James will still read previously compressed + blobs but will not compress new writes.</dd> + </dl> + <p>If both compression and AES encryption are enabled, James compresses first and encrypts afterwards.</p> + </subsection> <subsection name="ObjectStorage BlobStore Buckets Configuration"> <dl> <dt><strong>objectstorage.bucketPrefix</strong></dt> --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
