This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch postgresql in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 5c125e46e26553580a96ccac075b8cb2c689ce64 Author: hung phan <hp...@linagora.com> AuthorDate: Tue Dec 26 20:51:07 2023 +0700 JAMES-2586 Implement PostgresBlobStoreDAO --- server/blob/blob-postgres/pom.xml | 161 +++++++++++++++++++++ .../blob/postgres/PostgresBlobStorageModule.java | 62 ++++++++ .../james/blob/postgres/PostgresBlobStoreDAO.java | 156 ++++++++++++++++++++ .../blob/postgres/PostgresBlobStoreDAOTest.java | 50 +++++++ server/blob/pom.xml | 1 + 5 files changed, 430 insertions(+) diff --git a/server/blob/blob-postgres/pom.xml b/server/blob/blob-postgres/pom.xml new file mode 100644 index 0000000000..09ab43e02b --- /dev/null +++ b/server/blob/blob-postgres/pom.xml @@ -0,0 +1,161 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.james</groupId> + <artifactId>james-server-blob</artifactId> + <version>3.9.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>blob-postgres</artifactId> + + <name>Apache James :: Server :: Blob :: Postgres</name> + + <!-- Versions referenced by the jooq, jooq-postgres-extensions and r2dbc-postgresql dependencies below. --> <properties> + <jooq.version>3.16.22</jooq.version> + <r2dbc.postgresql.version>1.0.2.RELEASE</r2dbc.postgresql.version> + </properties> + + <dependencies> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>apache-james-backends-postgres</artifactId> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>apache-james-backends-postgres</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>blob-api</artifactId> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>blob-api</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>blob-storage-strategy</artifactId> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>blob-storage-strategy</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>james-server-guice-common</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>james-server-testing</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>james-server-util</artifactId> + <scope>test</scope> + </dependency> + <dependency> + 
<groupId>${james.groupId}</groupId> + <artifactId>metrics-tests</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>testing-base</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + <dependency> + <groupId>io.projectreactor</groupId> + <artifactId>reactor-core</artifactId> + </dependency> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.jooq</groupId> + <artifactId>jooq</artifactId> + <version>${jooq.version}</version> + </dependency> + <dependency> + <groupId>org.jooq</groupId> + <artifactId>jooq-postgres-extensions</artifactId> + <version>${jooq.version}</version> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.postgresql</groupId> + <artifactId>r2dbc-postgresql</artifactId> + <version>${r2dbc.postgresql.version}</version> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>junit-jupiter</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>postgresql</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>testcontainers</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + <build> + <plugins> + <!-- Test JVM settings: JaCoCo agent attached, 1024m initial and 2048m maximum heap, forks reused, 1800 second fork timeout. --> <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <argLine>-Djava.library.path= + -javaagent:"${settings.localRepository}"/org/jacoco/org.jacoco.agent/${jacoco-maven-plugin.version}/org.jacoco.agent-${jacoco-maven-plugin.version}-runtime.jar=destfile=${basedir}/target/jacoco.exec + -Xms1024m -Xmx2048m</argLine> + 
<reuseForks>true</reuseForks> + <forkedProcessTimeoutInSeconds>1800</forkedProcessTimeoutInSeconds> + </configuration> + </plugin> + </plugins> + </build> + +</project> diff --git a/server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStorageModule.java b/server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStorageModule.java new file mode 100644 index 0000000000..d5eab5e4eb --- /dev/null +++ b/server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStorageModule.java @@ -0,0 +1,62 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.blob.postgres; + +import static org.apache.james.blob.postgres.PostgresBlobStorageModule.PostgresBlobStorageTable.BUCKET_NAME_INDEX; +import static org.jooq.impl.SQLDataType.BLOB; + +import org.apache.james.backends.postgres.PostgresIndex; +import org.apache.james.backends.postgres.PostgresModule; +import org.apache.james.backends.postgres.PostgresTable; +import org.jooq.Field; +import org.jooq.Record; +import org.jooq.Table; +import org.jooq.impl.DSL; +import org.jooq.impl.SQLDataType; + +/** Declares the blob_storage table, its bucket_name index and the PostgresModule that bundles both. */ public interface PostgresBlobStorageModule { + /** Table, column and index definitions for blob_storage; rows are keyed by (bucket_name, blob_id). */ interface PostgresBlobStorageTable { + Table<Record> TABLE_NAME = DSL.table("blob_storage"); + + Field<String> BUCKET_NAME = DSL.field("bucket_name", SQLDataType.VARCHAR(200).notNull()); + Field<String> BLOB_ID = DSL.field("blob_id", SQLDataType.VARCHAR(200).notNull()); + Field<byte[]> DATA = DSL.field("data", BLOB.notNull()); + Field<Integer> SIZE = DSL.field("size", SQLDataType.INTEGER.notNull()); + + /** Creation step for the table: created only if it does not exist, primary key on (bucket_name, blob_id), built with disableRowLevelSecurity(). */ PostgresTable TABLE = PostgresTable.name(TABLE_NAME.getName()) + .createTableStep(((dsl, tableName) -> dsl.createTableIfNotExists(tableName) + .column(BUCKET_NAME) + .column(BLOB_ID) + .column(DATA) + .column(SIZE) + .constraint(DSL.primaryKey(BUCKET_NAME, BLOB_ID)))) + .disableRowLevelSecurity() + .build(); + + /** Index on the bucket_name column, created only if it does not exist. */ PostgresIndex BUCKET_NAME_INDEX = PostgresIndex.name("blob_storage_bucket_name_index") + .createIndexStep((dsl, indexName) -> dsl.createIndexIfNotExists(indexName) + .on(TABLE_NAME, BUCKET_NAME)); + } + + /** Module registering the blob_storage table and its bucket_name index. */ PostgresModule MODULE = PostgresModule.builder() + .addTable(PostgresBlobStorageTable.TABLE) + .addIndex(BUCKET_NAME_INDEX) + .build(); +} diff --git a/server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStoreDAO.java b/server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStoreDAO.java new file mode 100644 index 0000000000..dbbd67abaf --- /dev/null +++ 
b/server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStoreDAO.java @@ -0,0 +1,156 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.blob.postgres; + +import static org.apache.james.blob.postgres.PostgresBlobStorageModule.PostgresBlobStorageTable.BLOB_ID; +import static org.apache.james.blob.postgres.PostgresBlobStorageModule.PostgresBlobStorageTable.BUCKET_NAME; +import static org.apache.james.blob.postgres.PostgresBlobStorageModule.PostgresBlobStorageTable.DATA; +import static org.apache.james.blob.postgres.PostgresBlobStorageModule.PostgresBlobStorageTable.SIZE; +import static org.apache.james.blob.postgres.PostgresBlobStorageModule.PostgresBlobStorageTable.TABLE_NAME; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Collection; + +import javax.inject.Inject; + +import org.apache.commons.io.IOUtils; +import org.apache.james.backends.postgres.utils.PostgresExecutor; +import org.apache.james.blob.api.BlobId; +import org.apache.james.blob.api.BlobStoreDAO; +import org.apache.james.blob.api.BucketName; +import org.apache.james.blob.api.ObjectNotFoundException; +import org.apache.james.blob.api.ObjectStoreIOException; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.io.ByteSource; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** BlobStoreDAO that keeps each blob as one row of the blob storage table, addressed by bucket name and blob id, and issues its queries through a PostgresExecutor. */ public class PostgresBlobStoreDAO implements BlobStoreDAO { + private final PostgresExecutor postgresExecutor; + private final BlobId.Factory blobIdFactory; + + @Inject + public PostgresBlobStoreDAO(PostgresExecutor postgresExecutor, BlobId.Factory blobIdFactory) { + this.postgresExecutor = postgresExecutor; + this.blobIdFactory = blobIdFactory; + } + + /** Blocking read: subscribes to readReactive and blocks the calling thread until the stream is available. */ @Override + public InputStream read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException { + return Mono.from(readReactive(bucketName, blobId)) + .block(); + } + + /** Loads the whole blob through readBytes and wraps the bytes in a ByteArrayInputStream. */ @Override + public Mono<InputStream> 
readReactive(BucketName bucketName, BlobId blobId) { + return Mono.from(readBytes(bucketName, blobId)) + .map(ByteArrayInputStream::new); + } + + /** Selects the DATA column of the row matching the bucket name and blob id; when the query yields nothing the Mono fails with ObjectNotFoundException. */ @Override + public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) { + return postgresExecutor.executeRow(dsl -> Mono.from(dsl.select(DATA) + .from(TABLE_NAME) + .where(BUCKET_NAME.eq(bucketName.asString())) + .and(BLOB_ID.eq(blobId.asString())))) + .map(record -> record.get(DATA)) + .switchIfEmpty(Mono.error(() -> new ObjectNotFoundException("Blob " + blobId + " does not exist in bucket " + bucketName))); + } + + /** Rejects null data, then inserts the row with the data length as SIZE; on a (BUCKET_NAME, BLOB_ID) conflict the existing row's DATA and SIZE are overwritten. */ @Override + public Mono<Void> save(BucketName bucketName, BlobId blobId, byte[] data) { + Preconditions.checkNotNull(data); + + return postgresExecutor.executeVoid(dslContext -> + Mono.from(dslContext.insertInto(TABLE_NAME, BUCKET_NAME, BLOB_ID, DATA, SIZE) + .values(bucketName.asString(), + blobId.asString(), + data, + data.length) + .onConflict(BUCKET_NAME, BLOB_ID) + .doUpdate() + .set(DATA, data) + .set(SIZE, data.length))); + } + + /** Rejects a null stream, reads it fully into a byte array on subscription, then delegates to the byte[] variant; an IOException while reading is rethrown as ObjectStoreIOException. */ @Override + public Mono<Void> save(BucketName bucketName, BlobId blobId, InputStream inputStream) { + Preconditions.checkNotNull(inputStream); + + return Mono.fromCallable(() -> { + try { + return IOUtils.toByteArray(inputStream); + } catch (IOException e) { + throw new ObjectStoreIOException("IOException occurred", e); + } + }).flatMap(bytes -> save(bucketName, blobId, bytes)); + } + + /** Reads the ByteSource fully into a byte array on subscription, then delegates to the byte[] variant; an IOException while reading is rethrown as ObjectStoreIOException. */ @Override + public Mono<Void> save(BucketName bucketName, BlobId blobId, ByteSource content) { + return Mono.fromCallable(() -> { + try { + return content.read(); + } catch (IOException e) { + throw new ObjectStoreIOException("IOException occurred", e); + } + }).flatMap(bytes -> save(bucketName, blobId, bytes)); + } + + /** Deletes the row matching the bucket name and blob id. */ @Override + public Mono<Void> delete(BucketName bucketName, BlobId blobId) { + return postgresExecutor.executeVoid(dsl -> Mono.from(dsl.deleteFrom(TABLE_NAME) + .where(BUCKET_NAME.eq(bucketName.asString())) + .and(BLOB_ID.eq(blobId.asString())))); + } + + /** Deletes, with one statement, the rows of the bucket whose blob id is among blobIds. */ @Override + 
public Mono<Void> delete(BucketName bucketName, Collection<BlobId> blobIds) { + return postgresExecutor.executeVoid(dsl -> Mono.from(dsl.deleteFrom(TABLE_NAME) + .where(BUCKET_NAME.eq(bucketName.asString())) + .and(BLOB_ID.in(blobIds.stream().map(BlobId::asString).collect(ImmutableList.toImmutableList()))))); + } + + /** Deletes every row whose bucket name matches bucketName. */ @Override + public Mono<Void> deleteBucket(BucketName bucketName) { + return postgresExecutor.executeVoid(dsl -> Mono.from(dsl.deleteFrom(TABLE_NAME) + .where(BUCKET_NAME.eq(bucketName.asString())))); + } + + /** Lists the distinct bucket names found in the table's rows. */ @Override + public Flux<BucketName> listBuckets() { + return postgresExecutor.executeRows(dsl -> Flux.from(dsl.selectDistinct(BUCKET_NAME) + .from(TABLE_NAME))) + .map(record -> BucketName.of(record.get(BUCKET_NAME))); + } + + /** Lists the blob ids stored under bucketName, rebuilt from their string form by the injected BlobId.Factory. */ @Override + public Flux<BlobId> listBlobs(BucketName bucketName) { + return postgresExecutor.executeRows(dsl -> Flux.from(dsl.select(BLOB_ID) + .from(TABLE_NAME) + .where(BUCKET_NAME.eq(bucketName.asString())))) + .map(record -> blobIdFactory.from(record.get(BLOB_ID))); + } +} diff --git a/server/blob/blob-postgres/src/test/java/org/apache/james/blob/postgres/PostgresBlobStoreDAOTest.java b/server/blob/blob-postgres/src/test/java/org/apache/james/blob/postgres/PostgresBlobStoreDAOTest.java new file mode 100644 index 0000000000..c053232632 --- /dev/null +++ b/server/blob/blob-postgres/src/test/java/org/apache/james/blob/postgres/PostgresBlobStoreDAOTest.java @@ -0,0 +1,50 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. 
You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.blob.postgres; + +import org.apache.james.backends.postgres.PostgresExtension; +import org.apache.james.blob.api.BlobStoreDAO; +import org.apache.james.blob.api.BlobStoreDAOContract; +import org.apache.james.blob.api.HashBlobId; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.extension.RegisterExtension; + +/** Runs the BlobStoreDAOContract tests against PostgresBlobStoreDAO, using a PostgresExtension created from PostgresBlobStorageModule.MODULE without row level security. */ class PostgresBlobStoreDAOTest implements BlobStoreDAOContract { + @RegisterExtension + static PostgresExtension postgresExtension = PostgresExtension.withoutRowLevelSecurity(PostgresBlobStorageModule.MODULE); + + private PostgresBlobStoreDAO blobStore; + + /** Builds a new DAO before each test from the extension's executor and a HashBlobId.Factory. */ @BeforeEach + void setUp() { + blobStore = new PostgresBlobStoreDAO(postgresExtension.getPostgresExecutor(), new HashBlobId.Factory()); + } + + @Override + public BlobStoreDAO testee() { + return blobStore; + } + + /** Contract test overridden with an empty body and disabled. NOTE(review): presumably a bucket that holds no blob cannot be listed by this DAO; confirm against the DAO's listBuckets. */ @Override + @Disabled("Not supported") + public void listBucketsShouldReturnBucketsWithNoBlob() { + } +} diff --git a/server/blob/pom.xml b/server/blob/pom.xml index d429b1ad4f..bd2aaa9f6b 100644 --- a/server/blob/pom.xml +++ b/server/blob/pom.xml @@ -41,6 +41,7 @@ <module>blob-export-file</module> <module>blob-file</module> <module>blob-memory</module> + <module>blob-postgres</module> <module>blob-s3</module> <module>blob-storage-strategy</module> --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional 
commands, e-mail: notifications-h...@james.apache.org