This is an automated email from the ASF dual-hosted git repository. av pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 853b3da0529 IGNITE-20123 IgniteTxHandler initial cleanup (#10869) 853b3da0529 is described below commit 853b3da05299255269c6d6e6aaf7840594ed8c47 Author: Anton Vinogradov <a...@apache.org> AuthorDate: Fri Aug 4 12:38:25 2023 +0300 IGNITE-20123 IgniteTxHandler initial cleanup (#10869) --- .../cache/transactions/IgniteTxHandler.java | 299 ++++++++------------- 1 file changed, 110 insertions(+), 189 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index c503957b838..b3d93189a9f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -91,9 +91,6 @@ import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; -import org.apache.ignite.internal.util.typedef.C1; -import org.apache.ignite.internal.util.typedef.CI1; -import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -136,7 +133,7 @@ import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK; */ public class IgniteTxHandler { /** Logger. */ - private IgniteLogger log; + private final IgniteLogger log; /** */ private final IgniteLogger txPrepareMsgLog; @@ -148,7 +145,7 @@ public class IgniteTxHandler { private final IgniteLogger txRecoveryMsgLog; /** Shared cache context. */ - private GridCacheSharedContext<?, ?> ctx; + private final GridCacheSharedContext<?, ?> ctx; /** * @param nearNodeId Sender node ID. @@ -182,16 +179,13 @@ public class IgniteTxHandler { * @param nearNode Sender node. * @param req Request. */ - private IgniteInternalFuture<GridNearTxPrepareResponse> processNearTxPrepareRequest0( - ClusterNode nearNode, - GridNearTxPrepareRequest req - ) { + private void processNearTxPrepareRequest0(ClusterNode nearNode, GridNearTxPrepareRequest req) { IgniteInternalFuture<GridNearTxPrepareResponse> fut; if (req.firstClientRequest() && req.allowWaitTopologyFuture()) { for (;;) { if (waitForExchangeFuture(nearNode, req)) - return new GridFinishedFuture<>(); + return; fut = prepareNearTx(nearNode, req); @@ -204,8 +198,6 @@ public class IgniteTxHandler { assert req.txState() != null || fut == null || fut.error() != null || (ctx.tm().tx(req.version()) == null && ctx.tm().nearTx(req.version()) == null); - - return fut; } /** @@ -220,68 +212,38 @@ public class IgniteTxHandler { txPrepareMsgLog = ctx.logger(CU.TX_MSG_PREPARE_LOG_CATEGORY); txFinishMsgLog = ctx.logger(CU.TX_MSG_FINISH_LOG_CATEGORY); - ctx.io().addCacheHandler(GridNearTxPrepareRequest.class, new CI2<UUID, GridCacheMessage>() { - @Override public void apply(UUID nodeId, GridCacheMessage msg) { - processNearTxPrepareRequest(nodeId, (GridNearTxPrepareRequest)msg); - } - }); + ctx.io().addCacheHandler(GridNearTxPrepareRequest.class, (UUID nodeId, GridCacheMessage msg) -> + processNearTxPrepareRequest(nodeId, (GridNearTxPrepareRequest)msg)); - ctx.io().addCacheHandler(GridNearTxPrepareResponse.class, new CI2<UUID, GridCacheMessage>() { - @Override public void apply(UUID nodeId, GridCacheMessage msg) { - processNearTxPrepareResponse(nodeId, (GridNearTxPrepareResponse)msg); - } - }); + ctx.io().addCacheHandler(GridNearTxPrepareResponse.class, (UUID nodeId, GridCacheMessage msg) -> + processNearTxPrepareResponse(nodeId, (GridNearTxPrepareResponse)msg)); - ctx.io().addCacheHandler(GridNearTxFinishRequest.class, new CI2<UUID, GridCacheMessage>() { - @Override public void apply(UUID nodeId, GridCacheMessage msg) { - processNearTxFinishRequest(nodeId, (GridNearTxFinishRequest)msg); - } - }); + ctx.io().addCacheHandler(GridNearTxFinishRequest.class, (UUID nodeId, GridCacheMessage msg) -> + processNearTxFinishRequest(nodeId, (GridNearTxFinishRequest)msg)); - ctx.io().addCacheHandler(GridNearTxFinishResponse.class, new CI2<UUID, GridCacheMessage>() { - @Override public void apply(UUID nodeId, GridCacheMessage msg) { - processNearTxFinishResponse(nodeId, (GridNearTxFinishResponse)msg); - } - }); + ctx.io().addCacheHandler(GridNearTxFinishResponse.class, (UUID nodeId, GridCacheMessage msg) -> + processNearTxFinishResponse(nodeId, (GridNearTxFinishResponse)msg)); - ctx.io().addCacheHandler(GridDhtTxPrepareRequest.class, new CI2<UUID, GridCacheMessage>() { - @Override public void apply(UUID nodeId, GridCacheMessage msg) { - processDhtTxPrepareRequest(nodeId, (GridDhtTxPrepareRequest)msg); - } - }); + ctx.io().addCacheHandler(GridDhtTxPrepareRequest.class, (UUID nodeId, GridCacheMessage msg) -> + processDhtTxPrepareRequest(nodeId, (GridDhtTxPrepareRequest)msg)); - ctx.io().addCacheHandler(GridDhtTxPrepareResponse.class, new CI2<UUID, GridCacheMessage>() { - @Override public void apply(UUID nodeId, GridCacheMessage msg) { - processDhtTxPrepareResponse(nodeId, (GridDhtTxPrepareResponse)msg); - } - }); + ctx.io().addCacheHandler(GridDhtTxPrepareResponse.class, (UUID nodeId, GridCacheMessage msg) -> + processDhtTxPrepareResponse(nodeId, (GridDhtTxPrepareResponse)msg)); - ctx.io().addCacheHandler(GridDhtTxFinishRequest.class, new CI2<UUID, GridCacheMessage>() { - @Override public void apply(UUID nodeId, GridCacheMessage msg) { - processDhtTxFinishRequest(nodeId, (GridDhtTxFinishRequest)msg); - } - }); + ctx.io().addCacheHandler(GridDhtTxFinishRequest.class, (UUID nodeId, GridCacheMessage msg) -> + processDhtTxFinishRequest(nodeId, (GridDhtTxFinishRequest)msg)); - ctx.io().addCacheHandler(GridDhtTxOnePhaseCommitAckRequest.class, new CI2<UUID, GridCacheMessage>() { - @Override public void apply(UUID nodeId, GridCacheMessage msg) { - processDhtTxOnePhaseCommitAckRequest(nodeId, (GridDhtTxOnePhaseCommitAckRequest)msg); - } - }); + ctx.io().addCacheHandler(GridDhtTxOnePhaseCommitAckRequest.class, (UUID nodeId, GridCacheMessage msg) -> + processDhtTxOnePhaseCommitAckRequest(nodeId, (GridDhtTxOnePhaseCommitAckRequest)msg)); - ctx.io().addCacheHandler(GridDhtTxFinishResponse.class, new CI2<UUID, GridCacheMessage>() { - @Override public void apply(UUID nodeId, GridCacheMessage msg) { - processDhtTxFinishResponse(nodeId, (GridDhtTxFinishResponse)msg); - } - }); + ctx.io().addCacheHandler(GridDhtTxFinishResponse.class, (UUID nodeId, GridCacheMessage msg) -> + processDhtTxFinishResponse(nodeId, (GridDhtTxFinishResponse)msg)); - ctx.io().addCacheHandler(GridCacheTxRecoveryRequest.class, - (CI2<UUID, GridCacheTxRecoveryRequest>)this::processCheckPreparedTxRequest); + ctx.io().addCacheHandler(GridCacheTxRecoveryRequest.class, this::processCheckPreparedTxRequest); - ctx.io().addCacheHandler(GridCacheTxRecoveryResponse.class, - (CI2<UUID, GridCacheTxRecoveryResponse>)this::processCheckPreparedTxResponse); + ctx.io().addCacheHandler(GridCacheTxRecoveryResponse.class, this::processCheckPreparedTxResponse); - ctx.io().addCacheHandler(IncrementalSnapshotAwareMessage.class, - (CI2<UUID, IncrementalSnapshotAwareMessage>)this::processIncrementalSnapshotAwareMessage); + ctx.io().addCacheHandler(IncrementalSnapshotAwareMessage.class, this::processIncrementalSnapshotAwareMessage); } /** */ @@ -373,31 +335,29 @@ public class IgniteTxHandler { IgniteInternalFuture<GridNearTxPrepareResponse> fut = locTx.prepareAsyncLocal(req); - return fut.chain(new C1<IgniteInternalFuture<GridNearTxPrepareResponse>, GridNearTxPrepareResponse>() { - @Override public GridNearTxPrepareResponse apply(IgniteInternalFuture<GridNearTxPrepareResponse> f) { - try { - return f.get(); - } - catch (Exception e) { - locTx.setRollbackOnly(); // Just in case. + return fut.chain((IgniteInternalFuture<GridNearTxPrepareResponse> f) -> { + try { + return f.get(); + } + catch (Exception e) { + locTx.setRollbackOnly(); // Just in case. - if (!X.hasCause(e, IgniteTxOptimisticCheckedException.class) && - !X.hasCause(e, IgniteFutureCancelledException.class)) - U.error(log, "Failed to prepare DHT transaction: " + locTx, e); - - return new GridNearTxPrepareResponse( - req.partition(), - req.version(), - req.futureId(), - req.miniId(), - req.version(), - req.version(), - null, - e, - null, - req.onePhaseCommit(), - req.deployInfo() != null); - } + if (!X.hasCause(e, IgniteTxOptimisticCheckedException.class) && + !X.hasCause(e, IgniteFutureCancelledException.class)) + U.error(log, "Failed to prepare DHT transaction: " + locTx, e); + + return new GridNearTxPrepareResponse( + req.partition(), + req.version(), + req.futureId(), + req.miniId(), + req.version(), + req.version(), + null, + e, + null, + req.onePhaseCommit(), + req.deployInfo() != null); } }); } @@ -658,18 +618,16 @@ public class IgniteTxHandler { final GridDhtTxLocal tx0 = tx; - fut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> txFut) { - try { - txFut.get(); - } - catch (IgniteCheckedException e) { - tx0.setRollbackOnly(); // Just in case. + fut.listen((IgniteInternalFuture<?> txFut) -> { + try { + txFut.get(); + } + catch (IgniteCheckedException e) { + tx0.setRollbackOnly(); // Just in case. - if (!X.hasCause(e, IgniteTxOptimisticCheckedException.class) && - !X.hasCause(e, IgniteFutureCancelledException.class) && !ctx.kernalContext().isStopping()) - U.error(log, "Failed to prepare DHT transaction: " + tx0, e); - } + if (!X.hasCause(e, IgniteTxOptimisticCheckedException.class) && + !X.hasCause(e, IgniteFutureCancelledException.class) && !ctx.kernalContext().isStopping()) + U.error(log, "Failed to prepare DHT transaction: " + tx0, e); } }); @@ -806,7 +764,7 @@ public class IgniteTxHandler { nodeId + ']'); GridNearTxPrepareFutureAdapter fut = (GridNearTxPrepareFutureAdapter)ctx.mvcc() - .<IgniteInternalTx>versionedFuture(res.version(), res.futureId()); + .versionedFuture(res.version(), res.futureId()); if (fut == null) { U.warn(log, "Failed to find future for near prepare response [txId=" + res.version() + @@ -836,7 +794,7 @@ public class IgniteTxHandler { if (txFinishMsgLog.isDebugEnabled()) txFinishMsgLog.debug("Received near finish response [txId=" + res.xid() + ", node=" + nodeId + ']'); - GridNearTxFinishFuture fut = (GridNearTxFinishFuture)ctx.mvcc().<IgniteInternalTx>future(res.futureId()); + GridNearTxFinishFuture fut = (GridNearTxFinishFuture)ctx.mvcc().future(res.futureId()); if (fut == null) { if (txFinishMsgLog.isDebugEnabled()) { @@ -897,7 +855,7 @@ public class IgniteTxHandler { assert res != null; if (res.checkCommitted()) { - GridNearTxFinishFuture fut = (GridNearTxFinishFuture)ctx.mvcc().<IgniteInternalTx>future(res.futureId()); + GridNearTxFinishFuture fut = (GridNearTxFinishFuture)ctx.mvcc().future(res.futureId()); if (fut == null) { if (txFinishMsgLog.isDebugEnabled()) { @@ -918,7 +876,7 @@ public class IgniteTxHandler { fut.onResult(nodeId, res); } else { - GridDhtTxFinishFuture fut = (GridDhtTxFinishFuture)ctx.mvcc().<IgniteInternalTx>future(res.futureId()); + GridDhtTxFinishFuture fut = (GridDhtTxFinishFuture)ctx.mvcc().future(res.futureId()); if (fut == null) { if (txFinishMsgLog.isDebugEnabled()) { @@ -944,12 +902,8 @@ public class IgniteTxHandler { /** * @param nodeId Node ID. * @param req Request. - * @return Future. */ - @Nullable private IgniteInternalFuture<IgniteInternalTx> processNearTxFinishRequest( - UUID nodeId, - GridNearTxFinishRequest req - ) { + private void processNearTxFinishRequest(UUID nodeId, GridNearTxFinishRequest req) { try (TraceSurroundings ignored = MTC.support(ctx.kernalContext().tracing().create(TX_NEAR_FINISH_REQ, MTC.span()))) { if (txFinishMsgLog.isDebugEnabled()) @@ -961,8 +915,6 @@ public class IgniteTxHandler { assert req.txState() != null || fut == null || fut.error() != null || (ctx.tm().tx(req.version()) == null && ctx.tm().nearTx(req.version()) == null) : "[req=" + req + ", fut=" + fut + "]"; - - return fut; } } @@ -988,7 +940,7 @@ public class IgniteTxHandler { // Transaction on local cache only. if (locTx != null && !locTx.nearLocallyMapped() && !locTx.colocatedLocallyMapped()) - return new GridFinishedFuture<IgniteInternalTx>(locTx); + return new GridFinishedFuture<>(locTx); if (log.isDebugEnabled()) log.debug("Processing near tx finish request [nodeId=" + nodeId + ", req=" + req + "]"); @@ -1325,37 +1277,15 @@ public class IgniteTxHandler { } if (req.onePhaseCommit()) { - IgniteInternalFuture completeFut; - - IgniteInternalFuture<IgniteInternalTx> dhtFin = dhtTx == null ? - null : dhtTx.done() ? null : dhtTx.finishFuture(); - - final IgniteInternalFuture<IgniteInternalTx> nearFin = nearTx == null ? - null : nearTx.done() ? null : nearTx.finishFuture(); - - if (dhtFin != null && nearFin != null) { - GridCompoundFuture fut = new GridCompoundFuture(); - - fut.add(dhtFin); - fut.add(nearFin); - - fut.markInitialized(); - - completeFut = fut; - } - else - completeFut = dhtFin != null ? dhtFin : nearFin; + IgniteInternalFuture<IgniteInternalTx> completeFut = completeFuture(dhtTx, nearTx); if (completeFut != null) { final GridDhtTxPrepareResponse res0 = res; final GridDhtTxRemote dhtTx0 = dhtTx; final GridNearTxRemote nearTx0 = nearTx; - completeFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() { - @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) { - sendReply(nodeId, req, res0, dhtTx0, nearTx0); - } - }); + completeFut.listen((IgniteInternalFuture<IgniteInternalTx> fut) -> + sendReply(nodeId, req, res0, dhtTx0, nearTx0)); } else sendReply(nodeId, req, res, dhtTx, nearTx); @@ -1368,6 +1298,31 @@ public class IgniteTxHandler { } } + /** + * @param dhtTx Dht tx. + * @param nearTx Near tx. + */ + private IgniteInternalFuture<IgniteInternalTx> completeFuture(GridDhtTxRemote dhtTx, GridNearTxRemote nearTx) { + IgniteInternalFuture<IgniteInternalTx> dhtFin = + dhtTx == null ? null : dhtTx.done() ? null : dhtTx.finishFuture(); + + final IgniteInternalFuture<IgniteInternalTx> nearFin = + nearTx == null ? null : nearTx.done() ? null : nearTx.finishFuture(); + + if (dhtFin != null && nearFin != null) { + GridCompoundFuture<IgniteInternalTx, IgniteInternalTx> fut = new GridCompoundFuture<>(); + + fut.add(dhtFin); + fut.add(nearFin); + + fut.markInitialized(); + + return fut; + } + else + return dhtFin != null ? dhtFin : nearFin; + } + /** * @param nodeId Node ID. * @param req Request. @@ -1391,7 +1346,6 @@ public class IgniteTxHandler { * @param nodeId Node ID. * @param req Request. */ - @SuppressWarnings({"unchecked"}) private void processDhtTxFinishRequest(final UUID nodeId, final GridDhtTxFinishRequest req) { try (TraceSurroundings ignored = MTC.support(ctx.kernalContext().tracing().create(TX_PROCESS_DHT_FINISH_REQ, MTC.span()))) { @@ -1406,11 +1360,7 @@ public class IgniteTxHandler { else { IgniteInternalFuture<?> fut = ctx.tm().remoteTxFinishFuture(req.version()); - fut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> fut) { - sendReply(nodeId, req, true, null); - } - }); + fut.listen((IgniteInternalFuture<?> f) -> sendReply(nodeId, req, true, null)); } return; @@ -1435,7 +1385,7 @@ public class IgniteTxHandler { ctx.tm().addCommittedTx(null, req.version(), null); if (dhtTx != null) - finish(nodeId, dhtTx, req); + finish(dhtTx, req); else { try { applyPartitionsUpdatesCounters(req.updateCounters(), !req.commit(), false); @@ -1446,36 +1396,14 @@ public class IgniteTxHandler { } if (nearTx != null) - finish(nodeId, nearTx, req); + finish(nearTx, req); if (req.replyRequired()) { - IgniteInternalFuture completeFut; - - IgniteInternalFuture<IgniteInternalTx> dhtFin = dhtTx == null ? - null : dhtTx.done() ? null : dhtTx.finishFuture(); - - final IgniteInternalFuture<IgniteInternalTx> nearFin = nearTx == null ? - null : nearTx.done() ? null : nearTx.finishFuture(); - - if (dhtFin != null && nearFin != null) { - GridCompoundFuture fut = new GridCompoundFuture(); - - fut.add(dhtFin); - fut.add(nearFin); - - fut.markInitialized(); - - completeFut = fut; - } - else - completeFut = dhtFin != null ? dhtFin : nearFin; + IgniteInternalFuture<IgniteInternalTx> completeFut = completeFuture(dhtTx, nearTx); if (completeFut != null) { - completeFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() { - @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) { - sendReply(nodeId, req, true, nearTxId); - } - }); + completeFut.listen((IgniteInternalFuture<IgniteInternalTx> fut) -> + sendReply(nodeId, req, true, nearTxId)); } else sendReply(nodeId, req, true, nearTxId); @@ -1488,15 +1416,10 @@ public class IgniteTxHandler { } /** - * @param nodeId Node ID. * @param tx Transaction. * @param req Request. */ - protected void finish( - UUID nodeId, - IgniteTxRemoteEx tx, - GridDhtTxFinishRequest req - ) { + protected void finish(IgniteTxRemoteEx tx, GridDhtTxFinishRequest req) { assert tx != null; req.txState(tx.txState()); @@ -2209,7 +2132,7 @@ public class IgniteTxHandler { boolean prepared; try { - prepared = fut == null ? true : fut.get(); + prepared = fut == null || fut.get(); } catch (IgniteCheckedException e) { U.error(log, "Check prepared transaction future failed [req=" + req + ']', e); @@ -2220,21 +2143,19 @@ public class IgniteTxHandler { sendCheckPreparedResponse(nodeId, req, prepared); } else { - fut.listen(new CI1<IgniteInternalFuture<Boolean>>() { - @Override public void apply(IgniteInternalFuture<Boolean> fut) { - boolean prepared; - - try { - prepared = fut.get(); - } - catch (IgniteCheckedException e) { - U.error(log, "Check prepared transaction future failed [req=" + req + ']', e); + fut.listen((IgniteInternalFuture<Boolean> f) -> { + boolean prepared; - prepared = false; - } + try { + prepared = fut.get(); + } + catch (IgniteCheckedException e) { + U.error(log, "Check prepared transaction future failed [req=" + req + ']', e); - sendCheckPreparedResponse(nodeId, req, prepared); + prepared = false; } + + sendCheckPreparedResponse(nodeId, req, prepared); }); } } @@ -2425,14 +2346,14 @@ public class IgniteTxHandler { AffinityTopologyVersion top = tx.topologyVersionSnapshot(); for (PartitionUpdateCountersMessage partCntrs : updCntrs) { - GridDhtPartitionTopology topology = ctx.cacheContext(partCntrs.cacheId()).topology(); + GridDhtPartitionTopology partTop = ctx.cacheContext(partCntrs.cacheId()).topology(); PartitionUpdateCountersMessage resCntrs = new PartitionUpdateCountersMessage(partCntrs.cacheId(), partCntrs.size()); for (int i = 0; i < partCntrs.size(); i++) { int part = partCntrs.partition(i); - if (topology.nodes(part, top).indexOf(node) > 0) + if (partTop.nodes(part, top).indexOf(node) > 0) resCntrs.add(part, partCntrs.initialCounter(i), partCntrs.updatesCount(i)); }