http://git-wip-us.apache.org/repos/asf/ignite/blob/a1e5cc57/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java index ac8c5af..d89e397 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java @@ -22,7 +22,6 @@ import java.util.Collection; import java.util.Iterator; import java.util.Map; import java.util.Set; -import java.util.UUID; import java.util.concurrent.locks.Lock; import javax.cache.CacheException; import javax.cache.CacheManager; @@ -35,7 +34,6 @@ import javax.cache.processor.EntryProcessorResult; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCompute; -import org.apache.ignite.Ignition; import org.apache.ignite.cache.CacheEntryProcessor; import org.apache.ignite.cache.CacheMetrics; import org.apache.ignite.cache.CachePeekMode; @@ -48,8 +46,8 @@ import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.lang.IgniteFuture; -import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.mxbean.CacheMetricsMXBean; +import org.apache.ignite.resources.IgniteInstanceResource; import org.jetbrains.annotations.Nullable; /** @@ -63,9 +61,6 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> { /** Cache name. */ private final String cacheName; - /** Grid id. */ - private final UUID gridId; - /** With async. 
*/ private final boolean isAsync; @@ -82,31 +77,16 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> { /** * @param name Name. - * @param async + * @param async Async flag. * @param proxy Ignite Process Proxy. */ public IgniteCacheProcessProxy(String name, boolean async, IgniteProcessProxy proxy) { cacheName = name; isAsync = async; - gridId = proxy.getId(); igniteProxy = proxy; compute = proxy.remoteCompute(); } - /** - * Returns cache instance. Method to be called from closure at another JVM. - * - * @return Cache. - */ - private IgniteCache<Object, Object> cache() { - IgniteCache cache = Ignition.ignite(gridId).cache(cacheName); - - if (isAsync) - cache = cache.withAsync(); - - return cache; - } - /** {@inheritDoc} */ @Override public IgniteCache<K, V> withAsync() { return new IgniteCacheProcessProxy<>(cacheName, true, igniteProxy); @@ -124,14 +104,8 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> { } /** {@inheritDoc} */ - @Override public <C extends Configuration<K, V>> C getConfiguration(final Class<C> clazz) { - final Class cl = clazz; - - return (C)compute.call(new IgniteCallable<Object>() { - @Override public Object call() throws Exception { - return cache().getConfiguration(cl); - } - }); + @Override public <C extends Configuration<K, V>> C getConfiguration(Class<C> clazz) { + return compute.call(new GetConfigurationTask<>(cacheName, isAsync, clazz)); } /** {@inheritDoc} */ @@ -149,33 +123,26 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> { throw new UnsupportedOperationException("Method should be supported."); } + /** {@inheritDoc} */ @Override public IgniteCache<K, V> withNoRetries() { throw new UnsupportedOperationException("Method should be supported."); } /** {@inheritDoc} */ - @Override public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... 
args) throws CacheException { + @Override public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) + throws CacheException { throw new UnsupportedOperationException("Method should be supported."); } /** {@inheritDoc} */ - @Override public void localLoadCache(@Nullable final IgniteBiPredicate<K, V> p, @Nullable final Object... args) throws CacheException { - final IgniteBiPredicate pCopy = p; - - compute.run(new IgniteRunnable() { - @Override public void run() { - cache().localLoadCache(pCopy, args); - } - }); + @Override public void localLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) + throws CacheException { + compute.call(new LocalLoadCacheTask<>(cacheName, isAsync, p, args)); } /** {@inheritDoc} */ - @Override public V getAndPutIfAbsent(final K key, final V val) throws CacheException { - return (V)compute.call(new IgniteCallable<Object>() { - @Override public Object call() throws Exception { - return cache().getAndPutIfAbsent(key, val); - } - }); + @Override public V getAndPutIfAbsent(K key, V val) throws CacheException { + return compute.call(new GetAndPutIfAbsentTask<>(cacheName, isAsync, key, val)); } /** {@inheritDoc} */ @@ -189,12 +156,8 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> { } /** {@inheritDoc} */ - @Override public boolean isLocalLocked(final K key, final boolean byCurrThread) { - return compute.call(new IgniteCallable<Boolean>() { - @Override public Boolean call() throws Exception { - return cache().isLocalLocked(key, byCurrThread); - } - }); + @Override public boolean isLocalLocked(K key, boolean byCurrThread) { + return compute.call(new IsLocalLockedTask<>(cacheName, isAsync, key, byCurrThread)); } /** {@inheritDoc} */ @@ -203,18 +166,8 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> { } /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public Iterable<Entry<K, V>> localEntries(final CachePeekMode... 
peekModes) throws CacheException { - return (Iterable<Entry<K, V>>)compute.call(new IgniteCallable<Object>() { - @Override public Object call() throws Exception { - Collection<Entry> res = new ArrayList<>(); - - for (Entry e : cache().localEntries(peekModes)) - res.add(new CacheEntryImpl(e.getKey(), e.getValue())); - - return res; - } - }); + @Override public Iterable<Entry<K, V>> localEntries(CachePeekMode... peekModes) throws CacheException { + return compute.call(new LocalEntriesTask<K, V>(cacheName, isAsync, peekModes)); } /** {@inheritDoc} */ @@ -223,21 +176,13 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> { } /** {@inheritDoc} */ - @Override public void localEvict(final Collection<? extends K> keys) { - compute.run(new IgniteRunnable() { - @Override public void run() { - cache().localEvict(keys); - } - }); + @Override public void localEvict(Collection<? extends K> keys) { + compute.call(new LocalEvictTask<>(cacheName, isAsync, keys)); } /** {@inheritDoc} */ - @Override public V localPeek(final K key, final CachePeekMode... peekModes) { - return (V)compute.call(new IgniteCallable<Object>() { - @Override public Object call() throws Exception { - return cache().localPeek(key, peekModes); - } - }); + @Override public V localPeek(K key, CachePeekMode... peekModes) { + return compute.call(new LocalPeekTask<K, V>(cacheName, isAsync, key, peekModes)); } /** {@inheritDoc} */ @@ -246,274 +191,160 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> { } /** {@inheritDoc} */ - @Override public int size(final CachePeekMode... peekModes) throws CacheException { - return (int)compute.call(new IgniteCallable<Object>() { - @Override public Object call() throws Exception { - return cache().size(peekModes); - } - }); + @Override public int size(CachePeekMode... 
peekModes) throws CacheException { + return compute.call(new SizeTask(cacheName, isAsync, peekModes, false)); } /** {@inheritDoc} */ - @Override public int localSize(final CachePeekMode... peekModes) { - return (int)compute.call(new IgniteCallable<Object>() { - @Override public Object call() throws Exception { - return cache().localSize(peekModes); - } - }); + @Override public int localSize(CachePeekMode... peekModes) { + return compute.call(new SizeTask(cacheName, isAsync, peekModes, true)); } /** {@inheritDoc} */ - @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Map<? extends K, ? extends EntryProcessor<K, V, T>> map, - Object... args) { + @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll( + Map<? extends K, ? extends EntryProcessor<K, V, T>> map, + Object... args) + { throw new UnsupportedOperationException("Method should be supported."); } /** {@inheritDoc} */ - @Override public V get(final K key) { - return (V)compute.call(new IgniteCallable<Object>() { - @Override public Object call() throws Exception { - return cache().get(key); - } - }); + @Override public V get(K key) { + return compute.call(new GetTask<K, V>(cacheName, isAsync, key)); } /** {@inheritDoc} */ - @Override public Map<K, V> getAll(final Set<? extends K> keys) { - return (Map<K, V>)compute.call(new IgniteCallable<Object>() { - @Override public Object call() throws Exception { - return cache().getAll(keys); - } - }); + @Override public Map<K, V> getAll(Set<? extends K> keys) { + return compute.call(new GetAllTask<K, V>(cacheName, isAsync, keys)); } - @Override public Map<K, V> getAllOutTx(final Set<? extends K> keys) { - return (Map<K, V>)compute.call(new IgniteCallable<Object>() { - @Override public Object call() throws Exception { - return cache().getAllOutTx(keys); - } - }); + /** {@inheritDoc} */ + @Override public Map<K, V> getAllOutTx(Set<? 
extends K> keys) { + return compute.call(new GetAllOutTxTask<K, V>(cacheName, isAsync, keys)); } /** {@inheritDoc} */ - @Override public boolean containsKey(final K key) { - return (boolean)compute.call(new IgniteCallable<Object>() { - @Override public Object call() throws Exception { - return cache().containsKey(key); - } - }); + @Override public boolean containsKey(K key) { + return compute.call(new ContainsKeyTask<>(cacheName, isAsync, key)); } /** {@inheritDoc} */ - @Override public void loadAll(Set<? extends K> keys, boolean replaceExistingValues, CompletionListener completionLsnr) { + @Override public void loadAll(Set<? extends K> keys, boolean replaceExistVals, CompletionListener completionLsnr) { throw new UnsupportedOperationException("Oparetion can't be supported automatically."); } /** {@inheritDoc} */ - @Override public boolean containsKeys(final Set<? extends K> keys) { - return (boolean)compute.call(new IgniteCallable<Object>() { - @Override public Object call() throws Exception { - return cache().containsKeys(keys); - } - }); + @Override public boolean containsKeys(Set<? extends K> keys) { + return compute.call(new ContainsKeysTask<>(cacheName, isAsync, keys)); } /** {@inheritDoc} */ - @Override public void put(final K key, final V val) {; - compute.run(new IgniteRunnable() { - @Override public void run() { - cache().put(key, val); - } - }); + @Override public void put(K key, V val) { + compute.call(new PutTask<>(cacheName, isAsync, key, val)); } /** {@inheritDoc} */ - @Override public V getAndPut(final K key, final V val) { - return (V)compute.call(new IgniteCallable<Object>() { - @Override public Object call() throws Exception { - return cache().getAndPut(key, val); - } - }); + @Override public V getAndPut(K key, V val) { + return compute.call(new GetAndPutTask<>(cacheName, isAsync, key, val)); } /** {@inheritDoc} */ - @Override public void putAll(final Map<? extends K, ? 
extends V> map) { - compute.run(new IgniteRunnable() { - @Override public void run() { - cache().putAll(map); - } - }); + @Override public void putAll(Map<? extends K, ? extends V> map) { + compute.call(new PutAllTask<>(cacheName, isAsync, map)); } /** {@inheritDoc} */ - @Override public boolean putIfAbsent(final K key, final V val) { - return (boolean)compute.call(new IgniteCallable<Object>() { - @Override public Object call() throws Exception { - return cache().putIfAbsent(key, val); - } - }); + @Override public boolean putIfAbsent(K key, V val) { + return compute.call(new PutIfAbsentTask<>(cacheName, isAsync, key, val)); } /** {@inheritDoc} */ - @Override public boolean remove(final K key) { - return (boolean)compute.call(new IgniteCallable<Object>() { - @Override public Object call() throws Exception { - return cache().remove(key); - } - }); + @Override public boolean remove(K key) { + return compute.call(new RemoveTask<>(cacheName, isAsync, key)); } /** {@inheritDoc} */ - @Override public boolean remove(final K key, final V oldVal) { - return (boolean)compute.call(new IgniteCallable<Object>() { - @Override public Object call() throws Exception { - return cache().remove(key, oldVal); - } - }); + @Override public boolean remove(K key, V oldVal) { + return compute.call(new RemoveIfExistsTask<>(cacheName, isAsync, key, oldVal)); } /** {@inheritDoc} */ - @Override public V getAndRemove(final K key) { - return (V)compute.call(new IgniteCallable<Object>() { - @Override public Object call() throws Exception { - return cache().getAndRemove(key); - } - }); + @Override public V getAndRemove(K key) { + return compute.call(new GetAndRemoveTask<K, V>(cacheName, isAsync, key)); } /** {@inheritDoc} */ - @Override public boolean replace(final K key, final V oldVal, final V newVal) { - return (boolean)compute.call(new IgniteCallable<Object>() { - @Override public Object call() throws Exception { - return cache().replace(key, oldVal, newVal); - } - }); + @Override public boolean 
replace(K key, V oldVal, V newVal) { + return compute.call(new ReplaceIfExistsTask<>(cacheName, isAsync, key, oldVal, newVal)); } /** {@inheritDoc} */ - @Override public boolean replace(final K key, final V val) { - return (boolean)compute.call(new IgniteCallable<Object>() { - @Override public Object call() throws Exception { - return cache().replace(key, val); - } - }); + @Override public boolean replace(K key, V val) { + return compute.call(new ReplaceTask<>(cacheName, isAsync, key, val)); } /** {@inheritDoc} */ - @Override public V getAndReplace(final K key, final V val) { - return (V)compute.call(new IgniteCallable<Object>() { - @Override public Object call() throws Exception { - return cache().getAndReplace(key, val); - } - }); + @Override public V getAndReplace(K key, V val) { + return compute.call(new GetAndReplaceTask<>(cacheName, isAsync, key, val)); } /** {@inheritDoc} */ - @Override public void removeAll(final Set<? extends K> keys) { - compute.run(new IgniteRunnable() { - @Override public void run() { - cache().removeAll(keys); - } - }); + @Override public void removeAll(Set<? 
extends K> keys) { + compute.call(new RemoveAllKeysTask<>(cacheName, isAsync, keys)); } /** {@inheritDoc} */ @Override public void removeAll() { - compute.run(new IgniteRunnable() { - @Override public void run() { - IgniteCache<Object, Object> cache = cache(); - - cache.removeAll(); - - if (isAsync) - cache.future().get(); - } - }); + compute.call(new RemoveAllTask<K, V>(cacheName, isAsync)); } /** {@inheritDoc} */ @Override public void clear() { - compute.run(new IgniteRunnable() { - @Override public void run() { - cache().clear(); - } - }); + compute.call(new ClearTask(cacheName, isAsync)); } /** {@inheritDoc} */ - @Override public void clear(final K key) { - compute.run(new IgniteRunnable() { - @Override public void run() { - cache().clear(key); - } - }); + @Override public void clear(K key) { + compute.call(new ClearKeyTask<>(cacheName, isAsync, false, key)); } /** {@inheritDoc} */ - @Override public void clearAll(final Set<? extends K> keys) { - compute.run(new IgniteRunnable() { - @Override public void run() { - cache().clearAll(keys); - } - }); + @Override public void clearAll(Set<? extends K> keys) { + compute.call(new ClearAllKeys<>(cacheName, isAsync, false, keys)); } /** {@inheritDoc} */ - @Override public void localClear(final K key) { - compute.run(new IgniteRunnable() { - @Override public void run() { - cache().localClear(key); - } - }); + @Override public void localClear(K key) { + compute.call(new ClearKeyTask<>(cacheName, isAsync, true, key)); } /** {@inheritDoc} */ - @Override public void localClearAll(final Set<? extends K> keys) { - compute.run(new IgniteRunnable() { - @Override public void run() { - cache().localClearAll(keys); - } - }); + @Override public void localClearAll(Set<? extends K> keys) { + compute.call(new ClearAllKeys<>(cacheName, isAsync, true, keys)); } /** {@inheritDoc} */ - @Override public <T> T invoke(final K key, final EntryProcessor<K, V, T> entryProcessor, final Object... 
arguments) { - return (T)compute.call(new IgniteCallable<Object>() { - @Override public Object call() throws Exception { - return cache().invoke(key, - (EntryProcessor<Object, Object, Object>)entryProcessor, arguments); - } - }); + @Override public <T> T invoke(K key, EntryProcessor<K, V, T> processor, Object... args) { + return compute.call(new InvokeTask<>(cacheName, isAsync, key, processor, args)); } /** {@inheritDoc} */ - @Override public <T> T invoke(final K key, final CacheEntryProcessor<K, V, T> entryProcessor, final Object... arguments) { - return (T)compute.call(new IgniteCallable<Object>() { - @Override public Object call() throws Exception { - return cache().invoke(key, - (CacheEntryProcessor<Object, Object, Object>)entryProcessor, arguments); - } - }); + @Override public <T> T invoke(K key, CacheEntryProcessor<K, V, T> processor, Object... args) { + return compute.call(new InvokeTask<>(cacheName, isAsync, key, processor, args)); } /** {@inheritDoc} */ - @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(final Set<? extends K> keys, final EntryProcessor<K, V, T> entryProcessor, - final Object... args) { - return (Map<K, EntryProcessorResult<T>>)compute.call(new IgniteCallable<Object>() { - @Override public Object call() throws Exception { - return cache().invokeAll(keys, - (EntryProcessor<Object, Object, Object>)entryProcessor, args); - } - }); + @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll( + Set<? extends K> keys, + EntryProcessor<K, V, T> processor, + Object... 
args) + { + return compute.call(new InvokeAllTask<>(cacheName, isAsync, keys, processor, args)); } /** {@inheritDoc} */ @Override public String getName() { - return (String)compute.call(new IgniteCallable<Object>() { - @Override public Object call() throws Exception { - return cache().getName(); - } - }); + return compute.call(new GetNameTask(cacheName, isAsync)); } /** {@inheritDoc} */ @@ -523,72 +354,47 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> { /** {@inheritDoc} */ @Override public void close() { - compute.run(new IgniteRunnable() { - @Override public void run() { - cache().close(); - } - }); + compute.call(new CloseTask(cacheName, isAsync)); } /** {@inheritDoc} */ @Override public void destroy() { - compute.run(new IgniteRunnable() { - @Override public void run() { - cache().destroy(); - } - }); + compute.call(new DestroyTask(cacheName, isAsync)); } /** {@inheritDoc} */ @Override public boolean isClosed() { - return (boolean)compute.call(new IgniteCallable<Object>() { - @Override public Object call() throws Exception { - return cache().isClosed(); - } - }); + return compute.call(new IsClosedTask(cacheName, isAsync)); } /** {@inheritDoc} */ - @Override public <T> T unwrap(final Class<T> clazz) { + @SuppressWarnings("unchecked") + @Override public <T> T unwrap(Class<T> clazz) { if (Ignite.class.equals(clazz)) return (T)igniteProxy; try { - return (T)compute.call(new IgniteCallable<Object>() { - @Override public Object call() throws Exception { - return cache().unwrap(clazz); - } - }); + return compute.call(new UnwrapTask<>(cacheName, isAsync, clazz)); } catch (Exception e) { - throw new IllegalArgumentException("Looks like class " + clazz + " is unmarshallable. Exception type:" + e.getClass(), e); + throw new IllegalArgumentException("Looks like class " + clazz + + " is unmarshallable. 
Exception type:" + e.getClass(), e); } } /** {@inheritDoc} */ - @Override public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration) { + @Override public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) { throw new UnsupportedOperationException("Method should be supported."); } /** {@inheritDoc} */ - @Override public void deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration) { + @Override public void deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) { throw new UnsupportedOperationException("Method should be supported."); } /** {@inheritDoc} */ @Override public Iterator<Entry<K, V>> iterator() { - final Collection<Entry<K, V>> col = (Collection<Entry<K, V>>)compute.call(new IgniteCallable<Object>() { - @Override public Object call() throws Exception { - Collection res = new ArrayList(); - - for (Object o : cache()) - res.add(o); - - return res; - } - }); - - return col.iterator(); + return compute.call(new IteratorTask<K, V>(cacheName, isAsync)).iterator(); } /** {@inheritDoc} */ @@ -616,4 +422,968 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> { @Override public CacheMetricsMXBean mxBean() { throw new UnsupportedOperationException("Method should be supported."); } + + /** + * + */ + private static class GetConfigurationTask<K, V, C extends Configuration<K, V>> extends CacheTaskAdapter<K, V, C> { + /** Clazz. */ + private final Class<C> clazz; + + /** + * @param cacheName Cache name. + * @param async Async. + * @param clazz Clazz. 
+ */ + public GetConfigurationTask(String cacheName, boolean async, Class<C> clazz) { + super(cacheName, async); + this.clazz = clazz; + } + + /** {@inheritDoc} */ + @Override public C call() throws Exception { + return cache().getConfiguration(clazz); + } + } + + /** + * + */ + private static class LocalLoadCacheTask<K, V> extends CacheTaskAdapter<K, V, Void> { + /** Predicate. */ + private final IgniteBiPredicate<K, V> p; + + /** Args. */ + private final Object[] args; + + /** + * @param cacheName Cache name. + * @param async Async. + * @param p P. + * @param args Args. + */ + public LocalLoadCacheTask(String cacheName, boolean async, IgniteBiPredicate<K, V> p, Object[] args) { + super(cacheName, async); + this.p = p; + this.args = args; + } + + /** {@inheritDoc} */ + @Override public Void call() { + cache().localLoadCache(p, args); + + return null; + } + } + + /** + * + */ + private static class GetAndPutIfAbsentTask<K, V> extends CacheTaskAdapter<K, V, V> { + /** Key. */ + private final K key; + + /** Value. */ + private final V val; + + /** + * @param cacheName Cache name. + * @param async Async. + * @param key Key. + * @param val Value. + */ + public GetAndPutIfAbsentTask(String cacheName, boolean async, K key, V val) { + super(cacheName, async); + this.key = key; + this.val = val; + } + + /** {@inheritDoc} */ + @Override public V call() throws Exception { + return cache().getAndPutIfAbsent(key, val); + } + } + + /** + * + */ + private static class IsLocalLockedTask<K> extends CacheTaskAdapter<K, Void, Boolean> { + /** Key. */ + private final K key; + + /** By current thread. */ + private final boolean byCurrThread; + + /** + * @param cacheName Cache name. + * @param async Async. + * @param key Key. + * @param byCurrThread By current thread. 
+ */ + public IsLocalLockedTask(String cacheName, boolean async, K key, boolean byCurrThread) { + super(cacheName, async); + this.key = key; + this.byCurrThread = byCurrThread; + } + + /** {@inheritDoc} */ + @Override public Boolean call() throws Exception { + return cache().isLocalLocked(key, byCurrThread); + } + } + + /** + * + */ + private static class LocalEntriesTask<K, V> extends CacheTaskAdapter<K, V, Iterable<Entry<K, V>>> { + /** Peek modes. */ + private final CachePeekMode[] peekModes; + + /** + * @param cacheName Cache name. + * @param async Async. + * @param peekModes Peek modes. + */ + public LocalEntriesTask(String cacheName, boolean async, CachePeekMode[] peekModes) { + super(cacheName, async); + this.peekModes = peekModes; + } + + /** {@inheritDoc} */ + @Override public Iterable<Entry<K, V>> call() throws Exception { + Collection<Entry<K, V>> res = new ArrayList<>(); + + for (Entry<K, V> e : cache().localEntries(peekModes)) + res.add(new CacheEntryImpl<>(e.getKey(), e.getValue())); + + return res; + } + } + + /** + * + */ + private static class LocalEvictTask<K> extends CacheTaskAdapter<K, Void, Void> { + /** Keys. */ + private final Collection<? extends K> keys; + + /** + * @param cacheName Cache name. + * @param async Async. + * @param keys Keys. + */ + public LocalEvictTask(String cacheName, boolean async, Collection<? extends K> keys) { + super(cacheName, async); + this.keys = keys; + } + + /** {@inheritDoc} */ + @Override public Void call() { + cache().localEvict(keys); + + return null; + } + } + + /** + * + */ + private static class LocalPeekTask<K, V> extends CacheTaskAdapter<K, V, V> { + /** Key. */ + private final K key; + + /** Peek modes. */ + private final CachePeekMode[] peekModes; + + /** + * @param cacheName Cache name. + * @param async Async. + * @param key Key. + * @param peekModes Peek modes. 
+ */ + public LocalPeekTask(String cacheName, boolean async, K key, CachePeekMode[] peekModes) { + super(cacheName, async); + this.key = key; + this.peekModes = peekModes; + } + + /** {@inheritDoc} */ + @Override public V call() throws Exception { + return cache().localPeek(key, peekModes); + } + } + + /** + * + */ + private static class SizeTask extends CacheTaskAdapter<Void, Void, Integer> { + /** Peek modes. */ + private final CachePeekMode[] peekModes; + + /** Local. */ + private final boolean loc; + + /** + * @param cacheName Cache name. + * @param async Async. + * @param peekModes Peek modes. + * @param loc Local. + */ + public SizeTask(String cacheName, boolean async, CachePeekMode[] peekModes, boolean loc) { + super(cacheName, async); + this.loc = loc; + this.peekModes = peekModes; + } + + /** {@inheritDoc} */ + @Override public Integer call() throws Exception { + return loc ? cache().localSize(peekModes) : cache().size(peekModes); + } + } + + /** + * + */ + private static class GetTask<K, V> extends CacheTaskAdapter<K, V, V> { + /** Key. */ + private final K key; + + /** + * @param cacheName Cache name. + * @param async Async. + * @param key Key. + */ + public GetTask(String cacheName, boolean async, K key) { + super(cacheName, async); + this.key = key; + } + + /** {@inheritDoc} */ + @Override public V call() throws Exception { + return cache().get(key); + } + } + + /** + * + */ + private static class RemoveAllTask<K, V> extends CacheTaskAdapter<K, V, Void> { + /** + * @param cacheName Cache name. + * @param async Async. + */ + public RemoveAllTask(String cacheName, boolean async) { + super(cacheName, async); + } + + /** {@inheritDoc} */ + @Override public Void call() { + IgniteCache<K, V> cache = cache(); + + cache.removeAll(); + + if (async) + cache.future().get(); + + return null; + } + } + + /** + * + */ + private static class PutTask<K, V> extends CacheTaskAdapter<K, V, Void> { + /** Key. */ + private final K key; + + /** Value. 
*/ + private final V val; + + /** + * @param cacheName Cache name. + * @param async Async. + * @param key Key. + * @param val Value. + */ + public PutTask(String cacheName, boolean async, K key, V val) { + super(cacheName, async); + this.key = key; + this.val = val; + } + + /** {@inheritDoc} */ + @Override public Void call() { + cache().put(key, val); + + return null; + } + } + + /** + * + */ + private static class ContainsKeyTask<K> extends CacheTaskAdapter<K, Object, Boolean> { + /** Key. */ + private final K key; + + /** + * @param cacheName Cache name. + * @param async Async. + * @param key Key. + */ + public ContainsKeyTask(String cacheName, boolean async, K key) { + super(cacheName, async); + this.key = key; + } + + /** {@inheritDoc} */ + @Override public Boolean call() throws Exception { + return cache().containsKey(key); + } + } + + /** + * + */ + private static class ClearTask extends CacheTaskAdapter<Object, Object, Void> { + /** + * @param cacheName Cache name. + * @param async Async. + */ + public ClearTask(String cacheName, boolean async) { + super(cacheName, async); + } + + /** {@inheritDoc} */ + @Override public Void call() { + cache().clear(); + + return null; + } + } + + /** + * + */ + private static class IteratorTask<K, V> extends CacheTaskAdapter<K, V, Collection<Entry<K, V>>> { + /** + * @param cacheName Cache name. + * @param async Async. + */ + public IteratorTask(String cacheName, boolean async) { + super(cacheName, async); + } + + /** {@inheritDoc} */ + @Override public Collection<Entry<K, V>> call() throws Exception { + Collection<Entry<K, V>> res = new ArrayList<>(); + + for (Entry<K, V> o : cache()) + res.add(o); + + return res; + } + } + + /** + * + */ + private static class ReplaceTask<K, V> extends CacheTaskAdapter<K, V, Boolean> { + /** Key. */ + private final K key; + + /** Value. */ + private final V val; + + /** + * @param cacheName Cache name. + * @param async Async. + * @param key Key. + * @param val Value. 
+ */ + public ReplaceTask(String cacheName, boolean async, K key, V val) { + super(cacheName, async); + this.key = key; + this.val = val; + } + + /** {@inheritDoc} */ + @Override public Boolean call() throws Exception { + return cache().replace(key, val); + } + } + + /** + * + */ + private static class GetNameTask extends CacheTaskAdapter<Void, Void, String> { + /** + * @param cacheName Cache name. + * @param async Async. + */ + public GetNameTask(String cacheName, boolean async) { + super(cacheName, async); + } + + /** {@inheritDoc} */ + @Override public String call() throws Exception { + return cache().getName(); + } + } + + /** + * + */ + private static class RemoveTask<K> extends CacheTaskAdapter<K, Void, Boolean> { + /** Key. */ + private final K key; + + /** + * @param cacheName Cache name. + * @param async Async. + * @param key Key. + */ + public RemoveTask(String cacheName, boolean async, K key) { + super(cacheName, async); + this.key = key; + } + + /** {@inheritDoc} */ + @Override public Boolean call() throws Exception { + return cache().remove(key); + } + } + + /** + * + */ + private static class PutAllTask<K, V> extends CacheTaskAdapter<K, V, Void> { + /** Map. */ + private final Map<? extends K, ? extends V> map; + + /** + * @param cacheName Cache name. + * @param async Async. + * @param map Map. + */ + public PutAllTask(String cacheName, boolean async, Map<? extends K, ? extends V> map) { + super(cacheName, async); + this.map = map; + } + + /** {@inheritDoc} */ + @Override public Void call() { + cache().putAll(map); + + return null; + } + } + + /** + * + */ + private static class RemoveAllKeysTask<K> extends CacheTaskAdapter<K, Void, Void> { + /** Keys. */ + private final Set<? extends K> keys; + + /** + * @param cacheName Cache name. + * @param async Async. + * @param keys Keys. + */ + public RemoveAllKeysTask(String cacheName, boolean async, Set<? 
extends K> keys) { + super(cacheName, async); + this.keys = keys; + } + + /** {@inheritDoc} */ + @Override public Void call() { + cache().removeAll(keys); + + return null; + } + } + + /** + * + */ + private static class GetAllTask<K, V> extends CacheTaskAdapter<K, V, Map<K, V>> { + /** Keys. */ + private final Set<? extends K> keys; + + /** + * @param cacheName Cache name. + * @param async Async. + * @param keys Keys. + */ + public GetAllTask(String cacheName, boolean async, Set<? extends K> keys) { + super(cacheName, async); + this.keys = keys; + } + + /** {@inheritDoc} */ + @Override public Map<K, V> call() throws Exception { + return cache().getAll(keys); + } + } + + /** + * + */ + private static class GetAllOutTxTask<K, V> extends CacheTaskAdapter<K, V, Map<K, V>> { + /** Keys. */ + private final Set<? extends K> keys; + + /** + * @param cacheName Cache name. + * @param async Async. + * @param keys Keys. + */ + public GetAllOutTxTask(String cacheName, boolean async, Set<? extends K> keys) { + super(cacheName, async); + this.keys = keys; + } + + /** {@inheritDoc} */ + @Override public Map<K, V> call() throws Exception { + return cache().getAllOutTx(keys); + } + } + + /** + * + */ + private static class ContainsKeysTask<K, V> extends CacheTaskAdapter<K, V, Boolean> { + /** Keys. */ + private final Set<? extends K> keys; + + /** + * @param cacheName Cache name. + * @param async Async. + * @param keys Keys. + */ + public ContainsKeysTask(String cacheName, boolean async, Set<? extends K> keys) { + super(cacheName, async); + this.keys = keys; + } + + /** {@inheritDoc} */ + @Override public Boolean call() throws Exception { + return cache().containsKeys(keys); + } + } + + /** + * + */ + private static class GetAndPutTask<K, V> extends CacheTaskAdapter<K, V, V> { + /** Key. */ + private final K key; + + /** Value. */ + private final V val; + + /** + * @param cacheName Cache name. + * @param async Async. + * @param key Key. + * @param val Value. 
+ */ + public GetAndPutTask(String cacheName, boolean async, K key, V val) { + super(cacheName, async); + this.key = key; + this.val = val; + } + + /** {@inheritDoc} */ + @Override public V call() throws Exception { + return cache().getAndPut(key, val); + } + } + + /** + * Callable that calls {@code putIfAbsent(key, val)} on the cache and returns its result. + */ + private static class PutIfAbsentTask<K, V> extends CacheTaskAdapter<K, V, Boolean> { + /** Key. */ + private final K key; + + /** Value. */ + private final V val; + + /** + * @param cacheName Cache name. + * @param async Async. + * @param key Key. + * @param val Value. + */ + public PutIfAbsentTask(String cacheName, boolean async, K key, V val) { + super(cacheName, async); + this.key = key; + this.val = val; + } + + /** {@inheritDoc} */ + @Override public Boolean call() throws Exception { + return cache().putIfAbsent(key, val); + } + } + + /** + * Callable that calls the two-argument {@code remove(key, oldVal)} on the cache and returns its result. + */ + private static class RemoveIfExistsTask<K, V> extends CacheTaskAdapter<K, V, Boolean> { + /** Key. */ + private final K key; + + /** Old value. */ + private final V oldVal; + + /** + * @param cacheName Cache name. + * @param async Async. + * @param key Key. + * @param oldVal Old value. + */ + public RemoveIfExistsTask(String cacheName, boolean async, K key, V oldVal) { + super(cacheName, async); + this.key = key; + this.oldVal = oldVal; + } + + /** {@inheritDoc} */ + @Override public Boolean call() throws Exception { + return cache().remove(key, oldVal); + } + } + + /** + * Callable that calls {@code getAndRemove(key)} on the cache and returns its result. + */ + private static class GetAndRemoveTask<K, V> extends CacheTaskAdapter<K, V, V> { + /** Key. */ + private final K key; + + /** + * @param cacheName Cache name. + * @param async Async. + * @param key Key. + */ + public GetAndRemoveTask(String cacheName, boolean async, K key) { + super(cacheName, async); + this.key = key; + } + + /** {@inheritDoc} */ + @Override public V call() throws Exception { + return cache().getAndRemove(key); + } + } + + /** + * Callable that calls the three-argument {@code replace(key, oldVal, newVal)} on the cache and returns its result. + */ + private static class ReplaceIfExistsTask<K, V> extends CacheTaskAdapter<K, V, Boolean> { + /** Key.
*/ + private final K key; + + /** Old value. */ + private final V oldVal; + + /** New value. */ + private final V newVal; + + /** + * @param cacheName Cache name. + * @param async Async. + * @param key Key. + * @param oldVal Old value. + * @param newVal New value. + */ + public ReplaceIfExistsTask(String cacheName, boolean async, K key, V oldVal, V newVal) { + super(cacheName, async); + this.key = key; + this.oldVal = oldVal; + this.newVal = newVal; + } + + /** {@inheritDoc} */ + @Override public Boolean call() throws Exception { + return cache().replace(key, oldVal, newVal); + } + } + + /** + * Callable that calls {@code getAndReplace(key, val)} on the cache and returns its result. + */ + private static class GetAndReplaceTask<K, V> extends CacheTaskAdapter<K, V, V> { + /** Key. */ + private final K key; + + /** Value. */ + private final V val; + + /** + * @param cacheName Cache name. + * @param async Async. + * @param key Key. + * @param val Value. + */ + public GetAndReplaceTask(String cacheName, boolean async, K key, V val) { + super(cacheName, async); + this.key = key; + this.val = val; + } + + /** {@inheritDoc} */ + @Override public V call() throws Exception { + return cache().getAndReplace(key, val); + } + } + + /** + * Callable that clears one key: {@code localClear(key)} when {@code loc} is set, {@code clear(key)} otherwise. + */ + private static class ClearKeyTask<K> extends CacheTaskAdapter<K, Void, Void> { + /** Key. */ + private final K key; + + /** Local flag: when {@code true}, {@code localClear} is called instead of {@code clear}. */ + private final boolean loc; + + /** + * @param cacheName Cache name. + * @param async Async. + * @param key Key. + */ + public ClearKeyTask(String cacheName, boolean async, boolean loc, K key) { + super(cacheName, async); + this.key = key; + this.loc = loc; + } + + /** {@inheritDoc} */ + @Override public Void call() { + if (loc) + cache().localClear(key); + else + cache().clear(key); + + return null; + } + } + + /** + * Callable that clears a set of keys: {@code localClearAll(keys)} when {@code loc} is set, {@code clearAll(keys)} otherwise. + */ + private static class ClearAllKeys<K> extends CacheTaskAdapter<K, Void, Void> { + /** Keys. */ + private final Set<? extends K> keys; + + /** Local flag: when {@code true}, {@code localClearAll} is called instead of {@code clearAll}. */ + private final boolean loc; + + /** + * @param cacheName Cache name. + * @param async Async. + * @param keys Keys.
+ */ + public ClearAllKeys(String cacheName, boolean async, boolean loc, Set<? extends K> keys) { + super(cacheName, async); + this.keys = keys; + this.loc = loc; + } + + /** {@inheritDoc} */ + @Override public Void call() { + if (loc) + cache().localClearAll(keys); + else + cache().clearAll(keys); + + return null; + } + } + + /** + * Callable that calls {@code invoke(key, processor, args)} on the cache and returns its result. + */ + private static class InvokeTask<K, V, R> extends CacheTaskAdapter<K, V, R> { + /** Key. */ + private final K key; + + /** Processor. */ + private final EntryProcessor<K, V, R> processor; + + /** Args. */ + private final Object[] args; + + /** + * @param cacheName Cache name. + * @param async Async. + * @param key Key. + * @param processor Processor. + * @param args Args. + */ + public InvokeTask(String cacheName, boolean async, K key, EntryProcessor<K, V, R> processor, + Object[] args) { + super(cacheName, async); + this.args = args; + this.key = key; + this.processor = processor; + } + + /** {@inheritDoc} */ + @Override public R call() throws Exception { + return cache().invoke(key, processor, args); + } + } + + /** + * Callable that calls {@code invokeAll(keys, processor, args)} on the cache and returns its result. + */ + private static class InvokeAllTask<K, V, T> extends CacheTaskAdapter<K, V, Map<K, EntryProcessorResult<T>>> { + /** Keys. */ + private final Set<? extends K> keys; + + /** Processor. */ + private final EntryProcessor<K, V, T> processor; + + /** Args. */ + private final Object[] args; + + /** + * @param cacheName Cache name. + * @param async Async. + * @param keys Keys. + * @param processor Processor. + * @param args Args. + */ + public InvokeAllTask(String cacheName, boolean async, Set<?
extends K> keys, + EntryProcessor<K, V, T> processor, Object[] args) { + super(cacheName, async); + this.args = args; + this.keys = keys; + this.processor = processor; + } + + /** {@inheritDoc} */ + @Override public Map<K, EntryProcessorResult<T>> call() throws Exception { + return cache().invokeAll(keys, processor, args); + } + } + + /** + * Callable that calls {@code close()} on the cache; always returns {@code null}. + */ + private static class CloseTask extends CacheTaskAdapter<Void, Void, Void> { + /** + * @param cacheName Cache name. + * @param async Async. + */ + public CloseTask(String cacheName, boolean async) { + super(cacheName, async); + } + + /** {@inheritDoc} */ + @Override public Void call() { + cache().close(); + + return null; + } + } + + /** + * Callable that calls {@code destroy()} on the cache; always returns {@code null}. + */ + private static class DestroyTask extends CacheTaskAdapter<Void, Void, Void> { + /** + * @param cacheName Cache name. + * @param async Async. + */ + public DestroyTask(String cacheName, boolean async) { + super(cacheName, async); + } + + /** {@inheritDoc} */ + @Override public Void call() { + cache().destroy(); + + return null; + } + } + + /** + * Callable that returns the result of {@code isClosed()} on the cache. + */ + private static class IsClosedTask extends CacheTaskAdapter<Void, Void, Boolean> { + /** + * @param cacheName Cache name. + * @param async Async. + */ + public IsClosedTask(String cacheName, boolean async) { + super(cacheName, async); + } + + /** {@inheritDoc} */ + @Override public Boolean call() throws Exception { + return cache().isClosed(); + } + } + + /** + * Callable that calls {@code unwrap(clazz)} on the cache and returns its result. + */ + private static class UnwrapTask<R> extends CacheTaskAdapter<Void, Void, R> { + /** Class passed to {@code unwrap}. */ + private final Class<R> clazz; + + /** + * @param cacheName Cache name. + * @param async Async. + * @param clazz Clazz.
+ */ + public UnwrapTask(String cacheName, boolean async, Class<R> clazz) { + super(cacheName, async); + this.clazz = clazz; + } + + /** {@inheritDoc} */ + @Override public R call() throws Exception { + return cache().unwrap(clazz); + } + } + + /** + * Base callable for the cache tasks above: holds the cache name and async flag and looks the cache up in {@link #cache()}. + */ + private static abstract class CacheTaskAdapter<K, V, R> implements IgniteCallable<R> { + /** Ignite instance the cache is looked up on (field annotated with {@code @IgniteInstanceResource}). */ + @IgniteInstanceResource + protected Ignite ignite; + + /** Cache name. */ + protected final String cacheName; + + /** Async flag: when {@code true}, {@link #cache()} returns the {@code withAsync()} view. */ + protected final boolean async; + + /** + * @param cacheName Cache name. + * @param async Async. + */ + public CacheTaskAdapter(String cacheName, boolean async) { + this.async = async; + this.cacheName = cacheName; + } + + /** + * Returns {@code ignite.cache(cacheName)}, or its {@code withAsync()} view when {@code async} is set. + */ + protected IgniteCache<K, V> cache() { + IgniteCache<K, V> cache = ignite.cache(cacheName); + + return async ? cache.withAsync() : cache; + } + } } \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/a1e5cc57/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java index 05d6533..633e9d0 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java @@ -26,7 +26,6 @@ import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCluster; import org.apache.ignite.IgniteCompute; import org.apache.ignite.IgniteException; -import org.apache.ignite.Ignition; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.cluster.ClusterMetrics; import org.apache.ignite.cluster.ClusterNode; @@ -36,6 +35,7 @@ import org.apache.ignite.internal.cluster.IgniteClusterEx; import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.resources.IgniteInstanceResource; import org.jetbrains.annotations.Nullable; /** @@ -43,9 +43,6 @@ import org.jetbrains.annotations.Nullable; */ @SuppressWarnings("TransientFieldInNonSerializableClass") public class IgniteClusterProcessProxy implements IgniteClusterEx { - /** Grid id. */ - private final UUID gridId; - /** Compute. */ private final transient IgniteCompute compute; @@ -57,21 +54,11 @@ public class IgniteClusterProcessProxy implements IgniteClusterEx { */ public IgniteClusterProcessProxy(IgniteProcessProxy proxy) { this.proxy = proxy; - gridId = proxy.getId(); compute = proxy.remoteCompute(); } - /** - * Returns cluster instance. 
Method to be called from closure at another JVM. - * - * @return Cache. - */ - private IgniteClusterEx cluster() { - return (IgniteClusterEx)Ignition.ignite(gridId).cluster(); - } - /** {@inheritDoc} */ - @Override public ClusterGroupEx forSubjectId(final UUID subjId) { + @Override public ClusterGroupEx forSubjectId(UUID subjId) { throw new UnsupportedOperationException("Operation is not supported yet."); } @@ -83,11 +70,7 @@ public class IgniteClusterProcessProxy implements IgniteClusterEx { /** {@inheritDoc} */ @Override public ClusterNode localNode() { - return compute.call(new IgniteCallable<ClusterNode>() { - @Override public ClusterNode call() throws Exception { - return cluster().localNode(); - } - }); + return compute.call(new LocalNodeTask()); } /** {@inheritDoc} */ @@ -285,38 +268,22 @@ public class IgniteClusterProcessProxy implements IgniteClusterEx { /** {@inheritDoc} */ @Override public Collection<ClusterNode> nodes() { - return compute.call(new IgniteCallable<Collection<ClusterNode>>() { - @Override public Collection<ClusterNode> call() throws Exception { - return cluster().nodes(); - } - }); + return compute.call(new NodesTask()); } /** {@inheritDoc} */ - @Override public ClusterNode node(final UUID nid) { - return compute.call(new IgniteCallable<ClusterNode>() { - @Override public ClusterNode call() throws Exception { - return cluster().node(nid); - } - }); + @Override public ClusterNode node(UUID nid) { + return compute.call(new NodeTask(nid)); } /** {@inheritDoc} */ @Override public ClusterNode node() { - return compute.call(new IgniteCallable<ClusterNode>() { - @Override public ClusterNode call() throws Exception { - return cluster().node(); - } - }); + return compute.call(new NodeTask(null)); } /** {@inheritDoc} */ @Override public Collection<String> hostNames() { - return compute.call(new IgniteCallable<Collection<String>>() { - @Override public Collection<String> call() throws Exception { - return cluster().hostNames(); - } - }); + return 
compute.call(new HostNamesTask()); } /** {@inheritDoc} */ @@ -333,4 +300,70 @@ public class IgniteClusterProcessProxy implements IgniteClusterEx { @Nullable @Override public IgniteFuture<?> clientReconnectFuture() { throw new UnsupportedOperationException("Operation is not supported yet."); } + + /** + * Callable that returns {@code cluster().localNode()}. + */ + private static class LocalNodeTask extends ClusterTaskAdapter<ClusterNode> { + /** {@inheritDoc} */ + @Override public ClusterNode call() throws Exception { + return cluster().localNode(); + } + } + + /** + * Callable that returns {@code cluster().nodes()}. + */ + private static class NodesTask extends ClusterTaskAdapter<Collection<ClusterNode>> { + /** {@inheritDoc} */ + @Override public Collection<ClusterNode> call() throws Exception { + return cluster().nodes(); + } + } + + /** + * Callable that returns {@code cluster().node(nodeId)}, or the no-argument {@code cluster().node()} when the node id is {@code null}. + */ + private static class NodeTask extends ClusterTaskAdapter<ClusterNode> { + /** Node id; {@code null} selects the no-argument {@code node()} call. */ + private final UUID nodeId; + + /** + * @param nodeId Node id, or {@code null} to call the no-argument {@code node()}. + */ + public NodeTask(UUID nodeId) { + this.nodeId = nodeId; + } + + /** {@inheritDoc} */ + @Override public ClusterNode call() throws Exception { + return nodeId == null ? cluster().node() : cluster().node(nodeId); + } + } + + /** + * Callable that returns {@code cluster().hostNames()}. + */ + private static class HostNamesTask extends ClusterTaskAdapter<Collection<String>> { + /** {@inheritDoc} */ + @Override public Collection<String> call() throws Exception { + return cluster().hostNames(); + } + } + + /** + * Base callable for the cluster tasks above; gives subclasses access to the cluster through {@code cluster()}. + */ + private abstract static class ClusterTaskAdapter<R> implements IgniteCallable<R> { + /** Ignite.
*/ + @IgniteInstanceResource + protected Ignite ignite; + + /** + * Returns {@code ignite.cluster()}. + */ + protected IgniteCluster cluster() { + return ignite.cluster(); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/a1e5cc57/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteEventsProcessProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteEventsProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteEventsProcessProxy.java index 860f889..d5af81e 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteEventsProcessProxy.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteEventsProcessProxy.java @@ -20,15 +20,16 @@ package org.apache.ignite.testframework.junits.multijvm; import java.util.Collection; import java.util.List; import java.util.UUID; +import org.apache.ignite.Ignite; import org.apache.ignite.IgniteEvents; import org.apache.ignite.IgniteException; -import org.apache.ignite.Ignition; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.events.Event; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteRunnable; +import org.apache.ignite.resources.IgniteInstanceResource; import org.jetbrains.annotations.Nullable; /** @@ -39,23 +40,11 @@ public class IgniteEventsProcessProxy implements IgniteEvents { /** Ignite proxy. */ private final transient IgniteProcessProxy igniteProxy; - /** Grid id. */ - private final UUID gridId; - /** * @param igniteProxy Ignite proxy. */ public IgniteEventsProcessProxy(IgniteProcessProxy igniteProxy) { this.igniteProxy = igniteProxy; - - gridId = igniteProxy.getId(); - } - - /** - * @return Events instance.
- */ - private IgniteEvents events() { - return Ignition.ignite(gridId).events(); } /** {@inheritDoc} */ @@ -105,11 +94,7 @@ public class IgniteEventsProcessProxy implements IgniteEvents { /** {@inheritDoc} */ @Override public void localListen(final IgnitePredicate<? extends Event> lsnr, final int... types) { - igniteProxy.remoteCompute().run(new IgniteRunnable() { - @Override public void run() { - events().localListen(lsnr, types); - } - }); + igniteProxy.remoteCompute().run(new LocalListenTask(lsnr, types)); } /** {@inheritDoc} */ @@ -151,4 +136,33 @@ public class IgniteEventsProcessProxy implements IgniteEvents { @Override public <R> IgniteFuture<R> future() { throw new UnsupportedOperationException("Operation isn't supported yet."); } + + /** + * Runnable that registers a listener via {@code ignite.events().localListen(lsnr, types)}. + */ + private static class LocalListenTask implements IgniteRunnable { + /** Ignite. */ + @IgniteInstanceResource + private Ignite ignite; + + /** Listener. */ + private final IgnitePredicate<? extends Event> lsnr; + + /** Types passed to {@code localListen} together with the listener. */ + private final int[] types; + + /** + * @param lsnr Listener. + * @param types Types. + */ + public LocalListenTask(IgnitePredicate<?
extends Event> lsnr, int[] types) { + this.lsnr = lsnr; + this.types = types; + } + + /** {@inheritDoc} */ + @Override public void run() { + ignite.events().localListen(lsnr, types); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/a1e5cc57/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java index f46e8e9..0597eda 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteNodeRunner.java @@ -25,13 +25,12 @@ import java.io.FileOutputStream; import java.io.FileReader; import java.io.IOException; import java.io.OutputStream; -import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Set; import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.Ignition; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; @@ -39,8 +38,9 @@ import org.apache.ignite.internal.processors.cache.GridCacheAbstractFullApiSelfT import org.apache.ignite.internal.util.GridJavaProcess; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.marshaller.optimized.OptimizedMarshaller; +import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.testframework.junits.IgniteTestResources; import sun.jvmstat.monitor.HostIdentifier; import 
sun.jvmstat.monitor.MonitoredHost; import sun.jvmstat.monitor.MonitoredVm; @@ -99,30 +99,6 @@ public class IgniteNodeRunner { public static String storeToFile(IgniteConfiguration cfg) throws IOException { String fileName = IGNITE_CONFIGURATION_FILE + cfg.getNodeId(); - // Check marshaller configuration, because read configuration method expect specific marshaller. - if (cfg.getMarshaller() instanceof OptimizedMarshaller){ - OptimizedMarshaller marsh = (OptimizedMarshaller)cfg.getMarshaller(); - - try { - Field isRequireFiled = marsh.getClass().getDeclaredField("requireSer"); - - isRequireFiled.setAccessible(true); - - boolean isRequireSer = isRequireFiled.getBoolean(marsh); - - if (isRequireSer) - throw new UnsupportedOperationException("Unsupported marshaller configuration. " + - "readCfgFromFileAndDeleteFile method expect " + OptimizedMarshaller.class.getSimpleName() + - "with requireSerializeble flag in 'false'."); - } - catch (NoSuchFieldException|IllegalAccessException e) { - throw new IgniteException("Failed to check filed of " + OptimizedMarshaller.class.getSimpleName(), e); - } - } - else - throw new UnsupportedOperationException("Unsupported marshaller. " + - "readCfgFromFileAndDeleteFile method expect " + OptimizedMarshaller.class.getSimpleName()); - try(OutputStream out = new BufferedOutputStream(new FileOutputStream(fileName))) { cfg.setMBeanServer(null); cfg.setMarshaller(null); @@ -143,11 +119,16 @@ public class IgniteNodeRunner { * @throws IOException If failed. 
* @see #storeToFile(IgniteConfiguration) */ - private static IgniteConfiguration readCfgFromFileAndDeleteFile(String fileName) throws IOException { + private static IgniteConfiguration readCfgFromFileAndDeleteFile(String fileName) + throws IOException, IgniteCheckedException { try(BufferedReader cfgReader = new BufferedReader(new FileReader(fileName))) { IgniteConfiguration cfg = (IgniteConfiguration)new XStream().fromXML(cfgReader); - cfg.setMarshaller(new OptimizedMarshaller(false)); + Marshaller marsh = IgniteTestResources.getMarshaller(); + + cfg.setMarshaller(marsh); + + X.println("Configured marshaller class: " + marsh.getClass().getName()); TcpDiscoverySpi disco = new TcpDiscoverySpi(); disco.setIpFinder(GridCacheAbstractFullApiSelfTest.LOCAL_IP_FINDER); http://git-wip-us.apache.org/repos/asf/ignite/blob/a1e5cc57/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java index ec7dab7..aa1d470 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java @@ -41,13 +41,11 @@ import org.apache.ignite.IgniteFileSystem; import org.apache.ignite.IgniteIllegalStateException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteMessaging; -import org.apache.ignite.internal.portable.api.IgnitePortables; import org.apache.ignite.IgniteQueue; import org.apache.ignite.IgniteScheduler; import org.apache.ignite.IgniteServices; import org.apache.ignite.IgniteSet; import org.apache.ignite.IgniteTransactions; -import org.apache.ignite.Ignition; import 
org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.cluster.ClusterNode; @@ -62,6 +60,7 @@ import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.IgniteClusterEx; +import org.apache.ignite.internal.portable.api.IgnitePortables; import org.apache.ignite.internal.processors.cache.GridCacheUtilityKey; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.hadoop.Hadoop; @@ -78,6 +77,8 @@ import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.plugin.IgnitePlugin; import org.apache.ignite.plugin.PluginNotFoundException; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.testframework.junits.IgniteTestResources; import org.jetbrains.annotations.Nullable; /** @@ -88,6 +89,9 @@ public class IgniteProcessProxy implements IgniteEx { /** Grid proxies. */ private static final transient ConcurrentMap<String, IgniteProcessProxy> gridProxies = new ConcurrentHashMap<>(); + /** Name of the system property that specifies an alternative {@code JAVA_HOME}. */ + private static final String TEST_MULTIJVM_JAVA_HOME = "test.multijvm.java.home"; + /** Jvm process with ignite instance. */ private final transient GridJavaProcess proc; @@ -108,7 +112,7 @@ public class IgniteProcessProxy implements IgniteEx { * @param log Logger. * @param locJvmGrid Local JVM grid.
*/ - public IgniteProcessProxy(final IgniteConfiguration cfg, final IgniteLogger log, final Ignite locJvmGrid) + public IgniteProcessProxy(IgniteConfiguration cfg, IgniteLogger log, Ignite locJvmGrid) throws Exception { this.cfg = cfg; this.locJvmGrid = locJvmGrid; @@ -121,7 +125,9 @@ public class IgniteProcessProxy implements IgniteEx { Collection<String> filteredJvmArgs = new ArrayList<>(); for (String arg : jvmArgs) { - if(!arg.toLowerCase().startsWith("-agentlib")) + if(arg.startsWith("-Xmx") || arg.startsWith("-Xms") || + arg.startsWith("-cp") || arg.startsWith("-classpath") || + arg.startsWith("-D" + IgniteTestResources.MARSH_CLASS_NAME)) filteredJvmArgs.add(arg); } @@ -130,7 +136,7 @@ public class IgniteProcessProxy implements IgniteEx { locJvmGrid.events().localListen(new NodeStartedListener(id, rmtNodeStartedLatch), EventType.EVT_NODE_JOINED); proc = GridJavaProcess.exec( - IgniteNodeRunner.class, + IgniteNodeRunner.class.getCanonicalName(), cfgFileName, // Params. this.log, // Optional closure to be called each time wrapped process prints line to system.out or system.err. @@ -140,6 +146,7 @@ public class IgniteProcessProxy implements IgniteEx { } }, null, + System.getProperty(TEST_MULTIJVM_JAVA_HOME), filteredJvmArgs, // JVM Args. System.getProperty("surefire.test.class.path") ); @@ -149,11 +156,7 @@ public class IgniteProcessProxy implements IgniteEx { IgniteProcessProxy prevVal = gridProxies.putIfAbsent(cfg.getGridName(), this); if (prevVal != null) { - remoteCompute().run(new IgniteRunnable() { - @Override public void run() { - G.stop(cfg.getGridName(), true); - } - }); + remoteCompute().run(new StopGridTask(cfg.getGridName(), true)); throw new IllegalStateException("There was found instance assotiated with " + cfg.getGridName() + ", instance= " + prevVal + ". New started node was stopped."); @@ -208,30 +211,17 @@ public class IgniteProcessProxy implements IgniteEx { * @param gridName Grid name. * @param cancel Cacnel flag. 
*/ - public static void stop(final String gridName, final boolean cancel) { + public static void stop(String gridName, boolean cancel) { IgniteProcessProxy proxy = gridProxies.get(gridName); if (proxy != null) { - proxy.remoteCompute().run(new IgniteRunnable() { - @Override public void run() { - G.stop(gridName, cancel); - } - }); + proxy.remoteCompute().run(new StopGridTask(gridName, cancel)); gridProxies.remove(gridName, proxy); } } /** - * For usage in closures. - * - * @return Ignite instance. - */ - private Ignite igniteById() { - return Ignition.ignite(id); - } - - /** * @param locNodeId ID of local node the requested grid instance is managing. * @return An instance of named grid. This method never returns {@code null}. * @throws IgniteIllegalStateException Thrown if grid was not properly initialized or grid instance was stopped or @@ -357,11 +347,7 @@ public class IgniteProcessProxy implements IgniteEx { /** {@inheritDoc} */ @Override public ClusterNode localNode() { - return remoteCompute().call(new IgniteCallable<ClusterNode>() { - @Override public ClusterNode call() throws Exception { - return ((IgniteEx)Ignition.ignite(id)).localNode(); - } - }); + return remoteCompute().call(new NodeTask()); } /** {@inheritDoc} */ @@ -467,7 +453,10 @@ public class IgniteProcessProxy implements IgniteEx { } /** {@inheritDoc} */ - @Override public <K, V> IgniteCache<K, V> createNearCache(@Nullable String cacheName, NearCacheConfiguration<K, V> nearCfg) { + @Override public <K, V> IgniteCache<K, V> createNearCache( + @Nullable String cacheName, + NearCacheConfiguration<K, V> nearCfg) + { throw new UnsupportedOperationException("Operation isn't supported yet."); } @@ -508,7 +497,8 @@ public class IgniteProcessProxy implements IgniteEx { } /** {@inheritDoc} */ - @Override public IgniteAtomicSequence atomicSequence(String name, long initVal, boolean create) throws IgniteException { + @Override public IgniteAtomicSequence atomicSequence(String name, long initVal, boolean 
create) + throws IgniteException { throw new UnsupportedOperationException("Operation isn't supported yet."); } @@ -524,8 +514,12 @@ public class IgniteProcessProxy implements IgniteEx { } /** {@inheritDoc} */ - @Override public <T, S> IgniteAtomicStamped<T, S> atomicStamped(String name, @Nullable T initVal, @Nullable S initStamp, - boolean create) throws IgniteException { + @Override public <T, S> IgniteAtomicStamped<T, S> atomicStamped( + String name, + @Nullable T initVal, + @Nullable S initStamp, + boolean create) throws IgniteException + { throw new UnsupportedOperationException("Operation isn't supported yet."); } @@ -572,11 +566,7 @@ public class IgniteProcessProxy implements IgniteEx { } }, EventType.EVT_NODE_LEFT, EventType.EVT_NODE_FAILED); - compute().run(new IgniteRunnable() { - @Override public void run() { - igniteById().close(); - } - }); + compute().run(new StopGridTask(localJvmGrid().name(), true)); try { assert U.await(rmtNodeStoppedLatch, 15, TimeUnit.SECONDS) : "NodeId=" + id; @@ -616,4 +606,43 @@ public class IgniteProcessProxy implements IgniteEx { return locJvmGrid.compute(grp); } + + /** + * Runnable that stops a grid by name via {@code G.stop(gridName, cancel)}. + */ + private static class StopGridTask implements IgniteRunnable { + /** Grid name. */ + private final String gridName; + + /** Cancel flag passed to {@code G.stop}. */ + private final boolean cancel; + + /** + * @param gridName Grid name. + * @param cancel Cancel. + */ + public StopGridTask(String gridName, boolean cancel) { + this.gridName = gridName; + this.cancel = cancel; + } + + /** {@inheritDoc} */ + @Override public void run() { + G.stop(gridName, cancel); + } + } + + /** + * Callable that returns {@code localNode()} of the {@code ignite} instance it holds. + */ + private static class NodeTask implements IgniteCallable<ClusterNode> { + /** Ignite.
*/ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Override public ClusterNode call() throws Exception { + return ((IgniteEx)ignite).localNode(); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/a1e5cc57/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheConfigurationP2PTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheConfigurationP2PTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheConfigurationP2PTest.java index 5298dd4..3777154 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheConfigurationP2PTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheConfigurationP2PTest.java @@ -103,6 +103,7 @@ public class CacheConfigurationP2PTest extends GridCommonAbstractTest { } }, null, + null, jvmArgs, null ); @@ -119,6 +120,7 @@ public class CacheConfigurationP2PTest extends GridCommonAbstractTest { } }, null, + null, jvmArgs, null ); @@ -139,6 +141,7 @@ public class CacheConfigurationP2PTest extends GridCommonAbstractTest { } }, null, + null, jvmArgs, cp );