This is an automated email from the ASF dual-hosted git repository.

hqtran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 0cbe36b20e JAMES-4182 Implement blob metadata storage for Postgres
0cbe36b20e is described below

commit 0cbe36b20e4d5534d2518bbd47dc06c4d74c296b
Author: Quan Tran <[email protected]>
AuthorDate: Wed Apr 15 16:34:19 2026 +0700

    JAMES-4182 Implement blob metadata storage for Postgres
---
 .../PostgresBlobStorageDataDefinition.java         |  4 ++
 .../james/blob/postgres/PostgresBlobStoreDAO.java  | 50 ++++++++++++++++------
 .../blob/postgres/PostgresBlobStoreDAOTest.java    |  3 +-
 upgrade-instructions.md                            | 16 +++++++
 4 files changed, 58 insertions(+), 15 deletions(-)

diff --git 
a/server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStorageDataDefinition.java
 
b/server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStorageDataDefinition.java
index 51b2c3a034..9787d9c43b 100644
--- 
a/server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStorageDataDefinition.java
+++ 
b/server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStorageDataDefinition.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.blob.postgres;
 
+import static 
org.apache.james.backends.postgres.PostgresCommons.DataTypes.HSTORE;
 import static 
org.apache.james.blob.postgres.PostgresBlobStorageDataDefinition.PostgresBlobStorageTable.BUCKET_NAME_INDEX;
 import static org.jooq.impl.SQLDataType.BLOB;
 
@@ -30,6 +31,7 @@ import org.jooq.Record;
 import org.jooq.Table;
 import org.jooq.impl.DSL;
 import org.jooq.impl.SQLDataType;
+import org.jooq.postgres.extensions.types.Hstore;
 
 public interface PostgresBlobStorageDataDefinition {
     interface PostgresBlobStorageTable {
@@ -39,6 +41,7 @@ public interface PostgresBlobStorageDataDefinition {
         Field<String> BLOB_ID = DSL.field("blob_id", 
SQLDataType.VARCHAR(200).notNull());
         Field<byte[]> DATA = DSL.field("data", BLOB.notNull());
         Field<Integer> SIZE = DSL.field("size", SQLDataType.INTEGER.notNull());
+        Field<Hstore> METADATA = DSL.field("metadata", HSTORE);
 
         PostgresTable TABLE = PostgresTable.name(TABLE_NAME.getName())
             .createTableStep(((dsl, tableName) -> 
dsl.createTableIfNotExists(tableName)
@@ -46,6 +49,7 @@ public interface PostgresBlobStorageDataDefinition {
                 .column(BLOB_ID)
                 .column(DATA)
                 .column(SIZE)
+                .column(METADATA)
                 .constraint(DSL.primaryKey(BUCKET_NAME, BLOB_ID))))
             .disableRowLevelSecurity()
             .build();
diff --git 
a/server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStoreDAO.java
 
b/server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStoreDAO.java
index 1eb1a9e4d8..fc20507a36 100644
--- 
a/server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStoreDAO.java
+++ 
b/server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStoreDAO.java
@@ -22,6 +22,7 @@ package org.apache.james.blob.postgres;
 import static 
org.apache.james.blob.postgres.PostgresBlobStorageDataDefinition.PostgresBlobStorageTable.BLOB_ID;
 import static 
org.apache.james.blob.postgres.PostgresBlobStorageDataDefinition.PostgresBlobStorageTable.BUCKET_NAME;
 import static 
org.apache.james.blob.postgres.PostgresBlobStorageDataDefinition.PostgresBlobStorageTable.DATA;
+import static 
org.apache.james.blob.postgres.PostgresBlobStorageDataDefinition.PostgresBlobStorageTable.METADATA;
 import static 
org.apache.james.blob.postgres.PostgresBlobStorageDataDefinition.PostgresBlobStorageTable.SIZE;
 import static 
org.apache.james.blob.postgres.PostgresBlobStorageDataDefinition.PostgresBlobStorageTable.TABLE_NAME;
 
@@ -29,6 +30,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.function.Function;
 
@@ -43,10 +45,12 @@ import org.apache.james.blob.api.BucketName;
 import org.apache.james.blob.api.ObjectNotFoundException;
 import org.apache.james.blob.api.ObjectStoreIOException;
 import org.jooq.impl.DSL;
+import org.jooq.postgres.extensions.types.Hstore;
 import org.reactivestreams.Publisher;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.io.ByteSource;
 
 import reactor.core.publisher.Flux;
@@ -76,40 +80,41 @@ public class PostgresBlobStoreDAO implements BlobStoreDAO {
 
     @Override
     public Publisher<BytesBlob> readBytes(BucketName bucketName, BlobId 
blobId) {
-        return postgresExecutor.executeRow(dsl -> Mono.from(dsl.select(DATA)
+        return postgresExecutor.executeRow(dsl -> Mono.from(dsl.select(DATA, 
METADATA)
                 .from(TABLE_NAME)
                 .where(BUCKET_NAME.eq(bucketName.asString()))
                 .and(BLOB_ID.eq(blobId.asString()))))
-            .map(record -> record.get(DATA))
             .switchIfEmpty(Mono.error(() -> new ObjectNotFoundException("Blob 
" + blobId + " does not exist in bucket " + bucketName)))
-            .map(BytesBlob::of);
+            .map(record -> BytesBlob.of(record.get(DATA), 
asBlobMetadata(record.get(METADATA))));
     }
 
     @Override
     public Publisher<Void> save(BucketName bucketName, BlobId blobId, Blob 
blob) {
         return switch (blob) {
-            case BytesBlob bytesBlob -> save(bucketName, blobId, 
bytesBlob.payload());
-            case InputStreamBlob inputStreamBlob -> save(bucketName, blobId, 
inputStreamBlob.payload());
-            case ByteSourceBlob byteSourceBlob -> save(bucketName, blobId, 
byteSourceBlob.payload());
+            case BytesBlob bytesBlob -> save(bucketName, blobId, 
bytesBlob.payload(), bytesBlob.metadata());
+            case InputStreamBlob inputStreamBlob -> save(bucketName, blobId, 
inputStreamBlob.payload(), inputStreamBlob.metadata());
+            case ByteSourceBlob byteSourceBlob -> save(bucketName, blobId, 
byteSourceBlob.payload(), byteSourceBlob.metadata());
         };
     }
 
-    public Mono<Void> save(BucketName bucketName, BlobId blobId, byte[] data) {
+    private Mono<Void> save(BucketName bucketName, BlobId blobId, byte[] data, 
BlobMetadata metadata) {
         Preconditions.checkNotNull(data);
 
         return postgresExecutor.executeVoid(dslContext ->
-            Mono.from(dslContext.insertInto(TABLE_NAME, BUCKET_NAME, BLOB_ID, 
DATA, SIZE)
+            Mono.from(dslContext.insertInto(TABLE_NAME, BUCKET_NAME, BLOB_ID, 
DATA, SIZE, METADATA)
                 .values(bucketName.asString(),
                     blobId.asString(),
                     data,
-                    data.length)
+                    data.length,
+                    asHstore(metadata))
                 .onConflict(BUCKET_NAME, BLOB_ID)
                 .doUpdate()
                 .set(DATA, data)
-                .set(SIZE, data.length)));
+                .set(SIZE, data.length)
+                .set(METADATA, asHstore(metadata))));
     }
 
-    public Mono<Void> save(BucketName bucketName, BlobId blobId, InputStream 
inputStream) {
+    private Mono<Void> save(BucketName bucketName, BlobId blobId, InputStream 
inputStream, BlobMetadata metadata) {
         Preconditions.checkNotNull(inputStream);
 
         return Mono.fromCallable(() -> {
@@ -118,17 +123,17 @@ public class PostgresBlobStoreDAO implements BlobStoreDAO 
{
             } catch (IOException e) {
                 throw new ObjectStoreIOException("IOException occurred", e);
             }
-        }).flatMap(bytes -> save(bucketName, blobId, bytes));
+        }).flatMap(bytes -> save(bucketName, blobId, bytes, metadata));
     }
 
-    public Mono<Void> save(BucketName bucketName, BlobId blobId, ByteSource 
content) {
+    private Mono<Void> save(BucketName bucketName, BlobId blobId, ByteSource 
content, BlobMetadata metadata) {
         return Mono.fromCallable(() -> {
             try {
                 return content.read();
             } catch (IOException e) {
                 throw new ObjectStoreIOException("IOException occurred", e);
             }
-        }).flatMap(bytes -> save(bucketName, blobId, bytes));
+        }).flatMap(bytes -> save(bucketName, blobId, bytes, metadata));
     }
 
     @Override
@@ -184,4 +189,21 @@ public class PostgresBlobStoreDAO implements BlobStoreDAO {
             .collectList()
             .switchIfEmpty(Mono.just(ImmutableList.of()));
     }
+
+    private Hstore asHstore(BlobMetadata metadata) {
+        return Hstore.hstore(metadata.underlyingMap().entrySet().stream()
+            .collect(ImmutableMap.toImmutableMap(
+                entry -> entry.getKey().name(),
+                entry -> entry.getValue().value())));
+    }
+
+    private BlobMetadata asBlobMetadata(Hstore hstore) {
+        return new BlobMetadata(Optional.ofNullable(hstore)
+            .map(Hstore::data)
+            .orElseGet(Map::of)
+            .entrySet().stream()
+            .collect(ImmutableMap.toImmutableMap(
+                entry -> new BlobMetadataName(entry.getKey()),
+                entry -> new BlobMetadataValue(entry.getValue()))));
+    }
 }
diff --git 
a/server/blob/blob-postgres/src/test/java/org/apache/james/blob/postgres/PostgresBlobStoreDAOTest.java
 
b/server/blob/blob-postgres/src/test/java/org/apache/james/blob/postgres/PostgresBlobStoreDAOTest.java
index 84ace19075..95d17b840c 100644
--- 
a/server/blob/blob-postgres/src/test/java/org/apache/james/blob/postgres/PostgresBlobStoreDAOTest.java
+++ 
b/server/blob/blob-postgres/src/test/java/org/apache/james/blob/postgres/PostgresBlobStoreDAOTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ExecutionException;
 import org.apache.james.backends.postgres.PostgresExtension;
 import org.apache.james.blob.api.BlobStoreDAO;
 import org.apache.james.blob.api.BlobStoreDAOContract;
+import org.apache.james.blob.api.MetadataAwareBlobStoreDAOContract;
 import org.apache.james.blob.api.PlainBlobId;
 import org.apache.james.util.concurrency.ConcurrentTestRunner;
 import org.junit.jupiter.api.BeforeEach;
@@ -38,7 +39,7 @@ import org.junit.jupiter.params.provider.MethodSource;
 
 import reactor.core.publisher.Mono;
 
-class PostgresBlobStoreDAOTest implements BlobStoreDAOContract {
+class PostgresBlobStoreDAOTest implements BlobStoreDAOContract, 
MetadataAwareBlobStoreDAOContract {
     static Duration CONCURRENT_TEST_DURATION = Duration.ofMinutes(5);
 
     @RegisterExtension
diff --git a/upgrade-instructions.md b/upgrade-instructions.md
index 5e7ee6e621..29987f7061 100644
--- a/upgrade-instructions.md
+++ b/upgrade-instructions.md
@@ -42,6 +42,22 @@ ALTER TABLE james_keyspace.blobs ADD metadata 
frozen<map<text, text>>;
 ALTER TABLE james_keyspace.blobsInBucket ADD metadata frozen<map<text, text>>;
 ```
 
+#### PostgreSQL
+
+Date: 15/04/2026
+
+Concerned products: James products using PostgreSQL as blob storage
+
+James now persists blob metadata using the `metadata` column of the 
`blob_storage` table.
+
+Existing deployments still need to add this new column manually before a 
rolling upgrade.
+
+To add this column, run the following SQL command:
+
+```sql
+ALTER TABLE blob_storage ADD COLUMN metadata hstore;
+```
+
 ### Lucene mailbox index schema update for collapseThreads support
 
 Date: 06/02/2026


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to