ignite-1.5 Fix for transaction retry logic in DataStructuresProcessor.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/847bd424 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/847bd424 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/847bd424 Branch: refs/heads/ignite-1537 Commit: 847bd424e098875c00bf5d3f8d42cf40b6e2dd52 Parents: a88d81a Author: sboikov <[email protected]> Authored: Fri Dec 11 10:53:57 2015 +0300 Committer: sboikov <[email protected]> Committed: Fri Dec 11 10:53:57 2015 +0300 ---------------------------------------------------------------------- .../datastructures/DataStructuresProcessor.java | 161 +++++++++---------- 1 file changed, 79 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/847bd424/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index 51c4067..cd783e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -57,15 +57,16 @@ import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.processors.cache.CacheType; +import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheInternal; import org.apache.ignite.internal.processors.cache.GridCacheUtils; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; -import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.util.lang.IgniteClosureX; import org.apache.ignite.internal.util.lang.IgniteInClosureX; import org.apache.ignite.internal.util.lang.IgniteOutClosureX; @@ -504,8 +505,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException If failed. */ @Nullable private <T> T getAtomic(final IgniteOutClosureX<T> c, - DataStructureInfo dsInfo, - boolean create, + final DataStructureInfo dsInfo, + final boolean create, Class<? extends T> cls) throws IgniteCheckedException { @@ -527,39 +528,26 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { if (dataStructure != null) return dataStructure; - while (true) { - try { + return retryTopologySafe(new IgniteOutClosureX<T>() { + @Override public T applyx() throws IgniteCheckedException { if (!create) return c.applyx(); try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { - err = utilityCache.invoke(DATA_STRUCTURES_KEY, new AddAtomicProcessor(dsInfo)).get(); + IgniteCheckedException err = + utilityCache.invoke(DATA_STRUCTURES_KEY, new AddAtomicProcessor(dsInfo)).get(); if (err != null) throw err; - dataStructure = c.applyx(); + T dataStructure = c.applyx(); tx.commit(); return dataStructure; } } - catch (IgniteTxRollbackCheckedException ignore) { - // Safe to retry right away. - } - catch (IgniteCheckedException e) { - ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class); - - if (topErr == null) - throw e; - - IgniteInternalFuture<?> fut = topErr.retryReadyFuture(); - - if (fut != null) - fut.get(); - } - } + }); } /** @@ -597,10 +585,10 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { * @param afterRmv Optional closure to run after data structure removed. * @throws IgniteCheckedException If failed. */ - private <T> void removeDataStructure(IgniteOutClosureX<T> c, + private <T> void removeDataStructure(final IgniteOutClosureX<T> c, String name, DataStructureType type, - @Nullable IgniteInClosureX<T> afterRmv) + @Nullable final IgniteInClosureX<T> afterRmv) throws IgniteCheckedException { Map<String, DataStructureInfo> dsMap = utilityCache.get(DATA_STRUCTURES_KEY); @@ -608,52 +596,42 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { if (dsMap == null || !dsMap.containsKey(name)) return; - DataStructureInfo dsInfo = new DataStructureInfo(name, type, null); + final DataStructureInfo dsInfo = new DataStructureInfo(name, type, null); IgniteCheckedException err = validateDataStructure(dsMap, dsInfo, false); if (err != null) throw err; - while (true) { - try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { - T2<Boolean, IgniteCheckedException> res = - utilityCache.invoke(DATA_STRUCTURES_KEY, new RemoveDataStructureProcessor(dsInfo)).get(); - - err = res.get2(); + retryTopologySafe(new IgniteOutClosureX<Void>() { + @Override public Void applyx() throws IgniteCheckedException { + try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { + T2<Boolean, IgniteCheckedException> res = + utilityCache.invoke(DATA_STRUCTURES_KEY, new RemoveDataStructureProcessor(dsInfo)).get(); - if (err != null) - throw err; + IgniteCheckedException err = res.get2(); - assert res.get1() != null; + if (err != null) + throw err; - boolean exists = res.get1(); + assert res.get1() != null; - if (!exists) - return; + boolean exists = res.get1(); - T rmvInfo = c.applyx(); + if (!exists) + return null; - tx.commit(); + T rmvInfo = c.applyx(); - if (afterRmv != null && rmvInfo != null) - afterRmv.applyx(rmvInfo); - } - catch (IgniteTxRollbackCheckedException ignore) { - // Safe to retry right away. - } - catch (IgniteCheckedException e) { - ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class); - - if (topErr == null) - throw e; + tx.commit(); - IgniteInternalFuture<?> fut = topErr.retryReadyFuture(); + if (afterRmv != null && rmvInfo != null) + afterRmv.applyx(rmvInfo); - if (fut != null) - fut.get(); + return null; + } } - } + }); } /** @@ -1000,7 +978,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException If failed. */ @Nullable private <T> T getCollection(final IgniteClosureX<GridCacheContext, T> c, - DataStructureInfo dsInfo, + final DataStructureInfo dsInfo, boolean create) throws IgniteCheckedException { @@ -1028,41 +1006,29 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { return c.applyx(cacheCtx); } - while (true) { - try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { - T2<String, IgniteCheckedException> res = - utilityCache.invoke(DATA_STRUCTURES_KEY, new AddCollectionProcessor(dsInfo)).get(); - - err = res.get2(); - - if (err != null) - throw err; - - String cacheName = res.get1(); + return retryTopologySafe(new IgniteOutClosureX<T>() { + @Override public T applyx() throws IgniteCheckedException { + try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { + T2<String, IgniteCheckedException> res = + utilityCache.invoke(DATA_STRUCTURES_KEY, new AddCollectionProcessor(dsInfo)).get(); - final GridCacheContext cacheCtx = ctx.cache().internalCache(cacheName).context(); + IgniteCheckedException err = res.get2(); - T col = c.applyx(cacheCtx); + if (err != null) + throw err; - tx.commit(); + String cacheName = res.get1(); - return col; - } - catch (IgniteTxRollbackCheckedException ignore) { - // Safe to retry right away. - } - catch (IgniteCheckedException e) { - ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class); + final GridCacheContext cacheCtx = ctx.cache().internalCache(cacheName).context(); - if (topErr == null) - throw e; + T col = c.applyx(cacheCtx); - IgniteInternalFuture<?> fut = topErr.retryReadyFuture(); + tx.commit(); - if (fut != null) - fut.get(); + return col; + } } - } + }); } /** @@ -1659,6 +1625,37 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { } /** + * @param c Closure to run. + * @throws IgniteCheckedException If failed. + * @return Closure return value. + */ + private static <T> T retryTopologySafe(IgniteOutClosureX<T> c) throws IgniteCheckedException { + for (int i = 0; i < GridCacheAdapter.MAX_RETRIES; i++) { + try { + return c.applyx(); + } + catch (IgniteCheckedException e) { + if (i == GridCacheAdapter.MAX_RETRIES - 1) + throw e; + + ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class); + + if (topErr == null || (topErr instanceof ClusterTopologyServerNotFoundException)) + throw e; + + IgniteInternalFuture<?> fut = topErr.retryReadyFuture(); + + if (fut != null) + fut.get(); + } + } + + assert false; + + return null; + } + + /** * */ enum DataStructureType {
