This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit e5751b8b3ad6c430d926d2a3421de5873e8a62fc
Author: Quan Tran <[email protected]>
AuthorDate: Tue Apr 14 16:40:31 2026 +0700

    JAMES-4182 Implement blob metadata storage for Cassandra
---
 .../apache/james/blob/cassandra/BlobTables.java    |  2 +
 .../cassandra/CassandraBlobDataDefinition.java     |  7 +-
 .../blob/cassandra/CassandraBlobDescriptor.java    | 48 ++++++++++++
 .../blob/cassandra/CassandraBlobStoreDAO.java      | 90 +++++++++++-----------
 .../james/blob/cassandra/CassandraBucketDAO.java   | 36 +++++++--
 .../blob/cassandra/CassandraDefaultBucketDAO.java  | 36 +++++++--
 .../cassandra/CassandraBlobStoreClOneTest.java     |  8 +-
 .../blob/cassandra/CassandraBlobStoreDAOTest.java  |  3 +-
 upgrade-instructions.md                            | 21 +++++
 9 files changed, 188 insertions(+), 63 deletions(-)

diff --git 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/BlobTables.java
 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/BlobTables.java
index 11ca1d40f0..0b95e44dd5 100644
--- 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/BlobTables.java
+++ 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/BlobTables.java
@@ -27,6 +27,7 @@ public interface BlobTables {
         String TABLE_NAME = "blobs";
         CqlIdentifier ID = CqlIdentifier.fromCql("id");
         CqlIdentifier NUMBER_OF_CHUNK = CqlIdentifier.fromCql("position");
+        CqlIdentifier METADATA = CqlIdentifier.fromCql("metadata");
     }
 
     interface DefaultBucketBlobParts {
@@ -41,6 +42,7 @@ public interface BlobTables {
         CqlIdentifier BUCKET = CqlIdentifier.fromCql("bucket");
         CqlIdentifier ID = CqlIdentifier.fromCql("id");
         CqlIdentifier NUMBER_OF_CHUNK = CqlIdentifier.fromCql("position");
+        CqlIdentifier METADATA = CqlIdentifier.fromCql("metadata");
     }
 
     interface BucketBlobParts {
diff --git 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobDataDefinition.java
 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobDataDefinition.java
index d562b7e9c4..da21297bce 100644
--- 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobDataDefinition.java
+++ 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobDataDefinition.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.blob.cassandra;
 
+import static com.datastax.oss.driver.api.core.type.DataTypes.TEXT;
 import static 
org.apache.james.blob.cassandra.BlobTables.DefaultBucketBlobParts.DATA;
 
 import org.apache.james.backends.cassandra.components.CassandraDataDefinition;
@@ -46,7 +47,8 @@ public interface CassandraBlobDataDefinition {
             "Messages` headers and bodies are stored as blobparts.")
         .statement(statement -> types -> statement
             .withPartitionKey(DefaultBucketBlobTable.ID, DataTypes.TEXT)
-            .withClusteringColumn(DefaultBucketBlobTable.NUMBER_OF_CHUNK, 
DataTypes.INT))
+            .withClusteringColumn(DefaultBucketBlobTable.NUMBER_OF_CHUNK, 
DataTypes.INT)
+            .withColumn(DefaultBucketBlobTable.METADATA, 
DataTypes.frozenMapOf(TEXT, TEXT)))
 
         .table(BucketBlobParts.TABLE_NAME)
         .comment("Holds blob parts composing blobs in a non-default bucket." +
@@ -63,7 +65,8 @@ public interface CassandraBlobDataDefinition {
         .statement(statement -> types -> statement
             .withPartitionKey(BucketBlobParts.BUCKET, DataTypes.TEXT)
             .withPartitionKey(BucketBlobParts.ID, DataTypes.TEXT)
-            .withClusteringColumn(BucketBlobTable.NUMBER_OF_CHUNK, 
DataTypes.INT))
+            .withClusteringColumn(BucketBlobTable.NUMBER_OF_CHUNK, 
DataTypes.INT)
+            .withColumn(BucketBlobTable.METADATA, DataTypes.frozenMapOf(TEXT, 
TEXT)))
 
         .build();
 }
diff --git 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobDescriptor.java
 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobDescriptor.java
new file mode 100644
index 0000000000..0618d55843
--- /dev/null
+++ 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobDescriptor.java
@@ -0,0 +1,48 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.blob.cassandra;
+
+import java.util.Map;
+
+import org.apache.james.blob.api.BlobStoreDAO.BlobMetadata;
+import org.apache.james.blob.api.BlobStoreDAO.BlobMetadataName;
+import org.apache.james.blob.api.BlobStoreDAO.BlobMetadataValue;
+
+import com.google.common.collect.ImmutableMap;
+
+record CassandraBlobDescriptor(int rowCount, BlobMetadata metadata) {
+    static CassandraBlobDescriptor from(int rowCount, Map<String, String> 
metadata) {
+        return new CassandraBlobDescriptor(rowCount, toBlobMetadata(metadata));
+    }
+
+    static Map<String, String> toRawMetadata(BlobMetadata metadata) {
+        return metadata.underlyingMap().entrySet().stream()
+            .collect(ImmutableMap.toImmutableMap(
+                entry -> entry.getKey().name(),
+                entry -> entry.getValue().value()));
+    }
+
+    private static BlobMetadata toBlobMetadata(Map<String, String> metadata) {
+        return new BlobMetadata(metadata.entrySet().stream()
+            .collect(ImmutableMap.toImmutableMap(
+                entry -> new BlobMetadataName(entry.getKey()),
+                entry -> new BlobMetadataValue(entry.getValue()))));
+    }
+}
diff --git 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAO.java
 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAO.java
index 5afd769d41..4083007773 100644
--- 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAO.java
+++ 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAO.java
@@ -98,58 +98,63 @@ public class CassandraBlobStoreDAO implements BlobStoreDAO {
 
     @Override
     public InputStreamBlob read(BucketName bucketName, BlobId blobId) throws 
ObjectStoreIOException, ObjectNotFoundException {
-        return 
InputStreamBlob.of(ReactorUtils.toInputStream(readBlobParts(bucketName, 
blobId)));
+        return Mono.from(readBlobDescriptor(bucketName, blobId))
+            .map(blobDescriptor -> 
InputStreamBlob.of(ReactorUtils.toInputStream(readBlobParts(bucketName, blobId, 
blobDescriptor.rowCount())), blobDescriptor.metadata()))
+            .block();
     }
 
     @Override
     public Publisher<InputStreamBlob> readReactive(BucketName bucketName, 
BlobId blobId) {
-        return Mono.just(read(bucketName, blobId));
+        return Mono.from(readBlobDescriptor(bucketName, blobId))
+            .publishOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
+            .map(blobDescriptor -> 
InputStreamBlob.of(ReactorUtils.toInputStream(readBlobParts(bucketName, blobId, 
blobDescriptor.rowCount())), blobDescriptor.metadata()));
     }
 
     @Override
     public Publisher<BytesBlob> readBytes(BucketName bucketName, BlobId 
blobId) {
-        return readBlobParts(bucketName, blobId)
-            .collectList()
-            .map(this::byteBuffersToBytesArray)
-            .map(BytesBlob::of);
+        return Mono.from(readBlobDescriptor(bucketName, blobId))
+            .flatMap(blobDescriptor -> readBlobParts(bucketName, blobId, 
blobDescriptor.rowCount())
+                .collectList()
+                .map(this::byteBuffersToBytesArray)
+                .map(bytes -> BytesBlob.of(bytes, blobDescriptor.metadata())));
     }
 
     @Override
     public Publisher<Void> save(BucketName bucketName, BlobId blobId, Blob 
blob) {
         return switch (blob) {
-            case BytesBlob bytesBlob -> save(bucketName, blobId, 
bytesBlob.payload());
-            case InputStreamBlob inputStreamBlob -> save(bucketName, blobId, 
inputStreamBlob.payload());
-            case ByteSourceBlob byteSourceBlob -> save(bucketName, blobId, 
byteSourceBlob.payload());
+            case BytesBlob bytesBlob -> save(bucketName, blobId, 
bytesBlob.payload(), bytesBlob.metadata());
+            case InputStreamBlob inputStreamBlob -> save(bucketName, blobId, 
inputStreamBlob.payload(), inputStreamBlob.metadata());
+            case ByteSourceBlob byteSourceBlob -> save(bucketName, blobId, 
byteSourceBlob.payload(), byteSourceBlob.metadata());
         };
     }
 
-    public Mono<Void> save(BucketName bucketName, BlobId blobId, byte[] data) {
+    private Mono<Void> save(BucketName bucketName, BlobId blobId, byte[] data, 
BlobMetadata metadata) {
         Preconditions.checkNotNull(data);
 
         return Mono.fromCallable(() -> DataChunker.chunk(data, 
configuration.getBlobPartSize()))
-            .flatMap(chunks -> save(bucketName, blobId, chunks));
+            .flatMap(chunks -> save(bucketName, blobId, chunks, metadata));
     }
 
-    public Mono<Void> save(BucketName bucketName, BlobId blobId, InputStream 
inputStream) {
+    private Mono<Void> save(BucketName bucketName, BlobId blobId, InputStream 
inputStream, BlobMetadata metadata) {
         Preconditions.checkNotNull(bucketName);
         Preconditions.checkNotNull(inputStream);
 
         return Mono.fromCallable(() -> ReactorUtils.toChunks(inputStream, 
configuration.getBlobPartSize())
                 .subscribeOn(Schedulers.boundedElastic()))
-            .flatMap(chunks -> save(bucketName, blobId, chunks))
-            .onErrorMap(e -> new ObjectStoreIOException("Exception occurred 
while saving input stream", e));
+            .flatMap(chunks -> save(bucketName, blobId, chunks, metadata))
+            .onErrorMap(Throwable.class, e -> new 
ObjectStoreIOException("Exception occurred while saving input stream", e));
     }
 
-    public Mono<Void> save(BucketName bucketName, BlobId blobId, ByteSource 
content) {
+    private Mono<Void> save(BucketName bucketName, BlobId blobId, ByteSource 
content, BlobMetadata metadata) {
         return Mono.using(content::openBufferedStream,
-            stream -> save(bucketName, blobId, stream),
+            stream -> save(bucketName, blobId, stream, metadata),
             Throwing.consumer(InputStream::close).sneakyThrow(),
             LAZY);
     }
 
-    private Mono<Void> save(BucketName bucketName, BlobId blobId, 
Flux<ByteBuffer> chunksAsFlux) {
+    private Mono<Void> save(BucketName bucketName, BlobId blobId, 
Flux<ByteBuffer> chunksAsFlux, BlobMetadata metadata) {
         return saveBlobParts(bucketName, blobId, chunksAsFlux)
-            .flatMap(numberOfChunk -> saveBlobPartReference(bucketName, 
blobId, numberOfChunk));
+            .flatMap(numberOfChunk -> saveBlobPartReference(bucketName, 
blobId, numberOfChunk, metadata));
     }
 
     private Mono<Integer> saveBlobParts(BucketName bucketName, BlobId blobId, 
Flux<ByteBuffer> chunksAsFlux) {
@@ -171,11 +176,11 @@ public class CassandraBlobStoreDAO implements 
BlobStoreDAO {
         return write.thenReturn(anyNonEmptyValue);
     }
 
-    private Mono<Void> saveBlobPartReference(BucketName bucketName, BlobId 
blobId, Integer numberOfChunk) {
+    private Mono<Void> saveBlobPartReference(BucketName bucketName, BlobId 
blobId, Integer numberOfChunk, BlobMetadata metadata) {
         if (isDefaultBucket(bucketName)) {
-            return defaultBucketDAO.saveBlobPartsReferences(blobId, 
numberOfChunk);
+            return defaultBucketDAO.saveBlobPartsReferences(blobId, 
numberOfChunk, metadata);
         } else {
-            return bucketDAO.saveBlobPartsReferences(bucketName, blobId, 
numberOfChunk);
+            return bucketDAO.saveBlobPartsReferences(bucketName, blobId, 
numberOfChunk, metadata);
         }
     }
 
@@ -240,43 +245,40 @@ public class CassandraBlobStoreDAO implements 
BlobStoreDAO {
         }
     }
 
-    private Mono<Integer> selectRowCount(BucketName bucketName, BlobId blobId) 
{
-        if (configuration.isOptimisticConsistencyLevel()) {
-            return selectRowCountClOne(bucketName, blobId)
+    private Mono<CassandraBlobDescriptor> readBlobDescriptor(BucketName 
bucketName, BlobId blobId) {
+        Mono<CassandraBlobDescriptor> descriptor = 
configuration.isOptimisticConsistencyLevel()
+            ? readBlobDescriptorClOne(bucketName, blobId)
                 .doOnNext(any -> metricClOneHitCount.increment())
                 
.switchIfEmpty(Mono.fromRunnable(metricClOneMissCount::increment)
-                    .then(selectRowCountClDefault(bucketName, blobId)));
-        } else {
-            return selectRowCountClDefault(bucketName, blobId);
-        }
+                    .then(readBlobDescriptorClDefault(bucketName, blobId)))
+            : readBlobDescriptorClDefault(bucketName, blobId);
+
+        return descriptor.switchIfEmpty(Mono.error(() ->
+            new ObjectNotFoundException(String.format("Could not retrieve blob 
metadata for %s", blobId))));
     }
 
-    private Mono<Integer> selectRowCountClOne(BucketName bucketName, BlobId 
blobId) {
+    private Mono<CassandraBlobDescriptor> readBlobDescriptorClOne(BucketName 
bucketName, BlobId blobId) {
         if (isDefaultBucket(bucketName)) {
-            return defaultBucketDAO.selectRowCountClOne(blobId);
+            return defaultBucketDAO.selectBlobDescriptorClOne(blobId);
         } else {
-            return bucketDAO.selectRowCountClOne(bucketName, blobId);
+            return bucketDAO.selectBlobDescriptorClOne(bucketName, blobId);
         }
     }
 
-    private Mono<Integer> selectRowCountClDefault(BucketName bucketName, 
BlobId blobId) {
+    private Mono<CassandraBlobDescriptor> 
readBlobDescriptorClDefault(BucketName bucketName, BlobId blobId) {
         if (isDefaultBucket(bucketName)) {
-            return defaultBucketDAO.selectRowCount(blobId);
+            return defaultBucketDAO.selectBlobDescriptor(blobId);
         } else {
-            return bucketDAO.selectRowCount(bucketName, blobId);
+            return bucketDAO.selectBlobDescriptor(bucketName, blobId);
         }
     }
 
-    private Flux<ByteBuffer> readBlobParts(BucketName bucketName, BlobId 
blobId) {
-        return selectRowCount(bucketName, blobId)
-            .single()
-            .onErrorMap(NoSuchElementException.class, e ->
-                new ObjectNotFoundException(String.format("Could not retrieve 
blob metadata for %s", blobId)))
-            .flatMapMany(rowCount -> Flux.range(0, rowCount)
-                .concatMap(partIndex -> readPart(bucketName, blobId, partIndex)
-                    .single()
-                    .onErrorMap(NoSuchElementException.class, e ->
-                        new ObjectNotFoundException(String.format("Missing 
blob part for blobId %s and position %d", blobId.asString(), partIndex)))));
+    private Flux<ByteBuffer> readBlobParts(BucketName bucketName, BlobId 
blobId, int rowCount) {
+        return Flux.range(0, rowCount)
+            .concatMap(partIndex -> readPart(bucketName, blobId, partIndex)
+                .single()
+                .onErrorMap(NoSuchElementException.class, e ->
+                    new ObjectNotFoundException(String.format("Missing blob 
part for blobId %s and position %d", blobId.asString(), partIndex))));
     }
 
     private byte[] byteBuffersToBytesArray(List<ByteBuffer> byteBuffers) {
diff --git 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java
 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java
index cab12525fc..69e5eed7bf 100644
--- 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java
+++ 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java
@@ -25,9 +25,12 @@ import static 
com.datastax.oss.driver.api.querybuilder.QueryBuilder.insertInto;
 import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom;
 import static 
org.apache.james.blob.cassandra.BlobTables.BucketBlobTable.BUCKET;
 import static org.apache.james.blob.cassandra.BlobTables.BucketBlobTable.ID;
+import static 
org.apache.james.blob.cassandra.BlobTables.BucketBlobTable.METADATA;
 import static 
org.apache.james.blob.cassandra.BlobTables.BucketBlobTable.NUMBER_OF_CHUNK;
 
 import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Optional;
 
 import jakarta.inject.Inject;
 
@@ -35,6 +38,7 @@ import org.apache.commons.lang3.tuple.Pair;
 import 
org.apache.james.backends.cassandra.init.configuration.JamesExecutionProfiles;
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
 import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobStoreDAO.BlobMetadata;
 import org.apache.james.blob.api.BucketName;
 import org.apache.james.blob.cassandra.BlobTables.BucketBlobParts;
 
@@ -71,6 +75,7 @@ public class CassandraBucketDAO {
             .value(BUCKET, bindMarker(BUCKET))
             .value(ID, bindMarker(ID))
             .value(NUMBER_OF_CHUNK, bindMarker(NUMBER_OF_CHUNK))
+            .value(METADATA, bindMarker(METADATA))
             .build());
 
         this.insertPart = 
session.prepare(insertInto(BucketBlobParts.TABLE_NAME)
@@ -123,29 +128,39 @@ public class CassandraBucketDAO {
                 .setByteBuffer(BucketBlobParts.DATA, data));
     }
 
-    Mono<Void> saveBlobPartsReferences(BucketName bucketName, BlobId blobId, 
int numberOfChunk) {
+    Mono<Void> saveBlobPartsReferences(BucketName bucketName, BlobId blobId, 
int numberOfChunk, BlobMetadata metadata) {
         return cassandraAsyncExecutor.executeVoid(
             insert.bind()
                 .setString(BUCKET, bucketName.asString())
                 .setString(ID, blobId.asString())
-                .setInt(NUMBER_OF_CHUNK, numberOfChunk));
+                .setInt(NUMBER_OF_CHUNK, numberOfChunk)
+                .setMap(METADATA, 
CassandraBlobDescriptor.toRawMetadata(metadata), String.class, String.class));
     }
 
-    Mono<Integer> selectRowCount(BucketName bucketName, BlobId blobId) {
+    Mono<Void> saveBlobPartsReferences(BucketName bucketName, BlobId blobId, 
int numberOfChunk) {
+        return saveBlobPartsReferences(bucketName, blobId, numberOfChunk, 
BlobMetadata.empty());
+    }
+
+    Mono<CassandraBlobDescriptor> selectBlobDescriptor(BucketName bucketName, 
BlobId blobId) {
         return cassandraAsyncExecutor.executeSingleRow(
                 select.bind()
                     .setString(BUCKET, bucketName.asString())
                     .setString(ID, blobId.asString()))
-            .map(row -> row.getInt(NUMBER_OF_CHUNK));
+            .map(this::rowToBlobDescriptor);
+    }
+
+    Mono<Integer> selectRowCount(BucketName bucketName, BlobId blobId) {
+        return selectBlobDescriptor(bucketName, blobId)
+            .map(CassandraBlobDescriptor::rowCount);
     }
 
-    Mono<Integer> selectRowCountClOne(BucketName bucketName, BlobId blobId) {
+    Mono<CassandraBlobDescriptor> selectBlobDescriptorClOne(BucketName 
bucketName, BlobId blobId) {
         return cassandraAsyncExecutor.executeSingleRow(
                 select.bind()
                     .setString(BUCKET, bucketName.asString())
                     .setString(ID, blobId.asString())
                     .setExecutionProfile(optimisticConsistencyLevelProfile))
-            .map(row -> row.getInt(NUMBER_OF_CHUNK));
+            .map(this::rowToBlobDescriptor);
     }
 
     Mono<ByteBuffer> readPart(BucketName bucketName, BlobId blobId, int 
position) {
@@ -195,4 +210,13 @@ public class CassandraBucketDAO {
     private ByteBuffer rowToData(Row row) {
         return row.getByteBuffer(BucketBlobParts.DATA);
     }
+
+    private CassandraBlobDescriptor rowToBlobDescriptor(Row row) {
+        return CassandraBlobDescriptor.from(row.getInt(NUMBER_OF_CHUNK), 
metadata(row));
+    }
+
+    private Map<String, String> metadata(Row row) {
+        return Optional.ofNullable(row.getMap(METADATA, String.class, 
String.class))
+            .orElseGet(Map::of);
+    }
 }
diff --git 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java
 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java
index ee2baa6389..316b52e6a7 100644
--- 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java
+++ 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java
@@ -24,15 +24,19 @@ import static 
com.datastax.oss.driver.api.querybuilder.QueryBuilder.deleteFrom;
 import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.insertInto;
 import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom;
 import static 
org.apache.james.blob.cassandra.BlobTables.DefaultBucketBlobTable.ID;
+import static 
org.apache.james.blob.cassandra.BlobTables.DefaultBucketBlobTable.METADATA;
 import static 
org.apache.james.blob.cassandra.BlobTables.DefaultBucketBlobTable.NUMBER_OF_CHUNK;
 
 import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Optional;
 
 import jakarta.inject.Inject;
 
 import 
org.apache.james.backends.cassandra.init.configuration.JamesExecutionProfiles;
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
 import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobStoreDAO.BlobMetadata;
 import org.apache.james.blob.cassandra.BlobTables.DefaultBucketBlobParts;
 
 import com.datastax.oss.driver.api.core.CqlSession;
@@ -65,6 +69,7 @@ public class CassandraDefaultBucketDAO {
         this.insert = 
session.prepare(insertInto(BlobTables.DefaultBucketBlobTable.TABLE_NAME)
             .value(ID, bindMarker(ID))
             .value(NUMBER_OF_CHUNK, bindMarker(NUMBER_OF_CHUNK))
+            .value(METADATA, bindMarker(METADATA))
             .build());
 
         this.select = 
session.prepare(selectFrom(BlobTables.DefaultBucketBlobTable.TABLE_NAME)
@@ -107,26 +112,36 @@ public class CassandraDefaultBucketDAO {
                 .setByteBuffer(DefaultBucketBlobParts.DATA, data));
     }
 
-    Mono<Void> saveBlobPartsReferences(BlobId blobId, int numberOfChunk) {
+    Mono<Void> saveBlobPartsReferences(BlobId blobId, int numberOfChunk, 
BlobMetadata metadata) {
         return cassandraAsyncExecutor.executeVoid(
             insert.bind()
                 .setString(ID, blobId.asString())
-                .setInt(NUMBER_OF_CHUNK, numberOfChunk));
+                .setInt(NUMBER_OF_CHUNK, numberOfChunk)
+                .setMap(METADATA, 
CassandraBlobDescriptor.toRawMetadata(metadata), String.class, String.class));
     }
 
-    Mono<Integer> selectRowCount(BlobId blobId) {
+    Mono<Void> saveBlobPartsReferences(BlobId blobId, int numberOfChunk) {
+        return saveBlobPartsReferences(blobId, numberOfChunk, 
BlobMetadata.empty());
+    }
+
+    Mono<CassandraBlobDescriptor> selectBlobDescriptor(BlobId blobId) {
         return cassandraAsyncExecutor.executeSingleRow(
                 select.bind()
                     .setString(ID, blobId.asString()))
-            .map(row -> row.getInt(NUMBER_OF_CHUNK));
+            .map(this::rowToBlobDescriptor);
+    }
+
+    Mono<Integer> selectRowCount(BlobId blobId) {
+        return selectBlobDescriptor(blobId)
+            .map(CassandraBlobDescriptor::rowCount);
     }
 
-    Mono<Integer> selectRowCountClOne(BlobId blobId) {
+    Mono<CassandraBlobDescriptor> selectBlobDescriptorClOne(BlobId blobId) {
         return cassandraAsyncExecutor.executeSingleRow(
                 select.bind()
                     .setString(ID, blobId.asString())
                     .setExecutionProfile(optimisticConsistencyLevelProfile))
-            .map(row -> row.getInt(NUMBER_OF_CHUNK));
+            .map(this::rowToBlobDescriptor);
     }
 
     Mono<ByteBuffer> readPart(BlobId blobId, int position) {
@@ -166,4 +181,13 @@ public class CassandraDefaultBucketDAO {
     private ByteBuffer rowToData(Row row) {
         return row.getByteBuffer(DefaultBucketBlobParts.DATA);
     }
+
+    private CassandraBlobDescriptor rowToBlobDescriptor(Row row) {
+        return CassandraBlobDescriptor.from(row.getInt(NUMBER_OF_CHUNK), 
metadata(row));
+    }
+
+    private Map<String, String> metadata(Row row) {
+        return Optional.ofNullable(row.getMap(METADATA, String.class, 
String.class))
+            .orElseGet(Map::of);
+    }
 }
diff --git 
a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreClOneTest.java
 
b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreClOneTest.java
index cc58a313d1..5cecf8a4c7 100644
--- 
a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreClOneTest.java
+++ 
b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreClOneTest.java
@@ -137,7 +137,7 @@ class CassandraBlobStoreClOneTest implements 
CassandraBlobStoreContract, Dedupli
         String longString = Strings.repeat("0123456789\n", repeatCount);
         BlobId blobId = 
Mono.from(testee().save(testee().getDefaultBucketName(), longString, 
LOW_COST)).block();
 
-        
when(defaultBucketDAO().selectRowCountClOne(blobId)).thenReturn(Mono.empty());
+        
when(defaultBucketDAO().selectBlobDescriptorClOne(blobId)).thenReturn(Mono.empty());
 
         String data = 
IOUtils.toString(testee().read(testee().getDefaultBucketName(), blobId), 
StandardCharsets.UTF_8);
 
@@ -150,7 +150,7 @@ class CassandraBlobStoreClOneTest implements 
CassandraBlobStoreContract, Dedupli
         String longString = Strings.repeat("0123456789\n", repeatCount);
         BlobId blobId = 
Mono.from(testee().save(testee().getDefaultBucketName(), longString, 
LOW_COST)).block();
 
-        
when(defaultBucketDAO().selectRowCountClOne(blobId)).thenReturn(Mono.empty());
+        
when(defaultBucketDAO().selectBlobDescriptorClOne(blobId)).thenReturn(Mono.empty());
 
         byte[] bytes = 
Mono.from(testee().readBytes(testee().getDefaultBucketName(), blobId)).block();
 
@@ -213,7 +213,7 @@ class CassandraBlobStoreClOneTest implements 
CassandraBlobStoreContract, Dedupli
 
         BlobId blobId = Mono.from(store.save(store.getDefaultBucketName(), 
BYTES_CONTENT, LOW_COST)).block();
 
-        
when(defaultBucketDAO().selectRowCountClOne(blobId)).thenReturn(Mono.empty());
+        
when(defaultBucketDAO().selectBlobDescriptorClOne(blobId)).thenReturn(Mono.empty());
         store.read(store.getDefaultBucketName(), blobId);
 
         when(defaultBucketDAO().readPartClOne(blobId, 
1)).thenReturn(Mono.empty());
@@ -230,7 +230,7 @@ class CassandraBlobStoreClOneTest implements 
CassandraBlobStoreContract, Dedupli
 
         BlobId blobId = Mono.from(store.save(store.getDefaultBucketName(), 
BYTES_CONTENT, LOW_COST)).block();
 
-        
when(defaultBucketDAO().selectRowCountClOne(blobId)).thenReturn(Mono.empty());
+        
when(defaultBucketDAO().selectBlobDescriptorClOne(blobId)).thenReturn(Mono.empty());
         Mono.from(store.readBytes(store.getDefaultBucketName(), 
blobId)).block();
 
         when(defaultBucketDAO().readPartClOne(blobId, 
1)).thenReturn(Mono.empty());
diff --git 
a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAOTest.java
 
b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAOTest.java
index 4c3ba2c44f..9e829c456b 100644
--- 
a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAOTest.java
+++ 
b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobStoreDAOTest.java
@@ -25,13 +25,14 @@ import 
org.apache.james.backends.cassandra.init.configuration.CassandraConfigura
 import org.apache.james.blob.api.BlobStoreDAO;
 import org.apache.james.blob.api.BlobStoreDAOContract;
 import org.apache.james.blob.api.BucketName;
+import org.apache.james.blob.api.MetadataAwareBlobStoreDAOContract;
 import org.apache.james.blob.api.TestBlobId;
 import org.apache.james.metrics.tests.RecordingMetricFactory;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
-public class CassandraBlobStoreDAOTest implements BlobStoreDAOContract {
+public class CassandraBlobStoreDAOTest implements BlobStoreDAOContract, 
MetadataAwareBlobStoreDAOContract {
     private static final int CHUNK_SIZE = 10240;
 
     @RegisterExtension
diff --git a/upgrade-instructions.md b/upgrade-instructions.md
index cc79b2954e..be43c4cfb8 100644
--- a/upgrade-instructions.md
+++ b/upgrade-instructions.md
@@ -15,10 +15,31 @@ Note: this section is in progress. It will be updated 
during all the development
 Changes to apply between 3.9.x and 3.10.0 will be reported here.
 
 Change list:
+ - [Adding metadata column to Cassandra blob 
tables](#adding-metadata-column-to-cassandra-blob-tables)
  - [Adding thread_id column to Cassandra email_query_view_sent_at and 
email_query_view_received_at 
tables](#adding-thread_id-column-to-cassandra-email_query_view_sent_at-and-email_query_view_received_at-tables)
  - [Adding thread_id column to Postgresql email_query_view 
table](#adding-thread_id-column-to-postgresql-email_query_view-table)
  - [Lucene mailbox index schema update for collapseThreads 
support](#lucene-mailbox-index-schema-update-for-collapsethreads-support)
 
+### Adding metadata column to Cassandra blob tables
+
+Date: 14/04/2026
+
+Concerned products: James products using Cassandra as blob storage
+
+James now persists blob metadata in the Cassandra blob descriptor tables:
+
+- `blobs.metadata` (`frozen<map<text, text>>`)
+- `blobsInBucket.metadata` (`frozen<map<text, text>>`)
+
+Existing Cassandra deployments must add these columns manually before starting a 
rolling upgrade, because the upgraded code reads and writes the `metadata` column.
+
+To add these columns, run the following CQL commands (replace `james_keyspace` with the name of your keyspace):
+
+```sql
+ALTER TABLE james_keyspace.blobs ADD metadata frozen<map<text, text>>;
+ALTER TABLE james_keyspace.blobsInBucket ADD metadata frozen<map<text, text>>;
+```
+
 ### Lucene mailbox index schema update for collapseThreads support
 
 Date: 06/02/2026


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to