This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 82438a76f99a620d08acfc712b2950e76f1c27a5 Author: Benoit Tellier <[email protected]> AuthorDate: Fri Feb 21 12:59:10 2020 +0700 JAMES-3059 Allow Cassandra test suite to inject fault at the session level --- .../james/backends/cassandra/CassandraCluster.java | 9 +- .../james/backends/cassandra/TestingSession.java | 245 +++++++++++++++++++++ .../backends/cassandra/TestingSessionTest.java | 161 ++++++++++++++ 3 files changed, 411 insertions(+), 4 deletions(-) diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraCluster.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraCluster.java index f4be201..4b4854b 100644 --- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraCluster.java +++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/CassandraCluster.java @@ -30,14 +30,13 @@ import org.apache.james.backends.cassandra.init.configuration.ClusterConfigurati import org.apache.james.util.Host; import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.Session; public final class CassandraCluster implements AutoCloseable { public static final String KEYSPACE = "testing"; private static Optional<Exception> startStackTrace = Optional.empty(); private final CassandraModule module; - private Session session; + private TestingSession session; private CassandraTypesProvider typesProvider; private Cluster cluster; @@ -65,14 +64,15 @@ public final class CassandraCluster implements AutoCloseable { .build(); cluster = ClusterFactory.create(clusterConfiguration); KeyspaceFactory.createKeyspace(clusterConfiguration, cluster); - session = new SessionWithInitializedTablesFactory(clusterConfiguration, cluster, module).get(); + session = new TestingSession( + new SessionWithInitializedTablesFactory(clusterConfiguration, cluster, module).get()); typesProvider = new CassandraTypesProvider(module, session); } catch (Exception exception) { throw new RuntimeException(exception); } } - public Session getConf() { + public TestingSession getConf() { return session; } @@ -82,6 +82,7 @@ public final class CassandraCluster implements AutoCloseable { @Override public void close() { + session.resetExecutionHook(); if (!cluster.isClosed()) { clearTables(); closeCluster(); diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSession.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSession.java new file mode 100644 index 0000000..5e0ad9d --- /dev/null +++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSession.java @@ -0,0 +1,245 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.backends.cassandra; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; +import java.util.function.Predicate; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.CloseFuture; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.RegularStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Statement; +import com.google.common.util.concurrent.ListenableFuture; + +public class TestingSession implements Session { + enum Behavior { + THROW((session, statement) -> { + RuntimeException injected_failure = new RuntimeException("Injected failure"); + injected_failure.printStackTrace(); + throw injected_failure; + }), + EXECUTE_NORMALLY(Session::executeAsync); + + private final BiFunction<Session, Statement, ResultSetFuture> behaviour; + + Behavior(BiFunction<Session, Statement, ResultSetFuture> behaviour) { + this.behaviour = behaviour; + } + + ResultSetFuture execute(Session session, Statement statement) { + return behaviour.apply(session, statement); + } + } + + @FunctionalInterface + interface StatementPredicate extends Predicate<Statement> { + + } + + static class BoundStatementStartingWith implements StatementPredicate { + private final String queryStringPrefix; + + BoundStatementStartingWith(String queryStringPrefix) { + this.queryStringPrefix = queryStringPrefix; + } + + @Override + public boolean test(Statement statement) { + if (statement instanceof BoundStatement) { + BoundStatement boundStatement = (BoundStatement) statement; + return boundStatement.preparedStatement() + .getQueryString() + .startsWith(queryStringPrefix); + } + return false; + } + } + + @FunctionalInterface + public interface RequiresCondition { + RequiresApplyCount condition(StatementPredicate statementPredicate); + + default RequiresApplyCount always() { + return condition(ALL_STATEMENTS); + } + + default RequiresApplyCount whenBoundStatementStartsWith(String queryStringPrefix) { + return condition(new BoundStatementStartingWith(queryStringPrefix)); + } + } + + @FunctionalInterface + public interface RequiresApplyCount { + FinalStage times(int applyCount); + } + + @FunctionalInterface + public interface FinalStage { + void setExecutionHook(); + } + + private static class ExecutionHook { + final StatementPredicate statementPredicate; + final Behavior behavior; + final AtomicInteger remaining; + + private ExecutionHook(StatementPredicate statementPredicate, Behavior behavior, int applyCount) { + this.statementPredicate = statementPredicate; + this.behavior = behavior; + this.remaining = new AtomicInteger(applyCount); + } + + ResultSetFuture execute(Session session, Statement statement) { + if (statementPredicate.test(statement)) { + int hookPosition = remaining.getAndDecrement(); + if (hookPosition > 0) { + return behavior.execute(session, statement); + } + } + return Behavior.EXECUTE_NORMALLY.execute(session, statement); + } + } + + private static StatementPredicate ALL_STATEMENTS = statement -> true; + private static ExecutionHook NO_EXECUTION_HOOK = new ExecutionHook(ALL_STATEMENTS, Behavior.EXECUTE_NORMALLY, 0); + + private final Session delegate; + private volatile ExecutionHook executionHook; + + TestingSession(Session delegate) { + this.delegate = delegate; + this.executionHook = NO_EXECUTION_HOOK; + } + + public RequiresCondition fail() { + return condition -> applyCount -> () -> executionHook = new ExecutionHook(condition, Behavior.THROW, applyCount); + } + + public void resetExecutionHook() { + executionHook = NO_EXECUTION_HOOK; + } + + @Override + public String getLoggedKeyspace() { + return delegate.getLoggedKeyspace(); + } + + @Override + public Session init() { + return delegate.init(); + } + + @Override + public ListenableFuture<Session> initAsync() { + return delegate.initAsync(); + } + + @Override + public ResultSet execute(String query) { + return delegate.execute(query); + } + + @Override + public ResultSet execute(String query, Object... values) { + return delegate.execute(query, values); + } + + @Override + public ResultSet execute(String query, Map<String, Object> values) { + return delegate.execute(query, values); + } + + @Override + public ResultSet execute(Statement statement) { + return delegate.execute(statement); + } + + @Override + public ResultSetFuture executeAsync(String query) { + return delegate.executeAsync(query); + } + + @Override + public ResultSetFuture executeAsync(String query, Object... values) { + return delegate.executeAsync(query, values); + } + + @Override + public ResultSetFuture executeAsync(String query, Map<String, Object> values) { + return delegate.executeAsync(query, values); + } + + @Override + public ResultSetFuture executeAsync(Statement statement) { + return executionHook.execute(delegate, statement); + } + + @Override + public PreparedStatement prepare(String query) { + return delegate.prepare(query); + } + + @Override + public PreparedStatement prepare(RegularStatement statement) { + return delegate.prepare(statement); + } + + @Override + public ListenableFuture<PreparedStatement> prepareAsync(String query) { + return delegate.prepareAsync(query); + } + + @Override + public ListenableFuture<PreparedStatement> prepareAsync(RegularStatement statement) { + return delegate.prepareAsync(statement); + } + + @Override + public CloseFuture closeAsync() { + return delegate.closeAsync(); + } + + @Override + public void close() { + delegate.close(); + } + + @Override + public boolean isClosed() { + return delegate.isClosed(); + } + + @Override + public Cluster getCluster() { + return delegate.getCluster(); + } + + @Override + public State getState() { + return delegate.getState(); + } +} diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSessionTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSessionTest.java new file mode 100644 index 0000000..1983e99 --- /dev/null +++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSessionTest.java @@ -0,0 +1,161 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.backends.cassandra; + +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO; +import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule; +import org.apache.james.backends.cassandra.versions.SchemaVersion; +import org.assertj.core.api.SoftAssertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class TestingSessionTest { + @RegisterExtension + static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraSchemaVersionModule.MODULE); + + private CassandraSchemaVersionDAO dao; + + @BeforeEach + void setUp(CassandraCluster cassandra) { + dao = new CassandraSchemaVersionDAO(cassandra.getConf()); + } + + @Test + void daoOperationShouldNotBeInstrumentedByDefault() { + assertThatCode(() -> dao.getCurrentSchemaVersion().block()) + .doesNotThrowAnyException(); + } + + @Test + void daoOperationShouldNotBeInstrumentedWhenNotMatching(CassandraCluster cassandra) { + cassandra.getConf() + .fail() + .whenBoundStatementStartsWith("non matching") + .times(1) + .setExecutionHook(); + + assertThatCode(() -> dao.getCurrentSchemaVersion().block()) + .doesNotThrowAnyException(); + } + + @Test + void daoOperationShouldNotBeInstrumentedWhenTimesIsZero(CassandraCluster cassandra) { + cassandra.getConf() + .fail() + .whenBoundStatementStartsWith("SELECT value FROM schemaVersion;") + .times(0) + .setExecutionHook(); + + assertThatCode(() -> dao.getCurrentSchemaVersion().block()) + .doesNotThrowAnyException(); + } + + @Test + void daoOperationShouldNotBeInstrumentedWhenTimesIsNegative(CassandraCluster cassandra) { + cassandra.getConf() + .fail() + .whenBoundStatementStartsWith("SELECT value FROM schemaVersion;") + .times(-1) + .setExecutionHook(); + + assertThatCode(() -> dao.getCurrentSchemaVersion().block()) + .doesNotThrowAnyException(); + } + + @Test + void daoOperationShouldFailWhenInstrumented(CassandraCluster cassandra) { + cassandra.getConf() + .fail() + .whenBoundStatementStartsWith("SELECT value FROM schemaVersion;") + .times(1) + .setExecutionHook(); + + assertThatThrownBy(() -> dao.getCurrentSchemaVersion().block()) + .isInstanceOf(RuntimeException.class); + } + + @Test + void daoShouldNotBeInstrumentedWhenTimesIsExceeded(CassandraCluster cassandra) { + cassandra.getConf() + .fail() + .whenBoundStatementStartsWith("SELECT value FROM schemaVersion;") + .times(1) + .setExecutionHook(); + + try { + dao.getCurrentSchemaVersion().block(); + } catch (Exception e) { + // discard expected exception + } + + assertThatCode(() -> dao.getCurrentSchemaVersion().block()) + .doesNotThrowAnyException(); + } + + @Test + void timesShouldSpecifyExactlyTheFailureCount(CassandraCluster cassandra) { + cassandra.getConf() + .fail() + .whenBoundStatementStartsWith("SELECT value FROM schemaVersion;") + .times(2) + .setExecutionHook(); + + SoftAssertions.assertSoftly(softly -> { + assertThatThrownBy(() -> dao.getCurrentSchemaVersion().block()) + .isInstanceOf(RuntimeException.class); + assertThatThrownBy(() -> dao.getCurrentSchemaVersion().block()) + .isInstanceOf(RuntimeException.class); + assertThatCode(() -> dao.getCurrentSchemaVersion().block()) + .doesNotThrowAnyException(); + }); + } + + @Test + void resetExecutionHookShouldClearInstrumentation(CassandraCluster cassandra) { + cassandra.getConf() + .fail() + .whenBoundStatementStartsWith("SELECT value FROM schemaVersion;") + .times(1) + .setExecutionHook(); + + cassandra.getConf().resetExecutionHook(); + + assertThatCode(() -> dao.getCurrentSchemaVersion().block()) + .doesNotThrowAnyException(); + } + + @Test + void timesShouldBeTakenIntoAccountOnlyForMatchingStatements(CassandraCluster cassandra) { + cassandra.getConf() + .fail() + .whenBoundStatementStartsWith("SELECT value FROM schemaVersion;") + .times(1) + .setExecutionHook(); + + dao.updateVersion(new SchemaVersion(36)).block(); + + assertThatThrownBy(() -> dao.getCurrentSchemaVersion().block()) + .isInstanceOf(RuntimeException.class); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
