This is an automated email from the ASF dual-hosted git repository.
hqtran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 0cbe36b20e JAMES-4182 Implement blob metadata storage for Postgres
0cbe36b20e is described below
commit 0cbe36b20e4d5534d2518bbd47dc06c4d74c296b
Author: Quan Tran <[email protected]>
AuthorDate: Wed Apr 15 16:34:19 2026 +0700
JAMES-4182 Implement blob metadata storage for Postgres
---
.../PostgresBlobStorageDataDefinition.java | 4 ++
.../james/blob/postgres/PostgresBlobStoreDAO.java | 50 ++++++++++++++++------
.../blob/postgres/PostgresBlobStoreDAOTest.java | 3 +-
upgrade-instructions.md | 16 +++++++
4 files changed, 58 insertions(+), 15 deletions(-)
diff --git
a/server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStorageDataDefinition.java
b/server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStorageDataDefinition.java
index 51b2c3a034..9787d9c43b 100644
---
a/server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStorageDataDefinition.java
+++
b/server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStorageDataDefinition.java
@@ -19,6 +19,7 @@
package org.apache.james.blob.postgres;
+import static
org.apache.james.backends.postgres.PostgresCommons.DataTypes.HSTORE;
import static
org.apache.james.blob.postgres.PostgresBlobStorageDataDefinition.PostgresBlobStorageTable.BUCKET_NAME_INDEX;
import static org.jooq.impl.SQLDataType.BLOB;
@@ -30,6 +31,7 @@ import org.jooq.Record;
import org.jooq.Table;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;
+import org.jooq.postgres.extensions.types.Hstore;
public interface PostgresBlobStorageDataDefinition {
interface PostgresBlobStorageTable {
@@ -39,6 +41,7 @@ public interface PostgresBlobStorageDataDefinition {
Field<String> BLOB_ID = DSL.field("blob_id",
SQLDataType.VARCHAR(200).notNull());
Field<byte[]> DATA = DSL.field("data", BLOB.notNull());
Field<Integer> SIZE = DSL.field("size", SQLDataType.INTEGER.notNull());
+ Field<Hstore> METADATA = DSL.field("metadata", HSTORE);
PostgresTable TABLE = PostgresTable.name(TABLE_NAME.getName())
.createTableStep(((dsl, tableName) ->
dsl.createTableIfNotExists(tableName)
@@ -46,6 +49,7 @@ public interface PostgresBlobStorageDataDefinition {
.column(BLOB_ID)
.column(DATA)
.column(SIZE)
+ .column(METADATA)
.constraint(DSL.primaryKey(BUCKET_NAME, BLOB_ID))))
.disableRowLevelSecurity()
.build();
diff --git
a/server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStoreDAO.java
b/server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStoreDAO.java
index 1eb1a9e4d8..fc20507a36 100644
---
a/server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStoreDAO.java
+++
b/server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStoreDAO.java
@@ -22,6 +22,7 @@ package org.apache.james.blob.postgres;
import static
org.apache.james.blob.postgres.PostgresBlobStorageDataDefinition.PostgresBlobStorageTable.BLOB_ID;
import static
org.apache.james.blob.postgres.PostgresBlobStorageDataDefinition.PostgresBlobStorageTable.BUCKET_NAME;
import static
org.apache.james.blob.postgres.PostgresBlobStorageDataDefinition.PostgresBlobStorageTable.DATA;
+import static
org.apache.james.blob.postgres.PostgresBlobStorageDataDefinition.PostgresBlobStorageTable.METADATA;
import static
org.apache.james.blob.postgres.PostgresBlobStorageDataDefinition.PostgresBlobStorageTable.SIZE;
import static
org.apache.james.blob.postgres.PostgresBlobStorageDataDefinition.PostgresBlobStorageTable.TABLE_NAME;
@@ -29,6 +30,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
@@ -43,10 +45,12 @@ import org.apache.james.blob.api.BucketName;
import org.apache.james.blob.api.ObjectNotFoundException;
import org.apache.james.blob.api.ObjectStoreIOException;
import org.jooq.impl.DSL;
+import org.jooq.postgres.extensions.types.Hstore;
import org.reactivestreams.Publisher;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteSource;
import reactor.core.publisher.Flux;
@@ -76,40 +80,41 @@ public class PostgresBlobStoreDAO implements BlobStoreDAO {
@Override
public Publisher<BytesBlob> readBytes(BucketName bucketName, BlobId
blobId) {
- return postgresExecutor.executeRow(dsl -> Mono.from(dsl.select(DATA)
+ return postgresExecutor.executeRow(dsl -> Mono.from(dsl.select(DATA,
METADATA)
.from(TABLE_NAME)
.where(BUCKET_NAME.eq(bucketName.asString()))
.and(BLOB_ID.eq(blobId.asString()))))
- .map(record -> record.get(DATA))
.switchIfEmpty(Mono.error(() -> new ObjectNotFoundException("Blob
" + blobId + " does not exist in bucket " + bucketName)))
- .map(BytesBlob::of);
+ .map(record -> BytesBlob.of(record.get(DATA),
asBlobMetadata(record.get(METADATA))));
}
@Override
public Publisher<Void> save(BucketName bucketName, BlobId blobId, Blob
blob) {
return switch (blob) {
- case BytesBlob bytesBlob -> save(bucketName, blobId,
bytesBlob.payload());
- case InputStreamBlob inputStreamBlob -> save(bucketName, blobId,
inputStreamBlob.payload());
- case ByteSourceBlob byteSourceBlob -> save(bucketName, blobId,
byteSourceBlob.payload());
+ case BytesBlob bytesBlob -> save(bucketName, blobId,
bytesBlob.payload(), bytesBlob.metadata());
+ case InputStreamBlob inputStreamBlob -> save(bucketName, blobId,
inputStreamBlob.payload(), inputStreamBlob.metadata());
+ case ByteSourceBlob byteSourceBlob -> save(bucketName, blobId,
byteSourceBlob.payload(), byteSourceBlob.metadata());
};
}
- public Mono<Void> save(BucketName bucketName, BlobId blobId, byte[] data) {
+ private Mono<Void> save(BucketName bucketName, BlobId blobId, byte[] data,
BlobMetadata metadata) {
Preconditions.checkNotNull(data);
return postgresExecutor.executeVoid(dslContext ->
- Mono.from(dslContext.insertInto(TABLE_NAME, BUCKET_NAME, BLOB_ID,
DATA, SIZE)
+ Mono.from(dslContext.insertInto(TABLE_NAME, BUCKET_NAME, BLOB_ID,
DATA, SIZE, METADATA)
.values(bucketName.asString(),
blobId.asString(),
data,
- data.length)
+ data.length,
+ asHstore(metadata))
.onConflict(BUCKET_NAME, BLOB_ID)
.doUpdate()
.set(DATA, data)
- .set(SIZE, data.length)));
+ .set(SIZE, data.length)
+ .set(METADATA, asHstore(metadata))));
}
- public Mono<Void> save(BucketName bucketName, BlobId blobId, InputStream
inputStream) {
+ private Mono<Void> save(BucketName bucketName, BlobId blobId, InputStream
inputStream, BlobMetadata metadata) {
Preconditions.checkNotNull(inputStream);
return Mono.fromCallable(() -> {
@@ -118,17 +123,17 @@ public class PostgresBlobStoreDAO implements BlobStoreDAO
{
} catch (IOException e) {
throw new ObjectStoreIOException("IOException occurred", e);
}
- }).flatMap(bytes -> save(bucketName, blobId, bytes));
+ }).flatMap(bytes -> save(bucketName, blobId, bytes, metadata));
}
- public Mono<Void> save(BucketName bucketName, BlobId blobId, ByteSource
content) {
+ private Mono<Void> save(BucketName bucketName, BlobId blobId, ByteSource
content, BlobMetadata metadata) {
return Mono.fromCallable(() -> {
try {
return content.read();
} catch (IOException e) {
throw new ObjectStoreIOException("IOException occurred", e);
}
- }).flatMap(bytes -> save(bucketName, blobId, bytes));
+ }).flatMap(bytes -> save(bucketName, blobId, bytes, metadata));
}
@Override
@@ -184,4 +189,21 @@ public class PostgresBlobStoreDAO implements BlobStoreDAO {
.collectList()
.switchIfEmpty(Mono.just(ImmutableList.of()));
}
+
+ private Hstore asHstore(BlobMetadata metadata) {
+ return Hstore.hstore(metadata.underlyingMap().entrySet().stream()
+ .collect(ImmutableMap.toImmutableMap(
+ entry -> entry.getKey().name(),
+ entry -> entry.getValue().value())));
+ }
+
+ private BlobMetadata asBlobMetadata(Hstore hstore) {
+ return new BlobMetadata(Optional.ofNullable(hstore)
+ .map(Hstore::data)
+ .orElseGet(Map::of)
+ .entrySet().stream()
+ .collect(ImmutableMap.toImmutableMap(
+ entry -> new BlobMetadataName(entry.getKey()),
+ entry -> new BlobMetadataValue(entry.getValue()))));
+ }
}
diff --git
a/server/blob/blob-postgres/src/test/java/org/apache/james/blob/postgres/PostgresBlobStoreDAOTest.java
b/server/blob/blob-postgres/src/test/java/org/apache/james/blob/postgres/PostgresBlobStoreDAOTest.java
index 84ace19075..95d17b840c 100644
---
a/server/blob/blob-postgres/src/test/java/org/apache/james/blob/postgres/PostgresBlobStoreDAOTest.java
+++
b/server/blob/blob-postgres/src/test/java/org/apache/james/blob/postgres/PostgresBlobStoreDAOTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ExecutionException;
import org.apache.james.backends.postgres.PostgresExtension;
import org.apache.james.blob.api.BlobStoreDAO;
import org.apache.james.blob.api.BlobStoreDAOContract;
+import org.apache.james.blob.api.MetadataAwareBlobStoreDAOContract;
import org.apache.james.blob.api.PlainBlobId;
import org.apache.james.util.concurrency.ConcurrentTestRunner;
import org.junit.jupiter.api.BeforeEach;
@@ -38,7 +39,7 @@ import org.junit.jupiter.params.provider.MethodSource;
import reactor.core.publisher.Mono;
-class PostgresBlobStoreDAOTest implements BlobStoreDAOContract {
+class PostgresBlobStoreDAOTest implements BlobStoreDAOContract,
MetadataAwareBlobStoreDAOContract {
static Duration CONCURRENT_TEST_DURATION = Duration.ofMinutes(5);
@RegisterExtension
diff --git a/upgrade-instructions.md b/upgrade-instructions.md
index 5e7ee6e621..29987f7061 100644
--- a/upgrade-instructions.md
+++ b/upgrade-instructions.md
@@ -42,6 +42,22 @@ ALTER TABLE james_keyspace.blobs ADD metadata frozen<map<text, text>>;
ALTER TABLE james_keyspace.blobsInBucket ADD metadata frozen<map<text, text>>;
```
+#### PostgreSQL
+
+Date: 15/04/2026
+
+Concerned products: James products using PostgreSQL as blob storage
+
+James now persists blob metadata using the `metadata` column of the `blob_storage` table.
+
+Existing deployments still need to add this new column manually before a rolling upgrade.
+
+To add this column, run the following SQL command:
+
+```sql
+ALTER TABLE blob_storage ADD COLUMN metadata hstore;
+```
+
### Lucene mailbox index schema update for collapseThreads support
Date: 06/02/2026
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]