This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch feature/GEODE-5624
in repository https://gitbox.apache.org/repos/asf/geode.git

The following commit(s) were added to refs/heads/feature/GEODE-5624 by this push: new 13b6cfe more refactoring. 13b6cfe is described below commit 13b6cfe2cf2e1709a12d47e1dce7030af57e89f2 Author: eshu <e...@pivotal.io> AuthorDate: Mon Aug 27 13:11:04 2018 -0700 more refactoring. --- .../geode/internal/cache/AfterCompletion.java | 105 +++++++++++++ .../geode/internal/cache/BeforeCompletion.java | 64 ++++++++ .../internal/cache/SingleThreadJTAExecutor.java | 172 +++------------------ .../org/apache/geode/internal/cache/TXState.java | 11 +- .../geode/internal/cache/AfterCompletionTest.java | 131 ++++++++++++++++ .../geode/internal/cache/BeforeCompletionTest.java | 98 ++++++++++++ .../cache/SingleThreadJTAExecutorTest.java | 84 +++------- 7 files changed, 447 insertions(+), 218 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AfterCompletion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AfterCompletion.java new file mode 100644 index 0000000..02ef92c --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AfterCompletion.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.internal.cache; + +import java.util.function.BooleanSupplier; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.CancelCriterion; +import org.apache.geode.internal.logging.LogService; + +public class AfterCompletion { + private static final Logger logger = LogService.getLogger(); + + private boolean started; + private boolean finished; + private int status = -1; + private boolean cancelled; + private RuntimeException exception; + + public synchronized void doOp(TXState txState, CancelCriterion cancelCriterion) { + // there should be a transaction timeout that keeps this thread + // from sitting around forever if the client goes away + // The above was done by setting afterCompletionCancelled in txState + // during cleanup. When client departed, the transaction/JTA + // will be timed out and cleanup code will be executed. + waitForExecuteOrCancel(cancelCriterion); + started = true; + logger.debug("executing afterCompletion notification"); + + try { + if (cancelled) { + txState.doCleanup(); + } else { + txState.doAfterCompletion(status); + } + } catch (RuntimeException exception) { + this.exception = exception; + } finally { + logger.debug("afterCompletion notification completed"); + finished = true; + notifyAll(); + } + } + + private void waitForExecuteOrCancel(CancelCriterion cancelCriterion) { + waitForCondition(cancelCriterion, () -> { + return (status == -1 && !cancelled); + }); + } + + private synchronized void waitForCondition(CancelCriterion cancelCriterion, + BooleanSupplier condition) { + while (condition.getAsBoolean()) { + cancelCriterion.checkCancelInProgress(null); + try { + logger.debug("waiting for notification"); + wait(1000); + } catch (InterruptedException ignore) { + // eat the interrupt and check for exit conditions + } + } + } + + public synchronized void execute(CancelCriterion cancelCriterion, int status) { + this.status = status; + signalAndWaitForDoOp(cancelCriterion); + } + + private void 
signalAndWaitForDoOp(CancelCriterion cancelCriterion) { + notifyAll(); + waitUntilFinished(cancelCriterion); + if (exception != null) { + throw exception; + } + } + + private void waitUntilFinished(CancelCriterion cancelCriterion) { + waitForCondition(cancelCriterion, () -> { + return !finished; + }); + + } + + public synchronized void cancel(CancelCriterion cancelCriterion) { + cancelled = true; + signalAndWaitForDoOp(cancelCriterion); + } + + public synchronized boolean isStarted() { + return started; + } +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BeforeCompletion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BeforeCompletion.java new file mode 100644 index 0000000..99a062c --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BeforeCompletion.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.internal.cache; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.CancelCriterion; +import org.apache.geode.cache.SynchronizationCommitConflictException; +import org.apache.geode.internal.logging.LogService; + +public class BeforeCompletion { + private static final Logger logger = LogService.getLogger(); + + private boolean started; + private boolean finished; + private SynchronizationCommitConflictException exception; + + public synchronized void doOp(TXState txState) { + try { + txState.doBeforeCompletion(); + } catch (SynchronizationCommitConflictException exception) { + this.exception = exception; + } finally { + logger.debug("beforeCompletion notification completed"); + finished = true; + notifyAll(); + } + } + + public synchronized void execute(CancelCriterion cancelCriterion) { + started = true; + waitUntilFinished(cancelCriterion); + if (exception != null) { + throw exception; + } + } + + private void waitUntilFinished(CancelCriterion cancelCriterion) { + while (!finished) { + cancelCriterion.checkCancelInProgress(null); + try { + wait(1000); + } catch (InterruptedException ignore) { + // eat the interrupt and check for exit conditions + } + } + } + + public synchronized boolean isStarted() { + return started; + } +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java index 4df8ca4..7ecca6a 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java @@ -18,180 +18,54 @@ import java.util.concurrent.Executor; import org.apache.logging.log4j.Logger; -import org.apache.geode.cache.SynchronizationCommitConflictException; +import org.apache.geode.CancelCriterion; import org.apache.geode.internal.logging.LogService; /** - * TXStateSynchronizationThread 
manages beforeCompletion and afterCompletion calls. - * The thread should be instantiated with a Runnable that invokes beforeCompletion behavior. - * Then you must invoke executeAfterCompletion() with another Runnable that invokes afterCompletion - * behavior. + * This class ensures that beforeCompletion and afterCompletion are executed in the same thread. * - * @since Geode 1.6.0 + * @since Geode 1.7.0 */ public class SingleThreadJTAExecutor { private static final Logger logger = LogService.getLogger(); - private final Object beforeCompletionSync = new Object(); - private boolean beforeCompletionStarted; - private boolean beforeCompletionFinished; - private SynchronizationCommitConflictException beforeCompletionException; + private final BeforeCompletion beforeCompletion; + private final AfterCompletion afterCompletion; - private final Object afterCompletionSync = new Object(); - private boolean afterCompletionStarted; - private boolean afterCompletionFinished; - private int afterCompletionStatus = -1; - private boolean afterCompletionCancelled; - private RuntimeException afterCompletionException; - - public SingleThreadJTAExecutor() {} - - void doOps(TXState txState) { - doBeforeCompletionOp(txState); - doAfterCompletionOp(txState); - } - - void doBeforeCompletionOp(TXState txState) { - synchronized (beforeCompletionSync) { - try { - txState.doBeforeCompletion(); - } catch (SynchronizationCommitConflictException exception) { - beforeCompletionException = exception; - } finally { - if (logger.isDebugEnabled()) { - logger.debug("beforeCompletion notification completed"); - } - beforeCompletionFinished = true; - beforeCompletionSync.notifyAll(); - } - } - } - - boolean isBeforeCompletionStarted() { - synchronized (beforeCompletionSync) { - return beforeCompletionStarted; - } - } - - boolean isAfterCompletionStarted() { - synchronized (afterCompletionSync) { - return afterCompletionStarted; - } + public SingleThreadJTAExecutor() { + this(new BeforeCompletion(), new 
AfterCompletion()); } - boolean isBeforeCompletionFinished() { - synchronized (beforeCompletionSync) { - return beforeCompletionFinished; - } + public SingleThreadJTAExecutor(BeforeCompletion beforeCompletion, + AfterCompletion afterCompletion) { + this.beforeCompletion = beforeCompletion; + this.afterCompletion = afterCompletion; } - boolean isAfterCompletionFinished() { - synchronized (afterCompletionSync) { - return afterCompletionFinished; - } + private void doOps(TXState txState, CancelCriterion cancelCriterion) { + beforeCompletion.doOp(txState); + afterCompletion.doOp(txState, cancelCriterion); } - public void executeBeforeCompletion(TXState txState, Executor executor) { - executor.execute(() -> doOps(txState)); - - synchronized (beforeCompletionSync) { - beforeCompletionStarted = true; - while (!beforeCompletionFinished) { - try { - beforeCompletionSync.wait(1000); - } catch (InterruptedException ignore) { - // eat the interrupt and check for exit conditions - } - txState.getCache().getCancelCriterion().checkCancelInProgress(null); - } - if (getBeforeCompletionException() != null) { - throw getBeforeCompletionException(); - } - } - } - - SynchronizationCommitConflictException getBeforeCompletionException() { - return beforeCompletionException; - } - - private void doAfterCompletionOp(TXState txState) { - synchronized (afterCompletionSync) { - // there should be a transaction timeout that keeps this thread - // from sitting around forever if the client goes away - // The above was done by setting afterCompletionCancelled in txState - // during cleanup. When client departed, the transaction/JTA - // will be timed out and cleanup code will be executed. 
- final boolean isDebugEnabled = logger.isDebugEnabled(); - while (afterCompletionStatus == -1 && !afterCompletionCancelled) { - try { - if (isDebugEnabled) { - logger.debug("waiting for afterCompletion notification"); - } - afterCompletionSync.wait(1000); - } catch (InterruptedException ignore) { - // eat the interrupt and check for exit conditions - } - } - afterCompletionStarted = true; - if (isDebugEnabled) { - logger.debug("executing afterCompletion notification"); - } - try { - if (!afterCompletionCancelled) { - txState.doAfterCompletion(afterCompletionStatus); - } else { - txState.doCleanup(); - } - } catch (RuntimeException exception) { - afterCompletionException = exception; - } finally { - if (isDebugEnabled) { - logger.debug("afterCompletion notification completed"); - } - afterCompletionFinished = true; - afterCompletionSync.notifyAll(); - } - } - } - - public void executeAfterCompletion(TXState txState, int status) { - synchronized (afterCompletionSync) { - afterCompletionStatus = status; - afterCompletionSync.notifyAll(); - waitForAfterCompletionToFinish(txState); - if (getAfterCompletionException() != null) { - throw getAfterCompletionException(); - } - } - } + public void executeBeforeCompletion(TXState txState, Executor executor, + CancelCriterion cancelCriterion) { + executor.execute(() -> doOps(txState, cancelCriterion)); - private void waitForAfterCompletionToFinish(TXState txState) { - while (!afterCompletionFinished) { - try { - afterCompletionSync.wait(1000); - } catch (InterruptedException ignore) { - // eat the interrupt and check for exit conditions - } - txState.getCache().getCancelCriterion().checkCancelInProgress(null); - } + beforeCompletion.execute(cancelCriterion); } - RuntimeException getAfterCompletionException() { - return afterCompletionException; + public void executeAfterCompletion(CancelCriterion cancelCriterion, int status) { + afterCompletion.execute(cancelCriterion, status); } /** * stop waiting for an afterCompletion to 
arrive and just exit */ - public void cleanup(TXState txState) { - synchronized (afterCompletionSync) { - afterCompletionCancelled = true; - afterCompletionSync.notifyAll(); - waitForAfterCompletionToFinish(txState); - } + public void cleanup(CancelCriterion cancelCriterion) { + afterCompletion.cancel(cancelCriterion); } public boolean shouldDoCleanup() { - return isBeforeCompletionStarted() && !isAfterCompletionStarted(); + return beforeCompletion.isStarted() && !afterCompletion.isStarted(); } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java index bce1af4..83d00d1 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java @@ -31,6 +31,7 @@ import javax.transaction.Status; import org.apache.logging.log4j.Logger; +import org.apache.geode.CancelCriterion; import org.apache.geode.CancelException; import org.apache.geode.SystemFailure; import org.apache.geode.cache.CommitConflictException; @@ -869,7 +870,7 @@ public class TXState implements TXStateInterface { protected void cleanup() { if (singleThreadJTAExecutor.shouldDoCleanup()) { - singleThreadJTAExecutor.cleanup(this); + singleThreadJTAExecutor.cleanup(getCancelCriterion()); } else { doCleanup(); } @@ -1036,13 +1037,17 @@ public class TXState implements TXStateInterface { } beforeCompletionCalled = true; singleThreadJTAExecutor.executeBeforeCompletion(this, - getExecutor()); + getExecutor(), getCancelCriterion()); } Executor getExecutor() { return getCache().getDistributionManager().getWaitingThreadPool(); } + CancelCriterion getCancelCriterion() { + return getCache().getCancelCriterion(); + } + void doBeforeCompletion() { proxy.getTxMgr().setTXState(null); final long opStart = CachePerfStats.getStatTime(); @@ -1105,7 +1110,7 @@ public class TXState implements TXStateInterface { // sitting in the waiting pool 
to execute afterCompletion. Otherwise // throw FailedSynchronizationException(). if (beforeCompletionCalled) { - singleThreadJTAExecutor.executeAfterCompletion(this, status); + singleThreadJTAExecutor.executeAfterCompletion(getCancelCriterion(), status); } else { // rollback does not run beforeCompletion. if (status != Status.STATUS_ROLLEDBACK) { diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/AfterCompletionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/AfterCompletionTest.java new file mode 100644 index 0000000..d94df0e --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/AfterCompletionTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.internal.cache; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.concurrent.TimeUnit; + +import javax.transaction.Status; + +import org.awaitility.Awaitility; +import org.junit.Before; +import org.junit.Test; + +import org.apache.geode.CancelCriterion; + +public class AfterCompletionTest { + private AfterCompletion afterCompletion; + private CancelCriterion cancelCriterion; + private TXState txState; + private Thread doOpThread; + + @Before + public void setup() { + afterCompletion = new AfterCompletion(); + cancelCriterion = mock(CancelCriterion.class); + txState = mock(TXState.class); + } + + @Test + public void executeThrowsIfCancelCriterionThrows() { + doThrow(new RuntimeException()).when(cancelCriterion).checkCancelInProgress(null); + + assertThatThrownBy(() -> afterCompletion.execute(cancelCriterion, Status.STATUS_COMMITTED)) + .isInstanceOf(RuntimeException.class); + } + + @Test + public void cancelThrowsIfCancelCriterionThrows() { + doThrow(new RuntimeException()).when(cancelCriterion).checkCancelInProgress(null); + + assertThatThrownBy(() -> afterCompletion.cancel(cancelCriterion)) + .isInstanceOf(RuntimeException.class); + } + + @Test + public void isStartedReturnsFalseIfNotExecuted() { + assertThat(afterCompletion.isStarted()).isFalse(); + } + + @Test + public void isStartedReturnsTrueIfExecuted() { + startDoOp(); + + afterCompletion.execute(cancelCriterion, Status.STATUS_COMMITTED); + + verifyDoOpFinished(); + assertThat(afterCompletion.isStarted()).isTrue(); + } + + @Test + public void executeCallsDoAfterCompletion() { + startDoOp(); + + afterCompletion.execute(cancelCriterion, Status.STATUS_COMMITTED); + 
verifyDoOpFinished(); + verify(txState, times(1)).doAfterCompletion(eq(Status.STATUS_COMMITTED)); + } + + @Test + public void executeThrowsDoAfterCompletionThrows() { + startDoOp(); + doThrow(new RuntimeException()).when(txState).doAfterCompletion(Status.STATUS_COMMITTED); + + assertThatThrownBy(() -> afterCompletion.execute(cancelCriterion, Status.STATUS_COMMITTED)) + .isInstanceOf(RuntimeException.class); + + verifyDoOpFinished(); + } + + @Test + public void cancelCallsDoCleanup() { + startDoOp(); + + afterCompletion.cancel(cancelCriterion); + verifyDoOpFinished(); + verify(txState, times(1)).doCleanup(); + } + + @Test + public void cancelThrowsDoCleanupThrows() { + startDoOp(); + doThrow(new RuntimeException()).when(txState).doCleanup(); + + assertThatThrownBy(() -> afterCompletion.cancel(cancelCriterion)) + .isInstanceOf(RuntimeException.class); + + verifyDoOpFinished(); + } + + private void startDoOp() { + doOpThread = new Thread(() -> afterCompletion.doOp(txState, cancelCriterion)); + doOpThread.start(); + Awaitility.await().atMost(60, TimeUnit.SECONDS) + .untilAsserted(() -> verify(cancelCriterion, times(1)).checkCancelInProgress(null)); + + } + + private void verifyDoOpFinished() { + Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> !doOpThread.isAlive()); + } + +} diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BeforeCompletionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BeforeCompletionTest.java new file mode 100644 index 0000000..1f541b6 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BeforeCompletionTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. 
The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.concurrent.TimeUnit; + +import org.awaitility.Awaitility; +import org.junit.Before; +import org.junit.Test; + +import org.apache.geode.CancelCriterion; +import org.apache.geode.cache.SynchronizationCommitConflictException; + +public class BeforeCompletionTest { + + private BeforeCompletion beforeCompletion; + private CancelCriterion cancelCriterion; + private TXState txState; + + @Before + public void setup() { + beforeCompletion = new BeforeCompletion(); + cancelCriterion = mock(CancelCriterion.class); + txState = mock(TXState.class); + } + + @Test + public void executeThrowsExceptionIfDoOpFailedWithException() { + doThrow(new SynchronizationCommitConflictException("")).when(txState).doBeforeCompletion(); + + beforeCompletion.doOp(txState); + + assertThatThrownBy(() -> beforeCompletion.execute(cancelCriterion)) + .isInstanceOf(SynchronizationCommitConflictException.class); + } + + @Test + public void doOpCallsDoBeforeCompletion() { + beforeCompletion.doOp(txState); + + verify(txState, times(1)).doBeforeCompletion(); + } + + @Test + public 
void isStartedReturnsFalseIfNotExecuted() { + assertThat(beforeCompletion.isStarted()).isFalse(); + } + + @Test + public void isStartedReturnsTrueIfExecuted() { + beforeCompletion.doOp(txState); + beforeCompletion.execute(cancelCriterion); + + assertThat(beforeCompletion.isStarted()).isTrue(); + } + + @Test + public void executeThrowsIfCancelCriterionThrows() { + doThrow(new RuntimeException()).when(cancelCriterion).checkCancelInProgress(null); + + assertThatThrownBy(() -> beforeCompletion.execute(cancelCriterion)) + .isInstanceOf(RuntimeException.class); + } + + @Test + public void executeWaitsUntilDoOpFinish() throws Exception { + Thread thread = new Thread(() -> beforeCompletion.execute(cancelCriterion)); + thread.start(); + // give the thread a chance to get past the "finished" check by waiting until + // checkCancelInProgress is called + Awaitility.await().atMost(60, TimeUnit.SECONDS) + .untilAsserted(() -> verify(cancelCriterion, times(1)).checkCancelInProgress(null)); + + beforeCompletion.doOp(txState); + + Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> !(thread.isAlive())); + } + +} diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/SingleThreadJTAExecutorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/SingleThreadJTAExecutorTest.java index 79f4324..1cf70a3 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/SingleThreadJTAExecutorTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/SingleThreadJTAExecutorTest.java @@ -14,10 +14,9 @@ */ package org.apache.geode.internal.cache; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.assertj.core.api.Java6Assertions.assertThat; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; -import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static 
org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -25,85 +24,38 @@ import static org.mockito.Mockito.verify; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import javax.transaction.Status; - import org.junit.Before; import org.junit.Test; +import org.mockito.InOrder; -import org.apache.geode.cache.SynchronizationCommitConflictException; -import org.apache.geode.cache.TransactionException; +import org.apache.geode.CancelCriterion; public class SingleThreadJTAExecutorTest { - private TXState txState; private SingleThreadJTAExecutor singleThreadJTAExecutor; + private TXState txState; private ExecutorService executor; + private BeforeCompletion beforeCompletion; + private AfterCompletion afterCompletion; + private CancelCriterion cancelCriterion; @Before public void setup() { txState = mock(TXState.class, RETURNS_DEEP_STUBS); executor = Executors.newSingleThreadExecutor(); + beforeCompletion = mock(BeforeCompletion.class); + afterCompletion = mock(AfterCompletion.class); + cancelCriterion = mock(CancelCriterion.class); + singleThreadJTAExecutor = new SingleThreadJTAExecutor(beforeCompletion, afterCompletion); } @Test - public void executeBeforeCompletionCallsDoBeforeCompletion() { - singleThreadJTAExecutor = new SingleThreadJTAExecutor(); - - singleThreadJTAExecutor.executeBeforeCompletion(txState, executor); - - verify(txState, times(1)).doBeforeCompletion(); - - assertThat(singleThreadJTAExecutor.isBeforeCompletionFinished()).isTrue(); - } - - @Test(expected = SynchronizationCommitConflictException.class) - public void executeBeforeCompletionThrowsExceptionIfBeforeCompletionFailed() { - singleThreadJTAExecutor = new SingleThreadJTAExecutor(); - doThrow(new SynchronizationCommitConflictException("")).when(txState).doBeforeCompletion(); - - singleThreadJTAExecutor.executeBeforeCompletion(txState, executor); - - verify(txState, times(1)).doBeforeCompletion(); - 
assertThat(singleThreadJTAExecutor.isBeforeCompletionFinished()).isTrue(); - } - - @Test - public void executeAfterCompletionCallsDoAfterCompletion() { - singleThreadJTAExecutor = new SingleThreadJTAExecutor(); - int status = Status.STATUS_COMMITTED; - - singleThreadJTAExecutor.executeBeforeCompletion(txState, executor); - singleThreadJTAExecutor.executeAfterCompletion(txState, status); - - verify(txState, times(1)).doBeforeCompletion(); - verify(txState, times(1)).doAfterCompletion(status); - assertThat(singleThreadJTAExecutor.isBeforeCompletionFinished()).isTrue(); - } - - @Test - public void executeAfterCompletionThrowsExceptionIfAfterCompletionFailed() { - singleThreadJTAExecutor = new SingleThreadJTAExecutor(); - int status = Status.STATUS_COMMITTED; - TransactionException exception = new TransactionException(""); - doThrow(exception).when(txState).doAfterCompletion(status); - - singleThreadJTAExecutor.executeBeforeCompletion(txState, executor); - - assertThatThrownBy(() -> singleThreadJTAExecutor.executeAfterCompletion(txState, status)) - .isSameAs(exception); - verify(txState, times(1)).doBeforeCompletion(); - verify(txState, times(1)).doAfterCompletion(status); - } - - @Test - public void executorThreadNoLongerWaitForAfterCompletionIfTXStateIsCleanedUp() { - singleThreadJTAExecutor = new SingleThreadJTAExecutor(); - - singleThreadJTAExecutor.executeBeforeCompletion(txState, executor); - singleThreadJTAExecutor.cleanup(txState); + public void executeBeforeCompletionCallsDoOps() { + singleThreadJTAExecutor.executeBeforeCompletion(txState, executor, cancelCriterion); - verify(txState, times(1)).doBeforeCompletion(); - assertThat(singleThreadJTAExecutor.isBeforeCompletionFinished()).isTrue(); - assertThat(singleThreadJTAExecutor.isAfterCompletionFinished()).isTrue(); + InOrder inOrder = inOrder(beforeCompletion, afterCompletion); + inOrder.verify(beforeCompletion, times(1)).doOp(eq(txState)); + inOrder.verify(afterCompletion, times(1)).doOp(eq(txState), 
eq(cancelCriterion)); + verify(beforeCompletion, times(1)).execute(eq(cancelCriterion)); } }