This is an automated email from the ASF dual-hosted git repository. tledkov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new ec608b0 IGNITE-15267 Check statistics obsolescence on server nodes only (#9318) ec608b0 is described below commit ec608b0b223671ed2517b5c05836ae52e6830b2a Author: Berkof <sasha_bel...@mail.ru> AuthorDate: Mon Oct 4 14:44:26 2021 +0700 IGNITE-15267 Check statistics obsolescence on server nodes only (#9318) --- .../stat/IgniteStatisticsConfigurationManager.java | 142 +++++++++++++-------- .../query/stat/IgniteStatisticsManagerImpl.java | 87 +++++++------ .../processors/query/stat/StatisticsGatherer.java | 13 +- .../query/stat/StatisticsConfigurationTest.java | 49 ++++++- 4 files changed, 196 insertions(+), 95 deletions(-) diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsConfigurationManager.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsConfigurationManager.java index 12f9495..c81e146 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsConfigurationManager.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsConfigurationManager.java @@ -29,7 +29,6 @@ import java.util.Set; import java.util.function.BiConsumer; import java.util.function.Function; import java.util.stream.Collectors; - import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterState; @@ -113,34 +112,39 @@ public class IgniteStatisticsConfigurationManager { /** */ private final GridCachePartitionExchangeManager exchange; + /** Is server node flag. */ + private final boolean isServerNode; + /** */ private final DistributedMetastorageLifecycleListener distrMetaStoreLsnr = new DistributedMetastorageLifecycleListener() { @Override public void onReadyForRead(ReadableDistributedMetaStorage metastorage) { distrMetaStorage = (DistributedMetaStorage)metastorage; - distrMetaStorage.listen( - (metaKey) -> metaKey.startsWith(STAT_OBJ_PREFIX), - (k, oldV, newV) -> { - // Skip invoke on start node (see 'ReadableDistributedMetaStorage#listen' the second case) - // The update statistics on start node is handled by 'scanAndCheckLocalStatistic' method - // called on exchange done. - if (!started) - return; - - mgmtPool.submit(() -> { - try { - onChangeStatisticConfiguration( - (StatisticsObjectConfiguration)oldV, - (StatisticsObjectConfiguration)newV - ); - } - catch (Throwable e) { - log.warning("Unexpected exception on change statistic configuration [old=" - + oldV + ", new=" + newV + ']', e); - } - }); - } - ); + if (isServerNode) { + distrMetaStorage.listen( + (metaKey) -> metaKey.startsWith(STAT_OBJ_PREFIX), + (k, oldV, newV) -> { + // Skip invoke on start node (see 'ReadableDistributedMetaStorage#listen' the second case) + // The update statistics on start node is handled by 'scanAndCheckLocalStatistic' method + // called on exchange done. + if (!started) + return; + + mgmtPool.submit(() -> { + try { + onChangeStatisticConfiguration( + (StatisticsObjectConfiguration) oldV, + (StatisticsObjectConfiguration) newV + ); + } + catch (Throwable e) { + log.warning("Unexpected exception on change statistic configuration [old=" + + oldV + ", new=" + newV + ']', e); + } + }); + } + ); + } } }; @@ -218,7 +222,20 @@ public class IgniteStatisticsConfigurationManager { } }; - /** */ + /** + * Constructor. + * + * @param schemaMgr Schema manager. + * @param subscriptionProcessor Grid internal subsctiption processor. + * @param sysViewMgr Grid system view manager. + * @param cluster Grid cluster state processor. + * @param exchange Grid cache partition exchange manager. + * @param repo Ignite statistics repository. + * @param gatherer Statistics gatherer (or {@code null} for client nodes). + * @param mgmtPool Statistics management thread pool (or {@code null} for client nodes). + * @param logSupplier Ignite logger supplier. + * @param isServerNode server node flag. + */ public IgniteStatisticsConfigurationManager( SchemaManager schemaMgr, GridInternalSubscriptionProcessor subscriptionProcessor, @@ -228,7 +245,8 @@ public class IgniteStatisticsConfigurationManager { IgniteStatisticsRepository repo, StatisticsGatherer gatherer, IgniteThreadPoolExecutor mgmtPool, - Function<Class<?>, IgniteLogger> logSupplier + Function<Class<?>, IgniteLogger> logSupplier, + boolean isServerNode ) { this.schemaMgr = schemaMgr; log = logSupplier.apply(IgniteStatisticsConfigurationManager.class); @@ -238,6 +256,7 @@ public class IgniteStatisticsConfigurationManager { this.cluster = cluster; this.subscriptionProcessor = subscriptionProcessor; this.exchange = exchange; + this.isServerNode = isServerNode; this.subscriptionProcessor.registerDistributedMetastorageListener(distrMetaStoreLsnr); @@ -308,15 +327,17 @@ public class IgniteStatisticsConfigurationManager { if (log.isTraceEnabled()) log.trace("Statistics configuration manager starting..."); - exchange.registerExchangeAwareComponent(exchAwareLsnr); + if (isServerNode) { + exchange.registerExchangeAwareComponent(exchAwareLsnr); - schemaMgr.registerDropColumnsListener(dropColsLsnr); - schemaMgr.registerDropTableListener(dropTblLsnr); + schemaMgr.registerDropColumnsListener(dropColsLsnr); + schemaMgr.registerDropTableListener(dropTblLsnr); + } if (log.isDebugEnabled()) log.debug("Statistics configuration manager started."); - if (distrMetaStorage != null) + if (distrMetaStorage != null && isServerNode) scanAndCheckLocalStatistics(exchange.readyAffinityVersion()); } @@ -327,10 +348,12 @@ public class IgniteStatisticsConfigurationManager { if (log.isTraceEnabled()) log.trace("Statistics configuration manager stopping..."); - exchange.unregisterExchangeAwareComponent(exchAwareLsnr); + if (isServerNode) { + exchange.unregisterExchangeAwareComponent(exchAwareLsnr); - schemaMgr.unregisterDropColumnsListener(dropColsLsnr); - schemaMgr.unregisterDropTableListener(dropTblLsnr); + schemaMgr.unregisterDropColumnsListener(dropColsLsnr); + schemaMgr.unregisterDropTableListener(dropTblLsnr); + } if (log.isDebugEnabled()) log.debug("Statistics configuration manager stopped."); @@ -566,9 +589,9 @@ public class IgniteStatisticsConfigurationManager { AffinityTopologyVersion topVer0 = cctx.affinity().affinityReadyFuture(topVer).get(); - final Set<Integer> parts = cctx.affinity().primaryPartitions(cctx.localNodeId(), topVer0); + final Set<Integer> primParts = cctx.affinity().primaryPartitions(cctx.localNodeId(), topVer0); - if (F.isEmpty(parts)) { + if (F.isEmpty(primParts)) { // There is no data on the node for specified cache. // Remove oll data dropColumnsOnLocalStatistics(cfg, cfg.columns().keySet()); @@ -580,27 +603,32 @@ public class IgniteStatisticsConfigurationManager { cctx.affinity().backupPartitions(cctx.localNodeId(), topVer0) ); - partsOwn.addAll(parts); + partsOwn.addAll(primParts); if (log.isDebugEnabled()) - log.debug("Check local statistics [key=" + cfg + ", parts=" + parts + ']'); + log.debug("Check local statistics [key=" + cfg + ", parts=" + primParts + ']'); - Collection<ObjectPartitionStatisticsImpl> partStats = repo.getLocalPartitionsStatistics(cfg.key()); + Collection<ObjectPartitionStatisticsImpl> collectedPartStats = repo.getLocalPartitionsStatistics(cfg.key()); Set<Integer> partsToRmv = new HashSet<>(); - Set<Integer> partsToCollect = new HashSet<>(parts); + Set<Integer> partsToCollect = new HashSet<>(primParts); + + /** All necessary partition already collected so each possibly column can be calculated from it. */ + boolean allColumnsChecked = collectedPartStats.stream().map(ObjectPartitionStatisticsImpl::partId) + .collect(Collectors.toList()).containsAll(partsToCollect); + Map<String, StatisticsColumnConfiguration> colsToCollect = new HashMap<>(); Set<String> colsToRmv = new HashSet<>(); - if (!F.isEmpty(partStats)) { - for (ObjectPartitionStatisticsImpl pstat : partStats) { - if (!partsOwn.contains(pstat.partId())) - partsToRmv.add(pstat.partId()); + if (!F.isEmpty(collectedPartStats)) { + for (ObjectPartitionStatisticsImpl collPStat : collectedPartStats) { + if (!partsOwn.contains(collPStat.partId())) + partsToRmv.add(collPStat.partId()); - boolean partExists = true; + boolean partFullyCollected = true; for (StatisticsColumnConfiguration colCfg : cfg.columnsAll().values()) { - ColumnStatistics colStat = pstat.columnStatistics(colCfg.name()); + ColumnStatistics colStat = collPStat.columnStatistics(colCfg.name()); if (colCfg.tombstone()) { if (colStat != null) @@ -610,15 +638,15 @@ public class IgniteStatisticsConfigurationManager { if (colStat == null || colStat.version() < colCfg.version()) { colsToCollect.put(colCfg.name(), colCfg); - partsToCollect.add(pstat.partId()); + partsToCollect.add(collPStat.partId()); - partExists = false; + partFullyCollected = false; } } } - if (partExists) - partsToCollect.remove(pstat.partId()); + if (partFullyCollected) + partsToCollect.remove(collPStat.partId()); } } @@ -638,15 +666,23 @@ public class IgniteStatisticsConfigurationManager { if (!F.isEmpty(colsToRmv)) dropColumnsOnLocalStatistics(cfg, colsToRmv); - if (!F.isEmpty(partsToCollect)) - gatherLocalStatistics(cfg, tbl, parts, partsToCollect, colsToCollect); + // No columns to collect statistics by. + if (cfg.columns().isEmpty()) + return Collections.emptySet(); + + if (!F.isEmpty(partsToCollect)) { + if (!allColumnsChecked) + colsToCollect = cfg.columns(); + + gatherLocalStatistics(cfg, tbl, primParts, partsToCollect, colsToCollect); + } else { // All local statistics by partition are available. // Only refresh aggregated local statistics. - gatherer.aggregateStatisticsAsync(cfg.key(), () -> aggregateLocalGathering(cfg.key(), parts)); + gatherer.aggregateStatisticsAsync(cfg.key(), () -> aggregateLocalGathering(cfg.key(), primParts)); } - return parts; + return primParts; } catch (Throwable ex) { log.error("Unexpected error on check local statistics", ex); diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java index 2398263..3ff3fa6 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsManagerImpl.java @@ -101,6 +101,8 @@ public class IgniteStatisticsManagerImpl implements IgniteStatisticsManager { this.ctx = ctx; this.schemaMgr = schemaMgr; + boolean serverNode = !(ctx.config().isClientMode() || ctx.isDaemon()); + helper = new IgniteStatisticsHelper(ctx.localNodeId(), schemaMgr, ctx::log); log = ctx.log(IgniteStatisticsManagerImpl.class); @@ -108,30 +110,33 @@ public class IgniteStatisticsManagerImpl implements IgniteStatisticsManager { IgniteCacheDatabaseSharedManager db = (GridCacheUtils.isPersistenceEnabled(ctx.config())) ? ctx.cache().context().database() : null; - gatherPool = new IgniteThreadPoolExecutor("stat-gather", - ctx.igniteInstanceName(), - 0, - STATS_POOL_SIZE, - IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME, - new LinkedBlockingQueue<>(), - GridIoPolicy.UNDEFINED, - ctx.uncaughtExceptionHandler() - ); - - mgmtPool = new IgniteThreadPoolExecutor("stat-mgmt", - ctx.igniteInstanceName(), - 0, - 1, - IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME, - new LinkedBlockingQueue<>(), - GridIoPolicy.UNDEFINED, - ctx.uncaughtExceptionHandler() - ); - - boolean storeData = !(ctx.config().isClientMode() || ctx.isDaemon()); + if (serverNode) { + gatherPool = new IgniteThreadPoolExecutor("stat-gather", + ctx.igniteInstanceName(), + 0, + STATS_POOL_SIZE, + IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME, + new LinkedBlockingQueue<>(), + GridIoPolicy.UNDEFINED, + ctx.uncaughtExceptionHandler() + ); + mgmtPool = new IgniteThreadPoolExecutor("stat-mgmt", + ctx.igniteInstanceName(), + 0, + 1, + IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME, + new LinkedBlockingQueue<>(), + GridIoPolicy.UNDEFINED, + ctx.uncaughtExceptionHandler() + ); + } + else { + gatherPool = null; + mgmtPool = null; + } IgniteStatisticsStore store; - if (!storeData) + if (!serverNode) store = new IgniteStatisticsDummyStoreImpl(ctx::log); else if (db == null) store = new IgniteStatisticsInMemoryStoreImpl(ctx::log); @@ -140,11 +145,11 @@ public class IgniteStatisticsManagerImpl implements IgniteStatisticsManager { statsRepos = new IgniteStatisticsRepository(store, ctx.systemView(), helper, ctx::log); - gatherer = new StatisticsGatherer( + gatherer = serverNode ? new StatisticsGatherer( statsRepos, gatherPool, ctx::log - ); + ) : null; statCfgMgr = new IgniteStatisticsConfigurationManager( schemaMgr, @@ -155,7 +160,8 @@ public class IgniteStatisticsManagerImpl implements IgniteStatisticsManager { statsRepos, gatherer, mgmtPool, - ctx::log + ctx::log, + serverNode ); ctx.internalSubscriptionProcessor().registerDistributedConfigurationListener(dispatcher -> { @@ -186,20 +192,23 @@ public class IgniteStatisticsManagerImpl implements IgniteStatisticsManager { if (currState == ON || currState == NO_UPDATE) enableOperations(); - ctx.timeout().schedule(() -> { - StatisticsUsageState state = usageState(); - if (state == ON && !ctx.isStopping()) { - if (log.isTraceEnabled()) - log.trace("Processing statistics obsolescence..."); + if (serverNode) { + ctx.timeout().schedule(() -> { + StatisticsUsageState state = usageState(); - try { - processObsolescence(); - } catch (Throwable e) { - log.warning("Error while processing statistics obsolescence", e); + if (state == ON && !ctx.isStopping()) { + if (log.isTraceEnabled()) + log.trace("Processing statistics obsolescence..."); + + try { + processObsolescence(); + } catch (Throwable e) { + log.warning("Error while processing statistics obsolescence", e); + } } - } - }, OBSOLESCENCE_INTERVAL * 1000, OBSOLESCENCE_INTERVAL * 1000); + }, OBSOLESCENCE_INTERVAL * 1000, OBSOLESCENCE_INTERVAL * 1000); + } } /** @@ -207,7 +216,8 @@ public class IgniteStatisticsManagerImpl implements IgniteStatisticsManager { */ private synchronized void enableOperations() { statsRepos.start(); - gatherer.start(); + if (gatherer != null) + gatherer.start(); statCfgMgr.start(); } @@ -216,7 +226,8 @@ public class IgniteStatisticsManagerImpl implements IgniteStatisticsManager { */ private synchronized void disableOperations() { statCfgMgr.stop(); - gatherer.stop(); + if (gatherer != null) + gatherer.stop(); statsRepos.stop(); } diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsGatherer.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsGatherer.java index 4c333fc..488b71f 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsGatherer.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/stat/StatisticsGatherer.java @@ -272,11 +272,18 @@ public class StatisticsGatherer { if (log.isTraceEnabled()) log.trace(String.format("Statistics gathering stopping %d task...", gatheringInProgress.size())); - gatheringInProgress.values().forEach(ctx -> ctx.futureGather().cancel(true)); - - gatheringInProgress.clear(); + cancelAllTasks(); if (log.isDebugEnabled()) log.debug("Statistics gathering stopped."); } + + /** + * Cancel all currently running statistics gathering tasks. + */ + public void cancelAllTasks() { + gatheringInProgress.values().forEach(ctx -> ctx.futureGather().cancel(true)); + + gatheringInProgress.clear(); + } } diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsConfigurationTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsConfigurationTest.java index 746fbdd..be28aab 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsConfigurationTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/stat/StatisticsConfigurationTest.java @@ -34,11 +34,14 @@ import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; +import org.apache.ignite.internal.processors.query.stat.config.StatisticsObjectConfiguration; import org.apache.ignite.internal.processors.query.stat.messages.StatisticsObjectData; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.ListeningTestLogger; +import org.apache.ignite.testframework.LogListener; import org.jetbrains.annotations.NotNull; import org.junit.Test; import org.junit.runner.RunWith; @@ -57,6 +60,13 @@ public class StatisticsConfigurationTest extends StatisticsAbstractTest { /** Columns to check.*/ private static final String[] COLUMNS = {"A", "B", "C"}; + /** Listener which catches client-side statistics store warning. */ + private LogListener obsolescenceLsnr = LogListener + .matches("Unable to save statistics obsolescence info on non server node.").build(); + + /** Logger which tries to catch client-side statistics store warning. */ + private final ListeningTestLogger obsolescenceAwareLog = new ListeningTestLogger(log(), obsolescenceLsnr); + /** Lazy mode. */ @Parameterized.Parameter(value = 0) public boolean persist; @@ -110,7 +120,7 @@ public class StatisticsConfigurationTest extends StatisticsAbstractTest { new DataRegionConfiguration() .setPersistenceEnabled(persist) ) - ); + ).setGridLogger(obsolescenceAwareLog); } /** {@inheritDoc} */ @@ -207,6 +217,43 @@ public class StatisticsConfigurationTest extends StatisticsAbstractTest { } /** + * Start client node and check no store related errors in log. + * + * @throws Exception In case of errors. + */ + @Test + public void checkClientNode() throws Exception { + startGridAndChangeBaseline(0); + + createSmallTable(null); + + IgniteEx client = startClientGrid("cli"); + + collectStatistics(SMALL_TARGET); + + sql("delete from small"); + + for (int i = 0; i < 1000; i++) + sql(String.format("INSERT INTO small(a, b, c) VALUES(%d, %d, %d)", i, i, i % 10)); + + StatisticsObjectConfiguration smallCfg = statisticsMgr(0).statisticConfiguration().config(SMALL_KEY); + + statisticsMgr(client).refreshStatistics(SMALL_TARGET); + + Thread.sleep(100); + + StatisticsObjectConfiguration smallCfg2 = statisticsMgr(0).statisticConfiguration().config(SMALL_KEY); + + assertNotSame(smallCfg.columns().get("A").version(), smallCfg2.columns().get("A").version()); + + client.cluster().state(ClusterState.INACTIVE); + + client.cluster().state(ClusterState.ACTIVE); + + assertFalse(obsolescenceLsnr.check(TIMEOUT)); + } + + /** * Check statistics on cluster after change topology. * 1. Create statistic for a table; * 2. Check statistics on all nodes of the cluster;