# ignite-26
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a88b637b Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a88b637b Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a88b637b Branch: refs/heads/sprint-1 Commit: a88b637bf4228a35b84fde6f875c5fc6e4e97ab0 Parents: 60f1967 Author: sboikov <[email protected]> Authored: Mon Feb 2 09:29:59 2015 +0300 Committer: sboikov <[email protected]> Committed: Mon Feb 2 10:48:16 2015 +0300 ---------------------------------------------------------------------- .../datagrid/CachePopularNumbersExample.java | 4 +- .../org/apache/ignite/IgniteDataLoader.java | 58 +++++++++---------- .../processors/cache/GridCacheAdapter.java | 38 +++++++------ .../processors/cache/IgniteCacheProxy.java | 2 +- .../GridAtomicCacheQueueImpl.java | 2 +- .../dataload/GridDataLoaderFuture.java | 6 +- .../dataload/GridDataLoaderProcessor.java | 8 +-- .../dataload/IgniteDataLoaderImpl.java | 59 +++++++++++++++----- .../processors/fs/GridGgfsDataManager.java | 10 ++-- .../ignite/internal/util/IgniteUtils.java | 2 +- .../util/future/IgniteFinishedFutureImpl.java | 33 +++++++++++ .../processors/cache/IgniteTxAbstractTest.java | 2 +- ...cOriginatingNodeFailureAbstractSelfTest.java | 2 +- ...ockPartitionedMultiNodeAbstractSelfTest.java | 2 +- .../near/GridCacheNearOnlyTopologySelfTest.java | 2 +- .../GridDataLoaderProcessorSelfTest.java | 14 ++--- .../processors/fs/GridGgfsAbstractSelfTest.java | 2 +- .../window/GridStreamerWindowSelfTest.java | 26 ++++----- 18 files changed, 171 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a88b637b/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java index 1617662..df8a6dd 100644 --- a/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java @@ -90,9 +90,9 @@ public class CachePopularNumbersExample { * Populates cache in real time with numbers and keeps count for every number. * * @param g Grid. - * @throws IgniteCheckedException If failed. + * @throws IgniteException If failed. */ - private static void streamData(final Ignite g) throws IgniteCheckedException { + private static void streamData(final Ignite g) throws IgniteException { try (IgniteDataLoader<Integer, Long> ldr = g.dataLoader(CACHE_NAME)) { // Set larger per-node buffer size since our state is relatively small. ldr.perNodeBufferSize(2048); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a88b637b/modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java b/modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java index 796e0c9..cfb87e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java @@ -18,7 +18,7 @@ package org.apache.ignite; import org.apache.ignite.dataload.*; -import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; import java.util.*; @@ -34,14 +34,14 @@ import java.util.*; * the loader. * <p> * Also note that {@code GridDataLoader} is not the only way to load data into cache. - * Alternatively you can use {@link org.apache.ignite.cache.GridCache#loadCache(org.apache.ignite.lang.IgniteBiPredicate, long, Object...)} + * Alternatively you can use {@link IgniteCache#loadCache(IgniteBiPredicate, Object...)} * method to load data from underlying data store. You can also use standard * cache {@code put(...)} and {@code putAll(...)} operations as well, but they most * likely will not perform as well as this class for loading data. And finally, * data can be loaded from underlying data store on demand, whenever it is accessed - * for this no explicit data loading step is needed. * <p> - * {@code GridDataLoader} supports the following configuration properties: + * {@code IgniteDataLoader} supports the following configuration properties: * <ul> * <li> * {@link #perNodeBufferSize(int)} - when entries are added to data loader via @@ -115,9 +115,9 @@ public interface IgniteDataLoader<K, V> extends AutoCloseable { * Default is {@code false}. * * @param isolated Flag value. - * @throws IgniteCheckedException If failed. + * @throws IgniteException If failed. */ - public void isolated(boolean isolated) throws IgniteCheckedException; + public void isolated(boolean isolated) throws IgniteException; /** * Gets flag indicating that write-through behavior should be disabled for data loading. @@ -209,7 +209,7 @@ public interface IgniteDataLoader<K, V> extends AutoCloseable { * * @return Future for this loading process. */ - public IgniteInternalFuture<?> future(); + public IgniteFuture<?> future(); /** * Optional deploy class for peer deployment. All classes loaded by a data loader @@ -235,12 +235,12 @@ public interface IgniteDataLoader<K, V> extends AutoCloseable { * * @param key Key. * @return Future fo this operation. - * @throws IgniteCheckedException If failed to map key to node. - * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If thread has been interrupted. + * @throws IgniteException If failed to map key to node. + * @throws IgniteInterruptedException If thread has been interrupted. * @throws IllegalStateException If grid has been concurrently stopped or * {@link #close(boolean)} has already been called on loader. */ - public IgniteInternalFuture<?> removeData(K key) throws IgniteCheckedException, IgniteInterruptedCheckedException, IllegalStateException; + public IgniteFuture<?> removeData(K key) throws IgniteException, IgniteInterruptedException, IllegalStateException; /** * Adds data for loading on remote node. This method can be called from multiple @@ -253,12 +253,12 @@ public interface IgniteDataLoader<K, V> extends AutoCloseable { * @param key Key. * @param val Value or {@code null} if respective entry must be removed from cache. * @return Future fo this operation. - * @throws IgniteCheckedException If failed to map key to node. - * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If thread has been interrupted. + * @throws IgniteException If failed to map key to node. + * @throws IgniteInterruptedException If thread has been interrupted. * @throws IllegalStateException If grid has been concurrently stopped or * {@link #close(boolean)} has already been called on loader. */ - public IgniteInternalFuture<?> addData(K key, @Nullable V val) throws IgniteCheckedException, IgniteInterruptedCheckedException, + public IgniteFuture<?> addData(K key, @Nullable V val) throws IgniteException, IgniteInterruptedException, IllegalStateException; /** @@ -271,12 +271,12 @@ public interface IgniteDataLoader<K, V> extends AutoCloseable { * * @param entry Entry. * @return Future fo this operation. - * @throws IgniteCheckedException If failed to map key to node. - * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If thread has been interrupted. + * @throws IgniteException If failed to map key to node. + * @throws IgniteInterruptedException If thread has been interrupted. * @throws IllegalStateException If grid has been concurrently stopped or * {@link #close(boolean)} has already been called on loader. */ - public IgniteInternalFuture<?> addData(Map.Entry<K, V> entry) throws IgniteCheckedException, IgniteInterruptedCheckedException, + public IgniteFuture<?> addData(Map.Entry<K, V> entry) throws IgniteException, IgniteInterruptedException, IllegalStateException; /** @@ -292,7 +292,7 @@ public interface IgniteDataLoader<K, V> extends AutoCloseable { * {@link #close(boolean)} has already been called on loader. * @return Future for this load operation. */ - public IgniteInternalFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) throws IllegalStateException; + public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) throws IllegalStateException; /** * Adds data for loading on remote node. This method can be called from multiple @@ -307,7 +307,7 @@ public interface IgniteDataLoader<K, V> extends AutoCloseable { * {@link #close(boolean)} has already been called on loader. * @return Future for this load operation. */ - public IgniteInternalFuture<?> addData(Map<K, V> entries) throws IllegalStateException; + public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException; /** * Loads any remaining data, but doesn't close the loader. Data can be still added after @@ -318,34 +318,34 @@ public interface IgniteDataLoader<K, V> extends AutoCloseable { * another thread to complete flush and exit. If you don't want to wait in this case, * use {@link #tryFlush()} method. * - * @throws IgniteCheckedException If failed to map key to node. - * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If thread has been interrupted. + * @throws IgniteException If failed to map key to node. + * @throws IgniteInterruptedException If thread has been interrupted. * @throws IllegalStateException If grid has been concurrently stopped or * {@link #close(boolean)} has already been called on loader. * @see #tryFlush() */ - public void flush() throws IgniteCheckedException, IgniteInterruptedCheckedException, IllegalStateException; + public void flush() throws IgniteException, IgniteInterruptedException, IllegalStateException; /** * Makes an attempt to load remaining data. This method is mostly similar to {@link #flush}, * with the difference that it won't wait and will exit immediately. * - * @throws IgniteCheckedException If failed to map key to node. - * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If thread has been interrupted. + * @throws IgniteException If failed to map key to node. + * @throws IgniteInterruptedException If thread has been interrupted. * @throws IllegalStateException If grid has been concurrently stopped or * {@link #close(boolean)} has already been called on loader. * @see #flush() */ - public void tryFlush() throws IgniteCheckedException, IgniteInterruptedCheckedException, IllegalStateException; + public void tryFlush() throws IgniteException, IgniteInterruptedException, IllegalStateException; /** * Loads any remaining data and closes this loader. * * @param cancel {@code True} to cancel ongoing loading operations. - * @throws IgniteCheckedException If failed to map key to node. - * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If thread has been interrupted. + * @throws IgniteException If failed to map key to node. + * @throws org.apache.ignite.IgniteInterruptedException If thread has been interrupted. */ - public void close(boolean cancel) throws IgniteCheckedException, IgniteInterruptedCheckedException; + public void close(boolean cancel) throws IgniteException, IgniteInterruptedException; /** * Closes data loader. This method is identical to calling {@link #close(boolean) close(false)} method. @@ -353,8 +353,8 @@ public interface IgniteDataLoader<K, V> extends AutoCloseable { * The method is invoked automatically on objects managed by the * {@code try-with-resources} statement. * - * @throws IgniteCheckedException If failed to close data loader. - * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If thread has been interrupted. + * @throws IgniteException If failed to close data loader. + * @throws IgniteInterruptedException If thread has been interrupted. */ - @Override public void close() throws IgniteCheckedException, IgniteInterruptedCheckedException; + @Override public void close() throws IgniteException, IgniteInterruptedException; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a88b637b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index df03333..27d7548 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -29,6 +29,7 @@ import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.compute.*; import org.apache.ignite.internal.processors.cache.version.*; +import org.apache.ignite.internal.processors.dataload.*; import org.apache.ignite.internal.transactions.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.lang.*; @@ -3632,7 +3633,9 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, final long topVer = ctx.affinity().affinityTopologyVersion(); if (ctx.store().isLocalStore()) { - try (final IgniteDataLoader<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false)) { + IgniteDataLoaderImpl<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false); + + try { ldr.updater(new GridDrDataLoadCacheUpdater<K, V>()); LocalStoreLoadClosure c = new LocalStoreLoadClosure(p, ldr, ttl); @@ -3641,6 +3644,9 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, c.onDone(); } + finally { + ldr.closeEx(false); + } } else { // Version for all loaded entries. @@ -3802,7 +3808,9 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, final long topVer = ctx.affinity().affinityTopologyVersion(); if (ctx.store().isLocalStore()) { - try (final IgniteDataLoader<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false)) { + IgniteDataLoaderImpl<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false); + + try { ldr.updater(new GridDrDataLoadCacheUpdater<K, V>()); LocalStoreLoadClosure c = new LocalStoreLoadClosure(null, ldr, 0); @@ -3811,6 +3819,9 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, c.onDone(); } + finally { + ldr.closeEx(false); + } } else { // Version for all loaded entries. @@ -3830,29 +3841,24 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, * @throws IgniteCheckedException If failed. */ void globalLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws IgniteCheckedException { - ClusterGroup nodes = ctx.kernalContext().grid().cluster().forCache(ctx.name()); - - IgniteCompute comp = ctx.kernalContext().grid().compute(nodes).withNoFailover(); - - comp.broadcast(new LoadCacheClosure<>(ctx.name(), p, args)); + globalLoadCacheAsync(p, args).get(); } /** * @param p Predicate. * @param args Arguments. * @throws IgniteCheckedException If failed. + * @return Load cache future. */ - IgniteFuture<?> globalLoadCacheAsync(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) + IgniteInternalFuture<?> globalLoadCacheAsync(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws IgniteCheckedException { ClusterGroup nodes = ctx.kernalContext().grid().cluster().forCache(ctx.name()); - IgniteCompute comp = ctx.kernalContext().grid().compute(nodes).withNoFailover(); - - comp = comp.withAsync(); - - comp.broadcast(new LoadCacheClosure<>(ctx.name(), p, args)); + ctx.kernalContext().task().setThreadContext(TC_NO_FAILOVER, true); - return comp.future(); + return ctx.kernalContext().closure().callAsync(BROADCAST, + Arrays.asList(new LoadCacheClosure<>(ctx.name(), p, args)), + nodes.nodes()); } /** {@inheritDoc} */ @@ -5547,7 +5553,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, final Collection<Map.Entry<K, V>> col; /** */ - final IgniteDataLoader<K, V> ldr; + final IgniteDataLoaderImpl<K, V> ldr; /** */ final long ttl; @@ -5557,7 +5563,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, * @param ldr Loader. * @param ttl TTL. */ - private LocalStoreLoadClosure(@Nullable IgniteBiPredicate<K, V> p, IgniteDataLoader<K, V> ldr, long ttl) { + private LocalStoreLoadClosure(@Nullable IgniteBiPredicate<K, V> p, IgniteDataLoaderImpl<K, V> ldr, long ttl) { this.p = p; this.ldr = ldr; this.ttl = ttl; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a88b637b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 9a365ec..86fdeda 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -154,7 +154,7 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter<IgniteCach try { if (isAsync()) - curFut.set(ctx.cache().globalLoadCacheAsync(p, args)); + setFuture(ctx.cache().globalLoadCacheAsync(p, args)); else ctx.cache().globalLoadCache(p, args); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a88b637b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridAtomicCacheQueueImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridAtomicCacheQueueImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridAtomicCacheQueueImpl.java index 9511c86..d5ae97b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridAtomicCacheQueueImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridAtomicCacheQueueImpl.java @@ -63,7 +63,7 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> { break; } - catch (CachePartialUpdateException e) { + catch (CachePartialUpdateCheckedException e) { if (cnt++ == MAX_UPDATE_RETRIES) throw e; else { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a88b637b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java index 2912db7..a39c2e9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java @@ -34,7 +34,7 @@ class GridDataLoaderFuture extends GridFutureAdapter<Object> { /** Data loader. */ @GridToStringExclude - private IgniteDataLoader dataLdr; + private IgniteDataLoaderImpl dataLdr; /** * Default constructor for {@link Externalizable} support. @@ -47,7 +47,7 @@ class GridDataLoaderFuture extends GridFutureAdapter<Object> { * @param ctx Context. * @param dataLdr Data loader. */ - GridDataLoaderFuture(GridKernalContext ctx, IgniteDataLoader dataLdr) { + GridDataLoaderFuture(GridKernalContext ctx, IgniteDataLoaderImpl dataLdr) { super(ctx); assert dataLdr != null; @@ -60,7 +60,7 @@ class GridDataLoaderFuture extends GridFutureAdapter<Object> { checkValid(); if (onCancelled()) { - dataLdr.close(true); + dataLdr.closeEx(true); return true; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a88b637b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java index 9e0042a..1e1ecae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java @@ -119,12 +119,12 @@ public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter { U.interrupt(flusher); U.join(flusher, log); - for (IgniteDataLoader<?, ?> ldr : ldrs) { + for (IgniteDataLoaderImpl<?, ?> ldr : ldrs) { if (log.isDebugEnabled()) log.debug("Closing active data loader on grid stop [ldr=" + ldr + ", cancel=" + cancel + ']'); try { - ldr.close(cancel); + ldr.closeEx(cancel); } catch (IgniteInterruptedCheckedException e) { U.warn(log, "Interrupted while waiting for completion of the data loader: " + ldr, e); @@ -143,7 +143,7 @@ public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter { * @param compact {@code true} if data loader should transfer data in compact format. * @return Data loader. */ - public IgniteDataLoader<K, V> dataLoader(@Nullable String cacheName, boolean compact) { + public IgniteDataLoaderImpl<K, V> dataLoader(@Nullable String cacheName, boolean compact) { if (!busyLock.enterBusy()) throw new IllegalStateException("Failed to create data loader (grid is stopping)."); @@ -152,7 +152,7 @@ public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter { ldrs.add(ldr); - ldr.future().listenAsync(new CI1<IgniteInternalFuture<?>>() { + ldr.internalFuture().listenAsync(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> f) { boolean b = ldrs.remove(ldr); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a88b637b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java index 20978d3..1c03066 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java @@ -136,6 +136,9 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay /** Future to track loading finish. */ private final GridFutureAdapter<?> fut; + /** Public API future to track loading finish. */ + private final IgniteFuture<?> publicFut; + /** Busy lock. */ private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); @@ -240,6 +243,8 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay log.debug("Added response listener within topic: " + topic); fut = new GridDataLoaderFuture(ctx, this); + + publicFut = new IgniteFutureImpl<>(fut); } /** @@ -258,7 +263,14 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> future() { + @Override public IgniteFuture<?> future() { + return publicFut; + } + + /** + * @return Internal future. + */ + public IgniteInternalFuture<?> internalFuture() { return fut; } @@ -280,14 +292,14 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay } /** {@inheritDoc} */ - @Override public void isolated(boolean isolated) throws IgniteCheckedException { + @Override public void isolated(boolean isolated) { if (isolated()) return; ClusterNode node = F.first(ctx.grid().forCache(cacheName).nodes()); if (node == null) - throw new IgniteCheckedException("Failed to get node for cache: " + cacheName); + throw new IgniteException("Failed to get node for cache: " + cacheName); GridCacheAttributes a = U.cacheAttributes(node, cacheName); @@ -357,14 +369,14 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> addData(Map<K, V> entries) throws IllegalStateException { + @Override public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException { A.notNull(entries, "entries"); return addData(entries.entrySet()); } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) { + @Override public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) { A.notEmpty(entries, "entries"); enterBusy(); @@ -387,10 +399,10 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay load0(entries, resFut, keys, 0); - return resFut; + return new IgniteFutureImpl<>(resFut); } catch (IgniteException e) { - return new GridFinishedFuture<>(ctx, e); + return new IgniteFinishedFutureImpl<>(ctx, e); } finally { leaveBusy(); @@ -398,21 +410,21 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> addData(Map.Entry<K, V> entry) throws IgniteCheckedException, IllegalStateException { + @Override public IgniteFuture<?> addData(Map.Entry<K, V> entry) { A.notNull(entry, "entry"); return addData(F.asList(entry)); } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> addData(K key, V val) throws IgniteCheckedException, IllegalStateException { + @Override public IgniteFuture<?> addData(K key, V val) { A.notNull(key, "key"); return addData(new Entry0<>(key, val)); } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> removeData(K key) throws IgniteCheckedException, IllegalStateException { + @Override public IgniteFuture<?> removeData(K key) { return addData(key, null); } @@ -638,12 +650,15 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay /** {@inheritDoc} */ @SuppressWarnings("ForLoopReplaceableByForEach") - @Override public void flush() throws IgniteCheckedException { + @Override public void flush() throws IgniteException { enterBusy(); try { doFlush(); } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } finally { leaveBusy(); } @@ -656,7 +671,7 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay * Does not wait for result and does not fail on errors assuming that this method * should be called periodically. */ - @Override public void tryFlush() throws IgniteInterruptedCheckedException { + @Override public void tryFlush() throws IgniteInterruptedException { if (!busyLock.enterBusy()) return; @@ -666,6 +681,9 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay lastFlushTime = U.currentTimeMillis(); } + catch (IgniteInterruptedCheckedException e) { + throw U.convertException(e); + } finally { leaveBusy(); } @@ -673,9 +691,22 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay /** * @param cancel {@code True} to close with cancellation. + * @throws IgniteException If failed. + */ + @Override public void close(boolean cancel) throws IgniteException { + try { + closeEx(cancel); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** + * @param cancel {@code True} to close with cancellation. * @throws IgniteCheckedException If failed. */ - @Override public void close(boolean cancel) throws IgniteCheckedException { + public void closeEx(boolean cancel) throws IgniteCheckedException { if (!closed.compareAndSet(false, true)) return; @@ -719,7 +750,7 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay } /** {@inheritDoc} */ - @Override public void close() throws IgniteCheckedException { + @Override public void close() throws IgniteException { close(false); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a88b637b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsDataManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsDataManager.java index ef8bf97..53e4ca2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsDataManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsDataManager.java @@ -655,7 +655,7 @@ public class GridGgfsDataManager extends GridGgfsManager { } } } - catch (IgniteCheckedException e) { + catch (IgniteException e) { log.error("Failed to clean up file range [fileInfo=" + fileInfo + ", range=" + range + ']', e); } } @@ -1726,10 +1726,10 @@ public class GridGgfsDataManager extends GridGgfsManager { block)); } } - catch (IgniteInterruptedCheckedException ignored) { + catch (IgniteInterruptedException ignored) { // Ignore interruption during shutdown. } - catch (IgniteCheckedException e) { + catch (IgniteException e) { log.error("Failed to remove file contents: " + fileInfo, e); } finally { @@ -1740,14 +1740,14 @@ public class GridGgfsDataManager extends GridGgfsManager { ldr.removeData(new GridGgfsBlockKey(fileId, fileInfo.affinityKey(), fileInfo.evictExclude(), block)); } - catch (IgniteCheckedException e) { + catch (IgniteException e) { log.error("Failed to remove file contents: " + fileInfo, e); } finally { try { ldr.close(isCancelled()); } - catch (IgniteCheckedException e) { + catch (IgniteException e) { log.error("Failed to stop data loader while shutting down ggfs async delete thread.", e); } finally { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a88b637b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index e4c7f6c..da9fba8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -9180,6 +9180,6 @@ public abstract class IgniteUtils { else if (e.getCause() instanceof IgniteException) return (IgniteException)e.getCause(); - return new IgniteException(e.getMessage(), e.getCause() != null ? e.getCause() : e); + return new IgniteException(e.getMessage(), e); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a88b637b/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFinishedFutureImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFinishedFutureImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFinishedFutureImpl.java new file mode 100644 index 0000000..3aa9f4d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFinishedFutureImpl.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.future; + +import org.apache.ignite.internal.*; + +/** + * + */ +public class IgniteFinishedFutureImpl<V> extends IgniteFutureImpl<V> { + /** + * @param ctx Context. + * @param err Error. + */ + public IgniteFinishedFutureImpl(GridKernalContext ctx, Throwable err) { + super(new GridFinishedFuture<V>(ctx, err)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a88b637b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxAbstractTest.java index 386060a..ed1bf46 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxAbstractTest.java @@ -236,7 +236,7 @@ abstract class IgniteTxAbstractTest extends GridCommonAbstractTest { if (isTestDebug()) debug("Committed transaction [i=" + i + ", tx=" + tx + ']'); } - catch (IgniteTxOptimisticCheckedException e) { + catch (IgniteTxOptimisticException e) { if (concurrency != OPTIMISTIC || isolation != SERIALIZABLE) { error("Received invalid optimistic failure.", e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a88b637b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java index 6832a07..7bd04bb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java @@ -196,7 +196,7 @@ public abstract class IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest fut.get(3, TimeUnit.SECONDS); } - catch (IgniteFutureTimeoutCheckedException ignored) { + catch (IgniteFutureTimeoutException ignored) { info("Failed to wait for commit future completion [fullFailure=" + fullFailure + ']'); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a88b637b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheGroupLockPartitionedMultiNodeAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheGroupLockPartitionedMultiNodeAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheGroupLockPartitionedMultiNodeAbstractSelfTest.java index 1e581f5..a696df3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheGroupLockPartitionedMultiNodeAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheGroupLockPartitionedMultiNodeAbstractSelfTest.java @@ -70,7 +70,7 @@ public abstract class GridCacheGroupLockPartitionedMultiNodeAbstractSelfTest ext fail("Exception should be thrown."); } - catch (IgniteCheckedException ignored) { + catch (IgniteException ignored) { // Expected exception. } finally { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a88b637b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyTopologySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyTopologySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyTopologySelfTest.java index e43a6a7..f96ae02 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyTopologySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyTopologySelfTest.java @@ -195,7 +195,7 @@ public class GridCacheNearOnlyTopologySelfTest extends GridCommonAbstractTest { return null; } - }, ClusterTopologyCheckedException.class, null); + }, ClusterTopologyException.class, null); // Test pessimistic transaction. GridTestUtils.assertThrows(log, new Callable<Object>() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a88b637b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java index c4e6cb5..0d618e5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java @@ -211,7 +211,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest { IgniteInternalFuture<?> f1 = multithreadedAsync(new Callable<Object>() { @Override public Object call() throws Exception { - Collection<IgniteInternalFuture<?>> futs = new ArrayList<>(cnt); + Collection<IgniteFuture<?>> futs = new ArrayList<>(cnt); for (int i = 0; i < cnt; i++) { int idx = idxGen.getAndIncrement(); @@ -221,7 +221,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest { l1.countDown(); - for (IgniteInternalFuture<?> fut : futs) + for (IgniteFuture<?> fut : futs) fut.get(); return null; @@ -250,7 +250,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest { IgniteInternalFuture<?> f2 = multithreadedAsync(new Callable<Object>() { @Override public Object call() throws Exception { - Collection<IgniteInternalFuture<?>> futs = new ArrayList<>(cnt); + Collection<IgniteFuture<?>> futs = new ArrayList<>(cnt); for (int i = 0; i < cnt; i++) { final int key = idxGen.decrementAndGet(); @@ -260,7 +260,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest { l2.countDown(); - for (IgniteInternalFuture<?> fut : futs) + for (IgniteFuture<?> fut : futs) fut.get(); return null; @@ -394,7 +394,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest { IgniteInternalFuture<?> fut1 = multithreadedAsync(new Callable<Object>() { @Override public Object call() throws Exception { - Collection<IgniteInternalFuture<?>> futs = new ArrayList<>(); + Collection<IgniteFuture<?>> futs = new ArrayList<>(); while (!done.get()) { int idx = idxGen.getAndIncrement(); @@ -410,7 +410,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest { ldr.flush(); - for (IgniteInternalFuture<?> fut : futs) + for (IgniteFuture<?> fut : futs) fut.get(); return null; @@ -538,7 +538,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest { assert false; } - catch (IgniteFutureCancelledCheckedException e) { + catch (IgniteFutureCancelledException e) { info("Caught expected exception: " + e); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a88b637b/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsAbstractSelfTest.java index c5e72b0..3f0f87e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridGgfsAbstractSelfTest.java @@ -1230,7 +1230,7 @@ public abstract class GridGgfsAbstractSelfTest extends GridGgfsCommonAbstractTes return null; } - }, IgniteCheckedException.class, null); + }, IgniteException.class, null); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a88b637b/modules/core/src/test/java/org/apache/ignite/streamer/window/GridStreamerWindowSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/streamer/window/GridStreamerWindowSelfTest.java b/modules/core/src/test/java/org/apache/ignite/streamer/window/GridStreamerWindowSelfTest.java index 2aca81c..b2d89c4 100644 --- a/modules/core/src/test/java/org/apache/ignite/streamer/window/GridStreamerWindowSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/streamer/window/GridStreamerWindowSelfTest.java @@ -50,7 +50,7 @@ public class GridStreamerWindowSelfTest extends GridCommonAbstractTest { return null; } - }, IgniteCheckedException.class, null); + }, IgniteException.class, null); } /** @@ -65,7 +65,7 @@ public class GridStreamerWindowSelfTest extends GridCommonAbstractTest { return null; } - }, IgniteCheckedException.class, null); + }, IgniteException.class, null); win.setTimeInterval(1); @@ -79,7 +79,7 @@ public class GridStreamerWindowSelfTest extends GridCommonAbstractTest { return null; } - }, IgniteCheckedException.class, null); + }, IgniteException.class, null); } /** @@ -94,7 +94,7 @@ public class GridStreamerWindowSelfTest extends GridCommonAbstractTest { return null; } - }, IgniteCheckedException.class, null); + }, IgniteException.class, null); win.setBatchSize(1); @@ -108,7 +108,7 @@ public class GridStreamerWindowSelfTest extends GridCommonAbstractTest { return null; } - }, IgniteCheckedException.class, null); + }, IgniteException.class, null); } /** @@ -123,7 +123,7 @@ public class GridStreamerWindowSelfTest extends GridCommonAbstractTest { return null; } - }, IgniteCheckedException.class, null); + }, IgniteException.class, null); win.setBatchSize(1); @@ -133,7 +133,7 @@ public class GridStreamerWindowSelfTest extends GridCommonAbstractTest { return null; } - }, IgniteCheckedException.class, null); + }, IgniteException.class, null); win.setBatchTimeInterval(1); win.setBatchSize(-1); @@ -144,7 +144,7 @@ public class GridStreamerWindowSelfTest extends GridCommonAbstractTest { return null; } - }, IgniteCheckedException.class, null); + }, IgniteException.class, null); win.setBatchSize(1); @@ -158,7 +158,7 @@ public class GridStreamerWindowSelfTest extends GridCommonAbstractTest { return null; } - }, IgniteCheckedException.class, null); + }, IgniteException.class, null); } /** @@ -286,7 +286,7 @@ public class GridStreamerWindowSelfTest extends GridCommonAbstractTest { return null; } - }, IgniteCheckedException.class, null); + }, IgniteException.class, null); win.setMaximumSize(60); win.setUnique(true); @@ -335,7 +335,7 @@ public class GridStreamerWindowSelfTest extends GridCommonAbstractTest { return null; } - }, IgniteCheckedException.class, null); + }, IgniteException.class, null); win.setBatchSize(10); win.setMaximumBatches(2); @@ -410,7 +410,7 @@ public class GridStreamerWindowSelfTest extends GridCommonAbstractTest { return null; } - }, IgniteCheckedException.class, null); + }, IgniteException.class, null); win.setMaximumSize(60); win.setTimeInterval(40); @@ -462,7 +462,7 @@ public class GridStreamerWindowSelfTest extends GridCommonAbstractTest { return null; } - }, IgniteCheckedException.class, null); + }, IgniteException.class, null); win.setBatchSize(50); win.setBatchTimeInterval(500);
