This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch postgresql
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 5c125e46e26553580a96ccac075b8cb2c689ce64
Author: hung phan <hp...@linagora.com>
AuthorDate: Tue Dec 26 20:51:07 2023 +0700

    JAMES-2586 Implement PostgresBlobStoreDAO
---
 server/blob/blob-postgres/pom.xml                  | 161 +++++++++++++++++++++
 .../blob/postgres/PostgresBlobStorageModule.java   |  62 ++++++++
 .../james/blob/postgres/PostgresBlobStoreDAO.java  | 156 ++++++++++++++++++++
 .../blob/postgres/PostgresBlobStoreDAOTest.java    |  50 +++++++
 server/blob/pom.xml                                |   1 +
 5 files changed, 430 insertions(+)

diff --git a/server/blob/blob-postgres/pom.xml 
b/server/blob/blob-postgres/pom.xml
new file mode 100644
index 0000000000..09ab43e02b
--- /dev/null
+++ b/server/blob/blob-postgres/pom.xml
@@ -0,0 +1,161 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.james</groupId>
+        <artifactId>james-server-blob</artifactId>
+        <version>3.9.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>blob-postgres</artifactId>
+
+    <name>Apache James :: Server :: Blob :: Postgres</name>
+
+    <properties>
+        <jooq.version>3.16.22</jooq.version>
+        <r2dbc.postgresql.version>1.0.2.RELEASE</r2dbc.postgresql.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>apache-james-backends-postgres</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>apache-james-backends-postgres</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>blob-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>blob-api</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>blob-storage-strategy</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>blob-storage-strategy</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-guice-common</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-testing</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-util</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>metrics-tests</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>testing-base</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.jooq</groupId>
+            <artifactId>jooq</artifactId>
+            <version>${jooq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.jooq</groupId>
+            <artifactId>jooq-postgres-extensions</artifactId>
+            <version>${jooq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>r2dbc-postgresql</artifactId>
+            <version>${r2dbc.postgresql.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>postgresql</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <argLine>-Djava.library.path=
+                        -javaagent:"${settings.localRepository}"/org/jacoco/org.jacoco.agent/${jacoco-maven-plugin.version}/org.jacoco.agent-${jacoco-maven-plugin.version}-runtime.jar=destfile=${basedir}/target/jacoco.exec
+                        -Xms1024m -Xmx2048m</argLine>
+                    <reuseForks>true</reuseForks>
+                    <forkedProcessTimeoutInSeconds>1800</forkedProcessTimeoutInSeconds>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git 
a/server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStorageModule.java
 
b/server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStorageModule.java
new file mode 100644
index 0000000000..d5eab5e4eb
--- /dev/null
+++ 
b/server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStorageModule.java
@@ -0,0 +1,62 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.blob.postgres;
+
+import static 
org.apache.james.blob.postgres.PostgresBlobStorageModule.PostgresBlobStorageTable.BUCKET_NAME_INDEX;
+import static org.jooq.impl.SQLDataType.BLOB;
+
+import org.apache.james.backends.postgres.PostgresIndex;
+import org.apache.james.backends.postgres.PostgresModule;
+import org.apache.james.backends.postgres.PostgresTable;
+import org.jooq.Field;
+import org.jooq.Record;
+import org.jooq.Table;
+import org.jooq.impl.DSL;
+import org.jooq.impl.SQLDataType;
+
+/**
+ * Declares the PostgreSQL schema elements (one table and one index) used to store blobs.
+ */
+public interface PostgresBlobStorageModule {
+    /**
+     * Table, column and index definitions for the {@code blob_storage} table.
+     */
+    interface PostgresBlobStorageTable {
+        Table<Record> TABLE_NAME = DSL.table("blob_storage");
+
+        Field<String> BUCKET_NAME = DSL.field("bucket_name", SQLDataType.VARCHAR(200).notNull());
+        Field<String> BLOB_ID = DSL.field("blob_id", SQLDataType.VARCHAR(200).notNull());
+        Field<byte[]> DATA = DSL.field("data", BLOB.notNull());
+        Field<Integer> SIZE = DSL.field("size", SQLDataType.INTEGER.notNull());
+
+        // Rows are keyed by the (bucket_name, blob_id) pair.
+        PostgresTable TABLE = PostgresTable.name(TABLE_NAME.getName())
+            .createTableStep((context, name) -> context.createTableIfNotExists(name)
+                .column(BUCKET_NAME)
+                .column(BLOB_ID)
+                .column(DATA)
+                .column(SIZE)
+                .constraint(DSL.primaryKey(BUCKET_NAME, BLOB_ID)))
+            .disableRowLevelSecurity()
+            .build();
+
+        // Secondary index on the bucket name column alone.
+        PostgresIndex BUCKET_NAME_INDEX = PostgresIndex.name("blob_storage_bucket_name_index")
+            .createIndexStep((context, name) -> context.createIndexIfNotExists(name)
+                .on(TABLE_NAME, BUCKET_NAME));
+    }
+
+    /** Module bundling the blob storage table and its index. */
+    PostgresModule MODULE = PostgresModule.builder()
+        .addTable(PostgresBlobStorageTable.TABLE)
+        .addIndex(BUCKET_NAME_INDEX)
+        .build();
+}
diff --git 
a/server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStoreDAO.java
 
b/server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStoreDAO.java
new file mode 100644
index 0000000000..dbbd67abaf
--- /dev/null
+++ 
b/server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStoreDAO.java
@@ -0,0 +1,156 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.blob.postgres;
+
+import static 
org.apache.james.blob.postgres.PostgresBlobStorageModule.PostgresBlobStorageTable.BLOB_ID;
+import static 
org.apache.james.blob.postgres.PostgresBlobStorageModule.PostgresBlobStorageTable.BUCKET_NAME;
+import static 
org.apache.james.blob.postgres.PostgresBlobStorageModule.PostgresBlobStorageTable.DATA;
+import static 
org.apache.james.blob.postgres.PostgresBlobStorageModule.PostgresBlobStorageTable.SIZE;
+import static 
org.apache.james.blob.postgres.PostgresBlobStorageModule.PostgresBlobStorageTable.TABLE_NAME;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
+
+import javax.inject.Inject;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.james.backends.postgres.utils.PostgresExecutor;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobStoreDAO;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.blob.api.ObjectNotFoundException;
+import org.apache.james.blob.api.ObjectStoreIOException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.ByteSource;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class PostgresBlobStoreDAO implements BlobStoreDAO {
+    private final PostgresExecutor postgresExecutor;
+    private final BlobId.Factory blobIdFactory;
+
+    @Inject
+    public PostgresBlobStoreDAO(PostgresExecutor postgresExecutor, 
BlobId.Factory blobIdFactory) {
+        this.postgresExecutor = postgresExecutor;
+        this.blobIdFactory = blobIdFactory;
+    }
+
+    @Override
+    public InputStream read(BucketName bucketName, BlobId blobId) throws 
ObjectStoreIOException, ObjectNotFoundException {
+        return Mono.from(readReactive(bucketName, blobId))
+            .block();
+    }
+
+    @Override
+    public Mono<InputStream> readReactive(BucketName bucketName, BlobId 
blobId) {
+        return Mono.from(readBytes(bucketName, blobId))
+            .map(ByteArrayInputStream::new);
+    }
+
+    @Override
+    public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
+        return postgresExecutor.executeRow(dsl -> Mono.from(dsl.select(DATA)
+                .from(TABLE_NAME)
+                .where(BUCKET_NAME.eq(bucketName.asString()))
+                .and(BLOB_ID.eq(blobId.asString()))))
+            .map(record -> record.get(DATA))
+            .switchIfEmpty(Mono.error(() -> new ObjectNotFoundException("Blob 
" + blobId + " does not exist in bucket " + bucketName)));
+    }
+
+    @Override
+    public Mono<Void> save(BucketName bucketName, BlobId blobId, byte[] data) {
+        Preconditions.checkNotNull(data);
+
+        return postgresExecutor.executeVoid(dslContext ->
+            Mono.from(dslContext.insertInto(TABLE_NAME, BUCKET_NAME, BLOB_ID, 
DATA, SIZE)
+                .values(bucketName.asString(),
+                    blobId.asString(),
+                    data,
+                    data.length)
+                .onConflict(BUCKET_NAME, BLOB_ID)
+                .doUpdate()
+                .set(DATA, data)
+                .set(SIZE, data.length)));
+    }
+
+    @Override
+    public Mono<Void> save(BucketName bucketName, BlobId blobId, InputStream 
inputStream) {
+        Preconditions.checkNotNull(inputStream);
+
+        return Mono.fromCallable(() -> {
+            try {
+                return IOUtils.toByteArray(inputStream);
+            } catch (IOException e) {
+                throw new ObjectStoreIOException("IOException occurred", e);
+            }
+        }).flatMap(bytes -> save(bucketName, blobId, bytes));
+    }
+
+    @Override
+    public Mono<Void> save(BucketName bucketName, BlobId blobId, ByteSource 
content) {
+        return Mono.fromCallable(() -> {
+            try {
+                return content.read();
+            } catch (IOException e) {
+                throw new ObjectStoreIOException("IOException occurred", e);
+            }
+        }).flatMap(bytes -> save(bucketName, blobId, bytes));
+    }
+
+    @Override
+    public Mono<Void> delete(BucketName bucketName, BlobId blobId) {
+        return postgresExecutor.executeVoid(dsl -> 
Mono.from(dsl.deleteFrom(TABLE_NAME)
+            .where(BUCKET_NAME.eq(bucketName.asString()))
+            .and(BLOB_ID.eq(blobId.asString()))));
+    }
+
+    @Override
+    public Mono<Void> delete(BucketName bucketName, Collection<BlobId> 
blobIds) {
+        return postgresExecutor.executeVoid(dsl -> 
Mono.from(dsl.deleteFrom(TABLE_NAME)
+            .where(BUCKET_NAME.eq(bucketName.asString()))
+            
.and(BLOB_ID.in(blobIds.stream().map(BlobId::asString).collect(ImmutableList.toImmutableList())))));
+    }
+
+    @Override
+    public Mono<Void> deleteBucket(BucketName bucketName) {
+        return postgresExecutor.executeVoid(dsl -> 
Mono.from(dsl.deleteFrom(TABLE_NAME)
+            .where(BUCKET_NAME.eq(bucketName.asString()))));
+    }
+
+    @Override
+    public Flux<BucketName> listBuckets() {
+        return postgresExecutor.executeRows(dsl -> 
Flux.from(dsl.selectDistinct(BUCKET_NAME)
+                .from(TABLE_NAME)))
+            .map(record -> BucketName.of(record.get(BUCKET_NAME)));
+    }
+
+    @Override
+    public Flux<BlobId> listBlobs(BucketName bucketName) {
+        return postgresExecutor.executeRows(dsl -> 
Flux.from(dsl.select(BLOB_ID)
+                .from(TABLE_NAME)
+                .where(BUCKET_NAME.eq(bucketName.asString()))))
+            .map(record -> blobIdFactory.from(record.get(BLOB_ID)));
+    }
+}
diff --git 
a/server/blob/blob-postgres/src/test/java/org/apache/james/blob/postgres/PostgresBlobStoreDAOTest.java
 
b/server/blob/blob-postgres/src/test/java/org/apache/james/blob/postgres/PostgresBlobStoreDAOTest.java
new file mode 100644
index 0000000000..c053232632
--- /dev/null
+++ 
b/server/blob/blob-postgres/src/test/java/org/apache/james/blob/postgres/PostgresBlobStoreDAOTest.java
@@ -0,0 +1,50 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.blob.postgres;
+
+import org.apache.james.backends.postgres.PostgresExtension;
+import org.apache.james.blob.api.BlobStoreDAO;
+import org.apache.james.blob.api.BlobStoreDAOContract;
+import org.apache.james.blob.api.HashBlobId;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+class PostgresBlobStoreDAOTest implements BlobStoreDAOContract {
+    @RegisterExtension
+    static PostgresExtension postgresExtension = 
PostgresExtension.withoutRowLevelSecurity(PostgresBlobStorageModule.MODULE);
+
+    private PostgresBlobStoreDAO blobStore;
+
+    @BeforeEach
+    void setUp() {
+        blobStore = new 
PostgresBlobStoreDAO(postgresExtension.getPostgresExecutor(), new 
HashBlobId.Factory());
+    }
+
+    @Override
+    public BlobStoreDAO testee() {
+        return blobStore;
+    }
+
+    @Override
+    @Disabled("Not supported")
+    public void listBucketsShouldReturnBucketsWithNoBlob() {
+    }
+}
diff --git a/server/blob/pom.xml b/server/blob/pom.xml
index d429b1ad4f..bd2aaa9f6b 100644
--- a/server/blob/pom.xml
+++ b/server/blob/pom.xml
@@ -41,6 +41,7 @@
         <module>blob-export-file</module>
         <module>blob-file</module>
         <module>blob-memory</module>
+        <module>blob-postgres</module>
         <module>blob-s3</module>
         <module>blob-storage-strategy</module>
 


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to