This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit e5751b8b3ad6c430d926d2a3421de5873e8a62fc Author: Quan Tran <[email protected]> AuthorDate: Tue Apr 14 16:40:31 2026 +0700 JAMES-4182 Implement blob metadata storage for Cassandra --- .../apache/james/blob/cassandra/BlobTables.java | 2 + .../cassandra/CassandraBlobDataDefinition.java | 7 +- .../blob/cassandra/CassandraBlobDescriptor.java | 48 ++++++++++++ .../blob/cassandra/CassandraBlobStoreDAO.java | 90 +++++++++++----------- .../james/blob/cassandra/CassandraBucketDAO.java | 36 +++++++-- .../blob/cassandra/CassandraDefaultBucketDAO.java | 36 +++++++-- .../cassandra/CassandraBlobStoreClOneTest.java | 8 +- .../blob/cassandra/CassandraBlobStoreDAOTest.java | 3 +- upgrade-instructions.md | 21 +++++ 9 files changed, 188 insertions(+), 63 deletions(-) diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/BlobTables.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/BlobTables.java index 11ca1d40f0..0b95e44dd5 100644 --- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/BlobTables.java +++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/BlobTables.java @@ -27,6 +27,7 @@ public interface BlobTables { String TABLE_NAME = "blobs"; CqlIdentifier ID = CqlIdentifier.fromCql("id"); CqlIdentifier NUMBER_OF_CHUNK = CqlIdentifier.fromCql("position"); + CqlIdentifier METADATA = CqlIdentifier.fromCql("metadata"); } interface DefaultBucketBlobParts { @@ -41,6 +42,7 @@ public interface BlobTables { CqlIdentifier BUCKET = CqlIdentifier.fromCql("bucket"); CqlIdentifier ID = CqlIdentifier.fromCql("id"); CqlIdentifier NUMBER_OF_CHUNK = CqlIdentifier.fromCql("position"); + CqlIdentifier METADATA = CqlIdentifier.fromCql("metadata"); } interface BucketBlobParts { diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobDataDefinition.java 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobDataDefinition.java index d562b7e9c4..da21297bce 100644 --- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobDataDefinition.java +++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobDataDefinition.java @@ -19,6 +19,7 @@ package org.apache.james.blob.cassandra; +import static com.datastax.oss.driver.api.core.type.DataTypes.TEXT; import static org.apache.james.blob.cassandra.BlobTables.DefaultBucketBlobParts.DATA; import org.apache.james.backends.cassandra.components.CassandraDataDefinition; @@ -46,7 +47,8 @@ public interface CassandraBlobDataDefinition { "Messages` headers and bodies are stored as blobparts.") .statement(statement -> types -> statement .withPartitionKey(DefaultBucketBlobTable.ID, DataTypes.TEXT) - .withClusteringColumn(DefaultBucketBlobTable.NUMBER_OF_CHUNK, DataTypes.INT)) + .withClusteringColumn(DefaultBucketBlobTable.NUMBER_OF_CHUNK, DataTypes.INT) + .withColumn(DefaultBucketBlobTable.METADATA, DataTypes.frozenMapOf(TEXT, TEXT))) .table(BucketBlobParts.TABLE_NAME) .comment("Holds blob parts composing blobs in a non-default bucket." 
+ @@ -63,7 +65,8 @@ public interface CassandraBlobDataDefinition { .statement(statement -> types -> statement .withPartitionKey(BucketBlobParts.BUCKET, DataTypes.TEXT) .withPartitionKey(BucketBlobParts.ID, DataTypes.TEXT) - .withClusteringColumn(BucketBlobTable.NUMBER_OF_CHUNK, DataTypes.INT)) + .withClusteringColumn(BucketBlobTable.NUMBER_OF_CHUNK, DataTypes.INT) + .withColumn(BucketBlobTable.METADATA, DataTypes.frozenMapOf(TEXT, TEXT))) .build(); } diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobDescriptor.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobDescriptor.java new file mode 100644 index 0000000000..0618d55843 --- /dev/null +++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobDescriptor.java @@ -0,0 +1,48 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.blob.cassandra; + +import java.util.Map; + +import org.apache.james.blob.api.BlobStoreDAO.BlobMetadata; +import org.apache.james.blob.api.BlobStoreDAO.BlobMetadataName; +import org.apache.james.blob.api.BlobStoreDAO.BlobMetadataValue; + +import com.google.common.collect.ImmutableMap; + +record CassandraBlobDescriptor(int rowCount, BlobMetadata metadata) { + static CassandraBlobDescriptor from(int rowCount, Map<String, String> metadata) { + return new CassandraBlobDescriptor(rowCount, toBlobMetadata(metadata)); + } + + static Map<String, String> toRawMetadata(BlobMetadata metadata) { + return metadata.underlyingMap().entrySet().stream() + .collect(ImmutableMap.toImmutableMap( + entry -> entry.getKey().name(), + entry -> entry.getValue().value())); + } + + private static BlobMetadata toBlobMetadata(Map<String, String> metadata) { + return new BlobMetadata(metadata.entrySet().stream() + .collect(ImmutableMap.toImmutableMap( + entry -> new BlobMetadataName(entry.getKey()), + entry -> new BlobMetadataValue(entry.getValue())))); + } +} diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAO.java index 5afd769d41..4083007773 100644 --- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAO.java +++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAO.java @@ -98,58 +98,63 @@ public class CassandraBlobStoreDAO implements BlobStoreDAO { @Override public InputStreamBlob read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException { - return InputStreamBlob.of(ReactorUtils.toInputStream(readBlobParts(bucketName, blobId))); + return Mono.from(readBlobDescriptor(bucketName, blobId)) + .map(blobDescriptor -> 
InputStreamBlob.of(ReactorUtils.toInputStream(readBlobParts(bucketName, blobId, blobDescriptor.rowCount())), blobDescriptor.metadata())) + .block(); } @Override public Publisher<InputStreamBlob> readReactive(BucketName bucketName, BlobId blobId) { - return Mono.just(read(bucketName, blobId)); + return Mono.from(readBlobDescriptor(bucketName, blobId)) + .publishOn(ReactorUtils.BLOCKING_CALL_WRAPPER) + .map(blobDescriptor -> InputStreamBlob.of(ReactorUtils.toInputStream(readBlobParts(bucketName, blobId, blobDescriptor.rowCount())), blobDescriptor.metadata())); } @Override public Publisher<BytesBlob> readBytes(BucketName bucketName, BlobId blobId) { - return readBlobParts(bucketName, blobId) - .collectList() - .map(this::byteBuffersToBytesArray) - .map(BytesBlob::of); + return Mono.from(readBlobDescriptor(bucketName, blobId)) + .flatMap(blobDescriptor -> readBlobParts(bucketName, blobId, blobDescriptor.rowCount()) + .collectList() + .map(this::byteBuffersToBytesArray) + .map(bytes -> BytesBlob.of(bytes, blobDescriptor.metadata()))); } @Override public Publisher<Void> save(BucketName bucketName, BlobId blobId, Blob blob) { return switch (blob) { - case BytesBlob bytesBlob -> save(bucketName, blobId, bytesBlob.payload()); - case InputStreamBlob inputStreamBlob -> save(bucketName, blobId, inputStreamBlob.payload()); - case ByteSourceBlob byteSourceBlob -> save(bucketName, blobId, byteSourceBlob.payload()); + case BytesBlob bytesBlob -> save(bucketName, blobId, bytesBlob.payload(), bytesBlob.metadata()); + case InputStreamBlob inputStreamBlob -> save(bucketName, blobId, inputStreamBlob.payload(), inputStreamBlob.metadata()); + case ByteSourceBlob byteSourceBlob -> save(bucketName, blobId, byteSourceBlob.payload(), byteSourceBlob.metadata()); }; } - public Mono<Void> save(BucketName bucketName, BlobId blobId, byte[] data) { + private Mono<Void> save(BucketName bucketName, BlobId blobId, byte[] data, BlobMetadata metadata) { Preconditions.checkNotNull(data); return 
Mono.fromCallable(() -> DataChunker.chunk(data, configuration.getBlobPartSize())) - .flatMap(chunks -> save(bucketName, blobId, chunks)); + .flatMap(chunks -> save(bucketName, blobId, chunks, metadata)); } - public Mono<Void> save(BucketName bucketName, BlobId blobId, InputStream inputStream) { + private Mono<Void> save(BucketName bucketName, BlobId blobId, InputStream inputStream, BlobMetadata metadata) { Preconditions.checkNotNull(bucketName); Preconditions.checkNotNull(inputStream); return Mono.fromCallable(() -> ReactorUtils.toChunks(inputStream, configuration.getBlobPartSize()) .subscribeOn(Schedulers.boundedElastic())) - .flatMap(chunks -> save(bucketName, blobId, chunks)) - .onErrorMap(e -> new ObjectStoreIOException("Exception occurred while saving input stream", e)); + .flatMap(chunks -> save(bucketName, blobId, chunks, metadata)) + .onErrorMap(Throwable.class, e -> new ObjectStoreIOException("Exception occurred while saving input stream", e)); } - public Mono<Void> save(BucketName bucketName, BlobId blobId, ByteSource content) { + private Mono<Void> save(BucketName bucketName, BlobId blobId, ByteSource content, BlobMetadata metadata) { return Mono.using(content::openBufferedStream, - stream -> save(bucketName, blobId, stream), + stream -> save(bucketName, blobId, stream, metadata), Throwing.consumer(InputStream::close).sneakyThrow(), LAZY); } - private Mono<Void> save(BucketName bucketName, BlobId blobId, Flux<ByteBuffer> chunksAsFlux) { + private Mono<Void> save(BucketName bucketName, BlobId blobId, Flux<ByteBuffer> chunksAsFlux, BlobMetadata metadata) { return saveBlobParts(bucketName, blobId, chunksAsFlux) - .flatMap(numberOfChunk -> saveBlobPartReference(bucketName, blobId, numberOfChunk)); + .flatMap(numberOfChunk -> saveBlobPartReference(bucketName, blobId, numberOfChunk, metadata)); } private Mono<Integer> saveBlobParts(BucketName bucketName, BlobId blobId, Flux<ByteBuffer> chunksAsFlux) { @@ -171,11 +176,11 @@ public class CassandraBlobStoreDAO 
implements BlobStoreDAO { return write.thenReturn(anyNonEmptyValue); } - private Mono<Void> saveBlobPartReference(BucketName bucketName, BlobId blobId, Integer numberOfChunk) { + private Mono<Void> saveBlobPartReference(BucketName bucketName, BlobId blobId, Integer numberOfChunk, BlobMetadata metadata) { if (isDefaultBucket(bucketName)) { - return defaultBucketDAO.saveBlobPartsReferences(blobId, numberOfChunk); + return defaultBucketDAO.saveBlobPartsReferences(blobId, numberOfChunk, metadata); } else { - return bucketDAO.saveBlobPartsReferences(bucketName, blobId, numberOfChunk); + return bucketDAO.saveBlobPartsReferences(bucketName, blobId, numberOfChunk, metadata); } } @@ -240,43 +245,40 @@ public class CassandraBlobStoreDAO implements BlobStoreDAO { } } - private Mono<Integer> selectRowCount(BucketName bucketName, BlobId blobId) { - if (configuration.isOptimisticConsistencyLevel()) { - return selectRowCountClOne(bucketName, blobId) + private Mono<CassandraBlobDescriptor> readBlobDescriptor(BucketName bucketName, BlobId blobId) { + Mono<CassandraBlobDescriptor> descriptor = configuration.isOptimisticConsistencyLevel() + ? 
readBlobDescriptorClOne(bucketName, blobId) .doOnNext(any -> metricClOneHitCount.increment()) .switchIfEmpty(Mono.fromRunnable(metricClOneMissCount::increment) - .then(selectRowCountClDefault(bucketName, blobId))); - } else { - return selectRowCountClDefault(bucketName, blobId); - } + .then(readBlobDescriptorClDefault(bucketName, blobId))) + : readBlobDescriptorClDefault(bucketName, blobId); + + return descriptor.switchIfEmpty(Mono.error(() -> + new ObjectNotFoundException(String.format("Could not retrieve blob metadata for %s", blobId)))); } - private Mono<Integer> selectRowCountClOne(BucketName bucketName, BlobId blobId) { + private Mono<CassandraBlobDescriptor> readBlobDescriptorClOne(BucketName bucketName, BlobId blobId) { if (isDefaultBucket(bucketName)) { - return defaultBucketDAO.selectRowCountClOne(blobId); + return defaultBucketDAO.selectBlobDescriptorClOne(blobId); } else { - return bucketDAO.selectRowCountClOne(bucketName, blobId); + return bucketDAO.selectBlobDescriptorClOne(bucketName, blobId); } } - private Mono<Integer> selectRowCountClDefault(BucketName bucketName, BlobId blobId) { + private Mono<CassandraBlobDescriptor> readBlobDescriptorClDefault(BucketName bucketName, BlobId blobId) { if (isDefaultBucket(bucketName)) { - return defaultBucketDAO.selectRowCount(blobId); + return defaultBucketDAO.selectBlobDescriptor(blobId); } else { - return bucketDAO.selectRowCount(bucketName, blobId); + return bucketDAO.selectBlobDescriptor(bucketName, blobId); } } - private Flux<ByteBuffer> readBlobParts(BucketName bucketName, BlobId blobId) { - return selectRowCount(bucketName, blobId) - .single() - .onErrorMap(NoSuchElementException.class, e -> - new ObjectNotFoundException(String.format("Could not retrieve blob metadata for %s", blobId))) - .flatMapMany(rowCount -> Flux.range(0, rowCount) - .concatMap(partIndex -> readPart(bucketName, blobId, partIndex) - .single() - .onErrorMap(NoSuchElementException.class, e -> - new 
ObjectNotFoundException(String.format("Missing blob part for blobId %s and position %d", blobId.asString(), partIndex))))); + private Flux<ByteBuffer> readBlobParts(BucketName bucketName, BlobId blobId, int rowCount) { + return Flux.range(0, rowCount) + .concatMap(partIndex -> readPart(bucketName, blobId, partIndex) + .single() + .onErrorMap(NoSuchElementException.class, e -> + new ObjectNotFoundException(String.format("Missing blob part for blobId %s and position %d", blobId.asString(), partIndex)))); } private byte[] byteBuffersToBytesArray(List<ByteBuffer> byteBuffers) { diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java index cab12525fc..69e5eed7bf 100644 --- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java +++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java @@ -25,9 +25,12 @@ import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.insertInto; import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom; import static org.apache.james.blob.cassandra.BlobTables.BucketBlobTable.BUCKET; import static org.apache.james.blob.cassandra.BlobTables.BucketBlobTable.ID; +import static org.apache.james.blob.cassandra.BlobTables.BucketBlobTable.METADATA; import static org.apache.james.blob.cassandra.BlobTables.BucketBlobTable.NUMBER_OF_CHUNK; import java.nio.ByteBuffer; +import java.util.Map; +import java.util.Optional; import jakarta.inject.Inject; @@ -35,6 +38,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.james.backends.cassandra.init.configuration.JamesExecutionProfiles; import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; import org.apache.james.blob.api.BlobId; +import org.apache.james.blob.api.BlobStoreDAO.BlobMetadata; import 
org.apache.james.blob.api.BucketName; import org.apache.james.blob.cassandra.BlobTables.BucketBlobParts; @@ -71,6 +75,7 @@ public class CassandraBucketDAO { .value(BUCKET, bindMarker(BUCKET)) .value(ID, bindMarker(ID)) .value(NUMBER_OF_CHUNK, bindMarker(NUMBER_OF_CHUNK)) + .value(METADATA, bindMarker(METADATA)) .build()); this.insertPart = session.prepare(insertInto(BucketBlobParts.TABLE_NAME) @@ -123,29 +128,39 @@ public class CassandraBucketDAO { .setByteBuffer(BucketBlobParts.DATA, data)); } - Mono<Void> saveBlobPartsReferences(BucketName bucketName, BlobId blobId, int numberOfChunk) { + Mono<Void> saveBlobPartsReferences(BucketName bucketName, BlobId blobId, int numberOfChunk, BlobMetadata metadata) { return cassandraAsyncExecutor.executeVoid( insert.bind() .setString(BUCKET, bucketName.asString()) .setString(ID, blobId.asString()) - .setInt(NUMBER_OF_CHUNK, numberOfChunk)); + .setInt(NUMBER_OF_CHUNK, numberOfChunk) + .setMap(METADATA, CassandraBlobDescriptor.toRawMetadata(metadata), String.class, String.class)); } - Mono<Integer> selectRowCount(BucketName bucketName, BlobId blobId) { + Mono<Void> saveBlobPartsReferences(BucketName bucketName, BlobId blobId, int numberOfChunk) { + return saveBlobPartsReferences(bucketName, blobId, numberOfChunk, BlobMetadata.empty()); + } + + Mono<CassandraBlobDescriptor> selectBlobDescriptor(BucketName bucketName, BlobId blobId) { return cassandraAsyncExecutor.executeSingleRow( select.bind() .setString(BUCKET, bucketName.asString()) .setString(ID, blobId.asString())) - .map(row -> row.getInt(NUMBER_OF_CHUNK)); + .map(this::rowToBlobDescriptor); + } + + Mono<Integer> selectRowCount(BucketName bucketName, BlobId blobId) { + return selectBlobDescriptor(bucketName, blobId) + .map(CassandraBlobDescriptor::rowCount); } - Mono<Integer> selectRowCountClOne(BucketName bucketName, BlobId blobId) { + Mono<CassandraBlobDescriptor> selectBlobDescriptorClOne(BucketName bucketName, BlobId blobId) { return 
cassandraAsyncExecutor.executeSingleRow( select.bind() .setString(BUCKET, bucketName.asString()) .setString(ID, blobId.asString()) .setExecutionProfile(optimisticConsistencyLevelProfile)) - .map(row -> row.getInt(NUMBER_OF_CHUNK)); + .map(this::rowToBlobDescriptor); } Mono<ByteBuffer> readPart(BucketName bucketName, BlobId blobId, int position) { @@ -195,4 +210,13 @@ public class CassandraBucketDAO { private ByteBuffer rowToData(Row row) { return row.getByteBuffer(BucketBlobParts.DATA); } + + private CassandraBlobDescriptor rowToBlobDescriptor(Row row) { + return CassandraBlobDescriptor.from(row.getInt(NUMBER_OF_CHUNK), metadata(row)); + } + + private Map<String, String> metadata(Row row) { + return Optional.ofNullable(row.getMap(METADATA, String.class, String.class)) + .orElseGet(Map::of); + } } diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java index ee2baa6389..316b52e6a7 100644 --- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java +++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java @@ -24,15 +24,19 @@ import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.deleteFrom; import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.insertInto; import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom; import static org.apache.james.blob.cassandra.BlobTables.DefaultBucketBlobTable.ID; +import static org.apache.james.blob.cassandra.BlobTables.DefaultBucketBlobTable.METADATA; import static org.apache.james.blob.cassandra.BlobTables.DefaultBucketBlobTable.NUMBER_OF_CHUNK; import java.nio.ByteBuffer; +import java.util.Map; +import java.util.Optional; import jakarta.inject.Inject; import 
org.apache.james.backends.cassandra.init.configuration.JamesExecutionProfiles; import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; import org.apache.james.blob.api.BlobId; +import org.apache.james.blob.api.BlobStoreDAO.BlobMetadata; import org.apache.james.blob.cassandra.BlobTables.DefaultBucketBlobParts; import com.datastax.oss.driver.api.core.CqlSession; @@ -65,6 +69,7 @@ public class CassandraDefaultBucketDAO { this.insert = session.prepare(insertInto(BlobTables.DefaultBucketBlobTable.TABLE_NAME) .value(ID, bindMarker(ID)) .value(NUMBER_OF_CHUNK, bindMarker(NUMBER_OF_CHUNK)) + .value(METADATA, bindMarker(METADATA)) .build()); this.select = session.prepare(selectFrom(BlobTables.DefaultBucketBlobTable.TABLE_NAME) @@ -107,26 +112,36 @@ public class CassandraDefaultBucketDAO { .setByteBuffer(DefaultBucketBlobParts.DATA, data)); } - Mono<Void> saveBlobPartsReferences(BlobId blobId, int numberOfChunk) { + Mono<Void> saveBlobPartsReferences(BlobId blobId, int numberOfChunk, BlobMetadata metadata) { return cassandraAsyncExecutor.executeVoid( insert.bind() .setString(ID, blobId.asString()) - .setInt(NUMBER_OF_CHUNK, numberOfChunk)); + .setInt(NUMBER_OF_CHUNK, numberOfChunk) + .setMap(METADATA, CassandraBlobDescriptor.toRawMetadata(metadata), String.class, String.class)); } - Mono<Integer> selectRowCount(BlobId blobId) { + Mono<Void> saveBlobPartsReferences(BlobId blobId, int numberOfChunk) { + return saveBlobPartsReferences(blobId, numberOfChunk, BlobMetadata.empty()); + } + + Mono<CassandraBlobDescriptor> selectBlobDescriptor(BlobId blobId) { return cassandraAsyncExecutor.executeSingleRow( select.bind() .setString(ID, blobId.asString())) - .map(row -> row.getInt(NUMBER_OF_CHUNK)); + .map(this::rowToBlobDescriptor); + } + + Mono<Integer> selectRowCount(BlobId blobId) { + return selectBlobDescriptor(blobId) + .map(CassandraBlobDescriptor::rowCount); } - Mono<Integer> selectRowCountClOne(BlobId blobId) { + Mono<CassandraBlobDescriptor> 
selectBlobDescriptorClOne(BlobId blobId) { return cassandraAsyncExecutor.executeSingleRow( select.bind() .setString(ID, blobId.asString()) .setExecutionProfile(optimisticConsistencyLevelProfile)) - .map(row -> row.getInt(NUMBER_OF_CHUNK)); + .map(this::rowToBlobDescriptor); } Mono<ByteBuffer> readPart(BlobId blobId, int position) { @@ -166,4 +181,13 @@ public class CassandraDefaultBucketDAO { private ByteBuffer rowToData(Row row) { return row.getByteBuffer(DefaultBucketBlobParts.DATA); } + + private CassandraBlobDescriptor rowToBlobDescriptor(Row row) { + return CassandraBlobDescriptor.from(row.getInt(NUMBER_OF_CHUNK), metadata(row)); + } + + private Map<String, String> metadata(Row row) { + return Optional.ofNullable(row.getMap(METADATA, String.class, String.class)) + .orElseGet(Map::of); + } } diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreClOneTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreClOneTest.java index cc58a313d1..5cecf8a4c7 100644 --- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreClOneTest.java +++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreClOneTest.java @@ -137,7 +137,7 @@ class CassandraBlobStoreClOneTest implements CassandraBlobStoreContract, Dedupli String longString = Strings.repeat("0123456789\n", repeatCount); BlobId blobId = Mono.from(testee().save(testee().getDefaultBucketName(), longString, LOW_COST)).block(); - when(defaultBucketDAO().selectRowCountClOne(blobId)).thenReturn(Mono.empty()); + when(defaultBucketDAO().selectBlobDescriptorClOne(blobId)).thenReturn(Mono.empty()); String data = IOUtils.toString(testee().read(testee().getDefaultBucketName(), blobId), StandardCharsets.UTF_8); @@ -150,7 +150,7 @@ class CassandraBlobStoreClOneTest implements CassandraBlobStoreContract, Dedupli String longString = Strings.repeat("0123456789\n", 
repeatCount); BlobId blobId = Mono.from(testee().save(testee().getDefaultBucketName(), longString, LOW_COST)).block(); - when(defaultBucketDAO().selectRowCountClOne(blobId)).thenReturn(Mono.empty()); + when(defaultBucketDAO().selectBlobDescriptorClOne(blobId)).thenReturn(Mono.empty()); byte[] bytes = Mono.from(testee().readBytes(testee().getDefaultBucketName(), blobId)).block(); @@ -213,7 +213,7 @@ class CassandraBlobStoreClOneTest implements CassandraBlobStoreContract, Dedupli BlobId blobId = Mono.from(store.save(store.getDefaultBucketName(), BYTES_CONTENT, LOW_COST)).block(); - when(defaultBucketDAO().selectRowCountClOne(blobId)).thenReturn(Mono.empty()); + when(defaultBucketDAO().selectBlobDescriptorClOne(blobId)).thenReturn(Mono.empty()); store.read(store.getDefaultBucketName(), blobId); when(defaultBucketDAO().readPartClOne(blobId, 1)).thenReturn(Mono.empty()); @@ -230,7 +230,7 @@ class CassandraBlobStoreClOneTest implements CassandraBlobStoreContract, Dedupli BlobId blobId = Mono.from(store.save(store.getDefaultBucketName(), BYTES_CONTENT, LOW_COST)).block(); - when(defaultBucketDAO().selectRowCountClOne(blobId)).thenReturn(Mono.empty()); + when(defaultBucketDAO().selectBlobDescriptorClOne(blobId)).thenReturn(Mono.empty()); Mono.from(store.readBytes(store.getDefaultBucketName(), blobId)).block(); when(defaultBucketDAO().readPartClOne(blobId, 1)).thenReturn(Mono.empty()); diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAOTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAOTest.java index 4c3ba2c44f..9e829c456b 100644 --- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAOTest.java +++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAOTest.java @@ -25,13 +25,14 @@ import org.apache.james.backends.cassandra.init.configuration.CassandraConfigura import 
org.apache.james.blob.api.BlobStoreDAO; import org.apache.james.blob.api.BlobStoreDAOContract; import org.apache.james.blob.api.BucketName; +import org.apache.james.blob.api.MetadataAwareBlobStoreDAOContract; import org.apache.james.blob.api.TestBlobId; import org.apache.james.metrics.tests.RecordingMetricFactory; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.extension.RegisterExtension; -public class CassandraBlobStoreDAOTest implements BlobStoreDAOContract { +public class CassandraBlobStoreDAOTest implements BlobStoreDAOContract, MetadataAwareBlobStoreDAOContract { private static final int CHUNK_SIZE = 10240; @RegisterExtension diff --git a/upgrade-instructions.md b/upgrade-instructions.md index cc79b2954e..be43c4cfb8 100644 --- a/upgrade-instructions.md +++ b/upgrade-instructions.md @@ -15,10 +15,31 @@ Note: this section is in progress. It will be updated during all the development Changes to apply between 3.9.x and 3.10.0 will be reported here. 
Change list: + - [Adding metadata column to Cassandra blob tables](#adding-metadata-column-to-cassandra-blob-tables) - [Adding thread_id column to Cassandra email_query_view_sent_at and email_query_view_received_at tables](#adding-thread_id-column-to-cassandra-email_query_view_sent_at-and-email_query_view_received_at-tables) - [Adding thread_id column to Postgresql email_query_view table](#adding-thread_id-column-to-postgresql-email_query_view-table) - [Lucene mailbox index schema update for collapseThreads support](#lucene-mailbox-index-schema-update-for-collapsethreads-support) +### Adding metadata column to Cassandra blob tables + +Date: 14/04/2026 + +Concerned products: James products using Cassandra as blob storage + +James now persists blob metadata in the Cassandra blob descriptor tables: + +- `blobs.metadata` (`frozen<map<text, text>>`) +- `blobsInBucket.metadata` (`frozen<map<text, text>>`) + +Existing Cassandra deployments need to add these columns manually before any node runs the new version, including when performing a rolling upgrade. + +To add these columns, run the following CQL commands, replacing `james_keyspace` with the name of your James keyspace: + +```sql +ALTER TABLE james_keyspace.blobs ADD metadata frozen<map<text, text>>; +ALTER TABLE james_keyspace.blobsInBucket ADD metadata frozen<map<text, text>>; +``` + ### Lucene mailbox index schema update for collapseThreads support Date: 06/02/2026 --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
