ignite-1516 Optimize GridH2AbstractKeyValueRow.getValue
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/72c3eef2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/72c3eef2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/72c3eef2
Branch: refs/heads/ignite-1093-2
Commit: 72c3eef2aa31df4a68b46a8877809cc0f49c1368
Parents: 39dace4
Author: sboikov <sboi...@gridgain.com>
Authored: Tue Sep 22 13:51:09 2015 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Tue Sep 22 13:51:09 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 8 +--
.../processors/cache/GridCacheMapEntry.java | 14 ++---
.../processors/cache/GridCacheProcessor.java | 6 +--
.../cache/GridCacheSwapEntryImpl.java | 31 +++++++++--
.../processors/cache/GridCacheSwapManager.java | 56 +++++++++++++-------
.../processors/query/h2/IgniteH2Indexing.java | 19 ++++---
.../query/h2/opt/GridH2AbstractKeyValueRow.java | 49 ++++++++++-------
.../query/h2/opt/GridH2KeyValueRowOffheap.java | 11 +++-
.../query/h2/opt/GridH2RowDescriptor.java | 5 ++
.../cache/CacheIndexStreamerTest.java | 33 +++++++++---
.../processors/cache/GridCacheSwapSelfTest.java | 4 +-
.../IgniteCacheWithIndexingTestSuite.java | 2 +
12 files changed, 158 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 1fc94ec..ae987b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -805,9 +805,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (modes.offheap || modes.swap) { GridCacheSwapManager swapMgr = ctx.isNear() ? ctx.near().dht().context().swap() : ctx.swap(); - GridCacheSwapEntry swapEntry = swapMgr.read(cacheKey, modes.offheap, modes.swap); - - cacheVal = swapEntry != null ? swapEntry.value() : null; + cacheVal = swapMgr.readValue(cacheKey, modes.offheap, modes.swap); } } else @@ -856,9 +854,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (offheap || swap) { GridCacheSwapManager swapMgr = ctx.isNear() ? ctx.near().dht().context().swap() : ctx.swap(); - GridCacheSwapEntry swapEntry = swapMgr.read(key, offheap, swap); - - return swapEntry != null ? swapEntry.value() : null; + return swapMgr.readValue(key, offheap, swap); } return null; http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 961c792..4bf0aa1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -512,7 +512,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } } else - e = detached() ? cctx.swap().read(this, true, true, true) : cctx.swap().readAndRemove(this); + e = detached() ? 
cctx.swap().read(this, true, true, true, false) : cctx.swap().readAndRemove(this); if (log.isDebugEnabled()) log.debug("Read swap entry [swapEntry=" + e + ", cacheEntry=" + this + ']'); @@ -2840,7 +2840,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } if (offheap || swap) { - GridCacheSwapEntry e = cctx.swap().read(this, false, offheap, swap); + GridCacheSwapEntry e = cctx.swap().read(this, false, offheap, swap, true); return e != null ? e.value() : null; } @@ -3581,14 +3581,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CacheObject val = rawGetOrUnmarshalUnlocked(false); - if (val == null) { - GridCacheSwapEntry swapEntry = cctx.swap().read(key, true, true); - - if (swapEntry == null) - return null; - - return swapEntry.value(); - } + if (val == null) + val = cctx.swap().readValue(key, true, true); return val; } http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 9c325aa..e92ea57 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -2763,14 +2763,10 @@ public class GridCacheProcessor extends GridProcessorAdapter { try { KeyCacheObject key = cctx.toCacheKeyObject(keyBytes); - GridCacheSwapEntry swapEntry = GridCacheSwapEntryImpl.unmarshal(valBytes); + GridCacheSwapEntry swapEntry = GridCacheSwapEntryImpl.unmarshal(valBytes, true); CacheObject val = swapEntry.value(); - if (val == null) - val = cctx.cacheObjects().toCacheObject(cctx.cacheObjectContext(), 
swapEntry.type(), - swapEntry.valueBytes()); - assert val != null; qryMgr.remove(key, val); http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java index b7c66d3..6b1266f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java @@ -94,8 +94,6 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry { long expireTime, @Nullable IgniteUuid keyClsLdrId, @Nullable IgniteUuid valClsLdrId) { - assert ver != null; - this.valBytes = valBytes; this.type = type; this.ver = ver; @@ -268,9 +266,36 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry { /** * @param arr Entry bytes. + * @param valOnly If {@code true} unmarshalls only entry value. * @return Entry. */ - public static GridCacheSwapEntryImpl unmarshal(byte[] arr) { + public static GridCacheSwapEntryImpl unmarshal(byte[] arr, boolean valOnly) { + if (valOnly) { + long off = BYTE_ARR_OFF + VERSION_OFFSET; // Skip ttl, expire time. + + boolean verEx = UNSAFE.getByte(arr, off++) != 0; + + off += verEx ? 
VERSION_EX_SIZE : VERSION_SIZE; + + int arrLen = UNSAFE.getInt(arr, off); + + off += 4; + + byte type = UNSAFE.getByte(arr, off++); + + byte[] valBytes = new byte[arrLen]; + + UNSAFE.copyMemory(arr, off, valBytes, BYTE_ARR_OFF, arrLen); + + return new GridCacheSwapEntryImpl(ByteBuffer.wrap(valBytes), + type, + null, + 0L, + 0L, + null, + null); + } + long off = BYTE_ARR_OFF; long ttl = UNSAFE.getLong(arr, off); http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java index d9a8b5c..2ab7b5b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java @@ -569,6 +569,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { * @param entryLocked {@code True} if cache entry is locked. * @param readOffheap Read offheap flag. * @param readSwap Read swap flag. + * @param valOnly If {@code true} unmarshals only entry value. * @return Value from swap or {@code null}. * @throws IgniteCheckedException If failed. 
*/ @@ -578,7 +579,8 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { int part, boolean entryLocked, boolean readOffheap, - boolean readSwap) + boolean readSwap, + boolean valOnly) throws IgniteCheckedException { assert readOffheap || readSwap; @@ -605,7 +607,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { cctx.cache().metrics0().onOffHeapRead(bytes != null); if (bytes != null) - return swapEntry(unmarshalSwapEntry(bytes)); + return swapEntry(unmarshalSwapEntry(bytes, valOnly)); } if (!swapEnabled || !readSwap) @@ -620,7 +622,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { if (bytes == null && lsnr != null) return lsnr.entry; - return bytes != null ? swapEntry(unmarshalSwapEntry(bytes)) : null; + return bytes != null ? swapEntry(unmarshalSwapEntry(bytes, valOnly)) : null; } finally { if (lsnr != null) @@ -706,7 +708,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { if (rmv != null) { try { - GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv)); + GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv, false)); if (entry == null) return; @@ -756,20 +758,22 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { * @param locked {@code True} if cache entry is locked. * @param readOffheap Read offheap flag. * @param readSwap Read swap flag. + * @param valOnly If {@code true} unmarshals only entry value. * @return Read value. * @throws IgniteCheckedException If read failed. 
*/ @Nullable GridCacheSwapEntry read(GridCacheEntryEx entry, boolean locked, boolean readOffheap, - boolean readSwap) + boolean readSwap, + boolean valOnly) throws IgniteCheckedException { if (!offheapEnabled && !swapEnabled) return null; return read(entry.key(), entry.key().valueBytes(cctx.cacheObjectContext()), entry.partition(), locked, - readOffheap, readSwap); + readOffheap, readSwap, valOnly); } /** @@ -805,7 +809,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { * @return Read value. * @throws IgniteCheckedException If read failed. */ - @Nullable public GridCacheSwapEntry read(KeyCacheObject key, + @Nullable public CacheObject readValue(KeyCacheObject key, boolean readOffheap, boolean readSwap) throws IgniteCheckedException @@ -815,7 +819,17 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { int part = cctx.affinity().partition(key); - return read(key, key.valueBytes(cctx.cacheObjectContext()), part, false, readOffheap, readSwap); + GridCacheSwapEntry swapEntry = read(key, + key.valueBytes(cctx.cacheObjectContext()), + part, + false, + readOffheap, + readSwap, + true); + + assert swapEntry == null || swapEntry.value() != null : swapEntry; + + return swapEntry != null ? swapEntry.value() : null; } /** @@ -865,7 +879,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { cctx.cache().metrics0().onOffHeapRemove(); } - entry = entryBytes == null ? null : swapEntry(unmarshalSwapEntry(entryBytes)); + entry = entryBytes == null ? 
null : swapEntry(unmarshalSwapEntry(entryBytes, false)); } return entry; @@ -972,7 +986,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { if (rmv != null) { try { - GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv)); + GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv, false)); if (entry == null) return; @@ -1078,7 +1092,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { Collection<GridCacheSwapListener> lsnrs = offheapLsnrs.get(part); if (lsnrs != null) { - GridCacheSwapEntry e = swapEntry(GridCacheSwapEntryImpl.unmarshal(entry)); + GridCacheSwapEntry e = swapEntry(GridCacheSwapEntryImpl.unmarshal(entry, false)); for (GridCacheSwapListener lsnr : lsnrs) lsnr.onEntryUnswapped(part, key, e); @@ -1132,7 +1146,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { cctx.cache().metrics0().onOffHeapRead(entryBytes != null); if (entryBytes != null) { - GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes)); + GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes, false)); if (entry != null) { cctx.queries().onUnswap(key, entry.value()); @@ -1165,7 +1179,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { if (entryBytes == null) return false; - GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes)); + GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes, true)); if (entry == null) return false; @@ -2063,7 +2077,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { try { for (Map.Entry<byte[], byte[]> e : iter) { try { - GridCacheSwapEntry swapEntry = unmarshalSwapEntry(e.getValue()); + GridCacheSwapEntry swapEntry = unmarshalSwapEntry(e.getValue(), false); IgniteUuid valLdrId = swapEntry.valueClassLoaderId(); @@ -2120,10 +2134,11 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { /** * @param bytes Bytes to unmarshal. + * @param valOnly If {@code true} unmarshalls only value. 
* @return Unmarshalled entry. */ - private GridCacheSwapEntry unmarshalSwapEntry(byte[] bytes) { - return GridCacheSwapEntryImpl.unmarshal(bytes); + private GridCacheSwapEntry unmarshalSwapEntry(byte[] bytes, boolean valOnly) { + return GridCacheSwapEntryImpl.unmarshal(bytes, valOnly); } /** @@ -2169,7 +2184,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { @Override protected Map.Entry<byte[], GridCacheSwapEntry> onNext() throws IgniteCheckedException { Map.Entry<byte[], byte[]> e = iter.nextX(); - GridCacheSwapEntry unmarshalled = unmarshalSwapEntry(e.getValue()); + GridCacheSwapEntry unmarshalled = unmarshalSwapEntry(e.getValue(), false); return F.t(e.getKey(), swapEntry(unmarshalled)); } @@ -2446,6 +2461,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { abstract protected GridCloseableIterator<T1> partitionIterator(int part) throws IgniteCheckedException; } + /** + * + */ private class GridVersionedMapEntry<K,V> implements Map.Entry<K,V>, GridCacheVersionAware { /** */ private Map.Entry<byte[], byte[]> entry; @@ -2474,7 +2492,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { /** {@inheritDoc} */ @Override public V getValue() { try { - GridCacheSwapEntry e = unmarshalSwapEntry(entry.getValue()); + GridCacheSwapEntry e = unmarshalSwapEntry(entry.getValue(), false); swapEntry(e); @@ -2487,7 +2505,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { /** {@inheritDoc} */ @Override public GridCacheVersion version() { - GridCacheSwapEntry e = unmarshalSwapEntry(entry.getValue()); + GridCacheSwapEntry e = unmarshalSwapEntry(entry.getValue(), false); return e.version(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 2af1386..8595187 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -71,7 +71,6 @@ import org.apache.ignite.internal.processors.cache.CacheObjectContext; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.QueryCursorImpl; import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery; @@ -2108,6 +2107,9 @@ public class IgniteH2Indexing implements GridQueryIndexing { /** */ private final GridUnsafeGuard guard; + /** */ + private final boolean preferSwapVal; + /** * @param type Type descriptor. * @param schema Schema. 
@@ -2136,6 +2138,8 @@ public class IgniteH2Indexing implements GridQueryIndexing { keyType = DataType.getTypeFromClass(type.keyClass()); valType = DataType.getTypeFromClass(type.valueClass()); + + preferSwapVal = schema.ccfg.getMemoryMode() == CacheMemoryMode.OFFHEAP_TIERED; } /** {@inheritDoc} */ @@ -2263,15 +2267,11 @@ public class IgniteH2Indexing implements GridQueryIndexing { if (cctx.isNear()) cctx = cctx.near().dht().context(); - GridCacheSwapEntry e = cctx.swap().read(cctx.toCacheKeyObject(key), true, true); + CacheObject v = cctx.swap().readValue(cctx.toCacheKeyObject(key), true, true); - if (e == null) + if (v == null) return null; - CacheObject v = e.value(); - - assert v != null : "swap must unmarshall it for us"; - return v.value(cctx.cacheObjectContext(), false); } @@ -2312,5 +2312,10 @@ public class IgniteH2Indexing implements GridQueryIndexing { return new GridH2KeyValueRowOffheap(this, ptr); } + + /** {@inheritDoc} */ + @Override public boolean preferSwapValue() { + return preferSwapVal; + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java index 4a16284..c11f541 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java @@ -130,20 +130,23 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row { /** * Atomically updates weak value. * - * @param upd New value. 
- * @return {@code null} If update succeeded, unexpected value otherwise. + * @param valObj New value. + * @return New value if old value is empty, old value otherwise. + * @throws IgniteCheckedException If failed. */ - protected synchronized Value updateWeakValue(Value upd) { + protected synchronized Value updateWeakValue(Object valObj) throws IgniteCheckedException { Value res = peekValue(VAL_COL); if (res != null && !(res instanceof WeakValue)) return res; + Value upd = desc.wrap(valObj, desc.valueType()); + setValue(VAL_COL, new WeakValue(upd)); notifyAll(); - return null; + return upd; } /** @@ -188,21 +191,23 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row { Value v; if (col == VAL_COL) { - v = syncValue(0); + v = peekValue(VAL_COL); long start = 0; int attempt = 0; while ((v = WeakValue.unwrap(v)) == null) { - v = getOffheapValue(VAL_COL); + if (!desc.preferSwapValue()) { + v = getOffheapValue(VAL_COL); - if (v != null) { - setValue(VAL_COL, v); + if (v != null) { + setValue(VAL_COL, v); - if (peekValue(KEY_COL) == null) - cache(); + if (peekValue(KEY_COL) == null) + cache(); - return v; + return v; + } } Object k = getValue(KEY_COL).getObject(); @@ -213,16 +218,24 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row { if (valObj != null) { // Even if we've found valObj in swap, it is may be some new value, // while the needed value was already unswapped, so we have to recheck it. - if ((v = WeakValue.unwrap(syncValue(0))) == null && (v = getOffheapValue(VAL_COL)) == null) { - Value upd = desc.wrap(valObj, desc.valueType()); - - v = updateWeakValue(upd); - - return v == null ? upd : v; - } + if ((v = getOffheapValue(VAL_COL)) == null) + return updateWeakValue(valObj); } else { // If nothing found in swap then we should be already unswapped. 
+ if (desc.preferSwapValue()) { + v = getOffheapValue(VAL_COL); + + if (v != null) { + setValue(VAL_COL, v); + + if (peekValue(KEY_COL) == null) + cache(); + + return v; + } + } + v = syncValue(attempt); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java index de31fe1..2dd9f25 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java @@ -216,12 +216,19 @@ public class GridH2KeyValueRowOffheap extends GridH2AbstractKeyValueRow { /** {@inheritDoc} */ @SuppressWarnings("NonSynchronizedMethodOverridesSynchronizedMethod") - @Override protected synchronized Value updateWeakValue(Value upd) { + @Override protected synchronized Value updateWeakValue(Object valObj) throws IgniteCheckedException { + Value val = peekValue(VAL_COL); + + if (val != null) + return val; + + Value upd = desc.wrap(valObj, desc.valueType()); + setValue(VAL_COL, upd); notifyAll(); - return null; + return upd; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java 
index 0edd102..ed3ff7a 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java @@ -111,4 +111,9 @@ public interface GridH2RowDescriptor extends GridOffHeapSmartPointerFactory<Grid * @throws IgniteCheckedException If failed. */ public Value wrap(Object o, int type) throws IgniteCheckedException; + + /** + * @return {@code True} if should check swap value before offheap. + */ + public boolean preferSwapValue(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java index 23f4e91..e6bf22b 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java @@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteDataStreamer; -import org.apache.ignite.cache.CacheMemoryMode; +import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; @@ -36,6 +36,8 @@ import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; 
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; /** @@ -45,7 +47,6 @@ public class CacheIndexStreamerTest extends GridCommonAbstractTest { /** */ private final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); @@ -60,14 +61,29 @@ public class CacheIndexStreamerTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - public void testStreamer() throws Exception { + public void testStreamerAtomic() throws Exception { + checkStreamer(ATOMIC); + } + + /** + * @throws Exception If failed. + */ + public void testStreamerTx() throws Exception { + checkStreamer(TRANSACTIONAL); + } + + /** + * @param atomicityMode Cache atomicity mode. + * @throws Exception If failed. + */ + public void checkStreamer(CacheAtomicityMode atomicityMode) throws Exception { final Ignite ignite = startGrid(0); - final IgniteCache<Integer, String> cache = ignite.createCache(cacheConfiguration()); + final IgniteCache<Integer, String> cache = ignite.createCache(cacheConfiguration(atomicityMode)); final AtomicBoolean stop = new AtomicBoolean(); - final int KEYS= 10_000; + final int KEYS = 10_000; try { IgniteInternalFuture streamerFut = GridTestUtils.runAsync(new Callable() { @@ -118,14 +134,15 @@ public class CacheIndexStreamerTest extends GridCommonAbstractTest { } /** + * @param atomicityMode Cache atomicity mode. * @return Cache configuration. 
*/ - private CacheConfiguration cacheConfiguration() { + private CacheConfiguration cacheConfiguration(CacheAtomicityMode atomicityMode) { CacheConfiguration ccfg = new CacheConfiguration(); - ccfg.setAtomicityMode(ATOMIC); + ccfg.setAtomicityMode(atomicityMode); ccfg.setWriteSynchronizationMode(FULL_SYNC); - ccfg.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED); + ccfg.setMemoryMode(OFFHEAP_TIERED); ccfg.setOffHeapMaxMemory(0); ccfg.setBackups(1); ccfg.setIndexedTypes(Integer.class, String.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapSelfTest.java index e0e6ff0..cd1fc93 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapSelfTest.java @@ -244,12 +244,12 @@ public class GridCacheSwapSelfTest extends GridCommonAbstractTest { } /** - * TODO: IGNITE-599. - * * @throws Exception If failed. 
*/ public void testSwapEviction() throws Exception { try { + fail("https://issues.apache.org/jira/browse/IGNITE-599"); + final CountDownLatch evicted = new CountDownLatch(10); startGrids(1); http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java index f30f70e..550c69f 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java @@ -19,6 +19,7 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; import org.apache.ignite.internal.processors.cache.CacheConfigurationP2PTest; +import org.apache.ignite.internal.processors.cache.CacheIndexStreamerTest; import org.apache.ignite.internal.processors.cache.GridCacheOffHeapAndSwapSelfTest; import org.apache.ignite.internal.processors.cache.GridCacheOffHeapSelfTest; import org.apache.ignite.internal.processors.cache.GridCacheOffheapIndexEntryEvictTest; @@ -63,6 +64,7 @@ public class IgniteCacheWithIndexingTestSuite extends TestSuite { suite.addTestSuite(GridCacheOffheapIndexGetSelfTest.class); suite.addTestSuite(GridCacheOffheapIndexEntryEvictTest.class); + suite.addTestSuite(CacheIndexStreamerTest.class); suite.addTestSuite(CacheConfigurationP2PTest.class);