Merge branch ignite-1.5 into ignite-1282
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3d4ce809 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3d4ce809 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3d4ce809 Branch: refs/heads/ignite-1282 Commit: 3d4ce809fc96d93936a69a6076e7141da41d739c Parents: c505f48 900788b Author: Alexey Goncharuk <alexey.goncha...@gmail.com> Authored: Fri Nov 20 14:03:43 2015 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Fri Nov 20 14:03:43 2015 +0300 ---------------------------------------------------------------------- modules/camel/README.txt | 34 + modules/camel/licenses/apache-2.0.txt | 202 ++ modules/camel/pom.xml | 102 + .../ignite/stream/camel/CamelStreamer.java | 237 ++ .../stream/camel/IgniteCamelStreamerTest.java | 420 ++++ .../camel/IgniteCamelStreamerTestSuite.java | 48 + .../src/test/resources/camel.test.properties | 18 + .../ignite/codegen/MessageCodeGenerator.java | 1 + .../java/org/apache/ignite/IgniteCache.java | 3 +- .../java/org/apache/ignite/IgniteCompute.java | 3 +- .../org/apache/ignite/compute/ComputeJob.java | 2 +- .../internal/GridEventConsumeHandler.java | 22 +- .../internal/GridMessageListenHandler.java | 18 + .../ignite/internal/GridUpdateNotifier.java | 2 +- .../apache/ignite/internal/IgniteKernal.java | 9 +- .../communication/GridIoMessageFactory.java | 26 +- .../discovery/GridDiscoveryManager.java | 2 +- .../processors/cache/GridCacheAdapter.java | 151 +- .../processors/cache/GridCacheAtomicFuture.java | 6 + .../cache/GridCacheDeploymentManager.java | 2 +- .../processors/cache/GridCacheEntryEx.java | 12 +- .../processors/cache/GridCacheFuture.java | 13 - .../processors/cache/GridCacheGateway.java | 1 - .../processors/cache/GridCacheIoManager.java | 50 +- .../processors/cache/GridCacheMapEntry.java | 158 +- .../processors/cache/GridCacheMessage.java | 20 +- .../processors/cache/GridCacheMvcc.java | 7 - .../processors/cache/GridCacheMvccFuture.java | 7 + .../processors/cache/GridCacheMvccManager.java | 150 +- .../GridCachePartitionExchangeManager.java | 59 +- .../cache/GridCacheSharedContext.java | 38 +- .../cache/GridCacheUpdateAtomicResult.java | 15 +- .../cache/GridCacheUpdateTxResult.java | 24 +- .../processors/cache/IgniteCacheProxy.java | 3 + .../distributed/GridCacheTxRecoveryFuture.java | 54 +- .../distributed/GridDistributedBaseMessage.java | 56 - .../distributed/GridDistributedLockRequest.java | 6 - .../GridDistributedLockResponse.java | 32 +- .../distributed/GridDistributedTxMapping.java | 78 - .../GridDistributedTxPrepareRequest.java | 67 +- .../GridDistributedTxRemoteAdapter.java | 158 +- .../dht/CacheDistributedGetFutureAdapter.java | 27 +- .../cache/distributed/dht/CacheGetFuture.java | 32 + .../dht/GridClientPartitionTopology.java | 38 +- .../distributed/dht/GridDhtCacheAdapter.java | 141 ++ .../distributed/dht/GridDhtLocalPartition.java | 35 + .../distributed/dht/GridDhtLockFuture.java | 79 +- .../distributed/dht/GridDhtLockRequest.java | 2 +- .../dht/GridDhtPartitionTopology.java | 26 +- .../dht/GridDhtPartitionTopologyImpl.java | 112 +- .../dht/GridDhtTransactionalCacheAdapter.java | 14 +- .../distributed/dht/GridDhtTxFinishFuture.java | 38 +- .../distributed/dht/GridDhtTxFinishRequest.java | 112 +- .../cache/distributed/dht/GridDhtTxLocal.java | 28 +- .../distributed/dht/GridDhtTxLocalAdapter.java | 89 +- .../cache/distributed/dht/GridDhtTxMapping.java | 134 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 136 +- .../dht/GridDhtTxPrepareRequest.java | 54 +- .../cache/distributed/dht/GridDhtTxRemote.java | 29 +- .../dht/GridPartitionedGetFuture.java | 69 +- .../dht/GridPartitionedSingleGetFuture.java | 697 ++++++ .../dht/atomic/GridDhtAtomicCache.java | 206 +- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 159 +- .../dht/atomic/GridDhtAtomicUpdateRequest.java | 121 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 5 - .../dht/colocated/GridDhtColocatedCache.java | 162 +- .../colocated/GridDhtColocatedLockFuture.java | 81 +- .../GridDhtPartitionsExchangeFuture.java | 35 +- .../preloader/GridDhtPartitionsFullMessage.java | 64 +- .../GridDhtPartitionsSingleMessage.java | 56 +- .../distributed/near/CacheVersionedValue.java | 2 +- .../distributed/near/GridNearAtomicCache.java | 10 +- .../distributed/near/GridNearCacheAdapter.java | 4 +- .../distributed/near/GridNearGetFuture.java | 49 +- .../distributed/near/GridNearGetRequest.java | 1 - .../distributed/near/GridNearGetResponse.java | 2 - .../distributed/near/GridNearLockFuture.java | 72 +- .../distributed/near/GridNearLockRequest.java | 4 +- ...arOptimisticSerializableTxPrepareFuture.java | 124 +- .../near/GridNearOptimisticTxPrepareFuture.java | 170 +- ...ridNearOptimisticTxPrepareFutureAdapter.java | 72 +- .../GridNearPessimisticTxPrepareFuture.java | 59 +- .../near/GridNearSingleGetRequest.java | 396 ++++ .../near/GridNearSingleGetResponse.java | 321 +++ .../near/GridNearTransactionalCache.java | 10 +- .../near/GridNearTxFinishFuture.java | 103 +- .../cache/distributed/near/GridNearTxLocal.java | 273 ++- .../near/GridNearTxPrepareFutureAdapter.java | 20 +- .../near/GridNearTxPrepareRequest.java | 61 +- .../distributed/near/GridNearTxRemote.java | 33 +- .../distributed/near/IgniteTxMappings.java | 75 + .../distributed/near/IgniteTxMappingsImpl.java | 92 + .../near/IgniteTxMappingsSingleImpl.java | 101 + .../processors/cache/local/GridLocalCache.java | 4 +- .../cache/local/GridLocalLockFuture.java | 5 - .../CacheContinuousQueryBatchAck.java | 163 ++ .../continuous/CacheContinuousQueryEntry.java | 196 +- .../continuous/CacheContinuousQueryHandler.java | 811 ++++++- .../CacheContinuousQueryListener.java | 35 + .../continuous/CacheContinuousQueryManager.java | 151 +- .../cache/transactions/IgniteInternalTx.java | 13 +- .../cache/transactions/IgniteTxAdapter.java | 68 +- .../cache/transactions/IgniteTxEntry.java | 29 +- .../cache/transactions/IgniteTxHandler.java | 38 +- .../IgniteTxImplicitSingleStateImpl.java | 266 +++ .../transactions/IgniteTxLocalAdapter.java | 1424 ++++++----- .../cache/transactions/IgniteTxLocalEx.java | 30 +- .../cache/transactions/IgniteTxLocalState.java | 44 + .../transactions/IgniteTxLocalStateAdapter.java | 41 + .../cache/transactions/IgniteTxManager.java | 21 +- .../cache/transactions/IgniteTxMap.java | 3 +- .../cache/transactions/IgniteTxRemoteEx.java | 18 +- .../IgniteTxRemoteSingleStateImpl.java | 108 + .../cache/transactions/IgniteTxRemoteState.java | 34 + .../IgniteTxRemoteStateAdapter.java | 115 + .../transactions/IgniteTxRemoteStateImpl.java | 124 + .../cache/transactions/IgniteTxState.java | 177 ++ .../cache/transactions/IgniteTxStateImpl.java | 414 ++++ .../clock/GridClockSyncProcessor.java | 28 +- .../continuous/GridContinuousBatch.java | 44 + .../continuous/GridContinuousBatchAdapter.java | 46 + .../continuous/GridContinuousHandler.java | 22 + .../continuous/GridContinuousProcessor.java | 221 +- .../StartRoutineAckDiscoveryMessage.java | 14 +- .../StartRoutineDiscoveryMessage.java | 21 +- .../internal/util/UUIDCollectionMessage.java | 114 + .../util/future/GridCompoundFuture.java | 15 +- .../ignite/internal/util/lang/GridFunc.java | 8 +- .../ignite/internal/util/nio/GridNioServer.java | 13 +- .../ignite/marshaller/MarshallerExclusions.java | 4 +- .../org/apache/ignite/mxbean/IgniteMXBean.java | 8 +- .../org/apache/ignite/stream/StreamAdapter.java | 18 +- .../IgniteClientReconnectCacheTest.java | 11 +- .../cache/GridCacheAbstractFullApiSelfTest.java | 75 + .../GridCacheConcurrentTxMultiNodeTest.java | 15 - .../cache/GridCachePartitionedGetSelfTest.java | 3 +- .../processors/cache/GridCacheTestEntryEx.java | 10 +- .../IgniteCacheAbstractStopBusySelfTest.java | 27 +- .../IgniteCacheP2pUnmarshallingErrorTest.java | 184 +- .../CacheGetFutureHangsSelfTest.java | 6 + .../GridCacheAbstractNodeRestartSelfTest.java | 2 + .../IgniteCacheSingleGetMessageTest.java | 357 +++ .../GridCacheReplicatedMetricsSelfTest.java | 9 - .../IgniteCacheTxStoreSessionTest.java | 2 +- ...ContinuousQueryFailoverAbstractSelfTest.java | 2235 ++++++++++++++++++ ...ryFailoverAtomicNearEnabledSelfSelfTest.java | 46 + ...FailoverAtomicPrimaryWriteOrderSelfTest.java | 44 + ...usQueryFailoverAtomicReplicatedSelfTest.java | 40 + ...inuousQueryFailoverTxReplicatedSelfTest.java | 32 + .../CacheContinuousQueryFailoverTxSelfTest.java | 39 + ...ridCacheContinuousQueryAbstractSelfTest.java | 153 +- .../GridCacheContinuousQueryTxSelfTest.java | 49 + ...CacheContinuousQueryClientReconnectTest.java | 187 ++ .../IgniteCacheContinuousQueryClientTest.java | 157 +- ...cheContinuousQueryClientTxReconnectTest.java | 32 + .../p2p/GridP2PSameClassLoaderSelfTest.java | 16 +- .../testframework/junits/GridAbstractTest.java | 2 +- .../junits/common/GridCommonAbstractTest.java | 3 + .../testsuites/IgniteCacheTestSuite3.java | 2 + .../testsuites/IgniteCacheTestSuite4.java | 3 + .../ignite/util/mbeans/GridMBeanSelfTest.java | 33 +- modules/flume/README.txt | 72 + modules/flume/licenses/apache-2.0.txt | 202 ++ modules/flume/pom.xml | 77 + .../ignite/stream/flume/EventTransformer.java | 36 + .../apache/ignite/stream/flume/IgniteSink.java | 186 ++ .../stream/flume/IgniteSinkConstants.java | 35 + .../ignite/stream/flume/IgniteSinkTest.java | 142 ++ .../stream/flume/IgniteSinkTestSuite.java | 37 + .../stream/flume/TestEventTransformer.java | 66 + .../flume/src/test/resources/example-ignite.xml | 71 + .../IgniteCacheQuerySelfTestSuite.java | 16 +- .../GridSpringResourceInjectionSelfTest.java | 143 ++ .../processors/resource/spring-resource.xml | 27 + .../testsuites/IgniteResourceSelfTestSuite.java | 2 + modules/twitter/README.txt | 32 + modules/twitter/licenses/apache-2.0.txt | 202 ++ modules/twitter/pom.xml | 122 + .../ignite/stream/twitter/OAuthSettings.java | 86 + .../ignite/stream/twitter/TwitterStreamer.java | 295 +++ .../twitter/IgniteTwitterStreamerTest.java | 234 ++ .../twitter/IgniteTwitterStreamerTestSuite.java | 32 + .../stream/twitter/TwitterStreamerImpl.java | 79 + .../config/benchmark-multicast.properties | 6 +- .../benchmark-query-put-separated.properties | 87 + .../yardstick/cache/CacheEntryEventProbe.java | 156 ++ .../cache/IgniteSqlQueryPutBenchmark.java | 31 +- .../IgniteSqlQueryPutSeparatedBenchmark.java | 84 + parent/pom.xml | 1 + pom.xml | 3 + 190 files changed, 15606 insertions(+), 2789 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java ---------------------------------------------------------------------- diff --cc modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java index b8ccc03,74c71c4..5a31415 --- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java +++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java @@@ -43,7 -43,10 +43,8 @@@ import org.apache.ignite.internal.GridD import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.IgniteCodeGeneratingFail; -import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxPrepareRequest; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest; -import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest; + import org.apache.ignite.internal.util.UUIDCollectionMessage; import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/IgniteCache.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 5ced545,8d363ad..512a801 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@@ -1205,14 -1186,15 +1215,16 @@@ public abstract class GridCacheMapEntr val != null, evtOld, evtOld != null || hasValueUnlocked(), - subjId, null, taskName); + subjId, null, taskName, + keepPortable); } - if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear())) - cctx.continuousQueries().onEntryUpdated(this, key, val, old, false); + if (cctx.isLocal() || cctx.isReplicated() || + (!isNear() && !(tx != null && tx.onePhaseCommit() && !tx.local()))) + cctx.continuousQueries().onEntryUpdated(key, val, old, isInternal() || !context().userCache(), + partition(), tx.local(), false, updateCntr0, topVer); - cctx.dataStructures().onEntryUpdated(key, false); + cctx.dataStructures().onEntryUpdated(key, false, keepPortable); } if (log.isDebugEnabled()) @@@ -1224,9 -1206,9 +1236,9 @@@ cctx.store().put(tx, keyValue(false), CU.value(val, cctx, false), newVer); if (intercept) - cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, val, val0)); + cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, val, val0, keepPortable)); - return valid ? new GridCacheUpdateTxResult(true, retval ? old : null) : + return valid ? new GridCacheUpdateTxResult(true, retval ? old : null, updateCntr0) : new GridCacheUpdateTxResult(false, null); } @@@ -1377,14 -1366,15 +1397,16 @@@ evtOld != null || hasValueUnlocked(), subjId, null, - taskName); + taskName, + keepPortable); } - if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear())) - cctx.continuousQueries().onEntryUpdated(this, key, null, old, false); + if (cctx.isLocal() || cctx.isReplicated() || + (!isNear() && !(tx != null && tx.onePhaseCommit() && !tx.local()))) + cctx.continuousQueries().onEntryUpdated(key, null, old, isInternal() + || !context().userCache(),partition(), tx.local(), false, updateCntr0, topVer); - cctx.dataStructures().onEntryUpdated(key, true); + cctx.dataStructures().onEntryUpdated(key, true, keepPortable); deferred = cctx.deferredDelete() && !detached() && !isInternal(); @@@ -1718,9 -1707,14 +1740,14 @@@ if (res) updateMetrics(op, metrics); - cctx.continuousQueries().onEntryUpdated(this, key, val, old, false); + if (!isNear()) { + long updateCntr = nextPartCounter(AffinityTopologyVersion.NONE); + + cctx.continuousQueries().onEntryUpdated(key, val, old, isInternal() || !context().userCache(), + partition(), true, false, updateCntr, AffinityTopologyVersion.NONE); + } - cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE); + cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE, keepBinary); if (intercept) { if (op == GridCacheOperation.UPDATE) @@@ -2336,10 -2377,7 +2415,7 @@@ if (res) updateMetrics(op, metrics); - if (cctx.isReplicated() || primary) - cctx.continuousQueries().onEntryUpdated(this, key, val, oldVal, false); - - cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE); + cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE, keepPortable); if (intercept) { if (op == GridCacheOperation.UPDATE) @@@ -3186,10 -3230,10 +3268,10 @@@ drReplicate(drType, val, ver); if (!skipQryNtf) { - if (cctx.isLocal() || cctx.isReplicated() || cctx.affinity().primary(cctx.localNode(), key, topVer)) - cctx.continuousQueries().onEntryUpdated(this, key, val, null, preload); + cctx.continuousQueries().onEntryUpdated(key, val, null, this.isInternal() + || !this.context().userCache(), this.partition(), true, preload, updateCntr, topVer); - cctx.dataStructures().onEntryUpdated(key, false); + cctx.dataStructures().onEntryUpdated(key, false, true); } if (cctx.store().isLocal()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index db19c67,55ca12d..2330a95 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@@ -707,11 -649,9 +651,10 @@@ public abstract class GridDhtTxLocalAda passedKeys, read, needRetVal, - skipped, accessTtl, null, - skipStore); + skipStore, + keepBinary); } catch (IgniteCheckedException e) { setRollbackOnly(); @@@ -738,14 -677,11 +680,12 @@@ final Collection<KeyCacheObject> passedKeys, final boolean read, final boolean needRetVal, - final Set<KeyCacheObject> skipped, final long accessTtl, @Nullable final CacheEntryPredicate[] filter, - boolean skipStore) { + boolean skipStore, + boolean keepBinary) { if (log.isDebugEnabled()) - log.debug("Before acquiring transaction lock on keys [passedKeys=" + passedKeys + ", skipped=" + - skipped + ']'); + log.debug("Before acquiring transaction lock on keys [keys=" + passedKeys + ']'); if (passedKeys.isEmpty()) return new GridFinishedFuture<>(ret); http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 255640f,3ee1048..cd76a56 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@@ -1765,8 -1875,7 +1881,8 @@@ public class GridDhtAtomicCache<K, V> e req.invokeArguments(), primary && writeThrough() && !req.skipStore(), !req.skipStore(), - req.returnValue(), + sndPrevVal || req.returnValue(), + req.keepBinary(), expiry, true, true, @@@ -2036,8 -2156,7 +2164,8 @@@ null, /*write-through*/false, /*read-through*/false, - /*retval*/false, + /*retval*/sndPrevVal, + req.keepBinary(), expiry, /*event*/true, /*metrics*/true, http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index 7b95042,72a60d2..a8807e1 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@@ -139,9 -144,19 +144,22 @@@ public class GridDhtAtomicUpdateReques /** Task name hash. */ private int taskNameHash; + /** Partition. */ + private GridLongList updateCntrs; + + /** On response flag. Access should be synced on future. */ + @GridDirectTransient + private boolean onRes; + + @GridDirectTransient + private List<Integer> partIds; + + @GridDirectTransient + private List<CacheObject> localPrevVals; + + /** Keep portable flag. */ + private boolean keepBinary; + /** * Empty constructor required by {@link Externalizable}. */ @@@ -191,9 -205,10 +209,11 @@@ this.taskNameHash = taskNameHash; this.invokeArgs = invokeArgs; this.addDepInfo = addDepInfo; + this.keepBinary = keepBinary; keys = new ArrayList<>(); + partIds = new ArrayList<>(); + localPrevVals = new ArrayList<>(); if (forceTransformBackups) { entryProcessors = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 648a248,706655b..49a267a --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@@ -251,8 -249,7 +251,8 @@@ public class GridNearAtomicCache<K, V> /*write-through*/false, /*read-through*/false, /*retval*/false, + keepPortable, - /**expiry policy*/null, + /*expiry policy*/null, /*event*/true, /*metrics*/true, /*primary*/false, http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index 685b998,dfaa44e..00f0a75 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@@ -562,8 -520,17 +545,10 @@@ public final class GridNearGetFuture<K add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0))); } else { - K key0 = key.value(cctx.cacheObjectContext(), true); - key0 = (K)cctx.unwrapPortableIfNeeded(key0, !deserializePortable); - - V val0; - - if (!skipVals) { - val0 = v.value(cctx.cacheObjectContext(), true); - val0 = (V)cctx.unwrapPortableIfNeeded(val0, !deserializePortable); - } - else - val0 = (V)Boolean.TRUE; + K key0 = (K)cctx.unwrapPortableIfNeeded(key, !deserializePortable); - V val0 = (V)cctx.unwrapPortableIfNeeded(v, !deserializePortable); ++ V val0 = !skipVals ? ++ (V)cctx.unwrapPortableIfNeeded(v, !deserializePortable) : ++ (V)Boolean.TRUE; add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0))); } http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 4b5b204,1c01e4e..2eb4c68 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@@ -347,9 -332,8 +332,9 @@@ public class GridNearTxLocal extends Gr boolean readThrough, boolean async, final Collection<KeyCacheObject> keys, - boolean skipVals, + final boolean skipVals, final boolean needVer, + boolean keepBinary, final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c ) { if (cacheCtx.isNear()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java index 58ee0c6,ba58f57..cef8371 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java @@@ -390,10 -392,9 +393,10 @@@ public class GridNearTxRemote extends G -1L, cached, drVer, - skipStore); + skipStore, + keepBinary); - writeMap.put(key, txEntry); + txState.addWriteEntry(key, txEntry); return true; } http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 9f52699,cff62d9..fae7d8c --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@@ -2133,265 -2183,363 +2201,363 @@@ public abstract class IgniteTxLocalAdap KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key); - IgniteTxKey txKey = cacheCtx.txKey(cacheKey); - - IgniteTxEntry txEntry = entry(txKey); - - // First time access. - if (txEntry == null) { - while (true) { - GridCacheEntryEx entry = entryEx(cacheCtx, txKey, topologyVersion()); - - try { - entry.unswap(false); - - // Check if lock is being explicitly acquired by the same thread. - if (!implicit && cctx.kernalContext().config().isCacheSanityCheckEnabled() && - entry.lockedByThread(threadId, xidVer)) - throw new IgniteCheckedException("Cannot access key within transaction if lock is " + - "externally held [key=" + key + ", entry=" + entry + ", xidVer=" + xidVer + - ", threadId=" + threadId + - ", locNodeId=" + cctx.localNodeId() + ']'); - - CacheObject old = null; - GridCacheVersion readVer = null; + boolean loadMissed = enlistWriteEntry(cacheCtx, + cacheKey, + val, + entryProcessor, + invokeArgs, + expiryPlc, + retval, + lockOnly, + filter, + drVer, + drTtl, + drExpireTime, + ret, + enlisted, + skipStore, + singleRmv, + hasFilters, + needVal, + needReadVer); + + if (loadMissed) { + if (missedForLoad == null) + missedForLoad = new HashSet<>(); + + missedForLoad.add(cacheKey); + } + } - if (optimistic() && !implicit()) { - try { - if (needReadVer) { - T2<CacheObject, GridCacheVersion> res = primaryLocal(entry) ? - entry.innerGetVersioned(this, - /*swap*/false, - /*unmarshal*/retval, - /*metrics*/retval, - /*events*/retval, - CU.subjectId(this, cctx), - entryProcessor, - resolveTaskName(), - null, - keepBinary) : null; + if (missedForLoad != null) { + return loadMissing(cacheCtx, + missedForLoad, + filter, + ret, + needReadVer, + singleRmv, + hasFilters, + skipStore, + retval); + } - if (res != null) { - old = res.get1(); - readVer = res.get2(); - } - } - else { - old = entry.innerGet(this, - /*swap*/false, - /*read-through*/false, - /*fail-fast*/false, - /*unmarshal*/retval, - /*metrics*/retval, - /*events*/retval, - /*temporary*/false, - CU.subjectId(this, cctx), - entryProcessor, - resolveTaskName(), - null, - keepBinary); - } - } - catch (ClusterTopologyCheckedException e) { - entry.context().evicts().touch(entry, topologyVersion()); + return new GridFinishedFuture<>(); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(e); + } + } - throw e; - } - } - else - old = retval ? entry.rawGetOrUnmarshal(false) : entry.rawGet(); + /** + * @param cacheCtx Cache context. + * @param keys Keys to load. + * @param filter Filter. + * @param ret Return value. + * @param needReadVer Read version flag. + * @param singleRmv {@code True} for single remove operation. + * @param hasFilters {@code True} if filters not empty. + * @param skipStore Skip store flag. + * @param retval Return value flag. + * @return Load future. + */ + private IgniteInternalFuture<Void> loadMissing( + final GridCacheContext cacheCtx, + final Set<KeyCacheObject> keys, + final CacheEntryPredicate[] filter, + final GridCacheReturn ret, + final boolean needReadVer, + final boolean singleRmv, + final boolean hasFilters, + final boolean skipStore, + final boolean retval) { + GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c = + new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() { + @Override public void apply(KeyCacheObject key, + @Nullable Object val, + @Nullable GridCacheVersion loadVer) { + if (log.isDebugEnabled()) + log.debug("Loaded value from remote node [key=" + key + ", val=" + val + ']'); - if (old != null && hasFilters && !filter(entry.context(), cacheKey, old, filter)) { - skipped = skip(skipped, cacheKey); + IgniteTxEntry e = entry(new IgniteTxKey(key, cacheCtx.cacheId())); - ret.set(cacheCtx, old, false, keepBinary); + assert e != null; - if (!readCommitted()) { - // Enlist failed filters as reads for non-read-committed mode, - // so future ops will get the same values. - txEntry = addEntry(READ, - old, - null, - null, - entry, - null, - CU.empty0(), - false, - -1L, - -1L, - null, - skipStore, - keepBinary); + if (needReadVer) { + assert loadVer != null; - txEntry.markValid(); + e.serializableReadVersion(singleRmv && val != null ? SER_READ_NOT_EMPTY_VER : loadVer); + } - if (needReadVer) { - assert readVer != null; + if (singleRmv) { + assert !hasFilters && !retval; + assert val == null || Boolean.TRUE.equals(val) : val; - txEntry.serializableReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer); - } - } - ret.set(cacheCtx, null, val != null); ++ ret.set(cacheCtx, null, val != null, keepBinary); + } + else { + CacheObject cacheVal = cacheCtx.toCacheObject(val); - if (readCommitted()) - cacheCtx.evicts().touch(entry, topologyVersion()); + if (e.op() == TRANSFORM) { + GridCacheVersion ver; - break; // While. + try { + ver = e.cached().version(); } + catch (GridCacheEntryRemovedException ex) { + assert optimistic() : e; - final GridCacheOperation op = lockOnly ? NOOP : rmv ? DELETE : - entryProcessor != null ? TRANSFORM : old != null ? UPDATE : CREATE; + if (log.isDebugEnabled()) + log.debug("Failed to get entry version: [msg=" + ex.getMessage() + ']'); - txEntry = addEntry(op, - cacheCtx.toCacheObject(val), - entryProcessor, - invokeArgs, - entry, - expiryPlc, - filter, - true, - drTtl, - drExpireTime, - drVer, - skipStore, - keepBinary); + ver = null; + } - if (!implicit() && readCommitted() && !cacheCtx.offheapTiered()) - cacheCtx.evicts().touch(entry, topologyVersion()); + addInvokeResult(e, cacheVal, ret, ver); + } + else { + boolean success = !hasFilters || isAll(e.context(), key, cacheVal, filter); - enlisted.add(cacheKey); + ret.set(cacheCtx, cacheVal, success); + } + } + } + }; - if (!pessimistic() && !implicit()) { - txEntry.markValid(); + return loadMissing( + cacheCtx, + /*read through*/cacheCtx.config().isLoadPreviousValue() && !skipStore, + /*async*/true, + keys, + /*skipVals*/singleRmv, + needReadVer, + c); + } - if (old == null) { - if (needVal) { - if (missedForLoad == null) - missedForLoad = new HashSet<>(); + /** + * @param cacheCtx Cache context. + * @param cacheKey Key. + * @param val Value. + * @param entryProcessor Entry processor. + * @param invokeArgs Optional arguments for EntryProcessor. + * @param expiryPlc Explicitly specified expiry policy for entry. + * @param retval Return value flag. + * @param lockOnly + * @param filter Filter. + * @param drVer DR version. + * @param drTtl DR ttl. + * @param drExpireTime DR expire time. + * @param ret Return value. + * @param enlisted Enlisted keys collection. + * @param skipStore Skip store flag. + * @param singleRmv {@code True} for single remove operation. + * @param hasFilters {@code True} if filters not empty. + * @param needVal {@code True} if value is needed. + * @param needReadVer {@code True} if need read entry version. + * @return {@code True} if entry value should be loaded. + * @throws IgniteCheckedException If failed. + */ + private boolean enlistWriteEntry(GridCacheContext cacheCtx, + final KeyCacheObject cacheKey, + final @Nullable Object val, + final @Nullable EntryProcessor<?, ?, ?> entryProcessor, + final @Nullable Object[] invokeArgs, + final @Nullable ExpiryPolicy expiryPlc, + final boolean retval, + final boolean lockOnly, + final CacheEntryPredicate[] filter, + final GridCacheVersion drVer, + final long drTtl, + long drExpireTime, + final GridCacheReturn ret, + @Nullable final Collection<KeyCacheObject> enlisted, + boolean skipStore, + boolean singleRmv, + boolean hasFilters, + final boolean needVal, + boolean needReadVer + ) throws IgniteCheckedException { + boolean loadMissed = false; - missedForLoad.add(cacheKey); - } - else { - assert !implicit() || !transform : this; - assert txEntry.op() != TRANSFORM : txEntry; + final boolean rmv = val == null && entryProcessor == null; - if (retval) - ret.set(cacheCtx, null, true, keepBinary); - else - ret.success(true); - } - } - else { - if (needReadVer) { - assert readVer != null; + IgniteTxKey txKey = cacheCtx.txKey(cacheKey); - txEntry.serializableReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer); - } + IgniteTxEntry txEntry = entry(txKey); - if (retval && !transform) - ret.set(cacheCtx, old, true, keepBinary); - else { - if (txEntry.op() == TRANSFORM) { - GridCacheVersion ver; + // First time access. + if (txEntry == null) { + while (true) { + GridCacheEntryEx entry = entryEx(cacheCtx, txKey, topologyVersion()); - try { - ver = entry.version(); - } - catch (GridCacheEntryRemovedException ex) { - assert optimistic() : txEntry; + try { + entry.unswap(false); + + // Check if lock is being explicitly acquired by the same thread. + if (!implicit && cctx.kernalContext().config().isCacheSanityCheckEnabled() && + entry.lockedByThread(threadId, xidVer)) { + throw new IgniteCheckedException("Cannot access key within transaction if lock is " + + "externally held [key=" + CU.value(cacheKey, cacheCtx, false) + + ", entry=" + entry + + ", xidVer=" + xidVer + + ", threadId=" + threadId + + ", locNodeId=" + cctx.localNodeId() + ']'); + } - if (log.isDebugEnabled()) - log.debug("Failed to get entry version " + - "[err=" + ex.getMessage() + ']'); + CacheObject old = null; + GridCacheVersion readVer = null; - ver = null; - } + if (optimistic() && !implicit()) { + try { + if (needReadVer) { + T2<CacheObject, GridCacheVersion> res = primaryLocal(entry) ? + entry.innerGetVersioned(this, + /*swap*/false, + /*unmarshal*/retval, + /*metrics*/retval, + /*events*/retval, + CU.subjectId(this, cctx), + entryProcessor, + resolveTaskName(), + null) : null; - addInvokeResult(txEntry, old, ret, ver); - } - else - ret.success(true); - } + if (res != null) { + old = res.get1(); + readVer = res.get2(); } } - // Pessimistic. else { - if (retval && !transform) - ret.set(cacheCtx, old, true, keepBinary); - else - ret.success(true); + old = entry.innerGet(this, + /*swap*/false, + /*read-through*/false, + /*fail-fast*/false, + /*unmarshal*/retval, + /*metrics*/retval, + /*events*/retval, + /*temporary*/false, + CU.subjectId(this, cctx), + entryProcessor, + resolveTaskName(), + null); } - - break; // While. } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry in transaction putAll0 method: " + entry); + catch (ClusterTopologyCheckedException e) { + entry.context().evicts().touch(entry, topologyVersion()); + + throw e; } } - } - else { - if (entryProcessor == null && txEntry.op() == TRANSFORM) - throw new IgniteCheckedException("Failed to enlist write value for key (cannot have update value in " + - "transaction after EntryProcessor is applied): " + key); - - GridCacheEntryEx entry = txEntry.cached(); + else + old = retval ? entry.rawGetOrUnmarshal(false) : entry.rawGet(); - CacheObject v = txEntry.value(); + if (old != null && hasFilters && !filter(entry.context(), cacheKey, old, filter)) { + ret.set(cacheCtx, old, false); - boolean del = txEntry.op() == DELETE && rmv; + if (!readCommitted()) { + // Enlist failed filters as reads for non-read-committed mode, + // so future ops will get the same values. + txEntry = addEntry(READ, + old, + null, + null, + entry, + null, + CU.empty0(), + false, + -1L, + -1L, + null, + skipStore); - if (!del) { - if (hasFilters && !filter(entry.context(), cacheKey, v, filter)) { - skipped = skip(skipped, cacheKey); + txEntry.markValid(); - ret.set(cacheCtx, v, false, keepBinary); + if (needReadVer) { + assert readVer != null; - continue; + txEntry.serializableReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer); + } } - GridCacheOperation op = rmv ? DELETE : entryProcessor != null ? TRANSFORM : - v != null ? UPDATE : CREATE; + if (readCommitted()) + cacheCtx.evicts().touch(entry, topologyVersion()); - txEntry = addEntry(op, - cacheCtx.toCacheObject(val), - entryProcessor, - invokeArgs, - entry, - expiryPlc, - filter, - true, - drTtl, - drExpireTime, - drVer, - skipStore, - keepBinary); + break; // While. + } + + final GridCacheOperation op = lockOnly ? NOOP : rmv ? DELETE : + entryProcessor != null ? TRANSFORM : old != null ? UPDATE : CREATE; + + txEntry = addEntry(op, + cacheCtx.toCacheObject(val), + entryProcessor, + invokeArgs, + entry, + expiryPlc, + filter, + true, + drTtl, + drExpireTime, + drVer, + skipStore); + if (!implicit() && readCommitted() && !cacheCtx.offheapTiered()) + cacheCtx.evicts().touch(entry, topologyVersion()); + + if (enlisted != null) enlisted.add(cacheKey); - if (txEntry.op() == TRANSFORM) { - GridCacheVersion ver; + if (!pessimistic() && !implicit()) { + txEntry.markValid(); - try { - ver = entry.version(); - } - catch (GridCacheEntryRemovedException e) { - assert optimistic() : txEntry; + if (old == null) { + if (needVal) + loadMissed = true; + else { + assert !implicit() || !transform : this; + assert txEntry.op() != TRANSFORM : txEntry; - if (log.isDebugEnabled()) - log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']'); + if (retval) + ret.set(cacheCtx, null, true); + else + ret.success(true); + } + } + else { + if (needReadVer) { + assert readVer != null; - ver = null; + txEntry.serializableReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer); } - addInvokeResult(txEntry, txEntry.value(), ret, ver); - } - } + if (retval && !transform) + ret.set(cacheCtx, old, true); + else { + if (txEntry.op() == TRANSFORM) { + GridCacheVersion ver; - if (!pessimistic()) { - txEntry.markValid(); + try { + ver = entry.version(); + } + catch (GridCacheEntryRemovedException ex) { + assert optimistic() : txEntry; + if (log.isDebugEnabled()) + log.debug("Failed to get entry version " + + "[err=" + ex.getMessage() + ']'); + + ver = null; + } + + addInvokeResult(txEntry, old, ret, ver); + } + else + ret.success(true); + } + } + } + // Pessimistic. + else { if (retval && !transform) - ret.set(cacheCtx, v, true, keepBinary); + ret.set(cacheCtx, old, true); else ret.success(true); } @@@ -2829,19 -3081,15 +3101,16 @@@ drMap, null, opCtx != null && opCtx.skipStore(), - false); + false, + opCtx != null && opCtx.isKeepBinary()); if (pessimistic()) { - // Loose all skipped. - final Set<KeyCacheObject> loaded = loadFut.get(); - - final Collection<KeyCacheObject> keys = F.view(enlisted, F0.notIn(loaded)); + assert loadFut == null || loadFut.isDone() : loadFut; if (log.isDebugEnabled()) - log.debug("Before acquiring transaction lock for put on keys: " + keys); + log.debug("Before acquiring transaction lock for put on keys: " + enlisted); - IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(keys, + IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted, lockTimeout(), this, false, @@@ -3029,141 -3292,131 +3313,132 @@@ init(); - try { - Collection<KeyCacheObject> enlisted = new ArrayList<>(); - - CacheOperationContext opCtx = cacheCtx.operationContextPerCall(); + final Collection<KeyCacheObject> enlisted = new ArrayList<>(); - ExpiryPolicy plc; + CacheOperationContext opCtx = cacheCtx.operationContextPerCall(); - if (!F.isEmpty(filter)) - plc = opCtx != null ? opCtx.expiry() : null; - else - plc = null; + ExpiryPolicy plc; - final IgniteInternalFuture<Set<KeyCacheObject>> loadFut = enlistWrite( - cacheCtx, - keys0, - plc, - implicit, - /** lookup map */null, - /** invoke map */null, - /** invoke arguments */null, - retval, - /** lock only */false, - filter, - ret, - enlisted, - null, - drMap, - opCtx != null && opCtx.skipStore(), - singleRmv, - opCtx != null && opCtx.isKeepBinary() - ); + if (!F.isEmpty(filter)) + plc = opCtx != null ? opCtx.expiry() : null; + else + plc = null; - if (log.isDebugEnabled()) - log.debug("Remove keys: " + enlisted); + final IgniteInternalFuture<Void> loadFut = enlistWrite( + cacheCtx, + keys0, + plc, + /** lookup map */null, + /** invoke map */null, + /** invoke arguments */null, + retval, + /** lock only */false, + filter, + ret, + enlisted, + null, + drMap, + opCtx != null && opCtx.skipStore(), - singleRmv ++ singleRmv, ++ opCtx != null && opCtx.isKeepBinary() + ); - // Acquire locks only after having added operation to the write set. - // Otherwise, during rollback we will not know whether locks need - // to be rolled back. - if (pessimistic()) { - // Loose all skipped. - final Collection<KeyCacheObject> passedKeys = F.view(enlisted, F0.notIn(loadFut.get())); + if (log.isDebugEnabled()) + log.debug("Remove keys: " + enlisted); - if (log.isDebugEnabled()) - log.debug("Before acquiring transaction lock for remove on keys: " + passedKeys); + // Acquire locks only after having added operation to the write set. + // Otherwise, during rollback we will not know whether locks need + // to be rolled back. + if (pessimistic()) { + assert loadFut.isDone() : loadFut; - IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(passedKeys, - lockTimeout(), - this, - false, - retval, - isolation, - isInvalidate(), - -1L); + if (log.isDebugEnabled()) + log.debug("Before acquiring transaction lock for remove on keys: " + enlisted); - PLC1<GridCacheReturn> plc1 = new PLC1<GridCacheReturn>(ret) { - @Override protected GridCacheReturn postLock(GridCacheReturn ret) - throws IgniteCheckedException - { - if (log.isDebugEnabled()) - log.debug("Acquired transaction lock for remove on keys: " + passedKeys); + IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted, + lockTimeout(), + this, + false, + retval, + isolation, + isInvalidate(), + -1L); + + PLC1<GridCacheReturn> plc1 = new PLC1<GridCacheReturn>(ret) { + @Override protected GridCacheReturn postLock(GridCacheReturn ret) + throws IgniteCheckedException + { + if (log.isDebugEnabled()) + log.debug("Acquired transaction lock for remove on keys: " + enlisted); - postLockWrite(cacheCtx, - passedKeys, - loadFut.get(), - ret, + postLockWrite(cacheCtx, + enlisted, + ret, /*remove*/true, - retval, + retval, /*read*/false, - -1L, - filter, + -1L, + filter, /*computeInvoke*/false); - return ret; - } - }; + return ret; + } + }; - if (fut.isDone()) { + if (fut.isDone()) { + try { + return nonInterruptable(plc1.apply(fut.get(), null)); + } + catch (GridClosureException e) { + return new GridFinishedFuture<>(e.unwrap()); + } + catch (IgniteCheckedException e) { try { - return nonInterruptable(plc1.apply(fut.get(), null)); + return nonInterruptable(plc1.apply(false, e)); } - catch (GridClosureException e) { - return new GridFinishedFuture<>(e.unwrap()); - } - catch (IgniteCheckedException e) { - try { - return nonInterruptable(plc1.apply(false, e)); - } - catch (Exception e1) { - return new GridFinishedFuture<>(e1); - } + catch (Exception e1) { + return new GridFinishedFuture<>(e1); } } - else - return nonInterruptable(new GridEmbeddedFuture<>( - fut, - plc1 - )); } - else { - if (implicit()) { - // Should never load missing values for implicit transaction as values will be returned - // with prepare response, if required. - assert loadFut.isDone(); - - return nonInterruptable(commitAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() { - @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut) - throws IgniteCheckedException { - try { - txFut.get(); + else + return nonInterruptable(new GridEmbeddedFuture<>( + fut, + plc1 + )); + } + else { + if (implicit()) { + // Should never load missing values for implicit transaction as values will be returned + // with prepare response, if required. + assert loadFut.isDone(); - return implicitRes; - } - catch (IgniteCheckedException | RuntimeException e) { - rollbackAsync(); + return nonInterruptable(commitAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() { + @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut) + throws IgniteCheckedException { + try { + txFut.get(); - throw e; - } + return implicitRes; } - })); - } - else - return nonInterruptable(loadFut.chain(new CX1<IgniteInternalFuture<Set<KeyCacheObject>>, GridCacheReturn>() { - @Override public GridCacheReturn applyx(IgniteInternalFuture<Set<KeyCacheObject>> f) - throws IgniteCheckedException { - f.get(); + catch (IgniteCheckedException | RuntimeException e) { + rollbackAsync(); - return ret; + throw e; } - })); + } + })); } - } - catch (IgniteCheckedException e) { - setRollbackOnly(); + else { + return nonInterruptable(loadFut.chain(new CX1<IgniteInternalFuture<Void>, GridCacheReturn>() { + @Override public GridCacheReturn applyx(IgniteInternalFuture<Void> f) + throws IgniteCheckedException { + f.get(); - return new GridFinishedFuture<>(e); + return ret; + } + })); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryPutBenchmark.java ---------------------------------------------------------------------- diff --cc modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryPutBenchmark.java index db44fac,99b2423..dfa4cbc --- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryPutBenchmark.java +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryPutBenchmark.java @@@ -74,21 -76,6 +76,11 @@@ public class IgniteSqlQueryPutBenchmar return true; } + /** {@inheritDoc} */ + @Override public void onWarmupFinished() { + super.onWarmupFinished(); - - resCnt.reset(); - cnt.reset(); - } - - /** {@inheritDoc} */ - @Override public void tearDown() throws Exception { - ignite().log().info("Average number of entries per query: " + ((double)resCnt.longValue() / cnt.longValue())); - - super.tearDown(); + } + /** * @param minSalary Min salary. * @param maxSalary Max salary. http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/parent/pom.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3d4ce809/pom.xml ---------------------------------------------------------------------- diff --cc pom.xml index 68e57c6,1008981..115df88 --- a/pom.xml +++ b/pom.xml @@@ -73,10 -73,14 +73,13 @@@ <module>modules/cloud</module> <module>modules/mesos</module> <module>modules/kafka</module> + <module>modules/flume</module> <module>modules/yarn</module> <module>modules/jms11</module> + <module>modules/twitter</module> <module>modules/mqtt</module> <module>modules/zookeeper</module> + <module>modules/camel</module> - <module>modules/platform</module> </modules> <profiles>