This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch postgresql in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 392ece628f3201db667c81fd7163242e433b3576 Author: Quan Tran <[email protected]> AuthorDate: Fri Mar 29 13:05:58 2024 +0700 JAMES-2586 Implement PostgresTaskExecutionDetailsProjection --- .../james/backends/postgres/PostgresCommons.java | 4 + server/task/task-postgres/pom.xml | 6 + .../PostgresTaskExecutionDetailsProjection.scala | 54 ++++++ ...PostgresTaskExecutionDetailsProjectionDAO.scala | 112 ++++++++++++ ...tgresTaskExecutionDetailsProjectionModule.scala | 72 ++++++++ ...tgresTaskExecutionDetailsProjectionDAOTest.java | 202 +++++++++++++++++++++ ...PostgresTaskExecutionDetailsProjectionTest.java | 52 ++++++ 7 files changed, 502 insertions(+) diff --git a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/PostgresCommons.java b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/PostgresCommons.java index 5557b591b9..88201ac066 100644 --- a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/PostgresCommons.java +++ b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/PostgresCommons.java @@ -64,6 +64,10 @@ public class PostgresCommons { .map(value -> LocalDateTime.ofInstant(value.toInstant(), ZoneOffset.UTC)) .orElse(null); + public static final Function<ZonedDateTime, LocalDateTime> ZONED_DATE_TIME_TO_LOCAL_DATE_TIME = date -> Optional.ofNullable(date) + .map(value -> value.withZoneSameInstant(ZoneOffset.UTC).toLocalDateTime()) + .orElse(null); + public static final Function<Instant, LocalDateTime> INSTANT_TO_LOCAL_DATE_TIME = instant -> Optional.ofNullable(instant) .map(value -> LocalDateTime.ofInstant(instant, ZoneOffset.UTC)) .orElse(null); diff --git a/server/task/task-postgres/pom.xml b/server/task/task-postgres/pom.xml index 3cf839f848..35160283f7 100644 --- a/server/task/task-postgres/pom.xml +++ b/server/task/task-postgres/pom.xml @@ -33,6 +33,12 @@ <type>test-jar</type> <scope>test</scope> </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>james-server-guice-common</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> <dependency> <groupId>${james.groupId}</groupId> <artifactId>james-server-lifecycle-api</artifactId> diff --git a/server/task/task-postgres/src/main/scala/org/apache/james/task/eventsourcing/postgres/PostgresTaskExecutionDetailsProjection.scala b/server/task/task-postgres/src/main/scala/org/apache/james/task/eventsourcing/postgres/PostgresTaskExecutionDetailsProjection.scala new file mode 100644 index 0000000000..999ea770d4 --- /dev/null +++ b/server/task/task-postgres/src/main/scala/org/apache/james/task/eventsourcing/postgres/PostgresTaskExecutionDetailsProjection.scala @@ -0,0 +1,54 @@ + /*************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.task.eventsourcing.postgres + +import java.time.Instant + +import javax.inject.Inject +import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection +import org.apache.james.task.{TaskExecutionDetails, TaskId} +import org.reactivestreams.Publisher + +import scala.compat.java8.OptionConverters._ +import scala.jdk.CollectionConverters._ + +class PostgresTaskExecutionDetailsProjection @Inject()(taskExecutionDetailsProjectionDAO: PostgresTaskExecutionDetailsProjectionDAO) + extends TaskExecutionDetailsProjection { + + override def load(taskId: TaskId): Option[TaskExecutionDetails] = + taskExecutionDetailsProjectionDAO.readDetails(taskId).blockOptional().asScala + + override def list: List[TaskExecutionDetails] = + taskExecutionDetailsProjectionDAO.listDetails().collectList().block().asScala.toList + + override def update(details: TaskExecutionDetails): Unit = + taskExecutionDetailsProjectionDAO.saveDetails(details).block() + + override def loadReactive(taskId: TaskId): Publisher[TaskExecutionDetails] = + taskExecutionDetailsProjectionDAO.readDetails(taskId) + + override def listReactive(): Publisher[TaskExecutionDetails] = taskExecutionDetailsProjectionDAO.listDetails() + + override def updateReactive(details: TaskExecutionDetails): Publisher[Void] = taskExecutionDetailsProjectionDAO.saveDetails(details) + + override def listDetailsByBeforeDate(beforeDate: Instant): Publisher[TaskExecutionDetails] = taskExecutionDetailsProjectionDAO.listDetailsByBeforeDate(beforeDate) + + override def remove(taskExecutionDetails: TaskExecutionDetails): Publisher[Void] = taskExecutionDetailsProjectionDAO.remove(taskExecutionDetails) +} diff --git a/server/task/task-postgres/src/main/scala/org/apache/james/task/eventsourcing/postgres/PostgresTaskExecutionDetailsProjectionDAO.scala b/server/task/task-postgres/src/main/scala/org/apache/james/task/eventsourcing/postgres/PostgresTaskExecutionDetailsProjectionDAO.scala new file mode 100644 index 0000000000..5ed08bc536 --- /dev/null +++ b/server/task/task-postgres/src/main/scala/org/apache/james/task/eventsourcing/postgres/PostgresTaskExecutionDetailsProjectionDAO.scala @@ -0,0 +1,112 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.task.eventsourcing.postgres + +import java.time.{Instant, LocalDateTime} +import java.util.Optional + +import com.google.common.collect.ImmutableMap +import javax.inject.Inject +import org.apache.james.backends.postgres.PostgresCommons.{LOCAL_DATE_TIME_ZONED_DATE_TIME_FUNCTION, ZONED_DATE_TIME_TO_LOCAL_DATE_TIME, INSTANT_TO_LOCAL_DATE_TIME} +import org.apache.james.backends.postgres.utils.PostgresExecutor +import org.apache.james.server.task.json.JsonTaskAdditionalInformationSerializer +import org.apache.james.task._ +import org.apache.james.task.eventsourcing.postgres.PostgresTaskExecutionDetailsProjectionModule._ +import org.apache.james.util.ReactorUtils +import org.jooq.JSONB.jsonb +import org.jooq.{InsertQuery, Record} +import reactor.core.publisher.{Flux, Mono} + +class PostgresTaskExecutionDetailsProjectionDAO @Inject()(postgresExecutor: PostgresExecutor, jsonTaskAdditionalInformationSerializer: JsonTaskAdditionalInformationSerializer) { + + def saveDetails(details: TaskExecutionDetails): Mono[Void] = + Mono.from(serializeAdditionalInformation(details) + .flatMap(serializedAdditionalInformation => postgresExecutor.executeVoid(dsl => { + val insertValues: ImmutableMap[Any, Any] = toInsertValues(details, serializedAdditionalInformation) + + val insertStatement: InsertQuery[Record] = dsl.insertQuery(TABLE_NAME) + insertStatement.addValue(TASK_ID, details.getTaskId.getValue) + insertStatement.addValues(insertValues) + insertStatement.onConflict(TASK_ID) + insertStatement.onDuplicateKeyUpdate(true) + insertStatement.addValuesForUpdate(insertValues) + + Mono.from(insertStatement) + }))) + + private def toInsertValues(details: TaskExecutionDetails, serializedAdditionalInformation: Optional[String]): ImmutableMap[Any, Any] = { + val builder: ImmutableMap.Builder[Any, Any] = ImmutableMap.builder() + builder.put(TYPE, details.getType.asString()) + builder.put(STATUS, details.getStatus.getValue) + builder.put(SUBMITTED_DATE, ZONED_DATE_TIME_TO_LOCAL_DATE_TIME.apply(details.getSubmittedDate)) + builder.put(SUBMITTED_NODE, details.getSubmittedNode.asString) + details.getStartedDate.ifPresent(startedDate => builder.put(STARTED_DATE, ZONED_DATE_TIME_TO_LOCAL_DATE_TIME.apply(startedDate))) + details.getRanNode.ifPresent(hostname => builder.put(RAN_NODE, hostname.asString)) + details.getCompletedDate.ifPresent(completedDate => builder.put(COMPLETED_DATE, ZONED_DATE_TIME_TO_LOCAL_DATE_TIME.apply(completedDate))) + details.getCanceledDate.ifPresent(canceledDate => builder.put(CANCELED_DATE, ZONED_DATE_TIME_TO_LOCAL_DATE_TIME.apply(canceledDate))) + details.getCancelRequestedNode.ifPresent(hostname => builder.put(CANCEL_REQUESTED_NODE, hostname.asString)) + details.getFailedDate.ifPresent(failedDate => builder.put(FAILED_DATE, ZONED_DATE_TIME_TO_LOCAL_DATE_TIME.apply(failedDate))) + serializedAdditionalInformation.ifPresent(info => builder.put(ADDITIONAL_INFORMATION, jsonb(info))) + builder.build() + } + + private def serializeAdditionalInformation(details: TaskExecutionDetails): Mono[Optional[String]] = Mono.fromCallable(() => details + .getAdditionalInformation + .map(jsonTaskAdditionalInformationSerializer.serialize(_))) + .cast(classOf[Optional[String]]) + .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER) + + def readDetails(taskId: TaskId): Mono[TaskExecutionDetails] = + postgresExecutor.executeRow(dsl => Mono.from(dsl.selectFrom(TABLE_NAME) + .where(TASK_ID.eq(taskId.getValue)))) + .map(toTaskExecutionDetails) + + def listDetails(): Flux[TaskExecutionDetails] = + postgresExecutor.executeRows(dsl => Flux.from(dsl.selectFrom(TABLE_NAME))) + .map(toTaskExecutionDetails) + + def listDetailsByBeforeDate(beforeDate: Instant): Flux[TaskExecutionDetails] = + postgresExecutor.executeRows(dsl => Flux.from(dsl.selectFrom(TABLE_NAME) + .where(SUBMITTED_DATE.lt(INSTANT_TO_LOCAL_DATE_TIME.apply(beforeDate))))) + .map(toTaskExecutionDetails) + + def remove(details: TaskExecutionDetails): Mono[Void] = + postgresExecutor.executeVoid(dsl => Mono.from(dsl.deleteFrom(TABLE_NAME) + .where(TASK_ID.eq(details.getTaskId.getValue)))) + + private def toTaskExecutionDetails(record: Record): TaskExecutionDetails = + new TaskExecutionDetails( + taskId = TaskId.fromUUID(record.get(TASK_ID)), + `type` = TaskType.of(record.get(TYPE)), + status = TaskManager.Status.fromString(record.get(STATUS)), + submittedDate = LOCAL_DATE_TIME_ZONED_DATE_TIME_FUNCTION.apply(record.get(SUBMITTED_DATE, classOf[LocalDateTime])), + submittedNode = Hostname(record.get(SUBMITTED_NODE)), + startedDate = Optional.ofNullable(LOCAL_DATE_TIME_ZONED_DATE_TIME_FUNCTION.apply(record.get(STARTED_DATE, classOf[LocalDateTime]))), + ranNode = Optional.ofNullable(record.get(RAN_NODE)).map(Hostname(_)), + completedDate = Optional.ofNullable(LOCAL_DATE_TIME_ZONED_DATE_TIME_FUNCTION.apply(record.get(COMPLETED_DATE, classOf[LocalDateTime]))), + canceledDate = Optional.ofNullable(LOCAL_DATE_TIME_ZONED_DATE_TIME_FUNCTION.apply(record.get(CANCELED_DATE, classOf[LocalDateTime]))), + cancelRequestedNode = Optional.ofNullable(record.get(CANCEL_REQUESTED_NODE)).map(Hostname(_)), + failedDate = Optional.ofNullable(LOCAL_DATE_TIME_ZONED_DATE_TIME_FUNCTION.apply(record.get(FAILED_DATE, classOf[LocalDateTime]))), + additionalInformation = () => deserializeAdditionalInformation(record)) + + private def deserializeAdditionalInformation(record: Record): Optional[TaskExecutionDetails.AdditionalInformation] = + Optional.ofNullable(record.get(ADDITIONAL_INFORMATION)) + .map(additionalInformation => jsonTaskAdditionalInformationSerializer.deserialize(additionalInformation.data())) +} diff --git a/server/task/task-postgres/src/main/scala/org/apache/james/task/eventsourcing/postgres/PostgresTaskExecutionDetailsProjectionModule.scala b/server/task/task-postgres/src/main/scala/org/apache/james/task/eventsourcing/postgres/PostgresTaskExecutionDetailsProjectionModule.scala new file mode 100644 index 0000000000..21918fd804 --- /dev/null +++ b/server/task/task-postgres/src/main/scala/org/apache/james/task/eventsourcing/postgres/PostgresTaskExecutionDetailsProjectionModule.scala @@ -0,0 +1,72 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.task.eventsourcing.postgres + +import java.time.LocalDateTime +import java.util.UUID + +import org.apache.james.backends.postgres.{PostgresCommons, PostgresIndex, PostgresModule, PostgresTable} +import org.jooq.impl.{DSL, SQLDataType} +import org.jooq.{Field, JSONB, Record, Table} + +object PostgresTaskExecutionDetailsProjectionModule { + val TABLE_NAME: Table[Record] = DSL.table("task_execution_details_projection") + + val TASK_ID: Field[UUID] = DSL.field("task_id", SQLDataType.UUID.notNull) + val ADDITIONAL_INFORMATION: Field[JSONB] = DSL.field("additional_information", SQLDataType.JSONB) + val TYPE: Field[String] = DSL.field("type", SQLDataType.VARCHAR) + val STATUS: Field[String] = DSL.field("status", SQLDataType.VARCHAR) + val SUBMITTED_DATE: Field[LocalDateTime] = DSL.field("submitted_date", PostgresCommons.DataTypes.TIMESTAMP) + val SUBMITTED_NODE: Field[String] = DSL.field("submitted_node", SQLDataType.VARCHAR) + val STARTED_DATE: Field[LocalDateTime] = DSL.field("started_date", PostgresCommons.DataTypes.TIMESTAMP) + val RAN_NODE: Field[String] = DSL.field("ran_node", SQLDataType.VARCHAR) + val COMPLETED_DATE: Field[LocalDateTime] = DSL.field("completed_date", PostgresCommons.DataTypes.TIMESTAMP) + val CANCELED_DATE: Field[LocalDateTime] = DSL.field("canceled_date", PostgresCommons.DataTypes.TIMESTAMP) + val CANCEL_REQUESTED_NODE: Field[String] = DSL.field("cancel_requested_node", SQLDataType.VARCHAR) + val FAILED_DATE: Field[LocalDateTime] = DSL.field("failed_date", PostgresCommons.DataTypes.TIMESTAMP) + + private val TABLE: PostgresTable = PostgresTable.name(TABLE_NAME.getName) + .createTableStep((dsl, tableName) => dsl.createTableIfNotExists(tableName) + .column(TASK_ID) + .column(ADDITIONAL_INFORMATION) + .column(TYPE) + .column(STATUS) + .column(SUBMITTED_DATE) + .column(SUBMITTED_NODE) + .column(STARTED_DATE) + .column(RAN_NODE) + .column(COMPLETED_DATE) + .column(CANCELED_DATE) + .column(CANCEL_REQUESTED_NODE) + .column(FAILED_DATE) + .constraint(DSL.primaryKey(TASK_ID))) + .disableRowLevelSecurity + .build + + private val SUBMITTED_DATE_INDEX: PostgresIndex = PostgresIndex.name("task_execution_details_projection_submittedDate_index") + .createIndexStep((dsl, indexName) => dsl.createIndexIfNotExists(indexName) + .on(TABLE_NAME, SUBMITTED_DATE)); + + val MODULE: PostgresModule = PostgresModule + .builder + .addTable(TABLE) + .addIndex(SUBMITTED_DATE_INDEX) + .build +} diff --git a/server/task/task-postgres/src/test/java/org/apache/james/task/eventsourcing/postgres/PostgresTaskExecutionDetailsProjectionDAOTest.java b/server/task/task-postgres/src/test/java/org/apache/james/task/eventsourcing/postgres/PostgresTaskExecutionDetailsProjectionDAOTest.java new file mode 100644 index 0000000000..22f07fd340 --- /dev/null +++ b/server/task/task-postgres/src/test/java/org/apache/james/task/eventsourcing/postgres/PostgresTaskExecutionDetailsProjectionDAOTest.java @@ -0,0 +1,202 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.task.eventsourcing.postgres; + +import static org.apache.james.task.TaskExecutionDetailsFixture.TASK_EXECUTION_DETAILS; +import static org.apache.james.task.TaskExecutionDetailsFixture.TASK_EXECUTION_DETAILS_2; +import static org.apache.james.task.TaskExecutionDetailsFixture.TASK_EXECUTION_DETAILS_UPDATED; +import static org.apache.james.task.TaskExecutionDetailsFixture.TASK_EXECUTION_DETAILS_WITH_ADDITIONAL_INFORMATION; +import static org.apache.james.task.TaskExecutionDetailsFixture.TASK_ID; +import static org.apache.james.task.TaskExecutionDetailsFixture.TASK_ID_2; +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.Optional; +import java.util.stream.Stream; + +import org.apache.james.backends.postgres.PostgresExtension; +import org.apache.james.server.task.json.JsonTaskAdditionalInformationSerializer; +import org.apache.james.server.task.json.dto.MemoryReferenceWithCounterTaskAdditionalInformationDTO; +import org.apache.james.task.TaskExecutionDetails; +import org.apache.james.task.TaskExecutionDetailsFixture; +import org.apache.james.task.TaskManager; +import org.apache.james.task.TaskType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import reactor.core.publisher.Flux; + +class PostgresTaskExecutionDetailsProjectionDAOTest { + @RegisterExtension + static PostgresExtension postgresExtension = PostgresExtension.withoutRowLevelSecurity(PostgresTaskExecutionDetailsProjectionModule.MODULE()); + + private static final JsonTaskAdditionalInformationSerializer JSON_TASK_ADDITIONAL_INFORMATION_SERIALIZER = JsonTaskAdditionalInformationSerializer.of(MemoryReferenceWithCounterTaskAdditionalInformationDTO.SERIALIZATION_MODULE); + + private PostgresTaskExecutionDetailsProjectionDAO testee; + + @BeforeEach + void setUp() { + testee = new PostgresTaskExecutionDetailsProjectionDAO(postgresExtension.getPostgresExecutor(), JSON_TASK_ADDITIONAL_INFORMATION_SERIALIZER); + } + + @Test + void readDetailsShouldBeAbleToRetrieveASavedRecord() { + testee.saveDetails(TASK_EXECUTION_DETAILS()).block(); + + TaskExecutionDetails taskExecutionDetails = testee.readDetails(TASK_ID()).block(); + + assertThat(taskExecutionDetails) + .usingRecursiveComparison() + .ignoringFields("submittedDate") + .isEqualTo(TASK_EXECUTION_DETAILS()); + } + + @Test + void readDetailsShouldBeAbleToRetrieveASavedRecordWithAdditionalInformation() { + testee.saveDetails(TASK_EXECUTION_DETAILS_WITH_ADDITIONAL_INFORMATION()).block(); + + TaskExecutionDetails taskExecutionDetails = testee.readDetails(TASK_ID()).block(); + + assertThat(taskExecutionDetails) + .usingRecursiveComparison() + .ignoringFields("submittedDate") + .isEqualTo(TASK_EXECUTION_DETAILS_WITH_ADDITIONAL_INFORMATION()); + + assertThat(taskExecutionDetails.getSubmittedDate().isEqual(TASK_EXECUTION_DETAILS_WITH_ADDITIONAL_INFORMATION().getSubmittedDate())) + .isTrue(); + } + + @Test + void saveDetailsShouldUpdateRecords() { + testee.saveDetails(TASK_EXECUTION_DETAILS()).block(); + + testee.saveDetails(TASK_EXECUTION_DETAILS_UPDATED()).block(); + + TaskExecutionDetails taskExecutionDetails = testee.readDetails(TASK_ID()).block(); + + assertThat(taskExecutionDetails) + .usingRecursiveComparison() + .ignoringFields("submittedDate") + .isEqualTo(TASK_EXECUTION_DETAILS_UPDATED()); + + assertThat(taskExecutionDetails.getSubmittedDate().isEqual(TASK_EXECUTION_DETAILS_UPDATED().getSubmittedDate())) + .isTrue(); + } + + @Test + void readDetailsShouldReturnEmptyWhenNone() { + Optional<TaskExecutionDetails> taskExecutionDetails = testee.readDetails(TASK_ID()).blockOptional(); + assertThat(taskExecutionDetails).isEmpty(); + } + + @Test + void listDetailsShouldReturnEmptyWhenNone() { + Stream<TaskExecutionDetails> taskExecutionDetails = testee.listDetails().toStream(); + assertThat(taskExecutionDetails).isEmpty(); + } + + @Test + void listDetailsShouldReturnAllRecords() { + testee.saveDetails(TASK_EXECUTION_DETAILS()).block(); + testee.saveDetails(TASK_EXECUTION_DETAILS_2()).block(); + + Stream<TaskExecutionDetails> taskExecutionDetails = testee.listDetails().toStream(); + + assertThat(taskExecutionDetails) + .usingRecursiveFieldByFieldElementComparatorIgnoringFields("submittedDate") + .containsOnly(TASK_EXECUTION_DETAILS(), TASK_EXECUTION_DETAILS_2()); + } + + @Test + void listDetailsShouldReturnLastUpdatedRecords() { + testee.saveDetails(TASK_EXECUTION_DETAILS()).block(); + testee.saveDetails(TASK_EXECUTION_DETAILS_UPDATED()).block(); + + Stream<TaskExecutionDetails> taskExecutionDetails = testee.listDetails().toStream(); + assertThat(taskExecutionDetails) + .usingRecursiveFieldByFieldElementComparatorIgnoringFields("submittedDate") + .containsOnly(TASK_EXECUTION_DETAILS_UPDATED()); + } + + @Test + void listBeforeDateShouldReturnCorrectEntry() { + TaskExecutionDetails taskExecutionDetails1 = new TaskExecutionDetails(TASK_ID(), + TaskType.of("type"), + TaskManager.Status.COMPLETED, + ZonedDateTime.ofInstant(Instant.parse("2000-01-01T00:00:00Z"), ZoneId.systemDefault()), + TaskExecutionDetailsFixture.SUBMITTED_NODE(), + Optional::empty, + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty()); + + TaskExecutionDetails taskExecutionDetails2 = new TaskExecutionDetails(TASK_ID_2(), + TaskType.of("type"), + TaskManager.Status.COMPLETED, + ZonedDateTime.ofInstant(Instant.parse("2000-01-20T00:00:00Z"), ZoneId.systemDefault()), + TaskExecutionDetailsFixture.SUBMITTED_NODE(), + Optional::empty, + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty()); + + testee.saveDetails(taskExecutionDetails1).block(); + testee.saveDetails(taskExecutionDetails2).block(); + + assertThat(Flux.from(testee.listDetailsByBeforeDate(Instant.parse("2000-01-15T12:00:55Z"))).collectList().block()) + .usingRecursiveFieldByFieldElementComparatorIgnoringFields("submittedDate") + .containsOnly(taskExecutionDetails1); + } + + @Test + void removeShouldDeleteAssignEntry() { + TaskExecutionDetails taskExecutionDetails1 = new TaskExecutionDetails(TASK_ID(), + TaskType.of("type"), + TaskManager.Status.COMPLETED, + ZonedDateTime.ofInstant(Instant.parse("2000-01-01T00:00:00Z"), ZoneId.systemDefault()), + TaskExecutionDetailsFixture.SUBMITTED_NODE(), + Optional::empty, + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty()); + + testee.saveDetails(taskExecutionDetails1).block(); + + assertThat(testee.listDetails().collectList().block()) + .hasSize(1); + + testee.remove(taskExecutionDetails1).block(); + + assertThat(testee.listDetails().collectList().block()) + .isEmpty(); + } +} \ No newline at end of file diff --git a/server/task/task-postgres/src/test/java/org/apache/james/task/eventsourcing/postgres/PostgresTaskExecutionDetailsProjectionTest.java b/server/task/task-postgres/src/test/java/org/apache/james/task/eventsourcing/postgres/PostgresTaskExecutionDetailsProjectionTest.java new file mode 100644 index 0000000000..d64c0688d2 --- /dev/null +++ b/server/task/task-postgres/src/test/java/org/apache/james/task/eventsourcing/postgres/PostgresTaskExecutionDetailsProjectionTest.java @@ -0,0 +1,52 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.task.eventsourcing.postgres; + +import java.util.function.Supplier; + +import org.apache.james.backends.postgres.PostgresExtension; +import org.apache.james.server.task.json.JsonTaskAdditionalInformationSerializer; +import org.apache.james.server.task.json.dto.MemoryReferenceWithCounterTaskAdditionalInformationDTO; +import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection; +import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjectionContract; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.RegisterExtension; + +class PostgresTaskExecutionDetailsProjectionTest implements TaskExecutionDetailsProjectionContract { + @RegisterExtension + static PostgresExtension postgresExtension = PostgresExtension.withoutRowLevelSecurity(PostgresTaskExecutionDetailsProjectionModule.MODULE()); + + private static final JsonTaskAdditionalInformationSerializer JSON_TASK_ADDITIONAL_INFORMATION_SERIALIZER = JsonTaskAdditionalInformationSerializer.of(MemoryReferenceWithCounterTaskAdditionalInformationDTO.SERIALIZATION_MODULE); + + private Supplier<PostgresTaskExecutionDetailsProjection> testeeSupplier; + + @BeforeEach + void setUp() { + PostgresTaskExecutionDetailsProjectionDAO dao = new PostgresTaskExecutionDetailsProjectionDAO(postgresExtension.getPostgresExecutor(), + JSON_TASK_ADDITIONAL_INFORMATION_SERIALIZER); + testeeSupplier = () -> new PostgresTaskExecutionDetailsProjection(dao); + } + + @Override + public TaskExecutionDetailsProjection testee() { + return testeeSupplier.get(); + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
