ignite-6924: Fixed missing CacheStoreSessionListener#onSessionStart() call
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e3099cc4 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e3099cc4 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e3099cc4 Branch: refs/heads/ignite-zk Commit: e3099cc47e4086605312d88aeda3ca85e1e6aeff Parents: 1ebeee0 Author: Slava Koptilin <slava.kopti...@gmail.com> Authored: Fri Nov 17 15:05:31 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Nov 17 15:05:31 2017 +0300 ---------------------------------------------------------------------- .../cache/store/CacheStoreManager.java | 12 +- .../store/GridCacheStoreManagerAdapter.java | 7 ++ .../cache/store/GridCacheWriteBehindStore.java | 10 +- ...reSessionListenerWriteBehindEnabledTest.java | 117 +++++++++++++++++-- 4 files changed, 135 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e3099cc4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java index 83428b3..e22cb05 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/CacheStoreManager.java @@ -175,13 +175,23 @@ public interface CacheStoreManager extends GridCacheManager { public void sessionEnd(IgniteInternalTx tx, boolean commit, boolean last, boolean storeSessionEnded) throws IgniteCheckedException; /** - * End session initiated by write-behind store. + * Start session initiated by write-behind store. 
* * @throws IgniteCheckedException If failed. */ public void writeBehindSessionInit() throws IgniteCheckedException; /** + * Notifies cache store session listeners. + * + * This method is called by write-behind store in case of back-pressure mechanism is initiated. + * It is assumed that cache store session was started by CacheStoreManager before. + * + * @throws IgniteCheckedException If failed. + */ + public void writeBehindCacheStoreSessionListenerStart() throws IgniteCheckedException; + + /** * End session initiated by write-behind store. * * @param threwEx If exception was thrown. http://git-wip-us.apache.org/repos/asf/ignite/blob/e3099cc4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java index 22c2381..e862c0a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java @@ -829,6 +829,13 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt } /** {@inheritDoc} */ + @Override public void writeBehindCacheStoreSessionListenerStart() throws IgniteCheckedException { + assert sesHolder.get() != null; + + notifyCacheStoreSessionListeners(sesHolder.get(), null, true); + } + + /** {@inheritDoc} */ @Override public void writeBehindSessionEnd(boolean threwEx) throws IgniteCheckedException { sessionEnd0(null, threwEx); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e3099cc4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java index 831b1b0..d7a13e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java @@ -798,8 +798,14 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy Flusher flusher ) { try { - if (initSes && storeMgr != null) - storeMgr.writeBehindSessionInit(); + if (storeMgr != null) { + if (initSes) + storeMgr.writeBehindSessionInit(); + else + // Back-pressure mechanism is running. + // Cache store session must be initialized by storeMgr. + storeMgr.writeBehindCacheStoreSessionListenerStart(); + } boolean threwEx = true; http://git-wip-us.apache.org/repos/asf/ignite/blob/e3099cc4/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerWriteBehindEnabledTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerWriteBehindEnabledTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerWriteBehindEnabledTest.java index b9095d0..c9a912a 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerWriteBehindEnabledTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerWriteBehindEnabledTest.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicInteger; import 
java.util.logging.Logger; import javax.cache.Cache; @@ -34,14 +35,15 @@ import javax.cache.configuration.FactoryBuilder; import javax.cache.integration.CacheLoaderException; import javax.cache.integration.CacheWriterException; import javax.sql.DataSource; -import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; import org.apache.ignite.cache.store.jdbc.CacheJdbcStoreSessionListener; import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest; import org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore; -import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.resources.CacheStoreSessionResource; +import org.apache.ignite.testframework.GridTestUtils; /** * This class tests that calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)} @@ -61,6 +63,9 @@ public class CacheStoreSessionListenerWriteBehindEnabledTest extends GridCacheAb /** */ private static final AtomicInteger entryCnt = new AtomicInteger(); + /** */ + private static final AtomicInteger uninitializedListenerCnt = new AtomicInteger(); + /** {@inheritDoc} */ @Override protected int gridCount() { return 1; @@ -93,6 +98,8 @@ public class CacheStoreSessionListenerWriteBehindEnabledTest extends GridCacheAb operations.clear(); entryCnt.set(0); + + uninitializedListenerCnt.set(0); } /** @@ -136,6 +143,83 @@ public class CacheStoreSessionListenerWriteBehindEnabledTest extends GridCacheAb } /** + * Tests that cache store session listeners are notified by write-behind store. 
+ */ + public void testFlushSingleValue() throws Exception { + CacheConfiguration cfg = cacheConfiguration(getTestIgniteInstanceName()); + + cfg.setName("back-pressure-control"); + cfg.setWriteBehindBatchSize(2); + cfg.setWriteBehindFlushSize(2); + cfg.setWriteBehindFlushFrequency(1_000); + cfg.setWriteBehindCoalescing(true); + + IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(cfg); + + try { + int nUploaders = 5; + + final CyclicBarrier barrier = new CyclicBarrier(nUploaders); + + IgniteInternalFuture[] uploaders = new IgniteInternalFuture[nUploaders]; + + for (int i = 0; i < nUploaders; ++i) { + uploaders[i] = GridTestUtils.runAsync( + new Uploader(cache, barrier, i * CNT), + "uploader-" + i); + } + + for (int i = 0; i < nUploaders; ++i) + uploaders[i].get(); + + assertEquals("Uninitialized cache store session listener.", 0, uninitializedListenerCnt.get()); + } + finally { + cache.destroy(); + } + } + + /** + * + */ + public static class Uploader implements Runnable { + /** */ + private final int start; + + /** */ + private final CyclicBarrier barrier; + + /** */ + private final IgniteCache<Object, Object> cache; + + /** + * @param cache Ignite cache. + * @param barrier Cyclic barrier. + * @param start Key index. + */ + public Uploader(IgniteCache<Object, Object> cache, CyclicBarrier barrier, int start) { + this.cache = cache; + + this.barrier = barrier; + + this.start = start; + } + + /** {@inheritDoc} */ + @Override public void run() { + try { + barrier.await(); + + for (int i = start; i < start + CNT; ++i) + cache.put(i, i); + } + catch (Exception e) { + fail("Unexpected exception [" + e + "]"); + } + } + } + + /** * @param startedSessions Number of expected sessions. 
*/ private void checkSessionCounters(int startedSessions) { @@ -145,6 +229,8 @@ public class CacheStoreSessionListenerWriteBehindEnabledTest extends GridCacheAb assertEquals(CNT, entryCnt.get()); + assertEquals("Uninitialized cache store session listener.", 0, uninitializedListenerCnt.get()); + checkOpCount(operations, OperationType.SESSION_START, startedSessions); checkOpCount(operations, OperationType.SESSION_END, startedSessions); @@ -201,18 +287,19 @@ public class CacheStoreSessionListenerWriteBehindEnabledTest extends GridCacheAb * Test cache store session listener. */ public static class TestCacheStoreSessionListener extends CacheJdbcStoreSessionListener { - /** */ - @IgniteInstanceResource - private Ignite ignite; - /** {@inheritDoc} */ @Override public void onSessionStart(CacheStoreSession ses) { operations.add(OperationType.SESSION_START); + + if (ses.attachment() == null) + ses.attach(new Object()); } /** {@inheritDoc} */ @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) { operations.add(OperationType.SESSION_END); + + ses.attach(null); } } @@ -224,31 +311,45 @@ public class CacheStoreSessionListenerWriteBehindEnabledTest extends GridCacheAb */ public static class EmptyCacheStore extends CacheStoreAdapter<Object, Object> { /** */ - @IgniteInstanceResource - private Ignite ignite; + @CacheStoreSessionResource + private CacheStoreSession ses; /** {@inheritDoc} */ @Override public Object load(Object key) throws CacheLoaderException { entryCnt.getAndIncrement(); + + if (ses.attachment() == null) + uninitializedListenerCnt.incrementAndGet(); + return null; } /** {@inheritDoc} */ @Override public void writeAll(Collection<Cache.Entry<?, ?>> entries) { entryCnt.addAndGet(entries.size()); + + if (ses.attachment() == null) + uninitializedListenerCnt.incrementAndGet(); } /** {@inheritDoc} */ @Override public void write(Cache.Entry entry) throws CacheWriterException { + if (ses.attachment() == null) + 
uninitializedListenerCnt.incrementAndGet(); } /** {@inheritDoc} */ @Override public void deleteAll(Collection<?> keys) { entryCnt.addAndGet(keys.size()); + + if (ses.attachment() == null) + uninitializedListenerCnt.incrementAndGet(); } /** {@inheritDoc} */ @Override public void delete(Object key) throws CacheWriterException { + if (ses.attachment() == null) + uninitializedListenerCnt.incrementAndGet(); } }