This is an automated email from the ASF dual-hosted git repository. eshu11 pushed a commit to branch feature/GEODE-7713 in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-7713 by this push: new c938be9 GEODE-7713: Throw TransactionDataRebalancedException during get operation c938be9 is described below commit c938be920e2a8ff11b93970243276ba2676efcf1 Author: Eric Shu <eshu@EricMacBookPro.local> AuthorDate: Thu Jan 16 16:56:12 2020 -0800 GEODE-7713: Throw TransactionDataRebalancedException during get operation * Throw TransactionDataRebalancedException during get operation if bucket moved to other member. --- .../geode/internal/cache/PartitionedRegion.java | 55 ++++++++-------- .../internal/cache/PartitionedRegionTest.java | 74 +++++++++++++++++++++- 2 files changed, 97 insertions(+), 32 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java index 96ae5af..2c1ec04 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java @@ -288,6 +288,8 @@ public class PartitionedRegion extends LocalRegion * a primary in a different server group */ public static final int NETWORK_HOP_TO_DIFFERENT_GROUP = 2; + public static final String DATA_MOVED_BY_REBALANCE = + "Transactional data moved, due to rebalancing."; private final DiskRegionStats diskRegionStats; @@ -4154,26 +4156,7 @@ public class PartitionedRegion extends LocalRegion } } else { // with transaction - if (prce instanceof BucketNotFoundException) { - throw new TransactionDataRebalancedException( - "Transactional data moved, due to rebalancing.", - prce); - } - Throwable cause = prce.getCause(); - if (cause instanceof PrimaryBucketException) { - throw (PrimaryBucketException) cause; - } else if (cause instanceof TransactionDataRebalancedException) { - throw (TransactionDataRebalancedException) cause; - } else if (cause instanceof RegionDestroyedException) { - throw new TransactionDataRebalancedException( - "Transactional data moved, due to rebalancing.", - cause); - } else { - // Make transaction fail so client could retry - // instead of returning null if ForceReattemptException is thrown. - // Should not see it currently, added to be protected against future changes. - throw new TransactionException("Failed to get key: " + key, prce); - } + handleForceReattemptExceptionWithTransaction(prce); } } catch (PrimaryBucketException notPrimary) { if (allowRetry) { @@ -4216,6 +4199,25 @@ public class PartitionedRegion extends LocalRegion return null; } + void handleForceReattemptExceptionWithTransaction( + ForceReattemptException forceReattemptException) { + if (forceReattemptException instanceof BucketNotFoundException) { + throw new TransactionDataRebalancedException(DATA_MOVED_BY_REBALANCE, + forceReattemptException); + } + Throwable cause = forceReattemptException.getCause(); + if (cause instanceof PrimaryBucketException) { + throw (PrimaryBucketException) cause; + } else if (cause instanceof TransactionDataRebalancedException) { + throw (TransactionDataRebalancedException) cause; + } else if (cause instanceof RegionDestroyedException) { + throw new TransactionDataRebalancedException(DATA_MOVED_BY_REBALANCE, cause); + } else { + throw new TransactionDataRebalancedException(DATA_MOVED_BY_REBALANCE, + forceReattemptException); + } + } + /** * If a bucket is local, try to fetch the value from it */ @@ -9497,9 +9499,7 @@ public class PartitionedRegion extends LocalRegion try { br.checkForPrimary(); } catch (PrimaryBucketException pbe) { - throw new TransactionDataRebalancedException( - "Transactional data moved, due to rebalancing.", - pbe); + throw new TransactionDataRebalancedException(DATA_MOVED_BY_REBALANCE, pbe); } } } catch (RegionDestroyedException ignore) { @@ -9550,13 +9550,8 @@ public class PartitionedRegion extends LocalRegion if (keyInfo.isCheckPrimary()) { br.checkForPrimary(); } - } catch (PrimaryBucketException pbe) { - throw new TransactionDataRebalancedException( - "Transactional data moved, due to rebalancing.", - pbe); - } catch (RegionDestroyedException ignore) { - throw new TransactionDataRebalancedException( - "Transactional data moved, due to rebalancing."); + } catch (PrimaryBucketException | RegionDestroyedException exception) { + throw new TransactionDataRebalancedException(DATA_MOVED_BY_REBALANCE, exception); } return br; } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java index 4bea39f..210ce16 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java @@ -479,7 +479,7 @@ public class PartitionedRegionTest { catchThrowable(() -> spyPartitionedRegion.getDataRegionForWrite(keyInfo)); assertThat(caughtException).isInstanceOf(TransactionDataRebalancedException.class) - .hasMessage("Transactional data moved, due to rebalancing."); + .hasMessage(PartitionedRegion.DATA_MOVED_BY_REBALANCE); } @Test @@ -500,7 +500,77 @@ public class PartitionedRegionTest { catchThrowable(() -> spyPartitionedRegion.getDataRegionForWrite(keyInfo)); assertThat(caughtException).isInstanceOf(TransactionDataRebalancedException.class) - .hasMessage("Transactional data moved, due to rebalancing."); + .hasMessage(PartitionedRegion.DATA_MOVED_BY_REBALANCE); + } + + @Test + public void transactionThrowsTransactionDataRebalancedExceptionIfBucketNotFoundException() { + PartitionedRegion spyPartitionedRegion = spy(partitionedRegion); + ForceReattemptException exception = mock(BucketNotFoundException.class); + + Throwable caughtException = + catchThrowable( + () -> spyPartitionedRegion.handleForceReattemptExceptionWithTransaction(exception)); + + assertThat(caughtException).isInstanceOf(TransactionDataRebalancedException.class) + .hasMessage(PartitionedRegion.DATA_MOVED_BY_REBALANCE); + } + + @Test + public void transactionThrowsPrimaryBucketExceptionIfForceReattemptExceptionIsCausedByPrimaryBucketException() { + PartitionedRegion spyPartitionedRegion = spy(partitionedRegion); + ForceReattemptException exception = mock(ForceReattemptException.class); + PrimaryBucketException primaryBucketException = new PrimaryBucketException(); + when(exception.getCause()).thenReturn(primaryBucketException); + + Throwable caughtException = + catchThrowable( + () -> spyPartitionedRegion.handleForceReattemptExceptionWithTransaction(exception)); + + assertThat(caughtException).isSameAs(primaryBucketException); + } + + @Test + public void transactionThrowsTransactionDataRebalancedExceptionIfForceReattemptExceptionIsCausedByTransactionDataRebalancedException() { + PartitionedRegion spyPartitionedRegion = spy(partitionedRegion); + ForceReattemptException exception = mock(ForceReattemptException.class); + TransactionDataRebalancedException transactionDataRebalancedException = + new TransactionDataRebalancedException(""); + when(exception.getCause()).thenReturn(transactionDataRebalancedException); + + Throwable caughtException = + catchThrowable( + () -> spyPartitionedRegion.handleForceReattemptExceptionWithTransaction(exception)); + + assertThat(caughtException).isSameAs(transactionDataRebalancedException); + } + + @Test + public void transactionThrowsTransactionDataRebalancedExceptionIfForceReattemptExceptionIsCausedByRegionDestroyedException() { + PartitionedRegion spyPartitionedRegion = spy(partitionedRegion); + ForceReattemptException exception = mock(ForceReattemptException.class); + RegionDestroyedException regionDestroyedException = new RegionDestroyedException("", ""); + when(exception.getCause()).thenReturn(regionDestroyedException); + + Throwable caughtException = + catchThrowable( + () -> spyPartitionedRegion.handleForceReattemptExceptionWithTransaction(exception)); + + assertThat(caughtException).isInstanceOf(TransactionDataRebalancedException.class) + .hasMessage(PartitionedRegion.DATA_MOVED_BY_REBALANCE).hasCause(regionDestroyedException); + } + + @Test + public void transactionThrowsTransactionDataRebalancedExceptionIfIsAForceReattemptException() { + PartitionedRegion spyPartitionedRegion = spy(partitionedRegion); + ForceReattemptException exception = mock(ForceReattemptException.class); + + Throwable caughtException = + catchThrowable( + () -> spyPartitionedRegion.handleForceReattemptExceptionWithTransaction(exception)); + + assertThat(caughtException).isInstanceOf(TransactionDataRebalancedException.class) + .hasMessage(PartitionedRegion.DATA_MOVED_BY_REBALANCE).hasCause(exception); } private static <K> Set<K> asSet(K... values) {